#247 Implement basics of FlowReader #306
base: master
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,32 @@ | ||
/* | ||
* Copyright 2021 ABSA Group Limited | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package za.co.absa.atum.model.dto.traits | ||
Review comment: Hmm, I'm not sure I like the location of this file; the traits/ dir contains only this one file, which doesn't do much for the 'modularity' or 'structure' of the DTO module. Besides, the fact that it's a trait is already easy to see in IntelliJ, where it is visually distinct (green trait icon).
Review comment: In fact, I don't really see much value in doing this at all, to be fair. What if CheckpointV3 turns out to be different? Then we either can't do this or will have to update the trait. It seems potentially limiting without much benefit. Also, you did it only for checkpoints, not for the Partitioning DTOs; to me it really seems like an unnecessary hierarchy.
Review comment: The trait is a solution for inability/discouragement of
Review comment: Package |
||
|
||
import za.co.absa.atum.model.dto.MeasurementDTO | ||
|
||
import java.time.ZonedDateTime | ||
import java.util.UUID | ||
|
||
trait CheckpointCore { | ||
def id: UUID | ||
def name: String | ||
def author: String | ||
def measuredByAtumAgent: Boolean | ||
def processStartTime: ZonedDateTime | ||
def processEndTime: Option[ZonedDateTime] | ||
def measurements: Set[MeasurementDTO] | ||
} |
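To make the thread above concrete, here is a minimal sketch of what the shared trait allows: code written against `CheckpointCore` accepts any DTO that exposes these members. `MeasurementDTO` is stubbed here, and `ExampleCheckpointDTO`, `extraField` and `describe` are hypothetical names used only for illustration; none of them are part of this PR.

```scala
import java.time.ZonedDateTime
import java.util.UUID

// Stub standing in for za.co.absa.atum.model.dto.MeasurementDTO (assumption).
final case class MeasurementDTO(measureName: String, value: String)

// Same members as the trait introduced in this file.
trait CheckpointCore {
  def id: UUID
  def name: String
  def author: String
  def measuredByAtumAgent: Boolean
  def processStartTime: ZonedDateTime
  def processEndTime: Option[ZonedDateTime]
  def measurements: Set[MeasurementDTO]
}

// Hypothetical DTO: the case class fields implement the trait's abstract defs.
final case class ExampleCheckpointDTO(
  id: UUID,
  name: String,
  author: String,
  measuredByAtumAgent: Boolean = false,
  processStartTime: ZonedDateTime,
  processEndTime: Option[ZonedDateTime],
  measurements: Set[MeasurementDTO],
  extraField: String // DTO-specific field the trait knows nothing about
) extends CheckpointCore

object CheckpointCoreSketch {
  // Works for every DTO extending the trait, whatever extra fields it carries.
  def describe(c: CheckpointCore): String =
    s"${c.name} by ${c.author} (${c.measurements.size} measurements)"
}
```

The trade-off raised in the review still applies: a future DTO version that drops or retypes one of these members could no longer extend the trait unchanged.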
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
/* | ||
* Copyright 2021 ABSA Group Limited | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package za.co.absa.atum.model.types | ||
|
||
import za.co.absa.atum.model.types.basic.AtumPartitions | ||
|
||
case class AtumPartitionsCheckpoint( | ||
Review comment: Also possibly reader-specific? |
||
partitioning: AtumPartitions, | ||
checkpoint: Checkpoint | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
/* | ||
* Copyright 2021 ABSA Group Limited | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package za.co.absa.atum.model.types | ||
|
||
import za.co.absa.atum.model.dto.traits.CheckpointCore | ||
|
||
import java.time.ZonedDateTime | ||
import java.util.UUID | ||
|
||
case class Checkpoint ( | ||
Review comment: Wonder if this should be in Model or only in Reader?
Review comment: I'm only 30% through the review so far, but I think this is an internal checkpoint representation used in Reader, so it shouldn't be in model; Reader seems like a very good place for it. The same logic applies to internal data structures used in Agent; if we put all of these into model, it would be (even more) messy (we already have quite a lot of DTOs in model :D) |
||
id: UUID, | ||
name: String, | ||
author: String, | ||
measuredByAtumAgent: Boolean = false, | ||
processStartTime: ZonedDateTime, | ||
processEndTime: Option[ZonedDateTime], | ||
measurements: Set[Measurement] | ||
) | ||
|
||
object Checkpoint { | ||
def apply(from: CheckpointCore): Checkpoint = { | ||
new Checkpoint( | ||
id = from.id, | ||
name = from.name, | ||
author = from.author, | ||
measuredByAtumAgent = from.measuredByAtumAgent, | ||
processStartTime = from.processStartTime, | ||
processEndTime = from.processEndTime, | ||
measurements = from.measurements.map(Measurement(_)) | ||
) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,82 @@ | ||||||
/* | ||||||
* Copyright 2021 ABSA Group Limited | ||||||
* | ||||||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||||||
* you may not use this file except in compliance with the License. | ||||||
* You may obtain a copy of the License at | ||||||
* | ||||||
* http://www.apache.org/licenses/LICENSE-2.0 | ||||||
* | ||||||
* Unless required by applicable law or agreed to in writing, software | ||||||
* distributed under the License is distributed on an "AS IS" BASIS, | ||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||||
* See the License for the specific language governing permissions and | ||||||
* limitations under the License. | ||||||
*/ | ||||||
|
||||||
package za.co.absa.atum.model.types | ||||||
|
||||||
import za.co.absa.atum.model.ResultValueType | ||||||
import za.co.absa.atum.model.dto.MeasurementDTO | ||||||
|
||||||
trait Measurement { | ||||||
Review comment: Same question as above: should this be in Model or only in Reader?
Review comment: Probably the same answer: if this is only going to be used in Reader, then it should be in Reader. If it's common enough to also be used in Agent, then I'd say let's keep it in model/. |
||||||
type T | ||||||
Review comment: Suggested change
just a suggestion |
||||||
def measureName: String | ||||||
def measuredColumns: Seq[String] | ||||||
def valueType: ResultValueType | ||||||
def value: T | ||||||
def stringValue: String | ||||||
} | ||||||
|
||||||
object Measurement { | ||||||
|
||||||
def apply[T](from: MeasurementDTO): Measurement = { | ||||||
Review comment: This has some similarity with the agent's model/MeasureResult. Did you check whether it can be reused for Reader usages, maybe slightly changed or moved if needed? If model and agent are doing conceptually similar things, maybe we can unite them and reuse some of the existing code.
Review comment: The similarity did occur to me. But they are not 100% same unlike |
||||||
from.result.mainValue.valueType match { | ||||||
case ResultValueType.StringValue => StringMeasurement(from.measure.measureName, from.measure.measuredColumns, from.result.mainValue.value) | ||||||
case ResultValueType.LongValue => LongMeasurement(from.measure.measureName, from.measure.measuredColumns, from.result.mainValue.value.toLong) | ||||||
case ResultValueType.BigDecimalValue => BigDecimalMeasurement(from.measure.measureName, from.measure.measuredColumns, BigDecimal(from.result.mainValue.value)) | ||||||
case ResultValueType.DoubleValue => DoubleMeasurement(from.measure.measureName, from.measure.measuredColumns, from.result.mainValue.value.toDouble) | ||||||
} | ||||||
} | ||||||
|
||||||
case class StringMeasurement( | ||||||
measureName: String, | ||||||
measuredColumns: Seq[String], | ||||||
value: String | ||||||
) extends Measurement { | ||||||
override type T = String | ||||||
override def valueType: ResultValueType = ResultValueType.StringValue | ||||||
override def stringValue: String = value | ||||||
} | ||||||
|
||||||
case class LongMeasurement( | ||||||
measureName: String, | ||||||
measuredColumns: Seq[String], | ||||||
value: Long | ||||||
) extends Measurement { | ||||||
override type T = Long | ||||||
override def valueType: ResultValueType = ResultValueType.LongValue | ||||||
override def stringValue: String = value.toString | ||||||
} | ||||||
|
||||||
case class BigDecimalMeasurement( | ||||||
measureName: String, | ||||||
measuredColumns: Seq[String], | ||||||
value: BigDecimal | ||||||
) extends Measurement { | ||||||
override type T = BigDecimal | ||||||
override def valueType: ResultValueType = ResultValueType.BigDecimalValue | ||||||
override def stringValue: String = value.toString | ||||||
} | ||||||
|
||||||
case class DoubleMeasurement( | ||||||
measureName: String, | ||||||
measuredColumns: Seq[String], | ||||||
value: Double | ||||||
) extends Measurement { | ||||||
override type T = Double | ||||||
override def valueType: ResultValueType = ResultValueType.DoubleValue | ||||||
override def stringValue: String = value.toString | ||||||
} | ||||||
|
||||||
} |
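One thing worth noting about the `apply` above: `.toLong`, `.toDouble` and `BigDecimal(...)` throw `NumberFormatException` on malformed input. Below is a minimal sketch of a non-throwing variant, assuming simplified stand-ins for `ResultValueType` and the measurement classes (the measure name and columns are left out for brevity). `MeasurementSketch.parse` is a hypothetical helper, not something in this PR.

```scala
import scala.util.Try

// Stub for za.co.absa.atum.model.ResultValueType (assumption: only the cases used above).
sealed trait ResultValueType
object ResultValueType {
  case object StringValue extends ResultValueType
  case object LongValue extends ResultValueType
  case object BigDecimalValue extends ResultValueType
  case object DoubleValue extends ResultValueType
}

// Simplified measurement hierarchy keeping the type member `T` from the PR.
sealed trait Measurement {
  type T
  def value: T
  def stringValue: String
}
final case class StringMeasurement(value: String) extends Measurement {
  type T = String
  def stringValue: String = value
}
final case class LongMeasurement(value: Long) extends Measurement {
  type T = Long
  def stringValue: String = value.toString
}
final case class BigDecimalMeasurement(value: BigDecimal) extends Measurement {
  type T = BigDecimal
  def stringValue: String = value.toString
}
final case class DoubleMeasurement(value: Double) extends Measurement {
  type T = Double
  def stringValue: String = value.toString
}

object MeasurementSketch {
  // Hypothetical safe conversion: a parse failure becomes a Left instead of an exception.
  def parse(valueType: ResultValueType, raw: String): Either[String, Measurement] = {
    val parsed: Try[Measurement] = Try {
      valueType match {
        case ResultValueType.StringValue     => StringMeasurement(raw)
        case ResultValueType.LongValue       => LongMeasurement(raw.toLong)
        case ResultValueType.BigDecimalValue => BigDecimalMeasurement(BigDecimal(raw))
        case ResultValueType.DoubleValue     => DoubleMeasurement(raw.toDouble)
      }
    }
    parsed.toEither.left.map(e => s"Cannot parse '$raw' as $valueType: ${e.getMessage}")
  }
}
```

Whether the reader should surface such failures as a `Left` or let them propagate is a design decision for the PR author; the sketch only shows that the first option is cheap to add.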
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -18,9 +18,20 @@ package za.co.absa.atum.reader | |||||
|
||||||
import sttp.client3.SttpBackend | ||||||
import sttp.monad.MonadError | ||||||
import za.co.absa.atum.model.types.basic.AtumPartitions | ||||||
import sttp.monad.syntax._ | ||||||
import za.co.absa.atum.model.dto.{CheckpointWithPartitioningDTO, FlowDTO} | ||||||
import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} | ||||||
import za.co.absa.atum.model.types.basic.{AtumPartitions, PartitioningDTOOps} | ||||||
import za.co.absa.atum.reader.basic.RequestResult.{RequestPageResultOps, RequestResult} | ||||||
import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader} | ||||||
import za.co.absa.atum.model.ApiPaths._ | ||||||
import za.co.absa.atum.model.types.{AtumPartitionsCheckpoint, Checkpoint} | ||||||
import za.co.absa.atum.reader.implicits.PaginatedResponseImplicits.PaginatedResponseMonadEnhancements | ||||||
import za.co.absa.atum.reader.implicits.EitherImplicits.EitherMonadEnhancements | ||||||
import za.co.absa.atum.reader.implicits.PaginatedResponseImplicits | ||||||
Review comment: This one is not needed due to line 29 |
||||||
import za.co.absa.atum.reader.result.Page | ||||||
import za.co.absa.atum.reader.server.ServerConfig | ||||||
import za.co.absa.atum.reader.result.Page.PageRoller | ||||||
|
||||||
/** | ||||||
 * This class is a reader that reads data tied to a flow. | ||||||
|
@@ -35,4 +46,47 @@ class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions) | |||||
(implicit serverConfig: ServerConfig, backend: SttpBackend[F, Any], ev: MonadError[F]) | ||||||
extends Reader[F] with PartitioningIdProvider[F]{ | ||||||
|
||||||
private def flowId(mainPartitioningId: Long): F[RequestResult[Long]] = { | ||||||
val endpoint = s"/$Api/$V2/${V2Paths.Partitionings}/$mainPartitioningId/${V2Paths.MainFlow}" | ||||||
val queryResult = getQuery[SingleSuccessResponse[FlowDTO]](endpoint) | ||||||
queryResult.map{ result => | ||||||
result.map(_.data.id) | ||||||
} | ||||||
} | ||||||
|
||||||
private def queryCheckpoints(flowId: Long, | ||||||
checkpointName: Option[String], | ||||||
pageSize: Int, | ||||||
offset: Long): F[RequestResult[PaginatedResponse[CheckpointWithPartitioningDTO]]] = { | ||||||
val endpoint = s"/$Api/$V2/${V2Paths.Flows}/$flowId/${V2Paths.Checkpoints}" | ||||||
val params = Map( | ||||||
"limit" -> pageSize.toString, | ||||||
"offset" -> offset.toString | ||||||
) ++ checkpointName.map("checkpoint-name" -> _) | ||||||
getQuery(endpoint, params) | ||||||
} | ||||||
|
||||||
private def geetCheckpointDTOs(checkpointName: Option[String], pageSize: Int = 10, offset: Long = 0): F[RequestResult[Page[CheckpointWithPartitioningDTO, F]]] = { | ||||||
Review comment: Suggested change |
||||||
val pageRoller: PageRoller[CheckpointWithPartitioningDTO, F] = geetCheckpointDTOs(checkpointName, _, _) | ||||||
|
||||||
for { | ||||||
mainPartitioningId <- partitioningId(mainFlowPartitioning) | ||||||
flowId <- mainPartitioningId.project(flowId) | ||||||
checkpoints <- flowId.project(queryCheckpoints(_, checkpointName, pageSize, offset)) | ||||||
} yield checkpoints.map(_.toPage(pageRoller)) | ||||||
|
||||||
} | ||||||
|
||||||
def getCheckpoints(pageSize: Int = 10, offset: Long = 0): F[RequestResult[Page[AtumPartitionsCheckpoint, F]]] = { | ||||||
def checkpointMapper(data: CheckpointWithPartitioningDTO): AtumPartitionsCheckpoint = { | ||||||
val atumPartitions = data.partitioning.partitioning.toAtumPartitions | ||||||
val checkpoint = Checkpoint(data) | ||||||
AtumPartitionsCheckpoint(atumPartitions, checkpoint) | ||||||
} | ||||||
geetCheckpointDTOs(None, pageSize, offset).map(_.pageMap((checkpointMapper))) | ||||||
Review comment: Suggested change |
||||||
} | ||||||
|
||||||
def getCheckpointsOfName(name: String, pageSize: Int = 10, offset: Int = 0): F[RequestResult[Page[CheckpointWithPartitioningDTO, F]]] = { | ||||||
geetCheckpointDTOs(Some(name), pageSize, offset) | ||||||
} | ||||||
} |
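The `pageRoller` above is the same fetch function partially applied to the checkpoint name, so a page can request its successor with a new offset. The following sketch illustrates that idea under heavy simplification: `Page` here is a hypothetical stand-in without the effect type `F`, the error type is a plain `String`, and `fetch` reads from an in-memory vector instead of calling the server. The real `Page` and `PageRoller` in `za.co.absa.atum.reader.result` may differ.

```scala
object PagingSketch {
  // Simplified: the PR uses Either[RequestException, A].
  type RequestResult[A] = Either[String, A]

  // Hypothetical, simplified stand-in for the reader's Page type.
  final case class Page[A](
    items: Vector[A],
    hasNext: Boolean,
    pageSize: Int,
    offset: Long,
    roller: (Int, Long) => RequestResult[Page[A]]
  ) {
    // Fetch the following page by re-invoking the roller with a shifted offset.
    def next: RequestResult[Page[A]] =
      if (hasNext) roller(pageSize, offset + pageSize) else Left("no next page")

    // Map the items and wrap the roller so later pages are mapped as well.
    def map[B](f: A => B): Page[B] =
      Page(items.map(f), hasNext, pageSize, offset, (s, o) => roller(s, o).map(_.map(f)))
  }

  // In-memory "server" holding five checkpoint names.
  private val data = Vector("cp1", "cp2", "cp3", "cp4", "cp5")

  // Plays the role of the fetch function; it passes itself on as the roller.
  def fetch(pageSize: Int, offset: Long): RequestResult[Page[String]] = {
    val slice = data.slice(offset.toInt, offset.toInt + pageSize)
    Right(Page(slice, offset + pageSize < data.size, pageSize, offset, fetch))
  }
}
```

Because `map` also wraps the roller, a mapped page keeps producing mapped pages, which is the property `pageMap` relies on when `getCheckpoints` converts DTOs to `AtumPartitionsCheckpoint`.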
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,6 +18,7 @@ package za.co.absa.atum.reader.basic | |
|
||
import sttp.monad.MonadError | ||
Review comment: Consider renaming package from |
||
import sttp.monad.syntax._ | ||
import za.co.absa.atum.model.ApiPaths._ | ||
import za.co.absa.atum.model.dto.PartitioningWithIdDTO | ||
import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse | ||
import za.co.absa.atum.model.types.basic.AtumPartitions | ||
|
@@ -28,8 +29,8 @@ import za.co.absa.atum.reader.basic.RequestResult.RequestResult | |
trait PartitioningIdProvider[F[_]] {self: Reader[F] => | ||
def partitioningId(partitioning: AtumPartitions)(implicit monad: MonadError[F]): F[RequestResult[Long]] = { | ||
val encodedPartitioning = partitioning.toPartitioningDTO.asBase64EncodedJsonString | ||
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]]("/api/v2/partitionings", Map("partitioning" -> encodedPartitioning)) | ||
queryResult.map{result => | ||
val queryResult = getQuery[SingleSuccessResponse[PartitioningWithIdDTO]](s"/$Api/$V2/${V2Paths.Partitionings}", Map("partitioning" -> encodedPartitioning)) | ||
queryResult.map{ result => | ||
result.map(_.data.id) | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,22 +17,33 @@ | |
package za.co.absa.atum.reader.basic | ||
|
||
import sttp.client3.{DeserializationException, HttpError, Response, ResponseException} | ||
import sttp.monad.MonadError | ||
import za.co.absa.atum.model.envelopes.ErrorResponse | ||
import za.co.absa.atum.reader.exceptions.RequestException.{CirceError, HttpException, ParsingException} | ||
import za.co.absa.atum.reader.exceptions.{ReaderException, RequestException} | ||
Review comment: ReaderException unused |
||
import za.co.absa.atum.reader.result.{GroupedPage, Page} | ||
Review comment: GroupedPage unused |
||
|
||
object RequestResult { | ||
type CirceError = io.circe.Error | ||
type RequestResult[R] = Either[ResponseException[ErrorResponse, CirceError], R] | ||
type RequestResult[R] = Either[RequestException, R] | ||
|
||
def RequestOK[T](value: T): RequestResult[T] = Right(value) | ||
def RequestFail[T](error: RequestException): RequestResult[T] = Left(error) | ||
|
||
implicit class ResponseOps[R](val response: Response[Either[ResponseException[String, CirceError], R]]) extends AnyVal { | ||
def toRequestResult: RequestResult[R] = { | ||
response.body.left.map { | ||
case he: HttpError[String] => | ||
ErrorResponse.basedOnStatusCode(he.statusCode.code, he.body) match { | ||
case Right(er) => HttpError(er, he.statusCode) | ||
case Left(ce) => DeserializationException(he.body, ce) | ||
case Right(er) => HttpException(he.getMessage, he.statusCode, er, response.request.uri) | ||
case Left(ce) => ParsingException.fromCirceError(ce, he.body) | ||
} | ||
case de: DeserializationException[CirceError] => de | ||
case de: DeserializationException[CirceError] => ParsingException.fromCirceError(de.error, de.body) | ||
} | ||
} | ||
} | ||
|
||
implicit class RequestPageResultOps[A, F[_]: MonadError](requestResult: RequestResult[Page[A, F]]) { | ||
def pageMap[B](f: A => B): RequestResult[Page[B, F]] = requestResult.map(_.map(f)) | ||
} | ||
|
||
} |
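Since `RequestResult` is an `Either`, the `for` comprehension in `FlowReader` needs a way to continue with an effectful call only when the previous step succeeded. The sketch below shows one plausible shape for such a `project` helper. It is an assumption about what `EitherMonadEnhancements` provides, not its actual code: `Try` plays the role of the effect `F`, the error type is a plain `String`, and `partitioningId`, `flowId` and `resolveFlow` are hypothetical stubs.

```scala
import scala.util.{Success, Try}

object ProjectSketch {
  // Simplified error type (assumption); the PR uses RequestException.
  type RequestResult[A] = Either[String, A]

  // Hypothetical helper: run `f` only on a Right, pass a Left through untouched.
  implicit class EitherProjectOps[A](val result: RequestResult[A]) extends AnyVal {
    def project[B](f: A => Try[RequestResult[B]]): Try[RequestResult[B]] = result match {
      case Right(a)  => f(a)
      case Left(err) => Success(Left(err))
    }
  }

  // Stubbed lookups standing in for the HTTP calls.
  def partitioningId(name: String): Try[RequestResult[Long]] =
    Success(if (name == "known") Right(1L) else Left(s"partitioning '$name' not found"))

  def flowId(pid: Long): Try[RequestResult[Long]] =
    Success(Right(pid + 100))

  // Mirrors the chaining style used in FlowReader: the second call is skipped on failure.
  def resolveFlow(name: String): Try[RequestResult[Long]] =
    for {
      pid <- partitioningId(name)
      fid <- pid.project(id => flowId(id))
    } yield fid
}
```

With this shape, a failed first request short-circuits the chain and its error is returned unchanged, so no further requests are made.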
Review comment: Yeah, something inside me says that this is not really a 'model', but in general I'm okay with this. Finding a better name for 'model' is difficult and would be a backward-incompatible change; alternatively, creating a new module for this seems a bit too much, so I prefer your solution.