-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
#247 Implement basics of FlowReader #306
base: feature/247-paging-in-reader
Are you sure you want to change the base?
#247 Implement basics of FlowReader #306
Conversation
* created new module Info * the new modul added to JaCoco and CI routines
* JaCoCo exclusion for model
* created Provider to query the data from server * support for Future, IO, and ZIO based providers * work in progress
* fixed license headers
…endpoints-from-info-module
…endpoints-from-info-module
@@ -14,7 +14,7 @@ | |||
* limitations under the License. | |||
*/ | |||
|
|||
package za.co.absa.atum.server.api.http | |||
package za.co.absa.atum.model |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah something inside me says that this is not really really a 'model' but in general I'm okay with this; finding a better name for 'model' is difficult and it's backward incompatible change; alternatively, creating a new model for this seems a bit too much, so I prefer your solution
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I consider the model
as the library to make communication with server "easy". So IMHO it fits, without too much mind-bending 😉
* limitations under the License. | ||
*/ | ||
|
||
package za.co.absa.atum.model.dto.traits |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm I'm not sure I like the location of this file; the trait/ dir has only this 1 file which doesn't really do much to our service in terms of better 'modularity' / 'structure of the DTO module. Besides, the fact that it's trait can be seen pretty nicely in IntelliJ, as it's visually different (green trait icon)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, I don't really see a big value in doing this at all to be fair. I mean, what if CheckpointV3 will be different? Then we can't do this or will update the trait..I mean it seems potentially limiting without too much benefits.
Also you did it only for checkpoints, not Partitioning DTOs; to me it really seem unncessary hierarchy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The trait is a solution for inability/discouragement of case class
inheritance. Because CheckpointWithPartitioningDTO
is by definition an enhanced CheckpointDTO
. And there's a a point, where I want to be able to handle both with the same function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Package traits
is debatable of course. Naming... 😉
import za.co.absa.atum.model.dto.MeasurementDTO | ||
|
||
trait Measurement { | ||
type T |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
type T | |
type V |
just a suggestion
|
||
object Measurement { | ||
|
||
def apply[T](from: MeasurementDTO): Measurement = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this has some similarity with agent's model/MeasureResult - did you check if it can be reused for Reader usages? Maybe slightly changed/moved if needed. But I think if model and agent are doing conceptually similar thing maybe we can unite and reuse some of the existing code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The similarity did occur to me. But they are not 100% same unlike AtumPartitions
and AdditionalData
and did want to further delay in search for commonality. I think that can be done in future, if found useful.
import za.co.absa.atum.model.types.{AtumPartitionsCheckpoint, Checkpoint} | ||
import za.co.absa.atum.reader.implicits.PaginatedResponseImplicits.PaginatedResponseMonadEnhancements | ||
import za.co.absa.atum.reader.implicits.EitherImplicits.EitherMonadEnhancements | ||
import za.co.absa.atum.reader.implicits.PaginatedResponseImplicits |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this one is not needed due to line 29
getQuery(endpoint, params) | ||
} | ||
|
||
private def geetCheckpointDTOs(checkpointName: Option[String], pageSize: Int = 10, offset: Long = 0): F[RequestResult[Page[CheckpointWithPartitioningDTO, F]]] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
private def geetCheckpointDTOs(checkpointName: Option[String], pageSize: Int = 10, offset: Long = 0): F[RequestResult[Page[CheckpointWithPartitioningDTO, F]]] = { | |
private def getCheckpointDTOs(checkpointName: Option[String], pageSize: Int = 10, offset: Long = 0): F[RequestResult[Page[CheckpointWithPartitioningDTO, F]]] = { |
val checkpoint = Checkpoint(data) | ||
AtumPartitionsCheckpoint(atumPartitions, checkpoint) | ||
} | ||
geetCheckpointDTOs(None, pageSize, offset).map(_.pageMap((checkpointMapper))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
geetCheckpointDTOs(None, pageSize, offset).map(_.pageMap((checkpointMapper))) | |
geetCheckpointDTOs(None, pageSize, offset).map(_.pageMap(checkpointMapper)) |
import za.co.absa.atum.model.envelopes.ErrorResponse | ||
import za.co.absa.atum.reader.exceptions.RequestException.{CirceError, HttpException, ParsingException} | ||
import za.co.absa.atum.reader.exceptions.{ReaderException, RequestException} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ReaderException unused
import za.co.absa.atum.model.envelopes.ErrorResponse | ||
import za.co.absa.atum.reader.exceptions.RequestException.{CirceError, HttpException, ParsingException} | ||
import za.co.absa.atum.reader.exceptions.{ReaderException, RequestException} | ||
import za.co.absa.atum.reader.result.{GroupedPage, Page} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
GroupedPage unused
|
||
import za.co.absa.atum.model.types.basic.AtumPartitions | ||
|
||
case class AtumPartitionsCheckpoint( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
also possibly reader-specific?
@@ -18,6 +18,7 @@ package za.co.absa.atum.reader.basic | |||
|
|||
import sttp.monad.MonadError |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider renaming package from basic
to core
? just an idea
* Finished UTs
This item depends on: |
val params = Map( | ||
"limit" -> pageSize.toString, | ||
"offset" -> offset.toString | ||
) ++ checkpointName.map("checkpoint-name" -> _) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would consider to extract this into some sort of a constant, but I don't insist - not sure if this is gonna be used elsewhere in the future
|
||
for { | ||
mainPartitioningId <- partitioningId(mainFlowPartitioning) | ||
flowId <- mainPartitioningId.project(queryFlowId) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for the sake of better readability, I'd consider to reflect either types in the name - for example here it could be flowIdOrError
or so
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same below etc - all of these 'wrapper' types are a bit easier to read and maintain truth
for { | ||
mainPartitioningId <- partitioningId(mainFlowPartitioning) | ||
flowId <- mainPartitioningId.project(queryFlowId) | ||
checkpoints <- flowId.project(queryCheckpoints(_, checkpointName, pageSize, offset)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
wow, I find this insanely complicated / advanced :D
IdMonad.unit(RequestFail(new RequestException("Not used"){})) | ||
} | ||
|
||
val source = PaginatedResponse(Seq(1, 2, 3), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the formatting is really wild here :D
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
seriously, sometimes it worsens code readability as it's a bit unpredictable / misleading
assert(result.items == Vector(1, 2, 3)) | ||
assert(!result.hasNext) | ||
assert(result.limit == 3) | ||
assert(result.pageStart == 1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider adding test on pageEnd
as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
just to increase test coverage more, computation seems correct
|
||
private val partitioningEncoded = "W3sia2V5IjoiYSIsInZhbHVlIjoiYiJ9LHsia2V5IjoiYyIsInZhbHVlIjoiZCJ9XQ==" | ||
|
||
private val partitioningResponse = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consider to extract the test data to another file
processEndTime = Some(ZonedDateTime.parse("2024-12-30T16:01:36.5052109+01:00[Europe/Budapest]")), | ||
measurements = Set( | ||
LongMeasurement( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unnecessary empty line
@@ -0,0 +1,47 @@ | |||
/* | |||
* Copyright 2024 ABSA Group Limited |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
2021 - there might be more occurrences like this, but I didn't check
FlowReade
r methods to read checkpoints - this is the main focus of the whole PRApiPaths
moved from_Server_ to ModelCheckpoint
andMeasurement
classes in Model to detach user from (versioned) DTOsDepends on #316
Closes #247