Skip to content

Commit

Permalink
* Flow reader methods to read checkpoints
Browse files Browse the repository at this point in the history
* `Page` class to nicely handle paginated results
* `GroupedPage` class to handle paginated results that can be grouped
  • Loading branch information
benedeki committed Dec 11, 2024
1 parent 5dfe5c5 commit 67ffe07
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 18 deletions.
12 changes: 6 additions & 6 deletions reader/src/main/scala/za/co/absa/atum/reader/FlowReader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ import sttp.monad.syntax._
import za.co.absa.atum.model.dto.{CheckpointWithPartitioningDTO, FlowDTO}
import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse}
import za.co.absa.atum.model.types.basic.{AtumPartitions, PartitioningDTOOps}
import za.co.absa.atum.reader.basic.RequestResult.RequestResult
import za.co.absa.atum.reader.basic.RequestResult.{RequestPageResultOps, RequestResult}
import za.co.absa.atum.reader.basic.{PartitioningIdProvider, Reader}
import za.co.absa.atum.model.ApiPaths._
import za.co.absa.atum.model.types.{AtumPartitionsCheckpoint, Checkpoint}
import za.co.absa.atum.reader.implicits.PaginatedResponseImplicits.PaginatedResponseMonadEnhancements
import za.co.absa.atum.reader.implicits.EitherImplicits.EitherMonadEnhancements
import za.co.absa.atum.reader.implicits.PaginatedResponseImplicits
import za.co.absa.atum.reader.result.Page
import za.co.absa.atum.reader.result.Page.PageRoller
import za.co.absa.atum.reader.server.ServerConfig
import za.co.absa.atum.reader.basic.RequestResult.RequestPageResultOps
import za.co.absa.atum.reader.result.Page.PageRoller

/**
* This class is a reader that reads data tight to a flow.
Expand Down Expand Up @@ -62,7 +62,7 @@ class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)
val params = Map(
"limit" -> pageSize.toString,
"offset" -> offset.toString
) ++ checkpointName.map(("checkpoint-name" -> _))
) ++ checkpointName.map("checkpoint-name" -> _)
getQuery(endpoint, params)
}

Expand All @@ -83,10 +83,10 @@ class FlowReader[F[_]](val mainFlowPartitioning: AtumPartitions)
val checkpoint = Checkpoint(data)
AtumPartitionsCheckpoint(atumPartitions, checkpoint)
}
geetCheckpointDTOs(None, pageSize, offset).map(_.pageMap(checkpointMapper))
geetCheckpointDTOs(None, pageSize, offset).map(_.pageMap((checkpointMapper)))
}

def getCheckpointsOfName(name: String, pageSize: Int = 10, offset: Int = 0) = {
def getCheckpointsOfName(name: String, pageSize: Int = 10, offset: Int = 0): F[RequestResult[Page[CheckpointWithPartitioningDTO, F]]] = {
geetCheckpointDTOs(Some(name), pageSize, offset)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import sttp.monad.MonadError
import za.co.absa.atum.model.envelopes.ErrorResponse
import za.co.absa.atum.reader.exceptions.RequestException.{CirceError, HttpException, ParsingException}
import za.co.absa.atum.reader.exceptions.{ReaderException, RequestException}
import za.co.absa.atum.reader.result.Page
import za.co.absa.atum.reader.result.{GroupedPage, Page}

object RequestResult {
type RequestResult[R] = Either[RequestException, R]
Expand All @@ -45,4 +45,5 @@ object RequestResult {
implicit class RequestPageResultOps[A, F[_]: MonadError](requestResult: RequestResult[Page[A, F]]) {
def pageMap[B](f: A => B): RequestResult[Page[B, F]] = requestResult.map(_.map(f))
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.result

import sttp.monad.MonadError
import za.co.absa.atum.reader.basic.RequestResult.RequestResult

abstract class AbstractPage [T <: Iterable[_], F[_]: MonadError] {
def items: T
def hasNext: Boolean
def limit: Int
def offset: Long

def pageSize: Int = items.size
def hasPrior: Boolean = offset > 0
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2024 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.reader.result

import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.reader.basic.RequestResult.{RequestFail, RequestResult}
import za.co.absa.atum.reader.exceptions.RequestException.NoDataException
import za.co.absa.atum.reader.result.GroupedPage.GroupPageRoller
import za.co.absa.atum.reader.result.Page.PageRoller

import scala.collection.immutable.ListMap

case class GroupedPage[K, V, F[_]: MonadError](
items: ListMap[K, Vector[V]],
hasNext: Boolean,
limit: Int,
offset: Long,
private[reader] val pageRoller: GroupPageRoller[K, V, F]
) extends AbstractPage[Map[K, Vector[V]], F] {

def apply(key: K): Vector[V] = items(key)

def map[K1, V1](f: ((K, Vector[V])) => (K1, Vector[V1])): GroupedPage[K1, V1, F] = {
val newItems = items.map(f)
val newPageRoller: GroupPageRoller[K1, V1, F] = (limit, offset) => pageRoller(limit, offset).map(_.map(_.map(f)))
this.copy(items = newItems, pageRoller = newPageRoller)
}

def mapValues[B](f: V => B): GroupedPage[K, B, F] = {
def mapper(item: (K, Vector[V])): (K, Vector[B]) = (item._1, item._2.map(f))

val newItems = items.map(mapper)
val newPageRoller: GroupPageRoller[K, B, F] = (limit, offset) => pageRoller(limit, offset).map(_.map(_.mapValues(f)))
this.copy(items = newItems, pageRoller = newPageRoller)

}

def flatten: Page[V, F] = {
val newItems = items.values.flatten.toVector
val newPageRoller: PageRoller[V, F] = (limit, offset) => pageRoller(limit, offset).map(_.map(_.flatten))
Page(
items = newItems,
hasNext = hasNext,
limit = limit,
offset = offset,
pageRoller = newPageRoller
)
}

def flatMap[K1, T](f: ((K, Vector[V])) => (K1, Vector[T])): Page[T, F] = {
map(f).flatten
}

def prior(newPageSize: Int): F[RequestResult[GroupedPage[K, V, F]]] = {
if (hasPrior) {
val newOffset = (offset - limit).max(0)
pageRoller(newPageSize, newOffset)
} else {
MonadError[F].unit(RequestFail(NoDataException("No prior page")))
}
}

def prior(): F[RequestResult[GroupedPage[K, V, F]]] = prior(limit)

def next(newPageSize: Int): F[RequestResult[GroupedPage[K, V, F]]] = {
if (hasNext) {
pageRoller(newPageSize, offset + limit)
} else {
MonadError[F].unit(RequestFail(NoDataException("No next page")))
}
}

def next: F[RequestResult[GroupedPage[K, V, F]]] = next(limit)
}

object GroupedPage {
type GroupPageRoller[K, V, F[_]] = (Int, Long) => F[RequestResult[GroupedPage[K, V, F]]]
}
34 changes: 23 additions & 11 deletions reader/src/main/scala/za/co/absa/atum/reader/result/Page.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,47 @@ package za.co.absa.atum.reader.result

import sttp.monad.MonadError
import sttp.monad.syntax._
import za.co.absa.atum.reader.basic.RequestResult.{RequestFail, RequestResult}
import za.co.absa.atum.reader.basic.RequestResult.{RequestFail, RequestPageResultOps, RequestResult}
import za.co.absa.atum.reader.exceptions.RequestException.NoDataException
import za.co.absa.atum.reader.implicits.VectorImplicits.VectorEnhancements
import za.co.absa.atum.reader.result.GroupedPage.GroupPageRoller
import za.co.absa.atum.reader.result.Page.PageRoller

import scala.collection.immutable.ListMap

case class Page[T, F[_]: MonadError](
items: Vector[T],
hasNext: Boolean,
limit: Int,
offset: Long,
private[reader] val pageRoller: PageRoller[T, F]
) {
) extends AbstractPage[Vector[T], F] {

def apply(index: Int): T = items(index)

def map[B](f: T => B): Page[B, F] = {
val newItems = items.map(f)
val newPageRoller: PageRoller[B, F] = (limit, offset) => pageRoller(limit, offset).map(_.map(_.map(f)))
val newPageRoller: PageRoller[B, F] = (limit, offset) => pageRoller(limit, offset).map(_.pageMap(f))
this.copy(items = newItems, pageRoller = newPageRoller)
}

// def flatMap[B](f: T => IterableOnce[B]): Page[B, F] = {
// val newItems = items.flatMap(f)
// ???
// TODO
// }

def pageSize: Int = items.size
def groupBy[K](f: T => K): GroupedPage[K, T, F] = {
val newItems = items.foldLeft(ListMap.empty[K, Vector[T]]) { (acc, x) =>
val k = f(x)
acc.updated(k, acc.getOrElse(k, Vector.empty) :+ x)
}
val newPageRoller: GroupPageRoller[K, T, F] = (limit, offset) =>
pageRoller(limit, offset)
.map(_.map(_.groupBy(f)))

def hasPrior: Boolean = offset > 0
GroupedPage(
newItems,
hasNext,
limit,
offset,
newPageRoller
)
}

def prior(newPageSize: Int): F[RequestResult[Page[T, F]]] = {
if (hasPrior) {
Expand Down

0 comments on commit 67ffe07

Please sign in to comment.