Skip to content

Commit

Permalink
Feature/54 checkpoint from server (#86)
Browse files Browse the repository at this point in the history
* AtumAgent fetches AtumContext from the server with Dispatcher using AtumPartitions and optional parent AtumPartitions
  • Loading branch information
salamonpavel authored Oct 16, 2023
1 parent 4a6de45 commit 2df96ab
Show file tree
Hide file tree
Showing 8 changed files with 164 additions and 20 deletions.
29 changes: 18 additions & 11 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import za.co.absa.atum.model.dto.CheckpointDTO
/**
* Placeholder for the agent that communicates with the API.
*/
class AtumAgent private() {
class AtumAgent private[agent] () {

val config: Config = ConfigFactory.load()

Expand All @@ -47,25 +47,32 @@ class AtumAgent private() {
* @return
*/
def getOrCreateAtumContext(atumPartitions: AtumPartitions): AtumContext = {
contexts.getOrElse(atumPartitions, new AtumContext(atumPartitions, this))
val atumContextDTO = dispatcher.getOrCreateAtumContext(AtumPartitions.toPartitioning(atumPartitions), None)
lazy val atumContext = AtumContext.fromDTO(atumContextDTO, this)
getExistingOrNewContext(atumPartitions, atumContext)
}

def getOrCreateAtumSubContext(subPartitions: AtumPartitions)(implicit parentAtumContext: AtumContext): AtumContext = {
val newPartitions: AtumPartitions = parentAtumContext.atumPartitions ++ subPartitions
getContextOrElse(newPartitions, parentAtumContext.copy(atumPartitions = newPartitions, agent = this))
val atumContextDTO = dispatcher.getOrCreateAtumContext(
AtumPartitions.toPartitioning(newPartitions),
Some(AtumPartitions.toPartitioning(parentAtumContext.atumPartitions))
)
lazy val atumContext = AtumContext.fromDTO(atumContextDTO, this)
getExistingOrNewContext(newPartitions, atumContext)
}

private def getContextOrElse(atumPartitions: AtumPartitions, creationMethod: =>AtumContext): AtumContext = {
synchronized{
contexts.getOrElse(atumPartitions, {
val result = creationMethod
contexts = contexts + (atumPartitions -> result)
result
})
/**
 * Returns the context already registered for `atumPartitions`, or registers and returns a new one.
 *
 * The lookup and the registration run inside a single `synchronized` block.
 * `newAtumContext` is a by-name parameter: it is evaluated at most once, and only when no context
 * is registered for `atumPartitions` yet. The very instance that gets stored is the one returned.
 *
 * @param atumPartitions partitions identifying the context
 * @param newAtumContext expression producing the context to register when none exists yet
 * @return the context registered for `atumPartitions`
 */
private def getExistingOrNewContext(atumPartitions: AtumPartitions, newAtumContext: => AtumContext): AtumContext = {
  synchronized {
    contexts.getOrElse(
      atumPartitions, {
        // Force the by-name argument exactly once; referencing it twice would re-run the creation
        // expression and could hand back a different instance than the one stored in `contexts`.
        val createdContext = newAtumContext
        contexts = contexts + (atumPartitions -> createdContext)
        createdContext
      }
    )
  }
}


private[this] var contexts: Map[AtumPartitions, AtumContext] = Map.empty

}
Expand Down
24 changes: 23 additions & 1 deletion agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package za.co.absa.atum.agent

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.model.Measure
import za.co.absa.atum.agent.model.{Measure, MeasuresMapper}
import AtumContext.AtumPartitions
import za.co.absa.atum.model.{Partition, Partitioning}
import za.co.absa.atum.model.dto.AtumContextDTO

import scala.collection.immutable.ListMap

Expand Down Expand Up @@ -83,6 +85,26 @@ object AtumContext {
/**
 * Builds partitions holding exactly one key/value pair.
 *
 * @param elems the single (key, value) pair
 * @return a `ListMap` with that one entry
 */
def apply(elems: (String, String)): AtumPartitions = {
  val (key, value) = elems
  ListMap(key -> value)
}

/**
 * Builds partitions from a sequence of key/value pairs.
 *
 * @param elems the (key, value) pairs, passed to `ListMap` as varargs
 * @return a `ListMap` built from the given pairs
 */
def apply(elems: Seq[(String, String)]): AtumPartitions =
  ListMap(elems: _*)

/**
 * Converts agent-side partitions into the model's `Partitioning`.
 *
 * Each key/value entry becomes one `Partition(key, value)`, in the iteration order of `atumPartitions`.
 *
 * @param atumPartitions partitions to convert
 * @return the equivalent `Partitioning`
 */
private[agent] def toPartitioning(atumPartitions: AtumPartitions): Partitioning = {
  val partitions = atumPartitions.toSeq.map(keyValue => Partition(keyValue._1, keyValue._2))
  Partitioning(partitions)
}

/**
 * Converts the model's `Partitioning` into agent-side partitions.
 *
 * Each `Partition` contributes one `key -> value` entry.
 *
 * @param partitioning partitioning to convert
 * @return the equivalent `AtumPartitions`
 */
private[agent] def fromPartitioning(partitioning: Partitioning): AtumPartitions = {
  val keyValuePairs = partitioning.partitioning.map(partition => (partition.key, partition.value))
  AtumPartitions(keyValuePairs)
}
}

/**
 * Builds an [[AtumContext]] from its transport representation.
 *
 * The DTO's partitioning is converted with `AtumPartitions.fromPartitioning` and its measures with
 * `MeasuresMapper.mapToMeasures`; exceptions raised by either conversion propagate to the caller.
 *
 * @param atumContextDTO transport object carrying the partitioning and the measures
 * @param agent          agent the new context is bound to
 * @return a new `AtumContext` for the DTO's partitioning and measures
 */
private[agent] def fromDTO(atumContextDTO: AtumContextDTO, agent: AtumAgent): AtumContext = {
  val partitions = AtumPartitions.fromPartitioning(atumContextDTO.partitioning)
  val measures = MeasuresMapper.mapToMeasures(atumContextDTO.measures)
  new AtumContext(partitions, agent, measures)
}

implicit class DatasetWrapper(df: DataFrame) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ class ConsoleDispatcher extends Dispatcher with Logging {

logInfo("using console dispatcher")

override def fetchAtumContext(
override def getOrCreateAtumContext(
partitioning: Partitioning,
parentPartitioning: Option[Partitioning]
): Option[AtumContextDTO] = {
): AtumContextDTO = {
println(s"Fetching AtumContext using ConsoleDispatcher with $partitioning and $parentPartitioning")
None
AtumContextDTO(partitioning = partitioning)
}
override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
println(s"Saving checkpoint to server. $checkpoint")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging
import sttp.client3._
import sttp.model.Uri
import za.co.absa.atum.model.Partitioning
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO}
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitioningDTO}

class HttpDispatcher(config: Config) extends Dispatcher with Logging {

Expand All @@ -31,11 +31,17 @@ class HttpDispatcher(config: Config) extends Dispatcher with Logging {
logInfo("using http dispatcher")
logInfo(s"serverUri $serverUri")

override def fetchAtumContext(
override def getOrCreateAtumContext(
partitioning: Partitioning,
parentPartitioning: Option[Partitioning]
): Option[AtumContextDTO] = {
???
): AtumContextDTO = {
val partitioningDTO = PartitioningDTO(partitioning, parentPartitioning)
basicRequest
.body(s"$partitioningDTO")
.post(serverUri)
.send(backend)

AtumContextDTO(partitioning = partitioning) // todo: implement request
}

override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.agent.model

import za.co.absa.atum.agent.model.Measure._

/**
 * Raised by `MeasuresMapper` when a measure definition cannot be turned into an agent-side measure.
 *
 * @param msg description of the problem; also used as the exception message
 */
case class UnsupportedMeasureException(msg: String)
  extends Exception(msg)

/**
 * Translates measure definitions of the shared model (`za.co.absa.atum.model.Measure`) into the
 * agent's own measure types.
 */
object MeasuresMapper {

  /**
   * Maps every model measure to the corresponding agent measure.
   *
   * @param measures model measures to convert
   * @return the converted agent measures
   * @throws UnsupportedMeasureException if a measure's function name is not supported, or if a
   *                                     supported measure carries no control column
   */
  def mapToMeasures(measures: Set[za.co.absa.atum.model.Measure]): Set[za.co.absa.atum.agent.model.Measure] = {
    measures.map(createMeasure)
  }

  /**
   * Creates the agent measure selected by the model measure's function name.
   * Only the first control column is used; any further columns are ignored.
   */
  private def createMeasure(measure: za.co.absa.atum.model.Measure): za.co.absa.atum.agent.model.Measure = {
    // Resolved lazily so that an unknown function name is reported as such even when the measure
    // has no control columns at all.
    def controlColumn = firstControlColumn(measure)

    measure.functionName match {
      case "RecordCount" => RecordCount(controlColumn)
      case "DistinctRecordCount" => DistinctRecordCount(controlColumn)
      case "SumOfValuesOfColumn" => SumOfValuesOfColumn(controlColumn)
      case "AbsSumOfValuesOfColumn" => AbsSumOfValuesOfColumn(controlColumn)
      case "SumOfHashesOfColumn" => SumOfHashesOfColumn(controlColumn)
      case unsupportedMeasure => throw UnsupportedMeasureException(s"Measure not supported: $unsupportedMeasure")
    }
  }

  /** Returns the first control column, failing with a descriptive error instead of a bare `.head` failure. */
  private def firstControlColumn(measure: za.co.absa.atum.model.Measure) =
    measure.controlColumns.headOption.getOrElse(
      throw UnsupportedMeasureException(s"Measure ${measure.functionName} has no control column")
    )

}
44 changes: 44 additions & 0 deletions agent/src/test/scala/za/co/absa/atum/agent/AtumAgentTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.agent

import org.scalatest.funsuite.AnyFunSuiteLike
import za.co.absa.atum.agent.AtumContext.AtumPartitions

/**
 * Checks that `AtumAgent` hands out contexts and sub-contexts with the expected partitions,
 * agent reference and instance reuse.
 */
class AtumAgentTest extends AnyFunSuiteLike {

  test("AtumAgent creates AtumContext(s) as expected") {
    val atumAgent = new AtumAgent()
    val atumPartitions = AtumPartitions("abc" -> "def")
    // Explicit tuple: passing two separate arguments only compiles through argument auto-tupling.
    val subPartitions = AtumPartitions("ghi" -> "jkl")

    val atumContext1 = atumAgent.getOrCreateAtumContext(atumPartitions)
    val atumContext2 = atumAgent.getOrCreateAtumContext(atumPartitions)

    // AtumAgent returns expected instance of AtumContext
    assert(atumAgent.getOrCreateAtumContext(atumPartitions) == atumContext1)
    assert(atumContext1 == atumContext2)
    // Reference equality: a repeated request must return the stored instance, not merely an equal one
    assert(atumContext1 eq atumContext2)

    // AtumSubContext contains expected AtumPartitions
    val atumSubContext = atumAgent.getOrCreateAtumSubContext(subPartitions)(atumContext1)
    assert(atumSubContext.atumPartitions == (atumPartitions ++ subPartitions))

    // AtumContext contains reference to expected AtumAgent
    assert(atumSubContext.agent == atumAgent)
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers {

"withMeasureAddedOrOverwritten" should "add a new measure if not exists, overwrite it otherwise" in {

val atumContext = AtumAgent.getOrCreateAtumContext(AtumPartitions("foo"->"bar"))
val atumContext = AtumAgent.getOrCreateAtumContext(AtumPartitions("foo1"->"bar"))

assert(atumContext.currentMeasures.isEmpty)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.model.dto

import za.co.absa.atum.model.Partitioning

/**
 * Transport object pairing a partitioning with the partitioning of its optional parent.
 *
 * @param partitioning       the partitioning itself
 * @param parentPartitioning the parent's partitioning, or `None` when no parent is given
 */
case class PartitioningDTO(
  partitioning: Partitioning,
  parentPartitioning: Option[Partitioning]
)

0 comments on commit 2df96ab

Please sign in to comment.