Skip to content

Commit

Permalink
* Further progress
Browse files Browse the repository at this point in the history
  • Loading branch information
benedeki committed Dec 9, 2024
1 parent 55d60e1 commit 5dfe5c5
Show file tree
Hide file tree
Showing 4 changed files with 93 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.model.types

import za.co.absa.atum.model.types.basic.AtumPartitions

case class AtumPartitionsCheckpoint(
partitioning: AtumPartitions,
checkpoint: Checkpoint
)
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object Checkpoint {
measuredByAtumAgent = from.measuredByAtumAgent,
processStartTime = from.processStartTime,
processEndTime = from.processEndTime,
measurements = from.measurements.map()
measurements = from.measurements.map(Measurement(_))
)
}
}
67 changes: 56 additions & 11 deletions model/src/main/scala/za/co/absa/atum/model/types/Measurement.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,64 @@ package za.co.absa.atum.model.types
import za.co.absa.atum.model.ResultValueType
import za.co.absa.atum.model.dto.MeasurementDTO

case class Measurement[T] (
measureName: String,
measuredColumns: Seq[String],
valueType: ResultValueType,
value: T
)
trait Measurement {
type T
def measureName: String
def measuredColumns: Seq[String]
def valueType: ResultValueType
def value: T
def stringValue: String
}

object Measurement {

def apply[T](from: MeasurementDTO): Measurement = {
new Measurement(
measureName = from.measure.measureName,
measuredColumns = from.measure.measuredColumns,
value = from.result.mainValue.value
)
from.result.mainValue.valueType match {
case ResultValueType.StringValue => StringMeasurement(from.measure.measureName, from.measure.measuredColumns, from.result.mainValue.value)
case ResultValueType.LongValue => LongMeasurement(from.measure.measureName, from.measure.measuredColumns, from.result.mainValue.value.toLong)
case ResultValueType.BigDecimalValue => BigDecimalMeasurement(from.measure.measureName, from.measure.measuredColumns, BigDecimal(from.result.mainValue.value))
case ResultValueType.DoubleValue => DoubleMeasurement(from.measure.measureName, from.measure.measuredColumns, from.result.mainValue.value.toDouble)
}
}

case class StringMeasurement(
measureName: String,
measuredColumns: Seq[String],
value: String
) extends Measurement {
override type T = String
override def valueType: ResultValueType = ResultValueType.StringValue
override def stringValue: String = value
}

case class LongMeasurement(
measureName: String,
measuredColumns: Seq[String],
value: Long
) extends Measurement {
override type T = Long
override def valueType: ResultValueType = ResultValueType.LongValue
override def stringValue: String = value.toString
}

case class BigDecimalMeasurement(
measureName: String,
measuredColumns: Seq[String],
value: BigDecimal
) extends Measurement {
override type T = BigDecimal
override def valueType: ResultValueType = ResultValueType.BigDecimalValue
override def stringValue: String = value.toString
}

case class DoubleMeasurement(
measureName: String,
measuredColumns: Seq[String],
value: Double
) extends Measurement {
override type T = Double
override def valueType: ResultValueType = ResultValueType.DoubleValue
override def stringValue: String = value.toString
}

}
19 changes: 12 additions & 7 deletions reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,16 @@ import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.model.dto.{CheckpointWithPartitioningDTO, FlowDTO}
import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse}
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.atum.model.types.basic.{AtumPartitions, PartitioningDTOOps}
import za.co.absa.atum.reader.basic.RequestResult.RequestResult
import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader}
import za.co.absa.atum.model.ApiPaths._
import za.co.absa.atum.model.types.{AtumPartitionsCheckpoint, Checkpoint}
import za.co.absa.atum.reader.implicits.PaginatedResponseImplicits.PaginatedResponseMonadEnhancements
import za.co.absa.atum.reader.implicits.EitherImplicits.EitherMonadEnhancements
import za.co.absa.atum.reader.result.Page
import za.co.absa.atum.reader.result.Page.PageRoller
import za.co.absa.atum.reader.server.ServerConfig

import za.co.absa.atum.reader.basic.RequestResult.RequestPageResultOps

/**
Expand Down Expand Up @@ -66,8 +66,8 @@ class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)
getQuery(endpoint, params)
}

private def doGetCheckpoints(checkpointName: Option[String], pageSize: Int = 10, offset: Long = 0): F[RequestResult[Page[CheckpointWithPartitioningDTO, F]]] = {
val pageRoller: PageRoller[CheckpointWithPartitioningDTO, F] = doGetCheckpoints(checkpointName, _, _)
private def geetCheckpointDTOs(checkpointName: Option[String], pageSize: Int = 10, offset: Long = 0): F[RequestResult[Page[CheckpointWithPartitioningDTO, F]]] = {
val pageRoller: PageRoller[CheckpointWithPartitioningDTO, F] = geetCheckpointDTOs(checkpointName, _, _)

for {
mainPartitioningId <- partitioningId(mainFlowPartitioning)
Expand All @@ -77,11 +77,16 @@ class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)

}

def getCheckpoints(pageSize: Int = 10, offset: Long = 0) = {
doGetCheckpoints(None, pageSize, offset).map(_.pageMap(data =>))
def getCheckpoints(pageSize: Int = 10, offset: Long = 0): F[RequestResult[Page[AtumPartitionsCheckpoint, F]]] = {
def checkpointMapper(data: CheckpointWithPartitioningDTO): AtumPartitionsCheckpoint = {
val atumPartitions = data.partitioning.partitioning.toAtumPartitions
val checkpoint = Checkpoint(data)
AtumPartitionsCheckpoint(atumPartitions, checkpoint)
}
geetCheckpointDTOs(None, pageSize, offset).map(_.pageMap(checkpointMapper))
}

def getCheckpointsOfName(name: String, pageSize: Int = 10, offset: Int = 0) = {
doGetCheckpoints(Some(name), pageSize, offset)
geetCheckpointDTOs(Some(name), pageSize, offset)
}
}

0 comments on commit 5dfe5c5

Please sign in to comment.