diff --git a/.gitignore b/.gitignore index 68eb03cb..a864bff5 100644 --- a/.gitignore +++ b/.gitignore @@ -91,3 +91,4 @@ utils/resources/*.conf /server/certs/ /server/selfsigned.crt /server/selfsigned.p12 +/.bloop/ diff --git a/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql b/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql index 72df99e7..2d1d1d3f 100644 --- a/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql +++ b/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql @@ -33,7 +33,7 @@ CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints( OUT has_more BOOLEAN ) RETURNS SETOF record AS $$ --------------------------------------------------------------------------------------------------------------------- + -------------------------------------------------------------------------------------------------------------------- -- -- Function: flows.get_flow_checkpoints(4) -- Retrieves all checkpoints (measures and their measurement details) related to a primary flow @@ -144,6 +144,6 @@ BEGIN LC.id_checkpoint, LC.process_start_time; END; $$ -LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + LANGUAGE plpgsql VOLATILE SECURITY DEFINER; GRANT EXECUTE ON FUNCTION flows.get_flow_checkpoints(BIGINT, INT, BIGINT, TEXT) TO atum_owner; diff --git a/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql new file mode 100644 index 00000000..925235bd --- /dev/null +++ b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql @@ -0,0 +1,186 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +DROP FUNCTION IF EXISTS flows.get_flow_checkpoints( + i_flow_id BIGINT, + i_checkpoints_limit INT, + i_offset BIGINT, + i_checkpoint_name TEXT +); + +CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints( + IN i_flow_id BIGINT, + IN i_checkpoints_limit INT DEFAULT 5, + IN i_offset BIGINT DEFAULT 0, + IN i_checkpoint_name TEXT DEFAULT NULL, + IN i_latest_first BOOLEAN DEFAULT TRUE, + OUT status INTEGER, + OUT status_text TEXT, + OUT id_checkpoint UUID, + OUT checkpoint_name TEXT, + OUT checkpoint_author TEXT, + OUT measured_by_atum_agent BOOLEAN, + OUT measure_name TEXT, + OUT measured_columns TEXT[], + OUT measurement_value JSONB, + OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, + OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE, + OUT id_partitioning BIGINT, + OUT partitioning JSONB, + OUT partitioning_author TEXT, + OUT has_more BOOLEAN +) RETURNS SETOF record AS +$$ +-------------------------------------------------------------------------------------------------------------------- +-- +-- Function: flows.get_flow_checkpoints(5) +-- Retrieves all checkpoints (measures and their measurement details) related to the flow +-- identified by the given flow ID.
+-- +-- Note: a single row returned from this function doesn't contain all data related to a single checkpoint - it only +-- represents one measure associated with a checkpoint. So even if only a single checkpoint would be retrieved, +-- this function can potentially return multiple rows. +-- +-- Note: checkpoints are retrieved ordered by process_start_time, the direction being controlled by i_latest_first +-- +-- Parameters: +-- i_flow_id - ID of the flow whose checkpoints +-- will be retrieved +-- i_checkpoints_limit - (optional) maximum number of checkpoints to return, returns all of them if NULL +-- i_offset - (optional) offset for checkpoints pagination +-- i_checkpoint_name - (optional) if specified, only data related to checkpoints of this name is returned +-- i_latest_first - (optional) if true, checkpoints are ordered by process_start_time in descending order +-- +-- Note: i_checkpoints_limit and i_offset are used for pagination purposes; +-- checkpoints are ordered by process_start_time, in descending order by default +-- (ascending when i_latest_first is false) +-- +-- Returns: +-- status - Status code +-- status_text - Status text +-- id_checkpoint - ID of retrieved checkpoint +-- checkpoint_name - Name of the retrieved checkpoint +-- checkpoint_author - Author of the checkpoint +-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by Atum Agent +-- (if false, data supplied manually) +-- measure_name - measure name associated with a given checkpoint +-- measured_columns - measure columns associated with a given checkpoint +-- measurement_value - measurement details associated with a given checkpoint +-- checkpoint_start_time - Time of the checkpoint +-- checkpoint_end_time - End time of the checkpoint computation +-- id_partitioning - ID of the partitioning +-- partitioning - Partitioning value +-- partitioning_author - Author of the partitioning +-- has_more - flag indicating whether there are more checkpoints available, always `false` if `i_checkpoints_limit` is NULL +-- +-- Status codes: +-- 11 - OK +-- 42 - Flow not found +--------------------------------------------------------------------------------------------------- +DECLARE + _has_more BOOLEAN; + _latest_first BOOLEAN := coalesce(i_latest_first, TRUE); +BEGIN + -- Check if the flow exists by querying the partitioning_to_flow table. + -- Rationale: + -- This table is preferred over the flows table because: + -- 1. Every flow has at least one record in partitioning_to_flow. + -- 2. This table is used in subsequent queries, providing a caching advantage. + -- 3. Improves performance by reducing the need to query the flows table directly.
+ PERFORM 1 FROM flows.partitioning_to_flow WHERE fk_flow = i_flow_id; + IF NOT FOUND THEN + status := 42; + status_text := 'Flow not found'; + RETURN NEXT; + RETURN; + END IF; + + -- Determine if there are more checkpoints than the limit; the count runs in a subquery, + -- otherwise LIMIT/OFFSET would apply to the single aggregated row rather than to the checkpoints + IF i_checkpoints_limit IS NOT NULL THEN + SELECT count(*) > i_checkpoints_limit + INTO _has_more + FROM ( + SELECT 1 + FROM runs.checkpoints C + JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning + WHERE PF.fk_flow = i_flow_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + LIMIT i_checkpoints_limit + 1 OFFSET i_offset + ) page_probe; + ELSE + _has_more := false; + END IF; + + -- Retrieve the checkpoints and their associated measurements + RETURN QUERY + WITH limited_checkpoints AS ( + SELECT C.id_checkpoint, + C.fk_partitioning, + C.checkpoint_name, + C.created_by, + C.measured_by_atum_agent, + C.process_start_time, + C.process_end_time + FROM runs.checkpoints C + JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning + WHERE PF.fk_flow = i_flow_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + ORDER BY + CASE + WHEN _latest_first THEN C.process_start_time + END DESC, + CASE + WHEN NOT _latest_first THEN C.process_start_time + END ASC + LIMIT i_checkpoints_limit OFFSET i_offset + ) + SELECT + 11 AS status, + 'OK' AS status_text, + LC.id_checkpoint, + LC.checkpoint_name, + LC.created_by AS checkpoint_author, + LC.measured_by_atum_agent, + MD.measure_name, + MD.measured_columns, + M.measurement_value, + LC.process_start_time AS checkpoint_start_time, + LC.process_end_time AS checkpoint_end_time, + LC.fk_partitioning AS id_partitioning, + P.partitioning AS partitioning, + P.created_by AS partitioning_author, + _has_more AS has_more + FROM + limited_checkpoints LC + INNER JOIN + runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint + INNER JOIN + runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition + INNER JOIN + runs.partitionings P ON LC.fk_partitioning = P.id_partitioning + ORDER BY + CASE + WHEN _latest_first THEN LC.process_start_time + END DESC, + CASE + WHEN NOT _latest_first THEN LC.process_start_time + END ASC; +END; +$$ +LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + +GRANT EXECUTE ON FUNCTION flows.get_flow_checkpoints(BIGINT, INT, BIGINT, TEXT, BOOLEAN) TO atum_owner; diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala index ed06fe22..a22b4013 100644 --- a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala @@ -1,5 +1,7 @@ package za.co.absa.atum.database.flows +import io.circe.Json +import io.circe.parser.parse import za.co.absa.balta.DBTestSuite import za.co.absa.balta.classes.JsonBString import za.co.absa.balta.classes.setter.CustomDBType @@ -184,7 +186,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row1.getString("status_text").contains("OK")) assert(row1.getUUID("id_checkpoint").contains(checkpointId1)) assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) - assert(row1.getString("author").contains("Joseph")) + assert(row1.getString("checkpoint_author").contains("Joseph")) assert(row1.getBoolean("measured_by_atum_agent").contains(true))
assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime)) assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) @@ -200,7 +202,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row2.getString("status_text").contains("OK")) assert(row2.getUUID("id_checkpoint").contains(checkpointId1)) assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) - assert(row2.getString("author").contains("Joseph")) + assert(row2.getString("checkpoint_author").contains("Joseph")) assert(row2.getBoolean("measured_by_atum_agent").contains(true)) assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTimeOther)) assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTimeOther)) @@ -218,7 +220,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row3.getString("status_text").contains("OK")) assert(row3.getUUID("id_checkpoint").contains(checkpointId2)) assert(row3.getString("checkpoint_name").contains("CheckpointNameOther")) - assert(row3.getString("author").contains("Joseph")) + assert(row3.getString("checkpoint_author").contains("Joseph")) assert(row3.getBoolean("measured_by_atum_agent").contains(true)) assert(row3.getOffsetDateTime("checkpoint_start_time").contains(startTimeOther)) assert(row3.getOffsetDateTime("checkpoint_end_time").contains(endTimeOther)) @@ -252,6 +254,242 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { } } + test("getFlowCheckpointsV2 should return all checkpoints of a given name and a given flow") { + + val partitioningId: Long = Random.nextLong() + table("runs.partitionings").insert( + add("id_partitioning", partitioningId) + .add("partitioning", partitioning) + .add("created_by", "Joseph") + ) + + val flowId: Long = Random.nextLong() + table("flows.flows").insert( + add("id_flow", flowId) + .add("flow_name", "flowName") + .add("from_pattern", false) + .add("created_by", "Joseph") + .add("fk_primary_partitioning", partitioningId) + ) + + table("flows.partitioning_to_flow").insert( + add("fk_flow", flowId) + .add("fk_partitioning", partitioningId) + .add("created_by", "ObviouslySomeTest") + ) + + // Insert checkpoints and measure definitions + val checkpointId1 = UUID.fromString("e9efe108-d8fc-42ad-b367-aa1b17f6c450") + val startTime1 = OffsetDateTime.parse("1993-02-14T10:00:00Z") + val endTime1 = OffsetDateTime.parse("2024-04-24T10:00:00Z") + table("runs.checkpoints").insert( + add("id_checkpoint", checkpointId1) + .add("fk_partitioning", partitioningId) + .add("checkpoint_name", "CheckpointNameCntAndAvg") + .add("measured_by_atum_agent", true) + .add("process_start_time", startTime1) + .add("process_end_time", endTime1) + .add("created_by", "Joseph") + ) + + val checkpointId2 = UUID.randomUUID() + val startTimeOther = OffsetDateTime.parse("1993-02-14T10:00:00Z") + val endTimeOther = OffsetDateTime.parse("2024-04-24T10:00:00Z") + table("runs.checkpoints").insert( + add("id_checkpoint", checkpointId2) + .add("fk_partitioning", partitioningId) + .add("checkpoint_name", "CheckpointNameOther") + .add("measured_by_atum_agent", true) + .add("process_start_time", startTimeOther) + .add("process_end_time", endTimeOther) + .add("created_by", "Joseph") + ) + + val checkpointId3 = UUID.fromString("41ea35cd-6398-42ed-b6d3-a8d613176575") + val startTime3 = OffsetDateTime.parse("1993-02-14T11:00:00Z") + val endTime3 = OffsetDateTime.parse("2024-04-24T11:00:00Z") + table("runs.checkpoints").insert( + add("id_checkpoint", checkpointId3) + .add("fk_partitioning", 
partitioningId) + .add("checkpoint_name", "CheckpointNameCntAndAvg") + .add("measured_by_atum_agent", true) + .add("process_start_time", startTime3) + .add("process_end_time", endTime3) + .add("created_by", "Joseph") + ) + + // Insert measure definitions and measurements + val measureDefinitionAvgId: Long = Random.nextLong() + table("runs.measure_definitions").insert( + add("id_measure_definition", measureDefinitionAvgId) + .add("fk_partitioning", partitioningId) + .add("measure_name", "avg") + .add("measured_columns", CustomDBType("""{"a","b"}""", "TEXT[]")) + .add("created_by", "Joseph") + ) + + val measureDefinitionCntId: Long = Random.nextLong() + table("runs.measure_definitions").insert( + add("id_measure_definition", measureDefinitionCntId) + .add("fk_partitioning", partitioningId) + .add("measure_name", "cnt") + .add("measured_columns", CustomDBType("""{"col1"}""", "TEXT[]")) + .add("created_by", "Joseph") + ) + + val measureDefinitionOtherId: Long = Random.nextLong() + table("runs.measure_definitions").insert( + add("id_measure_definition", measureDefinitionOtherId) + .add("fk_partitioning", partitioningId) + .add("measure_name", "sum") + .add("measured_columns", CustomDBType("""{"colOther"}""", "TEXT[]")) + .add("created_by", "Joseph") + ) + + table("runs.measurements").insert( + add("fk_measure_definition", measureDefinitionCntId) + .add("fk_checkpoint", checkpointId1) + .add("measurement_value", measurementCnt) + ) + + table("runs.measurements").insert( + add("fk_measure_definition", measureDefinitionAvgId) + .add("fk_checkpoint", checkpointId1) + .add("measurement_value", measurementAvg) + ) + + table("runs.measurements").insert( + add("fk_measure_definition", measureDefinitionCntId) + .add("fk_checkpoint", checkpointId3) + .add("measurement_value", measurementCnt) + ) + + table("runs.measurements").insert( + add("fk_measure_definition", measureDefinitionAvgId) + .add("fk_checkpoint", checkpointId3) + .add("measurement_value", measurementAvg) + ) + + table("runs.measurements").insert( + add("fk_measure_definition", measureDefinitionOtherId) + .add("fk_checkpoint", checkpointId2) + .add("measurement_value", measurementSum) + ) + + // Actual test execution and assertions + val actualMeasures: Seq[MeasuredDetails] = function(fncGetFlowCheckpointsV2) + .setParam("i_flow_id", flowId) + .setParam("i_checkpoint_name", "CheckpointNameCntAndAvg") + .execute { queryResult => + assert(queryResult.hasNext) + + val row1 = queryResult.next() + assert(row1.getInt("status").contains(11)) + assert(row1.getString("status_text").contains("OK")) + assert(row1.getUUID("id_checkpoint").contains(checkpointId3)) + assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) + assert(row1.getString("checkpoint_author").contains("Joseph")) + assert(row1.getBoolean("measured_by_atum_agent").contains(true)) + assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime3)) + assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime3)) + assert(row1.getLong("id_partitioning").contains(partitioningId)) + val expectedPartitioningJson1 = parseJsonBStringOrThrow(partitioning) + val returnedPartitioningJson1 = parseJsonBStringOrThrow(row1.getJsonB("partitioning").get) + assert(returnedPartitioningJson1 == expectedPartitioningJson1) + assert(row1.getString("partitioning_author").contains("Joseph")) + + val measure1 = MeasuredDetails( + row1.getString("measure_name").get, + row1.getArray[String]("measured_columns").map(_.toList).get,
row1.getJsonB("measurement_value").get + ) + + val row2 = queryResult.next() + assert(row2.getInt("status").contains(11)) + assert(row2.getString("status_text").contains("OK")) + assert(row2.getUUID("id_checkpoint").contains(checkpointId3)) + assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) + assert(row2.getString("checkpoint_author").contains("Joseph")) + assert(row2.getBoolean("measured_by_atum_agent").contains(true)) + assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTime3)) + assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTime3)) + assert(row2.getLong("id_partitioning").contains(partitioningId)) + val expectedPartitioningJson2 = parseJsonBStringOrThrow(partitioning) + val returnedPartitioningJson2 = parseJsonBStringOrThrow(row2.getJsonB("partitioning").get) + assert(returnedPartitioningJson2 == expectedPartitioningJson2) + assert(row2.getString("partitioning_author").contains("Joseph")) + + val measure2 = MeasuredDetails( + row2.getString("measure_name").get, + row2.getArray[String]("measured_columns").map(_.toList).get, + row2.getJsonB("measurement_value").get + ) + + assert(queryResult.hasNext) + + val row3 = queryResult.next() + assert(row3.getInt("status").contains(11)) + assert(row3.getString("status_text").contains("OK")) + assert(row3.getUUID("id_checkpoint").contains(checkpointId1)) + assert(row3.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) + assert(row3.getString("checkpoint_author").contains("Joseph")) + assert(row3.getBoolean("measured_by_atum_agent").contains(true)) + assert(row3.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(row3.getOffsetDateTime("checkpoint_end_time").contains(endTime1)) + assert(row3.getLong("id_partitioning").contains(partitioningId)) + val expectedPartitioningJson3 = parseJsonBStringOrThrow(partitioning) + val returnedPartitioningJson3 = parseJsonBStringOrThrow(row3.getJsonB("partitioning").get) + assert(returnedPartitioningJson3 == expectedPartitioningJson3) + assert(row3.getString("partitioning_author").contains("Joseph")) + + val measure3 = MeasuredDetails( + row3.getString("measure_name").get, + row3.getArray[String]("measured_columns").map(_.toList).get, + row3.getJsonB("measurement_value").get + ) + + val row4 = queryResult.next() + assert(row4.getInt("status").contains(11)) + assert(row4.getString("status_text").contains("OK")) + assert(row4.getUUID("id_checkpoint").contains(checkpointId1)) + assert(row4.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) + assert(row4.getString("checkpoint_author").contains("Joseph")) + assert(row4.getBoolean("measured_by_atum_agent").contains(true)) + assert(row4.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(row4.getOffsetDateTime("checkpoint_end_time").contains(endTime1)) + assert(row4.getLong("id_partitioning").contains(partitioningId)) + val expectedPartitioningJson4 = parseJsonBStringOrThrow(partitioning) + val returnedPartitioningJson4 = parseJsonBStringOrThrow(row4.getJsonB("partitioning").get) + assert(returnedPartitioningJson4 == expectedPartitioningJson4) + assert(row4.getString("partitioning_author").contains("Joseph")) + + val measure4 = MeasuredDetails( + row4.getString("measure_name").get, + row4.getArray[String]("measured_columns").map(_.toList).get, + row4.getJsonB("measurement_value").get + ) + + Seq(measure1, measure2, measure3, measure4) + } + + // Assertions for measures + assert(actualMeasures.map(_.measureName).toSet == Set("avg",
"cnt")) + assert(actualMeasures.map(_.measureColumns).toSet == Set(List("a", "b"), List("col1"))) + + actualMeasures.foreach { currVal => + val currValStr = currVal.measurementValue.value + + currVal.measureName match { + case "cnt" => + assert(currValStr.contains(""""value": "3"""")) + case "avg" => + assert(currValStr.contains(""""value": "2.71"""")) + case other => + fail(s"Unexpected measure name: $other") + } + } + } + test("getFlowCheckpointsV2 should return limited with checkpoints for a given flow") { val partitioningId: Long = Random.nextLong() @@ -361,7 +599,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row1.getString("status_text").contains("OK")) assert(row1.getUUID("id_checkpoint").contains(checkpointId1)) assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) - assert(row1.getString("author").contains("Joseph")) + assert(row1.getString("checkpoint_author").contains("Joseph")) assert(row1.getBoolean("measured_by_atum_agent").contains(true)) assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime)) assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) @@ -378,7 +616,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row2.getString("status_text").contains("OK")) assert(row2.getUUID("id_checkpoint").contains(checkpointId1)) assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) - assert(row2.getString("author").contains("Joseph")) + assert(row2.getString("checkpoint_author").contains("Joseph")) assert(row2.getBoolean("measured_by_atum_agent").contains(true)) assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTimeOther)) assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTimeOther)) @@ -395,7 +633,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row3.getString("status_text").contains("OK")) assert(row3.getUUID("id_checkpoint").contains(checkpointId2)) assert(row3.getString("checkpoint_name").contains("CheckpointNameOther")) - assert(row3.getString("author").contains("Joseph")) + assert(row3.getString("checkpoint_author").contains("Joseph")) assert(row3.getBoolean("measured_by_atum_agent").contains(true)) assert(row3.getOffsetDateTime("checkpoint_start_time").contains(startTimeOther)) assert(row3.getOffsetDateTime("checkpoint_end_time").contains(endTimeOther)) @@ -447,4 +685,8 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { } + private def parseJsonBStringOrThrow(jsonBString: JsonBString): Json = { + parse(jsonBString.value).getOrElse(throw new Exception("Failed to parse JsonBString to Json")) + } + } diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointWithPartitioningDTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointWithPartitioningDTO.scala new file mode 100644 index 00000000..d7226320 --- /dev/null +++ b/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointWithPartitioningDTO.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.model.dto + +import io.circe.{Decoder, Encoder} +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} + +import java.time.ZonedDateTime +import java.util.UUID + +case class CheckpointWithPartitioningDTO( + id: UUID, + name: String, + author: String, + measuredByAtumAgent: Boolean = false, + processStartTime: ZonedDateTime, + processEndTime: Option[ZonedDateTime], + measurements: Set[MeasurementDTO], + partitioning: PartitioningWithIdDTO +) + +object CheckpointWithPartitioningDTO { + + implicit val decodeCheckpointWithPartitioningDTO: Decoder[CheckpointWithPartitioningDTO] = deriveDecoder + implicit val encodeCheckpointWithPartitioningDTO: Encoder[CheckpointWithPartitioningDTO] = deriveEncoder + +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala index e3168637..505656ae 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import za.co.absa.atum.model.envelopes.ErrorResponse import za.co.absa.atum.model.envelopes.SuccessResponse.PaginatedResponse import zio.IO @@ -29,6 +29,6 @@ trait FlowController { limit: Option[Int], offset: Option[Long], checkpointName: Option[String], - ): IO[ErrorResponse, PaginatedResponse[CheckpointV2DTO]] + ): IO[ErrorResponse, PaginatedResponse[CheckpointWithPartitioningDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala index 727e5729..ebd70030 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import za.co.absa.atum.model.envelopes.ErrorResponse import za.co.absa.atum.model.envelopes.SuccessResponse.PaginatedResponse import za.co.absa.atum.server.api.service.FlowService @@ -31,8 +31,8 @@ class FlowControllerImpl(flowService: FlowService) extends FlowController with B limit: Option[Int], offset: Option[Long], checkpointName: Option[String] - ): IO[ErrorResponse, PaginatedResponse[CheckpointV2DTO]] = { - val flowData = serviceCall[PaginatedResult[CheckpointV2DTO], PaginatedResult[CheckpointV2DTO]]( + ): IO[ErrorResponse, PaginatedResponse[CheckpointWithPartitioningDTO]] = { + val flowData = serviceCall[PaginatedResult[CheckpointWithPartitioningDTO], PaginatedResult[CheckpointWithPartitioningDTO]]( flowService.getFlowCheckpoints(flowId, limit, offset, checkpointName) ) mapToPaginatedResponse(limit.get, offset.get, flowData) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala index b24b0612..77858873 100644 --- 
a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala @@ -20,51 +20,54 @@ import doobie.implicits.toSqlInterpolator import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.flows.Flows import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints.GetFlowCheckpointsArgs -import za.co.absa.atum.server.model.CheckpointItemFromDB +import za.co.absa.atum.server.model.CheckpointItemWithPartitioningFromDB import za.co.absa.db.fadb.DBSchema import za.co.absa.db.fadb.doobie.DoobieEngine import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling import zio._ - import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get import doobie.postgres.implicits._ - class GetFlowCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieMultipleResultFunctionWithAggStatus[GetFlowCheckpointsArgs, Option[CheckpointItemFromDB], Task](input => - Seq( - fr"${input.flowId}", - fr"${input.limit}", - fr"${input.offset}", - fr"${input.checkpointName}" + extends DoobieMultipleResultFunctionWithAggStatus[GetFlowCheckpointsArgs, Option[ + CheckpointItemWithPartitioningFromDB + ], Task](input => + Seq( + fr"${input.flowId}", + fr"${input.limit}", + fr"${input.offset}", + fr"${input.checkpointName}" + ) ) - ) with StandardStatusHandling with ByFirstErrorStatusAggregator { override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq( "id_checkpoint", "checkpoint_name", - "author", + "checkpoint_author", "measured_by_atum_agent", "measure_name", "measured_columns", "measurement_value", "checkpoint_start_time", "checkpoint_end_time", + "id_partitioning", + "partitioning", + "partitioning_author", "has_more" ) } object GetFlowCheckpoints { case class GetFlowCheckpointsArgs( - flowId: Long, - limit: Option[Int], - offset: Option[Long], - checkpointName: Option[String] + flowId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] ) val layer: URLayer[PostgresDatabaseProvider, GetFlowCheckpoints] = ZLayer { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index 2c078918..8a138d4b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -133,14 +133,14 @@ trait Endpoints extends BaseEndpoints { } protected val getFlowCheckpointsEndpointV2 - : PublicEndpoint[(Long, Option[Int], Option[Long], Option[String]), ErrorResponse, PaginatedResponse[CheckpointV2DTO], Any] = { + : PublicEndpoint[(Long, Option[Int], Option[Long], Option[String]), ErrorResponse, PaginatedResponse[CheckpointWithPartitioningDTO], Any] = { apiV2.get .in(V2Paths.Flows / path[Long]("flowId") / V2Paths.Checkpoints) .in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000))) .in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L))) .in(query[Option[String]]("checkpoint-name")) .out(statusCode(StatusCode.Ok)) - 
.out(jsonBody[PaginatedResponse[CheckpointV2DTO]]) + .out(jsonBody[PaginatedResponse[CheckpointWithPartitioningDTO]]) .errorOutVariantPrepend(notFoundErrorOneOfVariant) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index cc13fffa..00ade985 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -24,7 +24,7 @@ import sttp.tapir.server.http4s.ztapir.ZHttp4sServerInterpreter import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor import sttp.tapir.swagger.bundle.SwaggerInterpreter import sttp.tapir.ztapir._ -import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, PartitioningWithIdDTO} +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, CheckpointWithPartitioningDTO, PartitioningWithIdDTO} import za.co.absa.atum.model.envelopes.{ErrorResponse, StatusResponse} import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} @@ -93,7 +93,7 @@ trait Routes extends Endpoints with ServerOptions { createServerEndpoint[ (Long, Option[Int], Option[Long], Option[String]), ErrorResponse, - PaginatedResponse[CheckpointV2DTO] + PaginatedResponse[CheckpointWithPartitioningDTO] ](getFlowCheckpointsEndpointV2, { case (flowId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => FlowController.getFlowCheckpoints(flowId, limit, offset, checkpointName) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala index 1749d13b..ae4412e9 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.repository -import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.model.PaginatedResult import zio._ @@ -30,6 +30,6 @@ trait FlowRepository { limit: Option[Int], offset: Option[Long], checkpointName: Option[String] - ): IO[DatabaseError, PaginatedResult[CheckpointV2DTO]] + ): IO[DatabaseError, PaginatedResult[CheckpointWithPartitioningDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala index d52ef71c..b8bb9dcb 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala @@ -16,10 +16,10 @@ package za.co.absa.atum.server.api.repository -import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.atum.server.model.{CheckpointItemFromDB, PaginatedResult} +import za.co.absa.atum.server.model.{CheckpointItemWithPartitioningFromDB, PaginatedResult} import 
za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints.GetFlowCheckpointsArgs import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} @@ -34,7 +34,7 @@ class FlowRepositoryImpl(getFlowCheckpointsFn: GetFlowCheckpoints) limit: Option[Int], offset: Option[Long], checkpointName: Option[String] - ): IO[DatabaseError, PaginatedResult[CheckpointV2DTO]] = { + ): IO[DatabaseError, PaginatedResult[CheckpointWithPartitioningDTO]] = { dbMultipleResultCallWithAggregatedStatus( getFlowCheckpointsFn(GetFlowCheckpointsArgs(flowId, limit, offset, checkpointName)), "getFlowCheckpoints" @@ -42,7 +42,7 @@ class FlowRepositoryImpl(getFlowCheckpointsFn: GetFlowCheckpoints) .map(_.flatten) .flatMap { checkpointItems => ZIO - .fromEither(CheckpointItemFromDB.groupAndConvertItemsToCheckpointV2DTOs(checkpointItems)) + .fromEither(CheckpointItemWithPartitioningFromDB.groupAndConvertItemsToCheckpointWithPartitioningDTOs(checkpointItems)) .mapBoth( error => GeneralDatabaseError(error.getMessage), checkpoints => diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala index b3e9fc82..73017fb9 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.service -import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.model.PaginatedResult import zio._ @@ -30,6 +30,6 @@ trait FlowService { limit: Option[Int], offset: Option[Long], checkpointName: Option[String] - ): IO[ServiceError, PaginatedResult[CheckpointV2DTO]] + ): IO[ServiceError, PaginatedResult[CheckpointWithPartitioningDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala index 8ba8a6b9..66a520c2 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala @@ -29,7 +29,7 @@ class FlowServiceImpl(flowRepository: FlowRepository) extends FlowService with B limit: Option[Int], offset: Option[Long], checkpointName: Option[String] - ): IO[ServiceError, PaginatedResult[CheckpointV2DTO]] = { + ): IO[ServiceError, PaginatedResult[CheckpointWithPartitioningDTO]] = { repositoryCall( flowRepository.getFlowCheckpoints(flowId, limit, offset, checkpointName), "getFlowCheckpoints" diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala new file mode 100644 index 00000000..f75b1472 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala @@ -0,0 +1,119 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.model + +import io.circe.Json +import za.co.absa.atum.model.dto.{ + CheckpointWithPartitioningDTO, + MeasureDTO, + MeasureResultDTO, + MeasurementDTO, + PartitionDTO, + PartitioningWithIdDTO +} + +import java.time.ZonedDateTime +import java.util.UUID + +case class CheckpointItemWithPartitioningFromDB( + idCheckpoint: UUID, + checkpointName: String, + author: String, + measuredByAtumAgent: Boolean, + measureName: String, + measuredColumns: Seq[String], + measurementValue: Json, // JSON representation of `MeasurementDTO` + checkpointStartTime: ZonedDateTime, + checkpointEndTime: Option[ZonedDateTime], + idPartitioning: Long, + partitioning: Json, // JSON representation of `PartitioningForDB` + partitioningAuthor: String, + hasMore: Boolean +) + +object CheckpointItemWithPartitioningFromDB { + + private def fromItemsToCheckpointWithPartitioningDTO( + checkpointItems: Seq[CheckpointItemWithPartitioningFromDB] + ): Either[Throwable, CheckpointWithPartitioningDTO] = { + for { + measurements <- extractMeasurements(checkpointItems) + partitioning <- extractPartitioning(checkpointItems) + } yield { + CheckpointWithPartitioningDTO( + id = checkpointItems.head.idCheckpoint, + name = checkpointItems.head.checkpointName, + author = checkpointItems.head.author, + measuredByAtumAgent = checkpointItems.head.measuredByAtumAgent, + processStartTime = checkpointItems.head.checkpointStartTime, + processEndTime = checkpointItems.head.checkpointEndTime, + measurements = measurements.toSet, + partitioning + ) + } + } + + private def extractMeasurements( + checkpointItems: Seq[CheckpointItemWithPartitioningFromDB] + ): Either[Throwable, Seq[MeasurementDTO]] = { + val measurementsOrErr = checkpointItems.map { checkpointItem => + checkpointItem.measurementValue.as[MeasureResultDTO].map { measureResult => + MeasurementDTO( + measure = MeasureDTO( + measureName = checkpointItem.measureName, + measuredColumns = checkpointItem.measuredColumns + ), + result = measureResult + ) + } + } + measurementsOrErr + .collectFirst { case Left(err) => Left(err) } + .getOrElse(Right(measurementsOrErr.collect { case Right(measurement) => measurement })) + } + + private def extractPartitioning( + checkpointItems: Seq[CheckpointItemWithPartitioningFromDB] + ): Either[Throwable, PartitioningWithIdDTO] = { + checkpointItems.head.partitioning.as[PartitioningForDB].map { partitioningForDB => + val partitioningDTO = partitioningForDB.keys.map { key => + PartitionDTO(key, partitioningForDB.keysToValuesMap(key)) + } + PartitioningWithIdDTO( + id = checkpointItems.head.idPartitioning, + partitioning = partitioningDTO, + author = checkpointItems.head.partitioningAuthor + ) + } + } + + def groupAndConvertItemsToCheckpointWithPartitioningDTOs( + checkpointItems: Seq[CheckpointItemWithPartitioningFromDB] + ): Either[Throwable, Seq[CheckpointWithPartitioningDTO]] = { + val groupedItems = checkpointItems.groupBy(_.idCheckpoint) + val orderedCheckpointIds = checkpointItems + .sortBy(_.checkpointStartTime)(Ordering[ZonedDateTime].reverse) + .map(_.idCheckpoint) + .distinct + + val result = 
orderedCheckpointIds.map { id => fromItemsToCheckpointWithPartitioningDTO(groupedItems(id)) } + + val error = result.collectFirst { case Left(err) => Left(err) } + error.getOrElse(Right(result.collect { case Right(dto) => dto })) + } + +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 6d39bf2f..cd9647c5 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -16,13 +16,13 @@ package za.co.absa.atum.server.api -import io.circe.parser +import io.circe.{Json, parser} import io.circe.syntax.EncoderOps import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue import za.co.absa.atum.model.dto._ import za.co.absa.atum.model.{ResultValueType, dto} import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsResult -import za.co.absa.atum.server.model.{CheckpointFromDB, CheckpointItemFromDB, MeasureFromDB, PartitioningFromDB} +import za.co.absa.atum.server.model.{CheckpointFromDB, CheckpointItemFromDB, CheckpointItemWithPartitioningFromDB, MeasureFromDB, PartitioningFromDB} import java.time.ZonedDateTime import java.util.{Base64, UUID} @@ -50,7 +50,7 @@ trait TestData { authorIfNew = "" ) - private val partitioningAsJson = parser + protected val partitioningAsJson: Json = parser .parse( """ |{ @@ -299,6 +299,29 @@ trait TestData { protected val checkpointV2DTO3: CheckpointV2DTO = checkpointV2DTO1.copy(id = UUID.randomUUID()) protected val checkpointV2DTO4: CheckpointV2DTO = checkpointV2DTO1.copy(id = UUID.randomUUID()) + protected val measurementValue1: Json = + parser + .parse( + """ + |{ + | "mainValue": { + | "value": "123", + | "valueType": "Long" + | }, + | "supportValues": { + | "key1": { + | "value": "123456789", + | "valueType": "Long" + | }, + | "key2": { + | "value": "12345.6789", + | "valueType": "BigDecimal" + | } + | } + |} + |""".stripMargin + ).toOption.get + // Checkpoint From DB protected val checkpointFromDB1: CheckpointFromDB = CheckpointFromDB( idCheckpoint = Some(checkpointDTO1.id), @@ -381,6 +404,84 @@ trait TestData { hasMore = false ) + protected val checkpointItemWithPartitioningFromDB1: CheckpointItemWithPartitioningFromDB = + CheckpointItemWithPartitioningFromDB( + idCheckpoint = checkpointItemFromDB1.idCheckpoint, + checkpointName = checkpointItemFromDB1.checkpointName, + author = checkpointItemFromDB1.author, + measuredByAtumAgent = checkpointItemFromDB1.measuredByAtumAgent, + measureName = checkpointItemFromDB1.measureName, + measuredColumns = checkpointItemFromDB1.measuredColumns, + measurementValue = checkpointItemFromDB1.measurementValue, + checkpointStartTime = checkpointItemFromDB1.checkpointStartTime, + checkpointEndTime = checkpointItemFromDB1.checkpointEndTime, + idPartitioning = partitioningFromDB1.id, + partitioning = partitioningFromDB1.partitioning, + partitioningAuthor = partitioningFromDB1.author, + hasMore = checkpointItemFromDB1.hasMore + ) + + protected val checkpointItemWithPartitioningFromDB2: CheckpointItemWithPartitioningFromDB = + CheckpointItemWithPartitioningFromDB( + idCheckpoint = checkpointItemFromDB2.idCheckpoint, + checkpointName = checkpointItemFromDB2.checkpointName, + author = checkpointItemFromDB2.author, + measuredByAtumAgent = checkpointItemFromDB2.measuredByAtumAgent, + measureName = checkpointItemFromDB2.measureName, + measuredColumns = checkpointItemFromDB2.measuredColumns, + measurementValue = 
checkpointItemFromDB2.measurementValue, + checkpointStartTime = checkpointItemFromDB2.checkpointStartTime, + checkpointEndTime = checkpointItemFromDB2.checkpointEndTime, + idPartitioning = partitioningFromDB1.id, + partitioning = partitioningFromDB1.partitioning, + partitioningAuthor = partitioningFromDB1.author, + hasMore = checkpointItemFromDB2.hasMore + ) + + protected val checkpointWithPartitioningDTO1: CheckpointWithPartitioningDTO = + CheckpointWithPartitioningDTO( + id = checkpointItemWithPartitioningFromDB1.idCheckpoint, + name = checkpointItemWithPartitioningFromDB1.checkpointName, + author = checkpointItemWithPartitioningFromDB1.author, + measuredByAtumAgent = checkpointItemWithPartitioningFromDB1.measuredByAtumAgent, + processStartTime = checkpointItemWithPartitioningFromDB1.checkpointStartTime, + processEndTime = checkpointItemWithPartitioningFromDB1.checkpointEndTime, + measurements = Seq( + MeasurementDTO( + measure = MeasureDTO( + measureName = checkpointItemWithPartitioningFromDB1.measureName, + measuredColumns = checkpointItemWithPartitioningFromDB1.measuredColumns + ), + result = checkpointItemWithPartitioningFromDB1.measurementValue.as[MeasureResultDTO].getOrElse( + throw new Exception("Failed to parse JSON") + ) + ) + ).toSet, + partitioning = partitioningWithIdDTO1 + ) + + protected val checkpointWithPartitioningDTO2: CheckpointWithPartitioningDTO = + CheckpointWithPartitioningDTO( + id = checkpointItemWithPartitioningFromDB2.idCheckpoint, + name = checkpointItemWithPartitioningFromDB2.checkpointName, + author = checkpointItemWithPartitioningFromDB2.author, + measuredByAtumAgent = checkpointItemWithPartitioningFromDB2.measuredByAtumAgent, + processStartTime = checkpointItemWithPartitioningFromDB2.checkpointStartTime, + processEndTime = checkpointItemWithPartitioningFromDB2.checkpointEndTime, + measurements = Seq( + MeasurementDTO( + measure = MeasureDTO( + measureName = checkpointItemWithPartitioningFromDB2.measureName, + measuredColumns = checkpointItemWithPartitioningFromDB2.measuredColumns + ), + result = checkpointItemWithPartitioningFromDB2.measurementValue.as[MeasureResultDTO].getOrElse( + throw new Exception("Failed to parse JSON") + ) + ) + ).toSet, + partitioning = partitioningWithIdDTO1 + ) + protected def createAtumContextDTO(partitioningSubmitDTO: PartitioningSubmitDTO): AtumContextDTO = { val measures: Set[MeasureDTO] = Set(MeasureDTO("count", Seq("*"))) val additionalData: Map[String, Option[String]] = Map.empty diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala index 5dcf9e87..8db88a54 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala @@ -30,9 +30,9 @@ object FlowControllerUnitTests extends ZIOSpecDefault with TestData { private val flowServiceMock = mock(classOf[FlowService]) when(flowServiceMock.getFlowCheckpoints(1L, Some(5), Some(2), None)) - .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointV2DTO1)))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointWithPartitioningDTO1)))) when(flowServiceMock.getFlowCheckpoints(2L, Some(5), Some(0), None)) - .thenReturn(ZIO.succeed(ResultNoMore(Seq(checkpointV2DTO2)))) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(checkpointWithPartitioningDTO2)))) when(flowServiceMock.getFlowCheckpoints(3L, Some(5), Some(0), None)) 
.thenReturn(ZIO.fail(NotFoundServiceError("Flow not found"))) @@ -44,12 +44,16 @@ object FlowControllerUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Seq[CheckpointV2DTO] with Pagination indicating there is more data available") { for { result <- FlowController.getFlowCheckpoints(1L, Some(5), Some(2), None) - } yield assertTrue(result.data == Seq(checkpointV2DTO1) && result.pagination == Pagination(5, 2, hasMore = true)) + } yield assertTrue( + result.data == Seq(checkpointWithPartitioningDTO1) && result.pagination == Pagination(5, 2, hasMore = true) + ) }, test("Returns expected Seq[CheckpointV2DTO] with Pagination indicating there is no more data available") { for { result <- FlowController.getFlowCheckpoints(2L, Some(5), Some(0), None) - } yield assertTrue(result.data == Seq(checkpointV2DTO2) && result.pagination == Pagination(5, 0, hasMore = false)) + } yield assertTrue( + result.data == Seq(checkpointWithPartitioningDTO2) && result.pagination == Pagination(5, 0, hasMore = false) + ) }, test("Returns expected NotFoundServiceError when service returns NotFoundServiceError") { assertZIO(FlowController.getFlowCheckpoints(3L, Some(5), Some(0), None).exit)( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsV2EndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsV2EndpointUnitTests.scala index 3bc315f0..50a97eac 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsV2EndpointUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsV2EndpointUnitTests.scala @@ -23,7 +23,7 @@ import sttp.client3.circe.asJson import sttp.model.StatusCode import sttp.tapir.server.stub.TapirStubInterpreter import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} -import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.controller.FlowController import za.co.absa.atum.model.envelopes.{NotFoundErrorResponse, Pagination} @@ -39,18 +39,22 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin private val uuid = UUID.randomUUID() when(flowControllerMockV2.getFlowCheckpoints(1L, Some(5), Some(0), None)) - .thenReturn(ZIO.succeed(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(5, 0, hasMore = true), uuid))) + .thenReturn( + ZIO.succeed(PaginatedResponse(Seq(checkpointWithPartitioningDTO1), Pagination(5, 0, hasMore = true), uuid)) + ) when(flowControllerMockV2.getFlowCheckpoints(2L, Some(5), Some(0), None)) - .thenReturn(ZIO.succeed(PaginatedResponse(Seq(checkpointV2DTO2), Pagination(5, 0, hasMore = false), uuid))) + .thenReturn( + ZIO.succeed(PaginatedResponse(Seq(checkpointWithPartitioningDTO2), Pagination(5, 0, hasMore = false), uuid)) + ) when(flowControllerMockV2.getFlowCheckpoints(3L, Some(5), Some(0), None)) .thenReturn(ZIO.fail(NotFoundErrorResponse("Flow not found for a given ID"))) private val flowControllerMockLayerV2 = ZLayer.succeed(flowControllerMockV2) private val getFlowCheckpointServerEndpointV2 = getFlowCheckpointsEndpointV2.zServerLogic({ - case (flowId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => - FlowController.getFlowCheckpoints(flowId, limit, offset, checkpointName) - }) + case (flowId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => + FlowController.getFlowCheckpoints(flowId, limit, offset, 
checkpointName) + }) def spec: Spec[TestEnvironment with Scope, Any] = { val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[FlowController])) @@ -63,7 +67,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin val baseUri = uri"https://test.com/api/v2/flows/1/checkpoints?limit=5&offset=0" val response = basicRequest .get(baseUri) - .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .response(asJson[PaginatedResponse[CheckpointWithPartitioningDTO]]) .send(backendStub) val body = response.map(_.body) @@ -71,7 +75,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin assertZIO(body <&> statusCode)( equalTo( - Right(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(5, 0, hasMore = true), uuid)), + Right(PaginatedResponse(Seq(checkpointWithPartitioningDTO1), Pagination(5, 0, hasMore = true), uuid)), StatusCode.Ok ) ) @@ -80,7 +84,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin val baseUri = uri"https://test.com/api/v2/flows/2/checkpoints?limit=5&offset=0" val response = basicRequest .get(baseUri) - .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .response(asJson[PaginatedResponse[CheckpointWithPartitioningDTO]]) .send(backendStub) val body = response.map(_.body) @@ -90,7 +94,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin assertZIO(body <&> statusCode)( equalTo( - Right(PaginatedResponse(Seq(checkpointV2DTO2), Pagination(5, 0, hasMore = false), uuid)), + Right(PaginatedResponse(Seq(checkpointWithPartitioningDTO2), Pagination(5, 0, hasMore = false), uuid)), StatusCode.Ok ) ) @@ -99,7 +103,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin val baseUri = uri"https://test.com/api/v2/flows/3/checkpoints?limit=5&offset=0" val response = basicRequest .get(baseUri) - .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .response(asJson[PaginatedResponse[CheckpointWithPartitioningDTO]]) .send(backendStub) val statusCode = response.map(_.code) @@ -110,7 +114,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin val baseUri = uri"https://test.com/api/v2/flows/1/checkpoints?limit=1005&offset=0" val response = basicRequest .get(baseUri) - .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .response(asJson[PaginatedResponse[CheckpointWithPartitioningDTO]]) .send(backendStub) val statusCode = response.map(_.code) @@ -121,7 +125,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin val baseUri = uri"https://test.com/api/v2/flows/1/checkpoints?limit=-1&offset=0" val response = basicRequest .get(baseUri) - .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .response(asJson[PaginatedResponse[CheckpointWithPartitioningDTO]]) .send(backendStub) val statusCode = response.map(_.code) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala index 595eb56e..e2eef59e 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.server.api.repository import org.mockito.Mockito.{mock, when} -import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import 
za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints.GetFlowCheckpointsArgs import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints @@ -38,8 +38,7 @@ object FlowRepositoryUnitTests extends ZIOSpecDefault with TestData { .thenReturn( ZIO.right( Seq( - Row(FunctionStatus(11, "success"), Some(checkpointItemFromDB1)), - Row(FunctionStatus(11, "success"), Some(checkpointItemFromDB2)) + Row(FunctionStatus(11, "success"), Some(checkpointItemWithPartitioningFromDB1)) ) ) ) @@ -57,12 +56,12 @@ object FlowRepositoryUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Right with CheckpointV2DTO") { for { result <- FlowRepository.getFlowCheckpoints(1, Some(1), Some(1), None) - } yield assertTrue(result == ResultHasMore(Seq(checkpointV2DTO1, checkpointV2DTO2))) + } yield assertTrue(result == ResultHasMore(Seq(checkpointWithPartitioningDTO1))) }, test("Returns expected Right with CheckpointV2DTO") { for { result <- FlowRepository.getFlowCheckpoints(2, Some(1), Some(1), None) - } yield assertTrue(result == ResultNoMore(Seq.empty[CheckpointV2DTO])) + } yield assertTrue(result == ResultNoMore(Seq.empty[CheckpointWithPartitioningDTO])) }, test("Returns expected DatabaseError") { assertZIO(FlowRepository.getFlowCheckpoints(3, None, None, None).exit)( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala index c1bc930c..7dd687ea 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala @@ -30,7 +30,7 @@ object FlowServiceUnitTests extends ZIOSpecDefault with TestData { private val flowRepositoryMock = mock(classOf[FlowRepository]) when(flowRepositoryMock.getFlowCheckpoints(1L, None, None, None)) - .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointV2DTO1)))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointWithPartitioningDTO1)))) when(flowRepositoryMock.getFlowCheckpoints(2L, None, None, None)) .thenReturn(ZIO.fail(NotFoundDatabaseError("Flow not found"))) @@ -44,7 +44,7 @@ object FlowServiceUnitTests extends ZIOSpecDefault with TestData { for { result <- FlowService.getFlowCheckpoints(1L, None, None, None) } yield assertTrue { - result == ResultHasMore(Seq(checkpointV2DTO1)) + result == ResultHasMore(Seq(checkpointWithPartitioningDTO1)) } }, test("Returns expected ServiceError") { diff --git a/server/src/test/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDBUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDBUnitTests.scala new file mode 100644 index 00000000..c2107f5e --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDBUnitTests.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.model + +import org.scalatest.funsuite.AnyFunSuiteLike +import za.co.absa.atum.server.api.TestData + +import java.time.ZonedDateTime +import java.util.UUID + +class CheckpointItemWithPartitioningFromDBUnitTests extends AnyFunSuiteLike with TestData { + + test("groupAndConvertItemsToCheckpointWithPartitioningDTOs should group by idCheckpoint and sort by checkpointStartTime in descending order") { + val checkpointItem1 = CheckpointItemWithPartitioningFromDB( + idCheckpoint = UUID.randomUUID(), + checkpointName = "checkpoint1", + author = "author1", + measuredByAtumAgent = true, + measureName = "measure1", + measuredColumns = Seq("col1"), + measurementValue = measurementValue1, + checkpointStartTime = ZonedDateTime.now().minusDays(1), + checkpointEndTime = Some(ZonedDateTime.now()), + idPartitioning = 1L, + partitioning = partitioningAsJson, + partitioningAuthor = "author1", + hasMore = false + ) + + val checkpointItem2 = CheckpointItemWithPartitioningFromDB( + idCheckpoint = UUID.randomUUID(), + checkpointName = "checkpoint2", + author = "author2", + measuredByAtumAgent = false, + measureName = "measure2", + measuredColumns = Seq("col2"), + measurementValue = measurementValue1, + checkpointStartTime = ZonedDateTime.now(), + checkpointEndTime = Some(ZonedDateTime.now().plusDays(1)), + idPartitioning = 2L, + partitioning = partitioningAsJson, + partitioningAuthor = "author2", + hasMore = true + ) + + val checkpointItems = Seq(checkpointItem1, checkpointItem2) + + val result = CheckpointItemWithPartitioningFromDB.groupAndConvertItemsToCheckpointWithPartitioningDTOs(checkpointItems) + + assert(result.isRight) + val checkpoints = result.toOption.get + + assert(checkpoints.size == 2) + assert(checkpoints.head.processStartTime.isAfter(checkpoints(1).processStartTime)) + } +}
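
For reviewers who want to exercise the new function by hand after applying the V0.3.0.1 migration, a minimal smoke query is sketched below. It is not part of the patch: the flow ID 42 is a made-up example value, and named-argument notation is used only to make the new i_latest_first flag explicit.

    SELECT id_checkpoint, checkpoint_name, measure_name, checkpoint_start_time, has_more
    FROM flows.get_flow_checkpoints(
        i_flow_id           => 42,
        i_checkpoints_limit => 10,
        i_offset            => 0,
        i_checkpoint_name   => NULL,   -- no name filter: return checkpoints of any name
        i_latest_first      => TRUE    -- newest process_start_time first
    );

Each returned row carries one measure of one checkpoint (a checkpoint with several measures spans several rows), and has_more indicates whether another page of checkpoints exists beyond the given limit and offset; a status of 42 with status_text 'Flow not found' is returned when the flow ID does not exist.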