Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(only idea!) #94 measure validation on column #95

Closed
wants to merge 25 commits into from
Closed
Changes from 5 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1d08ad8
#26 add checkpoint creation function
cerveada Oct 12, 2023
19be511
Merge branch 'master' into feature/26-checkpoint-creation-function
cerveada Oct 20, 2023
1c96dab
Merge remote-tracking branch 'origin/master' into feature/26-checkpoi…
lsulak Oct 23, 2023
02d829e
#26: finishing bear minimum of agent side
lsulak Oct 24, 2023
7fdfa56
#94: measure validation for a given column and small refactoring
lsulak Oct 25, 2023
e035c84
#26: post-review improvements and refactoring
lsulak Oct 26, 2023
93ee090
#26: extra validation, putting emphasis on types, some generics, more…
lsulak Oct 26, 2023
bc7eb58
Merge remote-tracking branch 'origin/master' into feature/26-checkpoi…
lsulak Oct 26, 2023
149a624
#26: functionName -> measureName, to be consistent
lsulak Oct 26, 2023
fab926d
Merge remote-tracking branch 'origin/feature/26-checkpoint-creation-f…
lsulak Oct 26, 2023
9730fa3
Merge branch 'master' into feature/26-checkpoint-creation-function
lsulak Oct 27, 2023
108b9e6
Merge branch 'feature/26-checkpoint-creation-function' into feature/9…
lsulak Oct 27, 2023
ecd7731
#94: refactoring and fully fixing build
lsulak Oct 27, 2023
cf9dc69
#94: removing redundant import
lsulak Oct 27, 2023
f123b6f
#26: post-review changes
lsulak Oct 27, 2023
f2928f5
#26: fixing unit test
lsulak Oct 28, 2023
c2c3405
#26: removing redundant internal Agent's Checkpoint model and some re…
lsulak Oct 28, 2023
4f0990a
Merge remote-tracking branch 'origin/feature/26-checkpoint-creation-f…
lsulak Oct 28, 2023
55acd58
#94: fixing unit tests, post merge conflicts
lsulak Oct 28, 2023
9c1f428
#94: fixing unit tests, post merge conflicts
lsulak Oct 28, 2023
a31fd04
Merge remote-tracking branch 'origin/master' into feature/26-checkpoi…
lsulak Oct 30, 2023
9be5528
Merge remote-tracking branch 'origin/feature/26-checkpoint-creation-f…
lsulak Oct 30, 2023
46ad700
#26: removing onlyForNumeric as it's not currently used and we might …
lsulak Oct 31, 2023
d60a872
Merge branch 'feature/26-checkpoint-creation-function' into feature/9…
lsulak Oct 31, 2023
89e645a
#94: adding back here, but I know that it might be removed soon
lsulak Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@ import za.co.absa.atum.agent.model.Checkpoint
import za.co.absa.atum.model.dto.{CheckpointDTO, PartitioningDTO}

/**
* Place holder for the agent that communicate with the API.
* Entity that communicate with the API, primarily focused on spawning Atum Context(s).
*/
class AtumAgent private[agent] () {

@@ -39,7 +39,7 @@ class AtumAgent private[agent] () {
*
* @param checkpoint Already initialized Checkpoint object to store
*/
def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
private [agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
dispatcher.saveCheckpoint(checkpoint)
}

@@ -48,7 +48,7 @@ class AtumAgent private[agent] () {
*
* @param checkpoint Already initialized Checkpoint object to store
*/
def saveCheckpoint(checkpoint: Checkpoint): Unit = {
private [agent] def saveCheckpoint(checkpoint: Checkpoint): Unit = {
dispatcher.saveCheckpoint(checkpoint.toCheckpointDTO)
}

38 changes: 27 additions & 11 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@ import org.slf4s.Logging
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.NumericType
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.{Checkpoint, Measure, Measurement, MeasurementByAtum, MeasuresMapper}
import za.co.absa.atum.agent.model._
import za.co.absa.atum.model.dto._

import java.time.OffsetDateTime
@@ -54,14 +54,14 @@ class AtumContext private[agent] (
val colDataType = df.select(measure.controlCol).schema.fields(0).dataType
val isColDataTypeNumeric = colDataType.isInstanceOf[NumericType]
if (measure.onlyForNumeric && !isColDataTypeNumeric) {
log.warn( // TODO: discuss, throw exception or warn message? Or both, parametrized?
log.warn( // TODO: discuss, throw exception or warn message? Or both, parametrized?
s"Column ${measure.controlCol} measurement ${measure.measureName} requested, but the field is not numeric! " +
s"Found: ${colDataType.simpleString} datatype."
)
}
}

private def takeMeasurements(df: DataFrame): Set[Measurement] = {
private def takeMeasurements(df: DataFrame): Set[MeasurementByAtum] = {
measures.map { m =>
validateMeasureApplicability(m, df)

@@ -70,10 +70,10 @@ class AtumContext private[agent] (
}
}

def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): Checkpoint = {
val startTime = ZonedDateTime.now()
private [agent] def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): Checkpoint = {
val startTime = OffsetDateTime.now()
val measurements = takeMeasurements(dataToMeasure)
val endTime = ZonedDateTime.now()
val endTime = OffsetDateTime.now()

Checkpoint(
name = checkpointName,
@@ -86,10 +86,18 @@ class AtumContext private[agent] (
)
}

def createCheckpointOnProvidedData(
def createAndSaveCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): AtumContext = {
val checkpoint = createCheckpoint(checkpointName, author, dataToMeasure)
val checkpointDTO = checkpoint.toCheckpointDTO

agent.saveCheckpoint(checkpointDTO)
this
}

private [agent] def createCheckpointOnProvidedData(
checkpointName: String, author: String, measurements: Seq[Measurement]
): Checkpoint = {
val zonedDateTimeNow = ZonedDateTime.now()
val offsetDateTimeNow = OffsetDateTime.now()

Checkpoint(
name = checkpointName,
@@ -101,6 +109,16 @@ class AtumContext private[agent] (
)
}

def createAndSaveCheckpointOnProvidedData(
checkpointName: String, author: String, measurements: Seq[Measurement]
): AtumContext = {
val checkpoint = createCheckpointOnProvidedData(checkpointName, author, measurements)
val checkpointDTO = checkpoint.toCheckpointDTO

agent.saveCheckpoint(checkpointDTO)
this
}

def addAdditionalData(key: String, value: String) = {
??? // TODO #60
}
@@ -168,9 +186,7 @@ object AtumContext {
* @return
*/
def createAndSaveCheckpoint(checkpointName: String, author: String)(implicit atumContext: AtumContext): DataFrame = {
val checkpoint = atumContext.createCheckpoint(checkpointName, author, df)
val checkpointDTO = checkpoint.toCheckpointDTO
atumContext.agent.saveCheckpoint(checkpointDTO)
atumContext.createAndSaveCheckpoint(checkpointName, author, df)
df
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.agent.exception

case class MeasurementProvidedException(msg: String) extends Exception(msg)
Original file line number Diff line number Diff line change
@@ -27,17 +27,12 @@ case class Checkpoint(
author: String,
measuredByAtumAgent: Boolean = false,
atumPartitions: AtumPartitions,
processStartTime: ZonedDateTime,
processEndTime: Option[ZonedDateTime],
processStartTime: OffsetDateTime,
processEndTime: Option[OffsetDateTime],
measurements: Seq[Measurement]
) {
private [agent] def toCheckpointDTO: CheckpointDTO = {
val measurementDTOs = measurements.map {
case provided: MeasurementProvided =>
MeasurementBuilder.buildMeasurementDTO(provided)
case byAtum: MeasurementByAtum =>
MeasurementBuilder.buildMeasurementDTO(byAtum)
}
val measurementDTOs = measurements.map(MeasurementBuilder.buildMeasurementDTO)

CheckpointDTO(
id = UUID.randomUUID(),
75 changes: 50 additions & 25 deletions agent/src/main/scala/za/co/absa/atum/agent/model/Measure.scala
Original file line number Diff line number Diff line change
@@ -34,93 +34,118 @@ sealed trait Measure extends MeasurementProcessor with MeasureType {
trait MeasureType {
val measureName: String
val onlyForNumeric: Boolean
val resultValueType: ResultValueType.ResultValueType
}

object Measure {

private val valueColumnName: String = "value"

val supportedMeasures: Seq[MeasureType] = Seq(
RecordCount, DistinctRecordCount, SumOfValuesOfColumn, AbsSumOfValuesOfColumn, SumOfHashesOfColumn
RecordCount,
DistinctRecordCount,
SumOfValuesOfColumn,
AbsSumOfValuesOfColumn,
SumOfHashesOfColumn
)
val supportedMeasureNames: Seq[String] = supportedMeasures.map(_.measureName)

case class RecordCount private (controlCol: String, measureName: String, onlyForNumeric: Boolean) extends Measure {
case class RecordCount private (
controlCol: String,
measureName: String,
onlyForNumeric: Boolean,
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction =
(ds: DataFrame) => {
val resultValue = ds.select(col(controlCol)).count().toString
ResultOfMeasurement(resultValue, ResultValueType.Long)
ResultOfMeasurement(resultValue, resultValueType)
}
}
object RecordCount extends MeasureType {
def apply(controlCol: String): RecordCount = {
RecordCount(controlCol, measureName, onlyForNumeric)
}
def apply(controlCol: String): RecordCount = RecordCount(controlCol, measureName, onlyForNumeric, resultValueType)

override val measureName: String = "count"
override val onlyForNumeric: Boolean = false
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long
}

case class DistinctRecordCount private (controlCol: String, measureName: String, onlyForNumeric: Boolean)
extends Measure {
case class DistinctRecordCount private (
controlCol: String,
measureName: String,
onlyForNumeric: Boolean,
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction =
(ds: DataFrame) => {
val resultValue = ds.select(col(controlCol)).distinct().count().toString
ResultOfMeasurement(resultValue, ResultValueType.Long)
ResultOfMeasurement(resultValue, resultValueType)
}
}

object DistinctRecordCount extends MeasureType {
def apply(controlCol: String): DistinctRecordCount = {
DistinctRecordCount(controlCol, measureName, onlyForNumeric)
DistinctRecordCount(controlCol, measureName, onlyForNumeric, resultValueType)
}

override val measureName: String = "distinctCount"
override val onlyForNumeric: Boolean = false
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Long
}

case class SumOfValuesOfColumn private (controlCol: String, measureName: String, onlyForNumeric: Boolean)
extends Measure {
case class SumOfValuesOfColumn private (
controlCol: String,
measureName: String,
onlyForNumeric: Boolean,
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {
val aggCol = sum(col(valueColumnName))
val resultValue = aggregateColumn(ds, controlCol, aggCol)
ResultOfMeasurement(resultValue, ResultValueType.BigDecimal)
ResultOfMeasurement(resultValue, resultValueType)
}
}

object SumOfValuesOfColumn extends MeasureType {
def apply(controlCol: String): SumOfValuesOfColumn = {
SumOfValuesOfColumn(controlCol, measureName, onlyForNumeric)
SumOfValuesOfColumn(controlCol, measureName, onlyForNumeric, resultValueType)
}

override val measureName: String = "aggregatedTotal"
override val onlyForNumeric: Boolean = true
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.BigDecimal
}

case class AbsSumOfValuesOfColumn private (controlCol: String, measureName: String, onlyForNumeric: Boolean)
extends Measure {
case class AbsSumOfValuesOfColumn private (
controlCol: String,
measureName: String,
onlyForNumeric: Boolean,
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {
val aggCol = sum(abs(col(valueColumnName)))
val resultValue = aggregateColumn(ds, controlCol, aggCol)
ResultOfMeasurement(resultValue, ResultValueType.Double)
ResultOfMeasurement(resultValue, resultValueType)
}
}

object AbsSumOfValuesOfColumn extends MeasureType {
def apply(controlCol: String): AbsSumOfValuesOfColumn = {
AbsSumOfValuesOfColumn(controlCol, measureName, onlyForNumeric)
AbsSumOfValuesOfColumn(controlCol, measureName, onlyForNumeric, resultValueType)
}

override val measureName: String = "absAggregatedTotal"
override val onlyForNumeric: Boolean = true
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.Double
}

case class SumOfHashesOfColumn private (controlCol: String, measureName: String, onlyForNumeric: Boolean)
extends Measure {
case class SumOfHashesOfColumn private (
controlCol: String,
measureName: String,
onlyForNumeric: Boolean,
resultValueType: ResultValueType.ResultValueType
) extends Measure {

override def function: MeasurementFunction = (ds: DataFrame) => {

@@ -133,14 +158,14 @@ object Measure {
ResultOfMeasurement(resultValue, ResultValueType.String)
}
}

object SumOfHashesOfColumn extends MeasureType {
def apply(controlCol: String): SumOfHashesOfColumn = {
SumOfHashesOfColumn(controlCol, measureName, onlyForNumeric)
SumOfHashesOfColumn(controlCol, measureName, onlyForNumeric, resultValueType)
}

override val measureName: String = "hashCrc32"
override val onlyForNumeric: Boolean = false
override val resultValueType: ResultValueType.ResultValueType = ResultValueType.String
}

private def aggregateColumn(
Original file line number Diff line number Diff line change
@@ -16,22 +16,62 @@

package za.co.absa.atum.agent.model

import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType.ResultValueType
import za.co.absa.atum.agent.exception.MeasurementProvidedException
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

trait Measurement {
val measure: Measure
val result: Any
val resultValue: Any
val resultType: ResultValueType.ResultValueType
}

/**
* When the application/user of Atum Agent provides actual results by himself, the type is precise and we don't need
* to do any adjustments.
*/
case class MeasurementProvided(measure: Measure, result: Any) extends Measurement
case class MeasurementProvided[T](measure: Measure, resultValue: T, resultType: ResultValueType.ResultValueType)
extends Measurement

object MeasurementProvided {

def handleSpecificType[T](
measure: Measure, resultValue: T, requiredType: ResultValueType.ResultValueType
): MeasurementProvided[T] = {

val actualType = measure.resultValueType
if (actualType != requiredType)
throw MeasurementProvidedException(
s"Type of a given provided measurement result and type that a given measure supports are not compatible! " +
s"Got $actualType but should be $requiredType"
)
MeasurementProvided[T](measure, resultValue, requiredType)
}

def apply[T](measure: Measure, resultValue: T): Measurement = {
resultValue match {
case l: Long =>
handleSpecificType[Long](measure, l, ResultValueType.Long)
case d: Double =>
handleSpecificType[Double](measure, d, ResultValueType.Double)
case bd: BigDecimal =>
handleSpecificType[BigDecimal](measure, bd, ResultValueType.BigDecimal)
case s: String =>
handleSpecificType[String](measure, s, ResultValueType.String)

case unsupportedType =>
val className = unsupportedType.getClass.getSimpleName
throw MeasurementProvidedException(
s"Unsupported type of measurement for measure ${measure.measureName}: $className " +
s"for provided result: $resultValue"
)
}
}
}

/**
* When the Atum Agent itself performs the measurements, using Spark, then in some cases some adjustments are
* needed - thus we are converting the results to strings always - but we need to keep the information about
* the actual type as well.
*/
case class MeasurementByAtum(measure: Measure, result: String, resultType: ResultValueType) extends Measurement
case class MeasurementByAtum(measure: Measure, resultValue: String, resultType: ResultValueType.ResultValueType)
extends Measurement
Loading