From 470e091007a4cc739bbe801a86c2fa0cf9904028 Mon Sep 17 00:00:00 2001 From: Ladislav Sulak Date: Fri, 14 Jun 2024 13:53:47 +0200 Subject: [PATCH] #188: new Flow service in API v2 (#195) * #188: adding new controller / service / repository Flow, with server support and new DTO --- .../flows/V1.9.1__get_flow_checkpoints.sql | 46 +++-- .../GetFlowCheckpointsIntegrationTests.scala | 4 + .../atum/model/dto/MeasureResultDTO.scala | 27 +++ project/Dependencies.scala | 4 +- .../za/co/absa/atum/server/Constants.scala | 14 +- .../scala/za/co/absa/atum/server/Main.scala | 9 +- .../api/controller/FlowController.scala | 28 +++ .../api/controller/FlowControllerImpl.scala | 46 +++++ .../controller/PartitioningController.scala | 12 +- .../PartitioningControllerImpl.scala | 4 +- .../server/api/database/DoobieImplicits.scala | 27 --- .../server/api/database/flows/Flows.scala | 22 ++ .../flows/functions/GetFlowCheckpoints.scala | 75 +++++++ .../atum/server/api/http/BaseEndpoints.scala | 8 + .../absa/atum/server/api/http/Endpoints.scala | 13 +- .../absa/atum/server/api/http/HttpEnv.scala | 4 +- .../co/absa/atum/server/api/http/Routes.scala | 10 +- .../api/repository/FlowRepository.scala | 28 +++ .../api/repository/FlowRepositoryImpl.scala | 39 ++++ .../repository/PartitioningRepository.scala | 2 +- .../PartitioningRepositoryImpl.scala | 4 +- .../atum/server/api/service/FlowService.scala | 27 +++ .../server/api/service/FlowServiceImpl.scala | 50 +++++ .../atum/server/model/CheckpointFromDB.scala | 10 +- .../atum/server/model/PlayJsonImplicits.scala | 7 +- .../za/co/absa/atum/server/api/TestData.scala | 188 +++++++++--------- ...la => CheckpointControllerUnitTests.scala} | 3 +- .../controller/FlowControllerUnitTests.scala | 59 ++++++ .../PartitioningControllerUnitTests.scala | 6 +- ...UpdateAdditionalDataIntegrationTests.scala | 5 +- ...titioningIfNotExistsIntegrationTests.scala | 5 +- .../WriteCheckpointIntegrationTests.scala | 5 +- .../api/http/BaseEndpointsUnitTest.scala | 85 ++++++++ ...> CreateCheckpointEndpointUnitTests.scala} | 4 +- ...CreatePartitioningEndpointUnitTests.scala} | 6 +- .../GetFlowCheckpointsEndpointUnitTests.scala | 98 +++++++++ ...la => CheckpointRepositoryUnitTests.scala} | 5 +- .../repository/FlowRepositoryUnitTests.scala | 58 ++++++ ... => PartitioningRepositoryUnitTests.scala} | 21 +- ...scala => CheckpointServiceUnitTests.scala} | 5 +- .../api/service/FlowServiceUnitTests.scala | 60 ++++++ .../PartitioningServiceUnitTests.scala | 9 +- .../aws/AwsSecretsProviderUnitTests.scala | 5 +- 43 files changed, 914 insertions(+), 233 deletions(-) create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/database/flows/Flows.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala rename server/src/test/scala/za/co/absa/atum/server/api/controller/{CheckpointControllerIntegrationTests.scala => CheckpointControllerUnitTests.scala} (94%) create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/http/BaseEndpointsUnitTest.scala rename server/src/test/scala/za/co/absa/atum/server/api/http/{CreateCheckpointEndpointIntegrationTests.scala => CreateCheckpointEndpointUnitTests.scala} (95%) rename server/src/test/scala/za/co/absa/atum/server/api/http/{CreatePartitioningEndpointIntegrationTests.scala => CreatePartitioningEndpointUnitTests.scala} (95%) create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsEndpointUnitTests.scala rename server/src/test/scala/za/co/absa/atum/server/api/repository/{CheckpointRepositoryIntegrationTests.scala => CheckpointRepositoryUnitTests.scala} (92%) create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala rename server/src/test/scala/za/co/absa/atum/server/api/repository/{PartitioningRepositoryIntegrationTests.scala => PartitioningRepositoryUnitTests.scala} (89%) rename server/src/test/scala/za/co/absa/atum/server/api/service/{CheckpointServiceIntegrationTests.scala => CheckpointServiceUnitTests.scala} (92%) create mode 100644 server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala diff --git a/database/src/main/postgres/flows/V1.9.1__get_flow_checkpoints.sql b/database/src/main/postgres/flows/V1.9.1__get_flow_checkpoints.sql index da1a31df8..d3c90f411 100644 --- a/database/src/main/postgres/flows/V1.9.1__get_flow_checkpoints.sql +++ b/database/src/main/postgres/flows/V1.9.1__get_flow_checkpoints.sql @@ -15,18 +15,20 @@ */ CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints( - IN i_partitioning_of_flow JSONB, - IN i_limit INT DEFAULT 5, - IN i_checkpoint_name TEXT DEFAULT NULL, - OUT status INTEGER, - OUT status_text TEXT, - OUT id_checkpoint UUID, - OUT checkpoint_name TEXT, - OUT measure_name TEXT, - OUT measured_columns TEXT[], - OUT measurement_value JSONB, - OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, - OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE + IN i_partitioning_of_flow JSONB, + IN i_limit INT DEFAULT 5, + IN i_checkpoint_name TEXT DEFAULT NULL, + OUT status INTEGER, + OUT status_text TEXT, + OUT id_checkpoint UUID, + OUT checkpoint_name TEXT, + OUT author TEXT, + OUT measured_by_atum_agent BOOLEAN, + OUT measure_name TEXT, + OUT measured_columns TEXT[], + OUT measurement_value JSONB, + OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, + OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE ) RETURNS SETOF record AS $$ ------------------------------------------------------------------------------- @@ -53,14 +55,17 @@ $$ -- specifying `i_checkpoint_name` parameter -- -- Returns: --- status - Status code --- status_text - Status text --- id_checkpoint - id of retrieved checkpoint --- checkpoint_name - name of retrieved checkpoint --- measure_name - measure name associated with a given checkpoint --- measured_columns - measure columns associated with a given checkpoint --- measurement_value - measurement details associated with a given checkpoint --- checkpoint_time - time +-- status - Status code +-- status_text - Status text +-- id_checkpoint - ID of retrieved checkpoint +-- checkpoint_name - Name of the retrieved checkpoint +-- author - Author of the checkpoint +-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by Atum Agent +-- (if false, data supplied manually) +-- measure_name - measure name associated with a given checkpoint +-- measured_columns - measure columns associated with a given checkpoint +-- measurement_value - measurement details associated with a given checkpoint +-- checkpoint_time - time -- -- Status codes: -- 11 - OK @@ -89,6 +94,7 @@ BEGIN RETURN QUERY SELECT 11 AS status, 'OK' AS status_text, CP.id_checkpoint, CP.checkpoint_name, + CP.created_by AS author, CP.measured_by_atum_agent, MD.measure_name, MD.measured_columns, M.measurement_value, CP.process_start_time AS checkpoint_start_time, CP.process_end_time AS checkpoint_end_time diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala index b879f86e8..ea9b7e6a1 100644 --- a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala @@ -229,6 +229,8 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row1.getString("status_text").contains("OK")) assert(row1.getUUID("id_checkpoint").contains(checkpointId)) assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) + assert(row1.getString("author").contains("ObviouslySomeTest")) + assert(row1.getBoolean("measured_by_atum_agent").contains(true)) assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime)) assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) @@ -243,6 +245,8 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row2.getString("status_text").contains("OK")) assert(row2.getUUID("id_checkpoint").contains(checkpointId)) assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) + assert(row1.getString("author").contains("ObviouslySomeTest")) + assert(row1.getBoolean("measured_by_atum_agent").contains(true)) assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTime)) assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTime)) diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/MeasureResultDTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/MeasureResultDTO.scala index dd959b943..61b26cac6 100644 --- a/model/src/main/scala/za/co/absa/atum/model/dto/MeasureResultDTO.scala +++ b/model/src/main/scala/za/co/absa/atum/model/dto/MeasureResultDTO.scala @@ -16,6 +16,8 @@ package za.co.absa.atum.model.dto +import io.circe.{Decoder, Encoder} + case class MeasureResultDTO( mainValue: MeasureResultDTO.TypedValue, supportValues: Map[String, MeasureResultDTO.TypedValue] = Map.empty @@ -36,4 +38,29 @@ object MeasureResultDTO { case object Double extends ResultValueType } + + implicit val encodeResultValueType: Encoder[MeasureResultDTO.ResultValueType] = Encoder.encodeString.contramap { + case MeasureResultDTO.ResultValueType.String => "String" + case MeasureResultDTO.ResultValueType.Long => "Long" + case MeasureResultDTO.ResultValueType.BigDecimal => "BigDecimal" + case MeasureResultDTO.ResultValueType.Double => "Double" + } + + implicit val decodeResultValueType: Decoder[MeasureResultDTO.ResultValueType] = Decoder.decodeString.emap { + case "String" => Right(MeasureResultDTO.ResultValueType.String) + case "Long" => Right(MeasureResultDTO.ResultValueType.Long) + case "BigDecimal" => Right(MeasureResultDTO.ResultValueType.BigDecimal) + case "Double" => Right(MeasureResultDTO.ResultValueType.Double) + case other => Left(s"Cannot decode $other as ResultValueType") + } + + implicit val encodeTypedValue: Encoder[MeasureResultDTO.TypedValue] = + Encoder.forProduct2("value", "valueType")(tv => (tv.value, tv.valueType)) + + implicit val decodeTypedValue: Decoder[MeasureResultDTO.TypedValue] = + Decoder.forProduct2("value", "valueType")(MeasureResultDTO.TypedValue.apply) + + implicit val decodeMeasureResultDTO: Decoder[MeasureResultDTO] = + Decoder.forProduct2("mainValue", "supportValues")(MeasureResultDTO.apply) + } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 6f952da64..dfc97afe1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -126,7 +126,7 @@ object Dependencies { json4sJackson, json4sNative, circeCore, - circeParser + circeParser, ) } @@ -252,7 +252,7 @@ object Dependencies { jsonSerdeDependencies(scalaVersion) } - def databaseDependencies: Seq[ModuleID] = { + def databaseDependencies: Seq[ModuleID] = { lazy val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalatest % Test lazy val balta = "za.co.absa" %% "balta" % Versions.balta % Test diff --git a/server/src/main/scala/za/co/absa/atum/server/Constants.scala b/server/src/main/scala/za/co/absa/atum/server/Constants.scala index 51cbb07c0..6b200ae05 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Constants.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Constants.scala @@ -24,14 +24,18 @@ object Constants { final val V1 = "v1" final val V2 = "v2" - final val CreatePartitioning = "createPartitioning" - final val CreateOrUpdateAdditionalData = "createOrUpdateAdditionalData" - final val CreateCheckpoint = "createCheckpoint" - final val GetPartitioningCheckpoints = "getPartitioningCheckpoints" + // todo to uppercase no hyphen for v1, backward compatibility + // todo - better API paths?? + final val CreatePartitioning = "create-partitioning" + final val CreateCheckpoint = "create-checkpoint" + + final val CreateOrUpdateAdditionalData = "upsert-additional-data" + + final val GetPartitioningCheckpoints = "get-partitioning-checkpoints" + final val GetFlowCheckpoints = "get-flow-checkpoints" final val Health = "health" final val ZioMetrics = "zio-metrics" - } final val SwaggerApiName = "Atum API" diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index 09acb510d..34295349e 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -17,11 +17,12 @@ package za.co.absa.atum.server import za.co.absa.atum.server.api.controller._ +import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints import za.co.absa.atum.server.api.database.{PostgresDatabaseProvider, TransactorProvider} import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.http.Server -import za.co.absa.atum.server.api.repository.{CheckpointRepositoryImpl, PartitioningRepositoryImpl} -import za.co.absa.atum.server.api.service.{CheckpointServiceImpl, PartitioningServiceImpl} +import za.co.absa.atum.server.api.repository.{CheckpointRepositoryImpl, FlowRepositoryImpl, PartitioningRepositoryImpl} +import za.co.absa.atum.server.api.service.{CheckpointServiceImpl, FlowServiceImpl, PartitioningServiceImpl} import za.co.absa.atum.server.aws.AwsSecretsProviderImpl import za.co.absa.atum.server.config.JvmMonitoringConfig import zio._ @@ -42,16 +43,20 @@ object Main extends ZIOAppDefault with Server { .provide( PartitioningControllerImpl.layer, CheckpointControllerImpl.layer, + FlowControllerImpl.layer, PartitioningServiceImpl.layer, CheckpointServiceImpl.layer, + FlowServiceImpl.layer, PartitioningRepositoryImpl.layer, CheckpointRepositoryImpl.layer, + FlowRepositoryImpl.layer, CreatePartitioningIfNotExists.layer, GetPartitioningMeasures.layer, GetPartitioningAdditionalData.layer, CreateOrUpdateAdditionalData.layer, GetPartitioningCheckpoints.layer, WriteCheckpoint.layer, + GetFlowCheckpoints.layer, PostgresDatabaseProvider.layer, TransactorProvider.layer, AwsSecretsProviderImpl.layer, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala new file mode 100644 index 000000000..20783e9f4 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.controller + +import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO} +import za.co.absa.atum.server.model.ErrorResponse.ErrorResponse +import za.co.absa.atum.server.model.SuccessResponse.MultiSuccessResponse +import zio.IO +import zio.macros.accessible + +@accessible +trait FlowController { + def getFlowCheckpointsV2(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala new file mode 100644 index 000000000..23d12d8d2 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.controller + +import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO} +import za.co.absa.atum.server.api.service.FlowService +import za.co.absa.atum.server.model.ErrorResponse.ErrorResponse +import za.co.absa.atum.server.model.SuccessResponse.MultiSuccessResponse +import zio._ + +class FlowControllerImpl(flowService: FlowService) extends FlowController with BaseController { + + override def getFlowCheckpointsV2( + checkpointQueryDTO: CheckpointQueryDTO + ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = { + mapToMultiSuccessResponse( + serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]]( + flowService.getFlowCheckpoints(checkpointQueryDTO), + identity + ) + ) + } + +} + +object FlowControllerImpl { + val layer: URLayer[FlowService, FlowController] = ZLayer { + for { + flowService <- ZIO.service[FlowService] + } yield new FlowControllerImpl(flowService) + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala index d8fd2669c..c5e7dc737 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningController.scala @@ -16,7 +16,13 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, CheckpointQueryDTO, PartitioningSubmitDTO} +import za.co.absa.atum.model.dto.{ + AdditionalDataSubmitDTO, + AtumContextDTO, + CheckpointDTO, + CheckpointQueryDTO, + PartitioningSubmitDTO +} import za.co.absa.atum.server.model.ErrorResponse.ErrorResponse import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} import zio.IO @@ -36,5 +42,7 @@ trait PartitioningController { additionalData: AdditionalDataSubmitDTO ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO]] - def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] + def getPartitioningCheckpointsV2( + checkpointQueryDTO: CheckpointQueryDTO + ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index e9bbcef18..964d57634 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -66,13 +66,13 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) ) } - override def getPartitioningCheckpoints( + override def getPartitioningCheckpointsV2( checkpointQueryDTO: CheckpointQueryDTO ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = { mapToMultiSuccessResponse( serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]]( partitioningService.getPartitioningCheckpoints(checkpointQueryDTO), - checkpoints => checkpoints + identity ) ) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/DoobieImplicits.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/DoobieImplicits.scala index a2360492a..a1131e65b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/DoobieImplicits.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/DoobieImplicits.scala @@ -20,10 +20,8 @@ import cats.Show import cats.data.NonEmptyList import doobie.{Get, Put} import doobie.postgres.implicits._ -import io.circe.{Decoder, Encoder} import org.postgresql.jdbc.PgArray import org.postgresql.util.PGobject -import za.co.absa.atum.model.dto.MeasureResultDTO import scala.util.{Failure, Success, Try} @@ -155,31 +153,6 @@ object DoobieImplicits { } ) } - - } - - implicit val encodeResultValueType: Encoder[MeasureResultDTO.ResultValueType] = Encoder.encodeString.contramap { - case MeasureResultDTO.ResultValueType.String => "String" - case MeasureResultDTO.ResultValueType.Long => "Long" - case MeasureResultDTO.ResultValueType.BigDecimal => "BigDecimal" - case MeasureResultDTO.ResultValueType.Double => "Double" - } - - implicit val decodeResultValueType: Decoder[MeasureResultDTO.ResultValueType] = Decoder.decodeString.emap { - case "String" => Right(MeasureResultDTO.ResultValueType.String) - case "Long" => Right(MeasureResultDTO.ResultValueType.Long) - case "BigDecimal" => Right(MeasureResultDTO.ResultValueType.BigDecimal) - case "Double" => Right(MeasureResultDTO.ResultValueType.Double) - case other => Left(s"Cannot decode $other as ResultValueType") } - implicit val encodeTypedValue: Encoder[MeasureResultDTO.TypedValue] = - Encoder.forProduct2("value", "valueType")(tv => (tv.value, tv.valueType)) - - implicit val decodeTypedValue: Decoder[MeasureResultDTO.TypedValue] = - Decoder.forProduct2("value", "valueType")(MeasureResultDTO.TypedValue.apply) - - implicit val decodeMeasureResultDTO: Decoder[MeasureResultDTO] = - Decoder.forProduct2("mainValue", "supportValues")(MeasureResultDTO.apply) - } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/Flows.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/Flows.scala new file mode 100644 index 000000000..94ca4c13e --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/Flows.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.flows + +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._ + +object Flows extends DBSchema diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala new file mode 100644 index 000000000..0b30ee8d3 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala @@ -0,0 +1,75 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.flows.functions + +import doobie.Fragment +import doobie.implicits.toSqlInterpolator +import doobie.util.Read +import play.api.libs.json.Json +import za.co.absa.atum.model.dto.CheckpointQueryDTO +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.atum.server.api.database.flows.Flows +import za.co.absa.atum.server.model.{CheckpointFromDB, PartitioningForDB} +import za.co.absa.fadb.DBSchema +import za.co.absa.fadb.doobie.DoobieEngine +import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunction +import zio._ +import zio.interop.catz._ + +import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get + +import doobie.postgres.implicits._ +import doobie.postgres.circe.jsonb.implicits._ +import io.circe.syntax.EncoderOps +import io.circe.generic.auto._ + +class GetFlowCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieMultipleResultFunction[CheckpointQueryDTO, CheckpointFromDB, Task] { + + override val fieldsToSelect: Seq[String] = Seq( + "id_checkpoint", + "checkpoint_name", + "author", + "measured_by_atum_agent", + "measure_name", "measured_columns", "measurement_value", + "checkpoint_start_time", "checkpoint_end_time", + ) + + override def sql(values: CheckpointQueryDTO)(implicit read: Read[CheckpointFromDB]): Fragment = { + val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning) + val partitioningNormalized = Json.toJson(partitioning).toString + + sql"""SELECT ${Fragment.const(selectEntry)} + FROM ${Fragment.const(functionName)}( + ${ + import za.co.absa.atum.server.api.database.DoobieImplicits.Jsonb.jsonbPutUsingString + partitioningNormalized + }, + ${values.limit}, + ${values.checkpointName} + ) AS ${Fragment.const(alias)};""" + } + +} + +object GetFlowCheckpoints { + val layer: URLayer[PostgresDatabaseProvider, GetFlowCheckpoints] = ZLayer { + for { + dbProvider <- ZIO.service[PostgresDatabaseProvider] + } yield new GetFlowCheckpoints()(Flows, dbProvider.dbEngine) + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala index 797183968..4f067fbc5 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala @@ -70,4 +70,12 @@ trait BaseEndpoints { baseEndpoint.in(Api / V2) } + def pathToAPIv1CompatibleFormat(apiURLPath: String): String = { + // this is basically kebab-case/snake_case to camelCase + val inputParts = apiURLPath.split("[_-]") + + // Capitalize the first letter of each part except the first one (lowercase always) + inputParts.head.toLowerCase + inputParts.tail.map(_.capitalize).mkString("") + } + } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index 9adbcd84b..339a0dded 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -32,7 +32,7 @@ trait Endpoints extends BaseEndpoints { protected val createCheckpointEndpointV1 : PublicEndpoint[CheckpointDTO, ErrorResponse, CheckpointDTO, Any] = { apiV1.post - .in(CreateCheckpoint) + .in(pathToAPIv1CompatibleFormat(CreateCheckpoint)) .in(jsonBody[CheckpointDTO]) .out(statusCode(StatusCode.Created)) .out(jsonBody[CheckpointDTO]) @@ -50,7 +50,7 @@ trait Endpoints extends BaseEndpoints { protected val createPartitioningEndpointV1 : PublicEndpoint[PartitioningSubmitDTO, ErrorResponse, AtumContextDTO, Any] = { apiV1.post - .in(CreatePartitioning) + .in(pathToAPIv1CompatibleFormat(CreatePartitioning)) .in(jsonBody[PartitioningSubmitDTO]) .out(statusCode(StatusCode.Ok)) .out(jsonBody[AtumContextDTO]) @@ -83,6 +83,15 @@ trait Endpoints extends BaseEndpoints { .out(jsonBody[MultiSuccessResponse[CheckpointDTO]]) } + protected val getFlowCheckpointsEndpointV2 + : PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = { + apiV2.post + .in(GetFlowCheckpoints) + .in(jsonBody[CheckpointQueryDTO]) + .out(statusCode(StatusCode.Ok)) + .out(jsonBody[MultiSuccessResponse[CheckpointDTO]]) + } + protected val zioMetricsEndpoint: PublicEndpoint[Unit, Unit, String, Any] = { endpoint.get.in(ZioMetrics).out(stringBody) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/HttpEnv.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/HttpEnv.scala index 0484f8b59..30f69e792 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/HttpEnv.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/HttpEnv.scala @@ -16,13 +16,13 @@ package za.co.absa.atum.server.api.http -import za.co.absa.atum.server.api.controller.{CheckpointController, PartitioningController} +import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} import zio.RIO import zio.metrics.connectors.prometheus.PrometheusPublisher object HttpEnv { - type Env = PartitioningController with CheckpointController with PrometheusPublisher + type Env = PartitioningController with CheckpointController with FlowController with PrometheusPublisher // naming effect types as `F` is a convention in Scala community type F[A] = RIO[Env, A] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 70fdf940c..8f03690c1 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -25,7 +25,7 @@ import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor import sttp.tapir.swagger.bundle.SwaggerInterpreter import sttp.tapir.ztapir._ import za.co.absa.atum.server.Constants.{SwaggerApiName, SwaggerApiVersion} -import za.co.absa.atum.server.api.controller.{CheckpointController, PartitioningController} +import za.co.absa.atum.server.api.controller.{CheckpointController, PartitioningController, FlowController} import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} import zio._ import zio.interop.catz._ @@ -43,8 +43,9 @@ trait Routes extends Endpoints with ServerOptions { createServerEndpoint(createPartitioningEndpointV1, PartitioningController.createPartitioningIfNotExistsV1), createServerEndpoint(createPartitioningEndpointV2, PartitioningController.createPartitioningIfNotExistsV2), createServerEndpoint(createOrUpdateAdditionalDataEndpointV2, PartitioningController.createOrUpdateAdditionalDataV2), - createServerEndpoint(getPartitioningCheckpointsEndpointV2, PartitioningController.getPartitioningCheckpoints), - createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit) + createServerEndpoint(getPartitioningCheckpointsEndpointV2, PartitioningController.getPartitioningCheckpointsV2), + createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2), + createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit), ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes } @@ -59,7 +60,8 @@ trait Routes extends Endpoints with ServerOptions { createPartitioningEndpointV1, createPartitioningEndpointV2, createOrUpdateAdditionalDataEndpointV2, - getPartitioningCheckpointsEndpointV2 + getPartitioningCheckpointsEndpointV2, + getFlowCheckpointsEndpointV2, ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None)) .from(SwaggerInterpreter().fromEndpoints[HttpEnv.F](endpoints, SwaggerApiName, SwaggerApiVersion)) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala new file mode 100644 index 000000000..6f56aa145 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala @@ -0,0 +1,28 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.repository + +import za.co.absa.atum.model.dto.CheckpointQueryDTO +import za.co.absa.atum.server.api.exception.DatabaseError +import za.co.absa.atum.server.model.CheckpointFromDB +import zio._ +import zio.macros.accessible + +@accessible +trait FlowRepository { + def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala new file mode 100644 index 000000000..3a8e96a88 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala @@ -0,0 +1,39 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.repository + +import za.co.absa.atum.model.dto.CheckpointQueryDTO +import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints +import za.co.absa.atum.server.api.exception.DatabaseError +import za.co.absa.atum.server.model.CheckpointFromDB +import zio._ + +class FlowRepositoryImpl(getFlowCheckpointsFn: GetFlowCheckpoints) extends FlowRepository with BaseRepository { + + override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] = { + dbCall(getFlowCheckpointsFn(checkpointQueryDTO), "getFlowCheckpoints") + } + +} + +object FlowRepositoryImpl { + val layer: URLayer[GetFlowCheckpoints, FlowRepository] = ZLayer { + for { + getFlowCheckpoints <- ZIO.service[GetFlowCheckpoints] + } yield new FlowRepositoryImpl(getFlowCheckpoints) + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index eb8571fb3..d61c13a6f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -40,6 +40,6 @@ trait PartitioningRepository { def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): IO[DatabaseError, Either[StatusException, Unit]] - def getPartitioningCheckpoints(partitioningQueryDetails: CheckpointQueryDTO): + def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index 24107ecad..a9441d66b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -64,9 +64,9 @@ class PartitioningRepositoryImpl( getPartitioningAdditionalDataFn(partitioning).mapBoth(err => DatabaseError(err.getMessage), _.toMap) } - override def getPartitioningCheckpoints(partitioningName: CheckpointQueryDTO): + override def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] = { - dbCall(getPartitioningCheckpointsFn(partitioningName), "getPartitioningCheckpoints") + dbCall(getPartitioningCheckpointsFn(checkpointQueryDTO), "getPartitioningCheckpoints") } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala new file mode 100644 index 000000000..34781cbdf --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala @@ -0,0 +1,27 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.service + +import za.co.absa.atum.model.dto.{CheckpointQueryDTO, CheckpointDTO} +import za.co.absa.atum.server.api.exception.ServiceError +import zio._ +import zio.macros.accessible + +@accessible +trait FlowService { + def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala new file mode 100644 index 000000000..93e333c67 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.service + +import za.co.absa.atum.model.dto._ +import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.api.repository.FlowRepository +import za.co.absa.atum.server.model.CheckpointFromDB +import zio._ + + +class FlowServiceImpl(flowRepository: FlowRepository) + extends FlowService with BaseService { + + override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] = { + for { + checkpointsFromDB <- repositoryCall( + flowRepository.getFlowCheckpoints(checkpointQueryDTO), "getFlowCheckpoints" + ) + checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { + checkpointFromDB => + ZIO.fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) + .mapError(error => ServiceError(error.getMessage)) + } + } yield checkpointDTOs + } + +} + +object FlowServiceImpl { + val layer: URLayer[FlowRepository, FlowService] = ZLayer { + for { + flowRepository <- ZIO.service[FlowRepository] + } yield new FlowServiceImpl(flowRepository) + } +} diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointFromDB.scala index 7c9b3a7d7..705e6c319 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointFromDB.scala @@ -18,13 +18,10 @@ package za.co.absa.atum.server.model import za.co.absa.atum.model.dto.{CheckpointDTO, MeasureDTO, MeasureResultDTO, MeasurementDTO, PartitioningDTO} import io.circe.{DecodingFailure, Json} -import io.circe.generic.auto._ import java.time.ZonedDateTime import java.util.UUID -import za.co.absa.atum.server.api.database.DoobieImplicits.decodeResultValueType - case class CheckpointFromDB( idCheckpoint: UUID, checkpointName: String, @@ -32,14 +29,16 @@ case class CheckpointFromDB( measuredByAtumAgent: Boolean = false, measureName: String, measuredColumns: Seq[String], - measurementValue: Json, + measurementValue: Json, // it's easier to convert this attribute to our `MeasurementDTO` after we received this as JSON from DB checkpointStartTime: ZonedDateTime, checkpointEndTime: Option[ZonedDateTime] ) object CheckpointFromDB { - def toCheckpointDTO(partitioning: PartitioningDTO, checkpointQueryResult: CheckpointFromDB + def toCheckpointDTO( + partitioning: PartitioningDTO, + checkpointQueryResult: CheckpointFromDB ): Either[DecodingFailure, CheckpointDTO] = { val measureResultOrErr = checkpointQueryResult.measurementValue.as[MeasureResultDTO] @@ -70,4 +69,3 @@ object CheckpointFromDB { } } - diff --git a/server/src/main/scala/za/co/absa/atum/server/model/PlayJsonImplicits.scala b/server/src/main/scala/za/co/absa/atum/server/model/PlayJsonImplicits.scala index ae98fe973..a4e32fe0f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/PlayJsonImplicits.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/PlayJsonImplicits.scala @@ -58,6 +58,9 @@ object PlayJsonImplicits { } } + implicit val readsMeasureDTO: Reads[MeasureDTO] = Json.reads[MeasureDTO] + implicit val writesMeasureDTO: Writes[MeasureDTO] = Json.writes[MeasureDTO] + implicit val readsTypedValue: Reads[MeasureResultDTO.TypedValue] = Json.reads[MeasureResultDTO.TypedValue] implicit val writesTypedValue: Writes[MeasureResultDTO.TypedValue] = Json.writes[MeasureResultDTO.TypedValue] @@ -69,9 +72,6 @@ object PlayJsonImplicits { implicit val writesMeasureResultDTO: Writes[MeasureResultDTO] = Json.writes[MeasureResultDTO] - implicit val readsMeasureDTO: Reads[MeasureDTO] = Json.reads[MeasureDTO] - implicit val writesMeasureDTO: Writes[MeasureDTO] = Json.writes[MeasureDTO] - implicit val readsMeasurementDTO: Reads[MeasurementDTO] = Json.reads[MeasurementDTO] implicit val writesMeasurementDTO: Writes[MeasurementDTO] = Json.writes[MeasurementDTO] @@ -99,5 +99,4 @@ object PlayJsonImplicits { implicit val readsCheckpointQueryDTO: Reads[CheckpointQueryDTO] = Json.reads[CheckpointQueryDTO] implicit val writesCheckpointQueryDTO: Writes[CheckpointQueryDTO] = Json.writes[CheckpointQueryDTO] - } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 65a1bf7bb..2fa568e28 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api -import io.circe.{Json, parser} +import io.circe.parser import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.model.CheckpointFromDB @@ -27,7 +27,7 @@ import MeasureResultDTO.ResultValueType._ trait TestData { - protected val uuid = UUID.randomUUID() + protected val uuid: UUID = UUID.randomUUID() // Partitioning DTO protected val partitioningDTO1: PartitioningDTO = Seq( @@ -47,27 +47,10 @@ trait TestData { authorIfNew = "" ) - protected val checkpointQueryDTO1: CheckpointQueryDTO = CheckpointQueryDTO( - partitioning = partitioningDTO1, - limit = Option(2), - checkpointName = Option("checkpointName") - ) - - protected val checkpointQueryDTO2: CheckpointQueryDTO = CheckpointQueryDTO( - partitioning = partitioningDTO2, - limit = Option(5), - checkpointName = Option("noCheckpoints") - ) - - protected val checkpointQueryDTO3: CheckpointQueryDTO = CheckpointQueryDTO( - partitioning = partitioningDTO3, - limit = None, - checkpointName = None - ) - // PartitioningSubmitDTO with different author protected val partitioningSubmitDTO2: PartitioningSubmitDTO = partitioningSubmitDTO1.copy(authorIfNew = "differentAuthor") + protected val partitioningSubmitDTO3: PartitioningSubmitDTO = partitioningSubmitDTO1.copy(authorIfNew = "yetAnotherAuthor") @@ -81,7 +64,7 @@ trait TestData { "key2" -> None, "key3" -> Some("value3") ) - protected val additionalDataDTO2: AdditionalDataDTO = Map( + protected val additionalDataDTO2: AdditionalDataDTO = Map( "key1" -> Some("value1"), "key2" -> Some("value2"), "key3" -> Some("value3") @@ -136,11 +119,55 @@ trait TestData { "key3" -> Some("value3") ) + // Additional Data Submit DTO + protected val additionalDataSubmitDTO1: AdditionalDataSubmitDTO = AdditionalDataSubmitDTO( + partitioning = Seq.empty, + additionalData = Map.empty, + author = "" + ) + protected val additionalDataSubmitDTO2: AdditionalDataSubmitDTO = + additionalDataSubmitDTO1.copy(author = "differentADAuthor") + + protected val additionalDataSubmitDTO3: AdditionalDataSubmitDTO = + additionalDataSubmitDTO1.copy(author = "yetAnotherADAuthor") + + // Atum Context + protected val atumContextDTO1: AtumContextDTO = AtumContextDTO( + partitioning = partitioningSubmitDTO1.partitioning, + measures = Set(measureDTO1, measureDTO2), + additionalData = Map.empty + ) + + protected val atumContextDTO2: AtumContextDTO = atumContextDTO1.copy( + partitioning = partitioningSubmitDTO2.partitioning, + measures = Set(MeasureDTO("count", Seq("1"))) + ) + + // Checkpoint Query DTO + protected val checkpointQueryDTO1: CheckpointQueryDTO = CheckpointQueryDTO( + partitioning = partitioningDTO1, + limit = Option(2), + checkpointName = Option("checkpointName"), + ) + + protected val checkpointQueryDTO2: CheckpointQueryDTO = CheckpointQueryDTO( + partitioning = partitioningDTO2, + limit = Option(5), + checkpointName = Option("noCheckpoints"), + ) + + protected val checkpointQueryDTO3: CheckpointQueryDTO = CheckpointQueryDTO( + partitioning = partitioningDTO3, + limit = None, + checkpointName = None + ) + // Checkpoint DTO protected val checkpointDTO1: CheckpointDTO = CheckpointDTO( id = UUID.randomUUID(), - name = "name", + name = checkpointQueryDTO1.checkpointName.get, author = "author", + measuredByAtumAgent = true, partitioning = checkpointQueryDTO1.partitioning, processStartTime = ZonedDateTime.now(), processEndTime = Some(ZonedDateTime.now()), @@ -149,9 +176,10 @@ trait TestData { protected val checkpointDTO2: CheckpointDTO = CheckpointDTO( id = UUID.randomUUID(), - name = "name2", + name = checkpointQueryDTO2.checkpointName.get, author = "author2", - partitioning = checkpointQueryDTO1.partitioning, + measuredByAtumAgent = true, + partitioning = checkpointQueryDTO2.partitioning, processStartTime = ZonedDateTime.now(), processEndTime = Some(ZonedDateTime.now()), measurements = measurementsDTO2.toSet @@ -159,86 +187,56 @@ trait TestData { protected val checkpointDTO3: CheckpointDTO = checkpointDTO1.copy(id = UUID.randomUUID()) - // Additional Data DTO as a map - val defaultJsonString: String = """ - |{ - | "mainValue": { - | "value": "123", - | "valueType": "Long" - | }, - | "supportValues": { - | "key1": { - | "value": "123456789", - | "valueType": "Long" - | }, - | "key2": { - | "value": "12345.6789", - | "valueType": "BigDecimal" - | } - | } - |} - |""".stripMargin - - protected val defaultJson: Json = parser.parse(defaultJsonString).getOrElse(throw new Exception("Failed to pass JSON")) - - // Checkpoint from DB DTO + // Checkpoint From DB protected val checkpointFromDB1: CheckpointFromDB = CheckpointFromDB( idCheckpoint = checkpointDTO1.id, - checkpointName = "name", + checkpointName = checkpointQueryDTO1.checkpointName.get, author = "author", + measuredByAtumAgent = true, measureName = measureDTO1.measureName, - measuredColumns = Seq("col_A1", "col_B1"), - measurementValue = defaultJson, + measuredColumns = measureDTO1.measuredColumns, + measurementValue = parser + .parse( + """ + |{ + | "mainValue": { + | "value": "123", + | "valueType": "Long" + | }, + | "supportValues": { + | "key1": { + | "value": "123456789", + | "valueType": "Long" + | }, + | "key2": { + | "value": "12345.6789", + | "valueType": "BigDecimal" + | } + | } + |} + |""".stripMargin + ) + .getOrElse { + throw new Exception("Failed to parse JSON") + }, checkpointStartTime = checkpointDTO1.processStartTime, checkpointEndTime = checkpointDTO1.processEndTime ) + protected val checkpointFromDB2: CheckpointFromDB = checkpointFromDB1 + .copy( + idCheckpoint = checkpointDTO2.id, + checkpointName = checkpointQueryDTO2.checkpointName.get, + author = "author2", + measuredByAtumAgent = true, + measureName = measureDTO2.measureName, + measuredColumns = measureDTO2.measuredColumns, + checkpointStartTime = checkpointDTO2.processStartTime, + checkpointEndTime = checkpointDTO2.processEndTime - protected val checkpointFromDB2: CheckpointFromDB = CheckpointFromDB( - idCheckpoint = checkpointDTO2.id, - checkpointName = "name2", - author = "author2", - measureName = measureDTO2.measureName, - measuredColumns = Seq("col_A2", "col_B2"), - measurementValue = defaultJson, - checkpointStartTime = checkpointDTO2.processStartTime, - checkpointEndTime = checkpointDTO2.processEndTime - ) - - protected val checkpointFromDB3: CheckpointFromDB = CheckpointFromDB( - idCheckpoint = checkpointDTO1.id, - checkpointName = "name", - author = "author", - measuredByAtumAgent = true, - measureName = "cnt", - measuredColumns = Seq("col3_A", "col3_B"), - measurementValue = defaultJson, - checkpointStartTime = checkpointDTO3.processStartTime, - checkpointEndTime = None, - ) - - // Additional Data submit DTO - protected val additionalDataSubmitDTO1: AdditionalDataSubmitDTO = AdditionalDataSubmitDTO( - partitioning = Seq.empty, - additionalData = Map.empty, - author = "" - ) - protected val additionalDataSubmitDTO2: AdditionalDataSubmitDTO = - additionalDataSubmitDTO1.copy(author = "differentADAuthor") - - protected val additionalDataSubmitDTO3: AdditionalDataSubmitDTO = - additionalDataSubmitDTO1.copy(author = "yetAnotherADAuthor") - - // Atum Context - protected val atumContextDTO1: AtumContextDTO = AtumContextDTO( - partitioning = partitioningSubmitDTO1.partitioning, - measures = Set(measureDTO1, measureDTO2), - additionalData = Map.empty - ) + ) - protected val atumContextDTO2: AtumContextDTO = atumContextDTO1.copy( - partitioning = partitioningSubmitDTO1.partitioning, - measures = Set(MeasureDTO("count", Seq("1"))) - ) + protected val checkpointFromDB3: CheckpointFromDB = checkpointFromDB1 + .copy(idCheckpoint = checkpointDTO3.id, checkpointStartTime = checkpointDTO3.processStartTime) protected def createAtumContextDTO(partitioningSubmitDTO: PartitioningSubmitDTO): AtumContextDTO = { val measures: Set[MeasureDTO] = Set(MeasureDTO("count", Seq("*"))) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala similarity index 94% rename from server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerIntegrationTests.scala rename to server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala index 5c71eb3e9..80cdf125c 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala @@ -21,14 +21,13 @@ import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.service.CheckpointService import za.co.absa.atum.server.model.ErrorResponse.{GeneralErrorResponse, InternalServerErrorResponse} -import za.co.absa.atum.server.model.SuccessResponse.SingleSuccessResponse import za.co.absa.fadb.exceptions.ErrorInDataException import za.co.absa.fadb.status.FunctionStatus import zio.test.Assertion.failsWithA import zio._ import zio.test._ -object CheckpointControllerIntegrationTests extends ZIOSpecDefault with TestData { +object CheckpointControllerUnitTests extends ZIOSpecDefault with TestData { private val checkpointServiceMock = mock(classOf[CheckpointService]) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala new file mode 100644 index 000000000..98bd3a0f1 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.controller + +import org.mockito.Mockito.{mock, when} +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.api.service.FlowService +import za.co.absa.atum.server.model.ErrorResponse.InternalServerErrorResponse +import zio._ +import zio.test.Assertion.failsWithA +import zio.test._ + +object FlowControllerUnitTests extends ZIOSpecDefault with TestData { + private val flowServiceMock = mock(classOf[FlowService]) + when(flowServiceMock.getFlowCheckpoints(checkpointQueryDTO1)) + .thenReturn(ZIO.fail(ServiceError("boom!"))) + + when(flowServiceMock.getFlowCheckpoints(checkpointQueryDTO2)) + .thenReturn(ZIO.succeed(Seq(checkpointDTO2))) + + private val flowServiceMockLayer = ZLayer.succeed(flowServiceMock) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + suite("FlowControllerSuite")( + suite("GetFlowCheckpointsSuite")( + test("Returns expected InternalServerErrorResponse") { + assertZIO(FlowController.getFlowCheckpointsV2(checkpointQueryDTO1).exit)( + failsWithA[InternalServerErrorResponse] + ) + }, + + test("Returns expected CheckpointDTO") { + for { + result <- FlowController.getFlowCheckpointsV2(checkpointQueryDTO2) + } yield assertTrue (result.data == Seq(checkpointDTO2)) + } + + ) + ).provide( + FlowControllerImpl.layer, + flowServiceMockLayer + ) + } +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index ebc7b8d61..340949d8e 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -87,16 +87,16 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { suite("GetPartitioningCheckpointsSuite")( test("Returns expected Seq[MeasureDTO]") { for { - result <- PartitioningController.getPartitioningCheckpoints(checkpointQueryDTO1) + result <- PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO1) } yield assertTrue(result.data == Seq(checkpointDTO1, checkpointDTO2)) }, test("Returns expected empty sequence") { for { - result <- PartitioningController.getPartitioningCheckpoints(checkpointQueryDTO2) + result <- PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO2) } yield assertTrue(result.data == Seq.empty[CheckpointDTO]) }, test("Returns expected InternalServerErrorResponse") { - assertZIO(PartitioningController.getPartitioningCheckpoints(checkpointQueryDTO3).exit)( + assertZIO(PartitioningController.getPartitioningCheckpointsV2(checkpointQueryDTO3).exit)( failsWithA[InternalServerErrorResponse] ) } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreateOrUpdateAdditionalDataIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreateOrUpdateAdditionalDataIntegrationTests.scala index d1465462c..a0a282109 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreateOrUpdateAdditionalDataIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreateOrUpdateAdditionalDataIntegrationTests.scala @@ -16,7 +16,6 @@ package za.co.absa.atum.server.api.database.runs.functions -import org.junit.runner.RunWith import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, PartitionDTO} import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.server.api.TestTransactorProvider @@ -25,10 +24,8 @@ import za.co.absa.fadb.exceptions.DataNotFoundException import za.co.absa.fadb.status.FunctionStatus import zio._ import zio.test._ -import zio.test.junit.ZTestJUnitRunner -@RunWith(classOf[ZTestJUnitRunner]) -class CreateOrUpdateAdditionalDataIntegrationTests extends ConfigProviderTest { +object CreateOrUpdateAdditionalDataIntegrationTests extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreatePartitioningIfNotExistsIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreatePartitioningIfNotExistsIntegrationTests.scala index 97cc18bdb..fa3f9dddb 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreatePartitioningIfNotExistsIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreatePartitioningIfNotExistsIntegrationTests.scala @@ -16,17 +16,14 @@ package za.co.absa.atum.server.api.database.runs.functions -import org.junit.runner.RunWith import za.co.absa.atum.model.dto.{PartitionDTO, PartitioningSubmitDTO} import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import zio._ import zio.test._ -import zio.test.junit.ZTestJUnitRunner -@RunWith(classOf[ZTestJUnitRunner]) -class CreatePartitioningIfNotExistsIntegrationTests extends ConfigProviderTest { +object CreatePartitioningIfNotExistsIntegrationTests extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala index 68afea595..f76e044e8 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala @@ -16,7 +16,6 @@ package za.co.absa.atum.server.api.database.runs.functions -import org.junit.runner.RunWith import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue} import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.ConfigProviderTest @@ -26,13 +25,11 @@ import za.co.absa.fadb.exceptions.DataNotFoundException import za.co.absa.fadb.status.FunctionStatus import zio._ import zio.test._ -import zio.test.junit.ZTestJUnitRunner import java.time.ZonedDateTime import java.util.UUID -@RunWith(classOf[ZTestJUnitRunner]) -class WriteCheckpointIntegrationTests extends ConfigProviderTest { +object WriteCheckpointIntegrationTests extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/BaseEndpointsUnitTest.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/BaseEndpointsUnitTest.scala new file mode 100644 index 000000000..44e289e62 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/BaseEndpointsUnitTest.scala @@ -0,0 +1,85 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.http + +import org.scalatest.flatspec.AnyFlatSpec + +class BaseEndpointsUnitTest extends AnyFlatSpec { + + object BaseEndpointsForTests extends BaseEndpoints + + "pathToAPIv1CompatibleFormat" should "successfully handle empty input" in { + val input = "" + val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) + val expected = "" + assert(actual == expected) + } + + "pathToAPIv1CompatibleFormat" should + "successfully convert our standard API path format to format compatible with API V1 (kebab)" in { + + val input = "create-checkpoint" + val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) + val expected = "createCheckpoint" + assert(actual == expected) + } + + "pathToAPIv1CompatibleFormat" should + "successfully convert our standard API path format to format compatible with API V1 (kebab2)" in { + + val input = "create-check-point2" + val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) + val expected = "createCheckPoint2" + assert(actual == expected) + } + + "pathToAPIv1CompatibleFormat" should + "successfully convert our standard API path format to format compatible with API V1 (kebab3)" in { + + val input = "Create-check-" + val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) + val expected = "createCheck" + assert(actual == expected) + } + + "pathToAPIv1CompatibleFormat" should + "successfully convert our standard API path format to format compatible with API V1 (snake)" in { + + val input = "_create_check_point" + val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) + val expected = "createCheckPoint" + assert(actual == expected) + } + + "pathToAPIv1CompatibleFormat" should + "successfully convert our standard API path format to format compatible with API V1 (kebab and snake)" in { + + val input = "Create-check_Point" + val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) + val expected = "createCheckPoint" + assert(actual == expected) + } + + "pathToAPIv1CompatibleFormat" should + "successfully convert our standard API path format to format compatible with API V1 (one word)" in { + + val input = "createcheckpoint" + val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) + val expected = "createcheckpoint" + assert(actual == expected) + } +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointUnitTests.scala similarity index 95% rename from server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointIntegrationTests.scala rename to server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointUnitTests.scala index 6568ae944..a232eabf0 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointUnitTests.scala @@ -33,7 +33,7 @@ import zio._ import zio.test.Assertion.equalTo import zio.test._ -object CreateCheckpointEndpointIntegrationTests extends ZIOSpecDefault with Endpoints with TestData { +object CreateCheckpointEndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData { private val checkpointControllerMock = mock(classOf[CheckpointController]) @@ -56,7 +56,7 @@ object CreateCheckpointEndpointIntegrationTests extends ZIOSpecDefault with Endp .backend() val request = basicRequest - .post(uri"https://test.com/api/v2/createCheckpoint") + .post(uri"https://test.com/api/v2/create-checkpoint") .response(asJson[SingleSuccessResponse[CheckpointDTO]]) suite("CreateCheckpointEndpointSuite")( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/CreatePartitioningEndpointIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/CreatePartitioningEndpointUnitTests.scala similarity index 95% rename from server/src/test/scala/za/co/absa/atum/server/api/http/CreatePartitioningEndpointIntegrationTests.scala rename to server/src/test/scala/za/co/absa/atum/server/api/http/CreatePartitioningEndpointUnitTests.scala index 9e4d60484..9f5a5fe4a 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/CreatePartitioningEndpointIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/CreatePartitioningEndpointUnitTests.scala @@ -33,9 +33,7 @@ import zio._ import zio.test.Assertion.equalTo import zio.test._ -import java.util.UUID - -object CreatePartitioningEndpointIntegrationTests extends ZIOSpecDefault with Endpoints with TestData { +object CreatePartitioningEndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData { private val createPartitioningEndpointMock = mock(classOf[PartitioningController]) @@ -58,7 +56,7 @@ object CreatePartitioningEndpointIntegrationTests extends ZIOSpecDefault with En .backend() val request = basicRequest - .post(uri"https://test.com/api/v2/createPartitioning") + .post(uri"https://test.com/api/v2/create-partitioning") .response(asJson[SingleSuccessResponse[AtumContextDTO]]) suite("CreatePartitioningEndpointSuite")( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsEndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsEndpointUnitTests.scala new file mode 100644 index 000000000..699ae3a34 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsEndpointUnitTests.scala @@ -0,0 +1,98 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.http + +import org.mockito.Mockito.{mock, when} +import sttp.client3._ +import sttp.client3.playJson._ +import sttp.client3.testing.SttpBackendStub +import sttp.model.StatusCode +import sttp.tapir.server.stub.TapirStubInterpreter +import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} +import za.co.absa.atum.model.dto.CheckpointDTO +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.controller.FlowController +import za.co.absa.atum.server.model.ErrorResponse.{GeneralErrorResponse, InternalServerErrorResponse} +import za.co.absa.atum.server.model.PlayJsonImplicits._ +import za.co.absa.atum.server.model.SuccessResponse.MultiSuccessResponse +import zio._ +import zio.test.Assertion.equalTo +import zio.test._ + +object GetFlowCheckpointsEndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData { + + private val flowControllerMock = mock(classOf[FlowController]) + + when(flowControllerMock.getFlowCheckpointsV2(checkpointQueryDTO1)) + .thenReturn(ZIO.succeed(MultiSuccessResponse(Seq(checkpointDTO1, checkpointDTO2), uuid))) + when(flowControllerMock.getFlowCheckpointsV2(checkpointQueryDTO2)) + .thenReturn(ZIO.fail(GeneralErrorResponse("error"))) + when(flowControllerMock.getFlowCheckpointsV2(checkpointQueryDTO3)) + .thenReturn(ZIO.fail(InternalServerErrorResponse("error"))) + + private val flowControllerMockLayer = ZLayer.succeed(flowControllerMock) + + private val getFlowCheckpointsServerEndpoint = + getFlowCheckpointsEndpointV2.zServerLogic(FlowController.getFlowCheckpointsV2) + + def spec: Spec[TestEnvironment with Scope, Any] = { + val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[FlowController])) + .whenServerEndpoint(getFlowCheckpointsServerEndpoint) + .thenRunLogic() + .backend() + + val request = basicRequest + .post(uri"https://test.com/api/v2/get-flow-checkpoints") + .response(asJson[MultiSuccessResponse[CheckpointDTO]]) + + suite("GetFlowCheckpointsEndpointSuite")( + test("Returns expected CheckpointDTO") { + val response = request + .body(checkpointQueryDTO1) + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + val expectedResult = MultiSuccessResponse(Seq(checkpointDTO1, checkpointDTO2), uuid) + + assertZIO(body <&> statusCode)(equalTo(Right(expectedResult), StatusCode.Ok)) + }, + test("Returns expected BadRequest") { + val response = request + .body(checkpointQueryDTO2) + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.BadRequest)) + }, + test("Returns expected InternalServerError") { + val response = request + .body(checkpointQueryDTO3) + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.InternalServerError)) + } + ) + }.provide( + flowControllerMockLayer + ) + +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala similarity index 92% rename from server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryIntegrationTests.scala rename to server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala index b3b9979de..fb4097e5f 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala @@ -16,7 +16,6 @@ package za.co.absa.atum.server.api.repository -import org.junit.runner.RunWith import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpoint import za.co.absa.atum.server.api.exception.DatabaseError @@ -26,10 +25,8 @@ import za.co.absa.fadb.status.FunctionStatus import zio._ import zio.test.Assertion.failsWithA import zio.test._ -import zio.test.junit.ZTestJUnitRunner -@RunWith(classOf[ZTestJUnitRunner]) -class CheckpointRepositoryIntegrationTests extends ZIOSpecDefault with TestData { +object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { private val writeCheckpointMock: WriteCheckpoint = mock(classOf[WriteCheckpoint]) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala new file mode 100644 index 000000000..a0baa3be7 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala @@ -0,0 +1,58 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.repository + +import org.mockito.Mockito.{mock, when} +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints +import za.co.absa.atum.server.api.exception.DatabaseError +import zio._ +import zio.test.Assertion.failsWithA +import zio.test._ + +object FlowRepositoryUnitTests extends ZIOSpecDefault with TestData { + + private val getFlowCheckpointsMock = mock(classOf[GetFlowCheckpoints]) + + when(getFlowCheckpointsMock.apply(checkpointQueryDTO1)).thenReturn(ZIO.fail(new Exception("boom!"))) + when(getFlowCheckpointsMock.apply(checkpointQueryDTO2)).thenReturn(ZIO.succeed(Seq(checkpointFromDB1, checkpointFromDB2))) + + private val getFlowCheckpointsMockLayer = ZLayer.succeed(getFlowCheckpointsMock) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + + suite("FlowRepositoryIntegrationSuite")( + suite("GetFlowCheckpointsSuite")( + test("Returns expected DatabaseError") { + assertZIO(FlowRepository.getFlowCheckpoints(checkpointQueryDTO1).exit)( + failsWithA[DatabaseError] + ) + }, + test("Returns expected Left with StatusException") { + for { + result <- FlowRepository.getFlowCheckpoints(checkpointQueryDTO2) + } yield assertTrue(result == Seq(checkpointFromDB1, checkpointFromDB2)) + }, + ), + ).provide( + FlowRepositoryImpl.layer, + getFlowCheckpointsMockLayer, + ) + + } + +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala similarity index 89% rename from server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryIntegrationTests.scala rename to server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index 475d5870b..aad01953a 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -16,27 +16,18 @@ package za.co.absa.atum.server.api.repository -import org.junit.runner.RunWith import org.mockito.Mockito.{mock, when} -import za.co.absa.atum.model.dto.{AdditionalDataDTO, MeasureDTO} -import za.co.absa.atum.server.api.database.runs.functions.{ - CreateOrUpdateAdditionalData, - CreatePartitioningIfNotExists, - GetPartitioningAdditionalData, - GetPartitioningCheckpoints, - GetPartitioningMeasures} -import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.database.runs.functions._ +import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.model.CheckpointFromDB import za.co.absa.fadb.exceptions.ErrorInDataException import za.co.absa.fadb.status.FunctionStatus import zio._ import zio.test.Assertion.failsWithA import zio.test._ -import zio.test.junit.ZTestJUnitRunner -@RunWith(classOf[ZTestJUnitRunner]) -class PartitioningRepositoryIntegrationTests extends ZIOSpecDefault with TestData { +object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { // Create Partitioning Mocks private val createPartitioningIfNotExistsMock = mock(classOf[CreatePartitioningIfNotExists]) @@ -129,7 +120,7 @@ class PartitioningRepositoryIntegrationTests extends ZIOSpecDefault with TestDat test("Returns expected Seq") { for { result <- PartitioningRepository.getPartitioningMeasures(partitioningDTO1) - } yield assertTrue(result.isInstanceOf[Seq[MeasureDTO]] && result == Seq(measureDTO1, measureDTO2)) + } yield assertTrue(result == Seq(measureDTO1, measureDTO2)) }, test("Returns expected Exception") { assertZIO(PartitioningRepository.getPartitioningMeasures(partitioningDTO2).exit)( @@ -142,7 +133,7 @@ class PartitioningRepositoryIntegrationTests extends ZIOSpecDefault with TestDat test("Returns expected Right with empty Map") { for { result <- PartitioningRepository.getPartitioningAdditionalData(partitioningDTO1) - } yield assertTrue(result.isInstanceOf[AdditionalDataDTO] && result == additionalDataDTO1) + } yield assertTrue(result == additionalDataDTO1) }, test("Returns expected Left with DatabaseError") { assertZIO(PartitioningRepository.getPartitioningAdditionalData(partitioningDTO2).exit)( @@ -155,7 +146,7 @@ class PartitioningRepositoryIntegrationTests extends ZIOSpecDefault with TestDat test("Returns expected Seq") { for { result <- PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO1) - } yield assertTrue(result.isInstanceOf[Seq[CheckpointFromDB]] && result == Seq(checkpointFromDB1)) + } yield assertTrue(result == Seq(checkpointFromDB1)) }, test("Returns expected DatabaseError") { assertZIO(PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO2).exit)( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala similarity index 92% rename from server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceIntegrationTests.scala rename to server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala index 384d2f5f5..c68a2ede7 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala @@ -16,7 +16,6 @@ package za.co.absa.atum.server.api.service -import org.junit.runner.RunWith import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.{DatabaseError, ServiceError} @@ -26,10 +25,8 @@ import za.co.absa.fadb.status.FunctionStatus import zio.test.Assertion.failsWithA import zio.test._ import zio._ -import zio.test.junit.ZTestJUnitRunner -@RunWith(classOf[ZTestJUnitRunner]) -class CheckpointServiceIntegrationTests extends ZIOSpecDefault with TestData { +object CheckpointServiceUnitTests extends ZIOSpecDefault with TestData { private val checkpointRepositoryMock = mock(classOf[CheckpointRepository]) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala new file mode 100644 index 000000000..8b70529ae --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala @@ -0,0 +1,60 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.service + +import org.mockito.Mockito.{mock, when} +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.exception.{DatabaseError, ServiceError} +import za.co.absa.atum.server.api.repository.FlowRepository +import zio._ +import zio.test.Assertion.failsWithA +import zio.test._ + +object FlowServiceUnitTests extends ZIOSpecDefault with TestData { + private val flowRepositoryMock = mock(classOf[FlowRepository]) + + when(flowRepositoryMock.getFlowCheckpoints(checkpointQueryDTO1)).thenReturn(ZIO.fail(DatabaseError("boom!"))) + when(flowRepositoryMock.getFlowCheckpoints(checkpointQueryDTO2)) + .thenReturn(ZIO.succeed(Seq(checkpointFromDB2))) + + private val flowRepositoryMockLayer = ZLayer.succeed(flowRepositoryMock) + + override def spec: Spec[TestEnvironment with Scope, Any] = { + + suite("FlowServiceSuite")( + suite("GetFlowCheckpointsSuite")( + test("Returns expected ServiceError") { + assertZIO(FlowService.getFlowCheckpoints(checkpointQueryDTO1).exit)( + failsWithA[ServiceError] + ) + }, + test("Returns expected Seq[CheckpointDTO]") { + for { + result <- FlowService.getFlowCheckpoints(checkpointQueryDTO2) + } yield assertTrue{ + result == Seq(checkpointDTO2) + } + }, + + ), + ).provide( + FlowServiceImpl.layer, + flowRepositoryMockLayer + ) + + } +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index 8b2d9f0f9..1308faaa6 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -16,9 +16,7 @@ package za.co.absa.atum.server.api.service - import org.mockito.Mockito.{mock, when} -import za.co.absa.atum.model.dto.{AdditionalDataDTO, CheckpointDTO, MeasureDTO} import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.{DatabaseError, ServiceError} import za.co.absa.atum.server.api.repository.PartitioningRepository @@ -28,7 +26,6 @@ import zio.test.Assertion.failsWithA import zio.test._ import zio._ - object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { private val partitioningRepositoryMock = mock(classOf[PartitioningRepository]) @@ -104,7 +101,6 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { for { result <- PartitioningService.getPartitioningMeasures(partitioningDTO1) } yield assertTrue{ - result.isInstanceOf[Seq[MeasureDTO]] result == Seq(measureDTO1, measureDTO2) } }, @@ -118,7 +114,7 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Right with Seq[AdditionalDataDTO]") { for { result <- PartitioningService.getPartitioningAdditionalData(partitioningDTO1) - } yield assertTrue(result.isInstanceOf[AdditionalDataDTO]) + } yield assertTrue{result == additionalDataDTO1} }, test("Returns expected ServiceError") { assertZIO(PartitioningService.getPartitioningAdditionalData(partitioningDTO2).exit)( @@ -131,8 +127,7 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { for { result <- PartitioningService.getPartitioningCheckpoints(checkpointQueryDTO1) } yield assertTrue{ - result.isInstanceOf[Seq[CheckpointDTO]] - result == Seq(checkpointDTO1, checkpointDTO2) + result == Seq(checkpointDTO1, checkpointDTO2.copy(partitioning = checkpointDTO1.partitioning)) } }, test("Returns expected ServiceError") { diff --git a/server/src/test/scala/za/co/absa/atum/server/aws/AwsSecretsProviderUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/aws/AwsSecretsProviderUnitTests.scala index 779036c24..b53fdc2ef 100644 --- a/server/src/test/scala/za/co/absa/atum/server/aws/AwsSecretsProviderUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/aws/AwsSecretsProviderUnitTests.scala @@ -16,7 +16,6 @@ package za.co.absa.atum.server.aws -import org.junit.runner.RunWith import org.mockito.ArgumentMatchers.any import org.mockito.Mockito.{mock, when} import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient @@ -24,11 +23,9 @@ import software.amazon.awssdk.services.secretsmanager.model.{GetSecretValueReque import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.server.config.AwsConfig import zio.test._ -import zio.test.junit.ZTestJUnitRunner import zio.{Scope, ZIO, ZLayer} -@RunWith(classOf[ZTestJUnitRunner]) -class AwsSecretsProviderUnitTests extends ConfigProviderTest { +object AwsSecretsProviderUnitTests extends ConfigProviderTest { private val secretsManagerClientMock = mock(classOf[SecretsManagerClient])