Feature/227 nullable fields (#228)
* Refactor schema generation

* Implement schema updating

* Add properties to ConfluentAvroEncodingTransformer to specify optional columns
kevinwallimann authored May 10, 2021
1 parent d154403 commit a388653
Showing 6 changed files with 266 additions and 27 deletions.
README.md — 2 changes: 2 additions & 0 deletions
@@ -157,10 +157,12 @@
| `transformer.{transformer-id}.value.schema.naming.strategy` | Yes | Subject name strategy of Schema Registry. Possible values are `topic.name`, `record.name` or `topic.record.name`. Equivalent to ABRiS property `SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY` |
| `transformer.{transformer-id}.value.schema.record.name` | Yes for naming strategies `record.name` and `topic.record.name` | Name of the record. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_NAME_FOR_RECORD_STRATEGY` |
| `transformer.{transformer-id}.value.schema.record.namespace` | Yes for naming strategies `record.name` and `topic.record.name` | Namespace of the record. Equivalent to ABRiS property `SchemaManager.PARAM_SCHEMA_NAMESPACE_FOR_RECORD_STRATEGY` |
| `transformer.{transformer-id}.value.optional.fields` | No | Comma-separated list of nullable value columns that should get the default value `null` in the Avro schema. Nested column names are concatenated with a dot (`.`) |
| `transformer.{transformer-id}.produce.keys` | No | If set to `true`, keys will be produced according to the properties `key.column.prefix` and `key.column.names` of the [Hyperdrive Context](#hyperdrive-context) |
| `transformer.{transformer-id}.key.schema.naming.strategy` | Yes if `produce.keys` is true | Subject name strategy for key |
| `transformer.{transformer-id}.key.schema.record.name` | Yes for key naming strategies `record.name` and `topic.record.name` | Name of the record. |
| `transformer.{transformer-id}.key.schema.record.namespace` | Yes for key naming strategies `record.name` and `topic.record.name` | Namespace of the record. |
| `transformer.{transformer-id}.key.optional.fields` | No | Comma-separated list of nullable key columns that should get the default value `null` in the Avro schema. Nested column names are concatenated with a dot (`.`) |
| `transformer.{transformer-id}.schema.registry.basic.auth.user.info.file` | No | A path to a text file that contains one line in the form `<username>:<password>`. It will be passed as `basic.auth.user.info` to the schema registry config |

Any additional properties for the schema registry config can be added with the prefix `transformer.{transformer-id}.schema.registry.options.`
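
For illustration, here are the new properties alongside the ones they extend (the transformer id `encoder`, and the column names, are placeholders):

```
transformer.encoder.value.schema.naming.strategy=topic.name
transformer.encoder.value.optional.fields=col2,col3.subCol1
transformer.encoder.produce.keys=true
transformer.encoder.key.schema.naming.strategy=topic.name
transformer.encoder.key.optional.fields=col1
```

With this configuration, `col1` in the key schema, and `col2` and the nested `col3.subCol1` in the value schema, are emitted as unions with `null` and a default of `null`.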
@@ -15,6 +15,7 @@

package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import org.apache.avro.JsonProperties
import org.apache.commons.configuration2.Configuration
import org.apache.logging.log4j.LogManager
import org.apache.spark.sql.DataFrame
@@ -74,6 +75,7 @@ private[transformer] class ConfluentAvroEncodingTransformer(
}

object ConfluentAvroEncodingTransformer extends StreamTransformerFactory with ConfluentAvroEncodingTransformerAttributes {
private val logger = LogManager.getLogger

object AbrisConfigKeys extends AbrisProducerConfigKeys {
override val topic: String = KEY_TOPIC
@@ -93,12 +95,23 @@ object ConfluentAvroEncodingTransformer extends StreamTransformerFactory with ConfluentAvroEncodingTransformerAttributes {

def getKeyAvroConfig(config: Configuration, expression: Expression): ToAvroConfig = {
val schemaRegistryConfig = SchemaRegistryConfigUtil.getSchemaRegistryConfig(config)
-    AbrisConfigUtil.getKeyProducerSettings(config, AbrisConfigKeys, expression, schemaRegistryConfig)
+    val newDefaultValues = ConfigUtils.getSeqOrNone(KEY_KEY_OPTIONAL_FIELDS, config)
+      .map(optionalFields => optionalFields.map(_ -> JsonProperties.NULL_VALUE).toMap)
+      .getOrElse(Map())
+
+    val schema = AbrisConfigUtil.generateSchema(config, AbrisConfigKeys, expression, newDefaultValues)
+    logger.info(s"Generated key schema\n${schema.toString(true)}")
+    AbrisConfigUtil.getKeyProducerSettings(config, AbrisConfigKeys, schema, schemaRegistryConfig)
}

def getValueAvroConfig(config: Configuration, expression: Expression): ToAvroConfig = {
val schemaRegistryConfig = SchemaRegistryConfigUtil.getSchemaRegistryConfig(config)
-    AbrisConfigUtil.getValueProducerSettings(config, AbrisConfigKeys, expression, schemaRegistryConfig)
+    val newDefaultValues = ConfigUtils.getSeqOrNone(KEY_VALUE_OPTIONAL_FIELDS, config)
+      .map(optionalFields => optionalFields.map(_ -> JsonProperties.NULL_VALUE).toMap)
+      .getOrElse(Map())
+    val schema = AbrisConfigUtil.generateSchema(config, AbrisConfigKeys, expression, newDefaultValues)
+    logger.info(s"Generated value schema\n${schema.toString(true)}")
+    AbrisConfigUtil.getValueProducerSettings(config, AbrisConfigKeys, schema, schemaRegistryConfig)
}
}
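
As a sketch of the preprocessing both methods share (column names are illustrative): the parsed `optional.fields` list becomes a map from column paths to Avro's null marker, which `generateSchema` then applies as field defaults.

```scala
import org.apache.avro.JsonProperties

// Illustrative input: the parsed value of value.optional.fields
val optionalFields = Seq("col2", "col3.subCol1")

// Same shape as newDefaultValues above: each optional column is mapped
// to the Avro null marker that later becomes the field's default value.
val newDefaultValues: Map[String, Object] =
  optionalFields.map(_ -> JsonProperties.NULL_VALUE).toMap
// => Map("col2" -> NULL_VALUE, "col3.subCol1" -> NULL_VALUE)
```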

@@ -21,11 +21,13 @@ trait ConfluentAvroEncodingTransformerAttributes extends HasComponentAttributes
val KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY = "value.schema.naming.strategy"
val KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAME = "value.schema.record.name"
val KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE = "value.schema.record.namespace"
val KEY_VALUE_OPTIONAL_FIELDS = "value.optional.fields"

val KEY_PRODUCE_KEYS = "produce.keys"
val KEY_SCHEMA_REGISTRY_KEY_NAMING_STRATEGY = "key.schema.naming.strategy"
val KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME = "key.schema.record.name"
val KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE = "key.schema.record.namespace"
val KEY_KEY_OPTIONAL_FIELDS = "key.optional.fields"

override def getName: String = "Confluent Avro Stream Encoder"

@@ -40,12 +42,14 @@ trait ConfluentAvroEncodingTransformerAttributes extends HasComponentAttributes
Some("Record name for naming strategies record.name or topic.record.name"), required = false),
KEY_SCHEMA_REGISTRY_VALUE_RECORD_NAMESPACE -> PropertyMetadata("Value-Record namespace",
Some("Record namespace for naming strategies record.name or topic.record.name"), required = false),
KEY_VALUE_OPTIONAL_FIELDS -> PropertyMetadata("Value-Record optional fields", Some("Comma-separated list of nullable value columns that should get default value null in the avro schema"), required = false),

KEY_SCHEMA_REGISTRY_KEY_NAMING_STRATEGY -> PropertyMetadata("Key-Schema naming strategy",
Some("Subject name strategy of Schema Registry. Must be one of \"topic.name\", \"record.name\" or \"topic.record.name\""), required = false),
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME -> PropertyMetadata("Key-Record name", Some("Key-Record name for naming strategies record.name or topic.record.name"), required = false),
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE -> PropertyMetadata("Key-Record namespace", Some("Key-Record namespace for naming strategies record.name or topic.record.name"), required = false),
-    KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false)
+    KEY_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO_FILE -> PropertyMetadata("Basic auth user info file", Some("Text file containing one line in the form <username>:<password> for basic auth in schema registry"), required = false),
+    KEY_KEY_OPTIONAL_FIELDS -> PropertyMetadata("Key-Record optional fields", Some("Comma-separated list of nullable key columns that should get default value null in the avro schema"), required = false)
)

override def getExtraConfigurationPrefix: Option[String] = Some(KEY_SCHEMA_REGISTRY_EXTRA_CONFS_ROOT)
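
For reference, a sketch of how the comma-separated property value arrives as a sequence. It assumes, as the test further down does, that the `Configuration` uses a comma `DefaultListDelimiterHandler`; `getList` stands in here for the project's `ConfigUtils.getSeqOrNone` helper:

```scala
import org.apache.commons.configuration2.BaseConfiguration
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
import scala.collection.JavaConverters._

val config = new BaseConfiguration()
config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
config.addProperty("value.optional.fields", "col2, col3, col3.subCol1")

// Roughly what ConfigUtils.getSeqOrNone(KEY_VALUE_OPTIONAL_FIELDS, config)
// sees: the delimiter handler splits and trims the listed column names.
val fields = config.getList(classOf[String], "value.optional.fields").asScala
// => Seq("col2", "col3", "col3.subCol1")
```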
@@ -15,6 +15,7 @@

package za.co.absa.hyperdrive.ingestor.implementation.utils

import org.apache.avro.{JsonProperties, Schema}
import org.apache.commons.configuration2.Configuration
import org.apache.spark.sql.avro.SchemaConverters.toAvroType
import org.apache.spark.sql.catalyst.expressions.Expression
@@ -63,32 +64,27 @@ private[hyperdrive] object AbrisConfigUtil {
fromSchemaRegisteringConfigFragment.usingSchemaRegistry(schemaRegistryConfig)
}

-  def getKeyProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, expression: Expression,
+  def getKeyProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, schema: Schema,
                             schemaRegistryConfig: Map[String, String]): ToAvroConfig =
-    getProducerSettings(configuration, configKeys, isKey = true, expression, schemaRegistryConfig)
+    getProducerSettings(configuration, configKeys, isKey = true, schema, schemaRegistryConfig)

-  def getValueProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, expression: Expression,
+  def getValueProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, schema: Schema,
                               schemaRegistryConfig: Map[String, String]): ToAvroConfig =
-    getProducerSettings(configuration, configKeys, isKey = false, expression, schemaRegistryConfig)
+    getProducerSettings(configuration, configKeys, isKey = false, schema, schemaRegistryConfig)

   private def getProducerSettings(configuration: Configuration, configKeys: AbrisProducerConfigKeys, isKey: Boolean,
-                                  expression: Expression, schemaRegistryConfig: Map[String, String]): ToAvroConfig = {
+                                  schema: Schema, schemaRegistryConfig: Map[String, String]): ToAvroConfig = {
     val schemaManager = SchemaManagerFactory.create(schemaRegistryConfig)
     val topic = getTopic(configuration, configKeys)
     val namingStrategy = getNamingStrategy(configuration, configKeys)
     val schemaId = namingStrategy match {
       case TopicNameStrategy =>
-        val schema = toAvroType(expression.dataType, expression.nullable)
         val subject = SchemaSubject.usingTopicNameStrategy(topic, isKey)
         schemaManager.register(subject, schema)
       case RecordNameStrategy =>
-        val schema = toAvroType(expression.dataType, expression.nullable, getRecordName(configuration, configKeys),
-          getRecordNamespace(configuration, configKeys))
         val subject = SchemaSubject.usingRecordNameStrategy(schema)
         schemaManager.register(subject, schema)
       case TopicRecordNameStrategy =>
-        val schema = toAvroType(expression.dataType, expression.nullable, getRecordName(configuration, configKeys),
-          getRecordNamespace(configuration, configKeys))
         val subject = SchemaSubject.usingTopicRecordNameStrategy(topic, schema)
         schemaManager.register(subject, schema)
       case _ => throw new IllegalArgumentException("Naming strategy must be one of topic.name, record.name or topic.record.name")
@@ -100,6 +96,75 @@ private[hyperdrive] object AbrisConfigUtil {
.usingSchemaRegistry(schemaRegistryConfig)
}

/**
* Generates an avro schema given a Spark expression. Record name and namespace are derived according to the
* configured naming strategy. Default values for the avro schema can be passed using a key-value map. The keys
* need to correspond to the field names. In case of nested structs, nested field names should be concatenated
* using the dot (.), e.g. "parent.childField.subChildField". Note that dots in avro field names are not allowed.
*/
def generateSchema(configuration: Configuration, configKeys: AbrisProducerConfigKeys, expression: Expression,
newDefaultValues: Map[String, Object]): Schema = {
val namingStrategy = getNamingStrategy(configuration, configKeys)
val initialSchema = namingStrategy match {
case TopicNameStrategy => toAvroType(expression.dataType, expression.nullable)
case x if x == RecordNameStrategy || x == TopicRecordNameStrategy => toAvroType(expression.dataType,
expression.nullable, getRecordName(configuration, configKeys), getRecordNamespace(configuration, configKeys))
case _ => throw new IllegalArgumentException("Naming strategy must be one of topic.name, record.name or topic.record.name")
}

updateSchema(initialSchema, newDefaultValues)
}

/**
* This method is intended to update schemas created by [[org.apache.spark.sql.avro.SchemaConverters.toAvroType]] with
* new default values.
* Apart from the basic types, it only supports the complex types Record, Map and Array. New default values for Enum
* or Fixed cannot be assigned. Updating default values for the union type is only supported for a union with null.
* The correct order of arbitrary unions with respect to the given default value is not guaranteed.
*/
private def updateSchema(schema: Schema, newDefaultValues: Map[String, Object], fieldPrefix: String = ""): Schema = {
val prefixSeparator = if (fieldPrefix.isEmpty) "" else "."
import scala.collection.JavaConverters._
schema.getType match {
case Schema.Type.UNION =>
val newSchemas = schema.getTypes.asScala.map(t =>
updateSchema(t, newDefaultValues, fieldPrefix)
)
Schema.createUnion(newSchemas.asJava)
case Schema.Type.RECORD =>
val newFields = schema.getFields.asScala.map(f => {
val fullFieldName = s"$fieldPrefix$prefixSeparator${f.name()}"
val defaultValue = newDefaultValues.getOrElse(fullFieldName, f.defaultVal())
val newSchema = updateSchema(f.schema(), newDefaultValues, fullFieldName)
val newSchemaReordered = reorderUnionTypesForDefaultValueNull(newSchema, defaultValue)
new Schema.Field(f.name(), newSchemaReordered, f.doc(), defaultValue, f.order())
})
Schema.createRecord(schema.getName, schema.getDoc, schema.getNamespace, schema.isError, newFields.asJava)
case Schema.Type.ARRAY =>
val newSchema = updateSchema(schema.getElementType, newDefaultValues, fieldPrefix)
Schema.createArray(newSchema)
case Schema.Type.MAP =>
val newSchema = updateSchema(schema.getValueType, newDefaultValues, fieldPrefix)
Schema.createMap(newSchema)
case _ => schema
}
}

private def reorderUnionTypesForDefaultValueNull(schema: Schema, defaultValue: Object) = {
import scala.collection.JavaConverters._
lazy val schemaTypes = schema.getTypes.asScala
if (schema.getType == Schema.Type.UNION &&
schemaTypes.size == 2 &&
schemaTypes.head.getType != Schema.Type.NULL &&
schemaTypes(1).getType == Schema.Type.NULL &&
defaultValue.isInstanceOf[JsonProperties.Null]
) {
Schema.createUnion(Schema.create(Schema.Type.NULL), schemaTypes.head)
} else {
schema
}
}

private def getTopic(configuration: Configuration, configKeys: AbrisConfigKeys): String =
getOrThrow(configKeys.topic, configuration, errorMessage = s"Topic not found. Is '${configKeys.topic}' properly set?")
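
The union reordering deserves a concrete sketch. As the guard in `reorderUnionTypesForDefaultValueNull` indicates, `toAvroType` renders a nullable column as the union `[<type>, "null"]`, but Avro only accepts a field default of `null` when the `"null"` branch comes first in the union. A minimal illustration (the types here are illustrative, not taken from the commit):

```scala
import org.apache.avro.Schema
import scala.collection.JavaConverters._

// Union as produced for a nullable string column: ["string", "null"]
val fromSpark = Schema.createUnion(
  List(Schema.create(Schema.Type.STRING), Schema.create(Schema.Type.NULL)).asJava)

// Equivalent of the reordering for a default value of null: ["null", "string"].
// This matches the expected schemas in the test below, where every optional
// field reads "type" : [ "null", ... ] with "default" : null.
val nullFirst = Schema.createUnion(
  (Schema.create(Schema.Type.NULL) ::
    fromSpark.getTypes.asScala.filterNot(_.getType == Schema.Type.NULL).toList).asJava)
```

Nested fields are addressed by their dot-concatenated path (for example `col3.subCol1`), which `updateSchema` consumes as it recurses through records, arrays and maps.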

@@ -15,13 +15,22 @@

package za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import org.apache.commons.configuration2.BaseConfiguration
import org.apache.commons.configuration2.convert.DefaultListDelimiterHandler
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions.{array, lit, map, struct}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{BooleanType, IntegerType, StringType, StructField, StructType}
import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
import za.co.absa.abris.avro.read.confluent.SchemaManagerFactory
import za.co.absa.abris.config.AbrisConfig
import za.co.absa.commons.spark.SparkTestBase
import za.co.absa.hyperdrive.ingestor.api.context.HyperdriveContext
import za.co.absa.hyperdrive.ingestor.implementation.HyperdriveContextKeys
import za.co.absa.hyperdrive.ingestor.implementation.testutils.HyperdriveMockSchemaRegistryClient
import za.co.absa.hyperdrive.ingestor.implementation.transformer.avro.confluent.ConfluentAvroEncodingTransformer._
import za.co.absa.hyperdrive.ingestor.implementation.utils.AbrisConfigUtil
@@ -31,11 +40,11 @@

private val topic = "topic"
private val SchemaRegistryURL = "http://localhost:8081"

private var mockSchemaRegistryClient: MockSchemaRegistryClient = _
behavior of ConfluentAvroEncodingTransformer.getClass.getSimpleName

before {
-    val mockSchemaRegistryClient = new HyperdriveMockSchemaRegistryClient()
+    mockSchemaRegistryClient = new HyperdriveMockSchemaRegistryClient()
SchemaManagerFactory.resetSRClientInstance()
SchemaManagerFactory.addSRClientInstance(Map(AbrisConfig.SCHEMA_REGISTRY_URL -> SchemaRegistryURL), mockSchemaRegistryClient)
}
@@ -52,7 +61,7 @@
encoder.withKey shouldBe false
}

it should "encode the values" in {
"transform" should "encode the values" in {
// given
import spark.implicits._
val queryName = "dummyQuery"
@@ -83,4 +92,78 @@
val byteArrays = outputDf.select("value").map(_ (0).asInstanceOf[Array[Byte]]).collect()
byteArrays.distinct.length shouldBe byteArrays.length
}

it should "register a schema with optional fields" in {
// given
val schema = StructType(Seq(
StructField("key__col1", IntegerType, nullable = true),
StructField("col2", StringType, nullable = true),
StructField("col3", StructType(
Seq(StructField("subCol1", StringType, nullable = true))
), nullable = true)
)
)
HyperdriveContext.put(HyperdriveContextKeys.keyColumnPrefix, "key__")
HyperdriveContext.put(HyperdriveContextKeys.keyColumnNames, Seq("col1"))
val memoryStream = new MemoryStream[Row](1, spark.sqlContext)(RowEncoder(schema))

val config = new BaseConfiguration()
config.setListDelimiterHandler(new DefaultListDelimiterHandler(','))
config.addProperty(KafkaStreamWriter.KEY_TOPIC, topic)
config.addProperty(KEY_SCHEMA_REGISTRY_URL, SchemaRegistryURL)
config.addProperty(KEY_SCHEMA_REGISTRY_VALUE_NAMING_STRATEGY, AbrisConfigUtil.TopicNameStrategy)
config.addProperty(KEY_PRODUCE_KEYS, "true")
config.addProperty(KEY_KEY_OPTIONAL_FIELDS, "col1")
config.addProperty(KEY_VALUE_OPTIONAL_FIELDS, "col2, col3, col3.subCol1")
val encoder = ConfluentAvroEncodingTransformer(config)

val expectedKeySchemaString = {
raw"""{
| "type" : "record",
| "name" : "topLevelRecord",
| "fields" : [ {
| "name" : "col1",
| "type" : [ "null", "int" ],
| "default" : null
| } ]
|}
|""".stripMargin
}
val expectedKeySchema = AvroSchemaUtils.parse(expectedKeySchemaString)

val expectedValueSchemaString =
raw"""{
| "type" : "record",
| "name" : "topLevelRecord",
| "fields" : [ {
| "name" : "col2",
| "type" : [ "null", "string" ],
| "default" : null
| }, {
| "name" : "col3",
| "type" : [ "null", {
| "type" : "record",
| "name" : "col3",
| "namespace" : "topLevelRecord",
| "fields" : [ {
| "name" : "subCol1",
| "type" : [ "null", "string" ],
| "default" : null
| } ]
| } ],
| "default" : null
| } ]
|}
|""".stripMargin
val expectedValueSchema = AvroSchemaUtils.parse(expectedValueSchemaString)

// when
encoder.transform(memoryStream.toDF())

// then
val keySchema = mockSchemaRegistryClient.getLatestSchemaMetadata(s"$topic-key")
keySchema.getSchema shouldBe expectedKeySchema.toString
val valueSchema = mockSchemaRegistryClient.getLatestSchemaMetadata(s"$topic-value")
valueSchema.getSchema shouldBe expectedValueSchema.toString
}
}