Add option to disable nullability preservation (#216)
* Add option to disable nullability preservation

* Fix for Scala 2.11
kevinwallimann authored Apr 13, 2021
1 parent 28b8d81 commit bc9437c
Showing 4 changed files with 38 additions and 6 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -140,6 +140,7 @@ The `ConfluentAvroStreamDecodingTransformer` is built on [ABRiS](https://github.
| `transformer.{transformer-id}.key.schema.record.name` | Yes for key naming strategies `record.name` and `topic.record.name` | Name of the record. |
| `transformer.{transformer-id}.key.schema.record.namespace` | Yes for key naming strategies `record.name` and `topic.record.name` | Namespace of the record. |
| `transformer.{transformer-id}.keep.columns` | No | Comma-separated list of columns to keep (e.g. offset, partition) |
| `transformer.{transformer-id}.disable.nullability.preservation` | No | Set to `true` to opt out of the fix for [#137](https://github.com/AbsaOSS/hyperdrive/issues/137) and keep the same behaviour as in versions up to and including v3.2.2 (see the example below). Default value: `false` |
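
For illustration, a minimal sketch of this property in an ingestion configuration; the transformer id `avro.decoder` is hypothetical:

```properties
# Hypothetical transformer id; opt out of the fix for #137 and keep the behaviour of v3.2.2 and earlier
transformer.avro.decoder.disable.nullability.preservation=true
# Omitting the property (or setting it to false) keeps the default, fixed behaviour
```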

For detailed information on the subject name strategy, please take a look at the [Schema Registry Documentation](https://docs.confluent.io/current/schema-registry/).

@@ -35,7 +35,8 @@ import za.co.absa.hyperdrive.ingestor.implementation.utils.{AbrisConfigUtil, Sch
private[transformer] class ConfluentAvroDecodingTransformer(
  val valueAvroConfig: FromAvroConfig,
  val keyAvroConfigOpt: Option[FromAvroConfig],
  val keepColumns: Seq[String]
  val keepColumns: Seq[String],
  val disableNullabilityPreservation: Boolean
)
  extends StreamTransformer {

@@ -96,9 +97,13 @@ private[transformer] class ConfluentAvroDecodingTransformer(
  }

  private def setColumnNonNullable(dataFrame: DataFrame, columnName: String) = {
    dataFrame
      .filter(col(columnName).isNotNull)
      .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))
    if (disableNullabilityPreservation) {
      dataFrame
    } else {
      dataFrame
        .filter(col(columnName).isNotNull)
        .withColumn(columnName, new Column(AssertNotNull(col(columnName).expr)))
    }
  }

}
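
For context (not part of the commit), a minimal sketch of what the default branch of `setColumnNonNullable` does: dropping null rows and wrapping the column in `AssertNotNull` marks it non-nullable in the resulting schema, while the new flag skips that step and leaves the DataFrame, and hence its nullable schema, untouched. The object, column name and local session below are illustrative, assuming a Spark 2.x environment like the one this project targets.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
import org.apache.spark.sql.functions.col

object NullabilitySketch extends App {
  val spark = SparkSession.builder().master("local[*]").appName("nullability-sketch").getOrCreate()
  import spark.implicits._

  // Spark infers the "key" column of an Option-typed Dataset as nullable
  val df = Seq(Option("a"), Option.empty[String]).toDF("key")
  println(df.schema("key").nullable) // true

  // Default behaviour: filter out nulls and assert non-nullability, mirroring setColumnNonNullable above
  val nonNullable = df
    .filter(col("key").isNotNull)
    .withColumn("key", new Column(AssertNotNull(col("key").expr)))
  println(nonNullable.schema("key").nullable) // false

  // With disable.nullability.preservation=true the DataFrame is returned unchanged,
  // so "key" stays nullable and rows with null keys are kept.

  spark.stop()
}
```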
@@ -125,11 +130,12 @@ object ConfluentAvroDecodingTransformer extends StreamTransformerFactory with Co
      None
    }
    val keepColumns = ConfigUtils.getSeqOrNone(KEY_KEEP_COLUMNS, config).getOrElse(Seq())
    val disableNullabilityPreservation = ConfigUtils.getOptionalBoolean(KEY_DISABLE_NULLABILITY_PRESERVATION, config).getOrElse(false)
    LogManager.getLogger.info(
      s"Going to create ConfluentAvroDecodingTransformer instance using " +
        s"value avro config='$valueAvroConfig', key avro config='$keyAvroConfigOpt', keepColumns='$keepColumns'")

    new ConfluentAvroDecodingTransformer(valueAvroConfig, keyAvroConfigOpt, keepColumns, disableNullabilityPreservation)
  }

  override def getMappingFromRetainedGlobalConfigToLocalConfig(globalConfig: Configuration): Map[String, String] = Map(
@@ -32,6 +32,7 @@ trait ConfluentAvroDecodingTransformerAttributes extends HasComponentAttributes
  val KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE = "key.schema.record.namespace"

  val KEY_KEEP_COLUMNS = "keep.columns"
  val KEY_DISABLE_NULLABILITY_PRESERVATION = "disable.nullability.preservation"

  override def getName: String = "Confluent Avro Stream Decoder"

@@ -52,6 +53,7 @@ trait ConfluentAvroDecodingTransformerAttributes extends HasComponentAttributes
Some("Subject name strategy of Schema Registry. Must be one of \"topic.name\", \"record.name\" or \"topic.record.name\""), required = false),
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAME -> PropertyMetadata("Key-Record name", Some("Key-Record name for naming strategies record.name or topic.record.name"), required = false),
KEY_SCHEMA_REGISTRY_KEY_RECORD_NAMESPACE -> PropertyMetadata("Key-Record namespace", Some("Key-Record namespace for naming strategies record.name or topic.record.name"), required = false),
KEY_KEEP_COLUMNS -> PropertyMetadata("Columns to keep", Some("Comma-separated list of columns to keep (e.g. offset, partition)"), required = false)
KEY_KEEP_COLUMNS -> PropertyMetadata("Columns to keep", Some("Comma-separated list of columns to keep (e.g. offset, partition)"), required = false),
KEY_DISABLE_NULLABILITY_PRESERVATION -> PropertyMetadata("Disable nullability preservation", Some("Keep same behaviour as for versions prior to and including v3.2.2"), required = false)
)
}
@@ -222,6 +222,29 @@ class TestConfluentAvroDecodingTransformer extends FlatSpec with Matchers with B
    result.map(_.getAs[Int]("partition")) should contain theSameElementsAs partitions
  }

  it should "make value columns nullable if disableNullabilityPreservation is true" in {
    // given
    MockSchemaRegistryClient.register(s"$Topic-value", ValueSchemaAvro)
    val records = createValueRecords(1, 100)
    val serializer = new KafkaAvroSerializer(MockSchemaRegistryClient)
    val rows = records.map(record => Row(serializer.serialize(Topic, record)))
    val schema = new StructType().add("value", BinaryType)
    val memoryStream = new MemoryStream[Row](1, spark.sqlContext)(RowEncoder(schema))
    memoryStream.addData(rows)
    val df = memoryStream.toDF()
    val decoder = createBasicDecoder(Map(
      KEY_DISABLE_NULLABILITY_PRESERVATION -> "true"
    ))

    // when
    val resultDf = decoder.transform(df)

    // then
    val fields = resultDf.schema.toList
    fields.foreach(_.nullable shouldBe true)
  }


it should "throw an exception if there is a column name collision between columns to keep and value columns" in {
// given
MockSchemaRegistryClient.register(s"$Topic-value", ValueSchemaAvro)
