diff --git a/avro/src/main/java/com/pluralsight/hydra/avro/JsonConverter.java b/avro/src/main/java/com/pluralsight/hydra/avro/JsonConverter.java index ab718c005..d412e22cc 100644 --- a/avro/src/main/java/com/pluralsight/hydra/avro/JsonConverter.java +++ b/avro/src/main/java/com/pluralsight/hydra/avro/JsonConverter.java @@ -283,7 +283,10 @@ private Object typeConvert(Object value, String name, Schema schema) throws IOEx case STRING: return value.toString(); case ENUM: - boolean valid = schema.getEnumSymbols().contains(value); + if (name.equals("dataClassification")) { + value = adaptOldDataClassificationValue(value); + } + boolean valid = schema.getEnumSymbols().contains(value.toString()); if (!valid) throw new IllegalArgumentException(value + " is not a valid symbol. Possible values are: " + schema.getEnumSymbols() + "."); @@ -335,4 +338,26 @@ private Map cleanAvro(Map oldRaw) { } return newMap; } + + private String adaptOldDataClassificationValue(Object oldValue) { + String newValue; + + switch (oldValue.toString()) { + case "InternalUseOnly": + newValue = "InternalUse"; + break; + case "ConfidentialPII": + newValue = "Confidential"; + break; + case "RestrictedFinancial": + case "RestrictedEmployeeData": + newValue = "Restricted"; + break; + default: + newValue = oldValue.toString(); + break; + } + + return newValue; + } } \ No newline at end of file diff --git a/core/src/main/scala/hydra/core/marshallers/HydraJsonSupport.scala b/core/src/main/scala/hydra/core/marshallers/HydraJsonSupport.scala index ca863dc51..b357e044f 100644 --- a/core/src/main/scala/hydra/core/marshallers/HydraJsonSupport.scala +++ b/core/src/main/scala/hydra/core/marshallers/HydraJsonSupport.scala @@ -158,7 +158,7 @@ trait HydraJsonSupport extends SprayJsonSupport with DefaultJsonProtocol { implicit val genericErrorFormat = jsonFormat2(GenericError) - implicit val topicCreationMetadataFormat = jsonFormat9(TopicMetadataRequest) + implicit val topicCreationMetadataFormat = jsonFormat10(TopicMetadataRequest) implicit val genericSchemaFormat = jsonFormat2(GenericSchema) @@ -172,6 +172,7 @@ case class TopicMetadataRequest( derived: Boolean, deprecated: Option[Boolean], dataClassification: String, + subDataClassification: Option[String], contact: String, additionalDocumentation: Option[String], notes: Option[String], diff --git a/ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala b/ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala index eb4c4e756..5c26a9f9d 100644 --- a/ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala +++ b/ingest/src/main/scala/hydra.ingest/modules/Bootstrap.scala @@ -1,7 +1,6 @@ package hydra.ingest.modules import java.time.Instant - import cats.data.NonEmptyList import cats.effect.Sync import cats.syntax.all._ @@ -11,6 +10,7 @@ import hydra.kafka.model._ import hydra.kafka.programs.CreateTopicProgram import hydra.kafka.util.KafkaUtils.TopicDetails import hydra.kafka.algebras.{HydraTag, KafkaAdminAlgebra} +import hydra.kafka.model.DataClassification.InternalUse import hydra.kafka.model.TopicMetadataV2Request.Subject final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private ( @@ -55,7 +55,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private ( StreamTypeV2.Entity, deprecated = false, None, - InternalUseOnly, + InternalUse, + Some(SubDataClassification.InternalUseOnly), NonEmptyList.of(cfg.contactMethod), Instant.now, List.empty, @@ -84,7 +85,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private ( StreamTypeV2.Entity, deprecated = false, 
None, - InternalUseOnly, + InternalUse, + Some(SubDataClassification.InternalUseOnly), NonEmptyList.of(dvsConsumersTopicConfig.contactMethod), Instant.now, List.empty, @@ -116,7 +118,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private ( StreamTypeV2.Entity, deprecated = false, None, - InternalUseOnly, + InternalUse, + Some(SubDataClassification.InternalUseOnly), NonEmptyList.of(cooTopicConfig.contactMethod), Instant.now, List.empty, @@ -147,7 +150,8 @@ final class Bootstrap[F[_]: MonadError[*[_], Throwable]] private ( StreamTypeV2.Entity, deprecated = false, None, - InternalUseOnly, + InternalUse, + Some(SubDataClassification.InternalUseOnly), NonEmptyList.of(cooTopicConfig.contactMethod), Instant.now, List.empty, diff --git a/ingest/src/test/scala/hydra/ingest/http/SchemasEndpointSpec.scala b/ingest/src/test/scala/hydra/ingest/http/SchemasEndpointSpec.scala index 5c49d6137..628915074 100644 --- a/ingest/src/test/scala/hydra/ingest/http/SchemasEndpointSpec.scala +++ b/ingest/src/test/scala/hydra/ingest/http/SchemasEndpointSpec.scala @@ -59,6 +59,7 @@ class SchemasEndpointSpec false, Some(false), "Public", + Some("Public"), "test_contact", Some("test_additionalDocumentation"), Some("test_notes"), diff --git a/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala b/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala index 448e5b34b..e4e4d3f51 100644 --- a/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala +++ b/ingest/src/test/scala/hydra/ingest/programs/TopicDeletionProgramSpec.scala @@ -15,6 +15,7 @@ import hydra.kafka.algebras.KafkaAdminAlgebra._ import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer import hydra.kafka.algebras.{KafkaAdminAlgebra, KafkaClientAlgebra, TestConsumerGroupsAlgebra, TestMetadataAlgebra} import hydra.kafka.model.ContactMethod.Email +import hydra.kafka.model.DataClassification.Public import hydra.kafka.model.TopicConsumer.{TopicConsumerKey, TopicConsumerValue} import hydra.kafka.model.TopicMetadataV2Request.Subject import hydra.kafka.model._ @@ -138,6 +139,7 @@ class TopicDeletionProgramSpec extends AnyFlatSpec with Matchers { deprecated = deprecated, deprecatedDate, Public, + None, NonEmptyList.of(Email.create(email).get), createdDate, List.empty, diff --git a/ingest/src/test/scala/hydra/ingest/utils/TopicUtils.scala b/ingest/src/test/scala/hydra/ingest/utils/TopicUtils.scala index 596019bf4..aeb1e10e9 100644 --- a/ingest/src/test/scala/hydra/ingest/utils/TopicUtils.scala +++ b/ingest/src/test/scala/hydra/ingest/utils/TopicUtils.scala @@ -6,6 +6,7 @@ import cats.implicits._ import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer import hydra.kafka.algebras.TestMetadataAlgebra import hydra.kafka.model.ContactMethod.Email +import hydra.kafka.model.DataClassification.Public import hydra.kafka.model.TopicMetadataV2Request.Subject import hydra.kafka.model._ import org.apache.avro.SchemaBuilder @@ -25,6 +26,7 @@ object TopicUtils { deprecated = false, deprecatedDate = None, Public, + None, NonEmptyList.of(Email.create("test@test.com").get), Instant.now(), List.empty, diff --git a/ingestors/kafka/src/main/resources/HydraMetadataTopic.avsc b/ingestors/kafka/src/main/resources/HydraMetadataTopic.avsc index 49e86f604..3c0ef8e7a 100644 --- a/ingestors/kafka/src/main/resources/HydraMetadataTopic.avsc +++ b/ingestors/kafka/src/main/resources/HydraMetadataTopic.avsc @@ -38,9 +38,28 @@ "type": { "name": "dataClassificationEnum", "type": "enum", - "symbols": ["Public", 
"InternalUseOnly", "ConfidentialPII", "RestrictedFinancial","RestrictedEmployeeData"] + "symbols": ["Public", "InternalUse", "Confidential", "Restricted"] } }, + { + "name": "subDataClassification", + "type": [ + "null", + { + "type": "enum", + "name": "SubDataClassification", + "namespace": "hydra.kafka.model", + "symbols": [ + "Public", + "InternalUseOnly", + "ConfidentialPII", + "RestrictedFinancial", + "RestrictedEmployeeData" + ] + } + ], + "default": null + }, { "name": "derived", "type": "boolean" diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/model/DataClassification.scala b/ingestors/kafka/src/main/scala/hydra/kafka/model/DataClassification.scala new file mode 100644 index 000000000..ca1efb097 --- /dev/null +++ b/ingestors/kafka/src/main/scala/hydra/kafka/model/DataClassification.scala @@ -0,0 +1,30 @@ +package hydra.kafka.model + +import enumeratum.{Enum, EnumEntry} + +import scala.collection.immutable + +sealed trait DataClassification extends EnumEntry + +object DataClassification extends Enum[DataClassification] { + + case object Public extends DataClassification + case object InternalUse extends DataClassification + case object Confidential extends DataClassification + case object Restricted extends DataClassification + + override val values: immutable.IndexedSeq[DataClassification] = findValues +} + +sealed trait SubDataClassification extends EnumEntry + +object SubDataClassification extends Enum[SubDataClassification] { + + case object Public extends SubDataClassification + case object InternalUseOnly extends SubDataClassification + case object ConfidentialPII extends SubDataClassification + case object RestrictedFinancial extends SubDataClassification + case object RestrictedEmployeeData extends SubDataClassification + + override val values: immutable.IndexedSeq[SubDataClassification] = findValues +} diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadata.scala b/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadata.scala index 6c822325c..649f4ca83 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadata.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadata.scala @@ -2,7 +2,6 @@ package hydra.kafka.model import java.time.Instant import java.util.UUID - import cats.data.Validated.{Invalid, Valid} import cats.data.{NonEmptyList, Validated} import cats.syntax.all._ @@ -10,6 +9,7 @@ import cats.{Applicative, ApplicativeError, Monad, MonadError} import fs2.kafka.Headers import hydra.avro.convert.{ISODateConverter, IsoDate} import hydra.core.marshallers._ +import hydra.kafka.model.DataClassification._ import hydra.kafka.model.TopicMetadataV2Request.Subject import org.apache.avro.generic.GenericRecord import org.apache.avro.io.{Encoder, EncoderFactory} @@ -31,6 +31,7 @@ case class TopicMetadata( derived: Boolean, deprecated: Option[Boolean], dataClassification: String, + subDataClassification: Option[String], contact: String, additionalDocumentation: Option[String], notes: Option[String], @@ -145,6 +146,7 @@ final case class TopicMetadataV2ValueOptionalTagList( deprecated: Boolean, deprecatedDate: Option[Instant], dataClassification: DataClassification, + subDataClassification: Option[SubDataClassification], contact: NonEmptyList[ContactMethod], createdDate: Instant, parentSubjects: List[String], @@ -160,6 +162,7 @@ final case class TopicMetadataV2ValueOptionalTagList( deprecated, deprecatedDate, dataClassification, + subDataClassification, contact, createdDate, parentSubjects, @@ -178,6 +181,7 @@ final 
case class TopicMetadataV2Value( deprecated: Boolean, deprecatedDate: Option[Instant], dataClassification: DataClassification, + subDataClassification: Option[SubDataClassification], contact: NonEmptyList[ContactMethod], createdDate: Instant, parentSubjects: List[String], @@ -193,6 +197,7 @@ final case class TopicMetadataV2Value( deprecated, deprecatedDate, dataClassification, + subDataClassification, contact, createdDate, parentSubjects, @@ -225,6 +230,29 @@ object TopicMetadataV2ValueOptionalTagList { implicit val dataClassificationCodec: Codec[DataClassification] = Codec.deriveEnum[DataClassification]( + symbols = List( + "Public", + "InternalUse", + "Confidential", + "Restricted" + ), + encode = { + case Public => "Public" + case InternalUse => "InternalUse" + case Confidential => "Confidential" + case Restricted => "Restricted" + }, + decode = { + case "Public" => Right(Public) + case "InternalUse" => Right(InternalUse) + case "Confidential" => Right(Confidential) + case "Restricted" => Right(Restricted) + case other => Left(AvroError(s"$other is not a DataClassification. Valid value is one of: ${DataClassification.values}")) + } + ) + + implicit val subDataClassificationCodec: Codec[SubDataClassification] = + Codec.deriveEnum[SubDataClassification]( symbols = List( "Public", "InternalUseOnly", @@ -233,19 +261,19 @@ object TopicMetadataV2ValueOptionalTagList { "RestrictedEmployeeData" ), encode = { - case Public => "Public" - case InternalUseOnly => "InternalUseOnly" - case ConfidentialPII => "ConfidentialPII" - case RestrictedFinancial => "RestrictedFinancial" - case RestrictedEmployeeData => "RestrictedEmployeeData" + case SubDataClassification.Public => "Public" + case SubDataClassification.InternalUseOnly => "InternalUseOnly" + case SubDataClassification.ConfidentialPII => "ConfidentialPII" + case SubDataClassification.RestrictedFinancial => "RestrictedFinancial" + case SubDataClassification.RestrictedEmployeeData => "RestrictedEmployeeData" }, decode = { - case "Public" => Right(Public) - case "InternalUseOnly" => Right(InternalUseOnly) - case "ConfidentialPII" => Right(ConfidentialPII) - case "RestrictedFinancial" => Right(RestrictedFinancial) - case "RestrictedEmployeeData" => Right(RestrictedEmployeeData) - case other => Left(AvroError(s"$other is not a DataClassification")) + case "Public" => Right(SubDataClassification.Public) + case "InternalUseOnly" => Right(SubDataClassification.InternalUseOnly) + case "ConfidentialPII" => Right(SubDataClassification.ConfidentialPII) + case "RestrictedFinancial" => Right(SubDataClassification.RestrictedFinancial) + case "RestrictedEmployeeData" => Right(SubDataClassification.RestrictedEmployeeData) + case other => Left(AvroError(s"$other is not a SubDataClassification. 
Valid value is one of: ${SubDataClassification.values}")) } ) @@ -288,6 +316,7 @@ object TopicMetadataV2ValueOptionalTagList { field("deprecated", _.deprecated), field("deprecatedDate", _.deprecatedDate, default = Some(None)), field("dataClassification", _.dataClassification), + field("subDataClassification", _.subDataClassification, default = Some(None)), field("contact", _.contact), field("createdDate", _.createdDate), field("parentSubjects", _.parentSubjects), diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadataAdapter.scala b/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadataAdapter.scala index 0136bb167..4afdb51bb 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadataAdapter.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadataAdapter.scala @@ -9,7 +9,7 @@ import spray.json._ */ trait TopicMetadataAdapter extends HydraJsonSupport { - implicit val topicMetadataFormat = jsonFormat12(TopicMetadata) + implicit val topicMetadataFormat = jsonFormat13(TopicMetadata) def streamLink(rel: String, id: String) = rel -> Link(href = s"/streams/$id") diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadataV2Transport.scala b/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadataV2Transport.scala index 0c9c76844..59e0452bf 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadataV2Transport.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/model/TopicMetadataV2Transport.scala @@ -14,18 +14,6 @@ import shapeless.Witness.Lt import java.time.Instant -sealed trait DataClassification - -case object Public extends DataClassification - -case object InternalUseOnly extends DataClassification - -case object ConfidentialPII extends DataClassification - -case object RestrictedFinancial extends DataClassification - -case object RestrictedEmployeeData extends DataClassification - sealed trait ContactMethod object ContactMethod { @@ -79,6 +67,7 @@ final case class TopicMetadataV2Request( deprecated: Boolean, deprecatedDate: Option[Instant], dataClassification: DataClassification, + subDataClassification: Option[SubDataClassification], contact: NonEmptyList[ContactMethod], createdDate: Instant, parentSubjects: List[String], @@ -96,6 +85,7 @@ final case class TopicMetadataV2Request( deprecated, deprecatedDate, dataClassification, + subDataClassification, contact, createdDate, parentSubjects, @@ -141,6 +131,7 @@ object TopicMetadataV2Request { mor.deprecated, mor.deprecatedDate, mor.dataClassification, + mor.subDataClassification, mor.contact, mor.createdDate, mor.parentSubjects, @@ -164,6 +155,7 @@ final case class TopicMetadataV2Response( deprecated: Boolean, deprecatedDate: Option[Instant], dataClassification: DataClassification, + subDataClassification: Option[SubDataClassification], contact: NonEmptyList[ContactMethod], createdDate: Instant, parentSubjects: List[String], @@ -183,6 +175,7 @@ object TopicMetadataV2Response { v.deprecated, v.deprecatedDate, v.dataClassification, + v.subDataClassification, v.contact, v.createdDate, v.parentSubjects, @@ -198,6 +191,7 @@ final case class MetadataOnlyRequest(streamType: StreamTypeV2, deprecated: Boolean, deprecatedDate: Option[Instant], dataClassification: DataClassification, + subDataClassification: Option[SubDataClassification], contact: NonEmptyList[ContactMethod], createdDate: Instant, parentSubjects: List[String], diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala 
b/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala index d4f45dbad..f7fe85b2b 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/programs/CreateTopicProgram.scala @@ -24,7 +24,8 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr retryPolicy: RetryPolicy[F], v2MetadataTopicName: Subject, metadataAlgebra: MetadataAlgebra[F], - schemaValidator: KeyAndValueSchemaV2Validator[F] + schemaValidator: KeyAndValueSchemaV2Validator[F], + metadataValidator: TopicMetadataV2Validator[F] ) (implicit eff: Sync[F]){ private def onFailure(resourceTried: String): (Throwable, RetryDetails) => F[Unit] = { @@ -106,6 +107,7 @@ final class CreateTopicProgram[F[_]: Bracket[*[_], Throwable]: Sleep: Logger] pr createTopicRequest: TopicMetadataV2Request, ): F[Unit] = { for { + _ <- metadataValidator.validate(createTopicRequest) metadata <- metadataAlgebra.getMetadataFor(topicName) createdDate = metadata.map(_.value.createdDate).getOrElse(createTopicRequest.createdDate) deprecatedDate = metadata.map(_.value.deprecatedDate).getOrElse(createTopicRequest.deprecatedDate) match { @@ -188,7 +190,8 @@ object CreateTopicProgram { retryPolicy, v2MetadataTopicName, metadataAlgebra, - KeyAndValueSchemaV2Validator.make(schemaRegistry, metadataAlgebra) + KeyAndValueSchemaV2Validator.make(schemaRegistry, metadataAlgebra), + new TopicMetadataV2Validator() ) } diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/programs/TopicMetadataError.scala b/ingestors/kafka/src/main/scala/hydra/kafka/programs/TopicMetadataError.scala new file mode 100644 index 000000000..deaede4dd --- /dev/null +++ b/ingestors/kafka/src/main/scala/hydra/kafka/programs/TopicMetadataError.scala @@ -0,0 +1,19 @@ +package hydra.kafka.programs + +import hydra.common.validation.ValidationError +import hydra.kafka.model.SubDataClassification + +sealed trait TopicMetadataError extends ValidationError + +object TopicMetadataError { + + case class InvalidSubDataClassificationTypeError(dataClassification: String, + subDataClassification: String, + supportedValues: Seq[SubDataClassification]) extends TopicMetadataError { + + override def message: String = { + val validValues = s"Valid value is ${if (supportedValues.size == 1) s"'${supportedValues.head}'" else s"one of [${supportedValues.mkString(", ")}]"}." + s"'$subDataClassification' is not a valid SubDataClassification value for '$dataClassification' DataClassification. 
$validValues" + } + } +} diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/programs/TopicMetadataV2Validator.scala b/ingestors/kafka/src/main/scala/hydra/kafka/programs/TopicMetadataV2Validator.scala new file mode 100644 index 000000000..1cd02015f --- /dev/null +++ b/ingestors/kafka/src/main/scala/hydra/kafka/programs/TopicMetadataV2Validator.scala @@ -0,0 +1,38 @@ +package hydra.kafka.programs + +import cats.effect.Sync +import cats.syntax.all._ +import hydra.common.validation.Validator +import hydra.kafka.model.DataClassification._ +import hydra.kafka.model.{DataClassification, SubDataClassification, TopicMetadataV2Request} + +class TopicMetadataV2Validator[F[_] : Sync] () extends Validator { + + def validate(metadataV2Request: TopicMetadataV2Request): F[Unit] = + for { + _ <- validateSubDataClassification(metadataV2Request.dataClassification, metadataV2Request.subDataClassification) + } yield () + + private def validateSubDataClassification(dataClassification: DataClassification, + subDataClassification: Option[SubDataClassification]): F[Unit] = { + (subDataClassification map { sdc => + val correctionRequired = collectValidSubDataClassificationValues(dataClassification, sdc) + resultOf(validate( + correctionRequired.isEmpty, + TopicMetadataError.InvalidSubDataClassificationTypeError(dataClassification.entryName, sdc.entryName, correctionRequired) + )) + }).getOrElse(().pure) + } + + private def collectValidSubDataClassificationValues(dc: DataClassification, sdc: SubDataClassification): Seq[SubDataClassification] = dc match { + case Public => if (sdc != SubDataClassification.Public) Seq(SubDataClassification.Public) else Seq.empty + case InternalUse => if (sdc != SubDataClassification.InternalUseOnly) Seq(SubDataClassification.InternalUseOnly) else Seq.empty + case Confidential => if (sdc != SubDataClassification.ConfidentialPII) Seq(SubDataClassification.ConfidentialPII) else Seq.empty + case Restricted => + if (sdc != SubDataClassification.RestrictedFinancial && sdc != SubDataClassification.RestrictedEmployeeData) { + Seq(SubDataClassification.RestrictedFinancial, SubDataClassification.RestrictedEmployeeData) + } else { + Seq.empty + } + } +} diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/serializers/TopicMetadataV2Parser.scala b/ingestors/kafka/src/main/scala/hydra/kafka/serializers/TopicMetadataV2Parser.scala index 88d3d5d5f..496fabf71 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/serializers/TopicMetadataV2Parser.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/serializers/TopicMetadataV2Parser.scala @@ -160,29 +160,11 @@ sealed trait TopicMetadataV2Parser } } - implicit object DataClassificationFormat - extends RootJsonFormat[DataClassification] { - - def read(json: JsValue): DataClassification = json match { - case JsString("Public") => Public - case JsString("InternalUseOnly") => InternalUseOnly - case JsString("ConfidentialPII") => ConfidentialPII - case JsString("RestrictedFinancial") => RestrictedFinancial - case JsString("RestrictedEmployeeData") => RestrictedEmployeeData - case _ => - import scala.reflect.runtime.{universe => ru} - val tpe = ru.typeOf[DataClassification] - val knownDirectSubclasses: Set[ru.Symbol] = - tpe.typeSymbol.asClass.knownDirectSubclasses - throw DeserializationException( - DataClassificationInvalid(json, knownDirectSubclasses).errorMessage - ) - } + implicit val dataClassificationFormat: EnumEntryJsonFormat[DataClassification] = + new EnumEntryJsonFormat[DataClassification](DataClassification.values) - def write(obj: 
DataClassification): JsValue = { - JsString(obj.toString) - } - } + implicit val subDataClassificationFormat: EnumEntryJsonFormat[SubDataClassification] = + new EnumEntryJsonFormat[SubDataClassification](SubDataClassification.values) implicit object SchemasFormat extends RootJsonFormat[Schemas] { @@ -279,7 +261,11 @@ sealed trait TopicMetadataV2Parser case x => deserializationError(x) } - private def deserializationError(value: JsValue) = throw DeserializationException(s"Expected a value from enum $values instead of $value") + private def deserializationError(value: JsValue) = { + val className = values.headOption.map(_.getClass.getEnclosingClass.getSimpleName).getOrElse("") + throw DeserializationException( + s"For '$className': Expected a value from enum $values instead of $value") + } } implicit val additionalValidationFormat: EnumEntryJsonFormat[AdditionalValidation] = @@ -289,7 +275,7 @@ sealed trait TopicMetadataV2Parser extends RootJsonFormat[TopicMetadataV2Request] { override def write(obj: TopicMetadataV2Request): JsValue = - jsonFormat14(TopicMetadataV2Request.apply).write(obj) + jsonFormat15(TopicMetadataV2Request.apply).write(obj) override def read(json: JsValue): TopicMetadataV2Request = json match { case j: JsObject => @@ -356,15 +342,18 @@ sealed trait TopicMetadataV2Parser } else { toResult(None) } + + val payloadDataClassification = j.getFields("dataClassification").headOption + val adaptedDataClassification = adaptOldDataClassificationValue(payloadDataClassification) val dataClassification = toResult( - DataClassificationFormat.read( - j.getFields("dataClassification") - .headOption - .getOrElse( - throwDeserializationError("dataClassification", "String") - ) - ) - ) + new EnumEntryJsonFormat[DataClassification](DataClassification.values).read( + adaptedDataClassification.getOrElse(throwDeserializationError("dataClassification", "String")))) + + val maybeSubDataClassification = pickSubDataClassificationValue( + j.getFields("subDataClassification").headOption, payloadDataClassification) + val subDataClassification = toResult( + maybeSubDataClassification.map(new EnumEntryJsonFormat[SubDataClassification](SubDataClassification.values).read)) + val contact = toResult( ContactFormat.read( j.getFields("contact") @@ -409,6 +398,7 @@ sealed trait TopicMetadataV2Parser deprecated, deprecatedDate, dataClassification, + subDataClassification, contact, createdDate, parentSubjects, @@ -420,6 +410,23 @@ sealed trait TopicMetadataV2Parser toResult(None) // Never pick additionalValidations from the request. 
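// Illustrative sketch, not part of this change set (object name is hypothetical): the legacy-to-new
// mapping applied by adaptOldDataClassificationValue above (and by its Java counterpart in
// JsonConverter), paired with the sub-classification that pickSubDataClassificationValue aims to
// fall back to when a request still sends a legacy value and omits "subDataClassification".
import hydra.kafka.model.{DataClassification, SubDataClassification}

object LegacyDataClassificationMapping {
  // legacy symbol -> (new DataClassification, legacy value retained as SubDataClassification)
  val mapping: Map[String, (DataClassification, SubDataClassification)] = Map(
    "Public"                 -> (DataClassification.Public       -> SubDataClassification.Public),
    "InternalUseOnly"        -> (DataClassification.InternalUse  -> SubDataClassification.InternalUseOnly),
    "ConfidentialPII"        -> (DataClassification.Confidential -> SubDataClassification.ConfidentialPII),
    "RestrictedFinancial"    -> (DataClassification.Restricted   -> SubDataClassification.RestrictedFinancial),
    "RestrictedEmployeeData" -> (DataClassification.Restricted   -> SubDataClassification.RestrictedEmployeeData)
  )
}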
).mapN(MetadataOnlyRequest.apply) } + + private def adaptOldDataClassificationValue(dataClassification: Option[JsValue]): Option[JsValue] = dataClassification map { + case JsString("InternalUseOnly") => JsString("InternalUse") + case JsString("ConfidentialPII") => JsString("Confidential") + case JsString("RestrictedFinancial") => JsString("Restricted") + case JsString("RestrictedEmployeeData") => JsString("Restricted") + case other: JsValue => other + } + + private def pickSubDataClassificationValue(payloadSubDataClassification: Option[JsValue], + payloadDataClassification: Option[JsValue]): Option[JsValue] = + if (payloadSubDataClassification.isEmpty && + payloadDataClassification.exists(dc => SubDataClassification.values.exists(_.entryName == dc.compactPrint))) { + payloadDataClassification + } else { + payloadSubDataClassification + } } implicit object MaybeSchemasFormat extends RootJsonFormat[MaybeSchemas] { @@ -438,7 +445,7 @@ sealed trait TopicMetadataV2Parser implicit object TopicMetadataResponseV2Format extends RootJsonFormat[TopicMetadataV2Response] { override def read(json: JsValue): TopicMetadataV2Response = throw IntentionallyUnimplemented - override def write(obj: TopicMetadataV2Response): JsValue = jsonFormat13(TopicMetadataV2Response.apply).write(obj) + override def write(obj: TopicMetadataV2Response): JsValue = jsonFormat14(TopicMetadataV2Response.apply).write(obj) } private def throwDeserializationError(key: String, `type`: String) = diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/services/StreamsManagerActor.scala b/ingestors/kafka/src/main/scala/hydra/kafka/services/StreamsManagerActor.scala index 71f3b86bf..043e829b8 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/services/StreamsManagerActor.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/services/StreamsManagerActor.scala @@ -164,6 +164,7 @@ object StreamsManagerActor { record.get("derived").toString.toBoolean, Try(Option(record.get("deprecated"))).toOption.flatten.map(_.toString.toBoolean), record.get("dataClassification").toString, + Try(Option(record.get("subDataClassification"))).toOption.flatten.map(_.toString), record.get("contact").toString, Try(Option(record.get("additionalDocumentation"))).toOption.flatten.map(_.toString), Try(Option(record.get("notes"))).toOption.flatten.map(_.toString), diff --git a/ingestors/kafka/src/main/scala/hydra/kafka/services/TopicBootstrapActor.scala b/ingestors/kafka/src/main/scala/hydra/kafka/services/TopicBootstrapActor.scala index 7e9d38e34..7f1d0696d 100644 --- a/ingestors/kafka/src/main/scala/hydra/kafka/services/TopicBootstrapActor.scala +++ b/ingestors/kafka/src/main/scala/hydra/kafka/services/TopicBootstrapActor.scala @@ -44,7 +44,7 @@ class TopicBootstrapActor( import TopicBootstrapActor._ import spray.json._ - implicit val metadataFormat = jsonFormat12(TopicMetadata) + implicit val metadataFormat = jsonFormat13(TopicMetadata) implicit val ec = context.dispatcher @@ -192,6 +192,7 @@ class TopicBootstrapActor( topicMetadataRequest.derived, topicMetadataRequest.deprecated, topicMetadataRequest.dataClassification, + topicMetadataRequest.subDataClassification, topicMetadataRequest.contact, topicMetadataRequest.additionalDocumentation, topicMetadataRequest.notes, diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/TopicMetadataAdapterSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/TopicMetadataAdapterSpec.scala index f3308b2c0..ac05ca129 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/TopicMetadataAdapterSpec.scala +++ 
b/ingestors/kafka/src/test/scala/hydra/kafka/TopicMetadataAdapterSpec.scala @@ -24,6 +24,7 @@ class TopicMetadataAdapterSpec derived = false, deprecated = None, dataClassification = "public", + subDataClassification = Some("public"), contact = "alex", additionalDocumentation = None, notes = None, diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/algebras/MetadataAlgebraSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/algebras/MetadataAlgebraSpec.scala index 8fbd5f95d..bd4472cbd 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/algebras/MetadataAlgebraSpec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/algebras/MetadataAlgebraSpec.scala @@ -10,6 +10,7 @@ import hydra.common.alerting.sender.InternalNotificationSender import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer import hydra.kafka.algebras.RetryableFs2Stream.RetryPolicy.Once import hydra.kafka.model.ContactMethod.Slack +import hydra.kafka.model.DataClassification.Public import hydra.kafka.model.TopicMetadataV2Request.Subject import hydra.kafka.model._ import org.typelevel.log4cats.SelfAwareStructuredLogger @@ -119,6 +120,7 @@ class MetadataAlgebraSpec extends AnyWordSpecLike with Matchers with Notificatio deprecated = false, None, Public, + None, NonEmptyList.one(Slack.create("#channel").get), Instant.now, List(), diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointSpec.scala index febdb30f8..cffb8d385 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointSpec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointSpec.scala @@ -16,7 +16,7 @@ import hydra.core.http.security.entity.AwsConfig import hydra.core.http.security.{AccessControlService, AwsSecurityService} import hydra.core.protocol.{Ingest, IngestorCompleted, IngestorError} import hydra.kafka.marshallers.HydraKafkaJsonSupport -import hydra.kafka.model.TopicMetadata +import hydra.kafka.model.{DataClassification, SubDataClassification, TopicMetadata} import hydra.kafka.producer.AvroRecord import hydra.kafka.services.StreamsManagerActor import hydra.kafka.util.KafkaUtils @@ -103,7 +103,7 @@ class BootstrapEndpointSpec private val bootstrapRoute = new BootstrapEndpoint(system, streamsManagerActor, KafkaConfigUtils.kafkaSecurityEmptyConfig, schemaRegistryEmptySecurityConfig, auth, awsSecurityService).route - implicit val f = jsonFormat12(TopicMetadata) + implicit val f = jsonFormat13(TopicMetadata) override def beforeAll: Unit = { EmbeddedKafka.start() @@ -120,6 +120,34 @@ class BootstrapEndpointSpec ) } + def dataClassificationRequest(dataClassification: String = "Public", subDataClassification: Option[String] = None): HttpEntity.Strict = HttpEntity( + ContentTypes.`application/json`, + s"""{ + | "streamType": "Notification", + | "derived": false, + | "dataClassification": "$dataClassification", + | ${if (subDataClassification.isDefined) s""""subDataClassification": "${subDataClassification.get}",""" else ""} + | "dataSourceOwner": "BARTON", + | "contact": "slackity slack dont talk back", + | "psDataLake": false, + | "additionalDocumentation": "akka://some/path/here.jpggifyo", + | "notes": "here are some notes topkek", + | "schema": { + | "namespace": "exp.assessment", + | "name": "SkillAssessmentTopicsScored", + | "type": "record", + | "version": 1, + | "fields": [ + | { + | "name": "testField", + | "type": "string" + | } + | ] + | }, + | "notificationUrl": 
"notification.url" + |}""".stripMargin + ) + "The bootstrap endpoint" should { "list streams" in { @@ -133,6 +161,7 @@ class BootstrapEndpointSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "contact": "slackity slack dont talk back", | "additionalDocumentation": "akka://some/path/here.jpggifyo", | "notes": "here are some notes topkek", @@ -174,6 +203,7 @@ class BootstrapEndpointSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "contact": "slackity slack dont talk back", | "additionalDocumentation": "akka://some/path/here.jpggifyo", | "notes": "here are some notes topkek", @@ -216,6 +246,7 @@ class BootstrapEndpointSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -251,6 +282,7 @@ class BootstrapEndpointSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -284,6 +316,7 @@ class BootstrapEndpointSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -317,6 +350,7 @@ class BootstrapEndpointSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -381,6 +415,7 @@ class BootstrapEndpointSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -416,6 +451,7 @@ class BootstrapEndpointSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -449,5 +485,41 @@ class BootstrapEndpointSpec } } } + + DataClassification.values foreach { dc => + s"$dc: accept valid DataClassification value" in { + Post("/streams", dataClassificationRequest(dc.entryName)) ~> bootstrapRoute ~> check { + status shouldBe StatusCodes.OK + } + } + } + + SubDataClassification.values foreach { dc => + s"$dc: accept deprecated valid DataClassification value" in { + Post("/streams", dataClassificationRequest(dc.entryName)) ~> bootstrapRoute ~> check { + status shouldBe StatusCodes.OK + } + } + } + + SubDataClassification.values foreach { value => + s"$value: accept valid SubDataClassification values" in { + Post("/streams", dataClassificationRequest(value.entryName, Some(value.entryName))) ~> bootstrapRoute ~> check { + status shouldBe StatusCodes.OK + } + } + } + + "reject invalid DataClassification metadata value" in { + Post("/streams", dataClassificationRequest("junk")) ~> bootstrapRoute ~> check { + status shouldBe StatusCodes.BadRequest + } + } + + "reject invalid SubDataClassification metadata value" in { + Post("/streams", dataClassificationRequest(subDataClassification = Some("junk"))) ~> bootstrapRoute ~> 
check { + status shouldBe StatusCodes.BadRequest + } + } } } diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointTestActors.scala b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointTestActors.scala index bf2d3e7c5..abe023950 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointTestActors.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointTestActors.scala @@ -35,6 +35,7 @@ trait BootstrapEndpointTestActors[F[_]] extends BootstrapEndpointActors[F] { derived = false, None, "", + None, "", None, None, diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala index e090636ba..bbf5a268f 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/BootstrapEndpointV2Spec.scala @@ -15,6 +15,7 @@ import hydra.core.http.security.{AccessControlService, AwsSecurityService} import hydra.kafka.algebras.RetryableFs2Stream.RetryPolicy.Once import hydra.kafka.algebras._ import hydra.kafka.model.ContactMethod.{Email, Slack} +import hydra.kafka.model.DataClassification.Public import hydra.kafka.model.TopicMetadataV2Request.Subject import hydra.kafka.model._ import hydra.kafka.programs.CreateTopicProgram @@ -127,6 +128,7 @@ final class BootstrapEndpointV2Spec deprecated = false, None, Public, + subDataClassification = None, NonEmptyList.of(Email.create("test@pluralsight.com").get, Slack.create("#dev-data-platform").get), Instant.now, List.empty, @@ -144,6 +146,7 @@ final class BootstrapEndpointV2Spec deprecated = false, None, Public, + subDataClassification = None, NonEmptyList.of(Email.create("test@pluralsight.com").get, Slack.create("#dev-data-platform").get), Instant.now, List.empty, @@ -161,6 +164,7 @@ final class BootstrapEndpointV2Spec deprecated = false, None, Public, + subDataClassification = None, NonEmptyList.of(Email.create("test@pluralsight.com").get, Slack.create("#dev-data-platform").get), Instant.now, List.empty, @@ -232,6 +236,7 @@ final class BootstrapEndpointV2Spec deprecated = false, None, Public, + subDataClassification = None, NonEmptyList.of(Email.create("test@pluralsight.com").get, Slack.create("#dev-data-platform").get), Instant.now, List.empty, @@ -306,6 +311,7 @@ final class BootstrapEndpointV2Spec deprecated = false, None, Public, + subDataClassification = None, NonEmptyList.of(Email.create("test@pluralsight.com").get, Slack.create("#dev-data-platform").get), Instant.now, List.empty, @@ -335,6 +341,7 @@ final class BootstrapEndpointV2Spec deprecated = false, None, Public, + subDataClassification = None, NonEmptyList.of(Email.create("test@pluralsight.com").get, Slack.create("#dev-data-platform").get), Instant.now, List.empty, diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/TopicMetadataEndpointSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/TopicMetadataEndpointSpec.scala index cd76a8917..26c9cce84 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/TopicMetadataEndpointSpec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/endpoints/TopicMetadataEndpointSpec.scala @@ -16,27 +16,25 @@ import hydra.core.http.CorsSupport import hydra.core.http.security.entity.AwsConfig import hydra.core.http.security.{AccessControlService, AwsSecurityService} import hydra.kafka.algebras.RetryableFs2Stream.RetryPolicy.Once 
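// Illustrative sketch, not part of this change set (helper name is hypothetical): the combination rule
// enforced by TopicMetadataV2Validator above. A subDataClassification, when supplied, must belong to
// its parent DataClassification; an omitted subDataClassification is always accepted.
import hydra.kafka.model.{DataClassification, SubDataClassification}

object SubDataClassificationRule {
  def allowed(dc: DataClassification): Set[SubDataClassification] = dc match {
    case DataClassification.Public       => Set(SubDataClassification.Public)
    case DataClassification.InternalUse  => Set(SubDataClassification.InternalUseOnly)
    case DataClassification.Confidential => Set(SubDataClassification.ConfidentialPII)
    case DataClassification.Restricted   => Set(SubDataClassification.RestrictedFinancial,
                                                SubDataClassification.RestrictedEmployeeData)
  }

  // e.g. isValid(DataClassification.Restricted, Some(SubDataClassification.Public)) == false
  def isValid(dc: DataClassification, sdc: Option[SubDataClassification]): Boolean =
    sdc.forall(allowed(dc).contains)
}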
-import hydra.kafka.algebras.{HydraTag, KafkaAdminAlgebra, KafkaClientAlgebra, MetadataAlgebra, TagsAlgebra} +import hydra.kafka.algebras._ import hydra.kafka.consumer.KafkaConsumerProxy import hydra.kafka.consumer.KafkaConsumerProxy.{GetPartitionInfo, ListTopics, ListTopicsResponse, PartitionInfoResponse} import hydra.kafka.marshallers.HydraKafkaJsonSupport -import hydra.kafka.model.RequiredField +import hydra.kafka.model.{DataClassification, RequiredField, SubDataClassification} import hydra.kafka.model.TopicMetadataV2Request.Subject -import org.typelevel.log4cats.SelfAwareStructuredLogger -import org.typelevel.log4cats.slf4j.Slf4jLogger +import hydra.kafka.programs.CreateTopicProgram +import hydra.kafka.util.KafkaUtils.TopicDetails import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig} +import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder} import org.apache.kafka.common.{Node, PartitionInfo} import org.scalatest.BeforeAndAfterAll import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike - -import scala.concurrent.ExecutionContext -import hydra.kafka.programs.CreateTopicProgram -import hydra.kafka.util.KafkaUtils.TopicDetails -import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder} +import org.typelevel.log4cats.SelfAwareStructuredLogger +import org.typelevel.log4cats.slf4j.Slf4jLogger import retry.{RetryPolicies, RetryPolicy} -import java.time.Instant +import scala.concurrent.ExecutionContext class TopicMetadataEndpointSpec @@ -49,12 +47,11 @@ class TopicMetadataEndpointSpec with ConfigSupport with NotificationsTestSuite { + import ConfigSupport._ import spray.json._ import scala.concurrent.duration._ - import ConfigSupport._ - implicit private def unsafeLogger[F[_]: Sync]: SelfAwareStructuredLogger[F] = Slf4jLogger.getLogger[F] @@ -283,7 +280,8 @@ class TopicMetadataEndpointSpec val validRequest = """{ | "streamType": "Event", | "deprecated": true, - | "dataClassification": "InternalUseOnly", + | "dataClassification": "InternalUse", + | "subDataClassification": "InternalUseOnly", | "contact": { | "email": "bob@myemail.com" | }, @@ -299,7 +297,8 @@ class TopicMetadataEndpointSpec """{ | "streamType": "History", | "deprecated": true, - | "dataClassification": "InternalUseOnly", + | "dataClassification": "InternalUse", + | "subDataClassification": "InternalUseOnly", | "contact": { | "email": "bob@myemail.com" | }, @@ -313,7 +312,8 @@ class TopicMetadataEndpointSpec """{ | "streamType": "Event", | "deprecated": true, - | "dataClassification": "InternalUseOnly", + | "dataClassification": "InternalUse", + | "subDataClassification": "InternalUseOnly", | "contact": { | "email": "bob@myemail.com" | }, @@ -324,6 +324,23 @@ class TopicMetadataEndpointSpec | "tags": [] |}""".stripMargin + def dataClassificationRequest(dataClassification: String = "Public", subDataClassification: Option[String] = None) = + s"""{ + | "streamType": "Event", + | "deprecated": true, + | "dataClassification": "$dataClassification", + | ${if (subDataClassification.isDefined) s""""subDataClassification": "${subDataClassification.get}",""" else ""} + | "contact": { + | "email": "bob@myemail.com" + | }, + | "createdDate": "2020-02-02T12:34:56Z", + | "notes": "here are some notes", + | "parentSubjects": [], + | "teamName": "dvs-teamName", + | "tags": ["Source: DVS"], + | "notificationUrl": "testnotification.url" + |}""".stripMargin + "return 200 with proper metadata" in { implicit val timeout = RouteTestTimeout(5.seconds) Put("/v2/metadata/dvs.test.subject", 
HttpEntity(ContentTypes.`application/json`, validRequest)) ~> route ~> check { @@ -348,5 +365,44 @@ class TopicMetadataEndpointSpec rejection shouldBe a[MalformedRequestContentRejection] } } + + "reject invalid DataClassification metadata value" in { + Put("/v2/metadata/dvs.test.subject", HttpEntity(ContentTypes.`application/json`, + dataClassificationRequest("junk"))) ~> route ~> check { + rejection shouldBe a[MalformedRequestContentRejection] + } + } + + "reject invalid SubDataClassification metadata value" in { + Put("/v2/metadata/dvs.test.subject", HttpEntity(ContentTypes.`application/json`, + dataClassificationRequest(subDataClassification = Some("junk")))) ~> route ~> check { + rejection shouldBe a[MalformedRequestContentRejection] + } + } + + DataClassification.values foreach { dc => + s"$dc: accept valid DataClassification value" in { + Put("/v2/metadata/dvs.test.subject", HttpEntity(ContentTypes.`application/json`, dataClassificationRequest(dc.entryName))) ~> route ~> check { + response.status shouldBe StatusCodes.OK + } + } + } + + SubDataClassification.values foreach { dc => + s"$dc: accept deprecated valid DataClassification value" in { + Put("/v2/metadata/dvs.test.subject", HttpEntity(ContentTypes.`application/json`, dataClassificationRequest(dc.entryName))) ~> route ~> check { + response.status shouldBe StatusCodes.OK + } + } + } + + SubDataClassification.values foreach { value => + s"$value: accept valid SubDataClassification values" in { + Put("/v2/metadata/dvs.test.subject", HttpEntity(ContentTypes.`application/json`, + dataClassificationRequest(dataClassification = value.entryName, subDataClassification = Some(value.entryName)))) ~> route ~> check { + response.status shouldBe StatusCodes.OK + } + } + } } } diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/model/TopicMetadataSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/model/TopicMetadataSpec.scala index 26f66ad86..0ac8fb216 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/model/TopicMetadataSpec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/model/TopicMetadataSpec.scala @@ -1,9 +1,9 @@ package hydra.kafka.model import java.time.Instant - import cats.data.NonEmptyList import cats.effect.IO +import hydra.kafka.model.DataClassification._ import hydra.kafka.model.TopicMetadataV2Request.Subject import io.confluent.kafka.schemaregistry.avro.AvroCompatibilityChecker import org.apache.avro.Schema @@ -23,22 +23,28 @@ final class TopicMetadataSpec extends AnyFlatSpecLike with Matchers { .decode(_, schema) .toOption .get - it must s"encode and decode $a" in { + it must s"${a.getClass}: encode and decode $a" in { decode(encode(a)) shouldBe a } } import TopicMetadataV2ValueOptionalTagList._ - List(StreamTypeV2.Event, StreamTypeV2.Entity, StreamTypeV2.Telemetry).map( + List(StreamTypeV2.Event, StreamTypeV2.Entity, StreamTypeV2.Telemetry).foreach( testCodec[StreamTypeV2] ) List( Public, - InternalUseOnly, - ConfidentialPII, - RestrictedFinancial, - RestrictedEmployeeData - ).map(testCodec[DataClassification]) + InternalUse, + Confidential, + Restricted + ).foreach(testCodec[DataClassification]) + List( + SubDataClassification.Public, + SubDataClassification.InternalUseOnly, + SubDataClassification.ConfidentialPII, + SubDataClassification.RestrictedFinancial, + SubDataClassification.RestrictedEmployeeData + ).foreach(testCodec[SubDataClassification]) val createdDate = Instant.now @@ -49,6 +55,7 @@ final class TopicMetadataSpec extends AnyFlatSpecLike with Matchers { false, None, Public, + 
Some(SubDataClassification.Public), NonEmptyList.of(ContactMethod.create("test@test.com").get), createdDate, List.empty, @@ -75,6 +82,7 @@ final class TopicMetadataSpec extends AnyFlatSpecLike with Matchers { |"deprecated": false, |"deprecatedDate": null, |"dataClassification":"Public", + |"subDataClassification": {"hydra.kafka.model.SubDataClassification": "Public"}, |"teamName":{"string":"dvs-teamName"}, |"contact":[ | { @@ -104,6 +112,7 @@ final class TopicMetadataSpec extends AnyFlatSpecLike with Matchers { false, None, Public, + Some(SubDataClassification.Public), NonEmptyList.of(ContactMethod.create("test@test.com").get), createdDate, List.empty, @@ -126,11 +135,11 @@ final class TopicMetadataSpec extends AnyFlatSpecLike with Matchers { "TopicMetadata" should "have compatible schema evolutions" in { import collection.JavaConverters._ - val schemaVersion1String = "{\"type\":\"record\",\"name\":\"TopicMetadataV2Value\",\"namespace\":\"_hydra.v2\",\"fields\":[{\"name\":\"streamType\",\"type\":{\"type\":\"enum\",\"name\":\"StreamTypeV2\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Event\",\"Entity\",\"Telemetry\"]}},{\"name\":\"deprecated\",\"type\":\"boolean\"},{\"name\":\"dataClassification\",\"type\":{\"type\":\"enum\",\"name\":\"DataClassification\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Public\",\"InternalUseOnly\",\"ConfidentialPII\",\"RestrictedFinancial\",\"RestrictedEmployeeData\"]}},{\"name\":\"contact\",\"type\":{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"Email\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"address\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"Slack\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"channel\",\"type\":\"string\"}]}]}},{\"name\":\"createdDate\",\"type\":\"string\"},{\"name\":\"parentSubjects\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"notes\",\"type\":[\"null\",\"string\"]}]}" - val schemaVersion2String = "{\"type\":\"record\",\"name\":\"TopicMetadataV2Value\",\"namespace\":\"_hydra.v2\",\"fields\":[{\"name\":\"streamType\",\"type\":{\"type\":\"enum\",\"name\":\"StreamTypeV2\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Event\",\"Entity\",\"Telemetry\"]}},{\"name\":\"deprecated\",\"type\":\"boolean\"},{\"name\":\"deprecatedDate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dataClassification\",\"type\":{\"type\":\"enum\",\"name\":\"DataClassification\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Public\",\"InternalUseOnly\",\"ConfidentialPII\",\"RestrictedFinancial\",\"RestrictedEmployeeData\"]}},{\"name\":\"contact\",\"type\":{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"Email\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"address\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"Slack\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"channel\",\"type\":\"string\"}]}]}},{\"name\":\"createdDate\",\"type\":\"string\"},{\"name\":\"parentSubjects\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"notes\",\"type\":[\"null\",\"string\"]}]}" - val schemaVersion3String = 
"{\"type\":\"record\",\"name\":\"TopicMetadataV2Value\",\"namespace\":\"_hydra.v2\",\"fields\":[{\"name\":\"streamType\",\"type\":{\"type\":\"enum\",\"name\":\"StreamTypeV2\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Event\",\"Entity\",\"Telemetry\"]}},{\"name\":\"deprecated\",\"type\":\"boolean\"},{\"name\":\"deprecatedDate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dataClassification\",\"type\":{\"type\":\"enum\",\"name\":\"DataClassification\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Public\",\"InternalUseOnly\",\"ConfidentialPII\",\"RestrictedFinancial\",\"RestrictedEmployeeData\"]}},{\"name\":\"contact\",\"type\":{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"Email\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"address\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"Slack\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"channel\",\"type\":\"string\"}]}]}},{\"name\":\"createdDate\",\"type\":\"string\"},{\"name\":\"parentSubjects\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"notes\",\"type\":[\"null\",\"string\"]},{\"name\":\"teamName\",\"type\":[\"null\",\"string\"],\"default\":null}]}" - val schemaVersion4String = "{\"type\":\"record\",\"name\":\"TopicMetadataV2Value\",\"namespace\":\"_hydra.v2\",\"fields\":[{\"name\":\"streamType\",\"type\":{\"type\":\"enum\",\"name\":\"StreamTypeV2\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Event\",\"Entity\",\"Telemetry\"]}},{\"name\":\"deprecated\",\"type\":\"boolean\"},{\"name\":\"deprecatedDate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dataClassification\",\"type\":{\"type\":\"enum\",\"name\":\"DataClassification\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Public\",\"InternalUseOnly\",\"ConfidentialPII\",\"RestrictedFinancial\",\"RestrictedEmployeeData\"]}},{\"name\":\"contact\",\"type\":{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"Email\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"address\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"Slack\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"channel\",\"type\":\"string\"}]}]}},{\"name\":\"createdDate\",\"type\":\"string\"},{\"name\":\"parentSubjects\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"notes\",\"type\":[\"null\",\"string\"]},{\"name\":\"teamName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tags\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null}]}" - val schemaVersion5String = 
"{\"type\":\"record\",\"name\":\"TopicMetadataV2Value\",\"namespace\":\"_hydra.v2\",\"fields\":[{\"name\":\"streamType\",\"type\":{\"type\":\"enum\",\"name\":\"StreamTypeV2\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Event\",\"Entity\",\"Telemetry\"]}},{\"name\":\"deprecated\",\"type\":\"boolean\"},{\"name\":\"deprecatedDate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dataClassification\",\"type\":{\"type\":\"enum\",\"name\":\"DataClassification\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Public\",\"InternalUseOnly\",\"ConfidentialPII\",\"RestrictedFinancial\",\"RestrictedEmployeeData\"]}},{\"name\":\"contact\",\"type\":{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"Email\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"address\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"Slack\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"channel\",\"type\":\"string\"}]}]}},{\"name\":\"createdDate\",\"type\":\"string\"},{\"name\":\"parentSubjects\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"notes\",\"type\":[\"null\",\"string\"]},{\"name\":\"teamName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tags\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"notificationUrl\",\"type\":[\"null\",\"string\"],\"default\":null}]}" + val schemaVersion1String = "{\"type\":\"record\",\"name\":\"TopicMetadataV2Value\",\"namespace\":\"_hydra.v2\",\"fields\":[{\"name\":\"streamType\",\"type\":{\"type\":\"enum\",\"name\":\"StreamTypeV2\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Event\",\"Entity\",\"Telemetry\"]}},{\"name\":\"deprecated\",\"type\":\"boolean\"},{\"name\":\"dataClassification\",\"type\":{\"type\":\"enum\",\"name\":\"DataClassification\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Public\",\"InternalUse\",\"Confidential\",\"Restricted\"]}},{\"name\":\"contact\",\"type\":{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"Email\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"address\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"Slack\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"channel\",\"type\":\"string\"}]}]}},{\"name\":\"createdDate\",\"type\":\"string\"},{\"name\":\"parentSubjects\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"notes\",\"type\":[\"null\",\"string\"]}]}" + val schemaVersion2String = 
"{\"type\":\"record\",\"name\":\"TopicMetadataV2Value\",\"namespace\":\"_hydra.v2\",\"fields\":[{\"name\":\"streamType\",\"type\":{\"type\":\"enum\",\"name\":\"StreamTypeV2\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Event\",\"Entity\",\"Telemetry\"]}},{\"name\":\"deprecated\",\"type\":\"boolean\"},{\"name\":\"deprecatedDate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dataClassification\",\"type\":{\"type\":\"enum\",\"name\":\"DataClassification\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Public\",\"InternalUse\",\"Confidential\",\"Restricted\"]}},{\"name\":\"contact\",\"type\":{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"Email\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"address\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"Slack\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"channel\",\"type\":\"string\"}]}]}},{\"name\":\"createdDate\",\"type\":\"string\"},{\"name\":\"parentSubjects\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"notes\",\"type\":[\"null\",\"string\"]}]}" + val schemaVersion3String = "{\"type\":\"record\",\"name\":\"TopicMetadataV2Value\",\"namespace\":\"_hydra.v2\",\"fields\":[{\"name\":\"streamType\",\"type\":{\"type\":\"enum\",\"name\":\"StreamTypeV2\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Event\",\"Entity\",\"Telemetry\"]}},{\"name\":\"deprecated\",\"type\":\"boolean\"},{\"name\":\"deprecatedDate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dataClassification\",\"type\":{\"type\":\"enum\",\"name\":\"DataClassification\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Public\",\"InternalUse\",\"Confidential\",\"Restricted\"]}},{\"name\":\"contact\",\"type\":{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"Email\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"address\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"Slack\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"channel\",\"type\":\"string\"}]}]}},{\"name\":\"createdDate\",\"type\":\"string\"},{\"name\":\"parentSubjects\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"notes\",\"type\":[\"null\",\"string\"]},{\"name\":\"teamName\",\"type\":[\"null\",\"string\"],\"default\":null}]}" + val schemaVersion4String = 
"{\"type\":\"record\",\"name\":\"TopicMetadataV2Value\",\"namespace\":\"_hydra.v2\",\"fields\":[{\"name\":\"streamType\",\"type\":{\"type\":\"enum\",\"name\":\"StreamTypeV2\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Event\",\"Entity\",\"Telemetry\"]}},{\"name\":\"deprecated\",\"type\":\"boolean\"},{\"name\":\"deprecatedDate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dataClassification\",\"type\":{\"type\":\"enum\",\"name\":\"DataClassification\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Public\",\"InternalUse\",\"Confidential\",\"Restricted\"]}},{\"name\":\"contact\",\"type\":{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"Email\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"address\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"Slack\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"channel\",\"type\":\"string\"}]}]}},{\"name\":\"createdDate\",\"type\":\"string\"},{\"name\":\"parentSubjects\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"notes\",\"type\":[\"null\",\"string\"]},{\"name\":\"teamName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tags\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null}]}" + val schemaVersion5String = "{\"type\":\"record\",\"name\":\"TopicMetadataV2Value\",\"namespace\":\"_hydra.v2\",\"fields\":[{\"name\":\"streamType\",\"type\":{\"type\":\"enum\",\"name\":\"StreamTypeV2\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Event\",\"Entity\",\"Telemetry\"]}},{\"name\":\"deprecated\",\"type\":\"boolean\"},{\"name\":\"deprecatedDate\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"dataClassification\",\"type\":{\"type\":\"enum\",\"name\":\"DataClassification\",\"namespace\":\"hydra.kafka.model\",\"symbols\":[\"Public\",\"InternalUse\",\"Confidential\",\"Restricted\"]}},{\"name\":\"contact\",\"type\":{\"type\":\"array\",\"items\":[{\"type\":\"record\",\"name\":\"Email\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"address\",\"type\":\"string\"}]},{\"type\":\"record\",\"name\":\"Slack\",\"namespace\":\"hydra.kafka.model.ContactMethod\",\"fields\":[{\"name\":\"channel\",\"type\":\"string\"}]}]}},{\"name\":\"createdDate\",\"type\":\"string\"},{\"name\":\"parentSubjects\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},{\"name\":\"notes\",\"type\":[\"null\",\"string\"]},{\"name\":\"teamName\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"tags\",\"type\":[\"null\",{\"type\":\"array\",\"items\":\"string\"}],\"default\":null},{\"name\":\"notificationUrl\",\"type\":[\"null\",\"string\"],\"default\":null}]}" def parser = new Schema.Parser() val schemaVersion1 = parser.parse(schemaVersion1String) val schemaVersion2 = parser.parse(schemaVersion2String) diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/programs/CreateTopicProgramSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/programs/CreateTopicProgramSpec.scala index d1adc9059..be5b896a6 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/programs/CreateTopicProgramSpec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/programs/CreateTopicProgramSpec.scala @@ -32,10 +32,12 @@ import hydra.common.alerting.sender.InternalNotificationSender import hydra.common.validation.ValidationError.ValidationCombinedErrors import hydra.kafka.IOSuite import hydra.kafka.algebras.RetryableFs2Stream.RetryPolicy.Once +import hydra.kafka.model.DataClassification.{Confidential, 
InternalUse, Public, Restricted}
 import scala.concurrent.ExecutionContext
 import hydra.kafka.model.TopicMetadataV2Request.NumPartitions
 import hydra.kafka.programs.CreateTopicProgram.MetadataOnlyTopicDoesNotExist
+import hydra.kafka.programs.TopicMetadataError.InvalidSubDataClassificationTypeError
 import hydra.kafka.programs.TopicSchemaError._
 import hydra.kafka.utils.TopicUtils
 import org.apache.avro.SchemaBuilder.{FieldAssembler, GenericDefault}
@@ -2168,6 +2170,92 @@ class CreateTopicProgramSpec extends AsyncFreeSpec with Matchers with IOSuite {
       )).asLeft)
     }
+
+    "valid combination of DataClassification and SubDataClassification should be successful" in {
+      for {
+        ts <- Resource.eval(initTestServices())
+        _ <- ts.program.registerSchemas(subject, keySchema, valueSchema)
+        _ <- ts.program.createTopicResource(subject, topicDetails)
+        _ <- Resource.eval(ts.program.createTopicFromMetadataOnly(subject, createDataClassificationMetadataRequest(Public)))
+        _ <- Resource.eval(ts.program.createTopicFromMetadataOnly(subject, createDataClassificationMetadataRequest(InternalUse)))
+        _ <- Resource.eval(ts.program.createTopicFromMetadataOnly(subject, createDataClassificationMetadataRequest(Confidential)))
+        _ <- Resource.eval(ts.program.createTopicFromMetadataOnly(subject, createDataClassificationMetadataRequest(Restricted)))
+        _ <- Resource.eval(ts.program.createTopicFromMetadataOnly(subject,
+          createDataClassificationMetadataRequest(Public, Some(SubDataClassification.Public))))
+        _ <- Resource.eval(ts.program.createTopicFromMetadataOnly(subject,
+          createDataClassificationMetadataRequest(InternalUse, Some(SubDataClassification.InternalUseOnly))))
+        _ <- Resource.eval(ts.program.createTopicFromMetadataOnly(subject,
+          createDataClassificationMetadataRequest(Confidential, Some(SubDataClassification.ConfidentialPII))))
+        _ <- Resource.eval(ts.program.createTopicFromMetadataOnly(subject,
+          createDataClassificationMetadataRequest(Restricted, Some(SubDataClassification.RestrictedFinancial))))
+        _ <- Resource.eval(ts.program.createTopicFromMetadataOnly(subject,
+          createDataClassificationMetadataRequest(Restricted, Some(SubDataClassification.RestrictedEmployeeData))))
+      } yield succeed
+    }
+
+    "Public DataClassification: invalid combination of SubDataClassification value should result in failure" in {
+      val result = for {
+        ts <- Resource.eval(initTestServices())
+        _ <- ts.program.registerSchemas(subject, keySchema, valueSchema)
+        _ <- ts.program.createTopicResource(subject, topicDetails)
+        _ <- Resource.eval(ts.program.createTopicFromMetadataOnly(subject,
+          createDataClassificationMetadataRequest(Public, Some(SubDataClassification.InternalUseOnly))))
+      } yield ()
+
+      result.attempt.map(_ shouldBe InvalidSubDataClassificationTypeError(
+        Public.entryName,
+        SubDataClassification.InternalUseOnly.entryName,
+        Seq(SubDataClassification.Public)
+      ).asLeft)
+    }
+
+    "InternalUse DataClassification: invalid combination of SubDataClassification value should result in failure" in {
+      val result = for {
+        ts <- Resource.eval(initTestServices())
+        _ <- ts.program.registerSchemas(subject, keySchema, valueSchema)
+        _ <- ts.program.createTopicResource(subject, topicDetails)
+        _ <- Resource.eval(ts.program.createTopicFromMetadataOnly(subject,
+          createDataClassificationMetadataRequest(InternalUse, Some(SubDataClassification.Public))))
+      } yield ()
+
+      result.attempt.map(_ shouldBe InvalidSubDataClassificationTypeError(
+        InternalUse.entryName,
+        SubDataClassification.Public.entryName,
+        Seq(SubDataClassification.InternalUseOnly)
+      ).asLeft)
+    }
+
+    "Confidential DataClassification: invalid combination of SubDataClassification value should result in failure" in {
+      val result = for {
+        ts <- Resource.eval(initTestServices())
+        _ <- ts.program.registerSchemas(subject, keySchema, valueSchema)
+        _ <- ts.program.createTopicResource(subject, topicDetails)
+        _ <- Resource.eval(ts.program.createTopicFromMetadataOnly(subject,
+          createDataClassificationMetadataRequest(Confidential, Some(SubDataClassification.Public))))
+      } yield ()
+
+      result.attempt.map(_ shouldBe InvalidSubDataClassificationTypeError(
+        Confidential.entryName,
+        SubDataClassification.Public.entryName,
+        Seq(SubDataClassification.ConfidentialPII)
+      ).asLeft)
+    }
+
+    "Restricted DataClassification: invalid combination of SubDataClassification value should result in failure" in {
+      val result = for {
+        ts <- Resource.eval(initTestServices())
+        _ <- ts.program.registerSchemas(subject, keySchema, valueSchema)
+        _ <- ts.program.createTopicResource(subject, topicDetails)
+        _ <- Resource.eval(ts.program.createTopicFromMetadataOnly(subject,
+          createDataClassificationMetadataRequest(Restricted, Some(SubDataClassification.Public))))
+      } yield ()
+
+      result.attempt.map(_ shouldBe InvalidSubDataClassificationTypeError(
+        Restricted.entryName,
+        SubDataClassification.Public.entryName,
+        Seq(SubDataClassification.RestrictedFinancial, SubDataClassification.RestrictedEmployeeData)
+      ).asLeft)
+    }
+
     def createTopic(createdAtDefaultValue: Option[Long], updatedAtDefaultValue: Option[Long], existingTopic: Boolean = false) = for {
       m <- TestMetadataAlgebra()
@@ -2293,6 +2381,7 @@ object CreateTopicProgramSpec extends NotificationsTestSuite {
       deprecated = deprecated,
       deprecatedDate,
       Public,
+      None,
       NonEmptyList.of(Email.create(email).get),
       createdDate,
       List.empty,
@@ -2320,6 +2409,7 @@ object CreateTopicProgramSpec extends NotificationsTestSuite {
      deprecated = deprecated,
      deprecatedDate,
      Public,
+     None,
      NonEmptyList.of(Email.create(email).get),
      createdDate,
      List.empty,
@@ -2331,6 +2421,28 @@ object CreateTopicProgramSpec extends NotificationsTestSuite {
      additionalValidations = None
    )
+
+  def createDataClassificationMetadataRequest(
+    dataClassification: DataClassification,
+    subDataClassification: Option[SubDataClassification] = None
+  ): TopicMetadataV2Request =
+    TopicMetadataV2Request(
+      Schemas(keySchema, valueSchema),
+      StreamTypeV2.Entity,
+      deprecated = false,
+      None,
+      dataClassification,
+      subDataClassification,
+      NonEmptyList.of(Email.create("test@test.com").get),
+      Instant.now(),
+      List.empty,
+      None,
+      Some("dvs-teamName"),
+      None,
+      List.empty,
+      Some("notification.url"),
+      None
+    )
+
   def getSchema(name: String,
                 createdAtDefaultValue: Option[Long] = None,
                 updatedAtDefaultValue: Option[Long] = None): Schema = {
diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/serializers/TopicMetadataV2ParserSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/serializers/TopicMetadataV2ParserSpec.scala
index e1719a94a..50b4358fb 100644
--- a/ingestors/kafka/src/test/scala/hydra/kafka/serializers/TopicMetadataV2ParserSpec.scala
+++ b/ingestors/kafka/src/test/scala/hydra/kafka/serializers/TopicMetadataV2ParserSpec.scala
@@ -1,7 +1,6 @@
 package hydra.kafka.serializers
 import java.time.Instant
-
 import cats.data.NonEmptyList
 import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer
 import hydra.kafka.model.ContactMethod.{Email, Slack}
@@ -12,6 +11,7 @@ import org.apache.avro.{Schema, SchemaBuilder}
 import
org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpecLike import eu.timepit.refined._ +import hydra.kafka.model.DataClassification._ import scala.concurrent.duration._ import hydra.kafka.model.TopicMetadataV2Request.NumPartitions @@ -215,27 +215,39 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { } should have message StreamTypeInvalid(jsValue, knownDirectSubclasses).errorMessage } - "parse one of each type of DataClassification" in { - DataClassificationFormat.read(JsString("Public")) shouldBe Public - DataClassificationFormat.read(JsString("InternalUseOnly")) shouldBe InternalUseOnly - DataClassificationFormat.read(JsString("ConfidentialPII")) shouldBe ConfidentialPII - DataClassificationFormat.read(JsString("RestrictedFinancial")) shouldBe RestrictedFinancial - DataClassificationFormat.read(JsString("RestrictedEmployeeData")) shouldBe RestrictedEmployeeData + "parse DataClassification enum" in { + val dataClassificationEnumEntry = new EnumEntryJsonFormat(DataClassification.values) + dataClassificationEnumEntry.read(JsString("Public")) shouldBe Public + dataClassificationEnumEntry.read(JsString("InternalUse")) shouldBe InternalUse + dataClassificationEnumEntry.read(JsString("Confidential")) shouldBe Confidential + dataClassificationEnumEntry.read(JsString("Restricted")) shouldBe Restricted } - "throw error when parsing DataClassification" in { - val jsValue = JsString.empty - import scala.reflect.runtime.{universe => ru} - val tpe = ru.typeOf[DataClassification] - val knownDirectSubclasses: Set[ru.Symbol] = - tpe.typeSymbol.asClass.knownDirectSubclasses + "throw error when parsing invalid DataClassification" in { + val dataClassificationEnumEntry = new EnumEntryJsonFormat(DataClassification.values) + val jsValue = JsString("junk") the[DeserializationException] thrownBy { - DataClassificationFormat.read(jsValue) - } should have message DataClassificationInvalid( - jsValue, - knownDirectSubclasses - ).errorMessage + dataClassificationEnumEntry.read(jsValue) + } should have message s"For 'DataClassification': Expected a value from enum $values instead of $jsValue" + } + + "parse SubDataClassification enum" in { + val subDataClassificationEnumEntry = new EnumEntryJsonFormat(SubDataClassification.values) + subDataClassificationEnumEntry.read(JsString("Public")) shouldBe SubDataClassification.Public + subDataClassificationEnumEntry.read(JsString("InternalUseOnly")) shouldBe SubDataClassification.InternalUseOnly + subDataClassificationEnumEntry.read(JsString("ConfidentialPII")) shouldBe SubDataClassification.ConfidentialPII + subDataClassificationEnumEntry.read(JsString("RestrictedFinancial")) shouldBe SubDataClassification.RestrictedFinancial + subDataClassificationEnumEntry.read(JsString("RestrictedEmployeeData")) shouldBe SubDataClassification.RestrictedEmployeeData + } + + "throw error when parsing invalid SubDataClassification" in { + val subDataClassificationEnumEntry = new EnumEntryJsonFormat(SubDataClassification.values) + val jsValue = JsString("InternalUse") + + the[DeserializationException] thrownBy { + subDataClassificationEnumEntry.read(jsValue) + } should have message s"For 'SubDataClassification': Expected a value from enum ${SubDataClassification.values} instead of $jsValue" } "parse a valid schema" in { @@ -326,6 +338,7 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { streamType, deprecated, dataClassification, + subDataClassification, email, slackChannel, parentSubjects, @@ -351,6 +364,7 @@ class 
TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { deprecated, None, dataClassification, + subDataClassification, NonEmptyList(email, slackChannel :: Nil), tmv2.createdDate, parentSubjects, @@ -370,6 +384,7 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { streamType, _, dataClassification, + subDataClassification, email, slackChannel, _, @@ -396,6 +411,7 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { deprecated = false, None, dataClassification, + subDataClassification, NonEmptyList(email, slackChannel :: Nil), tmv2.createdDate, parentSubjects = List(), @@ -445,6 +461,7 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { streamType: StreamTypeV2 = StreamTypeV2.Entity, deprecated: Boolean = false, dataClassification: DataClassification = Public, + subDataClassification: Option[SubDataClassification] = Some(SubDataClassification.Public), validAvroSchema: JsValue = validAvroSchema, parentSubjects: List[String] = List(), notes: Option[String] = None, @@ -457,6 +474,7 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { StreamTypeV2, Boolean, DataClassification, + Option[SubDataClassification], Email, Slack, List[String], @@ -471,7 +489,8 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { | "value": ${validAvroSchema.compactPrint} | }, | "streamType": "${streamType.toString}", - | "dataClassification":"${dataClassification.toString}", + | "dataClassification":"${dataClassification.toString}" + | ${if (subDataClassification.isDefined) s""","subDataClassification": "${subDataClassification.get}"""" else ""}, | "teamName":"$teamName", | "contact": { | "slackChannel": "$slackChannel", @@ -491,6 +510,7 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { streamType, deprecated, dataClassification, + subDataClassification, Email.create(email).get, Slack.create(slackChannel).get, parentSubjects, @@ -539,8 +559,12 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { StreamTypeV2Format.write(streamType) shouldBe JsString("Entity") } - "serialize a DataClassificationFormat" in { - DataClassificationFormat.write(Public) shouldBe JsString("Public") + "serialize a DataClassification enum" in { + new EnumEntryJsonFormat(DataClassification.values).write(Public) shouldBe JsString("Public") + } + + "serialize a SubDataClassification enum" in { + new EnumEntryJsonFormat(SubDataClassification.values).write(SubDataClassification.InternalUseOnly) shouldBe JsString("InternalUseOnly") } "serialize an avro schema" in { @@ -569,6 +593,7 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { val deprecated = false val deprecatedDate = None val dataClassification = Public + val subDataClassification = Some(SubDataClassification.Public) val email = Email.create("some@address.com").get val slack = Slack.create("#valid_slack_channel").get val contact = NonEmptyList(email, slack :: Nil) @@ -591,6 +616,7 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { deprecated = deprecated, deprecatedDate, dataClassification = dataClassification, + subDataClassification = subDataClassification, contact = contact, createdDate = createdDate, parentSubjects = parentSubjects, @@ -612,6 +638,7 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { streamType, deprecated, dataClassification, + subDataClassification, validAvroSchema, parentSubjects, notes, @@ -628,14 +655,14 @@ class 
TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { "TopicMetadataV2Format write matches TopicMetadataResponseV2Format write" in { val subject = Subject.createValidated("dvs.valid").get val tmc = TopicMetadataContainer(TopicMetadataV2Key(subject), - TopicMetadataV2Value(StreamTypeV2.Entity, false, None, Public, + TopicMetadataV2Value(StreamTypeV2.Entity, false, None, Public, None, NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None, None), Some(new SchemaFormat(isKey = true).read(validAvroSchema)), Some(new SchemaFormat(isKey = false).read(validAvroSchema))) val response = TopicMetadataV2Response.fromTopicMetadataContainer(tmc) val request = TopicMetadataV2Request.apply(Schemas(tmc.keySchema.get, tmc.valueSchema.get),tmc.value.streamType, - tmc.value.deprecated,tmc.value.deprecatedDate,tmc.value.dataClassification,tmc.value.contact, + tmc.value.deprecated,tmc.value.deprecatedDate,tmc.value.dataClassification,None,tmc.value.contact, tmc.value.createdDate,tmc.value.parentSubjects,tmc.value.notes, teamName = tmc.value.teamName, None, List.empty, None, None) TopicMetadataV2Format.write(request).compactPrint shouldBe @@ -675,12 +702,12 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { val subject = Subject.createValidated("dvs.valid").get val before = Instant.now val tmc = TopicMetadataContainer(TopicMetadataV2Key(subject), - TopicMetadataV2Value(StreamTypeV2.Entity, true, None, - Public, NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None, None), + TopicMetadataV2Value(StreamTypeV2.Entity, true, None,Public,None, + NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None, None), Some(new SchemaFormat(isKey = true).read(validAvroSchema)), Some(new SchemaFormat(isKey = false).read(validAvroSchema))) val request = TopicMetadataV2Request.apply(Schemas(tmc.keySchema.get, tmc.valueSchema.get),tmc.value.streamType, - tmc.value.deprecated,tmc.value.deprecatedDate,tmc.value.dataClassification,tmc.value.contact, + tmc.value.deprecated,tmc.value.deprecatedDate,tmc.value.dataClassification,None,tmc.value.contact, tmc.value.createdDate,tmc.value.parentSubjects,tmc.value.notes,tmc.value.teamName, None, List.empty, None, None) val firstDeprecatedDate = TopicMetadataV2Format.read(request.toJson).deprecatedDate.getOrElse(None) firstDeprecatedDate shouldBe None @@ -690,12 +717,12 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { val subject = Subject.createValidated("dvs.valid").get val now = Instant.now val tmc = TopicMetadataContainer(TopicMetadataV2Key(subject), - TopicMetadataV2Value(StreamTypeV2.Entity, true, Some(now), - Public, NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None, None), + TopicMetadataV2Value(StreamTypeV2.Entity, true, Some(now),Public,None, + NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None, None), Some(new SchemaFormat(isKey = true).read(validAvroSchema)), Some(new SchemaFormat(isKey = false).read(validAvroSchema))) val request = TopicMetadataV2Request.apply(Schemas(tmc.keySchema.get, tmc.valueSchema.get),tmc.value.streamType, - 
tmc.value.deprecated,tmc.value.deprecatedDate,tmc.value.dataClassification,tmc.value.contact,tmc.value.createdDate, + tmc.value.deprecated,tmc.value.deprecatedDate,tmc.value.dataClassification,None,tmc.value.contact,tmc.value.createdDate, tmc.value.parentSubjects,tmc.value.notes,tmc.value.teamName, None, List.empty, None, None) val firstDeprecatedDate = TopicMetadataV2Format.read(request.toJson).deprecatedDate.get val now2 = Instant.now @@ -706,12 +733,12 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers { "make sure deprecatedDate works with deprecated false" in { val subject = Subject.createValidated("dvs.valid").get val tmc = TopicMetadataContainer(TopicMetadataV2Key(subject), - TopicMetadataV2Value(StreamTypeV2.Entity, false, None, - Public, NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None, None), + TopicMetadataV2Value(StreamTypeV2.Entity, false, None, Public, None, + NonEmptyList.one(ContactMethod.create("blah@pluralsight.com").get), Instant.now(), List.empty, None, Some("dvs-teamName"), List.empty, None, None), Some(new SchemaFormat(isKey = true).read(validAvroSchema)), Some(new SchemaFormat(isKey = false).read(validAvroSchema))) val request = TopicMetadataV2Request.apply(Schemas(tmc.keySchema.get, tmc.valueSchema.get),tmc.value.streamType, - tmc.value.deprecated,tmc.value.deprecatedDate,tmc.value.dataClassification,tmc.value.contact, + tmc.value.deprecated,tmc.value.deprecatedDate,tmc.value.dataClassification,None,tmc.value.contact, tmc.value.createdDate,tmc.value.parentSubjects,tmc.value.notes, tmc.value.teamName, None, List.empty, None, None) val firstDeprecatedDate = TopicMetadataV2Format.read(request.toJson).deprecatedDate.getOrElse(None) firstDeprecatedDate shouldBe None diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/services/StreamsManagerActorSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/services/StreamsManagerActorSpec.scala index f631b3311..f65acb6f9 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/services/StreamsManagerActorSpec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/services/StreamsManagerActorSpec.scala @@ -68,7 +68,7 @@ class StreamsManagerActorSpec val schema = new Schema.Parser().parse(topicMetadataJson) - implicit val format = jsonFormat12(TopicMetadata) + implicit val format = jsonFormat13(TopicMetadata) val formatter = ISODateTimeFormat.basicDateTimeNoMillis() @@ -99,6 +99,7 @@ class StreamsManagerActorSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "contact": "slackity slack dont talk back", | "additionalDocumentation": "akka://some/path/here.jpggifyo", | "notes": "here are some notes topkek", @@ -223,6 +224,7 @@ class StreamsManagerActorSpec | "streamType": "History", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "contact": "slackity slack dont talk back", | "additionalDocumentation": "akka://some/path/here.jpggifyo", | "notes": "here are some notes topkek", @@ -280,6 +282,7 @@ class StreamsManagerActorSpec | "streamType": "CurrentState", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "contact": "slackity slack dont talk back", | "additionalDocumentation": "akka://some/path/here.jpggifyo", | "notes": "here are some notes topkek", @@ -318,6 +321,7 @@ class StreamsManagerActorSpec | "streamType": "History", | "derived": false, | 
"dataClassification": "Public", + | "subDataClassification": "Public", | "contact": "slackity slack dont talk back", | "additionalDocumentation": "akka://some/path/here.jpggifyo", | "notes": "here are some notes topkek", @@ -342,6 +346,7 @@ class StreamsManagerActorSpec | "streamType": "History", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "contact": "slackity slack dont talk back", | "additionalDocumentation": "akka://some/path/here.jpggifyo", | "notes": "here are some notes topkek", @@ -376,6 +381,7 @@ class StreamsManagerActorSpec | "streamType": "History", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "contact": "slackity slack dont talk back", | "additionalDocumentation": "akka://some/path/here.jpggifyo", | "notes": "here are some notes topkek", diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/services/TopicBootstrapActorSpec.scala b/ingestors/kafka/src/test/scala/hydra/kafka/services/TopicBootstrapActorSpec.scala index 19504c507..db35142f3 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/services/TopicBootstrapActorSpec.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/services/TopicBootstrapActorSpec.scala @@ -129,6 +129,7 @@ class TopicBootstrapActorSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -184,6 +185,7 @@ class TopicBootstrapActorSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -235,6 +237,7 @@ class TopicBootstrapActorSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -283,6 +286,7 @@ class TopicBootstrapActorSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -333,6 +337,7 @@ class TopicBootstrapActorSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -404,6 +409,7 @@ class TopicBootstrapActorSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -482,6 +488,7 @@ class TopicBootstrapActorSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -606,6 +613,7 @@ class TopicBootstrapActorSpec | "streamType": "Notification", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -667,6 +675,7 @@ class TopicBootstrapActorSpec | "streamType": "History", | "derived": false, | 
"dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -722,6 +731,7 @@ class TopicBootstrapActorSpec | "streamType": "History", | "derived": false, | "dataClassification": "Public", + | "subDataClassification": "Public", | "dataSourceOwner": "BARTON", | "contact": "slackity slack dont talk back", | "psDataLake": false, @@ -780,6 +790,7 @@ class MockStreamsManagerActor extends Actor { false, None, "private", + None, "alex", None, None, diff --git a/ingestors/kafka/src/test/scala/hydra/kafka/utils/TopicUtils.scala b/ingestors/kafka/src/test/scala/hydra/kafka/utils/TopicUtils.scala index 86d17e048..493775738 100644 --- a/ingestors/kafka/src/test/scala/hydra/kafka/utils/TopicUtils.scala +++ b/ingestors/kafka/src/test/scala/hydra/kafka/utils/TopicUtils.scala @@ -6,6 +6,7 @@ import cats.implicits._ import hydra.kafka.algebras.MetadataAlgebra.TopicMetadataContainer import hydra.kafka.algebras.TestMetadataAlgebra import hydra.kafka.model.ContactMethod.Email +import hydra.kafka.model.DataClassification.Public import hydra.kafka.model.TopicMetadataV2Request.Subject import hydra.kafka.model._ import org.apache.avro.SchemaBuilder @@ -25,6 +26,7 @@ object TopicUtils { deprecated = false, deprecatedDate = None, Public, + subDataClassification = None, NonEmptyList.of(Email.create("test@test.com").get), Instant.now(), List.empty,