Skip to content

Commit

Permalink
#87: DTOs for measures and measure results, MeasurementBuilder, Agent…
Browse files Browse the repository at this point in the history
… refactored
  • Loading branch information
salamonpavel committed Oct 16, 2023
1 parent 2df96ab commit 6a9d5d3
Show file tree
Hide file tree
Showing 18 changed files with 113 additions and 105 deletions.
15 changes: 9 additions & 6 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.atum.agent
import com.typesafe.config.{Config, ConfigFactory}
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.dispatcher.{ConsoleDispatcher, HttpDispatcher}
import za.co.absa.atum.model.dto.CheckpointDTO
import za.co.absa.atum.model.dto.{CheckpointDTO, PartitioningDTO}

/**
* Place holder for the agent that communicate with the API.
Expand Down Expand Up @@ -47,17 +47,20 @@ class AtumAgent private[agent] () {
* @return
*/
def getOrCreateAtumContext(atumPartitions: AtumPartitions): AtumContext = {
val atumContextDTO = dispatcher.getOrCreateAtumContext(AtumPartitions.toPartitioning(atumPartitions), None)
val partitioningDTO = PartitioningDTO(AtumPartitions.toSeqPartitionDTO(atumPartitions), None)
val atumContextDTO = dispatcher.getOrCreateAtumContext(partitioningDTO)
lazy val atumContext = AtumContext.fromDTO(atumContextDTO, this)
getExistingOrNewContext(atumPartitions, atumContext)
}

def getOrCreateAtumSubContext(subPartitions: AtumPartitions)(implicit parentAtumContext: AtumContext): AtumContext = {
val newPartitions: AtumPartitions = parentAtumContext.atumPartitions ++ subPartitions
val atumContextDTO = dispatcher.getOrCreateAtumContext(
AtumPartitions.toPartitioning(newPartitions),
Some(AtumPartitions.toPartitioning(parentAtumContext.atumPartitions))
)

val newPartitionsDTO = AtumPartitions.toSeqPartitionDTO(newPartitions)
val maybeParentPartitionsDTO = Some(AtumPartitions.toSeqPartitionDTO(parentAtumContext.atumPartitions))
val partitioningDTO = PartitioningDTO(newPartitionsDTO, maybeParentPartitionsDTO)

val atumContextDTO = dispatcher.getOrCreateAtumContext(partitioningDTO)
lazy val atumContext = AtumContext.fromDTO(atumContextDTO, this)
getExistingOrNewContext(newPartitions, atumContext)
}
Expand Down
11 changes: 5 additions & 6 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ package za.co.absa.atum.agent
import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.model.{Measure, MeasuresMapper}
import AtumContext.AtumPartitions
import za.co.absa.atum.model.{Partition, Partitioning}
import za.co.absa.atum.model.dto.AtumContextDTO
import za.co.absa.atum.model.dto.{AtumContextDTO, PartitionDTO}

import scala.collection.immutable.ListMap

Expand Down Expand Up @@ -90,12 +89,12 @@ object AtumContext {
ListMap(elems:_*)
}

private[agent] def toPartitioning(atumPartitions: AtumPartitions): Partitioning = {
Partitioning(atumPartitions.toSeq.map{ case (key, value) => Partition(key, value) })
private[agent] def toSeqPartitionDTO(atumPartitions: AtumPartitions): Seq[PartitionDTO] = {
atumPartitions.map { case (key, value) => PartitionDTO(key, value) }.toSeq
}

private[agent] def fromPartitioning(partitioning: Partitioning): AtumPartitions = {
AtumPartitions(partitioning.partitioning.map(partition => partition.key -> partition.value))
private[agent] def fromPartitioning(partitioning: Seq[PartitionDTO]): AtumPartitions = {
AtumPartitions(partitioning.map(partition => partition.key -> partition.value))
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@
package za.co.absa.atum.agent.dispatcher

import org.apache.spark.internal.Logging
import za.co.absa.atum.model.Partitioning
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO}
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitioningDTO}

/**
* dispatcher useful for development, testing and debugging
Expand All @@ -27,12 +26,9 @@ class ConsoleDispatcher extends Dispatcher with Logging {

logInfo("using console dispatcher")

override def getOrCreateAtumContext(
partitioning: Partitioning,
parentPartitioning: Option[Partitioning]
): AtumContextDTO = {
println(s"Fetching AtumContext using ConsoleDispatcher with $partitioning and $parentPartitioning")
AtumContextDTO(partitioning = partitioning)
override def getOrCreateAtumContext(partitioning: PartitioningDTO): AtumContextDTO = {
println(s"Fetching AtumContext using ConsoleDispatcher with partitioning $partitioning")
AtumContextDTO(partitioning = partitioning.partitioning)
}
override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
println(s"Saving checkpoint to server. $checkpoint")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

package za.co.absa.atum.agent.dispatcher

import za.co.absa.atum.model.Partitioning
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO}
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitioningDTO}

trait Dispatcher {
def getOrCreateAtumContext(partitioning: Partitioning, parentPartitioning: Option[Partitioning]): AtumContextDTO
def getOrCreateAtumContext(partitioning: PartitioningDTO): AtumContextDTO

def saveCheckpoint(checkpoint: CheckpointDTO): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import com.typesafe.config.Config
import org.apache.spark.internal.Logging
import sttp.client3._
import sttp.model.Uri
import za.co.absa.atum.model.Partitioning
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitioningDTO}

class HttpDispatcher(config: Config) extends Dispatcher with Logging {
Expand All @@ -31,17 +30,13 @@ class HttpDispatcher(config: Config) extends Dispatcher with Logging {
logInfo("using http dispatcher")
logInfo(s"serverUri $serverUri")

override def getOrCreateAtumContext(
partitioning: Partitioning,
parentPartitioning: Option[Partitioning]
): AtumContextDTO = {
val partitioningDTO = PartitioningDTO(partitioning, parentPartitioning)
override def getOrCreateAtumContext(partitioning: PartitioningDTO): AtumContextDTO = {
basicRequest
.body(s"$partitioningDTO")
.body(s"$partitioning")
.post(serverUri)
.send(backend)

AtumContextDTO(partitioning = partitioning) // todo: implement request
AtumContextDTO(partitioning = partitioning.partitioning) // todo: implement request
}

override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package za.co.absa.atum.agent.model

import za.co.absa.atum.model.dto.{MeasureDTO, MeasureResultDTO, MeasurementDTO}
import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue}

object MeasurementBuilder {

def buildLongMeasurement(functionName: String, controlCols: Seq[String], resultValue: Long): MeasurementDTO = {
MeasurementDTO(
MeasureDTO(functionName, controlCols),
MeasureResultDTO(TypedValue(resultValue.toString, ResultValueType.Long))
)
}

def buildDoubleMeasureResult(functionName: String, controlCols: Seq[String], resultValue: Double): MeasurementDTO = {
MeasurementDTO(
MeasureDTO(functionName, controlCols),
MeasureResultDTO(TypedValue(resultValue.toString, ResultValueType.Double))
)
}

def buildBigDecimalMeasureResult(functionName: String, controlCols: Seq[String], resultValue: BigDecimal): MeasurementDTO = {
MeasurementDTO(
MeasureDTO(functionName, controlCols),
MeasureResultDTO(TypedValue(resultValue.toString, ResultValueType.BigDecimal))
)
}

def buildStringMeasureResult(functionName: String, controlCols: Seq[String], resultValue: String): MeasurementDTO = {
MeasurementDTO(
MeasureDTO(functionName, controlCols),
MeasureResultDTO(TypedValue(resultValue, ResultValueType.String))
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,26 @@
package za.co.absa.atum.agent.model

import za.co.absa.atum.agent.model.Measure._
import za.co.absa.atum.model.dto

case class UnsupportedMeasureException(msg: String) extends Exception(msg)

object MeasuresMapper {

def mapToMeasures(measures: Set[za.co.absa.atum.model.Measure]): Set[za.co.absa.atum.agent.model.Measure] = {
def mapToMeasures(measures: Set[dto.MeasureDTO]): Set[za.co.absa.atum.agent.model.Measure] = {
measures.map(createMeasure)
}

private def createMeasure(measure: za.co.absa.atum.model.Measure): za.co.absa.atum.agent.model.Measure = {
private def createMeasure(measure: dto.MeasureDTO): za.co.absa.atum.agent.model.Measure = {
val controlColumn = measure.controlColumns.head
measure.functionName match {
case "RecordCount" => RecordCount(controlColumn)
case "DistinctRecordCount" => DistinctRecordCount(controlColumn)
case "SumOfValuesOfColumn" => SumOfValuesOfColumn(controlColumn)
case "AbsSumOfValuesOfColumn" => AbsSumOfValuesOfColumn(controlColumn)
case "SumOfHashesOfColumn" => SumOfHashesOfColumn(controlColumn)
case unsupportedMeasure => throw UnsupportedMeasureException(s"Measure not supported: $unsupportedMeasure")
case RecordCount.getClass.getSimpleName => RecordCount(controlColumn)
case DistinctRecordCount.getClass.getSimpleName => DistinctRecordCount(controlColumn)
case SumOfValuesOfColumn.getClass.getSimpleName => SumOfValuesOfColumn(controlColumn)
case AbsSumOfValuesOfColumn.getClass.getSimpleName => AbsSumOfValuesOfColumn(controlColumn)
case SumOfHashesOfColumn.getClass.getSimpleName => SumOfHashesOfColumn(controlColumn)
case unsupportedMeasure =>
throw UnsupportedMeasureException(s"Measure not supported: $unsupportedMeasure")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import za.co.absa.atum.agent.AtumContext.{AtumPartitions, DatasetWrapper}
import za.co.absa.atum.agent.model.Measure._
import za.co.absa.spark.commons.test.SparkTestBase

class MeasureTest extends AnyFlatSpec with Matchers with SparkTestBase { self =>
class MeasureDTOTest extends AnyFlatSpec with Matchers with SparkTestBase { self =>

"Measure" should "be based on the dataframe" in {

Expand Down
21 changes: 0 additions & 21 deletions model/src/main/scala/za/co/absa/atum/model/MeasureResult.scala

This file was deleted.

21 changes: 0 additions & 21 deletions model/src/main/scala/za/co/absa/atum/model/Partitioning.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
* limitations under the License.
*/

package za.co.absa.atum.model
package za.co.absa.atum.model.dto

case class AdditionalData(
case class AdditionalDataDTO(
additionalData: Map[String, Option[String]]
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@

package za.co.absa.atum.model.dto

import za.co.absa.atum.model._

case class AtumContextDTO(
partitioning: Partitioning,
measures: Set[Measure] = Set.empty,
additionalData: AdditionalData = AdditionalData(additionalData = Map.empty)
partitioning: Seq[PartitionDTO],
measures: Set[MeasureDTO] = Set.empty,
additionalData: AdditionalDataDTO = AdditionalDataDTO(additionalData = Map.empty)
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package za.co.absa.atum.model.dto

import za.co.absa.atum.model._

import java.time.ZonedDateTime
import java.util.UUID

Expand All @@ -26,8 +24,8 @@ case class CheckpointDTO(
name: String,
author: String,
measuredByAtumAgent: Boolean = false,
partitioning: Partitioning,
partitioning: Seq[PartitionDTO],
processStartTime: ZonedDateTime,
processEndTime: Option[ZonedDateTime],
measurements: Seq[Measurement]
measurements: Seq[MeasurementDTO]
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* limitations under the License.
*/

package za.co.absa.atum.model
package za.co.absa.atum.model.dto

case class Measure(
case class MeasureDTO(
functionName: String,
controlColumns: Seq[String]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package za.co.absa.atum.model.dto

import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.module.scala.JsonScalaEnumeration
import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue

case class MeasureResultDTO(
mainValue: TypedValue,
supportValues: Map[String, TypedValue] = Map.empty
)


object MeasureResultDTO {
case class TypedValue(
value: String,
@JsonScalaEnumeration(classOf[ResultValueType]) valueType: ResultValueType.ResultValueType
)

class ResultValueType extends TypeReference[ResultValueType.type]

object ResultValueType extends Enumeration {
type ResultValueType = Value
val String, Long, BigDecimal, Double = Value
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* limitations under the License.
*/

package za.co.absa.atum.model
package za.co.absa.atum.model.dto

case class Measurement(
measure: Measure,
result: MeasureResult[String]
case class MeasurementDTO(
measure: MeasureDTO,
result: MeasureResultDTO
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
* limitations under the License.
*/

package za.co.absa.atum.model
package za.co.absa.atum.model.dto

case class Partition(
case class PartitionDTO(
key: String,
value: String
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@

package za.co.absa.atum.model.dto

import za.co.absa.atum.model.Partitioning

case class PartitioningDTO (
partitioning: Partitioning,
parentPartitioning: Option[Partitioning]
partitioning: Seq[PartitionDTO],
parentPartitioning: Option[Seq[PartitionDTO]]
)

0 comments on commit 6a9d5d3

Please sign in to comment.