diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala index 324350db7..4fc9ac5e1 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala @@ -103,7 +103,7 @@ class AtumContext private[agent] ( * @param measurements the measurements to be included in the checkpoint * @return the AtumContext after the checkpoint has been created */ - def createCheckpointOnProvidedData(checkpointName: String, measurements: Set[Measurement]): AtumContext = { + def createCheckpointOnProvidedData(checkpointName: String, measurements: Map[AtumMeasure, MeasureResult]): AtumContext = { val dateTimeNow = ZonedDateTime.now() val checkpointDTO = CheckpointDTO( @@ -113,7 +113,7 @@ class AtumContext private[agent] ( partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions), processStartTime = dateTimeNow, processEndTime = Some(dateTimeNow), - measurements = MeasurementBuilder.buildMeasurementsDTO(measurements) + measurements = MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements) ) agent.saveCheckpoint(checkpointDTO) diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasureResult.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasureResult.scala index 037f90811..aff933dbc 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasureResult.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasureResult.scala @@ -45,7 +45,8 @@ object MeasureResult { * These adjustments are needed to be performed - to avoid some floating point issues * (overflows, consistent representation of numbers - whether they are coming from Java or Scala world, and more). */ - case class MeasureResultByAtum private (resultValue: String, resultValueType: ResultValueType.ResultValueType) + case class MeasureResultByAtum private + (resultValue: String, resultValueType: ResultValueType.ResultValueType) extends MeasureResult {override type T = String} /** @@ -57,13 +58,14 @@ object MeasureResult { extends MeasureResult {override type T = ProvidedValueType} /** - * This method creates a measure result for a given result value. + * This method creates a measure result for a given result value. This is supposed to be used internally + * within Atum Agent basically only. * * @param resultValue A result value of the measurement. * @param resultType A result type of the measurement. * @return A measure result. */ - def apply(resultValue: String, resultType: ResultValueType.ResultValueType): MeasureResult = { + private[agent] def apply(resultValue: String, resultType: ResultValueType.ResultValueType): MeasureResult = { MeasureResultByAtum(resultValue, resultType) } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala deleted file mode 100644 index 08a56488a..000000000 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.atum.agent.model - -import za.co.absa.atum.agent.exception.AtumAgentException.MeasurementException -import za.co.absa.atum.agent.model.Measurement.validateMeasurement - -/** - * This class defines a contract for a measurement. - */ -final case class Measurement private (measure: AtumMeasure, result: MeasureResult) { - validateMeasurement(measure, result) -} - -object Measurement { - - private def validateMeasurement(measure: AtumMeasure, result: MeasureResult): Unit = { - val actualType = result.resultValueType - val requiredType = measure.resultValueType - - if (actualType != requiredType) - throw MeasurementException( - s"Type of a given provided measurement result and type that a given measure supports are not compatible! " + - s"Got $actualType but should be $requiredType" - ) - } -} diff --git a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala index e07a62865..80e43a659 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/model/MeasurementBuilder.scala @@ -24,27 +24,20 @@ import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO, MeasurementDTO} */ private [agent] object MeasurementBuilder { - private[agent] def validateMeasureUniqueness(measurements: Set[Measurement]): Unit = { - val allMeasures = measurements.toSeq.map(_.measure) + private def validateMeasureAndResultTypeCompatibility(measure: AtumMeasure, result: MeasureResult): Unit = { + val requiredType = measure.resultValueType + val actualType = result.resultValueType - val originalMeasuresCnt = allMeasures.size - val uniqueMeasureColumnCombinationCnt = allMeasures.map(m => - (m.measureName, m.measuredColumns) // there can't be two same measures defined on the same column(s) - ).distinct.size - - if (originalMeasuresCnt != uniqueMeasureColumnCombinationCnt) + if (actualType != requiredType) throw MeasurementException( - s"Measure and measuredColumn combinations must be unique, i.e. they cannot repeat! Got: $allMeasures" + s"Type of a given provided measurement result and type that a given measure supports are not compatible! " + + s"Got $actualType but should be $requiredType" ) } - private[agent] def buildMeasurementsDTO(measurements: Set[Measurement]): Set[MeasurementDTO] = { - validateMeasureUniqueness(measurements) - - measurements.map(m => buildMeasurementDTO(m.measure, m.result)) - } + def buildMeasurementDTO(measure: AtumMeasure, measureResult: MeasureResult): MeasurementDTO = { + validateMeasureAndResultTypeCompatibility(measure, measureResult) - private[agent] def buildMeasurementDTO(measure: AtumMeasure, measureResult: MeasureResult): MeasurementDTO = { val measureDTO = MeasureDTO(measure.measureName, measure.measuredColumns) val measureResultDTO = MeasureResultDTO( @@ -52,4 +45,12 @@ private [agent] object MeasurementBuilder { ) MeasurementDTO(measureDTO, measureResultDTO) } + + def buildAndValidateMeasurementsDTO(measurements: Map[AtumMeasure, MeasureResult]): Set[MeasurementDTO] = { + measurements.toSet[(AtumMeasure, MeasureResult)].map { case (measure: AtumMeasure, measureResult: MeasureResult) => + validateMeasureAndResultTypeCompatibility(measure, measureResult) + buildMeasurementDTO(measure, measureResult) + } + } + } diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala index 63ac59590..e597cf064 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala @@ -23,9 +23,8 @@ import org.mockito.Mockito.{mock, times, verify, when} import org.scalatest.flatspec.AnyFlatSpec import org.scalatest.matchers.should.Matchers import za.co.absa.atum.agent.AtumContext.AtumPartitions -import za.co.absa.atum.agent.model.MeasurementBuilder +import za.co.absa.atum.agent.model.{AtumMeasure, MeasureResult, MeasurementBuilder} import za.co.absa.atum.agent.model.AtumMeasure._ -import za.co.absa.atum.agent.model.{Measurement, MeasureResult} import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType @@ -113,12 +112,13 @@ class AtumContextTest extends AnyFlatSpec with Matchers { val atumPartitions = AtumPartitions("key" -> "value") val atumContext: AtumContext = new AtumContext(atumPartitions, mockAgent) - val measurements = Set( - Measurement(RecordCount("col"), MeasureResult(1L)), - Measurement(SumOfValuesOfColumn("col"), MeasureResult(BigDecimal(1))) + val measurements: Map[AtumMeasure, MeasureResult] = Map( + RecordCount("col") -> MeasureResult(1L), + SumOfValuesOfColumn("col") -> MeasureResult(BigDecimal(1)) ) - atumContext.createCheckpointOnProvidedData(checkpointName = "name", measurements = measurements) + atumContext.createCheckpointOnProvidedData( + checkpointName = "name", measurements = measurements) val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO]) verify(mockAgent).saveCheckpoint(argument.capture()) @@ -128,7 +128,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers { assert(!argument.getValue.measuredByAtumAgent) assert(argument.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions)) assert(argument.getValue.processStartTime == argument.getValue.processEndTime.get) - assert(argument.getValue.measurements == MeasurementBuilder.buildMeasurementsDTO(measurements)) + assert(argument.getValue.measurements == MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements)) } "createCheckpoint" should "take measurements and create a Checkpoint, multiple measure changes" in { diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala index 6217586fe..88215ff34 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementBuilderTest.scala @@ -20,49 +20,11 @@ import org.scalatest.flatspec.AnyFlatSpec import za.co.absa.atum.agent.exception.AtumAgentException.MeasurementException import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO, MeasurementDTO} import za.co.absa.atum.agent.model.AtumMeasure._ -import za.co.absa.atum.agent.model.MeasurementBuilder.validateMeasureUniqueness import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue} class MeasurementBuilderTest extends AnyFlatSpec { - "validateMeasureUniqueness" should "accept unique measurements with unique measures" in { - val measurements = Set( - Measurement(RecordCount(), MeasureResult("1", ResultValueType.Long)), - Measurement(SumOfValuesOfColumn("col1"), MeasureResult(BigDecimal(1.2))), - Measurement(DistinctRecordCount(Seq("col2")), MeasureResult(3L)) - ) - validateMeasureUniqueness(measurements) - } - - "validateMeasureUniqueness" should "accept duplicated measures but with different measured columns" in { - val measurements = Set( - Measurement(RecordCount(), MeasureResult("1", ResultValueType.Long)), - Measurement(SumOfValuesOfColumn("col1"), MeasureResult(BigDecimal(1.2))), - Measurement(SumOfValuesOfColumn("col2"), MeasureResult(BigDecimal(1.4))), - Measurement(DistinctRecordCount(Seq("col2")), MeasureResult(3L)) - ) - validateMeasureUniqueness(measurements) - } - - "validateMeasureUniqueness" should "fail on duplicated measures with repeated measured columns" in { - val measurements = Set( - Measurement(RecordCount(), MeasureResult("1", ResultValueType.Long)), - Measurement(SumOfValuesOfColumn("col1"), MeasureResult(BigDecimal(1.2))), - Measurement(SumOfValuesOfColumn("col1"), MeasureResult(BigDecimal(1.4))), - Measurement(DistinctRecordCount(Seq("col2")), MeasureResult(3L)) - ) - assertThrows[MeasurementException](validateMeasureUniqueness(measurements)) - } - - "validateMeasureUniqueness" should "fail on duplicated measurements with different results" in { - val measurements = Set( - Measurement(RecordCount(), MeasureResult("1", ResultValueType.Long)), - Measurement(RecordCount(), MeasureResult("2", ResultValueType.Long)) - ) - assertThrows[MeasurementException](validateMeasureUniqueness(measurements)) - } - - "buildMeasurementsDTO" should + "buildMeasurementDTO" should "build MeasurementDTO for BigDecimal type of result value when Measure and MeasureResult provided" in { val measure = SumOfValuesOfColumn("col") @@ -80,7 +42,7 @@ class MeasurementBuilderTest extends AnyFlatSpec { assert(measurementDTO.result == expectedMeasureResultDTO) } - "buildMeasurementsDTO" should + "buildMeasurementDTO" should "build MeasurementDTO for BigDecimal type of result value when Measurement provided" in { val measure = SumOfValuesOfColumn("col") @@ -92,7 +54,7 @@ class MeasurementBuilderTest extends AnyFlatSpec { assert(measurementDTO.result.mainValue == expectedTypedValue) } - "buildMeasurementsDTO" should + "buildMeasurementDTO" should "build MeasurementDTO (at least for now) for compatible result type but incompatible actual type of result value " + "when Measurement provided" in { @@ -106,7 +68,7 @@ class MeasurementBuilderTest extends AnyFlatSpec { assert(measurementDTO.result.mainValue == expectedTypedValue) } - "buildMeasurementsDTO" should + "buildMeasurementDTO" should "build MeasurementDTO for BigDecimal type of result value when measured by Agent" in { val measure = SumOfValuesOfColumn("col") @@ -124,14 +86,13 @@ class MeasurementBuilderTest extends AnyFlatSpec { assert(measurementDTO.result == expectedMeasureResultDTO) } - - "buildMeasurementsDTO" should "build Seq[MeasurementDTO] for multiple measures, all unique" in { - val measurements = Set( - Measurement(DistinctRecordCount(Seq("col")), MeasureResult("1", ResultValueType.Long)), - Measurement(SumOfValuesOfColumn("col1"), MeasureResult(BigDecimal(1.2))), - Measurement(SumOfValuesOfColumn("col2"), MeasureResult(BigDecimal(1.3))) + "buildAndValidateMeasurementsDTO" should "build Seq[MeasurementDTO] for multiple measures, all unique" in { + val measurements: Map[AtumMeasure, MeasureResult] = Map( + DistinctRecordCount(Seq("col")) -> MeasureResult("1", ResultValueType.Long), + SumOfValuesOfColumn("col1") -> MeasureResult(BigDecimal(1.2)), + SumOfValuesOfColumn("col2") -> MeasureResult(BigDecimal(1.3)) ) - val measurementDTOs = MeasurementBuilder.buildMeasurementsDTO(measurements) + val measurementDTOs = MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements) val expectedMeasurementDTO = Set( MeasurementDTO( @@ -148,12 +109,43 @@ class MeasurementBuilderTest extends AnyFlatSpec { assert(measurementDTOs == expectedMeasurementDTO) } - "buildMeasurementsDTO" should "throw exception for multiple measures, some of them repetitive" in { - val measurements = Set( - Measurement(DistinctRecordCount(Seq("col")), MeasureResult(1L)), - Measurement(SumOfValuesOfColumn("col"), MeasureResult(BigDecimal(1.2))), - Measurement(SumOfValuesOfColumn("col"), MeasureResult(BigDecimal(1.3))) + "buildAndValidateMeasurementsDTO" should "throw exception for unsupported result value - Double instead of BigDecimal" in { + val measure = AbsSumOfValuesOfColumn("col") + + assertThrows[MeasurementException]( + MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult(1.0))) + ) + } + + "buildAndValidateMeasurementsDTO" should "throw exception for unsupported result value - Int instead of BigDecimal" in { + val measure = SumOfValuesOfColumn("col") + + assertThrows[MeasurementException]( + MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult(1))) + ) + } + + "buildAndValidateMeasurementsDTO" should "throw exception for unsupported result value type in general (composite)" in { + val measure = SumOfHashesOfColumn("col") + + assertThrows[MeasurementException]( + MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult(Map(1 -> "no-go")))) + ) + } + + "buildAndValidateMeasurementsDTO" should "throw exception for unsupported result value type for a given Measure" in { + val measure = DistinctRecordCount(Seq("col")) + + assertThrows[MeasurementException]( + MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult("1"))) + ) + } + + "buildAndValidateMeasurementsDTO" should "throw exception for incompatible String type of result value when Measurement provided" in { + val measure = SumOfValuesOfColumn("col") + + assertThrows[MeasurementException]( + MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult("stringValue", ResultValueType.String))) ) - assertThrows[MeasurementException](MeasurementBuilder.buildMeasurementsDTO(measurements)) } } diff --git a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementTest.scala deleted file mode 100644 index db02fcb6b..000000000 --- a/agent/src/test/scala/za/co/absa/atum/agent/model/MeasurementTest.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package za.co.absa.atum.agent.model - -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers -import za.co.absa.atum.agent.exception.AtumAgentException.MeasurementException -import za.co.absa.atum.agent.model.AtumMeasure._ -import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType -import za.co.absa.spark.commons.test.SparkTestBase - -class MeasurementTest extends AnyFlatSpec with Matchers with SparkTestBase { self => - - "Measurement" should "be able to be converted to MeasureResultWithType internally when the result is BigDecimal" in { - val measure = AbsSumOfValuesOfColumn("col") - val actualMeasurement = Measurement(measure, MeasureResult(BigDecimal(1.0))) - - assert(actualMeasurement.result.resultValue == 1.0) - assert(actualMeasurement.result.resultValueType == ResultValueType.BigDecimal) - } - - "Measurement" should "throw exception for unsupported result value - Double instead of BigDecimal" in { - val measure = AbsSumOfValuesOfColumn("col") - assertThrows[MeasurementException](Measurement(measure, MeasureResult(1.0))) - } - - "Measurement" should "throw exception for unsupported result value type in general (scalar)" in { - val measure = SumOfValuesOfColumn("col") - assertThrows[MeasurementException](Measurement(measure, MeasureResult(1))) - } - - "Measurement" should "throw exception for unsupported result value type in general (composite)" in { - val measure = SumOfHashesOfColumn("col") - assertThrows[MeasurementException](Measurement(measure, MeasureResult(Map(1 -> "no-go")))) - } - - "Measurement" should "throw exception for unsupported result value type for a given Measure" in { - val measure = DistinctRecordCount(Seq("col")) - assertThrows[MeasurementException](Measurement(measure, MeasureResult("1"))) - } - - "Measurement" should "throw exception for incompatible String type of result value when Measurement provided" in { - val measure = SumOfValuesOfColumn("col") - - assertThrows[MeasurementException]( - Measurement(measure, MeasureResult("stringValue", ResultValueType.String)) - ) - } - - "Measurement" should "throw exception for unsupported (slightly different FPN) result value type for a given Measure" in { - val measure = SumOfValuesOfColumn("col") - assertThrows[MeasurementException](Measurement(measure, MeasureResult(1.0))) - } -}