From 1d08ad8b46bbf233d31ec7bd33387baf0ed08eab Mon Sep 17 00:00:00 2001 From: Adam Cervenka Date: Thu, 12 Oct 2023 17:26:29 +0200 Subject: [PATCH 01/15] #26 add checkpoint creation function --- LICENSE.md | 2 +- .../za/co/absa/atum/agent/AtumContext.scala | 46 +++++++++++++++++-- .../agent/dispatcher/ConsoleDispatcher.scala | 1 + .../agent/dispatcher/HttpDispatcher.scala | 1 + .../co/absa/atum/agent/AtumContextTest.scala | 32 +++++++++++++ 5 files changed, 77 insertions(+), 5 deletions(-) diff --git a/LICENSE.md b/LICENSE.md index 7a4a3ea24..d64569567 100644 --- a/LICENSE.md +++ b/LICENSE.md @@ -199,4 +199,4 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and - limitations under the License. \ No newline at end of file + limitations under the License. diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala index 3f58d8676..953f5822f 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala @@ -17,10 +17,13 @@ package za.co.absa.atum.agent import org.apache.spark.sql.DataFrame +import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.model.{Measure, MeasuresMapper} -import AtumContext.AtumPartitions -import za.co.absa.atum.model.dto.{AtumContextDTO, PartitionDTO} +import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue} +import za.co.absa.atum.model.dto._ +import java.time.ZonedDateTime +import java.util.UUID import scala.collection.immutable.ListMap /** @@ -41,8 +44,43 @@ class AtumContext private[agent] ( agent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this) } - def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame) = { - ??? // TODO #26 + def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): AtumContext = { + val startTime = ZonedDateTime.now() + val measurements = takeMeasurements(dataToMeasure) + val endTime = ZonedDateTime.now() + + val partitions = atumPartitions.map { case (key, value) => PartitionDTO(key, value) } + + val checkpoint = CheckpointDTO( + id = UUID.randomUUID(), + name = checkpointName, + author = author, + measuredByAtumAgent = true, + partitioning = partitions.toSeq, + processStartTime = startTime, + processEndTime = Some(endTime), + measurements = measurements.toSeq + ) + + agent.saveCheckpoint(checkpoint) + this + } + + private def takeMeasurements(df: DataFrame): Set[MeasurementDTO] = { + measures.map { m => + val result = m.function(df) + val measureResultDto = MeasureResultDTO(TypedValue(result, ResultValueType.String)) + + val measureDto = MeasureDTO( + functionName = m.getClass().getName, + controlColumns = Seq(m.controlCol) + ) + + MeasurementDTO( + measure = measureDto, + result = measureResultDto + ) + } } def saveCheckpointMeasurements(checkpointName: String, measurements: Seq[Measure]) = { diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/ConsoleDispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/ConsoleDispatcher.scala index ef51e109f..109be4f6a 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/ConsoleDispatcher.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/ConsoleDispatcher.scala @@ -33,4 +33,5 @@ class ConsoleDispatcher extends Dispatcher with Logging { override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = { println(s"Saving checkpoint to server. $checkpoint") } + } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala index 444260d60..8aa5457a4 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala @@ -45,4 +45,5 @@ class HttpDispatcher(config: Config) extends Dispatcher with Logging { .post(serverUri) .send(backend) } + } diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala index 4d3273ddd..af6e9f7b4 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala @@ -16,10 +16,14 @@ package za.co.absa.atum.agent +import org.apache.spark.sql.SparkSession +import org.mockito.Mockito.{mock, verify} +import org.mockito.ArgumentCaptor import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.model.Measure.RecordCount +import za.co.absa.atum.model.dto._ class AtumContextTest extends AnyFlatSpec with Matchers { @@ -63,4 +67,32 @@ class AtumContextTest extends AnyFlatSpec with Matchers { assert(atumContextRemoved.currentMeasures.head == RecordCount("other")) } + "createCheckpoint" should "take measurements and create a checkpoints" in { + val mockAgent = mock(classOf[AtumAgent]) + + val atumContext = new AtumContext(AtumPartitions("foo2" -> "bar"), mockAgent) + .addMeasure(RecordCount("letter")) + + val spark = SparkSession.builder + .master("local") + .config("spark.driver.host", "localhost") + .config("spark.ui.enabled", "false") + .getOrCreate() + + import spark.implicits._ + + val rdd = spark.sparkContext.parallelize(Seq("A", "B", "C")) + val df = rdd.toDF("letter") + + atumContext.createCheckpoint("testCheckpoint", "Hans", df) + + val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO]) + verify(mockAgent).saveCheckpoint(argument.capture()) + + assert(argument.getValue.name == "testCheckpoint") + assert(argument.getValue.author == "Hans") + assert(argument.getValue.partitioning == Seq(PartitionDTO("foo2", "bar"))) + assert(argument.getValue.measurements.head.result.mainValue.value == "3") + } + } From 02d829e60fb005d769f0058b2e439ffa2df14027 Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Tue, 24 Oct 2023 17:04:49 +0200 Subject: [PATCH 02/15] #26: finishing bear minimum of agent side --- .../za/co/absa/atum/agent/AtumAgent.scala | 6 +- .../za/co/absa/atum/agent/AtumContext.scala | 56 +++++-------- .../agent/core/MeasurementProcessor.scala | 10 ++- .../co/absa/atum/agent/model/Checkpoint.scala | 25 ++++-- .../za/co/absa/atum/agent/model/Measure.scala | 22 +++-- .../absa/atum/agent/model/Measurement.scala | 20 ++++- .../atum/agent/model/MeasurementBuilder.scala | 55 ++++++------- .../atum/agent/model/MeasuresMapper.scala | 2 +- .../co/absa/atum/agent/AtumContextTest.scala | 82 +++++++++++++++---- .../absa/atum/agent/model/MeasureTest.scala | 49 +++++++---- .../agent/model/MeasurementBuilderTest.scala | 35 +++++--- .../atum/agent/model/MeasuresMapperTest.scala | 2 +- 12 files changed, 234 insertions(+), 130 deletions(-) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala index 0a227fc9e..deeb0a486 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala @@ -36,7 +36,8 @@ class AtumAgent private[agent] () { /** * Sends `CheckpointDTO` to the AtumService API - * @param checkpoint + * + * @param checkpoint Already initialized Checkpoint object to store */ def saveCheckpoint(checkpoint: CheckpointDTO): Unit = { dispatcher.saveCheckpoint(checkpoint) @@ -45,7 +46,7 @@ class AtumAgent private[agent] () { /** * Sends `Checkpoint` to the AtumService API * - * @param checkpoint + * @param checkpoint Already initialized Checkpoint object to store */ def saveCheckpoint(checkpoint: Checkpoint): Unit = { dispatcher.saveCheckpoint(checkpoint.toCheckpointDTO) @@ -53,6 +54,7 @@ class AtumAgent private[agent] () { /** * Provides an AtumContext given a `AtumPartitions` instance. Retrieves the data from AtumService API. + * * @param atumPartitions * @return */ diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala index 807bc941c..6c940c5b7 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala @@ -18,12 +18,10 @@ package za.co.absa.atum.agent import org.apache.spark.sql.DataFrame import za.co.absa.atum.agent.AtumContext.AtumPartitions -import za.co.absa.atum.agent.model.{Checkpoint, Measure, Measurement, MeasuresMapper} -import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue} +import za.co.absa.atum.agent.model.{Checkpoint, Measure, Measurement, MeasurementByAtum, MeasuresMapper} import za.co.absa.atum.model.dto._ import java.time.ZonedDateTime -import java.util.UUID import scala.collection.immutable.ListMap /** @@ -44,51 +42,34 @@ class AtumContext private[agent] ( agent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this) } - def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): AtumContext = { + private def takeMeasurements(df: DataFrame): Set[Measurement] = { + measures.map { m => + val measurementResult = m.function(df) + MeasurementByAtum(m, measurementResult.result, measurementResult.resultType) + } + } + + def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): Checkpoint = { val startTime = ZonedDateTime.now() val measurements = takeMeasurements(dataToMeasure) val endTime = ZonedDateTime.now() - val partitions = atumPartitions.map { case (key, value) => PartitionDTO(key, value) } - - val checkpoint = CheckpointDTO( - id = UUID.randomUUID(), + Checkpoint( name = checkpointName, author = author, measuredByAtumAgent = true, - partitioning = partitions.toSeq, + atumPartitions = this.atumPartitions, processStartTime = startTime, processEndTime = Some(endTime), measurements = measurements.toSeq ) - - agent.saveCheckpoint(checkpoint) - this - } - - private def takeMeasurements(df: DataFrame): Set[MeasurementDTO] = { - measures.map { m => - val result = m.function(df) - val measureResultDto = MeasureResultDTO(TypedValue(result, ResultValueType.String)) - - val measureDto = MeasureDTO( - functionName = m.getClass().getName, - controlColumns = Seq(m.controlCol) - ) - - MeasurementDTO( - measure = measureDto, - result = measureResultDto - ) - } } def createCheckpointOnProvidedData( - checkpointName: String, - author: String, - measurements: Seq[Measurement] - ): Checkpoint = { + checkpointName: String, author: String, measurements: Seq[Measurement] + ): Checkpoint = { val zonedDateTimeNow = ZonedDateTime.now() + Checkpoint( name = checkpointName, author = author, @@ -159,13 +140,16 @@ object AtumContext { implicit class DatasetWrapper(df: DataFrame) { /** - * Set a point in the pipeline to execute calculation. + * Set a point in the pipeline to execute calculation and store it. * @param checkpointName The key assigned to this checkpoint + * @param author Author of the checkpoint * @param atumContext Contains the calculations to be done and publish the result * @return */ - def createCheckpoint(checkpointName: String, author: String)(implicit atumContext: AtumContext): DataFrame = { - // todo: implement checkpoint creation + def createAndSaveCheckpoint(checkpointName: String, author: String)(implicit atumContext: AtumContext): DataFrame = { + val checkpoint = atumContext.createCheckpoint(checkpointName, author, df) + val checkpointDTO = checkpoint.toCheckpointDTO + atumContext.agent.saveCheckpoint(checkpointDTO) df } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/core/MeasurementProcessor.scala b/agent/src/main/scala/za/co/absa/atum/agent/core/MeasurementProcessor.scala index c21a45a61..026f71e9c 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/core/MeasurementProcessor.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/core/MeasurementProcessor.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.agent.core import org.apache.spark.sql.DataFrame import za.co.absa.atum.agent.core.MeasurementProcessor.MeasurementFunction +import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType trait MeasurementProcessor { @@ -26,6 +27,13 @@ trait MeasurementProcessor { } object MeasurementProcessor { - type MeasurementFunction = DataFrame => String + /** + * The raw result of measurement is always gonna be string, because we want to avoid some floating point issues + * (overflows, consistent representation of numbers - whether they are coming from Java or Scala world, and more), + * but the actual type is stored alongside the computation because we don't want to lost this information. + */ + final case class ResultOfMeasurement(result: String, resultType: ResultValueType.ResultValueType) + + type MeasurementFunction = DataFrame => ResultOfMeasurement } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Checkpoint.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Checkpoint.scala index 43f82d434..3a2c05181 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Checkpoint.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/Checkpoint.scala @@ -23,15 +23,22 @@ import java.time.ZonedDateTime import java.util.UUID case class Checkpoint( - name: String, - author: String, - measuredByAtumAgent: Boolean = false, - atumPartitions: AtumPartitions, - processStartTime: ZonedDateTime, - processEndTime: Option[ZonedDateTime], - measurements: Seq[Measurement] - ) { + name: String, + author: String, + measuredByAtumAgent: Boolean = false, + atumPartitions: AtumPartitions, + processStartTime: ZonedDateTime, + processEndTime: Option[ZonedDateTime], + measurements: Seq[Measurement] +) { private [agent] def toCheckpointDTO: CheckpointDTO = { + val measurementDTOs = measurements.map { + case provided: MeasurementProvided => + MeasurementBuilder.buildMeasurementDTO(provided) + case byAtum: MeasurementByAtum => + MeasurementBuilder.buildMeasurementDTO(byAtum) + } + CheckpointDTO( id = UUID.randomUUID(), name = name, @@ -40,7 +47,7 @@ case class Checkpoint( partitioning = AtumPartitions.toSeqPartitionDTO(atumPartitions), processStartTime = processStartTime, processEndTime = processEndTime, - measurements = measurements.map(MeasurementBuilder.buildMeasurementDTO) + measurements = measurementDTOs ) } } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala index 33cc71323..1dcb5731d 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala @@ -20,7 +20,8 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.types.{DecimalType, LongType, StringType} import org.apache.spark.sql.{Column, DataFrame} import za.co.absa.atum.agent.core.MeasurementProcessor -import za.co.absa.atum.agent.core.MeasurementProcessor.MeasurementFunction +import za.co.absa.atum.agent.core.MeasurementProcessor.{MeasurementFunction, ResultOfMeasurement} +import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancements /** @@ -47,7 +48,10 @@ object Measure { case class RecordCount private (controlCol: String, measureName: String, onlyForNumeric: Boolean) extends Measure { override def function: MeasurementFunction = - (ds: DataFrame) => ds.select(col(controlCol)).count().toString + (ds: DataFrame) => { + val result = ds.select(col(controlCol)).count().toString + ResultOfMeasurement(result, ResultValueType.Long) + } } object RecordCount extends MeasureType { def apply(controlCol: String): RecordCount = { @@ -62,7 +66,10 @@ object Measure { extends Measure { override def function: MeasurementFunction = - (ds: DataFrame) => ds.select(col(controlCol)).distinct().count().toString + (ds: DataFrame) => { + val result = ds.select(col(controlCol)).distinct().count().toString + ResultOfMeasurement(result, ResultValueType.Long) + } } object DistinctRecordCount extends MeasureType { @@ -79,7 +86,8 @@ object Measure { override def function: MeasurementFunction = (ds: DataFrame) => { val aggCol = sum(col(valueColumnName)) - aggregateColumn(ds, controlCol, aggCol) + val result = aggregateColumn(ds, controlCol, aggCol) + ResultOfMeasurement(result, ResultValueType.BigDecimal) } } @@ -97,7 +105,8 @@ object Measure { override def function: MeasurementFunction = (ds: DataFrame) => { val aggCol = sum(abs(col(valueColumnName))) - aggregateColumn(ds, controlCol, aggCol) + val result = aggregateColumn(ds, controlCol, aggCol) + ResultOfMeasurement(result, ResultValueType.Double) } } @@ -120,7 +129,8 @@ object Measure { .withColumn(aggregatedColumnName, crc32(col(controlCol).cast("String"))) .agg(sum(col(aggregatedColumnName))) .collect()(0)(0) - if (value == null) "" else value.toString + val result = if (value == null) "" else value.toString + ResultOfMeasurement(result, ResultValueType.String) } } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala index da37f071b..a7c0851a5 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala @@ -16,4 +16,22 @@ package za.co.absa.atum.agent.model -case class Measurement(measure: Measure, result: Any) +import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType.ResultValueType + +trait Measurement { + val measure: Measure + val result: Any +} + +/** + * When the application/user of Atum Agent provides actual results by himself, the type is precise and we don't need + * to do any adjustments. + */ +case class MeasurementProvided(measure: Measure, result: Any) extends Measurement + +/** + * When the Atum Agent itself performs the measurements, using Spark, then in some cases some adjustments are + * needed - thus we are converting the results to strings always - but we need to keep the information about + * the actual type as well. + */ +case class MeasurementByAtum(measure: Measure, result: String, resultType: ResultValueType) extends Measurement diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala index d1b927675..4fd4acc09 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala @@ -22,49 +22,40 @@ import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue} private [agent] object MeasurementBuilder { - def buildMeasurementDTO(measurement: Measurement): MeasurementDTO = { + private [agent] def buildMeasurementDTO(measurement: MeasurementByAtum): MeasurementDTO = { val measureName = measurement.measure.measureName - measurement.result match { - case l: Long => - buildLongMeasurement(measureName, Seq(measurement.measure.controlCol), l) - case d: Double => - buildDoubleMeasureResult(measureName, Seq(measurement.measure.controlCol), d) - case bd: BigDecimal => - buildBigDecimalMeasureResult(measureName, Seq(measurement.measure.controlCol), bd) - case s: String => - buildStringMeasureResult(measureName, Seq(measurement.measure.controlCol), s) - case unsupportedType => - val className = unsupportedType.getClass.getSimpleName - throw UnsupportedMeasureResultType(s"Unsupported type of measure $measureName: $className") - } - } + val controlCols = Seq(measurement.measure.controlCol) - private def buildLongMeasurement(functionName: String, controlCols: Seq[String], resultValue: Long): MeasurementDTO = { MeasurementDTO( - MeasureDTO(functionName, controlCols), - MeasureResultDTO(TypedValue(resultValue.toString, ResultValueType.Long)) + MeasureDTO(measureName, controlCols), + MeasureResultDTO(TypedValue(measurement.result, measurement.resultType)) ) } - private def buildDoubleMeasureResult(functionName: String, controlCols: Seq[String], resultValue: Double): MeasurementDTO = { - MeasurementDTO( - MeasureDTO(functionName, controlCols), - MeasureResultDTO(TypedValue(resultValue.toString, ResultValueType.Double)) - ) - } + private [agent] def buildMeasurementDTO(measurement: MeasurementProvided): MeasurementDTO = { + val measureName = measurement.measure.measureName + val controlCols = Seq(measurement.measure.controlCol) - private def buildBigDecimalMeasureResult(functionName: String, controlCols: Seq[String], resultValue: BigDecimal): MeasurementDTO = { MeasurementDTO( - MeasureDTO(functionName, controlCols), - MeasureResultDTO(TypedValue(resultValue.toString, ResultValueType.BigDecimal)) + MeasureDTO(measureName, controlCols), + buildMeasureResultDTO(measureName, measurement.result) ) } - private def buildStringMeasureResult(functionName: String, controlCols: Seq[String], resultValue: String): MeasurementDTO = { - MeasurementDTO( - MeasureDTO(functionName, controlCols), - MeasureResultDTO(TypedValue(resultValue, ResultValueType.String)) - ) + private [agent] def buildMeasureResultDTO(measureName: String, result: Any): MeasureResultDTO = { + result match { + case l: Long => + MeasureResultDTO(TypedValue(l.toString, ResultValueType.Long)) + case d: Double => + MeasureResultDTO(TypedValue(d.toString, ResultValueType.Double)) + case bd: BigDecimal => + MeasureResultDTO(TypedValue(bd.toString, ResultValueType.BigDecimal)) + case s: String => + MeasureResultDTO(TypedValue(s, ResultValueType.String)) + case unsupportedType => + val className = unsupportedType.getClass.getSimpleName + throw UnsupportedMeasureResultType(s"Unsupported type of measure $measureName: $className for result: $result") + } } } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala index 9e6836c54..a5384dd65 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala @@ -22,7 +22,7 @@ import za.co.absa.atum.model.dto private [agent] object MeasuresMapper { - def mapToMeasures(measures: Set[dto.MeasureDTO]): Set[za.co.absa.atum.agent.model.Measure] = { + private [agent] def mapToMeasures(measures: Set[dto.MeasureDTO]): Set[za.co.absa.atum.agent.model.Measure] = { measures.map(createMeasure) } diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala index 067b8a178..f356d9d75 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala @@ -16,15 +16,18 @@ package za.co.absa.atum.agent -import org.apache.spark.sql.SparkSession -import org.mockito.Mockito.{mock, verify} +import org.apache.spark.sql.types.DoubleType +import org.apache.spark.sql.{Row, SparkSession} +import org.mockito.Mockito.{mock, times, verify} import org.mockito.ArgumentCaptor import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import za.co.absa.atum.agent.AtumContext.AtumPartitions -import za.co.absa.atum.model.dto._ import za.co.absa.atum.agent.model.Measure.{RecordCount, SumOfValuesOfColumn} -import za.co.absa.atum.agent.model.Measurement +import za.co.absa.atum.agent.model.MeasurementProvided +import za.co.absa.atum.model.dto._ +import org.apache.spark.sql.types.{StringType, StructField, StructType} +import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType class AtumContextTest extends AnyFlatSpec with Matchers { @@ -68,7 +71,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers { assert(atumContextRemoved.currentMeasures.head == RecordCount("other")) } - "createCheckpoint" should "take measurements and create a checkpoints" in { + "createCheckpoint" should "take measurements and create a Checkpoint" in { val mockAgent = mock(classOf[AtumAgent]) val atumContext = new AtumContext(AtumPartitions("foo2" -> "bar"), mockAgent) @@ -85,15 +88,12 @@ class AtumContextTest extends AnyFlatSpec with Matchers { val rdd = spark.sparkContext.parallelize(Seq("A", "B", "C")) val df = rdd.toDF("letter") - atumContext.createCheckpoint("testCheckpoint", "Hans", df) - - val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO]) - verify(mockAgent).saveCheckpoint(argument.capture()) + val checkpoint = atumContext.createCheckpoint("testCheckpoint", "Hans", df) - assert(argument.getValue.name == "testCheckpoint") - assert(argument.getValue.author == "Hans") - assert(argument.getValue.partitioning == Seq(PartitionDTO("foo2", "bar"))) - assert(argument.getValue.measurements.head.result.mainValue.value == "3") + assert(checkpoint.name == "testCheckpoint") + assert(checkpoint.author == "Hans") + assert(checkpoint.atumPartitions == AtumPartitions("foo2", "bar")) + assert(checkpoint.measurements.head.result == "3") } "createCheckpointOnProvidedData" should "create a Checkpoint on provided data" in { @@ -101,7 +101,10 @@ class AtumContextTest extends AnyFlatSpec with Matchers { val atumPartitions = AtumPartitions("key" -> "value") val atumContext = atumAgent.getOrCreateAtumContext(atumPartitions) - val measurements = Seq(Measurement(RecordCount("col"), "1"), Measurement(SumOfValuesOfColumn("col"), 1)) + val measurements = Seq( + MeasurementProvided(RecordCount("col"), "1"), + MeasurementProvided(SumOfValuesOfColumn("col"), 1) + ) val checkpoint = atumContext.createCheckpointOnProvidedData( checkpointName = "name", @@ -117,4 +120,55 @@ class AtumContextTest extends AnyFlatSpec with Matchers { assert(checkpoint.measurements == measurements) } + "createAndSaveCheckpoint" should "take measurements and create a Checkpoint, multiple measure changes" in { + val mockAgent = mock(classOf[AtumAgent]) + + implicit val atumContext: AtumContext = new AtumContext(AtumPartitions("foo2" -> "bar"), mockAgent) + .addMeasure(RecordCount("notImportantColumn")) + + val spark = SparkSession.builder + .master("local") + .config("spark.driver.host", "localhost") + .config("spark.ui.enabled", "false") + .getOrCreate() + + val rdd = spark.sparkContext.parallelize( + Seq( + Row("A", 8.0), + Row("B", 2.9), + Row("C", 9.1), + Row("D", 2.5) + ) + ) + val schema = new StructType() + .add(StructField("notImportantColumn", StringType)) + .add(StructField("columnForSum", DoubleType)) + + import AtumContext._ + + val df = spark.createDataFrame(rdd, schema) + .createAndSaveCheckpoint("checkPointNameCount", "authorOfCount") + + val argumentFirst = ArgumentCaptor.forClass(classOf[CheckpointDTO]) + verify(mockAgent, times(1)).saveCheckpoint(argumentFirst.capture()) + + assert(argumentFirst.getValue.name == "checkPointNameCount") + assert(argumentFirst.getValue.author == "authorOfCount") + assert(argumentFirst.getValue.partitioning == Seq(PartitionDTO("foo2", "bar"))) + assert(argumentFirst.getValue.measurements.head.result.mainValue.value == "4") + assert(argumentFirst.getValue.measurements.head.result.mainValue.valueType == ResultValueType.Long) + + atumContext.addMeasure(SumOfValuesOfColumn("columnForSum")) + df.createAndSaveCheckpoint("checkPointNameSum", "authorOfSum") + + val argumentSecond = ArgumentCaptor.forClass(classOf[CheckpointDTO]) + verify(mockAgent, times(2)).saveCheckpoint(argumentSecond.capture()) + + assert(argumentSecond.getValue.name == "checkPointNameSum") + assert(argumentSecond.getValue.author == "authorOfSum") + assert(argumentSecond.getValue.partitioning == Seq(PartitionDTO("foo2", "bar"))) + assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.value == "22.5") + assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimal) + } + } diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala index be7a75bfd..44ed54236 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala @@ -21,6 +21,7 @@ import org.scalatest.matchers.should.Matchers import za.co.absa.atum.agent.AtumAgent import za.co.absa.atum.agent.AtumContext.{AtumPartitions, DatasetWrapper} import za.co.absa.atum.agent.model.Measure._ +import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType import za.co.absa.spark.commons.test.SparkTestBase class MeasureTest extends AnyFlatSpec with Matchers with SparkTestBase { self => @@ -36,11 +37,14 @@ class MeasureTest extends AnyFlatSpec with Matchers with SparkTestBase { self => val sumOfHashes: Measure = SumOfHashesOfColumn(controlCol = "id") // AtumContext contains `Measurement` - val atumContextInstanceWithRecordCount = AtumAgent.getOrCreateAtumContext(AtumPartitions("foo"->"bar")) + val atumContextInstanceWithRecordCount = AtumAgent + .getOrCreateAtumContext(AtumPartitions("foo"->"bar")) .addMeasure(measureIds) - val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount.subPartitionContext(AtumPartitions("sub"->"partition")) + val atumContextWithSalaryAbsMeasure = atumContextInstanceWithRecordCount + .subPartitionContext(AtumPartitions("sub"->"partition")) .addMeasure(salaryAbsSum) - val atumContextWithNameHashSum = atumContextInstanceWithRecordCount.subPartitionContext(AtumPartitions("another"->"partition")) + val atumContextWithNameHashSum = atumContextInstanceWithRecordCount + .subPartitionContext(AtumPartitions("another"->"partition")) .addMeasure(sumOfHashes) // Pipeline @@ -48,21 +52,20 @@ class MeasureTest extends AnyFlatSpec with Matchers with SparkTestBase { self => .format("csv") .option("header", "true") .load("agent/src/test/resources/random-dataset/persons.csv") - .createCheckpoint("name1", "author")(atumContextInstanceWithRecordCount) - .createCheckpoint("name2", "author")(atumContextWithNameHashSum) + .createAndSaveCheckpoint("name1", "author")(atumContextInstanceWithRecordCount) + .createAndSaveCheckpoint("name2", "author")(atumContextWithNameHashSum) val dsEnrichment = spark.read .format("csv") .option("header", "true") .load("agent/src/test/resources/random-dataset/persons-enriched.csv") - .createCheckpoint("name3", "author")( - atumContextWithSalaryAbsMeasure - .removeMeasure(salaryAbsSum) + .createAndSaveCheckpoint("name3", "author")( + atumContextWithSalaryAbsMeasure.removeMeasure(salaryAbsSum) ) val dfFull = dfPersons .join(dsEnrichment, Seq("id")) - .createCheckpoint("other different name", "author")(atumContextWithSalaryAbsMeasure) + .createAndSaveCheckpoint("other different name", "author")(atumContextWithSalaryAbsMeasure) val dfExtraPersonWithNegativeSalary = spark .createDataFrame( @@ -74,20 +77,32 @@ class MeasureTest extends AnyFlatSpec with Matchers with SparkTestBase { self => val dfExtraPerson = dfExtraPersonWithNegativeSalary.union(dfPersons) - dfExtraPerson.createCheckpoint("a checkpoint name", "author")( + dfExtraPerson.createAndSaveCheckpoint("a checkpoint name", "author")( atumContextWithSalaryAbsMeasure .removeMeasure(measureIds) .removeMeasure(salaryAbsSum) ) - // Assertions - assert(measureIds.function(dfPersons) == "1000") - assert(measureIds.function(dfFull) == "1000") - assert(salaryAbsSum.function(dfFull) == "2987144") - assert(sumOfHashes.function(dfFull) == "2044144307532") - assert(salarySum.function(dfExtraPerson) == "2986144") - assert(salarySum.function(dfFull) == "2987144") + val dfPersonCntResult = measureIds.function(dfPersons) + val dfFullCntResult = measureIds.function(dfFull) + val dfFullSalaryAbsSumResult = salaryAbsSum.function(dfFull) + val dfFullHashResult = sumOfHashes.function(dfFull) + val dfExtraPersonSalarySumResult = salarySum.function(dfExtraPerson) + val dfFullSalarySumResult = salarySum.function(dfFull) + // Assertions + assert(dfPersonCntResult.result == "1000") + assert(dfPersonCntResult.resultType == ResultValueType.Long) + assert(dfFullCntResult.result == "1000") + assert(dfFullCntResult.resultType == ResultValueType.Long) + assert(dfFullSalaryAbsSumResult.result == "2987144") + assert(dfFullSalaryAbsSumResult.resultType == ResultValueType.Double) + assert(dfFullHashResult.result == "2044144307532") + assert(dfFullHashResult.resultType == ResultValueType.String) + assert(dfExtraPersonSalarySumResult.result == "2986144") + assert(dfExtraPersonSalarySumResult.resultType == ResultValueType.BigDecimal) + assert(dfFullSalarySumResult.result == "2987144") + assert(dfFullSalarySumResult.resultType == ResultValueType.BigDecimal) } } diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala index b7ae5643c..324c5e6d9 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala @@ -24,9 +24,9 @@ import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue} class MeasurementBuilderTest extends AnyFlatSpec { - "buildMeasurementDTO" should "build expected MeasurementDTO for Long type of result value" in { + "buildMeasurementDTO" should "build MeasurementDTO for Long type of result value when Measurement provided" in { val measure = SumOfValuesOfColumn("col") - val measurement = Measurement(measure, 1L) + val measurement = MeasurementProvided(measure, 1L) val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measurement) val expectedMeasureDTO = MeasureDTO("aggregatedTotal", Seq("col")) @@ -39,9 +39,9 @@ class MeasurementBuilderTest extends AnyFlatSpec { assert(measurementDTO.result == expectedMeasureResultDTO) } - "buildMeasurementDTO" should "build MeasurementDTO with expected TypedValue for Double type of result value" in { + "buildMeasurementDTO" should "build MeasurementDTO for Double type of result value when Measurement provided" in { val measure = SumOfValuesOfColumn("col") - val measurement = Measurement(measure, 3.14) + val measurement = MeasurementProvided(measure, 3.14) val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measurement) val expectedTypedValue = TypedValue("3.14", ResultValueType.Double) @@ -49,9 +49,9 @@ class MeasurementBuilderTest extends AnyFlatSpec { assert(measurementDTO.result.mainValue == expectedTypedValue) } - "buildMeasurementDTO" should "build MeasurementDTO with expected TypedValue for BigDecimal type of result value" in { + "buildMeasurementDTO" should "build MeasurementDTO for BigDecimal type of result value when Measurement provided" in { val measure = SumOfValuesOfColumn("col") - val measurement = Measurement(measure, BigDecimal(3.14)) + val measurement = MeasurementProvided(measure, BigDecimal(3.14)) val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measurement) val expectedTypedValue = TypedValue("3.14", ResultValueType.BigDecimal) @@ -59,9 +59,9 @@ class MeasurementBuilderTest extends AnyFlatSpec { assert(measurementDTO.result.mainValue == expectedTypedValue) } - "buildMeasurementDTO" should "build MeasurementDTO with expected TypedValue for String type of result value" in { + "buildMeasurementDTO" should "build MeasurementDTO for String type of result value when Measurement provided" in { val measure = SumOfValuesOfColumn("col") - val measurement = Measurement(measure, "stringValue") + val measurement = MeasurementProvided(measure, "stringValue") val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measurement) val expectedTypedValue = TypedValue("stringValue", ResultValueType.String) @@ -69,11 +69,26 @@ class MeasurementBuilderTest extends AnyFlatSpec { assert(measurementDTO.result.mainValue == expectedTypedValue) } - "buildMeasurementDTO" should "throw exception for unsupported result value type" in { + "buildMeasurementDTO" should "throw exception for unsupported result value type when Measurement provided" in { val measure = SumOfValuesOfColumn("col") - val measurement = Measurement(measure, 1) + val measurement = MeasurementProvided(measure, 1) assertThrows[UnsupportedMeasureResultType](MeasurementBuilder.buildMeasurementDTO(measurement)) } + "buildMeasurementDTO" should "build MeasurementDTO for Long type of result value when measured by Agent" in { + val measure = SumOfValuesOfColumn("col") + val measurement = MeasurementByAtum(measure, "1", ResultValueType.Long) + val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measurement) + + val expectedMeasureDTO = MeasureDTO("aggregatedTotal", Seq("col")) + + val expectedMeasureResultDTO = MeasureResultDTO( + TypedValue("1", ResultValueType.Long) + ) + + assert(measurementDTO.measure == expectedMeasureDTO) + assert(measurementDTO.result == expectedMeasureResultDTO) + } + } diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresMapperTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresMapperTest.scala index 84b5a7d50..79b8ba296 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresMapperTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresMapperTest.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.agent.model import org.scalatest.flatspec.AnyFlatSpecLike import za.co.absa.atum.agent.exception.UnsupportedMeasureException -import za.co.absa.atum.agent.model.Measure.{AbsSumOfValuesOfColumn, DistinctRecordCount, RecordCount, SumOfHashesOfColumn, SumOfValuesOfColumn} +import za.co.absa.atum.agent.model.Measure._ import za.co.absa.atum.model.dto.MeasureDTO class MeasuresMapperTest extends AnyFlatSpecLike { From 7fdfa5636f29cdd656b2cc676eab7935c6488167 Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Wed, 25 Oct 2023 14:42:26 +0200 Subject: [PATCH 03/15] #94: measure validation for a given column and small refactoring --- .../za/co/absa/atum/agent/AtumContext.scala | 25 +++++- .../agent/core/MeasurementProcessor.scala | 2 +- .../za/co/absa/atum/agent/model/Measure.scala | 20 ++--- .../co/absa/atum/agent/AtumContextTest.scala | 85 ++++++++++++++++++- .../absa/atum/agent/model/MeasureTest.scala | 12 +-- project/Dependencies.scala | 5 ++ 6 files changed, 126 insertions(+), 23 deletions(-) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala index 6c940c5b7..4fa120907 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala @@ -16,7 +16,9 @@ package za.co.absa.atum.agent +import org.slf4s.Logging import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.types.NumericType import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.model.{Checkpoint, Measure, Measurement, MeasurementByAtum, MeasuresMapper} import za.co.absa.atum.model.dto._ @@ -34,7 +36,7 @@ class AtumContext private[agent] ( val atumPartitions: AtumPartitions, val agent: AtumAgent, private var measures: Set[Measure] = Set.empty -) { +) extends Logging { def currentMeasures: Set[Measure] = measures @@ -42,10 +44,29 @@ class AtumContext private[agent] ( agent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this) } + private def validateMeasureApplicability(measure: Measure, df: DataFrame): Unit = { + require( + df.columns.contains(measure.controlCol), + s"Column(s) '${measure.controlCol}' must be present in dataframe, but it's not. " + + s"Columns in the dataframe: ${df.columns.mkString(", ")}." + ) + + val colDataType = df.select(measure.controlCol).schema.fields(0).dataType + val isColDataTypeNumeric = colDataType.isInstanceOf[NumericType] + if (measure.onlyForNumeric && !isColDataTypeNumeric) { + log.warn( // TODO: discuss, throw exception or warn message? Or both, parametrized? + s"Column ${measure.controlCol} measurement ${measure.measureName} requested, but the field is not numeric! " + + s"Found: ${colDataType.simpleString} datatype." + ) + } + } + private def takeMeasurements(df: DataFrame): Set[Measurement] = { measures.map { m => + validateMeasureApplicability(m, df) + val measurementResult = m.function(df) - MeasurementByAtum(m, measurementResult.result, measurementResult.resultType) + MeasurementByAtum(m, measurementResult.resultValue, measurementResult.resultType) } } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/core/MeasurementProcessor.scala b/agent/src/main/scala/za/co/absa/atum/agent/core/MeasurementProcessor.scala index 026f71e9c..fdc41b0e8 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/core/MeasurementProcessor.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/core/MeasurementProcessor.scala @@ -32,7 +32,7 @@ object MeasurementProcessor { * (overflows, consistent representation of numbers - whether they are coming from Java or Scala world, and more), * but the actual type is stored alongside the computation because we don't want to lost this information. */ - final case class ResultOfMeasurement(result: String, resultType: ResultValueType.ResultValueType) + final case class ResultOfMeasurement(resultValue: String, resultType: ResultValueType.ResultValueType) type MeasurementFunction = DataFrame => ResultOfMeasurement diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala index 1dcb5731d..d95666dff 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala @@ -49,8 +49,8 @@ object Measure { override def function: MeasurementFunction = (ds: DataFrame) => { - val result = ds.select(col(controlCol)).count().toString - ResultOfMeasurement(result, ResultValueType.Long) + val resultValue = ds.select(col(controlCol)).count().toString + ResultOfMeasurement(resultValue, ResultValueType.Long) } } object RecordCount extends MeasureType { @@ -67,8 +67,8 @@ object Measure { override def function: MeasurementFunction = (ds: DataFrame) => { - val result = ds.select(col(controlCol)).distinct().count().toString - ResultOfMeasurement(result, ResultValueType.Long) + val resultValue = ds.select(col(controlCol)).distinct().count().toString + ResultOfMeasurement(resultValue, ResultValueType.Long) } } @@ -86,8 +86,8 @@ object Measure { override def function: MeasurementFunction = (ds: DataFrame) => { val aggCol = sum(col(valueColumnName)) - val result = aggregateColumn(ds, controlCol, aggCol) - ResultOfMeasurement(result, ResultValueType.BigDecimal) + val resultValue = aggregateColumn(ds, controlCol, aggCol) + ResultOfMeasurement(resultValue, ResultValueType.BigDecimal) } } @@ -105,8 +105,8 @@ object Measure { override def function: MeasurementFunction = (ds: DataFrame) => { val aggCol = sum(abs(col(valueColumnName))) - val result = aggregateColumn(ds, controlCol, aggCol) - ResultOfMeasurement(result, ResultValueType.Double) + val resultValue = aggregateColumn(ds, controlCol, aggCol) + ResultOfMeasurement(resultValue, ResultValueType.Double) } } @@ -129,8 +129,8 @@ object Measure { .withColumn(aggregatedColumnName, crc32(col(controlCol).cast("String"))) .agg(sum(col(aggregatedColumnName))) .collect()(0)(0) - val result = if (value == null) "" else value.toString - ResultOfMeasurement(result, ResultValueType.String) + val resultValue = if (value == null) "" else value.toString + ResultOfMeasurement(resultValue, ResultValueType.String) } } diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala index f356d9d75..6a9af7c49 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala @@ -16,18 +16,20 @@ package za.co.absa.atum.agent -import org.apache.spark.sql.types.DoubleType +import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} -import org.mockito.Mockito.{mock, times, verify} import org.mockito.ArgumentCaptor +import org.mockito.IdiomaticMockito.StubbingOps +import org.mockito.Mockito.{mock, times, verify} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers +import org.slf4s +import org.slf4j.{Logger => Underlying} import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.model.Measure.{RecordCount, SumOfValuesOfColumn} import za.co.absa.atum.agent.model.MeasurementProvided -import za.co.absa.atum.model.dto._ -import org.apache.spark.sql.types.{StringType, StructField, StructType} import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType +import za.co.absa.atum.model.dto._ class AtumContextTest extends AnyFlatSpec with Matchers { @@ -171,4 +173,79 @@ class AtumContextTest extends AnyFlatSpec with Matchers { assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimal) } + "createCheckpoint" should "take measurements and fail because numeric measure is defined on non-numeric column" in { + val mockAgent = mock(classOf[AtumAgent]) + val mockLogger = mock(classOf[Underlying]) + mockLogger.isWarnEnabled returns true + + implicit val atumContext: AtumContext = new AtumContext(AtumPartitions("foo2" -> "bar"), mockAgent) { + override val log: slf4s.Logger = slf4s.Logger(mockLogger) + }.addMeasure(SumOfValuesOfColumn("nonNumericalColumn")) + + val spark = SparkSession.builder + .master("local") + .config("spark.driver.host", "localhost") + .config("spark.ui.enabled", "false") + .getOrCreate() + + val rdd = spark.sparkContext.parallelize( + Seq( + Row("A", 8.0), + Row("B", 2.9), + Row("C", 9.1), + Row("D", 2.5) + ) + ) + val schema = new StructType() + .add(StructField("nonNumericalColumn", StringType)) + .add(StructField("numericalColumn", DoubleType)) + + import AtumContext._ + + val df = spark.createDataFrame(rdd, schema) + df.createAndSaveCheckpoint("checkPointNameCountInvalid", "authorOfCount") + + verify(mockLogger).warn( + "Column nonNumericalColumn measurement aggregatedTotal requested, " + + "but the field is not numeric! Found: string datatype." + ) + } + + "createCheckpoint" should "take measurements and fail because column doesn't exist" in { + val mockAgent = mock(classOf[AtumAgent]) + + implicit val atumContext: AtumContext = new AtumContext(AtumPartitions("foo2" -> "bar"), mockAgent) + .addMeasure(RecordCount("nonExistingColumn")) + + val spark = SparkSession.builder + .master("local") + .config("spark.driver.host", "localhost") + .config("spark.ui.enabled", "false") + .getOrCreate() + + val rdd = spark.sparkContext.parallelize( + Seq( + Row("A", 8.0), + Row("B", 2.9), + Row("C", 9.1), + Row("D", 2.5) + ) + ) + val schema = new StructType() + .add(StructField("nonNumericalColumn", StringType)) + .add(StructField("numericalColumn", DoubleType)) + + import AtumContext._ + + val df = spark.createDataFrame(rdd, schema) + + val caughtException = the[IllegalArgumentException] thrownBy { + df.createAndSaveCheckpoint("checkPointNameCountColNonExisting", "authorOfCount") + } + caughtException.getMessage should include( + "Column(s) 'nonExistingColumn' must be present in dataframe, but it's not. " + + s"Columns in the dataframe: nonNumericalColumn, numericalColumn." + ) + } + } diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala index 44ed54236..9a4126505 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala @@ -91,17 +91,17 @@ class MeasureTest extends AnyFlatSpec with Matchers with SparkTestBase { self => val dfFullSalarySumResult = salarySum.function(dfFull) // Assertions - assert(dfPersonCntResult.result == "1000") + assert(dfPersonCntResult.resultValue == "1000") assert(dfPersonCntResult.resultType == ResultValueType.Long) - assert(dfFullCntResult.result == "1000") + assert(dfFullCntResult.resultValue == "1000") assert(dfFullCntResult.resultType == ResultValueType.Long) - assert(dfFullSalaryAbsSumResult.result == "2987144") + assert(dfFullSalaryAbsSumResult.resultValue == "2987144") assert(dfFullSalaryAbsSumResult.resultType == ResultValueType.Double) - assert(dfFullHashResult.result == "2044144307532") + assert(dfFullHashResult.resultValue == "2044144307532") assert(dfFullHashResult.resultType == ResultValueType.String) - assert(dfExtraPersonSalarySumResult.result == "2986144") + assert(dfExtraPersonSalarySumResult.resultValue == "2986144") assert(dfExtraPersonSalarySumResult.resultType == ResultValueType.BigDecimal) - assert(dfFullSalarySumResult.result == "2987144") + assert(dfFullSalarySumResult.resultValue == "2987144") assert(dfFullSalarySumResult.resultType == ResultValueType.BigDecimal) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index a5d804d4c..9733978e5 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -43,6 +43,9 @@ object Dependencies { val sparkCommons = "0.6.1" val sttp = "3.5.2" + + val slf4s = "1.7.25" + val logback = "1.3.0" } private def limitVersion(version: String, parts: Int): String = { @@ -58,6 +61,8 @@ object Dependencies { } def commonDependencies: Seq[ModuleID] = Seq( + "org.slf4s" %% "slf4s-api" % Versions.slf4s, + "ch.qos.logback" % "logback-classic" % Versions.logback, "org.scalatest" %% "scalatest" % Versions.scalatest % Test, "org.mockito" %% "mockito-scala" % Versions.scalaMockito % Test ) From e035c84602713c1506b88ff79164fdd52e4391af Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Thu, 26 Oct 2023 14:02:05 +0200 Subject: [PATCH 04/15] #26: post-review improvements and refactoring --- .../za/co/absa/atum/agent/AtumAgent.scala | 6 ++-- .../za/co/absa/atum/agent/AtumContext.scala | 30 ++++++++++++---- .../co/absa/atum/agent/model/Checkpoint.scala | 7 +--- .../absa/atum/agent/model/Measurement.scala | 35 ++++++++++++++++++- .../atum/agent/model/MeasurementBuilder.scala | 23 +++++------- .../co/absa/atum/agent/AtumContextTest.scala | 2 +- .../agent/model/MeasurementBuilderTest.scala | 6 ++-- 7 files changed, 73 insertions(+), 36 deletions(-) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala index deeb0a486..0de25b152 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala @@ -22,7 +22,7 @@ import za.co.absa.atum.agent.model.Checkpoint import za.co.absa.atum.model.dto.{CheckpointDTO, PartitioningDTO} /** - * Place holder for the agent that communicate with the API. + * Entity that communicate with the API, primarily focused on spawning Atum Context(s). */ class AtumAgent private[agent] () { @@ -39,7 +39,7 @@ class AtumAgent private[agent] () { * * @param checkpoint Already initialized Checkpoint object to store */ - def saveCheckpoint(checkpoint: CheckpointDTO): Unit = { + private [agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = { dispatcher.saveCheckpoint(checkpoint) } @@ -48,7 +48,7 @@ class AtumAgent private[agent] () { * * @param checkpoint Already initialized Checkpoint object to store */ - def saveCheckpoint(checkpoint: Checkpoint): Unit = { + private [agent] def saveCheckpoint(checkpoint: Checkpoint): Unit = { dispatcher.saveCheckpoint(checkpoint.toCheckpointDTO) } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala index 6c940c5b7..c989bc1d9 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.agent import org.apache.spark.sql.DataFrame import za.co.absa.atum.agent.AtumContext.AtumPartitions -import za.co.absa.atum.agent.model.{Checkpoint, Measure, Measurement, MeasurementByAtum, MeasuresMapper} +import za.co.absa.atum.agent.model._ import za.co.absa.atum.model.dto._ import java.time.ZonedDateTime @@ -42,14 +42,14 @@ class AtumContext private[agent] ( agent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this) } - private def takeMeasurements(df: DataFrame): Set[Measurement] = { + private def takeMeasurements(df: DataFrame): Set[MeasurementByAtum] = { measures.map { m => val measurementResult = m.function(df) MeasurementByAtum(m, measurementResult.result, measurementResult.resultType) } } - def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): Checkpoint = { + private [agent] def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): Checkpoint = { val startTime = ZonedDateTime.now() val measurements = takeMeasurements(dataToMeasure) val endTime = ZonedDateTime.now() @@ -65,7 +65,15 @@ class AtumContext private[agent] ( ) } - def createCheckpointOnProvidedData( + def createAndSaveCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): AtumContext = { + val checkpoint = createCheckpoint(checkpointName, author, dataToMeasure) + val checkpointDTO = checkpoint.toCheckpointDTO + + agent.saveCheckpoint(checkpointDTO) + this + } + + private [agent] def createCheckpointOnProvidedData( checkpointName: String, author: String, measurements: Seq[Measurement] ): Checkpoint = { val zonedDateTimeNow = ZonedDateTime.now() @@ -80,6 +88,16 @@ class AtumContext private[agent] ( ) } + def createAndSaveCheckpointOnProvidedData( + checkpointName: String, author: String, measurements: Seq[Measurement] + ): AtumContext = { + val checkpoint = createCheckpointOnProvidedData(checkpointName, author, measurements) + val checkpointDTO = checkpoint.toCheckpointDTO + + agent.saveCheckpoint(checkpointDTO) + this + } + def addAdditionalData(key: String, value: String) = { ??? // TODO #60 } @@ -147,9 +165,7 @@ object AtumContext { * @return */ def createAndSaveCheckpoint(checkpointName: String, author: String)(implicit atumContext: AtumContext): DataFrame = { - val checkpoint = atumContext.createCheckpoint(checkpointName, author, df) - val checkpointDTO = checkpoint.toCheckpointDTO - atumContext.agent.saveCheckpoint(checkpointDTO) + atumContext.createAndSaveCheckpoint(checkpointName, author, df) df } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Checkpoint.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Checkpoint.scala index 3a2c05181..666f07936 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Checkpoint.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/Checkpoint.scala @@ -32,12 +32,7 @@ case class Checkpoint( measurements: Seq[Measurement] ) { private [agent] def toCheckpointDTO: CheckpointDTO = { - val measurementDTOs = measurements.map { - case provided: MeasurementProvided => - MeasurementBuilder.buildMeasurementDTO(provided) - case byAtum: MeasurementByAtum => - MeasurementBuilder.buildMeasurementDTO(byAtum) - } + val measurementDTOs = measurements.map(MeasurementBuilder.buildMeasurementDTO) CheckpointDTO( id = UUID.randomUUID(), diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala index a7c0851a5..72bf9ade7 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala @@ -16,6 +16,7 @@ package za.co.absa.atum.agent.model +import za.co.absa.atum.agent.exception.UnsupportedMeasureResultType import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType.ResultValueType trait Measurement { @@ -27,7 +28,39 @@ trait Measurement { * When the application/user of Atum Agent provides actual results by himself, the type is precise and we don't need * to do any adjustments. */ -case class MeasurementProvided(measure: Measure, result: Any) extends Measurement +abstract class MeasurementProvided[T](measure: Measure, result: T) extends Measurement + +private case class MeasurementProvidedAsLong (override val measure: Measure, override val result: Long) + extends MeasurementProvided[Long](measure, result) + +private case class MeasurementProvidedAsDouble (override val measure: Measure, override val result: Double) + extends MeasurementProvided[Double](measure, result) + +private case class MeasurementProvidedAsBigDecimal(override val measure: Measure, override val result: BigDecimal) + extends MeasurementProvided[BigDecimal](measure, result) + +private case class MeasurementProvidedAsString(override val measure: Measure, override val result: String) + extends MeasurementProvided[String](measure, result) + +object MeasurementProvided { + def apply[T](measure: Measure, result: T): Measurement = { + result match { + case l: Long => + MeasurementProvidedAsLong(measure, l) + case d: Double => + MeasurementProvidedAsDouble(measure, d) + case bd: BigDecimal => + MeasurementProvidedAsBigDecimal(measure, bd) + case s: String => + MeasurementProvidedAsString(measure, s) + case unsupportedType => + val className = unsupportedType.getClass.getSimpleName + throw UnsupportedMeasureResultType( + s"Unsupported type of measurement for measure ${measure.measureName}: $className for provided result: $result" + ) + } + } +} /** * When the Atum Agent itself performs the measurements, using Spark, then in some cases some adjustments are diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala index 4fd4acc09..ec1174062 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala @@ -22,24 +22,19 @@ import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue} private [agent] object MeasurementBuilder { - private [agent] def buildMeasurementDTO(measurement: MeasurementByAtum): MeasurementDTO = { + private [agent] def buildMeasurementDTO(measurement: Measurement): MeasurementDTO = { val measureName = measurement.measure.measureName val controlCols = Seq(measurement.measure.controlCol) + val measureDTO = MeasureDTO(measureName, controlCols) - MeasurementDTO( - MeasureDTO(measureName, controlCols), - MeasureResultDTO(TypedValue(measurement.result, measurement.resultType)) - ) - } - - private [agent] def buildMeasurementDTO(measurement: MeasurementProvided): MeasurementDTO = { - val measureName = measurement.measure.measureName - val controlCols = Seq(measurement.measure.controlCol) + val measureResultDTO = measurement match { + case m: MeasurementByAtum => + MeasureResultDTO(TypedValue(m.result, m.resultType)) + case m: Measurement => + buildMeasureResultDTO(measureName, m.result) + } - MeasurementDTO( - MeasureDTO(measureName, controlCols), - buildMeasureResultDTO(measureName, measurement.result) - ) + MeasurementDTO(measureDTO, measureResultDTO) } private [agent] def buildMeasureResultDTO(measureName: String, result: Any): MeasureResultDTO = { diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala index f356d9d75..8e7d02952 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala @@ -103,7 +103,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers { val measurements = Seq( MeasurementProvided(RecordCount("col"), "1"), - MeasurementProvided(SumOfValuesOfColumn("col"), 1) + MeasurementProvided(SumOfValuesOfColumn("col"), 1L) ) val checkpoint = atumContext.createCheckpointOnProvidedData( diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala index 324c5e6d9..3d538547c 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala @@ -26,7 +26,7 @@ class MeasurementBuilderTest extends AnyFlatSpec { "buildMeasurementDTO" should "build MeasurementDTO for Long type of result value when Measurement provided" in { val measure = SumOfValuesOfColumn("col") - val measurement = MeasurementProvided(measure, 1L) + val measurement = MeasurementProvided[Long](measure, 1L) val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measurement) val expectedMeasureDTO = MeasureDTO("aggregatedTotal", Seq("col")) @@ -71,9 +71,7 @@ class MeasurementBuilderTest extends AnyFlatSpec { "buildMeasurementDTO" should "throw exception for unsupported result value type when Measurement provided" in { val measure = SumOfValuesOfColumn("col") - val measurement = MeasurementProvided(measure, 1) - - assertThrows[UnsupportedMeasureResultType](MeasurementBuilder.buildMeasurementDTO(measurement)) + assertThrows[UnsupportedMeasureResultType](MeasurementProvided(measure, 1)) } "buildMeasurementDTO" should "build MeasurementDTO for Long type of result value when measured by Agent" in { From 93ee090cfabccddffd915417e8666f4fb01e8d74 Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Thu, 26 Oct 2023 17:53:18 +0200 Subject: [PATCH 05/15] #26: extra validation, putting emphasis on types, some generics, more unit tests, refactoring & simplification --- .../MeasurementProvidedException.scala | 19 ++++ .../za/co/absa/atum/agent/model/Measure.scala | 87 ++++++++++++------- .../absa/atum/agent/model/Measurement.scala | 53 ++++++----- .../atum/agent/model/MeasurementBuilder.scala | 27 +----- .../co/absa/atum/agent/AtumContextTest.scala | 7 +- .../agent/model/MeasurementBuilderTest.scala | 31 ++----- .../atum/agent/model/MeasurementTest.scala | 60 +++++++++++++ 7 files changed, 178 insertions(+), 106 deletions(-) create mode 100644 agent/src/main/scala/za/co/absa/atum/agent/exception/MeasurementProvidedException.scala create mode 100644 agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementTest.scala diff --git a/agent/src/main/scala/za/co/absa/atum/agent/exception/MeasurementProvidedException.scala b/agent/src/main/scala/za/co/absa/atum/agent/exception/MeasurementProvidedException.scala new file mode 100644 index 000000000..afd90ece2 --- /dev/null +++ b/agent/src/main/scala/za/co/absa/atum/agent/exception/MeasurementProvidedException.scala @@ -0,0 +1,19 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.agent.exception + +case class MeasurementProvidedException(msg: String) extends Exception(msg) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala index 1dcb5731d..0ce5a190e 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala @@ -34,6 +34,7 @@ sealed trait Measure extends MeasurementProcessor with MeasureType { trait MeasureType { val measureName: String val onlyForNumeric: Boolean + val resultValueType: ResultValueType.ResultValueType } object Measure { @@ -41,86 +42,110 @@ object Measure { private val valueColumnName: String = "value" val supportedMeasures: Seq[MeasureType] = Seq( - RecordCount, DistinctRecordCount, SumOfValuesOfColumn, AbsSumOfValuesOfColumn, SumOfHashesOfColumn + RecordCount, + DistinctRecordCount, + SumOfValuesOfColumn, + AbsSumOfValuesOfColumn, + SumOfHashesOfColumn ) val supportedMeasureNames: Seq[String] = supportedMeasures.map(_.measureName) - case class RecordCount private (controlCol: String, measureName: String, onlyForNumeric: Boolean) extends Measure { + case class RecordCount private ( + controlCol: String, + measureName: String, + onlyForNumeric: Boolean, + resultValueType: ResultValueType.ResultValueType + ) extends Measure { override def function: MeasurementFunction = (ds: DataFrame) => { - val result = ds.select(col(controlCol)).count().toString - ResultOfMeasurement(result, ResultValueType.Long) + val resultValue = ds.select(col(controlCol)).count().toString + ResultOfMeasurement(resultValue, resultValueType) } } object RecordCount extends MeasureType { - def apply(controlCol: String): RecordCount = { - RecordCount(controlCol, measureName, onlyForNumeric) - } + def apply(controlCol: String): RecordCount = RecordCount(controlCol, measureName, onlyForNumeric, resultValueType) override val measureName: String = "count" override val onlyForNumeric: Boolean = false + override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long } - case class DistinctRecordCount private (controlCol: String, measureName: String, onlyForNumeric: Boolean) - extends Measure { + case class DistinctRecordCount private ( + controlCol: String, + measureName: String, + onlyForNumeric: Boolean, + resultValueType: ResultValueType.ResultValueType + ) extends Measure { override def function: MeasurementFunction = (ds: DataFrame) => { - val result = ds.select(col(controlCol)).distinct().count().toString - ResultOfMeasurement(result, ResultValueType.Long) + val resultValue = ds.select(col(controlCol)).distinct().count().toString + ResultOfMeasurement(resultValue, resultValueType) } } - object DistinctRecordCount extends MeasureType { def apply(controlCol: String): DistinctRecordCount = { - DistinctRecordCount(controlCol, measureName, onlyForNumeric) + DistinctRecordCount(controlCol, measureName, onlyForNumeric, resultValueType) } override val measureName: String = "distinctCount" override val onlyForNumeric: Boolean = false + override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long } - case class SumOfValuesOfColumn private (controlCol: String, measureName: String, onlyForNumeric: Boolean) - extends Measure { + case class SumOfValuesOfColumn private ( + controlCol: String, + measureName: String, + onlyForNumeric: Boolean, + resultValueType: ResultValueType.ResultValueType + ) extends Measure { override def function: MeasurementFunction = (ds: DataFrame) => { val aggCol = sum(col(valueColumnName)) - val result = aggregateColumn(ds, controlCol, aggCol) - ResultOfMeasurement(result, ResultValueType.BigDecimal) + val resultValue = aggregateColumn(ds, controlCol, aggCol) + ResultOfMeasurement(resultValue, resultValueType) } } - object SumOfValuesOfColumn extends MeasureType { def apply(controlCol: String): SumOfValuesOfColumn = { - SumOfValuesOfColumn(controlCol, measureName, onlyForNumeric) + SumOfValuesOfColumn(controlCol, measureName, onlyForNumeric, resultValueType) } override val measureName: String = "aggregatedTotal" override val onlyForNumeric: Boolean = true + override val resultValueType: ResultValueType.ResultValueType = ResultValueType.BigDecimal } - case class AbsSumOfValuesOfColumn private (controlCol: String, measureName: String, onlyForNumeric: Boolean) - extends Measure { + case class AbsSumOfValuesOfColumn private ( + controlCol: String, + measureName: String, + onlyForNumeric: Boolean, + resultValueType: ResultValueType.ResultValueType + ) extends Measure { override def function: MeasurementFunction = (ds: DataFrame) => { val aggCol = sum(abs(col(valueColumnName))) - val result = aggregateColumn(ds, controlCol, aggCol) - ResultOfMeasurement(result, ResultValueType.Double) + val resultValue = aggregateColumn(ds, controlCol, aggCol) + ResultOfMeasurement(resultValue, resultValueType) } } - object AbsSumOfValuesOfColumn extends MeasureType { def apply(controlCol: String): AbsSumOfValuesOfColumn = { - AbsSumOfValuesOfColumn(controlCol, measureName, onlyForNumeric) + AbsSumOfValuesOfColumn(controlCol, measureName, onlyForNumeric, resultValueType) } override val measureName: String = "absAggregatedTotal" override val onlyForNumeric: Boolean = true + override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Double } - case class SumOfHashesOfColumn private (controlCol: String, measureName: String, onlyForNumeric: Boolean) - extends Measure { + case class SumOfHashesOfColumn private ( + controlCol: String, + measureName: String, + onlyForNumeric: Boolean, + resultValueType: ResultValueType.ResultValueType + ) extends Measure { override def function: MeasurementFunction = (ds: DataFrame) => { @@ -129,18 +154,18 @@ object Measure { .withColumn(aggregatedColumnName, crc32(col(controlCol).cast("String"))) .agg(sum(col(aggregatedColumnName))) .collect()(0)(0) - val result = if (value == null) "" else value.toString - ResultOfMeasurement(result, ResultValueType.String) + val resultValue = if (value == null) "" else value.toString + ResultOfMeasurement(resultValue, ResultValueType.String) } } - object SumOfHashesOfColumn extends MeasureType { def apply(controlCol: String): SumOfHashesOfColumn = { - SumOfHashesOfColumn(controlCol, measureName, onlyForNumeric) + SumOfHashesOfColumn(controlCol, measureName, onlyForNumeric, resultValueType) } override val measureName: String = "hashCrc32" override val onlyForNumeric: Boolean = false + override val resultValueType: ResultValueType.ResultValueType = ResultValueType.String } private def aggregateColumn( diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala index 72bf9ade7..c2bdd829f 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala @@ -16,47 +16,53 @@ package za.co.absa.atum.agent.model -import za.co.absa.atum.agent.exception.UnsupportedMeasureResultType -import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType.ResultValueType +import za.co.absa.atum.agent.exception.MeasurementProvidedException +import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType trait Measurement { val measure: Measure - val result: Any + val resultValue: Any + val resultType: ResultValueType.ResultValueType } /** * When the application/user of Atum Agent provides actual results by himself, the type is precise and we don't need * to do any adjustments. */ -abstract class MeasurementProvided[T](measure: Measure, result: T) extends Measurement +case class MeasurementProvided[T](measure: Measure, resultValue: T, resultType: ResultValueType.ResultValueType) + extends Measurement -private case class MeasurementProvidedAsLong (override val measure: Measure, override val result: Long) - extends MeasurementProvided[Long](measure, result) - -private case class MeasurementProvidedAsDouble (override val measure: Measure, override val result: Double) - extends MeasurementProvided[Double](measure, result) +object MeasurementProvided { -private case class MeasurementProvidedAsBigDecimal(override val measure: Measure, override val result: BigDecimal) - extends MeasurementProvided[BigDecimal](measure, result) + def handleSpecificType[T]( + measure: Measure, resultValue: T, requiredType: ResultValueType.ResultValueType + ): MeasurementProvided[T] = { -private case class MeasurementProvidedAsString(override val measure: Measure, override val result: String) - extends MeasurementProvided[String](measure, result) + val actualType = measure.resultValueType + if (actualType != requiredType) + throw MeasurementProvidedException( + s"Type of a given provided measurement result and type that a given measure supports are not compatible! " + + s"Got $actualType but should be $requiredType" + ) + MeasurementProvided[T](measure, resultValue, requiredType) + } -object MeasurementProvided { - def apply[T](measure: Measure, result: T): Measurement = { - result match { + def apply[T](measure: Measure, resultValue: T): Measurement = { + resultValue match { case l: Long => - MeasurementProvidedAsLong(measure, l) + handleSpecificType[Long](measure, l, ResultValueType.Long) case d: Double => - MeasurementProvidedAsDouble(measure, d) + handleSpecificType[Double](measure, d, ResultValueType.Double) case bd: BigDecimal => - MeasurementProvidedAsBigDecimal(measure, bd) + handleSpecificType[BigDecimal](measure, bd, ResultValueType.BigDecimal) case s: String => - MeasurementProvidedAsString(measure, s) + handleSpecificType[String](measure, s, ResultValueType.String) + case unsupportedType => val className = unsupportedType.getClass.getSimpleName - throw UnsupportedMeasureResultType( - s"Unsupported type of measurement for measure ${measure.measureName}: $className for provided result: $result" + throw MeasurementProvidedException( + s"Unsupported type of measurement for measure ${measure.measureName}: $className " + + s"for provided result: $resultValue" ) } } @@ -67,4 +73,5 @@ object MeasurementProvided { * needed - thus we are converting the results to strings always - but we need to keep the information about * the actual type as well. */ -case class MeasurementByAtum(measure: Measure, result: String, resultType: ResultValueType) extends Measurement +case class MeasurementByAtum(measure: Measure, resultValue: String, resultType: ResultValueType.ResultValueType) + extends Measurement diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala index ec1174062..716827ef0 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala @@ -16,9 +16,8 @@ package za.co.absa.atum.agent.model -import za.co.absa.atum.agent.exception.UnsupportedMeasureResultType import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO, MeasurementDTO} -import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue} +import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue private [agent] object MeasurementBuilder { @@ -27,30 +26,8 @@ private [agent] object MeasurementBuilder { val controlCols = Seq(measurement.measure.controlCol) val measureDTO = MeasureDTO(measureName, controlCols) - val measureResultDTO = measurement match { - case m: MeasurementByAtum => - MeasureResultDTO(TypedValue(m.result, m.resultType)) - case m: Measurement => - buildMeasureResultDTO(measureName, m.result) - } + val measureResultDTO = MeasureResultDTO(TypedValue(measurement.resultValue.toString, measurement.resultType)) MeasurementDTO(measureDTO, measureResultDTO) } - - private [agent] def buildMeasureResultDTO(measureName: String, result: Any): MeasureResultDTO = { - result match { - case l: Long => - MeasureResultDTO(TypedValue(l.toString, ResultValueType.Long)) - case d: Double => - MeasureResultDTO(TypedValue(d.toString, ResultValueType.Double)) - case bd: BigDecimal => - MeasureResultDTO(TypedValue(bd.toString, ResultValueType.BigDecimal)) - case s: String => - MeasureResultDTO(TypedValue(s, ResultValueType.String)) - case unsupportedType => - val className = unsupportedType.getClass.getSimpleName - throw UnsupportedMeasureResultType(s"Unsupported type of measure $measureName: $className for result: $result") - } - } - } diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala index 8e7d02952..fffbb25d3 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala @@ -53,7 +53,6 @@ class AtumContextTest extends AnyFlatSpec with Matchers { ) assert(atumContextWithTwoDistinctRecordCount.currentMeasures.size == 3) - } "withMeasureRemoved" should "remove a measure if exists" in { @@ -93,7 +92,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers { assert(checkpoint.name == "testCheckpoint") assert(checkpoint.author == "Hans") assert(checkpoint.atumPartitions == AtumPartitions("foo2", "bar")) - assert(checkpoint.measurements.head.result == "3") + assert(checkpoint.measurements.head.resultValue == "3") } "createCheckpointOnProvidedData" should "create a Checkpoint on provided data" in { @@ -102,8 +101,8 @@ class AtumContextTest extends AnyFlatSpec with Matchers { val atumContext = atumAgent.getOrCreateAtumContext(atumPartitions) val measurements = Seq( - MeasurementProvided(RecordCount("col"), "1"), - MeasurementProvided(SumOfValuesOfColumn("col"), 1L) + MeasurementProvided(RecordCount("col"), 1L), + MeasurementProvided(SumOfValuesOfColumn("col"), BigDecimal(1)) ) val checkpoint = atumContext.createCheckpointOnProvidedData( diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala index 3d538547c..927ae96df 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala @@ -17,7 +17,6 @@ package za.co.absa.atum.agent.model import org.scalatest.flatspec.AnyFlatSpec -import za.co.absa.atum.agent.exception.UnsupportedMeasureResultType import za.co.absa.atum.agent.model.Measure.SumOfValuesOfColumn import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO} import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue} @@ -26,29 +25,19 @@ class MeasurementBuilderTest extends AnyFlatSpec { "buildMeasurementDTO" should "build MeasurementDTO for Long type of result value when Measurement provided" in { val measure = SumOfValuesOfColumn("col") - val measurement = MeasurementProvided[Long](measure, 1L) + val measurement = MeasurementProvided(measure, BigDecimal(1)) val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measurement) val expectedMeasureDTO = MeasureDTO("aggregatedTotal", Seq("col")) val expectedMeasureResultDTO = MeasureResultDTO( - TypedValue("1", ResultValueType.Long) + TypedValue("1", ResultValueType.BigDecimal) ) assert(measurementDTO.measure == expectedMeasureDTO) assert(measurementDTO.result == expectedMeasureResultDTO) } - "buildMeasurementDTO" should "build MeasurementDTO for Double type of result value when Measurement provided" in { - val measure = SumOfValuesOfColumn("col") - val measurement = MeasurementProvided(measure, 3.14) - val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measurement) - - val expectedTypedValue = TypedValue("3.14", ResultValueType.Double) - - assert(measurementDTO.result.mainValue == expectedTypedValue) - } - "buildMeasurementDTO" should "build MeasurementDTO for BigDecimal type of result value when Measurement provided" in { val measure = SumOfValuesOfColumn("col") val measurement = MeasurementProvided(measure, BigDecimal(3.14)) @@ -59,9 +48,10 @@ class MeasurementBuilderTest extends AnyFlatSpec { assert(measurementDTO.result.mainValue == expectedTypedValue) } - "buildMeasurementDTO" should "build MeasurementDTO for String type of result value when Measurement provided" in { + "buildMeasurementDTO" should "not build MeasurementDTO for incompatible String type of result value when Measurement provided" in { val measure = SumOfValuesOfColumn("col") - val measurement = MeasurementProvided(measure, "stringValue") + val measurement = MeasurementByAtum(measure, "stringValue", ResultValueType.String) + val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measurement) val expectedTypedValue = TypedValue("stringValue", ResultValueType.String) @@ -69,20 +59,15 @@ class MeasurementBuilderTest extends AnyFlatSpec { assert(measurementDTO.result.mainValue == expectedTypedValue) } - "buildMeasurementDTO" should "throw exception for unsupported result value type when Measurement provided" in { - val measure = SumOfValuesOfColumn("col") - assertThrows[UnsupportedMeasureResultType](MeasurementProvided(measure, 1)) - } - - "buildMeasurementDTO" should "build MeasurementDTO for Long type of result value when measured by Agent" in { + "buildMeasurementDTO" should "build MeasurementDTO for BigDecimal type of result value when measured by Agent" in { val measure = SumOfValuesOfColumn("col") - val measurement = MeasurementByAtum(measure, "1", ResultValueType.Long) + val measurement = MeasurementByAtum(measure, "1", ResultValueType.BigDecimal) val measurementDTO = MeasurementBuilder.buildMeasurementDTO(measurement) val expectedMeasureDTO = MeasureDTO("aggregatedTotal", Seq("col")) val expectedMeasureResultDTO = MeasureResultDTO( - TypedValue("1", ResultValueType.Long) + TypedValue("1", ResultValueType.BigDecimal) ) assert(measurementDTO.measure == expectedMeasureDTO) diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementTest.scala new file mode 100644 index 000000000..5e48a703c --- /dev/null +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementTest.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.agent.model + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import za.co.absa.atum.agent.exception.MeasurementProvidedException +import za.co.absa.atum.agent.model.Measure._ +import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType +import za.co.absa.spark.commons.test.SparkTestBase + +class MeasurementTest extends AnyFlatSpec with Matchers with SparkTestBase { self => + + "MeasurementProvided" should "be able to be converted to MeasurementProvided object when the result is Double" in { + val measure = AbsSumOfValuesOfColumn("col") + val actualMeasurement = MeasurementProvided(measure, 1.0) + + assert(actualMeasurement.resultValue == 1.0) + assert(actualMeasurement.resultType == ResultValueType.Double) + } + + "MeasurementProvided" should "throw exception for unsupported result value - BigDecimal instead of Double" in { + val measure = AbsSumOfValuesOfColumn("col") + assertThrows[MeasurementProvidedException](MeasurementProvided(measure, BigDecimal(1.0))) + } + + "MeasurementProvided" should "throw exception for unsupported result value type in general (scalar)" in { + val measure = SumOfValuesOfColumn("col") + assertThrows[MeasurementProvidedException](MeasurementProvided(measure, 1)) + } + + "MeasurementProvided" should "throw exception for unsupported result value type in general (composite)" in { + val measure = SumOfHashesOfColumn("col") + assertThrows[MeasurementProvidedException](MeasurementProvided(measure, Map(1 -> "no-go"))) + } + + "MeasurementProvided" should "throw exception for unsupported result value type for a given Measure" in { + val measure = DistinctRecordCount("col") + assertThrows[MeasurementProvidedException](MeasurementProvided(measure, "1")) + } + + "MeasurementProvided" should "throw exception for unsupported (slightly different FPN) result value type for a given Measure" in { + val measure = SumOfValuesOfColumn("col") + assertThrows[MeasurementProvidedException](MeasurementProvided(measure, 1.0)) + } +} From 149a6242e039ebe36b824016cb1c198e0658b6fe Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Thu, 26 Oct 2023 18:07:18 +0200 Subject: [PATCH 06/15] #26: functionName -> measureName, to be consistent --- .../absa/atum/agent/model/MeasuresMapper.scala | 2 +- .../za/co/absa/atum/model/dto/MeasureDTO.scala | 2 +- .../model/utils/SerializationUtilsTest.scala | 16 ++++++++-------- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala index a5384dd65..d2a9f39e2 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala @@ -29,7 +29,7 @@ private [agent] object MeasuresMapper { private def createMeasure(measure: dto.MeasureDTO): za.co.absa.atum.agent.model.Measure = { val controlColumn = measure.controlColumns.head - measure.functionName match { + measure.measureName match { case RecordCount.measureName => RecordCount(controlColumn) case DistinctRecordCount.measureName => DistinctRecordCount(controlColumn) case SumOfValuesOfColumn.measureName => SumOfValuesOfColumn(controlColumn) diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/MeasureDTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/MeasureDTO.scala index d82730685..511848dc8 100644 --- a/model/src/main/scala/za/co/absa/atum/model/dto/MeasureDTO.scala +++ b/model/src/main/scala/za/co/absa/atum/model/dto/MeasureDTO.scala @@ -17,6 +17,6 @@ package za.co.absa.atum.model.dto case class MeasureDTO( - functionName: String, + measureName: String, controlColumns: Seq[String] ) diff --git a/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsTest.scala b/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsTest.scala index 302958926..37a5d5ffe 100644 --- a/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsTest.scala +++ b/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsTest.scala @@ -82,14 +82,14 @@ class SerializationUtilsTest extends AnyFlatSpecLike { measures = seqMeasureDTO ) - val expectedAdditionalDataJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"functionName\":\"count\",\"controlColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}" + val expectedAdditionalDataJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}" val actualAdditionalDataJson = SerializationUtils.asJson(atumContextDTO) assert(expectedAdditionalDataJson == actualAdditionalDataJson) } "fromJson" should "deserialize AtumContextDTO from json string" in { - val atumContextDTOJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"functionName\":\"count\",\"controlColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}" + val atumContextDTOJson = "{\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"measures\":[{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}],\"additionalData\":{\"additionalData\":{}}}" val seqPartitionDTO = Seq(PartitionDTO("key", "val")) val seqMeasureDTO = Set(MeasureDTO("count", Seq("col"))) @@ -150,7 +150,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike { measurements = seqMeasurementDTO ) - val expectedCheckpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"functionName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}" + val expectedCheckpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}" val actualCheckpointDTOJson = SerializationUtils.asJson(checkpointDTO) assert(expectedCheckpointDTOJson == actualCheckpointDTOJson) @@ -161,7 +161,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike { val seqPartitionDTO = Seq(PartitionDTO("key", "val")) val timeWithZone = OffsetDateTime.of(2023, 10, 24, 10, 20, 59, 5000000, ZoneOffset.ofHours(2)) - val checkpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"functionName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}" + val checkpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}" val seqMeasurementDTO = Seq( MeasurementDTO( @@ -191,14 +191,14 @@ class SerializationUtilsTest extends AnyFlatSpecLike { "asJson" should "serialize MeasureDTO into json string" in { val measureDTO = MeasureDTO("count", Seq("col")) - val expectedMeasureDTOJson = "{\"functionName\":\"count\",\"controlColumns\":[\"col\"]}" + val expectedMeasureDTOJson = "{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}" val actualMeasureDTOJson = SerializationUtils.asJson(measureDTO) assert(expectedMeasureDTOJson == actualMeasureDTOJson) } "fromJson" should "deserialize MeasureDTO from json string" in { - val measureDTOJson = "{\"functionName\":\"count\",\"controlColumns\":[\"col\"]}" + val measureDTOJson = "{\"measureName\":\"count\",\"controlColumns\":[\"col\"]}" val expectedMeasureDTO = MeasureDTO("count", Seq("col")) val actualMeasureDTO = SerializationUtils.fromJson[MeasureDTO](measureDTOJson) @@ -213,14 +213,14 @@ class SerializationUtilsTest extends AnyFlatSpecLike { val measurementDTO = MeasurementDTO(measureDTO, measureResultDTO) - val expectedMeasurementDTOJson = "{\"measure\":{\"functionName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}" + val expectedMeasurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}" val actualMeasurementDTOJson = SerializationUtils.asJson(measurementDTO) assert(expectedMeasurementDTOJson == actualMeasurementDTOJson) } "fromJson" should "deserialize MeasurementDTO from json string" in { - val measurementDTOJson = "{\"measure\":{\"functionName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}" + val measurementDTOJson = "{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}" val measureDTO = MeasureDTO("count", Seq("col")) val measureResultDTO = MeasureResultDTO(mainValue = TypedValue("1", ResultValueType.Long)) From ecd7731781dacc0bc642fc592a5f81dcbbdb6224 Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Fri, 27 Oct 2023 17:37:26 +0200 Subject: [PATCH 07/15] #94: refactoring and fully fixing build --- .../za/co/absa/atum/agent/AtumContext.scala | 20 +------------------ .../za/co/absa/atum/agent/model/Measure.scala | 20 ++++++++++++++++++- .../co/absa/atum/agent/AtumContextTest.scala | 16 ++++++--------- project/Dependencies.scala | 4 +++- 4 files changed, 29 insertions(+), 31 deletions(-) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala index 9aeaac0cc..6422ecb2e 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala @@ -18,7 +18,6 @@ package za.co.absa.atum.agent import org.slf4s.Logging import org.apache.spark.sql.DataFrame -import org.apache.spark.sql.types.NumericType import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.model._ import za.co.absa.atum.model.dto._ @@ -44,26 +43,9 @@ class AtumContext private[agent] ( agent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this) } - private def validateMeasureApplicability(measure: Measure, df: DataFrame): Unit = { - require( - df.columns.contains(measure.controlCol), - s"Column(s) '${measure.controlCol}' must be present in dataframe, but it's not. " + - s"Columns in the dataframe: ${df.columns.mkString(", ")}." - ) - - val colDataType = df.select(measure.controlCol).schema.fields(0).dataType - val isColDataTypeNumeric = colDataType.isInstanceOf[NumericType] - if (measure.onlyForNumeric && !isColDataTypeNumeric) { - log.warn( // TODO: discuss, throw exception or warn message? Or both, parametrized? - s"Column ${measure.controlCol} measurement ${measure.measureName} requested, but the field is not numeric! " + - s"Found: ${colDataType.simpleString} datatype." - ) - } - } - private def takeMeasurements(df: DataFrame): Set[MeasurementByAtum] = { measures.map { m => - validateMeasureApplicability(m, df) + m.validateMeasureApplicability(df) val measurementResult = m.function(df) MeasurementByAtum(m, measurementResult.resultValue, measurementResult.resultType) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala index 0ce5a190e..92e96d524 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.agent.model import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types.{DecimalType, LongType, StringType} +import org.apache.spark.sql.types.{DecimalType, LongType, NumericType, StringType} import org.apache.spark.sql.{Column, DataFrame} import za.co.absa.atum.agent.core.MeasurementProcessor import za.co.absa.atum.agent.core.MeasurementProcessor.{MeasurementFunction, ResultOfMeasurement} @@ -29,6 +29,24 @@ import za.co.absa.spark.commons.implicits.StructTypeImplicits.StructTypeEnhancem */ sealed trait Measure extends MeasurementProcessor with MeasureType { val controlCol: String + + def validateMeasureApplicability(df: DataFrame): Unit = { + require( + df.columns.contains(controlCol), + s"Column(s) '$controlCol' must be present in dataframe, but it's not. " + + s"Columns in the dataframe: ${df.columns.mkString(", ")}." + ) + + if (onlyForNumeric) { + val colDataType = df.select(controlCol).schema.fields(0).dataType + val isColDataTypeNumeric = colDataType.isInstanceOf[NumericType] + require( + isColDataTypeNumeric, + s"Column $controlCol measurement $measureName requested, but the field is not numeric! " + + s"Found: ${colDataType.simpleString} datatype." + ) + } + } } trait MeasureType { diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala index 3d63f9e3b..c712de882 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala @@ -19,12 +19,9 @@ package za.co.absa.atum.agent import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} import org.mockito.ArgumentCaptor -import org.mockito.IdiomaticMockito.StubbingOps import org.mockito.Mockito.{mock, times, verify} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers -import org.slf4s -import org.slf4j.{Logger => Underlying} import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.model.Measure.{RecordCount, SumOfValuesOfColumn} import za.co.absa.atum.agent.model.MeasurementProvided @@ -174,12 +171,9 @@ class AtumContextTest extends AnyFlatSpec with Matchers { "createCheckpoint" should "take measurements and fail because numeric measure is defined on non-numeric column" in { val mockAgent = mock(classOf[AtumAgent]) - val mockLogger = mock(classOf[Underlying]) - mockLogger.isWarnEnabled returns true - implicit val atumContext: AtumContext = new AtumContext(AtumPartitions("foo2" -> "bar"), mockAgent) { - override val log: slf4s.Logger = slf4s.Logger(mockLogger) - }.addMeasure(SumOfValuesOfColumn("nonNumericalColumn")) + implicit val atumContext: AtumContext = new AtumContext(AtumPartitions("foo2" -> "bar"), mockAgent) + .addMeasure(SumOfValuesOfColumn("nonNumericalColumn")) val spark = SparkSession.builder .master("local") @@ -202,9 +196,11 @@ class AtumContextTest extends AnyFlatSpec with Matchers { import AtumContext._ val df = spark.createDataFrame(rdd, schema) - df.createAndSaveCheckpoint("checkPointNameCountInvalid", "authorOfCount") - verify(mockLogger).warn( + val caughtException = the[IllegalArgumentException] thrownBy { + df.createAndSaveCheckpoint("checkPointNameCountInvalid", "authorOfCount") + } + caughtException.getMessage should include( "Column nonNumericalColumn measurement aggregatedTotal requested, " + "but the field is not numeric! Found: string datatype." ) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 2df9bec34..f207c7050 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -47,8 +47,9 @@ object Dependencies { val json4s_spark2 = "3.5.3" val json4s_spark3 = "3.7.0-M11" + val slf4j = "1.7.25" val slf4s = "1.7.25" - val logback = "1.3.0" + val logback = "1.2.3" } private def limitVersion(version: String, parts: Int): String = { @@ -64,6 +65,7 @@ object Dependencies { } def commonDependencies: Seq[ModuleID] = Seq( + "org.slf4j" % "slf4j-api" % Versions.slf4j, "org.slf4s" %% "slf4s-api" % Versions.slf4s, "ch.qos.logback" % "logback-classic" % Versions.logback, "org.scalatest" %% "scalatest" % Versions.scalatest % Test, From cf9dc69b04950b024fe59c2f9fdbe208bcd1e539 Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Fri, 27 Oct 2023 17:41:01 +0200 Subject: [PATCH 08/15] #94: removing redundant import --- project/Dependencies.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index f207c7050..81e232374 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -47,7 +47,6 @@ object Dependencies { val json4s_spark2 = "3.5.3" val json4s_spark3 = "3.7.0-M11" - val slf4j = "1.7.25" val slf4s = "1.7.25" val logback = "1.2.3" } @@ -65,7 +64,6 @@ object Dependencies { } def commonDependencies: Seq[ModuleID] = Seq( - "org.slf4j" % "slf4j-api" % Versions.slf4j, "org.slf4s" %% "slf4s-api" % Versions.slf4s, "ch.qos.logback" % "logback-classic" % Versions.logback, "org.scalatest" %% "scalatest" % Versions.scalatest % Test, From f123b6fa7fe551c861aa862055d343dcd530c84d Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Fri, 27 Oct 2023 18:32:12 +0200 Subject: [PATCH 09/15] #26: post-review changes --- .../za/co/absa/atum/agent/AtumContext.scala | 30 +++------ ...ception.scala => AtumAgentException.scala} | 5 +- .../UnsupportedMeasureException.scala | 19 ------ .../UnsupportedMeasureResultType.scala | 19 ------ .../atum/agent/model/MeasuresMapper.scala | 4 +- .../co/absa/atum/agent/AtumContextTest.scala | 62 ++++++++++--------- .../absa/atum/agent/model/MeasureTest.scala | 10 +-- .../atum/agent/model/MeasuresMapperTest.scala | 4 +- 8 files changed, 56 insertions(+), 97 deletions(-) rename agent/src/main/scala/za/co/absa/atum/agent/exception/{MeasurementProvidedException.scala => AtumAgentException.scala} (75%) delete mode 100644 agent/src/main/scala/za/co/absa/atum/agent/exception/UnsupportedMeasureException.scala delete mode 100644 agent/src/main/scala/za/co/absa/atum/agent/exception/UnsupportedMeasureResultType.scala diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala index 40776441f..a0322dfba 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala @@ -49,12 +49,12 @@ class AtumContext private[agent] ( } } - private [agent] def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): Checkpoint = { + def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): AtumContext = { val startTime = OffsetDateTime.now() val measurements = takeMeasurements(dataToMeasure) val endTime = OffsetDateTime.now() - Checkpoint( + val checkpoint = Checkpoint( name = checkpointName, author = author, measuredByAtumAgent = true, @@ -63,22 +63,17 @@ class AtumContext private[agent] ( processEndTime = Some(endTime), measurements = measurements.toSeq ) - } - - def createAndSaveCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): AtumContext = { - val checkpoint = createCheckpoint(checkpointName, author, dataToMeasure) - val checkpointDTO = checkpoint.toCheckpointDTO - agent.saveCheckpoint(checkpointDTO) + agent.saveCheckpoint(checkpoint) this } - private [agent] def createCheckpointOnProvidedData( + def createCheckpointOnProvidedData( checkpointName: String, author: String, measurements: Seq[Measurement] - ): Checkpoint = { + ): AtumContext = { val offsetDateTimeNow = OffsetDateTime.now() - Checkpoint( + val checkpoint = Checkpoint( name = checkpointName, author = author, atumPartitions = this.atumPartitions, @@ -86,15 +81,8 @@ class AtumContext private[agent] ( processEndTime = Some(offsetDateTimeNow), measurements = measurements ) - } - - def createAndSaveCheckpointOnProvidedData( - checkpointName: String, author: String, measurements: Seq[Measurement] - ): AtumContext = { - val checkpoint = createCheckpointOnProvidedData(checkpointName, author, measurements) - val checkpointDTO = checkpoint.toCheckpointDTO - agent.saveCheckpoint(checkpointDTO) + agent.saveCheckpoint(checkpoint) this } @@ -164,8 +152,8 @@ object AtumContext { * @param atumContext Contains the calculations to be done and publish the result * @return */ - def createAndSaveCheckpoint(checkpointName: String, author: String)(implicit atumContext: AtumContext): DataFrame = { - atumContext.createAndSaveCheckpoint(checkpointName, author, df) + def createCheckpoint(checkpointName: String, author: String)(implicit atumContext: AtumContext): DataFrame = { + atumContext.createCheckpoint(checkpointName, author, df) df } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/exception/MeasurementProvidedException.scala b/agent/src/main/scala/za/co/absa/atum/agent/exception/AtumAgentException.scala similarity index 75% rename from agent/src/main/scala/za/co/absa/atum/agent/exception/MeasurementProvidedException.scala rename to agent/src/main/scala/za/co/absa/atum/agent/exception/AtumAgentException.scala index afd90ece2..3e3ced104 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/exception/MeasurementProvidedException.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/exception/AtumAgentException.scala @@ -16,4 +16,7 @@ package za.co.absa.atum.agent.exception -case class MeasurementProvidedException(msg: String) extends Exception(msg) +sealed abstract class AtumAgentException extends Exception + +case class MeasurementProvidedException(msg: String) extends AtumAgentException +case class MeasureException(msg: String) extends AtumAgentException diff --git a/agent/src/main/scala/za/co/absa/atum/agent/exception/UnsupportedMeasureException.scala b/agent/src/main/scala/za/co/absa/atum/agent/exception/UnsupportedMeasureException.scala deleted file mode 100644 index 86b40166f..000000000 --- a/agent/src/main/scala/za/co/absa/atum/agent/exception/UnsupportedMeasureException.scala +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.atum.agent.exception - -case class UnsupportedMeasureException(msg: String) extends Exception(msg) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/exception/UnsupportedMeasureResultType.scala b/agent/src/main/scala/za/co/absa/atum/agent/exception/UnsupportedMeasureResultType.scala deleted file mode 100644 index 2f0762475..000000000 --- a/agent/src/main/scala/za/co/absa/atum/agent/exception/UnsupportedMeasureResultType.scala +++ /dev/null @@ -1,19 +0,0 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.atum.agent.exception - -case class UnsupportedMeasureResultType(msg: String) extends Exception(msg) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala index d2a9f39e2..aa75ba8f1 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.agent.model -import za.co.absa.atum.agent.exception.UnsupportedMeasureException +import za.co.absa.atum.agent.exception.MeasureException import za.co.absa.atum.agent.model.Measure._ import za.co.absa.atum.model.dto @@ -36,7 +36,7 @@ private [agent] object MeasuresMapper { case AbsSumOfValuesOfColumn.measureName => AbsSumOfValuesOfColumn(controlColumn) case SumOfHashesOfColumn.measureName => SumOfHashesOfColumn(controlColumn) case unsupportedMeasure => - throw UnsupportedMeasureException( + throw MeasureException( s"Measure not supported: $unsupportedMeasure. Supported measures are: ${Measure.supportedMeasureNames}" ) } diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala index fffbb25d3..1d6770865 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala @@ -24,7 +24,7 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.model.Measure.{RecordCount, SumOfValuesOfColumn} -import za.co.absa.atum.agent.model.MeasurementProvided +import za.co.absa.atum.agent.model.{Checkpoint, MeasurementProvided} import za.co.absa.atum.model.dto._ import org.apache.spark.sql.types.{StringType, StructField, StructType} import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType @@ -87,42 +87,48 @@ class AtumContextTest extends AnyFlatSpec with Matchers { val rdd = spark.sparkContext.parallelize(Seq("A", "B", "C")) val df = rdd.toDF("letter") - val checkpoint = atumContext.createCheckpoint("testCheckpoint", "Hans", df) + atumContext.createCheckpoint("testCheckpoint", "Hans", df) - assert(checkpoint.name == "testCheckpoint") - assert(checkpoint.author == "Hans") - assert(checkpoint.atumPartitions == AtumPartitions("foo2", "bar")) - assert(checkpoint.measurements.head.resultValue == "3") + val argument = ArgumentCaptor.forClass(classOf[Checkpoint]) + verify(mockAgent).saveCheckpoint(argument.capture()) + + assert(argument.getValue.name == "testCheckpoint") + assert(argument.getValue.author == "Hans") + assert(argument.getValue.atumPartitions == AtumPartitions("foo2", "bar")) + assert(argument.getValue.measurements.head.resultValue == "3") } "createCheckpointOnProvidedData" should "create a Checkpoint on provided data" in { - val atumAgent = new AtumAgent + val mockAgent = mock(classOf[AtumAgent]) val atumPartitions = AtumPartitions("key" -> "value") - val atumContext = atumAgent.getOrCreateAtumContext(atumPartitions) + val atumContext = mockAgent.getOrCreateAtumContext(atumPartitions) val measurements = Seq( MeasurementProvided(RecordCount("col"), 1L), MeasurementProvided(SumOfValuesOfColumn("col"), BigDecimal(1)) ) - val checkpoint = atumContext.createCheckpointOnProvidedData( + atumContext.createCheckpointOnProvidedData( checkpointName = "name", author = "author", measurements = measurements ) - assert(checkpoint.name == "name") - assert(checkpoint.author == "author") - assert(!checkpoint.measuredByAtumAgent) - assert(checkpoint.atumPartitions == atumPartitions) - assert(checkpoint.processStartTime == checkpoint.processEndTime.get) - assert(checkpoint.measurements == measurements) + val argument = ArgumentCaptor.forClass(classOf[Checkpoint]) + verify(mockAgent).saveCheckpoint(argument.capture()) + + assert(argument.getValue.name == "name") + assert(argument.getValue.author == "author") + assert(!argument.getValue.measuredByAtumAgent) + assert(argument.getValue.atumPartitions == atumPartitions) + assert(argument.getValue.processStartTime == argument.getValue.processEndTime.get) + assert(argument.getValue.measurements == measurements) } - "createAndSaveCheckpoint" should "take measurements and create a Checkpoint, multiple measure changes" in { + "createCheckpoint" should "take measurements and create a Checkpoint, multiple measure changes" in { val mockAgent = mock(classOf[AtumAgent]) - - implicit val atumContext: AtumContext = new AtumContext(AtumPartitions("foo2" -> "bar"), mockAgent) + val atumPartitions = AtumPartitions("foo2" -> "bar") + implicit val atumContext: AtumContext = new AtumContext(atumPartitions, mockAgent) .addMeasure(RecordCount("notImportantColumn")) val spark = SparkSession.builder @@ -146,28 +152,28 @@ class AtumContextTest extends AnyFlatSpec with Matchers { import AtumContext._ val df = spark.createDataFrame(rdd, schema) - .createAndSaveCheckpoint("checkPointNameCount", "authorOfCount") + .createCheckpoint("checkPointNameCount", "authorOfCount") - val argumentFirst = ArgumentCaptor.forClass(classOf[CheckpointDTO]) + val argumentFirst = ArgumentCaptor.forClass(classOf[Checkpoint]) verify(mockAgent, times(1)).saveCheckpoint(argumentFirst.capture()) assert(argumentFirst.getValue.name == "checkPointNameCount") assert(argumentFirst.getValue.author == "authorOfCount") - assert(argumentFirst.getValue.partitioning == Seq(PartitionDTO("foo2", "bar"))) - assert(argumentFirst.getValue.measurements.head.result.mainValue.value == "4") - assert(argumentFirst.getValue.measurements.head.result.mainValue.valueType == ResultValueType.Long) + assert(argumentFirst.getValue.atumPartitions == atumPartitions) + assert(argumentFirst.getValue.measurements.head.resultValue == "4") + assert(argumentFirst.getValue.measurements.head.resultType == ResultValueType.Long) atumContext.addMeasure(SumOfValuesOfColumn("columnForSum")) - df.createAndSaveCheckpoint("checkPointNameSum", "authorOfSum") + df.createCheckpoint("checkPointNameSum", "authorOfSum") - val argumentSecond = ArgumentCaptor.forClass(classOf[CheckpointDTO]) + val argumentSecond = ArgumentCaptor.forClass(classOf[Checkpoint]) verify(mockAgent, times(2)).saveCheckpoint(argumentSecond.capture()) assert(argumentSecond.getValue.name == "checkPointNameSum") assert(argumentSecond.getValue.author == "authorOfSum") - assert(argumentSecond.getValue.partitioning == Seq(PartitionDTO("foo2", "bar"))) - assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.value == "22.5") - assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimal) + assert(argumentSecond.getValue.atumPartitions == atumPartitions) + assert(argumentSecond.getValue.measurements.tail.head.resultValue == "22.5") + assert(argumentSecond.getValue.measurements.tail.head.resultType == ResultValueType.BigDecimal) } } diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala index 44ed54236..031b48244 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasureTest.scala @@ -52,20 +52,20 @@ class MeasureTest extends AnyFlatSpec with Matchers with SparkTestBase { self => .format("csv") .option("header", "true") .load("agent/src/test/resources/random-dataset/persons.csv") - .createAndSaveCheckpoint("name1", "author")(atumContextInstanceWithRecordCount) - .createAndSaveCheckpoint("name2", "author")(atumContextWithNameHashSum) + .createCheckpoint("name1", "author")(atumContextInstanceWithRecordCount) + .createCheckpoint("name2", "author")(atumContextWithNameHashSum) val dsEnrichment = spark.read .format("csv") .option("header", "true") .load("agent/src/test/resources/random-dataset/persons-enriched.csv") - .createAndSaveCheckpoint("name3", "author")( + .createCheckpoint("name3", "author")( atumContextWithSalaryAbsMeasure.removeMeasure(salaryAbsSum) ) val dfFull = dfPersons .join(dsEnrichment, Seq("id")) - .createAndSaveCheckpoint("other different name", "author")(atumContextWithSalaryAbsMeasure) + .createCheckpoint("other different name", "author")(atumContextWithSalaryAbsMeasure) val dfExtraPersonWithNegativeSalary = spark .createDataFrame( @@ -77,7 +77,7 @@ class MeasureTest extends AnyFlatSpec with Matchers with SparkTestBase { self => val dfExtraPerson = dfExtraPersonWithNegativeSalary.union(dfPersons) - dfExtraPerson.createAndSaveCheckpoint("a checkpoint name", "author")( + dfExtraPerson.createCheckpoint("a checkpoint name", "author")( atumContextWithSalaryAbsMeasure .removeMeasure(measureIds) .removeMeasure(salaryAbsSum) diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresMapperTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresMapperTest.scala index 79b8ba296..6f43c44db 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresMapperTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresMapperTest.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.agent.model import org.scalatest.flatspec.AnyFlatSpecLike -import za.co.absa.atum.agent.exception.UnsupportedMeasureException +import za.co.absa.atum.agent.exception.MeasureException import za.co.absa.atum.agent.model.Measure._ import za.co.absa.atum.model.dto.MeasureDTO @@ -50,7 +50,7 @@ class MeasuresMapperTest extends AnyFlatSpecLike { MeasureDTO("unsupportedMeasure", Seq("col")) ) - assertThrows[UnsupportedMeasureException](MeasuresMapper.mapToMeasures(unsupportedMeasure)) + assertThrows[MeasureException](MeasuresMapper.mapToMeasures(unsupportedMeasure)) } } From f2928f5690f576cc4bf4dfe63ce3c39a4406fcc8 Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Sat, 28 Oct 2023 09:19:35 +0200 Subject: [PATCH 10/15] #26: fixing unit test --- agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala | 4 +--- .../test/scala/za/co/absa/atum/agent/AtumContextTest.scala | 3 +-- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala index a0322dfba..67aff1853 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala @@ -68,9 +68,7 @@ class AtumContext private[agent] ( this } - def createCheckpointOnProvidedData( - checkpointName: String, author: String, measurements: Seq[Measurement] - ): AtumContext = { + def createCheckpointOnProvidedData(checkpointName: String, author: String, measurements: Seq[Measurement]): AtumContext = { val offsetDateTimeNow = OffsetDateTime.now() val checkpoint = Checkpoint( diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala index 1d6770865..4447d037f 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala @@ -25,7 +25,6 @@ import org.scalatest.matchers.should.Matchers import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.model.Measure.{RecordCount, SumOfValuesOfColumn} import za.co.absa.atum.agent.model.{Checkpoint, MeasurementProvided} -import za.co.absa.atum.model.dto._ import org.apache.spark.sql.types.{StringType, StructField, StructType} import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType @@ -101,7 +100,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers { "createCheckpointOnProvidedData" should "create a Checkpoint on provided data" in { val mockAgent = mock(classOf[AtumAgent]) val atumPartitions = AtumPartitions("key" -> "value") - val atumContext = mockAgent.getOrCreateAtumContext(atumPartitions) + val atumContext: AtumContext = new AtumContext(atumPartitions, mockAgent) val measurements = Seq( MeasurementProvided(RecordCount("col"), 1L), From c2c340569d3d1ff630936595ccc07831aeb99557 Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Sat, 28 Oct 2023 09:47:19 +0200 Subject: [PATCH 11/15] #26: removing redundant internal Agent's Checkpoint model and some refactoring --- .../za/co/absa/atum/agent/AtumAgent.scala | 10 ---- .../za/co/absa/atum/agent/AtumContext.scala | 21 ++++---- .../co/absa/atum/agent/model/Checkpoint.scala | 48 ------------------- ...uresMapper.scala => MeasuresBuilder.scala} | 2 +- .../co/absa/atum/agent/AtumContextTest.scala | 35 +++++++------- ...erTest.scala => MeasuresBuilderTest.scala} | 6 +-- 6 files changed, 35 insertions(+), 87 deletions(-) delete mode 100644 agent/src/main/scala/za/co/absa/atum/agent/model/Checkpoint.scala rename agent/src/main/scala/za/co/absa/atum/agent/model/{MeasuresMapper.scala => MeasuresBuilder.scala} (97%) rename agent/src/test/scala/za/co/absa/atum/agent/model/{MeasuresMapperTest.scala => MeasuresBuilderTest.scala} (89%) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala index 0de25b152..b520d139d 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala @@ -18,7 +18,6 @@ package za.co.absa.atum.agent import com.typesafe.config.{Config, ConfigFactory} import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.dispatcher.{ConsoleDispatcher, HttpDispatcher} -import za.co.absa.atum.agent.model.Checkpoint import za.co.absa.atum.model.dto.{CheckpointDTO, PartitioningDTO} /** @@ -43,15 +42,6 @@ class AtumAgent private[agent] () { dispatcher.saveCheckpoint(checkpoint) } - /** - * Sends `Checkpoint` to the AtumService API - * - * @param checkpoint Already initialized Checkpoint object to store - */ - private [agent] def saveCheckpoint(checkpoint: Checkpoint): Unit = { - dispatcher.saveCheckpoint(checkpoint.toCheckpointDTO) - } - /** * Provides an AtumContext given a `AtumPartitions` instance. Retrieves the data from AtumService API. * diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala index 67aff1853..837fd4644 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala @@ -22,6 +22,7 @@ import za.co.absa.atum.agent.model._ import za.co.absa.atum.model.dto._ import java.time.OffsetDateTime +import java.util.UUID import scala.collection.immutable.ListMap /** @@ -54,33 +55,35 @@ class AtumContext private[agent] ( val measurements = takeMeasurements(dataToMeasure) val endTime = OffsetDateTime.now() - val checkpoint = Checkpoint( + val checkpointDTO = CheckpointDTO( + id = UUID.randomUUID(), name = checkpointName, author = author, measuredByAtumAgent = true, - atumPartitions = this.atumPartitions, + partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions), processStartTime = startTime, processEndTime = Some(endTime), - measurements = measurements.toSeq + measurements = measurements.map(MeasurementBuilder.buildMeasurementDTO).toSeq ) - agent.saveCheckpoint(checkpoint) + agent.saveCheckpoint(checkpointDTO) this } def createCheckpointOnProvidedData(checkpointName: String, author: String, measurements: Seq[Measurement]): AtumContext = { val offsetDateTimeNow = OffsetDateTime.now() - val checkpoint = Checkpoint( + val checkpointDTO = CheckpointDTO( + id = UUID.randomUUID(), name = checkpointName, author = author, - atumPartitions = this.atumPartitions, + partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions), processStartTime = offsetDateTimeNow, processEndTime = Some(offsetDateTimeNow), - measurements = measurements + measurements = measurements.map(MeasurementBuilder.buildMeasurementDTO) ) - agent.saveCheckpoint(checkpoint) + agent.saveCheckpoint(checkpointDTO) this } @@ -137,7 +140,7 @@ object AtumContext { new AtumContext( AtumPartitions.fromPartitioning(atumContextDTO.partitioning), agent, - MeasuresMapper.mapToMeasures(atumContextDTO.measures) + MeasuresBuilder.mapToMeasures(atumContextDTO.measures) ) } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Checkpoint.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Checkpoint.scala deleted file mode 100644 index 3a4708656..000000000 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Checkpoint.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.atum.agent.model - -import za.co.absa.atum.agent.AtumContext.AtumPartitions -import za.co.absa.atum.model.dto.CheckpointDTO - -import java.time.OffsetDateTime -import java.util.UUID - -case class Checkpoint( - name: String, - author: String, - measuredByAtumAgent: Boolean = false, - atumPartitions: AtumPartitions, - processStartTime: OffsetDateTime, - processEndTime: Option[OffsetDateTime], - measurements: Seq[Measurement] -) { - private [agent] def toCheckpointDTO: CheckpointDTO = { - val measurementDTOs = measurements.map(MeasurementBuilder.buildMeasurementDTO) - - CheckpointDTO( - id = UUID.randomUUID(), - name = name, - author = author, - measuredByAtumAgent = measuredByAtumAgent, - partitioning = AtumPartitions.toSeqPartitionDTO(atumPartitions), - processStartTime = processStartTime, - processEndTime = processEndTime, - measurements = measurementDTOs - ) - } -} diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresBuilder.scala similarity index 97% rename from agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala rename to agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresBuilder.scala index aa75ba8f1..f73c6e3b1 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresMapper.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasuresBuilder.scala @@ -20,7 +20,7 @@ import za.co.absa.atum.agent.exception.MeasureException import za.co.absa.atum.agent.model.Measure._ import za.co.absa.atum.model.dto -private [agent] object MeasuresMapper { +private [agent] object MeasuresBuilder { private [agent] def mapToMeasures(measures: Set[dto.MeasureDTO]): Set[za.co.absa.atum.agent.model.Measure] = { measures.map(createMeasure) diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala index 4447d037f..41b08cd1e 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala @@ -24,8 +24,9 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.model.Measure.{RecordCount, SumOfValuesOfColumn} -import za.co.absa.atum.agent.model.{Checkpoint, MeasurementProvided} +import za.co.absa.atum.agent.model.{MeasurementBuilder, MeasurementProvided} import org.apache.spark.sql.types.{StringType, StructField, StructType} +import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType class AtumContextTest extends AnyFlatSpec with Matchers { @@ -71,8 +72,9 @@ class AtumContextTest extends AnyFlatSpec with Matchers { "createCheckpoint" should "take measurements and create a Checkpoint" in { val mockAgent = mock(classOf[AtumAgent]) + val atumPartitions = AtumPartitions("foo2" -> "bar") - val atumContext = new AtumContext(AtumPartitions("foo2" -> "bar"), mockAgent) + val atumContext = new AtumContext(atumPartitions, mockAgent) .addMeasure(RecordCount("letter")) val spark = SparkSession.builder @@ -88,13 +90,14 @@ class AtumContextTest extends AnyFlatSpec with Matchers { atumContext.createCheckpoint("testCheckpoint", "Hans", df) - val argument = ArgumentCaptor.forClass(classOf[Checkpoint]) + val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO]) verify(mockAgent).saveCheckpoint(argument.capture()) assert(argument.getValue.name == "testCheckpoint") assert(argument.getValue.author == "Hans") - assert(argument.getValue.atumPartitions == AtumPartitions("foo2", "bar")) - assert(argument.getValue.measurements.head.resultValue == "3") + assert(argument.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions)) + assert(argument.getValue.measurements.head.result.mainValue.value == "3") + assert(argument.getValue.measurements.head.result.mainValue.valueType == ResultValueType.Long) } "createCheckpointOnProvidedData" should "create a Checkpoint on provided data" in { @@ -113,15 +116,15 @@ class AtumContextTest extends AnyFlatSpec with Matchers { measurements = measurements ) - val argument = ArgumentCaptor.forClass(classOf[Checkpoint]) + val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO]) verify(mockAgent).saveCheckpoint(argument.capture()) assert(argument.getValue.name == "name") assert(argument.getValue.author == "author") assert(!argument.getValue.measuredByAtumAgent) - assert(argument.getValue.atumPartitions == atumPartitions) + assert(argument.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions)) assert(argument.getValue.processStartTime == argument.getValue.processEndTime.get) - assert(argument.getValue.measurements == measurements) + assert(argument.getValue.measurements == measurements.map(MeasurementBuilder.buildMeasurementDTO)) } "createCheckpoint" should "take measurements and create a Checkpoint, multiple measure changes" in { @@ -153,26 +156,26 @@ class AtumContextTest extends AnyFlatSpec with Matchers { val df = spark.createDataFrame(rdd, schema) .createCheckpoint("checkPointNameCount", "authorOfCount") - val argumentFirst = ArgumentCaptor.forClass(classOf[Checkpoint]) + val argumentFirst = ArgumentCaptor.forClass(classOf[CheckpointDTO]) verify(mockAgent, times(1)).saveCheckpoint(argumentFirst.capture()) assert(argumentFirst.getValue.name == "checkPointNameCount") assert(argumentFirst.getValue.author == "authorOfCount") - assert(argumentFirst.getValue.atumPartitions == atumPartitions) - assert(argumentFirst.getValue.measurements.head.resultValue == "4") - assert(argumentFirst.getValue.measurements.head.resultType == ResultValueType.Long) + assert(argumentFirst.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions)) + assert(argumentFirst.getValue.measurements.head.result.mainValue.value == "4") + assert(argumentFirst.getValue.measurements.head.result.mainValue.valueType == ResultValueType.Long) atumContext.addMeasure(SumOfValuesOfColumn("columnForSum")) df.createCheckpoint("checkPointNameSum", "authorOfSum") - val argumentSecond = ArgumentCaptor.forClass(classOf[Checkpoint]) + val argumentSecond = ArgumentCaptor.forClass(classOf[CheckpointDTO]) verify(mockAgent, times(2)).saveCheckpoint(argumentSecond.capture()) assert(argumentSecond.getValue.name == "checkPointNameSum") assert(argumentSecond.getValue.author == "authorOfSum") - assert(argumentSecond.getValue.atumPartitions == atumPartitions) - assert(argumentSecond.getValue.measurements.tail.head.resultValue == "22.5") - assert(argumentSecond.getValue.measurements.tail.head.resultType == ResultValueType.BigDecimal) + assert(argumentSecond.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions)) + assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.value == "22.5") + assert(argumentSecond.getValue.measurements.tail.head.result.mainValue.valueType == ResultValueType.BigDecimal) } } diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresMapperTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresBuilderTest.scala similarity index 89% rename from agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresMapperTest.scala rename to agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresBuilderTest.scala index 6f43c44db..209fd3ae2 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresMapperTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasuresBuilderTest.scala @@ -21,7 +21,7 @@ import za.co.absa.atum.agent.exception.MeasureException import za.co.absa.atum.agent.model.Measure._ import za.co.absa.atum.model.dto.MeasureDTO -class MeasuresMapperTest extends AnyFlatSpecLike { +class MeasuresBuilderTest extends AnyFlatSpecLike { "mapToMeasures" should "map MeasureDTO into Measure for supported measures" in { val supportedMeasures = Set( @@ -40,7 +40,7 @@ class MeasuresMapperTest extends AnyFlatSpecLike { SumOfHashesOfColumn("hashCrc32Col") ) - val actualMeasures = MeasuresMapper.mapToMeasures(supportedMeasures) + val actualMeasures = MeasuresBuilder.mapToMeasures(supportedMeasures) assert(expectedMeasures == actualMeasures) } @@ -50,7 +50,7 @@ class MeasuresMapperTest extends AnyFlatSpecLike { MeasureDTO("unsupportedMeasure", Seq("col")) ) - assertThrows[MeasureException](MeasuresMapper.mapToMeasures(unsupportedMeasure)) + assertThrows[MeasureException](MeasuresBuilder.mapToMeasures(unsupportedMeasure)) } } From 55acd58109cc4bba0b6ff6228184ced3338528c2 Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Sat, 28 Oct 2023 09:50:22 +0200 Subject: [PATCH 12/15] #94: fixing unit tests, post merge conflicts --- .../test/scala/za/co/absa/atum/agent/AtumContextTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala index b54c74868..61fdadbc7 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala @@ -206,7 +206,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers { val df = spark.createDataFrame(rdd, schema) val caughtException = the[IllegalArgumentException] thrownBy { - df.createAndSaveCheckpoint("checkPointNameCountInvalid", "authorOfCount") + df.createCheckpoint("checkPointNameCountInvalid", "authorOfCount") } caughtException.getMessage should include( "Column nonNumericalColumn measurement aggregatedTotal requested, " + @@ -243,7 +243,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers { val df = spark.createDataFrame(rdd, schema) val caughtException = the[IllegalArgumentException] thrownBy { - df.createAndSaveCheckpoint("checkPointNameCountColNonExisting", "authorOfCount") + df.createCheckpoint("checkPointNameCountColNonExisting", "authorOfCount") } caughtException.getMessage should include( "Column(s) 'nonExistingColumn' must be present in dataframe, but it's not. " + From 9c1f42808b5965855307cf3174b5a63df7304aa2 Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Sat, 28 Oct 2023 09:53:07 +0200 Subject: [PATCH 13/15] #94: fixing unit tests, post merge conflicts --- .../src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala index 61fdadbc7..fc15b56a4 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala @@ -24,7 +24,7 @@ import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.model.Measure.{RecordCount, SumOfValuesOfColumn} -import za.co.absa.atum.agent.model.MeasurementProvided +import za.co.absa.atum.agent.model.{MeasurementBuilder, MeasurementProvided} import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType import za.co.absa.atum.model.dto._ From 46ad700cc0a56b948763ef0e184a63be07b47096 Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Tue, 31 Oct 2023 17:20:58 +0100 Subject: [PATCH 14/15] #26: removing onlyForNumeric as it's not currently used and we might do this differently --- .../za/co/absa/atum/agent/model/Measure.scala | 21 +++++-------------- 1 file changed, 5 insertions(+), 16 deletions(-) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala index 0ce5a190e..411f90514 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala @@ -33,7 +33,6 @@ sealed trait Measure extends MeasurementProcessor with MeasureType { trait MeasureType { val measureName: String - val onlyForNumeric: Boolean val resultValueType: ResultValueType.ResultValueType } @@ -53,7 +52,6 @@ object Measure { case class RecordCount private ( controlCol: String, measureName: String, - onlyForNumeric: Boolean, resultValueType: ResultValueType.ResultValueType ) extends Measure { @@ -64,17 +62,15 @@ object Measure { } } object RecordCount extends MeasureType { - def apply(controlCol: String): RecordCount = RecordCount(controlCol, measureName, onlyForNumeric, resultValueType) + def apply(controlCol: String): RecordCount = RecordCount(controlCol, measureName, resultValueType) override val measureName: String = "count" - override val onlyForNumeric: Boolean = false override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long } case class DistinctRecordCount private ( controlCol: String, measureName: String, - onlyForNumeric: Boolean, resultValueType: ResultValueType.ResultValueType ) extends Measure { @@ -86,18 +82,16 @@ object Measure { } object DistinctRecordCount extends MeasureType { def apply(controlCol: String): DistinctRecordCount = { - DistinctRecordCount(controlCol, measureName, onlyForNumeric, resultValueType) + DistinctRecordCount(controlCol, measureName, resultValueType) } override val measureName: String = "distinctCount" - override val onlyForNumeric: Boolean = false override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long } case class SumOfValuesOfColumn private ( controlCol: String, measureName: String, - onlyForNumeric: Boolean, resultValueType: ResultValueType.ResultValueType ) extends Measure { @@ -109,18 +103,16 @@ object Measure { } object SumOfValuesOfColumn extends MeasureType { def apply(controlCol: String): SumOfValuesOfColumn = { - SumOfValuesOfColumn(controlCol, measureName, onlyForNumeric, resultValueType) + SumOfValuesOfColumn(controlCol, measureName, resultValueType) } override val measureName: String = "aggregatedTotal" - override val onlyForNumeric: Boolean = true override val resultValueType: ResultValueType.ResultValueType = ResultValueType.BigDecimal } case class AbsSumOfValuesOfColumn private ( controlCol: String, measureName: String, - onlyForNumeric: Boolean, resultValueType: ResultValueType.ResultValueType ) extends Measure { @@ -132,18 +124,16 @@ object Measure { } object AbsSumOfValuesOfColumn extends MeasureType { def apply(controlCol: String): AbsSumOfValuesOfColumn = { - AbsSumOfValuesOfColumn(controlCol, measureName, onlyForNumeric, resultValueType) + AbsSumOfValuesOfColumn(controlCol, measureName, resultValueType) } override val measureName: String = "absAggregatedTotal" - override val onlyForNumeric: Boolean = true override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Double } case class SumOfHashesOfColumn private ( controlCol: String, measureName: String, - onlyForNumeric: Boolean, resultValueType: ResultValueType.ResultValueType ) extends Measure { @@ -160,11 +150,10 @@ object Measure { } object SumOfHashesOfColumn extends MeasureType { def apply(controlCol: String): SumOfHashesOfColumn = { - SumOfHashesOfColumn(controlCol, measureName, onlyForNumeric, resultValueType) + SumOfHashesOfColumn(controlCol, measureName, resultValueType) } override val measureName: String = "hashCrc32" - override val onlyForNumeric: Boolean = false override val resultValueType: ResultValueType.ResultValueType = ResultValueType.String } From 89e645a45dd163c99701dc04e7634b7ea81df507 Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Tue, 31 Oct 2023 17:27:17 +0100 Subject: [PATCH 15/15] #94: adding back here, but I know that it might be removed soon --- .../za/co/absa/atum/agent/model/Measure.scala | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala index ce8099579..92e96d524 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala @@ -51,6 +51,7 @@ sealed trait Measure extends MeasurementProcessor with MeasureType { trait MeasureType { val measureName: String + val onlyForNumeric: Boolean val resultValueType: ResultValueType.ResultValueType } @@ -70,6 +71,7 @@ object Measure { case class RecordCount private ( controlCol: String, measureName: String, + onlyForNumeric: Boolean, resultValueType: ResultValueType.ResultValueType ) extends Measure { @@ -80,15 +82,17 @@ object Measure { } } object RecordCount extends MeasureType { - def apply(controlCol: String): RecordCount = RecordCount(controlCol, measureName, resultValueType) + def apply(controlCol: String): RecordCount = RecordCount(controlCol, measureName, onlyForNumeric, resultValueType) override val measureName: String = "count" + override val onlyForNumeric: Boolean = false override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long } case class DistinctRecordCount private ( controlCol: String, measureName: String, + onlyForNumeric: Boolean, resultValueType: ResultValueType.ResultValueType ) extends Measure { @@ -100,16 +104,18 @@ object Measure { } object DistinctRecordCount extends MeasureType { def apply(controlCol: String): DistinctRecordCount = { - DistinctRecordCount(controlCol, measureName, resultValueType) + DistinctRecordCount(controlCol, measureName, onlyForNumeric, resultValueType) } override val measureName: String = "distinctCount" + override val onlyForNumeric: Boolean = false override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long } case class SumOfValuesOfColumn private ( controlCol: String, measureName: String, + onlyForNumeric: Boolean, resultValueType: ResultValueType.ResultValueType ) extends Measure { @@ -121,16 +127,18 @@ object Measure { } object SumOfValuesOfColumn extends MeasureType { def apply(controlCol: String): SumOfValuesOfColumn = { - SumOfValuesOfColumn(controlCol, measureName, resultValueType) + SumOfValuesOfColumn(controlCol, measureName, onlyForNumeric, resultValueType) } override val measureName: String = "aggregatedTotal" + override val onlyForNumeric: Boolean = true override val resultValueType: ResultValueType.ResultValueType = ResultValueType.BigDecimal } case class AbsSumOfValuesOfColumn private ( controlCol: String, measureName: String, + onlyForNumeric: Boolean, resultValueType: ResultValueType.ResultValueType ) extends Measure { @@ -142,16 +150,18 @@ object Measure { } object AbsSumOfValuesOfColumn extends MeasureType { def apply(controlCol: String): AbsSumOfValuesOfColumn = { - AbsSumOfValuesOfColumn(controlCol, measureName, resultValueType) + AbsSumOfValuesOfColumn(controlCol, measureName, onlyForNumeric, resultValueType) } override val measureName: String = "absAggregatedTotal" + override val onlyForNumeric: Boolean = true override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Double } case class SumOfHashesOfColumn private ( controlCol: String, measureName: String, + onlyForNumeric: Boolean, resultValueType: ResultValueType.ResultValueType ) extends Measure { @@ -168,10 +178,11 @@ object Measure { } object SumOfHashesOfColumn extends MeasureType { def apply(controlCol: String): SumOfHashesOfColumn = { - SumOfHashesOfColumn(controlCol, measureName, resultValueType) + SumOfHashesOfColumn(controlCol, measureName, onlyForNumeric, resultValueType) } override val measureName: String = "hashCrc32" + override val onlyForNumeric: Boolean = false override val resultValueType: ResultValueType.ResultValueType = ResultValueType.String }