Skip to content

Commit

Permalink
* small fixes
Browse files Browse the repository at this point in the history
* added + function to `Page` and `GroupedPage` classes
  • Loading branch information
benedeki committed Dec 11, 2024
1 parent 67ffe07 commit 09e2ed8
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ case class GroupedPage[K, V, F[_]: MonadError](
hasNext: Boolean,
limit: Int,
offset: Long,
override val pageSize: Int,
private[reader] val pageRoller: GroupPageRoller[K, V, F]
) extends AbstractPage[Map[K, Vector[V]], F] {

def apply(key: K): Vector[V] = items(key)
def keys: Iterable[K] = items.keys
def groupCount: Int = items.size

def map[K1, V1](f: ((K, Vector[V])) => (K1, Vector[V1])): GroupedPage[K1, V1, F] = {
val newItems = items.map(f)
Expand Down Expand Up @@ -86,6 +89,13 @@ case class GroupedPage[K, V, F[_]: MonadError](
}

def next: F[RequestResult[GroupedPage[K, V, F]]] = next(limit)

def +(other: GroupedPage[K, V, F]): GroupedPage[K, V, F] = {
val newItems = items ++ other.items
val newOffset = offset min other.offset
val newPageSize = pageSize + other.pageSize
this.copy(items = newItems, offset = newOffset, pageSize = newPageSize)
}
}

object GroupedPage {
Expand Down
18 changes: 12 additions & 6 deletions reader/src/main/scala/za/co/absa/atum/reader/result/Page.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.reader.basic.RequestResult.{RequestFail, RequestPageResultOps, RequestResult}
import za.co.absa.atum.reader.exceptions.RequestException.NoDataException
import za.co.absa.atum.reader.implicits.VectorImplicits.VectorEnhancements
import za.co.absa.atum.reader.result.GroupedPage.GroupPageRoller
import za.co.absa.atum.reader.result.Page.PageRoller

Expand All @@ -43,9 +42,9 @@ case class Page[T, F[_]: MonadError](
}

def groupBy[K](f: T => K): GroupedPage[K, T, F] = {
val newItems = items.foldLeft(ListMap.empty[K, Vector[T]]) { (acc, x) =>
val k = f(x)
acc.updated(k, acc.getOrElse(k, Vector.empty) :+ x)
val (newItems, itemsCounts) = items.foldLeft(ListMap.empty[K, Vector[T]], 0) { case ((groupsAcc, count), item) =>
val k = f(item)
(groupsAcc.updated(k, groupsAcc.getOrElse(k, Vector.empty) :+ item), count + 1)
}
val newPageRoller: GroupPageRoller[K, T, F] = (limit, offset) =>
pageRoller(limit, offset)
Expand All @@ -56,13 +55,14 @@ case class Page[T, F[_]: MonadError](
hasNext,
limit,
offset,
itemsCounts,
newPageRoller
)
}

def prior(newPageSize: Int): F[RequestResult[Page[T, F]]] = {
if (hasPrior) {
val newOffset = (offset - limit).max(0)
val newOffset = (offset - newPageSize).max(0)
pageRoller(newPageSize, newOffset)
} else {
MonadError[F].unit(RequestFail(NoDataException("No prior page")))
Expand All @@ -73,13 +73,19 @@ case class Page[T, F[_]: MonadError](

def next(newPageSize: Int): F[RequestResult[Page[T, F]]] = {
if (hasNext) {
pageRoller(newPageSize, offset + limit)
pageRoller(newPageSize, offset + pageSize)
} else {
MonadError[F].unit(RequestFail(NoDataException("No next page")))
}
}

def next: F[RequestResult[Page[T, F]]] = next(limit)

def +(other: Page[T, F]): Page[T, F] = {
val newItems = items ++ other.items
val newOffset = offset min other.offset
this.copy(items = newItems, offset = newOffset)
}
}

object Page {
Expand Down

0 comments on commit 09e2ed8

Please sign in to comment.