Skip to content

Commit

Permalink
#105: dropping Measurement in agent and thus simplifying the code eve…
Browse files Browse the repository at this point in the history
…n more
  • Loading branch information
lsulak committed Feb 27, 2024
1 parent 8beceef commit 830be6e
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 191 deletions.
4 changes: 2 additions & 2 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class AtumContext private[agent] (
* @param measurements the measurements to be included in the checkpoint
* @return the AtumContext after the checkpoint has been created
*/
def createCheckpointOnProvidedData(checkpointName: String, measurements: Set[Measurement]): AtumContext = {
def createCheckpointOnProvidedData(checkpointName: String, measurements: Map[AtumMeasure, MeasureResult]): AtumContext = {
val dateTimeNow = ZonedDateTime.now()

val checkpointDTO = CheckpointDTO(
Expand All @@ -113,7 +113,7 @@ class AtumContext private[agent] (
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
processStartTime = dateTimeNow,
processEndTime = Some(dateTimeNow),
measurements = MeasurementBuilder.buildMeasurementsDTO(measurements)
measurements = MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements)
)

agent.saveCheckpoint(checkpointDTO)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ object MeasureResult {
* These adjustments are needed to avoid some floating point issues
* (overflows, inconsistent representation of numbers depending on whether they come from the Java or Scala world, and more).
*/
case class MeasureResultByAtum private (resultValue: String, resultValueType: ResultValueType.ResultValueType)
case class MeasureResultByAtum private
(resultValue: String, resultValueType: ResultValueType.ResultValueType)
extends MeasureResult {override type T = String}

/**
Expand All @@ -57,13 +58,14 @@ object MeasureResult {
extends MeasureResult {override type T = ProvidedValueType}

/**
* This method creates a measure result for a given result value.
* This method creates a measure result for a given result value. It is intended for internal use
* within the Atum Agent only.
*
* @param resultValue A result value of the measurement.
* @param resultType A result type of the measurement.
* @return A measure result.
*/
def apply(resultValue: String, resultType: ResultValueType.ResultValueType): MeasureResult = {
private[agent] def apply(resultValue: String, resultType: ResultValueType.ResultValueType): MeasureResult = {
MeasureResultByAtum(resultValue, resultType)
}

Expand Down
41 changes: 0 additions & 41 deletions agent/src/main/scala/za/co/absa/atum/agent/model/Measurement.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,33 @@ import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO, MeasurementDTO}
*/
private [agent] object MeasurementBuilder {

private[agent] def validateMeasureUniqueness(measurements: Set[Measurement]): Unit = {
val allMeasures = measurements.toSeq.map(_.measure)
private def validateMeasureAndResultTypeCompatibility(measure: AtumMeasure, result: MeasureResult): Unit = {
val requiredType = measure.resultValueType
val actualType = result.resultValueType

val originalMeasuresCnt = allMeasures.size
val uniqueMeasureColumnCombinationCnt = allMeasures.map(m =>
(m.measureName, m.measuredColumns) // there can't be two same measures defined on the same column(s)
).distinct.size

if (originalMeasuresCnt != uniqueMeasureColumnCombinationCnt)
if (actualType != requiredType)
throw MeasurementException(
s"Measure and measuredColumn combinations must be unique, i.e. they cannot repeat! Got: $allMeasures"
s"Type of a given provided measurement result and type that a given measure supports are not compatible! " +
s"Got $actualType but should be $requiredType"
)
}

private[agent] def buildMeasurementsDTO(measurements: Set[Measurement]): Set[MeasurementDTO] = {
validateMeasureUniqueness(measurements)

measurements.map(m => buildMeasurementDTO(m.measure, m.result))
}
def buildMeasurementDTO(measure: AtumMeasure, measureResult: MeasureResult): MeasurementDTO = {
validateMeasureAndResultTypeCompatibility(measure, measureResult)

private[agent] def buildMeasurementDTO(measure: AtumMeasure, measureResult: MeasureResult): MeasurementDTO = {
val measureDTO = MeasureDTO(measure.measureName, measure.measuredColumns)

val measureResultDTO = MeasureResultDTO(
MeasureResultDTO.TypedValue(measureResult.resultValue.toString, measureResult.resultValueType)
)
MeasurementDTO(measureDTO, measureResultDTO)
}

/**
 * Builds the set of measurement DTOs for the provided measures and their results.
 *
 * Each (measure, result) pair is validated exactly once: `buildMeasurementDTO` already checks that the
 * result's value type equals the type required by the measure, so no separate validation call is made here.
 *
 * @param measurements mapping of each measure to the result provided for it
 * @return the set of MeasurementDTOs built from the map entries
 * @throws MeasurementException when the value type of a result differs from the type required by its measure
 */
def buildAndValidateMeasurementsDTO(measurements: Map[AtumMeasure, MeasureResult]): Set[MeasurementDTO] = {
  // The explicit element type on toSet keeps the pairs typed as tuples so they can be destructured below.
  measurements.toSet[(AtumMeasure, MeasureResult)].map { case (measure, measureResult) =>
    buildMeasurementDTO(measure, measureResult)
  }
}

}
14 changes: 7 additions & 7 deletions agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import org.mockito.Mockito.{mock, times, verify, when}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.MeasurementBuilder
import za.co.absa.atum.agent.model.{AtumMeasure, MeasureResult, MeasurementBuilder}
import za.co.absa.atum.agent.model.AtumMeasure._
import za.co.absa.atum.agent.model.{Measurement, MeasureResult}
import za.co.absa.atum.model.dto.CheckpointDTO
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

Expand Down Expand Up @@ -113,12 +112,13 @@ class AtumContextTest extends AnyFlatSpec with Matchers {
val atumPartitions = AtumPartitions("key" -> "value")
val atumContext: AtumContext = new AtumContext(atumPartitions, mockAgent)

val measurements = Set(
Measurement(RecordCount("col"), MeasureResult(1L)),
Measurement(SumOfValuesOfColumn("col"), MeasureResult(BigDecimal(1)))
val measurements: Map[AtumMeasure, MeasureResult] = Map(
RecordCount("col") -> MeasureResult(1L),
SumOfValuesOfColumn("col") -> MeasureResult(BigDecimal(1))
)

atumContext.createCheckpointOnProvidedData(checkpointName = "name", measurements = measurements)
atumContext.createCheckpointOnProvidedData(
checkpointName = "name", measurements = measurements)

val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO])
verify(mockAgent).saveCheckpoint(argument.capture())
Expand All @@ -128,7 +128,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers {
assert(!argument.getValue.measuredByAtumAgent)
assert(argument.getValue.partitioning == AtumPartitions.toSeqPartitionDTO(atumPartitions))
assert(argument.getValue.processStartTime == argument.getValue.processEndTime.get)
assert(argument.getValue.measurements == MeasurementBuilder.buildMeasurementsDTO(measurements))
assert(argument.getValue.measurements == MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements))
}

"createCheckpoint" should "take measurements and create a Checkpoint, multiple measure changes" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,49 +20,11 @@ import org.scalatest.flatspec.AnyFlatSpec
import za.co.absa.atum.agent.exception.AtumAgentException.MeasurementException
import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO, MeasurementDTO}
import za.co.absa.atum.agent.model.AtumMeasure._
import za.co.absa.atum.agent.model.MeasurementBuilder.validateMeasureUniqueness
import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue}

class MeasurementBuilderTest extends AnyFlatSpec {

"validateMeasureUniqueness" should "accept unique measurements with unique measures" in {
val measurements = Set(
Measurement(RecordCount(), MeasureResult("1", ResultValueType.Long)),
Measurement(SumOfValuesOfColumn("col1"), MeasureResult(BigDecimal(1.2))),
Measurement(DistinctRecordCount(Seq("col2")), MeasureResult(3L))
)
validateMeasureUniqueness(measurements)
}

"validateMeasureUniqueness" should "accept duplicated measures but with different measured columns" in {
val measurements = Set(
Measurement(RecordCount(), MeasureResult("1", ResultValueType.Long)),
Measurement(SumOfValuesOfColumn("col1"), MeasureResult(BigDecimal(1.2))),
Measurement(SumOfValuesOfColumn("col2"), MeasureResult(BigDecimal(1.4))),
Measurement(DistinctRecordCount(Seq("col2")), MeasureResult(3L))
)
validateMeasureUniqueness(measurements)
}

"validateMeasureUniqueness" should "fail on duplicated measures with repeated measured columns" in {
val measurements = Set(
Measurement(RecordCount(), MeasureResult("1", ResultValueType.Long)),
Measurement(SumOfValuesOfColumn("col1"), MeasureResult(BigDecimal(1.2))),
Measurement(SumOfValuesOfColumn("col1"), MeasureResult(BigDecimal(1.4))),
Measurement(DistinctRecordCount(Seq("col2")), MeasureResult(3L))
)
assertThrows[MeasurementException](validateMeasureUniqueness(measurements))
}

"validateMeasureUniqueness" should "fail on duplicated measurements with different results" in {
val measurements = Set(
Measurement(RecordCount(), MeasureResult("1", ResultValueType.Long)),
Measurement(RecordCount(), MeasureResult("2", ResultValueType.Long))
)
assertThrows[MeasurementException](validateMeasureUniqueness(measurements))
}

"buildMeasurementsDTO" should
"buildMeasurementDTO" should
"build MeasurementDTO for BigDecimal type of result value when Measure and MeasureResult provided" in {

val measure = SumOfValuesOfColumn("col")
Expand All @@ -80,7 +42,7 @@ class MeasurementBuilderTest extends AnyFlatSpec {
assert(measurementDTO.result == expectedMeasureResultDTO)
}

"buildMeasurementsDTO" should
"buildMeasurementDTO" should
"build MeasurementDTO for BigDecimal type of result value when Measurement provided" in {

val measure = SumOfValuesOfColumn("col")
Expand All @@ -92,7 +54,7 @@ class MeasurementBuilderTest extends AnyFlatSpec {
assert(measurementDTO.result.mainValue == expectedTypedValue)
}

"buildMeasurementsDTO" should
"buildMeasurementDTO" should
"build MeasurementDTO (at least for now) for compatible result type but incompatible actual type of result value " +
"when Measurement provided" in {

Expand All @@ -106,7 +68,7 @@ class MeasurementBuilderTest extends AnyFlatSpec {
assert(measurementDTO.result.mainValue == expectedTypedValue)
}

"buildMeasurementsDTO" should
"buildMeasurementDTO" should
"build MeasurementDTO for BigDecimal type of result value when measured by Agent" in {

val measure = SumOfValuesOfColumn("col")
Expand All @@ -124,14 +86,13 @@ class MeasurementBuilderTest extends AnyFlatSpec {
assert(measurementDTO.result == expectedMeasureResultDTO)
}


"buildMeasurementsDTO" should "build Seq[MeasurementDTO] for multiple measures, all unique" in {
val measurements = Set(
Measurement(DistinctRecordCount(Seq("col")), MeasureResult("1", ResultValueType.Long)),
Measurement(SumOfValuesOfColumn("col1"), MeasureResult(BigDecimal(1.2))),
Measurement(SumOfValuesOfColumn("col2"), MeasureResult(BigDecimal(1.3)))
"buildAndValidateMeasurementsDTO" should "build Seq[MeasurementDTO] for multiple measures, all unique" in {
val measurements: Map[AtumMeasure, MeasureResult] = Map(
DistinctRecordCount(Seq("col")) -> MeasureResult("1", ResultValueType.Long),
SumOfValuesOfColumn("col1") -> MeasureResult(BigDecimal(1.2)),
SumOfValuesOfColumn("col2") -> MeasureResult(BigDecimal(1.3))
)
val measurementDTOs = MeasurementBuilder.buildMeasurementsDTO(measurements)
val measurementDTOs = MeasurementBuilder.buildAndValidateMeasurementsDTO(measurements)

val expectedMeasurementDTO = Set(
MeasurementDTO(
Expand All @@ -148,12 +109,43 @@ class MeasurementBuilderTest extends AnyFlatSpec {
assert(measurementDTOs == expectedMeasurementDTO)
}

"buildMeasurementsDTO" should "throw exception for multiple measures, some of them repetitive" in {
val measurements = Set(
Measurement(DistinctRecordCount(Seq("col")), MeasureResult(1L)),
Measurement(SumOfValuesOfColumn("col"), MeasureResult(BigDecimal(1.2))),
Measurement(SumOfValuesOfColumn("col"), MeasureResult(BigDecimal(1.3)))
"buildAndValidateMeasurementsDTO" should "throw exception for unsupported result value - Double instead of BigDecimal" in {
val measure = AbsSumOfValuesOfColumn("col")

assertThrows[MeasurementException](
MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult(1.0)))
)
}

"buildAndValidateMeasurementsDTO" should "throw exception for unsupported result value - Int instead of BigDecimal" in {
val measure = SumOfValuesOfColumn("col")

assertThrows[MeasurementException](
MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult(1)))
)
}

"buildAndValidateMeasurementsDTO" should "throw exception for unsupported result value type in general (composite)" in {
val measure = SumOfHashesOfColumn("col")

assertThrows[MeasurementException](
MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult(Map(1 -> "no-go"))))
)
}

"buildAndValidateMeasurementsDTO" should "throw exception for unsupported result value type for a given Measure" in {
val measure = DistinctRecordCount(Seq("col"))

assertThrows[MeasurementException](
MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult("1")))
)
}

"buildAndValidateMeasurementsDTO" should "throw exception for incompatible String type of result value when Measurement provided" in {
val measure = SumOfValuesOfColumn("col")

assertThrows[MeasurementException](
MeasurementBuilder.buildAndValidateMeasurementsDTO(Map(measure -> MeasureResult("stringValue", ResultValueType.String)))
)
assertThrows[MeasurementException](MeasurementBuilder.buildMeasurementsDTO(measurements))
}
}

This file was deleted.

0 comments on commit 830be6e

Please sign in to comment.