Skip to content

Commit

Permalink
#26 add create checkpoint function (#79)
Browse files Browse the repository at this point in the history
* #26 add checkpoint creation function

* #26: finishing bare minimum of agent side

* #26: post-review improvements and refactoring

* #26: extra validation, putting emphasis on types, some generics, more unit tests, refactoring & simplification

* #26: functionName -> measureName, to be consistent

* #26: post-review changes

* #26: fixing unit test

* #26: removing redundant internal Agent's Checkpoint model and some refactoring

* #26: removing onlyForNumeric as it's not currently used and we might do this differently

* #26: post-review changes

---------

Co-authored-by: Ladislav Sulak <[email protected]>
  • Loading branch information
cerveada and lsulak authored Nov 1, 2023
1 parent 7681d9d commit 0942d07
Show file tree
Hide file tree
Showing 20 changed files with 420 additions and 236 deletions.
2 changes: 1 addition & 1 deletion LICENSE.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,4 @@
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
18 changes: 5 additions & 13 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ package za.co.absa.atum.agent
import com.typesafe.config.{Config, ConfigFactory}
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.dispatcher.{ConsoleDispatcher, HttpDispatcher}
import za.co.absa.atum.agent.model.Checkpoint
import za.co.absa.atum.model.dto.{CheckpointDTO, PartitioningDTO}

/**
* Placeholder for the agent that communicates with the API.
* Entity that communicates with the API, primarily focused on spawning Atum Context(s).
*/
class AtumAgent private[agent] () {

Expand All @@ -36,23 +35,16 @@ class AtumAgent private[agent] () {

/**
* Sends `CheckpointDTO` to the AtumService API
* @param checkpoint
*/
def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
dispatcher.saveCheckpoint(checkpoint)
}

/**
* Sends `Checkpoint` to the AtumService API
*
* @param checkpoint
* @param checkpoint Already initialized Checkpoint object to store
*/
def saveCheckpoint(checkpoint: Checkpoint): Unit = {
dispatcher.saveCheckpoint(checkpoint.toCheckpointDTO)
private [agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
dispatcher.saveCheckpoint(checkpoint)
}

/**
* Provides an AtumContext given an `AtumPartitions` instance. Retrieves the data from AtumService API.
*
* @param atumPartitions
* @return
*/
Expand Down
57 changes: 42 additions & 15 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ package za.co.absa.atum.agent

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.{Checkpoint, Measure, Measurement, MeasuresMapper}
import za.co.absa.atum.model.dto.{AtumContextDTO, PartitionDTO}
import za.co.absa.atum.agent.model.Measurement.MeasurementByAtum
import za.co.absa.atum.agent.model._
import za.co.absa.atum.model.dto._

import java.time.OffsetDateTime
import java.util.UUID
import scala.collection.immutable.ListMap

/**
Expand All @@ -43,24 +45,48 @@ class AtumContext private[agent] (
agent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this)
}

def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame) = {
??? // TODO #26
/**
 * Evaluates every measure registered in this context against the given DataFrame.
 *
 * @param df DataFrame that each measure's function is applied to
 * @return one `MeasurementByAtum` per measure, carrying the measure, its result and the result type
 */
private def takeMeasurements(df: DataFrame): Set[MeasurementByAtum] =
  for (measure <- measures) yield {
    val outcome = measure.function(df)
    MeasurementByAtum(measure, outcome.result, outcome.resultType)
  }

/**
 * Measures the given DataFrame with the measures of this context, wraps the results in a
 * `CheckpointDTO` and hands it to the agent for saving.
 *
 * @param checkpointName name assigned to the checkpoint
 * @param author author of the checkpoint
 * @param dataToMeasure DataFrame the measures are evaluated on
 * @return this context
 */
def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): AtumContext = {
  // The two timestamps bracket the measuring, so they are taken right before and right after it.
  val measuringStarted = OffsetDateTime.now()
  val takenMeasurements = takeMeasurements(dataToMeasure)
  val measuringFinished = OffsetDateTime.now()

  agent.saveCheckpoint(
    CheckpointDTO(
      id = UUID.randomUUID(),
      name = checkpointName,
      author = author,
      measuredByAtumAgent = true,
      partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
      processStartTime = measuringStarted,
      processEndTime = Some(measuringFinished),
      measurements = takenMeasurements.map(MeasurementBuilder.buildMeasurementDTO).toSeq
    )
  )

  this
}

def createCheckpointOnProvidedData(
checkpointName: String,
author: String,
measurements: Seq[Measurement]
): Checkpoint = {
def createCheckpointOnProvidedData(checkpointName: String, author: String, measurements: Seq[Measurement]): AtumContext = {
val offsetDateTimeNow = OffsetDateTime.now()
Checkpoint(

val checkpointDTO = CheckpointDTO(
id = UUID.randomUUID(),
name = checkpointName,
author = author,
atumPartitions = this.atumPartitions,
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
processStartTime = offsetDateTimeNow,
processEndTime = Some(offsetDateTimeNow),
measurements = measurements
measurements = measurements.map(MeasurementBuilder.buildMeasurementDTO)
)

agent.saveCheckpoint(checkpointDTO)
this
}

def addAdditionalData(key: String, value: String): Unit = {
Expand Down Expand Up @@ -121,21 +147,22 @@ object AtumContext {
new AtumContext(
AtumPartitions.fromPartitioning(atumContextDTO.partitioning),
agent,
MeasuresMapper.mapToMeasures(atumContextDTO.measures),
MeasuresBuilder.mapToMeasures(atumContextDTO.measures),
atumContextDTO.additionalData.additionalData
)
}

implicit class DatasetWrapper(df: DataFrame) {

/**
* Set a point in the pipeline to execute calculation.
* Set a point in the pipeline to execute calculation and store it.
* @param checkpointName The key assigned to this checkpoint
* @param author Author of the checkpoint
* @param atumContext Contains the calculations to be done and publishes the result
* @return
*/
def createCheckpoint(checkpointName: String, author: String)(implicit atumContext: AtumContext): DataFrame = {
// Delegate the checkpoint creation to the implicit context; the context it returns is not needed here.
atumContext.createCheckpoint(checkpointName, author, df)
// Return the wrapped DataFrame itself, unchanged.
df
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package za.co.absa.atum.agent.core

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.core.MeasurementProcessor.MeasurementFunction
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

trait MeasurementProcessor {

Expand All @@ -26,6 +27,13 @@ trait MeasurementProcessor {
}

object MeasurementProcessor {
type MeasurementFunction = DataFrame => String
/**
* The raw result of a measurement is always a string, because we want to avoid some floating point issues
* (overflows, consistent representation of numbers - whether they are coming from Java or Scala world, and more),
* but the actual type is stored alongside the computation because we don't want to lose this information.
*/
final case class ResultOfMeasurement(result: String, resultType: ResultValueType.ResultValueType)

type MeasurementFunction = DataFrame => ResultOfMeasurement

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ class ConsoleDispatcher extends Dispatcher with Logging {
/**
 * Prints the given checkpoint to standard output.
 *
 * @param checkpoint checkpoint whose string form is included in the printed message
 */
override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
  val message = s"Saving checkpoint to server. $checkpoint"
  println(message)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ class HttpDispatcher(config: Config) extends Dispatcher with Logging {
.post(serverUri)
.send(backend)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@

package za.co.absa.atum.agent.exception

case class UnsupportedMeasureException(msg: String) extends Exception(msg)
/**
 * Common parent of all exceptions raised by the Atum agent.
 *
 * The message is forwarded to `Exception`, so `getMessage`, stack traces and logs show it
 * instead of `null`.
 *
 * @param message human-readable description of the failure
 */
sealed abstract class AtumAgentException(message: String) extends Exception(message)

/**
 * Agent exception related to provided measurements.
 *
 * @param msg description of the problem; also available through `getMessage`
 */
final case class MeasurementProvidedException(msg: String) extends AtumAgentException(msg)

/**
 * Agent exception related to measures.
 *
 * @param msg description of the problem; also available through `getMessage`
 */
final case class MeasureException(msg: String) extends AtumAgentException(msg)

This file was deleted.

46 changes: 0 additions & 46 deletions agent/src/main/scala/za/co/absa/atum/agent/model/Checkpoint.scala

This file was deleted.

Loading

0 comments on commit 0942d07

Please sign in to comment.