Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(only idea!) #94 measure validation on column #95

Closed
wants to merge 25 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1d08ad8
#26 add checkpoint creation function
cerveada Oct 12, 2023
19be511
Merge branch 'master' into feature/26-checkpoint-creation-function
cerveada Oct 20, 2023
1c96dab
Merge remote-tracking branch 'origin/master' into feature/26-checkpoi…
lsulak Oct 23, 2023
02d829e
#26: finishing bear minimum of agent side
lsulak Oct 24, 2023
7fdfa56
#94: measure validation for a given column and small refactoring
lsulak Oct 25, 2023
e035c84
#26: post-review improvements and refactoring
lsulak Oct 26, 2023
93ee090
#26: extra validation, putting emphasis on types, some generics, more…
lsulak Oct 26, 2023
bc7eb58
Merge remote-tracking branch 'origin/master' into feature/26-checkpoi…
lsulak Oct 26, 2023
149a624
#26: functionName -> measureName, to be consistent
lsulak Oct 26, 2023
fab926d
Merge remote-tracking branch 'origin/feature/26-checkpoint-creation-f…
lsulak Oct 26, 2023
9730fa3
Merge branch 'master' into feature/26-checkpoint-creation-function
lsulak Oct 27, 2023
108b9e6
Merge branch 'feature/26-checkpoint-creation-function' into feature/9…
lsulak Oct 27, 2023
ecd7731
#94: refactoring and fully fixing build
lsulak Oct 27, 2023
cf9dc69
#94: removing redundant import
lsulak Oct 27, 2023
f123b6f
#26: post-review changes
lsulak Oct 27, 2023
f2928f5
#26: fixing unit test
lsulak Oct 28, 2023
c2c3405
#26: removing redundant internal Agent's Checkpoint model and some re…
lsulak Oct 28, 2023
4f0990a
Merge remote-tracking branch 'origin/feature/26-checkpoint-creation-f…
lsulak Oct 28, 2023
55acd58
#94: fixing unit tests, post merge conflicts
lsulak Oct 28, 2023
9c1f428
#94: fixing unit tests, post merge conflicts
lsulak Oct 28, 2023
a31fd04
Merge remote-tracking branch 'origin/master' into feature/26-checkpoi…
lsulak Oct 30, 2023
9be5528
Merge remote-tracking branch 'origin/feature/26-checkpoint-creation-f…
lsulak Oct 30, 2023
46ad700
#26: removing onlyForNumeric as it's not currently used and we might …
lsulak Oct 31, 2023
d60a872
Merge branch 'feature/26-checkpoint-creation-function' into feature/9…
lsulak Oct 31, 2023
89e645a
#94: adding back here, but I know that it might be removed soon
lsulak Oct 31, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion LICENSE.md
Original file line number Diff line number Diff line change
Expand Up @@ -199,4 +199,4 @@
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
limitations under the License.
18 changes: 5 additions & 13 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ package za.co.absa.atum.agent
import com.typesafe.config.{Config, ConfigFactory}
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.dispatcher.{ConsoleDispatcher, HttpDispatcher}
import za.co.absa.atum.agent.model.Checkpoint
import za.co.absa.atum.model.dto.{CheckpointDTO, PartitioningDTO}

/**
* Place holder for the agent that communicate with the API.
* Entity that communicate with the API, primarily focused on spawning Atum Context(s).
*/
class AtumAgent private[agent] () {

Expand All @@ -36,23 +35,16 @@ class AtumAgent private[agent] () {

/**
* Sends `CheckpointDTO` to the AtumService API
* @param checkpoint
*/
def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
dispatcher.saveCheckpoint(checkpoint)
}

/**
* Sends `Checkpoint` to the AtumService API
*
* @param checkpoint
* @param checkpoint Already initialized Checkpoint object to store
*/
def saveCheckpoint(checkpoint: Checkpoint): Unit = {
dispatcher.saveCheckpoint(checkpoint.toCheckpointDTO)
private [agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
dispatcher.saveCheckpoint(checkpoint)
}

/**
* Provides an AtumContext given a `AtumPartitions` instance. Retrieves the data from AtumService API.
*
* @param atumPartitions
* @return
*/
Expand Down
61 changes: 45 additions & 16 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package za.co.absa.atum.agent

import org.slf4s.Logging
import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.{Checkpoint, Measure, Measurement, MeasuresMapper}
import za.co.absa.atum.model.dto.{AtumContextDTO, PartitionDTO}
import za.co.absa.atum.agent.model._
import za.co.absa.atum.model.dto._

import java.time.OffsetDateTime
import java.util.UUID
import scala.collection.immutable.ListMap

/**
Expand All @@ -35,32 +37,58 @@ class AtumContext private[agent] (
val agent: AtumAgent,
private var measures: Set[Measure] = Set.empty,
private var additionalData: Map[String, Option[String]] = Map.empty
) {
) extends Logging {

def currentMeasures: Set[Measure] = measures

def subPartitionContext(subPartitions: AtumPartitions): AtumContext = {
agent.getOrCreateAtumSubContext(atumPartitions ++ subPartitions)(this)
}

def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame) = {
??? // TODO #26
private def takeMeasurements(df: DataFrame): Set[MeasurementByAtum] = {
measures.map { m =>
m.validateMeasureApplicability(df)

val measurementResult = m.function(df)
MeasurementByAtum(m, measurementResult.resultValue, measurementResult.resultType)
}
}

def createCheckpointOnProvidedData(
checkpointName: String,
author: String,
measurements: Seq[Measurement]
): Checkpoint = {
def createCheckpoint(checkpointName: String, author: String, dataToMeasure: DataFrame): AtumContext = {
val startTime = OffsetDateTime.now()
val measurements = takeMeasurements(dataToMeasure)
val endTime = OffsetDateTime.now()

val checkpointDTO = CheckpointDTO(
id = UUID.randomUUID(),
name = checkpointName,
author = author,
measuredByAtumAgent = true,
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
processStartTime = startTime,
processEndTime = Some(endTime),
measurements = measurements.map(MeasurementBuilder.buildMeasurementDTO).toSeq
)

agent.saveCheckpoint(checkpointDTO)
this
}

def createCheckpointOnProvidedData(checkpointName: String, author: String, measurements: Seq[Measurement]): AtumContext = {
val offsetDateTimeNow = OffsetDateTime.now()
Checkpoint(

val checkpointDTO = CheckpointDTO(
id = UUID.randomUUID(),
name = checkpointName,
author = author,
atumPartitions = this.atumPartitions,
partitioning = AtumPartitions.toSeqPartitionDTO(this.atumPartitions),
processStartTime = offsetDateTimeNow,
processEndTime = Some(offsetDateTimeNow),
measurements = measurements
measurements = measurements.map(MeasurementBuilder.buildMeasurementDTO)
)

agent.saveCheckpoint(checkpointDTO)
this
}

def addAdditionalData(key: String, value: String): Unit = {
Expand Down Expand Up @@ -121,21 +149,22 @@ object AtumContext {
new AtumContext(
AtumPartitions.fromPartitioning(atumContextDTO.partitioning),
agent,
MeasuresMapper.mapToMeasures(atumContextDTO.measures),
MeasuresBuilder.mapToMeasures(atumContextDTO.measures),
atumContextDTO.additionalData.additionalData
)
}

implicit class DatasetWrapper(df: DataFrame) {

/**
* Set a point in the pipeline to execute calculation.
* Set a point in the pipeline to execute calculation and store it.
* @param checkpointName The key assigned to this checkpoint
* @param author Author of the checkpoint
* @param atumContext Contains the calculations to be done and publish the result
* @return
*/
def createCheckpoint(checkpointName: String, author: String)(implicit atumContext: AtumContext): DataFrame = {
// todo: implement checkpoint creation
atumContext.createCheckpoint(checkpointName, author, df)
df
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package za.co.absa.atum.agent.core

import org.apache.spark.sql.DataFrame
import za.co.absa.atum.agent.core.MeasurementProcessor.MeasurementFunction
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

trait MeasurementProcessor {

Expand All @@ -26,6 +27,13 @@ trait MeasurementProcessor {
}

object MeasurementProcessor {
type MeasurementFunction = DataFrame => String
/**
* The raw result of measurement is always gonna be string, because we want to avoid some floating point issues
* (overflows, consistent representation of numbers - whether they are coming from Java or Scala world, and more),
* but the actual type is stored alongside the computation because we don't want to lost this information.
*/
final case class ResultOfMeasurement(resultValue: String, resultType: ResultValueType.ResultValueType)

type MeasurementFunction = DataFrame => ResultOfMeasurement

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,5 @@ class ConsoleDispatcher extends Dispatcher with Logging {
override def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
println(s"Saving checkpoint to server. $checkpoint")
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ class HttpDispatcher(config: Config) extends Dispatcher with Logging {
.post(serverUri)
.send(backend)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@

package za.co.absa.atum.agent.exception

case class UnsupportedMeasureException(msg: String) extends Exception(msg)
sealed abstract class AtumAgentException extends Exception

case class MeasurementProvidedException(msg: String) extends AtumAgentException
case class MeasureException(msg: String) extends AtumAgentException

This file was deleted.

This file was deleted.

Loading