diff --git a/database/src/main/postgres/flows/V1.3.5___add_to_parent_flows.sql b/database/src/main/postgres/flows/V1.3.5___add_to_parent_flows.sql index a117a323a..d424ebf9f 100644 --- a/database/src/main/postgres/flows/V1.3.5___add_to_parent_flows.sql +++ b/database/src/main/postgres/flows/V1.3.5___add_to_parent_flows.sql @@ -19,8 +19,7 @@ CREATE OR REPLACE FUNCTION flows._add_to_parent_flows( IN i_fk_partitioning BIGINT, IN i_by_user TEXT, OUT status INTEGER, - OUT status_text TEXT, - OUT id_flow BIGINT + OUT status_text TEXT ) RETURNS record AS $$ ------------------------------------------------------------------------------- diff --git a/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql new file mode 100644 index 000000000..6ef71fd94 --- /dev/null +++ b/database/src/main/postgres/flows/V1.9.8__get_flow_partitionings.sql @@ -0,0 +1,97 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE OR REPLACE FUNCTION flows.get_flow_partitionings( + IN i_flow_id BIGINT, + IN i_limit INT DEFAULT 5, + IN i_offset BIGINT DEFAULT 0, + OUT status INTEGER, + OUT status_text TEXT, + OUT id BIGINT, + OUT partitioning JSONB, + OUT author TEXT, + OUT has_more BOOLEAN +) RETURNS SETOF record AS +------------------------------------------------------------------------------- +-- +-- Function: flows.get_flow_partitionings(3) +-- Retrieves all partitionings associated with the input flow. +-- +-- Note: partitionings will be retrieved in ordered fashion, by created_at column from runs.partitionings table +-- +-- Parameters: +-- i_flow_id - flow id to use for identifying the partitionings that will be retrieved +-- i_limit - (optional) maximum number of partitionings to return, default is 5 +-- i_offset - (optional) offset to use for pagination, default is 0 +-- +-- Returns: +-- status - Status code +-- status_text - Status text +-- id - ID of retrieved partitioning +-- partitioning - Partitioning value +-- author - Author of the partitioning +-- has_more - Flag indicating if there are more partitionings available +-- +-- Status codes: +-- 11 - OK +-- 41 - Flow not found +-- +------------------------------------------------------------------------------- +$$ +DECLARE + _has_more BOOLEAN; +BEGIN + PERFORM 1 FROM flows.flows WHERE id_flow = i_flow_id; + IF NOT FOUND THEN + status := 41; + status_text := 'Flow not found'; + RETURN NEXT; + RETURN; + END IF; + + IF i_limit IS NOT NULL THEN + SELECT count(*) > i_limit + FROM flows.partitioning_to_flow PTF + WHERE PTF.fk_flow = i_flow_id + LIMIT i_limit + 1 OFFSET i_offset + INTO _has_more; + ELSE + _has_more := false; + END IF; + + + RETURN QUERY + SELECT + 11 AS status, + 'OK' AS status_text, + P.id_partitioning, + P.partitioning, + P.created_by, + _has_more + FROM + runs.partitionings P INNER JOIN + flows.partitioning_to_flow PF ON PF.fk_partitioning = P.id_partitioning + WHERE + PF.fk_flow = i_flow_id + ORDER BY + P.id_partitioning, + P.created_at DESC + LIMIT i_limit OFFSET i_offset; +END; +$$ +LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + +GRANT EXECUTE ON FUNCTION flows.get_flow_partitionings(BIGINT, INT, BIGINT) TO atum_owner; diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala new file mode 100644 index 000000000..904cd42b9 --- /dev/null +++ b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowPartitioningsIntegrationTests.scala @@ -0,0 +1,193 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.database.flows + +import io.circe.Json +import io.circe.parser.parse +import za.co.absa.balta.DBTestSuite +import za.co.absa.balta.classes.JsonBString + +class GetFlowPartitioningsIntegrationTests extends DBTestSuite { + + private val getFlowPartitioningsFn = "flows.get_flow_partitionings" + private val createFlowFn = "flows._create_flow" + private val addToParentFlowsFn = "flows._add_to_parent_flows" + + private val partitioningsTable = "runs.partitionings" + + private val partitioning1 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyA", "keyB", "keyC"], + | "keysToValues": { + | "keyA": "valueA", + | "keyB": "valueB", + | "keyC": "valueC" + | } + |} + |""".stripMargin + ) + + private val partitioning1Parent = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyA", "keyB"], + | "keysToValues": { + | "keyA": "valueA", + | "keyB": "valueB" + | } + |} + |""".stripMargin + ) + + private val partitioning2 = JsonBString( + """ + |{ + | "version": 1, + | "keys": ["keyD", "keyE", "keyF"], + | "keysToValues": { + | "keyD": "valueD", + | "keyE": "valueE", + | "keyF": "valueF" + | } + |} + |""".stripMargin + ) + + var flowIdOfPartitioning1: Long = _ + var flowIdOfParentPartitioning1: Long = _ + var flowIdOfPartitioning2: Long = _ + var flowIdOfPartitioning3: Long = _ + + test("Returns partitioning(s) for a given flow") { + table(partitioningsTable).insert(add("partitioning", partitioning1).add("created_by", "Joseph")) + table(partitioningsTable).insert(add("partitioning", partitioning1Parent).add("created_by", "Joseph")) + table(partitioningsTable).insert(add("partitioning", partitioning2).add("created_by", "Joseph")) + + val partId1: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning1, "id_partitioning").get.get + + val partId1Parent: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning1Parent, "id_partitioning").get.get + + val partId2: Long = table(partitioningsTable) + .fieldValue("partitioning", partitioning2, "id_partitioning").get.get + + function(createFlowFn) + .setParam("i_fk_partitioning", partId1) + .setParam("i_by_user", "Joseph") + .execute { queryResult => + flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId1Parent) + .setParam("i_by_user", "Joseph") + .execute { queryResult => + flowIdOfParentPartitioning1 = queryResult.next().getLong("id_flow").get + } + + function(createFlowFn) + .setParam("i_fk_partitioning", partId2) + .setParam("i_by_user", "Joseph") + .execute { queryResult => + flowIdOfPartitioning2 = queryResult.next().getLong("id_flow").get + } + + function(addToParentFlowsFn) + .setParam("i_fk_parent_partitioning", partId1Parent) + .setParam("i_fk_partitioning", partId1) + .setParam("i_by_user", "Joseph") + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "Partitioning added to flows") + } + + function(getFlowPartitioningsFn) + .setParam("i_flow_id", flowIdOfPartitioning1) + .setParam("i_limit", 1) + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "OK") + assert(result1.getLong("id").get == partId1) + val expectedPartitioningJson = parseJsonBStringOrThrow(partitioning1) + val returnedPartitioningJson = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get) + assert(expectedPartitioningJson == returnedPartitioningJson) + assert(!result1.getBoolean("has_more").get) + assert(!queryResult.hasNext) + } + + function(getFlowPartitioningsFn) + .setParam("i_flow_id", flowIdOfParentPartitioning1) + .setParam("i_limit", 1) // limit is set to 1, so only one partitioning should be returned and more data available + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "OK") + assert(result1.getLong("id").get == partId1) + val expectedPartitioningJson1 = parseJsonBStringOrThrow(partitioning1) + val returnedPartitioningJson1 = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get) + assert(expectedPartitioningJson1 == returnedPartitioningJson1) + assert(result1.getBoolean("has_more").get) + assert(!queryResult.hasNext) + } + + function(getFlowPartitioningsFn) + .setParam("i_flow_id", flowIdOfParentPartitioning1) + .setParam("i_limit", 2) // limit is set to 2, so both partitionings should be returned and no more data available + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 11) + assert(result1.getString("status_text").get == "OK") + assert(result1.getLong("id").get == partId1) + val expectedPartitioningJson1 = parseJsonBStringOrThrow(partitioning1) + val returnedPartitioningJson1 = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get) + assert(expectedPartitioningJson1 == returnedPartitioningJson1) + assert(!result1.getBoolean("has_more").get) + assert(queryResult.hasNext) + assert(queryResult.hasNext) + val result2 = queryResult.next() + assert(result2.getLong("id").get == partId1Parent) + val expectedPartitioningJson2 = parseJsonBStringOrThrow(partitioning1Parent) + val returnedPartitioningJson2 = parseJsonBStringOrThrow(result2.getJsonB("partitioning").get) + assert(expectedPartitioningJson2 == returnedPartitioningJson2) + assert(!result2.getBoolean("has_more").get) + assert(!queryResult.hasNext) + } + } + + test("Fails for non-existent flow"){ + function(getFlowPartitioningsFn) + .setParam("i_flow_id", 999999) + .setParam("i_limit", 1) + .execute { queryResult => + val result1 = queryResult.next() + assert(result1.getInt("status").get == 41) + assert(result1.getString("status_text").get == "Flow not found") + assert(!queryResult.hasNext) + } + } + + private def parseJsonBStringOrThrow(jsonBString: JsonBString): Json = { + parse(jsonBString.value).getOrElse(throw new Exception("Failed to parse JsonBString to Json")) + } + +} diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index cd60a204d..be3d7bc5a 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.server import za.co.absa.atum.server.api.controller._ -import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints +import za.co.absa.atum.server.api.database.flows.functions.{GetFlowCheckpoints, GetFlowPartitionings} import za.co.absa.atum.server.api.database.{PostgresDatabaseProvider, TransactorProvider} import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.http.Server @@ -63,6 +63,7 @@ object Main extends ZIOAppDefault with Server { GetPartitioningCheckpointV2.layer, GetFlowCheckpoints.layer, GetPartitioningById.layer, + GetFlowPartitionings.layer, PostgresDatabaseProvider.layer, TransactorProvider.layer, AwsSecretsProviderImpl.layer, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index db212a249..605e06140 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} +import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse} import zio.IO import zio.macros.accessible @@ -46,4 +46,10 @@ trait PartitioningController { def getPartitioningMeasuresV2( partitioningId: Long ): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]] + + def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index 6bac1662a..c450405e8 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -20,8 +20,8 @@ import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.http.ApiPaths.V2Paths import za.co.absa.atum.server.api.service.PartitioningService -import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse} -import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} +import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse, PaginatedResult} +import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse} import zio._ class PartitioningControllerImpl(partitioningService: PartitioningService) @@ -105,6 +105,20 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) ) } + override def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] = { + mapToPaginatedResponse( + limit.get, + offset.get, + serviceCall[PaginatedResult[PartitioningWithIdDTO], PaginatedResult[PartitioningWithIdDTO]]( + partitioningService.getFlowPartitionings(flowId, limit, offset) + ) + ) + } + } object PartitioningControllerImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala new file mode 100644 index 000000000..be56853dc --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitionings.scala @@ -0,0 +1,81 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.flows.functions + +import doobie.implicits.toSqlInterpolator +import io.circe.{DecodingFailure, Json} +import za.co.absa.atum.model.dto.{PartitioningDTO, PartitioningWithIdDTO} +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.flows.Flows +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings._ +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling +import zio.{Task, URLayer, ZIO, ZLayer} +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet + +import scala.annotation.tailrec + +class GetFlowPartitionings(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieMultipleResultFunctionWithAggStatus[GetFlowPartitioningsArgs, Option[ + GetFlowPartitioningsResult + ], Task](args => + Seq( + fr"${args.flowId}", + fr"${args.limit}", + fr"${args.offset}" + ) + ) + with StandardStatusHandling + with ByFirstErrorStatusAggregator { + + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("id", "partitioning", "author", "has_more") +} + +object GetFlowPartitionings { + case class GetFlowPartitioningsArgs(flowId: Long, limit: Option[Int], offset: Option[Long]) + case class GetFlowPartitioningsResult(id: Long, partitioningJson: Json, author: String, hasMore: Boolean) + + object GetFlowPartitioningsResult { + + @tailrec def resultsToPartitioningWithIdDTOs( + results: Seq[GetFlowPartitioningsResult], + acc: Seq[PartitioningWithIdDTO] + ): Either[DecodingFailure, Seq[PartitioningWithIdDTO]] = { + if (results.isEmpty) Right(acc) + else { + val head = results.head + val tail = results.tail + val decodingResult = head.partitioningJson.as[PartitioningDTO] + decodingResult match { + case Left(decodingFailure) => Left(decodingFailure) + case Right(partitioningDTO) => + resultsToPartitioningWithIdDTOs(tail, acc :+ PartitioningWithIdDTO(head.id, partitioningDTO, head.author)) + } + } + } + + } + + val layer: URLayer[PostgresDatabaseProvider, GetFlowPartitionings] = ZLayer { + for { + dbProvider <- ZIO.service[PostgresDatabaseProvider] + } yield new GetFlowPartitionings()(Flows, dbProvider.dbEngine) + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index 46f66d095..0f4febcc5 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -148,6 +148,19 @@ trait Endpoints extends BaseEndpoints { .errorOutVariantPrepend(notFoundErrorOneOfVariant) } + protected val getFlowPartitioningsEndpointV2 + : PublicEndpoint[(Long, Option[Int], Option[Long]), ErrorResponse, PaginatedResponse[ + PartitioningWithIdDTO + ], Any] = { + apiV2.get + .in(V2Paths.Flows / path[Long]("flowId") / V2Paths.Partitionings) + .in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000))) + .in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L))) + .out(statusCode(StatusCode.Ok)) + .out(jsonBody[PaginatedResponse[PartitioningWithIdDTO]]) + .errorOutVariantPrepend(notFoundErrorOneOfVariant) + } + protected val zioMetricsEndpoint: PublicEndpoint[Unit, Unit, String, Any] = { endpoint.get.in(ZioMetrics).out(stringBody) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 452ff87ea..a2456c697 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -24,7 +24,7 @@ import sttp.tapir.server.http4s.ztapir.ZHttp4sServerInterpreter import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor import sttp.tapir.swagger.bundle.SwaggerInterpreter import sttp.tapir.ztapir._ -import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO} +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, PartitioningWithIdDTO} import za.co.absa.atum.server.Constants.{SwaggerApiName, SwaggerApiVersion} import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} @@ -93,6 +93,16 @@ trait Routes extends Endpoints with ServerOptions { createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2), createServerEndpoint(getPartitioningEndpointV2, PartitioningController.getPartitioningV2), createServerEndpoint(getPartitioningMeasuresEndpointV2, PartitioningController.getPartitioningMeasuresV2), + createServerEndpoint[ + (Long, Option[Int], Option[Long]), + ErrorResponse, + PaginatedResponse[PartitioningWithIdDTO] + ]( + getFlowPartitioningsEndpointV2, + { case (flowId: Long, limit: Option[Int], offset: Option[Long]) => + PartitioningController.getFlowPartitionings(flowId, limit, offset) + } + ), createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit) ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes @@ -111,7 +121,8 @@ trait Routes extends Endpoints with ServerOptions { getPartitioningCheckpointsEndpointV2, getPartitioningCheckpointEndpointV2, getFlowCheckpointsEndpointV2, - getPartitioningMeasuresEndpointV2 + getPartitioningMeasuresEndpointV2, + getFlowPartitioningsEndpointV2 ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None)) .from(SwaggerInterpreter().fromEndpoints[HttpEnv.F](endpoints, SwaggerApiName, SwaggerApiVersion)) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index 88ca0dbbb..54ca4f700 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.DatabaseError +import za.co.absa.atum.server.model.PaginatedResult import zio.IO import zio.macros.accessible @@ -44,6 +45,11 @@ trait PartitioningRepository { def getPartitioning(partitioningId: Long): IO[DatabaseError, PartitioningWithIdDTO] - def getPartitioningMeasuresById(partitioningId: Long): IO[DatabaseError, Seq[MeasureDTO]] + + def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index c6363589b..37e6f81ef 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -17,6 +17,11 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto._ +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.{ + GetFlowPartitioningsArgs, + GetFlowPartitioningsResult +} import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError @@ -24,6 +29,7 @@ import za.co.absa.atum.server.model._ import zio._ import zio.interop.catz.asyncInstance import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} class PartitioningRepositoryImpl( createPartitioningIfNotExistsFn: CreatePartitioningIfNotExists, @@ -33,7 +39,8 @@ class PartitioningRepositoryImpl( createOrUpdateAdditionalDataFn: CreateOrUpdateAdditionalData, getPartitioningByIdFn: GetPartitioningById, getPartitioningAdditionalDataV2Fn: GetPartitioningAdditionalDataV2, - getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById + getPartitioningMeasuresByIdFn: GetPartitioningMeasuresById, + getFlowPartitioningsFn: GetFlowPartitionings ) extends PartitioningRepository with BaseRepository { @@ -108,6 +115,27 @@ class PartitioningRepositoryImpl( }) } + override def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[DatabaseError, PaginatedResult[PartitioningWithIdDTO]] = { + dbMultipleResultCallWithAggregatedStatus( + getFlowPartitioningsFn(GetFlowPartitioningsArgs(flowId, limit, offset)), + "getFlowPartitionings" + ).map(_.flatten) + .flatMap { partitioningResults => + ZIO + .fromEither(GetFlowPartitioningsResult.resultsToPartitioningWithIdDTOs(partitioningResults, Seq.empty)) + .mapBoth( + error => GeneralDatabaseError(error.getMessage), + partitionings => { + if (partitioningResults.nonEmpty && partitioningResults.head.hasMore) ResultHasMore(partitionings) + else ResultNoMore(partitionings) + } + ) + } + } } object PartitioningRepositoryImpl { @@ -119,7 +147,8 @@ object PartitioningRepositoryImpl { with CreateOrUpdateAdditionalData with GetPartitioningAdditionalDataV2 with GetPartitioningById - with GetPartitioningMeasuresById, + with GetPartitioningMeasuresById + with GetFlowPartitionings, PartitioningRepository ] = ZLayer { for { @@ -131,6 +160,7 @@ object PartitioningRepositoryImpl { getPartitioningById <- ZIO.service[GetPartitioningById] getPartitioningAdditionalDataV2 <- ZIO.service[GetPartitioningAdditionalDataV2] getPartitioningMeasuresV2 <- ZIO.service[GetPartitioningMeasuresById] + getFlowPartitionings <- ZIO.service[GetFlowPartitionings] } yield new PartitioningRepositoryImpl( createPartitioningIfNotExists, createPartitioning, @@ -139,7 +169,8 @@ object PartitioningRepositoryImpl { createOrUpdateAdditionalData, getPartitioningById, getPartitioningAdditionalDataV2, - getPartitioningMeasuresV2 + getPartitioningMeasuresV2, + getFlowPartitionings ) } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala index 7779a710d..a9d03b798 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.model.PaginatedResult import zio.IO import zio.macros.accessible @@ -43,4 +44,10 @@ trait PartitioningService { def getPartitioning(partitioningId: Long): IO[ServiceError, PartitioningWithIdDTO] def getPartitioningMeasuresById(partitioningId: Long): IO[ServiceError, Seq[MeasureDTO]] + + def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index 2525568f9..6d678e89d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -19,6 +19,7 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.repository.PartitioningRepository +import za.co.absa.atum.server.model.PaginatedResult import zio._ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) @@ -85,6 +86,16 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) ) } + override def getFlowPartitionings( + flowId: Long, + limit: Option[Int], + offset: Option[Long] + ): IO[ServiceError, PaginatedResult[PartitioningWithIdDTO]] = { + repositoryCall( + partitioningRepository.getFlowPartitionings(flowId, limit, offset), + "getFlowPartitionings" + ) + } } object PartitioningServiceImpl { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index aca4c0784..db8c3c390 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -25,6 +25,7 @@ import java.util.UUID import MeasureResultDTO.TypedValue import io.circe.syntax.EncoderOps import za.co.absa.atum.model.ResultValueType +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsResult trait TestData { @@ -75,6 +76,20 @@ trait TestData { author = "author" ) + protected val getFlowPartitioningsResult1: GetFlowPartitioningsResult = GetFlowPartitioningsResult( + id = 1111L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = false + ) + + protected val getFlowPartitioningsResult2: GetFlowPartitioningsResult = GetFlowPartitioningsResult( + id = 1111L, + partitioningJson = partitioningAsJson, + author = "author", + hasMore = true + ) + // Partitioning with ID DTO protected val partitioningWithIdDTO1: PartitioningWithIdDTO = PartitioningWithIdDTO( id = partitioningFromDB1.id, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index 0e249faee..315c21e76 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -18,10 +18,11 @@ package za.co.absa.atum.server.api.controller import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData -import za.co.absa.atum.server.api.exception.ServiceError.{ConflictServiceError, GeneralServiceError, NotFoundServiceError} +import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.service.PartitioningService -import za.co.absa.atum.server.model.{ConflictErrorResponse, InternalServerErrorResponse, NotFoundErrorResponse} -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} +import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse} +import za.co.absa.atum.server.model._ import zio._ import zio.test.Assertion.failsWithA import zio.test._ @@ -68,6 +69,15 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { when(partitioningServiceMock.getPartitioning(99L)) .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(partitioningServiceMock.getFlowPartitionings(1L, Some(1), Some(0))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(partitioningWithIdDTO1)))) + when(partitioningServiceMock.getFlowPartitionings(2L, Some(1), Some(0))) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(partitioningWithIdDTO1)))) + when(partitioningServiceMock.getFlowPartitionings(3L, Some(1), Some(0))) + .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) + when(partitioningServiceMock.getFlowPartitionings(4L, Some(1), Some(0))) + .thenReturn(ZIO.fail(NotFoundServiceError("Flow not found"))) + private val partitioningServiceMockLayer = ZLayer.succeed(partitioningServiceMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -161,6 +171,32 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { failsWithA[NotFoundErrorResponse] ) } + ), + suite("GetFlowPartitioningsSuite")( + test("Returns expected PaginatedResponse[PartitioningWithIdDTO] with more data available") { + for { + result <- PartitioningController.getFlowPartitionings(1L, Some(1), Some(0)) + expected = PaginatedResponse(Seq(partitioningWithIdDTO1), Pagination(1, 0L, hasMore = true), uuid1) + actual = result.copy(requestId = uuid1) + } yield assertTrue(actual == expected) + }, + test("Returns expected PaginatedResponse[PartitioningWithIdDTO] with no more data available") { + for { + result <- PartitioningController.getFlowPartitionings(2L, Some(1), Some(0)) + expected = PaginatedResponse(Seq(partitioningWithIdDTO1), Pagination(1, 0L, hasMore = false), uuid1) + actual = result.copy(requestId = uuid1) + } yield assertTrue(actual == expected) + }, + test("Returns expected InternalServerErrorResponse when service call fails with GeneralServiceError") { + assertZIO(PartitioningController.getFlowPartitionings(3L, Some(1), Some(0)).exit)( + failsWithA[InternalServerErrorResponse] + ) + }, + test("Returns expected NotFoundErrorResponse when service call fails with NotFoundServiceError") { + assertZIO(PartitioningController.getFlowPartitionings(4L, Some(1), Some(0)).exit)( + failsWithA[NotFoundErrorResponse] + ) + } ) ).provide( PartitioningControllerImpl.layer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitioningsIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitioningsIntegrationTests.scala new file mode 100644 index 000000000..3760c62b4 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowPartitioningsIntegrationTests.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.flows.functions + +import za.co.absa.atum.server.ConfigProviderTest +import za.co.absa.atum.server.api.TestTransactorProvider +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsArgs +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import za.co.absa.db.fadb.status.FunctionStatus +import zio.{Scope, ZIO} +import zio.test.{Spec, TestEnvironment, assertTrue} +import zio.interop.catz.asyncInstance + +object GetFlowPartitioningsIntegrationTests extends ConfigProviderTest { + + override def spec: Spec[Unit with TestEnvironment with Scope, Any] = { + suite("GetFlowPartitioningsIntegrationTests")( + test("Returns expected DataNotFoundException when flow not found") { + for { + getFlowPartitionings <- ZIO.service[GetFlowPartitionings] + result <- getFlowPartitionings(GetFlowPartitioningsArgs(0L, None, None)) + } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Flow not found")))) + } + ) + }.provide( + GetFlowPartitionings.layer, + PostgresDatabaseProvider.layer, + TestTransactorProvider.layerWithRollback + ) + +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowPartitioningsV2EndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowPartitioningsV2EndpointUnitTests.scala new file mode 100644 index 000000000..06616eedd --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowPartitioningsV2EndpointUnitTests.scala @@ -0,0 +1,118 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.http + +import org.mockito.Mockito.{mock, when} +import sttp.client3.circe.asJson +import sttp.client3.testing.SttpBackendStub +import sttp.client3.{UriContext, basicRequest} +import sttp.model.StatusCode +import sttp.tapir.server.stub.TapirStubInterpreter +import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} +import za.co.absa.atum.model.dto.PartitioningWithIdDTO +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.controller.PartitioningController +import za.co.absa.atum.server.model.{InternalServerErrorResponse, NotFoundErrorResponse, Pagination} +import za.co.absa.atum.server.model.SuccessResponse.PaginatedResponse +import zio.test.Assertion.equalTo +import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO} +import zio.{Scope, ZIO, ZLayer} + +object GetFlowPartitioningsV2EndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData { + + private val partitioningControllerMock = mock(classOf[PartitioningController]) + + when(partitioningControllerMock.getFlowPartitionings(1L, Some(1), Some(0))) + .thenReturn( + ZIO.succeed( + PaginatedResponse(Seq.empty, Pagination(1, 0, hasMore = true), uuid1) + ) + ) + when(partitioningControllerMock.getFlowPartitionings(2L, Some(1), Some(0))) + .thenReturn( + ZIO.fail( + NotFoundErrorResponse("flow not found") + ) + ) + when(partitioningControllerMock.getFlowPartitionings(3L, None, None)) + .thenReturn( + ZIO.fail( + InternalServerErrorResponse("internal server error") + ) + ) + + private val partitioningControllerMockLayer = ZLayer.succeed(partitioningControllerMock) + + private val getFlowPartitioningsServerEndpoint = + getFlowPartitioningsEndpointV2.zServerLogic({ case (flowId: Long, limit: Option[Int], offset: Option[Long]) => + PartitioningController.getFlowPartitionings(flowId, limit, offset) + }) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[PartitioningController])) + .whenServerEndpoint(getFlowPartitioningsServerEndpoint) + .thenRunLogic() + .backend() + + suite("GetFlowPartitioningsV2EndpointSuite")( + test("Returns an expected PaginatedResponse") { + val request = basicRequest + .get(uri"http://localhost:8080/api/v2/flows/1/partitionings?limit=1&offset=0") + .response(asJson[PaginatedResponse[PartitioningWithIdDTO]]) + + val response = request + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + assertZIO(body <&> statusCode)( + equalTo( + Right(PaginatedResponse(Seq.empty[PartitioningWithIdDTO], Pagination(1, 0, hasMore = true), uuid1)), + StatusCode.Ok + ) + ) + }, + test("Returns a NotFoundErrorResponse") { + val request = basicRequest + .get(uri"http://localhost:8080/api/v2/flows/2/partitionings?limit=1&offset=0") + .response(asJson[NotFoundErrorResponse]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.NotFound)) + }, + test("Returns an InternalServerErrorResponse") { + val request = basicRequest + .get(uri"http://localhost:8080/api/v2/flows/3/partitionings") + .response(asJson[InternalServerErrorResponse]) + + val response = request + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.InternalServerError)) + } + ) + + }.provide(partitioningControllerMockLayer) + +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index a56dd9604..23e147aaa 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -19,17 +19,20 @@ package za.co.absa.atum.server.api.repository import org.mockito.Mockito.{mock, when} import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataItemDTO, PartitioningWithIdDTO} import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsArgs import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.exception.DatabaseError._ +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.db.fadb.exceptions.{DataConflictException, DataNotFoundException, ErrorInDataException} import za.co.absa.db.fadb.status.{FunctionStatus, Row} import zio._ import zio.interop.catz.asyncInstance import zio.test.Assertion.failsWithA import zio.test._ -import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB} +import za.co.absa.atum.server.model.{AdditionalDataFromDB, AdditionalDataItemFromDB, PaginatedResult} object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { @@ -138,6 +141,20 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningMeasuresV2MockLayer = ZLayer.succeed(getPartitioningMeasuresV2Mock) + private val getFlowPartitioningsMock = mock(classOf[GetFlowPartitionings]) + + when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(1L, Some(10), Some(0))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult1))))) + when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(2L, Some(10), Some(0))) + ).thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "OK"), Some(getFlowPartitioningsResult2))))) + when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(0L, None, None))) + .thenReturn(ZIO.left(DataNotFoundException(FunctionStatus(41, "Flow not found")))) + when(getFlowPartitioningsMock.apply(GetFlowPartitioningsArgs(3L, Some(10), Some(0))) + ).thenReturn(ZIO.fail(new Exception("boom!"))) + + private val getFlowPartitioningsMockLayer = ZLayer.succeed(getFlowPartitioningsMock) + + override def spec: Spec[TestEnvironment with Scope, Any] = { suite("PartitioningRepositorySuite")( @@ -295,6 +312,28 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { failsWithA[GeneralDatabaseError] ) } + ), + suite("GetFlowPartitioningsSuite")( + test("Returns expected ResultNoMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningRepository.getFlowPartitionings(1L, Some(10), Some(0)) + } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected ResultHasMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningRepository.getFlowPartitionings(2L, Some(10), Some(0)) + } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected NotFoundDatabaseError") { + assertZIO(PartitioningRepository.getFlowPartitionings(0L, None, None).exit)( + failsWithA[NotFoundDatabaseError] + ) + }, + test("Returns expected GeneralDatabaseError") { + assertZIO(PartitioningRepository.getFlowPartitionings(3L, Some(10), Some(0)).exit)( + failsWithA[GeneralDatabaseError] + ) + } ) ).provide( PartitioningRepositoryImpl.layer, @@ -305,7 +344,8 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { createOrUpdateAdditionalDataMockLayer, getPartitioningByIdMockLayer, getPartitioningAdditionalDataV2MockLayer, - getPartitioningMeasuresV2MockLayer + getPartitioningMeasuresV2MockLayer, + getFlowPartitioningsMockLayer ) } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index 6776dde90..3e647d5a1 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -23,6 +23,7 @@ import za.co.absa.atum.server.api.exception.DatabaseError._ import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.PartitioningRepository +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import zio.test.Assertion.failsWithA import zio.test._ import zio._ @@ -79,6 +80,15 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { when(partitioningRepositoryMock.getPartitioningMeasuresById(3L)) .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + when(partitioningRepositoryMock.getFlowPartitionings(1L, Some(1), Some(1L))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(partitioningWithIdDTO1)))) + when(partitioningRepositoryMock.getFlowPartitionings(2L, Some(1), Some(1L))) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(partitioningWithIdDTO1)))) + when(partitioningRepositoryMock.getFlowPartitionings(3L, Some(1), Some(1L))) + .thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) + when(partitioningRepositoryMock.getFlowPartitionings(4L, Some(1), Some(1L))) + .thenReturn(ZIO.fail(NotFoundDatabaseError("Flow not found"))) + private val partitioningRepositoryMockLayer = ZLayer.succeed(partitioningRepositoryMock) override def spec: Spec[TestEnvironment with Scope, Any] = { @@ -218,11 +228,32 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { failsWithA[GeneralServiceError] ) } + ), + suite("GetFlowPartitioningsSuite")( + test("Returns expected Right with ResultHasMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningService.getFlowPartitionings(1L, Some(1), Some(1L)) + } yield assertTrue(result == ResultHasMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected Right with ResultNoMore[PartitioningWithIdDTO]") { + for { + result <- PartitioningService.getFlowPartitionings(2L, Some(1), Some(1L)) + } yield assertTrue(result == ResultNoMore(Seq(partitioningWithIdDTO1))) + }, + test("Returns expected GeneralServiceError when database error occurs") { + assertZIO(PartitioningService.getFlowPartitionings(3L, Some(1), Some(1L)).exit)( + failsWithA[GeneralServiceError] + ) + }, + test("Returns expected NotFoundServiceError when flow doesn't exist") { + assertZIO(PartitioningService.getFlowPartitionings(4L, Some(1), Some(1L)).exit)( + failsWithA[NotFoundServiceError] + ) + } ) - ).provide( - PartitioningServiceImpl.layer, - partitioningRepositoryMockLayer ) - - } + }.provide( + PartitioningServiceImpl.layer, + partitioningRepositoryMockLayer + ) }