Skip to content

Commit

Permalink
Better validation handling - Set for measurements and measures as the…
Browse files Browse the repository at this point in the history
… order there is not important
  • Loading branch information
lsulak committed Nov 1, 2023
1 parent 7b013f9 commit 0b3b5b4
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 19 deletions.
4 changes: 2 additions & 2 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,14 @@ class AtumContext private[agent] (
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
processStartTime = startTime,
processEndTime = Some(endTime),
measurements = measurementDTOs.toSeq
measurements = measurementDTOs
)

agent.saveCheckpoint(checkpointDTO)
this
}

def createCheckpointOnProvidedData(checkpointName: String, author: String, measurements: Seq[Measurement]): AtumContext = {
def createCheckpointOnProvidedData(checkpointName: String, author: String, measurements: Set[Measurement]): AtumContext = {
val offsetDateTimeNow = OffsetDateTime.now()

val checkpointDTO = CheckpointDTO(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,19 @@ private [agent] object MeasurementBuilder {
)
}

private def validateMeasuresUniqueness(measures: Seq[Measure]): Unit = {
val originalMeasureCnt = measures.size
val uniqueMeasuresCnt = measures.map(m => Tuple2(m.measureName, m.controlCol)).distinct.size
private def validateMeasurementUniqueness(measurements: Set[Measurement]): Unit = {
val originalMeasurementsCnt = measurements.size
val uniqueMeasuresCnt = measurements.toSeq.map(m =>
Tuple2(m.measure.measureName, m.measure.controlCol) // there can't be 2 same measures defined on the same column
).distinct.size

val areMeasuresUnique = originalMeasureCnt == uniqueMeasuresCnt
val areMeasuresUnique = originalMeasurementsCnt == uniqueMeasuresCnt

require(areMeasuresUnique, s"Measures must be unique, i.e. they cannot repeat! Got: $measures")
require(areMeasuresUnique, s"Measures must be unique, i.e. they cannot repeat! Got: ${measurements.map(_.measure)}")
}

private[agent] def buildMeasurementDTO(measurements: Seq[Measurement]): Seq[MeasurementDTO] = {
val allMeasures = measurements.map(_.measure)
validateMeasuresUniqueness(allMeasures)
private[agent] def buildMeasurementDTO(measurements: Set[Measurement]): Set[MeasurementDTO] = {
validateMeasurementUniqueness(measurements)

measurements.map(m => buildMeasurementDTO(m.measure, m.result))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers {
val atumPartitions = AtumPartitions("key" -> "value")
val atumContext: AtumContext = new AtumContext(atumPartitions, mockAgent)

val measurements = Seq(
val measurements = Set(
Measurement(RecordCount("col"), MeasureResult(1L)),
Measurement(SumOfValuesOfColumn("col"), MeasureResult(BigDecimal(1)))
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,14 @@ class MeasurementBuilderTest extends AnyFlatSpec {
}

"buildMeasurementDTO" should "build Seq[MeasurementDTO] for multiple measures, all unique" in {
val measurements = Seq(
val measurements = Set(
Measurement(DistinctRecordCount("col"), MeasureResult("1", ResultValueType.Long)),
Measurement(SumOfValuesOfColumn("col1"), MeasureResult(BigDecimal(1.2))),
Measurement(SumOfValuesOfColumn("col2"), MeasureResult(BigDecimal(1.3)))
)
val measurementDTOs = MeasurementBuilder.buildMeasurementDTO(measurements)

val expectedMeasurementDTO = Seq(
val expectedMeasurementDTO = Set(
MeasurementDTO(
MeasureDTO("distinctCount", Seq("col")), MeasureResultDTO(TypedValue("1", ResultValueType.Long))
),
Expand All @@ -141,7 +141,7 @@ class MeasurementBuilderTest extends AnyFlatSpec {
}

"buildMeasurementDTO" should "throw exception for multiple measures, some of them repetitive" in {
val measurements = Seq(
val measurements = Set(
Measurement(DistinctRecordCount("col"), MeasureResult("1")),
Measurement(SumOfValuesOfColumn("col"), MeasureResult(BigDecimal(1.2))),
Measurement(SumOfValuesOfColumn("col"), MeasureResult(BigDecimal(1.3)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,5 @@ case class CheckpointDTO(
partitioning: Seq[PartitionDTO],
processStartTime: OffsetDateTime,
processEndTime: Option[OffsetDateTime],
measurements: Seq[MeasurementDTO]
measurements: Set[MeasurementDTO]
)
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
val seqPartitionDTO = Seq(PartitionDTO("key", "val"))
val timeWithZone = OffsetDateTime.of(2023, 10, 24, 10, 20, 59, 5000000, ZoneOffset.ofHours(2))

val seqMeasurementDTO = Seq(
val setMeasurementDTO = Set(
MeasurementDTO(
measure = MeasureDTO("count", Seq("col")), result = MeasureResultDTO(
mainValue = TypedValue("1", ResultValueType.Long)
Expand All @@ -147,7 +147,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
partitioning = seqPartitionDTO,
processStartTime = timeWithZone,
processEndTime = Some(timeWithZone),
measurements = seqMeasurementDTO
measurements = setMeasurementDTO
)

val expectedCheckpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}"
Expand All @@ -163,7 +163,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike {

val checkpointDTOJson = "{\"id\":\"" + uuid + "\",\"name\":\"checkpoint\",\"author\":\"author\",\"measuredByAtumAgent\":true,\"partitioning\":[{\"key\":\"key\",\"value\":\"val\"}],\"processStartTime\":\"2023-10-24 10:20:59.005000+02\",\"processEndTime\":\"2023-10-24 10:20:59.005000+02\",\"measurements\":[{\"measure\":{\"measureName\":\"count\",\"controlColumns\":[\"col\"]},\"result\":{\"mainValue\":{\"value\":\"1\",\"valueType\":\"Long\"},\"supportValues\":{}}}]}"

val seqMeasurementDTO = Seq(
val setMeasurementDTO = Set(
MeasurementDTO(
measure = MeasureDTO("count", Seq("col")), result = MeasureResultDTO(
mainValue = TypedValue("1", ResultValueType.Long)
Expand All @@ -179,7 +179,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
partitioning = seqPartitionDTO,
processStartTime = timeWithZone,
processEndTime = Some(timeWithZone),
measurements = seqMeasurementDTO
measurements = setMeasurementDTO
)

val actualCheckpointDTO = SerializationUtils.fromJson[CheckpointDTO](checkpointDTOJson)
Expand Down

0 comments on commit 0b3b5b4

Please sign in to comment.