Merge pull request #178 from AbsaOSS/feature/176-upgrade-abris-401
#176: Upgrade to Abris 4.0.1
kevinwallimann authored Nov 19, 2020
2 parents b8ecdfc + 36d5688 commit b57cdc9
Showing 3 changed files with 18 additions and 22 deletions.
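
Abris 4.0.1 aligns with the Confluent Schema Registry 5.3.x client (see the parent-conf/pom.xml changes below), so the tests no longer wrap parsed schemas in `new AvroSchema(...).asInstanceOf[ParsedSchema]`, and they compare schema JSON via `Schema#toString` instead of `ParsedSchema#canonicalString()`. The following is a minimal, self-contained sketch of that pattern, assuming Abris 4.0.1 and the Confluent 5.3.x client on the test classpath; the subject name and schema are illustrative, not taken from this repository.

```scala
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import org.apache.avro.Schema
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
import scala.collection.JavaConverters._

object SchemaRegistrationSketch extends App {
  // Parse the schema string directly into an org.apache.avro.Schema;
  // no new AvroSchema(...).asInstanceOf[ParsedSchema] wrapping is needed
  // against the 5.3.x client bundled with Abris 4.0.1.
  val schema: Schema = AvroSchemaUtils.parse(
    """{
      |  "type": "record",
      |  "name": "default_name",
      |  "namespace": "default_namespace",
      |  "fields": [ {"name": "int", "type": ["int", "null"]} ]
      |}""".stripMargin)

  val client = new MockSchemaRegistryClient()
  val schemaId = client.register("example-topic-value", schema) // subject name is illustrative

  println(client.getAllSubjects.asScala) // e.g. Set(example-topic-value)
  // Schema strings are compared via Schema#toString (the JSON form),
  // the counterpart of ParsedSchema#canonicalString() in the 5.5.x client.
  println(schemaId -> schema.toString)
}
```

The same `Schema#toString` comparison appears throughout TestAbrisConfigUtil in the diff below.
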
@@ -15,16 +15,14 @@

 package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

-import io.confluent.kafka.schemaregistry.ParsedSchema
-import io.confluent.kafka.schemaregistry.avro.AvroSchema
 import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
 import org.apache.commons.configuration2.BaseConfiguration
 import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
 import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
 import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
 import za.co.absa.abris.config.AbrisConfig
-import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer._
 import za.co.absa.hyperdrive.ingestor.implementation.reader.kafka.KafkaStreamReader.KEY_TOPIC
+import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroDecodingTransformer._
 import za.co.absa.hyperdrive.ingestor.implementation.utils.AbrisConfigUtil

 class TestConfluentAvroDecodingTransformer extends FlatSpec with Matchers with BeforeAndAfter {
@@ -34,14 +32,14 @@ class TestConfluentAvroDecodingTransformer extends FlatSpec with Matchers with BeforeAndAfter {
   private val SchemaRegistryValueSchemaId = "latest"

   private var MockSchemaRegistryClient: MockSchemaRegistryClient = _
-  private val DummySchema = new AvroSchema(AvroSchemaUtils.parse("""{
+  private val DummySchema = AvroSchemaUtils.parse("""{
     "type": "record",
     "name": "default_name",
     "namespace": "default_namespace",
     "fields":[
       {"name": "int", "type": ["int", "null"] }
     ]
-  }""")).asInstanceOf[ParsedSchema]
+  }""")

   behavior of ConfluentAvroDecodingTransformer.getClass.getSimpleName

@@ -15,8 +15,6 @@

 package za.co.absa.hyperdrive.ingestor.implementation.utils

-import io.confluent.kafka.schemaregistry.ParsedSchema
-import io.confluent.kafka.schemaregistry.avro.AvroSchema
 import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
 import org.apache.commons.configuration2.BaseConfiguration
 import org.apache.spark.sql.functions._
@@ -49,8 +47,8 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
       ]
     }"""
   }
-  private val dummyRecordNameSchema = new AvroSchema(AvroSchemaUtils.parse(getSchemaString(recordName, recordNamespace))).asInstanceOf[ParsedSchema]
-  private val dummyTopicNameSchema = new AvroSchema(AvroSchemaUtils.parse(getSchemaString("topLevelRecord", ""))).asInstanceOf[ParsedSchema]
+  private val dummyRecordNameSchema = AvroSchemaUtils.parse(getSchemaString(recordName, recordNamespace))
+  private val dummyTopicNameSchema = AvroSchemaUtils.parse(getSchemaString("topLevelRecord", ""))
   private val dummyExpr = struct(lit(null).cast(IntegerType).as(columnName)).expr

   private val keyTopic = "kafka.topic"
@@ -94,7 +92,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     val settings = AbrisConfigUtil.getKeyProducerSettings(config, ProducerConfigKeys, dummyExpr)

     // then
-    settings.schemaString shouldBe dummyTopicNameSchema.canonicalString()
+    settings.schemaString shouldBe dummyTopicNameSchema.toString
     settings.schemaId shouldBe Some(1)
     mockSchemaRegistryClient.getAllSubjects.asScala should contain theSameElementsAs Seq(s"${topic}-key")
   }
@@ -111,7 +109,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     val settings = AbrisConfigUtil.getKeyProducerSettings(config, ProducerConfigKeys, dummyExpr)

     // then
-    settings.schemaString shouldBe dummyRecordNameSchema.canonicalString()
+    settings.schemaString shouldBe dummyRecordNameSchema.toString
     settings.schemaId shouldBe Some(1)
     mockSchemaRegistryClient.getAllSubjects.asScala should contain theSameElementsAs Seq(s"$recordNamespace.$recordName")
   }
@@ -128,7 +126,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     val settings = AbrisConfigUtil.getKeyProducerSettings(config, ProducerConfigKeys, dummyExpr)

     // then
-    settings.schemaString shouldBe dummyRecordNameSchema.canonicalString()
+    settings.schemaString shouldBe dummyRecordNameSchema.toString
     settings.schemaId shouldBe Some(1)
     mockSchemaRegistryClient.getAllSubjects.asScala should contain theSameElementsAs Seq(s"$topic-$recordNamespace.$recordName")
   }
@@ -143,7 +141,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     val settings = AbrisConfigUtil.getValueProducerSettings(config, ProducerConfigKeys, dummyExpr)

     // then
-    settings.schemaString shouldBe dummyTopicNameSchema.canonicalString()
+    settings.schemaString shouldBe dummyTopicNameSchema.toString
     settings.schemaId shouldBe Some(1)
     mockSchemaRegistryClient.getAllSubjects.asScala should contain theSameElementsAs Seq(s"$topic-value")
   }
@@ -161,7 +159,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     val settings = AbrisConfigUtil.getKeyConsumerSettings(config, ConsumerConfigKeys)

     // then
-    settings.schemaString shouldBe dummyTopicNameSchema.canonicalString()
+    settings.schemaString shouldBe dummyTopicNameSchema.toString
     settings.schemaRegistryConf.get shouldBe Map("schema.registry.url" -> dummySchemaRegistryUrl)
   }

@@ -176,7 +174,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
         {"name": "${columnName}2", "type": ["int", "null"] }
       ]
     }"""
-    val schema2 = new AvroSchema(AvroSchemaUtils.parse(schema2String)).asInstanceOf[ParsedSchema]
+    val schema2 = AvroSchemaUtils.parse(schema2String)
     mockSchemaRegistryClient.register(s"$topic-key", dummyTopicNameSchema)
     mockSchemaRegistryClient.register(s"$topic-key", schema2)
     val config = createBaseConfiguration
@@ -188,7 +186,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     val settings = AbrisConfigUtil.getKeyConsumerSettings(config, ConsumerConfigKeys)

     // then
-    settings.schemaString shouldBe schema2.canonicalString()
+    settings.schemaString shouldBe schema2.toString
     settings.schemaRegistryConf.get shouldBe Map("schema.registry.url" -> dummySchemaRegistryUrl)
   }

@@ -207,7 +205,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     val settings = AbrisConfigUtil.getKeyConsumerSettings(config, ConsumerConfigKeys)

     // then
-    settings.schemaString shouldBe dummyRecordNameSchema.canonicalString()
+    settings.schemaString shouldBe dummyRecordNameSchema.toString
     settings.schemaRegistryConf.get shouldBe Map("schema.registry.url" -> dummySchemaRegistryUrl)
   }

@@ -226,7 +224,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     val settings = AbrisConfigUtil.getKeyConsumerSettings(config, ConsumerConfigKeys)

     // then
-    settings.schemaString shouldBe dummyRecordNameSchema.canonicalString()
+    settings.schemaString shouldBe dummyRecordNameSchema.toString
     settings.schemaRegistryConf.get shouldBe Map("schema.registry.url" -> dummySchemaRegistryUrl)
   }

@@ -243,7 +241,7 @@ class TestAbrisConfigUtil extends FlatSpec with Matchers with BeforeAndAfter {
     val settings = AbrisConfigUtil.getValueConsumerSettings(config, ConsumerConfigKeys)

     // then
-    settings.schemaString shouldBe dummyTopicNameSchema.canonicalString()
+    settings.schemaString shouldBe dummyTopicNameSchema.toString
     settings.schemaRegistryConf.get shouldBe Map("schema.registry.url" -> dummySchemaRegistryUrl)
   }

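
For orientation on where these settings end up: AbrisConfigUtil produces the schema string and schema registry configuration that Abris consumes. Below is a hedged sketch, not code from this repository, of how a consumer config is typically assembled with Abris 4's fluent `AbrisConfig` API and applied with `from_avro`; the topic name, registry URL, and input DataFrame are placeholder assumptions.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

// `kafkaDf` is assumed to be a DataFrame read from Kafka, with a binary `value` column.
def decodeValues(kafkaDf: DataFrame): DataFrame = {
  val abrisConfig = AbrisConfig
    .fromConfluentAvro
    .downloadReaderSchemaByLatestVersion          // analogous to the "latest" schema id used in the tests above
    .andTopicNameStrategy("example-topic")        // topic.name naming strategy; topic is a placeholder
    .usingSchemaRegistry("http://localhost:8081") // placeholder schema registry URL

  kafkaDf.select(from_avro(col("value"), abrisConfig).as("data"))
}
```

The producer-side settings tested above map onto the analogous to-Avro side of the same fluent API.
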
6 changes: 3 additions & 3 deletions parent-conf/pom.xml
@@ -40,14 +40,14 @@
   <properties>
     <!--Enforced versions-->
     <fasterxml.jackson.databind.version>2.6.7.1</fasterxml.jackson.databind.version> <!--Same as Spark uses-->
-    <avro.version>1.9.2</avro.version> <!--Same as Abris uses-->
+    <avro.version>1.8.2</avro.version> <!--Same as Abris uses-->

     <!--Maven-->
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>

     <!--ABRiS-->
-    <abris.version>4.0.0</abris.version>
+    <abris.version>4.0.1</abris.version>

     <!--Scala-->
     <scalatest.version>3.0.5</scalatest.version>
@@ -65,7 +65,7 @@
     <kafka.spark.version>0-10</kafka.spark.version>
     <spark.sql.kafka.version>2.4.3</spark.sql.kafka.version>
     <testcontainers.kafka.version>1.12.4</testcontainers.kafka.version>
-    <kafka.avro.serializer.version>5.5.1</kafka.avro.serializer.version>
+    <kafka.avro.serializer.version>5.3.4</kafka.avro.serializer.version> <!--Same as Abris uses-->

     <!--Spark-->
     <spark.version>2.4.3</spark.version>
