From 6ab4763c12d86fb2d7d38e0309789391c68ac35e Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Fri, 10 Nov 2023 16:53:55 +0100 Subject: [PATCH] #116: controlCol -> measuredCol --- agent/README.md | 4 +- .../za/co/absa/atum/agent/model/Measure.scala | 62 +++++++++---------- .../atum/agent/model/MeasurementBuilder.scala | 4 +- .../atum/agent/model/MeasuresBuilder.scala | 12 ++-- .../absa/atum/agent/model/MeasureTest.scala | 11 ++-- .../co/absa/atum/model/dto/MeasureDTO.scala | 2 +- .../model/utils/SerializationUtilsTest.scala | 16 ++--- .../absa/atum/server/api/database/Runs.scala | 6 +- 8 files changed, 58 insertions(+), 59 deletions(-) diff --git a/agent/README.md b/agent/README.md index 2c340612e..a44cc817f 100644 --- a/agent/README.md +++ b/agent/README.md @@ -12,10 +12,10 @@ Create multiple `AtumContext` with different control measures to be applied ### Option 1 ```scala val atumContextInstanceWithRecordCount = AtumContext(processor = processor) - .withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, controlCol = "id")) + .withMeasureAdded(RecordCount(MockMeasureNames.recordCount1, measuredCol = "id")) val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount - .withMeasureAdded(AbsSumOfValuesOfColumn(controlCol = "salary")) + .withMeasureAdded(AbsSumOfValuesOfColumn(measuredCol = "salary")) ``` ### Option 2 diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala index 411f90514..52d432be1 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala @@ -28,7 +28,7 @@ import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancem * Type of different measures to be applied to the columns. */ sealed trait Measure extends MeasurementProcessor with MeasureType { - val controlCol: String + val measuredColumn: String } trait MeasureType { @@ -50,39 +50,39 @@ object Measure { val supportedMeasureNames: Seq[String] = supportedMeasures.map(_.measureName) case class RecordCount private ( - controlCol: String, - measureName: String, - resultValueType: ResultValueType.ResultValueType + measuredColumn: String, + measureName: String, + resultValueType: ResultValueType.ResultValueType ) extends Measure { override def function: MeasurementFunction = (ds: DataFrame) => { - val resultValue = ds.select(col(controlCol)).count().toString + val resultValue = ds.select(col(measuredColumn)).count().toString ResultOfMeasurement(resultValue, resultValueType) } } object RecordCount extends MeasureType { - def apply(controlCol: String): RecordCount = RecordCount(controlCol, measureName, resultValueType) + def apply(measuredCol: String): RecordCount = RecordCount(measuredCol, measureName, resultValueType) override val measureName: String = "count" override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long } case class DistinctRecordCount private ( - controlCol: String, - measureName: String, - resultValueType: ResultValueType.ResultValueType + measuredColumn: String, + measureName: String, + resultValueType: ResultValueType.ResultValueType ) extends Measure { override def function: MeasurementFunction = (ds: DataFrame) => { - val resultValue = ds.select(col(controlCol)).distinct().count().toString + val resultValue = ds.select(col(measuredColumn)).distinct().count().toString ResultOfMeasurement(resultValue, resultValueType) } } object DistinctRecordCount extends MeasureType { - def apply(controlCol: String): DistinctRecordCount = { - DistinctRecordCount(controlCol, measureName, resultValueType) + def apply(measuredCol: String): DistinctRecordCount = { + DistinctRecordCount(measuredCol, measureName, resultValueType) } override val measureName: String = "distinctCount" @@ -90,20 +90,20 @@ object Measure { } case class SumOfValuesOfColumn private ( - controlCol: String, - measureName: String, - resultValueType: ResultValueType.ResultValueType + measuredColumn: String, + measureName: String, + resultValueType: ResultValueType.ResultValueType ) extends Measure { override def function: MeasurementFunction = (ds: DataFrame) => { val aggCol = sum(col(valueColumnName)) - val resultValue = aggregateColumn(ds, controlCol, aggCol) + val resultValue = aggregateColumn(ds, measuredColumn, aggCol) ResultOfMeasurement(resultValue, resultValueType) } } object SumOfValuesOfColumn extends MeasureType { - def apply(controlCol: String): SumOfValuesOfColumn = { - SumOfValuesOfColumn(controlCol, measureName, resultValueType) + def apply(measuredCol: String): SumOfValuesOfColumn = { + SumOfValuesOfColumn(measuredCol, measureName, resultValueType) } override val measureName: String = "aggregatedTotal" @@ -111,20 +111,20 @@ object Measure { } case class AbsSumOfValuesOfColumn private ( - controlCol: String, - measureName: String, - resultValueType: ResultValueType.ResultValueType + measuredColumn: String, + measureName: String, + resultValueType: ResultValueType.ResultValueType ) extends Measure { override def function: MeasurementFunction = (ds: DataFrame) => { val aggCol = sum(abs(col(valueColumnName))) - val resultValue = aggregateColumn(ds, controlCol, aggCol) + val resultValue = aggregateColumn(ds, measuredColumn, aggCol) ResultOfMeasurement(resultValue, resultValueType) } } object AbsSumOfValuesOfColumn extends MeasureType { - def apply(controlCol: String): AbsSumOfValuesOfColumn = { - AbsSumOfValuesOfColumn(controlCol, measureName, resultValueType) + def apply(measuredCol: String): AbsSumOfValuesOfColumn = { + AbsSumOfValuesOfColumn(measuredCol, measureName, resultValueType) } override val measureName: String = "absAggregatedTotal" @@ -132,16 +132,16 @@ object Measure { } case class SumOfHashesOfColumn private ( - controlCol: String, - measureName: String, - resultValueType: ResultValueType.ResultValueType + measuredColumn: String, + measureName: String, + resultValueType: ResultValueType.ResultValueType ) extends Measure { override def function: MeasurementFunction = (ds: DataFrame) => { val aggregatedColumnName = ds.schema.getClosestUniqueName("sum_of_hashes") val value = ds - .withColumn(aggregatedColumnName, crc32(col(controlCol).cast("String"))) + .withColumn(aggregatedColumnName, crc32(col(measuredColumn).cast("String"))) .agg(sum(col(aggregatedColumnName))) .collect()(0)(0) val resultValue = if (value == null) "" else value.toString @@ -149,8 +149,8 @@ object Measure { } } object SumOfHashesOfColumn extends MeasureType { - def apply(controlCol: String): SumOfHashesOfColumn = { - SumOfHashesOfColumn(controlCol, measureName, resultValueType) + def apply(measuredCol: String): SumOfHashesOfColumn = { + SumOfHashesOfColumn(measuredCol, measureName, resultValueType) } override val measureName: String = "hashCrc32" @@ -169,7 +169,7 @@ object Measure { // scala> sc.parallelize(List(Long.MaxValue, 1)).toDF.agg(sum("value")).take(1)(0)(0) // res11: Any = -9223372036854775808 // Converting to BigDecimal fixes the issue - // val ds2 = ds.select(col(measurement.controlCol).cast(DecimalType(38, 0)).as("value")) + // val ds2 = ds.select(col(measurement.measuredCol).cast(DecimalType(38, 0)).as("value")) // ds2.agg(sum(abs($"value"))).collect()(0)(0) val ds2 = ds.select( col(measureColumn).cast(DecimalType(38, 0)).as(valueColumnName) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala index 716827ef0..4bb320425 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala @@ -23,8 +23,8 @@ private [agent] object MeasurementBuilder { private [agent] def buildMeasurementDTO(measurement: Measurement): MeasurementDTO = { val measureName = measurement.measure.measureName - val controlCols = Seq(measurement.measure.controlCol) - val measureDTO = MeasureDTO(measureName, controlCols) + val measuredColumns = Seq(measurement.measure.measuredColumn) + val measureDTO = MeasureDTO(measureName, measuredColumns) val measureResultDTO = MeasureResultDTO(TypedValue(measurement.resultValue.toString, measurement.resultType)) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresBuilder.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresBuilder.scala index f73c6e3b1..e43426b89 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresBuilder.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresBuilder.scala @@ -27,14 +27,14 @@ private [agent] object MeasuresBuilder { } private def createMeasure(measure: dto.MeasureDTO): za.co.absa.atum.agent.model.Measure = { - val controlColumn = measure.controlColumns.head + val measuredColumn = measure.measuredColumns.head measure.measureName match { - case RecordCount.measureName => RecordCount(controlColumn) - case DistinctRecordCount.measureName => DistinctRecordCount(controlColumn) - case SumOfValuesOfColumn.measureName => SumOfValuesOfColumn(controlColumn) - case AbsSumOfValuesOfColumn.measureName => AbsSumOfValuesOfColumn(controlColumn) - case SumOfHashesOfColumn.measureName => SumOfHashesOfColumn(controlColumn) + case RecordCount.measureName => RecordCount(measuredColumn) + case DistinctRecordCount.measureName => DistinctRecordCount(measuredColumn) + case SumOfValuesOfColumn.measureName => SumOfValuesOfColumn(measuredColumn) + case AbsSumOfValuesOfColumn.measureName => AbsSumOfValuesOfColumn(measuredColumn) + case SumOfHashesOfColumn.measureName => SumOfHashesOfColumn(measuredColumn) case unsupportedMeasure => throw MeasureException( s"Measure not supported: $unsupportedMeasure. Supported measures are: ${Measure.supportedMeasureNames}" diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala index 031b48244..4563a0d5b 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala @@ -29,12 +29,11 @@ class MeasureTest extends AnyFlatSpec with Matchers with SparkTestBase { self => "Measure" should "be based on the dataframe" in { // Measures - val measureIds: Measure = RecordCount(controlCol = "id") - val salaryAbsSum: Measure = AbsSumOfValuesOfColumn( - controlCol = "salary" - ) - val salarySum = SumOfValuesOfColumn(controlCol = "salary") - val sumOfHashes: Measure = SumOfHashesOfColumn(controlCol = "id") + val measureIds: Measure = RecordCount(measuredCol = "id") + val salaryAbsSum: Measure = AbsSumOfValuesOfColumn(measuredCol = "salary") + + val salarySum = SumOfValuesOfColumn(measuredCol = "salary") + val sumOfHashes: Measure = SumOfHashesOfColumn(measuredCol = "id") // AtumContext contains `Measurement` val atumContextInstanceWithRecordCount = AtumAgent diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/MeasureDTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/MeasureDTO.scala index 511848dc8..89ce8d018 100644 --- a/model/src/main/scala/za/co/absa/atum/model/dto/MeasureDTO.scala +++ b/model/src/main/scala/za/co/absa/atum/model/dto/MeasureDTO.scala @@ -18,5 +18,5 @@ package za.co.absa.atum.model.dto case class MeasureDTO( measureName: String, - controlColumns: Seq[String] + measuredColumns: Seq[String] ) diff --git a/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsTest.scala b/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsTest.scala index 0802f5515..fd2a6c0a2 100644 --- a/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsTest.scala +++ b/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsTest.scala @@ -82,14 +82,14 @@ class SerializationUtilsTest extends AnyFlatSpecLike { measures = seqMeasureDTO ) - val expectedAdditionalDataJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}" + val expectedAdditionalDataJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}" val actualAdditionalDataJson = SerializationUtils.asJson(atumContextDTO) assert(expectedAdditionalDataJson == actualAdditionalDataJson) } "fromJson" should "deserialize AtumContextDTO from json string" in { - val atumContextDTOJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}" + val atumContextDTOJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}" val seqPartitionDTO = Seq(PartitionDTO("key", "val")) val seqMeasureDTO = Set(MeasureDTO("count", Seq("col"))) @@ -150,7 +150,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike { measurements = seqMeasurementDTO ) - val expectedCheckpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}" + val expectedCheckpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}" val actualCheckpointDTOJson = SerializationUtils.asJson(checkpointDTO) assert(expectedCheckpointDTOJson == actualCheckpointDTOJson) @@ -161,7 +161,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike { val seqPartitionDTO = Seq(PartitionDTO("key", "val")) val timeWithZone = OffsetDateTime.of(2023, 10, 24, 10, 20, 59, 5000000, ZoneOffset.ofHours(2)) - val checkpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}" + val checkpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}" val seqMeasurementDTO = Seq( MeasurementDTO( @@ -191,14 +191,14 @@ class SerializationUtilsTest extends AnyFlatSpecLike { "asJson" should "serialize MeasureDTO into json string" in { val measureDTO = MeasureDTO("count", Seq("col")) - val expectedMeasureDTOJson = "{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}" + val expectedMeasureDTOJson = "{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]}" val actualMeasureDTOJson = SerializationUtils.asJson(measureDTO) assert(expectedMeasureDTOJson == actualMeasureDTOJson) } "fromJson" should "deserialize MeasureDTO from json string" in { - val measureDTOJson = "{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}" + val measureDTOJson = "{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]}" val expectedMeasureDTO = MeasureDTO("count", Seq("col")) val actualMeasureDTO = SerializationUtils.fromJson[MeasureDTO](measureDTOJson) @@ -213,14 +213,14 @@ class SerializationUtilsTest extends AnyFlatSpecLike { val measurementDTO = MeasurementDTO(measureDTO, measureResultDTO) - val expectedMeasurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}" + val expectedMeasurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}" val actualMeasurementDTOJson = SerializationUtils.asJson(measurementDTO) assert(expectedMeasurementDTOJson == actualMeasurementDTOJson) } "fromJson" should "deserialize MeasurementDTO from json string" in { - val measurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}" + val measurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"measuredColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}" val measureDTO = MeasureDTO("count", Seq("col")) val measureResultDTO = MeasureResultDTO(mainValue = TypedValue("1", ResultValueType.Long)) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/Runs.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/Runs.scala index 148d03347..d522e96f2 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/Runs.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/Runs.scala @@ -62,8 +62,8 @@ object Runs { val measureNames = values.measurements.map(_.measure.measureName).toSeq val measureNamesNormalized = scalaSeqToPgArray(measureNames) - val controlColumns = values.measurements.map(_.measure.controlColumns).toSeq - val controlColumnsNormalized = nestedScalaSeqToPgArray(controlColumns) + val measuredColumns = values.measurements.map(_.measure.measuredColumns).toSeq + val measuredColumnsNormalized = nestedScalaSeqToPgArray(measuredColumns) val measureResults = values.measurements.map(_.result).toSeq val measureResultsNormalized = measureResults.map(SerializationUtils.asJson) @@ -76,7 +76,7 @@ object Runs { ${values.processStartTime}::TIMESTAMPTZ, ${values.processEndTime}::TIMESTAMPTZ, $measureNamesNormalized::TEXT[], - $controlColumnsNormalized::TEXT[][], + $measuredColumnsNormalized::TEXT[][], $measureResultsNormalized::JSONB[], ${values.author} ) #$alias;"""