This repository has been archived by the owner on Nov 15, 2024. It is now read-only.

Accept old DataClassification values in the request
aman-minz committed Jan 10, 2024
1 parent 99c9952 commit 70ac672
Showing 5 changed files with 170 additions and 54 deletions.
avro/src/main/java/com/pluralsight/hydra/avro/JsonConverter.java (27 changes: 26 additions & 1 deletion)
@@ -283,7 +283,10 @@ private Object typeConvert(Object value, String name, Schema schema) throws IOEx
case STRING:
return value.toString();
case ENUM:
- boolean valid = schema.getEnumSymbols().contains(value);
+ if (name.equals("dataClassification")) {
+ value = adaptOldDataClassificationValue(value);
+ }
+ boolean valid = schema.getEnumSymbols().contains(value.toString());
if (!valid)
throw new IllegalArgumentException(value + " is not a valid symbol. Possible values are: " +
schema.getEnumSymbols() + ".");
@@ -335,4 +338,26 @@ private Map cleanAvro(Map<String, Object> oldRaw) {
}
return newMap;
}

private String adaptOldDataClassificationValue(Object oldValue) {
String newValue;

switch (oldValue.toString()) {
case "InternalUseOnly":
newValue = "InternalUse";
break;
case "ConfidentialPII":
newValue = "Confidential";
break;
case "RestrictedFinancial":
case "RestrictedEmployeeData":
newValue = "Restricted";
break;
default:
newValue = oldValue.toString();
break;
}

return newValue;
}
}
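
The net effect of the JsonConverter change above is that deprecated DataClassification strings are remapped before the enum symbol check, so Avro ingestion no longer rejects records sent by older clients. A minimal Scala sketch of that behavior, assuming an enum schema whose symbols are the current values (the symbol list, schema name, and helper names below are illustrative, not taken from this repository):

object DataClassificationAdaptSketch extends App {
  import org.apache.avro.{Schema, SchemaBuilder}

  // Illustrative enum schema; the real Avro schema is not part of this commit.
  val dataClassificationSchema: Schema =
    SchemaBuilder.enumeration("DataClassification").symbols("Public", "InternalUse", "Confidential", "Restricted")

  // Same value mapping as adaptOldDataClassificationValue in the diff above.
  def adapt(old: String): String = old match {
    case "InternalUseOnly" => "InternalUse"
    case "ConfidentialPII" => "Confidential"
    case "RestrictedFinancial" | "RestrictedEmployeeData" => "Restricted"
    case other => other
  }

  val incoming = "RestrictedFinancial"   // value sent by an older client
  val adapted = adapt(incoming)          // remapped to "Restricted"
  assert(dataClassificationSchema.getEnumSymbols.contains(adapted)) // passes the ENUM symbol check
}
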
@@ -261,7 +261,11 @@ sealed trait TopicMetadataV2Parser
case x => deserializationError(x)
}

- private def deserializationError(value: JsValue) = throw DeserializationException(s"Expected a value from enum $values instead of $value")
+ private def deserializationError(value: JsValue) = {
+ val className = if (values.nonEmpty) values.head.getClass.getEnclosingClass.getSimpleName else ""
+ throw DeserializationException(
+ s"For '$className': Expected a value from enum $values instead of $value")
+ }
}

implicit val additionalValidationFormat: EnumEntryJsonFormat[AdditionalValidation] =
@@ -338,19 +342,18 @@ sealed trait TopicMetadataV2Parser
} else {
toResult(None)
}

+ val payloadDataClassification = j.getFields("dataClassification").headOption
+ val adaptedDataClassification = adaptOldDataClassificationValue(payloadDataClassification)
val dataClassification = toResult(
new EnumEntryJsonFormat[DataClassification](DataClassification.values).read(
- j.getFields("dataClassification")
- .headOption
- .getOrElse(
- throwDeserializationError("dataClassification", "String")
- )
- )
- )
+ adaptedDataClassification.getOrElse(throwDeserializationError("dataClassification", "String"))))

+ val maybeSubDataClassification = pickSubDataClassificationValue(
+ j.getFields("subDataClassification").headOption, payloadDataClassification)
val subDataClassification = toResult(
- j.getFields("subDataClassification").headOption map {
- new EnumEntryJsonFormat[SubDataClassification](SubDataClassification.values).read
- })
+ maybeSubDataClassification.map(new EnumEntryJsonFormat[SubDataClassification](SubDataClassification.values).read))

val contact = toResult(
ContactFormat.read(
j.getFields("contact")
@@ -407,6 +410,23 @@ sealed trait TopicMetadataV2Parser
toResult(None) // Never pick additionalValidations from the request.
).mapN(MetadataOnlyRequest.apply)
}

private def adaptOldDataClassificationValue(dataClassification: Option[JsValue]): Option[JsValue] = dataClassification map {
case JsString("InternalUseOnly") => JsString("InternalUse")
case JsString("ConfidentialPII") => JsString("Confidential")
case JsString("RestrictedFinancial") => JsString("Restricted")
case JsString("RestrictedEmployeeData") => JsString("Restricted")
case other: JsValue => other
}

private def pickSubDataClassificationValue(payloadSubDataClassification: Option[JsValue],
payloadDataClassification: Option[JsValue]): Option[JsValue] =
if (payloadSubDataClassification.isEmpty &&
payloadDataClassification.exists(dc => SubDataClassification.values.exists(_.entryName == dc.compactPrint))) {
payloadDataClassification
} else {
payloadSubDataClassification
}
}

implicit object MaybeSchemasFormat extends RootJsonFormat[MaybeSchemas] {
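
The parser change above applies the same remapping at the JSON level before EnumEntryJsonFormat reads the value, and pickSubDataClassificationValue is intended to reuse the original payload value for subDataClassification when that field is omitted. A standalone sketch of the request-level rewrite using spray-json directly (it mirrors the private adaptOldDataClassificationValue helper rather than calling it, and handles only the dataClassification field):

object MetadataRequestAdaptSketch extends App {
  import spray.json._

  // Mirrors adaptOldDataClassificationValue from the parser; not the private method itself.
  def adaptOldDataClassification(dc: JsValue): JsValue = dc match {
    case JsString("InternalUseOnly") => JsString("InternalUse")
    case JsString("ConfidentialPII") => JsString("Confidential")
    case JsString("RestrictedFinancial") | JsString("RestrictedEmployeeData") => JsString("Restricted")
    case other => other
  }

  val request = """{ "dataClassification": "ConfidentialPII" }""".parseJson.asJsObject
  val adapted = request.fields.get("dataClassification").map(adaptOldDataClassification)

  // adapted == Some(JsString("Confidential")); the remapped value is what the DataClassification
  // EnumEntryJsonFormat sees, so it no longer raises a DeserializationException.
  println(adapted)
}
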
@@ -16,7 +16,7 @@ import hydra.core.http.security.entity.AwsConfig
import hydra.core.http.security.{AccessControlService, AwsSecurityService}
import hydra.core.protocol.{Ingest, IngestorCompleted, IngestorError}
import hydra.kafka.marshallers.HydraKafkaJsonSupport
- import hydra.kafka.model.TopicMetadata
+ import hydra.kafka.model.{DataClassification, SubDataClassification, TopicMetadata}
import hydra.kafka.producer.AvroRecord
import hydra.kafka.services.StreamsManagerActor
import hydra.kafka.util.KafkaUtils
@@ -120,6 +120,34 @@ class BootstrapEndpointSpec
)
}

def dataClassificationRequest(dataClassification: String = "Public", subDataClassification: Option[String] = None): HttpEntity.Strict = HttpEntity(
ContentTypes.`application/json`,
s"""{
| "streamType": "Notification",
| "derived": false,
| "dataClassification": "$dataClassification",
| ${if (subDataClassification.isDefined) s""""subDataClassification": "${subDataClassification.get}",""" else ""}
| "dataSourceOwner": "BARTON",
| "contact": "slackity slack dont talk back",
| "psDataLake": false,
| "additionalDocumentation": "akka://some/path/here.jpggifyo",
| "notes": "here are some notes topkek",
| "schema": {
| "namespace": "exp.assessment",
| "name": "SkillAssessmentTopicsScored",
| "type": "record",
| "version": 1,
| "fields": [
| {
| "name": "testField",
| "type": "string"
| }
| ]
| },
| "notificationUrl": "notification.url"
|}""".stripMargin
)

"The bootstrap endpoint" should {
"list streams" in {

@@ -457,5 +485,41 @@ class BootstrapEndpointSpec
}
}
}

DataClassification.values foreach { dc =>
s"$dc: accept valid DataClassification value" in {
Post("/streams", dataClassificationRequest(dc.entryName)) ~> bootstrapRoute ~> check {
status shouldBe StatusCodes.OK
}
}
}

SubDataClassification.values foreach { dc =>
s"$dc: accept deprecated valid DataClassification value" in {
Post("/streams", dataClassificationRequest(dc.entryName)) ~> bootstrapRoute ~> check {
status shouldBe StatusCodes.OK
}
}
}

SubDataClassification.values foreach { value =>
s"$value: accept valid SubDataClassification values" in {
Post("/streams", dataClassificationRequest(value.entryName, Some(value.entryName))) ~> bootstrapRoute ~> check {
status shouldBe StatusCodes.OK
}
}
}

"reject invalid DataClassification metadata value" in {
Post("/streams", dataClassificationRequest("junk")) ~> bootstrapRoute ~> check {
status shouldBe StatusCodes.BadRequest
}
}

"reject invalid SubDataClassification metadata value" in {
Post("/streams", dataClassificationRequest(subDataClassification = Some("junk"))) ~> bootstrapRoute ~> check {
status shouldBe StatusCodes.BadRequest
}
}
}
}
@@ -16,27 +16,25 @@ import hydra.core.http.CorsSupport
import hydra.core.http.security.entity.AwsConfig
import hydra.core.http.security.{AccessControlService, AwsSecurityService}
import hydra.kafka.algebras.RetryableFs2Stream.RetryPolicy.Once
- import hydra.kafka.algebras.{HydraTag, KafkaAdminAlgebra, KafkaClientAlgebra, MetadataAlgebra, TagsAlgebra}
+ import hydra.kafka.algebras._
import hydra.kafka.consumer.KafkaConsumerProxy
import hydra.kafka.consumer.KafkaConsumerProxy.{GetPartitionInfo, ListTopics, ListTopicsResponse, PartitionInfoResponse}
import hydra.kafka.marshallers.HydraKafkaJsonSupport
- import hydra.kafka.model.RequiredField
+ import hydra.kafka.model.{DataClassification, RequiredField, SubDataClassification}
import hydra.kafka.model.TopicMetadataV2Request.Subject
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import hydra.kafka.programs.CreateTopicProgram
import hydra.kafka.util.KafkaUtils.TopicDetails
import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.apache.kafka.common.{Node, PartitionInfo}
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

import scala.concurrent.ExecutionContext
import hydra.kafka.programs.CreateTopicProgram
import hydra.kafka.util.KafkaUtils.TopicDetails
import org.apache.avro.{LogicalTypes, Schema, SchemaBuilder}
import org.typelevel.log4cats.SelfAwareStructuredLogger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import retry.{RetryPolicies, RetryPolicy}

import java.time.Instant
import scala.concurrent.ExecutionContext


class TopicMetadataEndpointSpec
@@ -49,12 +47,11 @@ class TopicMetadataEndpointSpec
with ConfigSupport
with NotificationsTestSuite {

+ import ConfigSupport._
import spray.json._

import scala.concurrent.duration._

- import ConfigSupport._

implicit private def unsafeLogger[F[_]: Sync]: SelfAwareStructuredLogger[F] =
Slf4jLogger.getLogger[F]

@@ -327,29 +324,12 @@ class TopicMetadataEndpointSpec
| "tags": []
|}""".stripMargin

- val invalidDataClassificationRequest =
- """{
+ def dataClassificationRequest(dataClassification: String = "Public", subDataClassification: Option[String] = None) =
+ s"""{
| "streamType": "Event",
| "deprecated": true,
- | "dataClassification": "InternalUseOnly",
- | "subDataClassification": "InternalUseOnly",
- | "contact": {
- | "email": "[email protected]"
- | },
- | "createdDate": "2020-02-02T12:34:56Z",
- | "notes": "here are some notes",
- | "parentSubjects": [],
- | "teamName": "dvs-teamName",
- | "tags": ["Source: DVS"],
- | "notificationUrl": "testnotification.url"
- |}""".stripMargin
-
- val invalidSubDataClassificationRequest =
- """{
- | "streamType": "Event",
- | "deprecated": true,
- | "dataClassification": "InternalUseOnly",
- | "subDataClassification": "InternalUse",
+ | "dataClassification": "$dataClassification",
+ | ${if (subDataClassification.isDefined) s""""subDataClassification": "${subDataClassification.get}",""" else ""}
| "contact": {
| "email": "[email protected]"
| },
@@ -386,16 +366,43 @@ class TopicMetadataEndpointSpec
}
}

"reject invalid DataClassification metadata" in {
Put("/v2/metadata/dvs.test.subject", HttpEntity(ContentTypes.`application/json`, invalidDataClassificationRequest)) ~> route ~> check {
"reject invalid DataClassification metadata value" in {
Put("/v2/metadata/dvs.test.subject", HttpEntity(ContentTypes.`application/json`,
dataClassificationRequest("junk"))) ~> route ~> check {
rejection shouldBe a[MalformedRequestContentRejection]
}
}

"reject invalid SubDataClassification metadata" in {
Put("/v2/metadata/dvs.test.subject", HttpEntity(ContentTypes.`application/json`, invalidSubDataClassificationRequest)) ~> route ~> check {
"reject invalid SubDataClassification metadata value" in {
Put("/v2/metadata/dvs.test.subject", HttpEntity(ContentTypes.`application/json`,
dataClassificationRequest(subDataClassification = Some("junk")))) ~> route ~> check {
rejection shouldBe a[MalformedRequestContentRejection]
}
}

DataClassification.values foreach { dc =>
s"$dc: accept valid DataClassification value" in {
Put("/v2/metadata/dvs.test.subject", HttpEntity(ContentTypes.`application/json`, dataClassificationRequest(dc.entryName))) ~> route ~> check {
response.status shouldBe StatusCodes.OK
}
}
}

SubDataClassification.values foreach { dc =>
s"$dc: accept deprecated valid DataClassification value" in {
Put("/v2/metadata/dvs.test.subject", HttpEntity(ContentTypes.`application/json`, dataClassificationRequest(dc.entryName))) ~> route ~> check {
response.status shouldBe StatusCodes.OK
}
}
}

SubDataClassification.values foreach { value =>
s"$value: accept valid SubDataClassification values" in {
Put("/v2/metadata/dvs.test.subject", HttpEntity(ContentTypes.`application/json`,
dataClassificationRequest(dataClassification = value.entryName, subDataClassification = Some(value.entryName)))) ~> route ~> check {
response.status shouldBe StatusCodes.OK
}
}
}
}
}
@@ -223,13 +223,13 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers {
dataClassificationEnumEntry.read(JsString("Restricted")) shouldBe Restricted
}

"throw error when parsing DataClassification" in {
"throw error when parsing invalid DataClassification" in {
val dataClassificationEnumEntry = new EnumEntryJsonFormat(DataClassification.values)
val jsValue = JsString("junk")

the[DeserializationException] thrownBy {
dataClassificationEnumEntry.read(jsValue)
} should have message s"Expected a value from enum $values instead of $jsValue"
} should have message s"For 'DataClassification': Expected a value from enum $values instead of $jsValue"
}

"parse SubDataClassification enum" in {
Expand All @@ -241,13 +241,13 @@ class TopicMetadataV2ParserSpec extends AnyWordSpecLike with Matchers {
subDataClassificationEnumEntry.read(JsString("RestrictedEmployeeData")) shouldBe SubDataClassification.RestrictedEmployeeData
}

"throw error when parsing SubDataClassification" in {
"throw error when parsing invalid SubDataClassification" in {
val subDataClassificationEnumEntry = new EnumEntryJsonFormat(SubDataClassification.values)
val jsValue = JsString("InternalUse")

the[DeserializationException] thrownBy {
subDataClassificationEnumEntry.read(jsValue)
} should have message s"Expected a value from enum ${SubDataClassification.values} instead of $jsValue"
} should have message s"For 'SubDataClassification': Expected a value from enum ${SubDataClassification.values} instead of $jsValue"
}

"parse a valid schema" in {
