Skip to content

Commit

Permalink
* major progress
Browse files Browse the repository at this point in the history
  • Loading branch information
benedeki committed Dec 9, 2024
1 parent b53ba99 commit 55d60e1
Show file tree
Hide file tree
Showing 23 changed files with 403 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.atum.server.api.http
package za.co.absa.atum.model

object ApiPaths {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,20 @@ package za.co.absa.atum.model.dto

import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import za.co.absa.atum.model.dto.traits.CheckpointCore

import java.time.ZonedDateTime
import java.util.UUID

case class CheckpointV2DTO(
case class CheckpointV2DTO (
id: UUID,
name: String,
author: String,
measuredByAtumAgent: Boolean = false,
processStartTime: ZonedDateTime,
processEndTime: Option[ZonedDateTime],
measurements: Set[MeasurementDTO]
)
) extends CheckpointCore

object CheckpointV2DTO {
implicit val decodeCheckpointDTO: Decoder[CheckpointV2DTO] = deriveDecoder
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package za.co.absa.atum.model.dto

import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import za.co.absa.atum.model.dto.traits.CheckpointCore

import java.time.ZonedDateTime
import java.util.UUID
Expand All @@ -31,7 +32,7 @@ case class CheckpointWithPartitioningDTO(
processEndTime: Option[ZonedDateTime],
measurements: Set[MeasurementDTO],
partitioning: PartitioningWithIdDTO
)
) extends CheckpointCore

object CheckpointWithPartitioningDTO {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.model.dto.traits

import za.co.absa.atum.model.dto.MeasurementDTO

import java.time.ZonedDateTime
import java.util.UUID

trait CheckpointCore {
def id: UUID
def name: String
def author: String
def measuredByAtumAgent: Boolean
def processStartTime: ZonedDateTime
def processEndTime: Option[ZonedDateTime]
def measurements: Set[MeasurementDTO]
}
46 changes: 46 additions & 0 deletions model/src/main/scala/za/co/absa/atum/model/types/Checkpoint.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.model.types

import za.co.absa.atum.model.dto.traits.CheckpointCore

import java.time.ZonedDateTime
import java.util.UUID

case class Checkpoint (
id: UUID,
name: String,
author: String,
measuredByAtumAgent: Boolean = false,
processStartTime: ZonedDateTime,
processEndTime: Option[ZonedDateTime],
measurements: Set[Measurement]
)

object Checkpoint {
def apply(from: CheckpointCore): Checkpoint = {
new Checkpoint(
id = from.id,
name = from.name,
author = from.author,
measuredByAtumAgent = from.measuredByAtumAgent,
processStartTime = from.processStartTime,
processEndTime = from.processEndTime,
measurements = from.measurements.map()
)
}
}
37 changes: 37 additions & 0 deletions model/src/main/scala/za/co/absa/atum/model/types/Measurement.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.model.types

import za.co.absa.atum.model.ResultValueType
import za.co.absa.atum.model.dto.MeasurementDTO

case class Measurement[T] (
measureName: String,
measuredColumns: Seq[String],
valueType: ResultValueType,
value: T
)

object Measurement {
def apply[T](from: MeasurementDTO): Measurement = {
new Measurement(
measureName = from.measure.measureName,
measuredColumns = from.measure.measuredColumns,
value = from.result.mainValue.value
)
}
}
49 changes: 49 additions & 0 deletions reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,21 @@ package za.co.absa.atum.reader

import sttp.client3.SttpBackend
import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.model.dto.{CheckpointWithPartitioningDTO, FlowDTO}
import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse}
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.atum.reader.basic.RequestResult.RequestResult
import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader}
import za.co.absa.atum.model.ApiPaths._
import za.co.absa.atum.reader.implicits.PaginatedResponseImplicits.PaginatedResponseMonadEnhancements
import za.co.absa.atum.reader.implicits.EitherImplicits.EitherMonadEnhancements
import za.co.absa.atum.reader.result.Page
import za.co.absa.atum.reader.result.Page.PageRoller
import za.co.absa.atum.reader.server.ServerConfig

import za.co.absa.atum.reader.basic.RequestResult.RequestPageResultOps

/**
* This class is a reader that reads data tight to a flow.
* @param mainFlowPartitioning - the partitioning of the main flow; renamed from ancestor's 'flowPartitioning'
Expand All @@ -35,4 +46,42 @@ class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F])
extends Reader[F] with PartitioningIdProvider[F]{

private def flowId(mainPartitioningId: Long): F[RequestResult[Long]] = {
val endpoint = s"/$Api/$V2/${V2Paths.Partitionings}/$mainPartitioningId/${V2Paths.MainFlow}"
val queryResult = getQuery[SingleSuccessResponse[FlowDTO]](endpoint)
queryResult.map{ result =>
result.map(_.data.id)
}
}

private def queryCheckpoints(flowId: Long,
checkpointName: Option[String],
pageSize: Int,
offset: Long): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = {
val endpoint = s"/$Api/$V2/${V2Paths.Flows}/$flowId/${V2Paths.Checkpoints}"
val params = Map(
"limit" -> pageSize.toString,
"offset" -> offset.toString
) ++ checkpointName.map(("checkpoint-name" -> _))
getQuery(endpoint, params)
}

private def doGetCheckpoints(checkpointName: Option[String], pageSize: Int = 10, offset: Long = 0): F[RequestResult[Page[CheckpointWithPartitioningDTO, F]]] = {
val pageRoller: PageRoller[CheckpointWithPartitioningDTO, F] = doGetCheckpoints(checkpointName, _, _)

for {
mainPartitioningId <- partitioningId(mainFlowPartitioning)
flowId <- mainPartitioningId.project(flowId)
checkpoints <- flowId.project(queryCheckpoints(_, checkpointName, pageSize, offset))
} yield checkpoints.map(_.toPage(pageRoller))

}

def getCheckpoints(pageSize: Int = 10, offset: Long = 0) = {
doGetCheckpoints(None, pageSize, offset).map(_.pageMap(data =>))
}

def getCheckpointsOfName(name: String, pageSize: Int = 10, offset: Int = 0) = {
doGetCheckpoints(Some(name), pageSize, offset)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package za.co.absa.atum.reader.basic

import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.model.ApiPaths._
import za.co.absa.atum.model.dto.PartitioningWithIdDTO
import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse
import za.co.absa.atum.model.types.basic.AtumPartitions
Expand All @@ -28,8 +29,8 @@ import za.co.absa.atum.reader.basic.RequestResult.RequestResult
trait PartitioningIdProvider[F[_]] {self: Reader[F] =>
def partitioningId(partitioning: AtumPartitions)(implicit monad: MonadError[F]): F[RequestResult[Long]] = {
val encodedPartitioning = partitioning.toPartitioningDTO.asBase64EncodedJsonString
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]]("/api/v2/partitionings", Map("partitioning" -> encodedPartitioning))
queryResult.map{result =>
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]](s"/$Api/$V2/${V2Paths.Partitionings}", Map("partitioning" -> encodedPartitioning))
queryResult.map{ result =>
result.map(_.data.id)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.reader.server.ServerConfig
import za.co.absa.atum.reader.basic.RequestResult._
import za.co.absa.atum.reader.exceptions.RequestException.CirceError

/**
* Reader is a base class for reading data from a remote server.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,32 @@
package za.co.absa.atum.reader.basic

import sttp.client3.{DeserializationException, HttpError, Response, ResponseException}
import sttp.monad.MonadError
import za.co.absa.atum.model.envelopes.ErrorResponse
import za.co.absa.atum.reader.exceptions.RequestException.{CirceError, HttpException, ParsingException}
import za.co.absa.atum.reader.exceptions.{ReaderException, RequestException}
import za.co.absa.atum.reader.result.Page

object RequestResult {
type CirceError = io.circe.Error
type RequestResult[R] = Either[ResponseException[ErrorResponse, CirceError], R]
type RequestResult[R] = Either[RequestException, R]

def RequestOK[T](value: T): RequestResult[T] = Right(value)
def RequestFail[T](error: RequestException): RequestResult[T] = Left(error)

implicit class ResponseOps[R](val response: Response[Either[ResponseException[String, CirceError], R]]) extends AnyVal {
def toRequestResult: RequestResult[R] = {
response.body.left.map {
case he: HttpError[String] =>
ErrorResponse.basedOnStatusCode(he.statusCode.code, he.body) match {
case Right(er) => HttpError(er, he.statusCode)
case Left(ce) => DeserializationException(he.body, ce)
case Right(er) => HttpException(he.getMessage, he.statusCode, er, response.request.uri)
case Left(ce) => ParsingException.fromCirceError(ce, he.body)
}
case de: DeserializationException[CirceError] => de
case de: DeserializationException[CirceError] => ParsingException.fromCirceError(de.error, de.body)
}
}
}

implicit class RequestPageResultOps[A, F[_]: MonadError](requestResult: RequestResult[Page[A, F]]) {
def pageMap[B](f: A => B): RequestResult[Page[B, F]] = requestResult.map(_.map(f))
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.exceptions

class ReaderException(message: String) extends Exception(message)
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.exceptions

import sttp.client3.HttpError
import sttp.model.{StatusCode, Uri}
import za.co.absa.atum.model.envelopes.ErrorResponse

abstract class RequestException(message: String) extends ReaderException(message)


object RequestException {
type CirceError = io.circe.Error

final case class HttpException(
message: String,
statusCode: StatusCode,
errorResponse: ErrorResponse,
request: Uri
) extends RequestException(message)

final case class ParsingException(
message: String,
body: String
) extends RequestException(message)
object ParsingException {
def fromCirceError(error: CirceError, body: String): ParsingException = {
ParsingException(error.getMessage, body)
}
}


final case class NoDataException(
message: String
) extends RequestException(message)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.implicits

import sttp.monad.MonadError

object EitherImplicits {

implicit class EitherMonadEnhancements[A, B](val either: Either[A, B]) extends AnyVal {
def project[C, F[_]: MonadError](f: B => F[Either[A, C]]): F[Either[A, C]] = either match {
case Right(b) => f(b)
case Left(a) => implicitly[MonadError[F]].unit(Left(a))
}
}

}
Loading

0 comments on commit 55d60e1

Please sign in to comment.