#247 Implement basics of FlowReader #306

Open
wants to merge 44 commits into
base: feature/247-paging-in-reader

Conversation

benedeki
Contributor

@benedeki benedeki commented Dec 11, 2024

  • FlowReader methods to read checkpoints - this is the main focus of the whole PR
  • ApiPaths moved from _Server_ to _Model_
  • Checkpoint and Measurement classes in Model to detach user from (versioned) DTOs
  • Other classes moved from _Agent_ to _Model_, as they are used in Reader too.

Depends on #316
Closes #247

benedeki and others added 30 commits October 31, 2024 01:23
* created new module Info
* the new module added to JaCoCo and CI routines
* JaCoCo exclusion for model
* created Provider to query the data from server
* support for Future, IO, and ZIO based providers
* work in progress
@@ -14,7 +14,7 @@
* limitations under the License.
*/

-package za.co.absa.atum.server.api.http
+package za.co.absa.atum.model
Collaborator

Yeah, something inside me says that this is not really a 'model', but in general I'm okay with this; finding a better name for 'model' is difficult and it would be a backward-incompatible change. Alternatively, creating a new model for this seems a bit too much, so I prefer your solution.

Contributor Author

I consider the model to be the library that makes communication with the server "easy". So IMHO it fits, without too much mind-bending 😉

* limitations under the License.
*/

package za.co.absa.atum.model.dto.traits
Collaborator

@lsulak lsulak Dec 12, 2024

Hmm, I'm not sure I like the location of this file; the traits/ dir has only this 1 file, which doesn't really do much for our service in terms of better 'modularity' / 'structure' of the DTO module. Besides, the fact that it's a trait can be seen pretty nicely in IntelliJ, as it's visually different (green trait icon).

Collaborator

@lsulak lsulak Dec 12, 2024

In fact, I don't really see a big value in doing this at all, to be fair. I mean, what if CheckpointV3 is different? Then we can't do this, or we will have to update the trait. I mean, it seems potentially limiting without too many benefits.

Also, you did it only for checkpoints, not Partitioning DTOs; to me it really seems like an unnecessary hierarchy.

Contributor Author

The trait is a solution for the inability/discouragement of case class inheritance, because CheckpointWithPartitioningDTO is by definition an enhanced CheckpointDTO, and there's a point where I want to be able to handle both with the same function.
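
A minimal sketch of the idea in this comment, not the PR's actual code: the two case classes below are simplified, hypothetical stand-ins for the real DTOs (which carry more fields), and the trait name `CheckpointLike` and the function `describe` are assumptions made for illustration. It shows how a shared trait lets one function accept both shapes without one case class extending the other.

```scala
// Hypothetical shared trait; the name and the fields are illustrative only.
trait CheckpointLike {
  def name: String
  def author: String
}

// Simplified stand-in for the plain checkpoint DTO.
final case class CheckpointDTO(name: String, author: String) extends CheckpointLike

// Simplified stand-in for the "enhanced" DTO: same fields plus the partitioning.
final case class CheckpointWithPartitioningDTO(
  name: String,
  author: String,
  partitioning: Seq[(String, String)]
) extends CheckpointLike

object CheckpointDescriber {
  // One function handles both DTOs through the shared trait.
  def describe(checkpoint: CheckpointLike): String =
    s"${checkpoint.name} by ${checkpoint.author}"
}
```

The trade-off raised in the review still applies: if a future DTO version drops or changes one of the shared fields, the trait has to change with it.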

Contributor Author

Package traits is debatable of course. Naming... 😉

import za.co.absa.atum.model.dto.MeasurementDTO

trait Measurement {
type T
Collaborator

Suggested change
-type T
+type V

just a suggestion


object Measurement {

def apply[T](from: MeasurementDTO): Measurement = {
Collaborator

This has some similarity with agent's model/MeasureResult. Did you check if it can be reused for Reader usages? Maybe slightly changed/moved if needed. But I think if model and agent are doing a conceptually similar thing, maybe we can unite and reuse some of the existing code.

Contributor Author

The similarity did occur to me. But they are not 100% the same, unlike AtumPartitions and AdditionalData, and I did not want to delay further in search of commonality. I think that can be done in the future, if found useful.

import za.co.absa.atum.model.types.{AtumPartitionsCheckpoint, Checkpoint}
import za.co.absa.atum.reader.implicits.PaginatedResponseImplicits.PaginatedResponseMonadEnhancements
import za.co.absa.atum.reader.implicits.EitherImplicits.EitherMonadEnhancements
import za.co.absa.atum.reader.implicits.PaginatedResponseImplicits
Collaborator

this one is not needed due to line 29

getQuery(endpoint, params)
}

private def geetCheckpointDTOs(checkpointName: Option[String], pageSize: Int = 10, offset: Long = 0): F[RequestResult[Page[CheckpointWithPartitioningDTO, F]]] = {
Collaborator

Suggested change
-private def geetCheckpointDTOs(checkpointName: Option[String], pageSize: Int = 10, offset: Long = 0): F[RequestResult[Page[CheckpointWithPartitioningDTO, F]]] = {
+private def getCheckpointDTOs(checkpointName: Option[String], pageSize: Int = 10, offset: Long = 0): F[RequestResult[Page[CheckpointWithPartitioningDTO, F]]] = {

val checkpoint = Checkpoint(data)
AtumPartitionsCheckpoint(atumPartitions, checkpoint)
}
geetCheckpointDTOs(None, pageSize, offset).map(_.pageMap((checkpointMapper)))
Collaborator

Suggested change
-geetCheckpointDTOs(None, pageSize, offset).map(_.pageMap((checkpointMapper)))
+geetCheckpointDTOs(None, pageSize, offset).map(_.pageMap(checkpointMapper))

import za.co.absa.atum.model.envelopes.ErrorResponse
import za.co.absa.atum.reader.exceptions.RequestException.{CirceError, HttpException, ParsingException}
import za.co.absa.atum.reader.exceptions.{ReaderException, RequestException}
Collaborator

ReaderException unused

import za.co.absa.atum.model.envelopes.ErrorResponse
import za.co.absa.atum.reader.exceptions.RequestException.{CirceError, HttpException, ParsingException}
import za.co.absa.atum.reader.exceptions.{ReaderException, RequestException}
import za.co.absa.atum.reader.result.{GroupedPage, Page}
Collaborator

GroupedPage unused


import za.co.absa.atum.model.types.basic.AtumPartitions

case class AtumPartitionsCheckpoint(
Collaborator

also possibly reader-specific?

@@ -18,6 +18,7 @@ package za.co.absa.atum.reader.basic

import sttp.monad.MonadError
Collaborator

Consider renaming the package from basic to core? Just an idea.

@benedeki benedeki changed the base branch from master to feature/247-paging-in-reader February 4, 2025 11:29
@benedeki benedeki marked this pull request as ready for review February 4, 2025 11:31
@benedeki benedeki added the dependent The item depends on some other open item (Issue or PR) label Feb 5, 2025
@benedeki
Contributor Author

benedeki commented Feb 5, 2025

This item depends on:

val params = Map(
"limit" -> pageSize.toString,
"offset" -> offset.toString
) ++ checkpointName.map("checkpoint-name" -> _)
Collaborator

@lsulak lsulak Feb 17, 2025

I would consider extracting this into some sort of constant, but I don't insist; not sure if this is gonna be used elsewhere in the future.
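
One way the extraction suggested here could look, as a sketch only: the object name `CheckpointQueryParams`, its member names, and the `build` helper are hypothetical and do not appear in the PR. The parameter strings themselves are the ones from the snippet under review.

```scala
// Hypothetical holder for the query parameter names used by the reader.
object CheckpointQueryParams {
  val Limit = "limit"
  val Offset = "offset"
  val CheckpointName = "checkpoint-name"

  // Builds the same map as the reviewed snippet, using the constants.
  // The optional checkpoint name is added only when it is defined.
  def build(checkpointName: Option[String], pageSize: Int, offset: Long): Map[String, String] =
    Map(
      Limit -> pageSize.toString,
      Offset -> offset.toString
    ) ++ checkpointName.map(CheckpointName -> _)
}
```

Whether this pays off depends on the point already raised in the comment: it helps mostly if the same parameter names end up being used by more than one query.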


for {
mainPartitioningId <- partitioningId(mainFlowPartitioning)
flowId <- mainPartitioningId.project(queryFlowId)
Collaborator

For the sake of better readability, I'd consider reflecting the Either types in the names; for example, here it could be flowIdOrError or so.

Collaborator

Same below, etc. All of these 'wrapper' types are a bit easier to read and maintain that way, in truth.
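
A rough sketch of the naming suggested in this thread, under stated assumptions: `Result` is a stand-in for the PR's RequestResult, `Try` stands in for the effect type F, and the behaviour of `project` (run the next query only for a Right, pass a Left through) is assumed from how it is used in the reviewed snippet, not taken from the PR's implementation. The query functions and their return values are invented for illustration.

```scala
import scala.util.{Success, Try}

object FlowLookupSketch {
  // Stand-in for the PR's RequestResult: an error message or a value.
  type Result[A] = Either[String, A]

  // Assumed behaviour of `project`: apply the next query to a Right,
  // and pass a Left through unchanged without running the query.
  implicit class EitherProjectOps[A](private val value: Result[A]) extends AnyVal {
    def project[B](f: A => Try[Result[B]]): Try[Result[B]] = value match {
      case Right(a)  => f(a)
      case Left(err) => Success(Left(err))
    }
  }

  // Hypothetical queries; the returned ids are made up.
  def partitioningId(partitioning: String): Try[Result[Long]] =
    Success(if (partitioning.nonEmpty) Right(42L) else Left("partitioning not found"))

  def queryFlowId(partitioningId: Long): Try[Result[Long]] =
    Success(Right(partitioningId + 1))

  // The "OrError" suffix signals that each bound value is still an Either.
  def flowIdFor(mainFlowPartitioning: String): Try[Result[Long]] =
    for {
      mainPartitioningIdOrError <- partitioningId(mainFlowPartitioning)
      flowIdOrError             <- mainPartitioningIdOrError.project(queryFlowId)
    } yield flowIdOrError
}
```

With these names, a reader can see at each step that the for-comprehension unwraps only the outer effect, while the Either is threaded through by `project`.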

for {
mainPartitioningId <- partitioningId(mainFlowPartitioning)
flowId <- mainPartitioningId.project(queryFlowId)
checkpoints <- flowId.project(queryCheckpoints(_, checkpointName, pageSize, offset))
Collaborator

wow, I find this insanely complicated / advanced :D

IdMonad.unit(RequestFail(new RequestException("Not used"){}))
}

val source = PaginatedResponse(Seq(1, 2, 3),
Collaborator

the formatting is really wild here :D

Collaborator

seriously, sometimes it worsens code readability as it's a bit unpredictable / misleading

assert(result.items == Vector(1, 2, 3))
assert(!result.hasNext)
assert(result.limit == 3)
assert(result.pageStart == 1)
Collaborator

consider adding a test on pageEnd as well

Collaborator

just to increase test coverage more, computation seems correct


private val partitioningEncoded = "W3sia2V5IjoiYSIsInZhbHVlIjoiYiJ9LHsia2V5IjoiYyIsInZhbHVlIjoiZCJ9XQ=="

private val partitioningResponse =
Collaborator

consider extracting the test data to another file

processEndTime = Some(ZonedDateTime.parse("2024-12-30T16:01:36.5052109+01:00[Europe/Budapest]")),
measurements = Set(
LongMeasurement(

Collaborator

unnecessary empty line

@@ -0,0 +1,47 @@
/*
* Copyright 2024 ABSA Group Limited
Collaborator

2021 - there might be more occurrences like this, but I didn't check

Labels
dependent: The item depends on some other open item (Issue or PR)
work in progress: Work on this item is not yet finished (mainly intended for PRs)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement basics of FlowReader
2 participants