Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#247: Paging support in Reader module #316

Open
wants to merge 8 commits into
base: master
Choose a base branch
from

Conversation

benedeki
Copy link
Contributor

@benedeki benedeki commented Feb 4, 2025

  • classes abstracting work with paging and rolling through them
  • RequestResult class refactoring
  • package basic renamed to core

Requirement for #247

Release notes:

  • Added paging support to the module.

* classes abstracting work with paging and rolling through them
* `RequestResult` class refactoring
Copy link

github-actions bot commented Feb 4, 2025

JaCoCo model module code coverage report - scala 2.13.11

Overall Project 58.76% 🍏

There is no coverage information present for the Files changed

Copy link

github-actions bot commented Feb 4, 2025

JaCoCo agent module code coverage report - scala 2.13.11

Overall Project 78.2% 🍏

There is no coverage information present for the Files changed

Copy link

github-actions bot commented Feb 4, 2025

JaCoCo reader module code coverage report - scala 2.13.11

Overall Project 96.76% 🍏
Files changed 45.29%

File Coverage
PartitioningReader.scala 100% 🍏
FlowReader.scala 100% 🍏
AbstractPage.scala 100% 🍏
PartitioningIdProvider.scala 100% 🍏
Reader.scala 100% 🍏
ReaderException.scala 100% 🍏
RequestException.scala 100%
GroupedPage.scala 99.21% -98.02%
Page.scala 97.01% -94.78%
RequestResult.scala 68.18% 🍏

Copy link

github-actions bot commented Feb 4, 2025

JaCoCo server module code coverage report - scala 2.13.11

Overall Project 68.39% 🍏

There is no coverage information present for the Files changed

@benedeki benedeki linked an issue Feb 11, 2025 that may be closed by this pull request
import sttp.model.{StatusCode, Uri}
import za.co.absa.atum.model.envelopes.ErrorResponse

abstract class RequestException(message: String) extends ReaderException(message)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could make this class sealed to restrict its subclassing to the same file. This can help in maintaining control over the hierarchy and ensuring that all subclasses are known and defined in one place.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scalafmt formatting is off (hasn't been applied).

def pageEnd: Long

def pageSize: Int = (pageEnd - pageStart).toInt + 1
def hasPrior: Boolean = pageStart > 0
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if pageStart is 0 or negative but the absolute value is still lower than pageSize? Could this happen?


import scala.collection.immutable.ListMap

case class GroupedPage[K, V, F[_]: MonadError](
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on the usage of this class? What do we need it for? I often try to limit the scope of my PRs as much as possible.


def next: F[RequestResult[Page[T, F]]] = next(limit)

def +(other: Page[T, F]): Page[T, F] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not really needed at this point if even in the future ...

private[reader] val pageRoller: PageRoller[T, F]
) extends AbstractPage[Vector[T], F] {

def apply(index: Int): T = items(index)
Copy link
Collaborator

@salamonpavel salamonpavel Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it wise to provide apply which might lead to IndexOutOfBoundsExceptions? Iterating over Seq is preferable ...

this.copy(items = newItems, pageRoller = newPageRoller)
}

def prior(newPageSize: Int): F[RequestResult[Page[T, F]]] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think prior is needed. But I might be wrong or maybe I am just not seeing the usage. When I think of Unify I cannot imagine that the backend calls prior given it's stateless.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As mentioned in Teams I am of the opinion that we should force forward-only consumption pattern to try to avoid repetitive calls to db.


def apply(index: Int): T = items(index)

def map[B](f: T => B): Page[B, F] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you expect the user to use map on Page if items can be mapped easily anyway?

this.copy(items = newItems, hasNext = newHasNext, pageStart = newPageStart, pageEnd = newPageEnd)
}

def groupBy[K](f: T => K): GroupedPage[K, T, F] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could discuss these mapping/grouping stuff first on our tech call?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I honestly like it and find it very universal

}

object Page {
type PageRoller[T, F[_]] = (Int, Long) => F[RequestResult[Page[T, F]]]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clever.

@@ -23,7 +23,7 @@ import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse
import za.co.absa.atum.model.types.basic.AtumPartitions
import za.co.absa.atum.model.types.basic.AtumPartitionsOps
import za.co.absa.atum.model.utils.JsonSyntaxExtensions.JsonSerializationSyntax
import za.co.absa.atum.reader.basic.RequestResult.RequestResult
import RequestResult.RequestResult
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer this style to be explicit, WDYT?

import za.co.absa.atum.reader.core.RequestResult.RequestResult

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am also in favor of using fully qualified imports.

def groupBy[K](f: T => K): GroupedPage[K, T, F] = {
val (newItems, itemsCounts) = items.foldLeft(ListMap.empty[K, Vector[T]], 0) { case ((groupsAcc, count), item) =>
val k = f(item)
(groupsAcc.updated(k, groupsAcc.getOrElse(k, Vector.empty) :+ item), count + 1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider extracting groupsAcc.getOrElse(k, Vector.empty) into a val, so that this line doesn't do so much

}

def groupBy[K](f: T => K): GroupedPage[K, T, F] = {
val (newItems, itemsCounts) = items.foldLeft(ListMap.empty[K, Vector[T]], 0) { case ((groupsAcc, count), item) =>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm any plans with the itemsCounts ? It looks potentially useful but it's nowhere used yet. Alternatively you can just delete and simplify the whole foldLeft block

BTW, if you decide to use and keep it, I'd propose to rename count to countAcc, just as a common practice (at least mine :D )

@@ -38,7 +39,7 @@ class PartitioningIdProviderUnitTests extends AnyFunSuiteLike {
private val atumPartitionsToNotFound = AtumPartitions(List.empty)

private implicit val serverConfig: ServerConfig = ServerConfig(serverUrl)
private implicit val monad: IdMonad.type = IdMonad
private implicit val monad: MonadError[Identity] = IdMonad
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very clever-the whole file, just read it for the first time

Copy link
Collaborator

@lsulak lsulak Feb 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ReaderWithPartitioningIdForTest(partitioning: AtumPartitions) must have the partitioning in constructor? It seems like unnecessary duplicity potentially

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also in line 71:

val result: Long = response.getOrElse(throw new Exception("Failed to get partitioning id"))

I think in tests this is unnecessary, I'd just do something like: val result: Long = response.get

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's test that simulates something in well-isolated and very specific & deterministic scenario

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Line 91: isn't this a bit more readable?

result.left.map(e => assert(e.isInstanceOf[ParsingException])) instead of swap

TestItem(4, "cc")
)

def roller(limit: Int, offset: Long): RequestResult[Page[TestItem, Identity]] = {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't you want to make this function more universal? It only works with offset 0 or 6

assert(!page.hasPrior)

val anotherPage = new AbstractPage[Iterable[Int], Identity] {
override def items: Iterable[Int] = Seq(1, 2, 3)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking whether this logically makes sense. I understand it, pages can overlap, okay, for the test cases let's be it, but the page starts at 1 and ends at 2, so it should have only Seq(2, 3) right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd propose to implement logically correct tests so that they are understood & the intention for the code under test is well understood - tests are a sort of a documentation as well

@@ -0,0 +1,179 @@
/*
* Copyright 2025 ABSA Group Limited
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2021

@@ -0,0 +1,89 @@
/*
* Copyright 2025 ABSA Group Limited
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2021

@@ -0,0 +1,51 @@
/*
* Copyright 2025 ABSA Group Limited
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2021

@@ -0,0 +1,37 @@
/*
* Copyright 2025 ABSA Group Limited
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2021

@@ -0,0 +1,72 @@
/*
* Copyright 2025 ABSA Group Limited
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2021

hasNext = true,
limit = 6,
pageStart = 0,
pageEnd = 5
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logically speaking, what is pageEnd here, what does it represent? Number of all items in the ListMap, or just number of 'keys', i.e. 3 or 6 ? (minus 1 since we index from 0, so 2 or 5) ?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess the overall number of items so that this is correct, just wanted to double check

pageEnd = 5
)

val Right(priorPage2) = nextPage2.prior(7)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you test scenrio when the limit is for example 4?

5 -> Vector(PageData.TestItem(5, "dd"))
),
hasNext = false,
limit = 6,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this logically correct? Number of items is higher than 6

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Implement basics of FlowReader
3 participants