diff --git a/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql b/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql index d3c90f411..72df99e7f 100644 --- a/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql +++ b/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql @@ -15,8 +15,9 @@ */ CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints( - IN i_partitioning_of_flow JSONB, - IN i_limit INT DEFAULT 5, + IN i_flow_id BIGINT, + IN i_checkpoints_limit INT DEFAULT 5, + IN i_offset BIGINT DEFAULT 0, IN i_checkpoint_name TEXT DEFAULT NULL, OUT status INTEGER, OUT status_text TEXT, @@ -28,12 +29,13 @@ CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints( OUT measured_columns TEXT[], OUT measurement_value JSONB, OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, - OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE + OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE, + OUT has_more BOOLEAN ) RETURNS SETOF record AS $$ -------------------------------------------------------------------------------- +-------------------------------------------------------------------------------------------------------------------- -- --- Function: flows.get_flow_checkpoints(3) +-- Function: flows.get_flow_checkpoints(4) -- Retrieves all checkpoints (measures and their measurement details) related to a primary flow -- associated with the input partitioning. -- @@ -46,13 +48,13 @@ $$ -- Parameters: -- i_partitioning_of_flow - partitioning to use for identifying the flow associate with checkpoints -- that will be retrieved --- i_limit - (optional) maximum number of checkpoint's measurements to return --- if 0 specified, all data will be returned, i.e. no limit will be applied +-- i_checkpoints_limit - (optional) maximum number of checkpoint to return, returns all of them if NULL +-- i_offset - (optional) offset for checkpoints pagination -- i_checkpoint_name - (optional) if specified, returns data related to particular checkpoint's name -- --- Note: checkpoint name uniqueness is not enforced by the data model, so there can be multiple different --- checkpoints with the same name in the DB, i.e. multiple checkpoints can be retrieved even when --- specifying `i_checkpoint_name` parameter +-- Note: i_checkpoint_limit and i_offset are used for pagination purposes; +-- checkpoints are ordered by process_start_time in descending order +-- and then by id_checkpoint in ascending order -- -- Returns: -- status - Status code @@ -65,54 +67,83 @@ $$ -- measure_name - measure name associated with a given checkpoint -- measured_columns - measure columns associated with a given checkpoint -- measurement_value - measurement details associated with a given checkpoint --- checkpoint_time - time +-- checkpoint_start_time - Time of the checkpoint +-- checkpoint_end_time - End time of the checkpoint computation +-- has_more - flag indicating whether there are more checkpoints available, always `false` if `i_limit` is NULL -- -- Status codes: --- 11 - OK --- 41 - Partitioning not found --- -------------------------------------------------------------------------------- - +-- 11 - OK +-- 42 - Flow not found +--------------------------------------------------------------------------------------------------- DECLARE - _fk_partitioning BIGINT; - _fk_flow BIGINT; + _has_more BOOLEAN; BEGIN - _fk_partitioning = runs._get_id_partitioning(i_partitioning_of_flow); - - IF _fk_partitioning IS NULL THEN - status := 41; - status_text := 'Partitioning not found'; + -- Check if the flow exists by querying the partitioning_to_flow table. + -- Rationale: + -- This table is preferred over the flows table because: + -- 1. Every flow has at least one record in partitioning_to_flow. + -- 2. This table is used in subsequent queries, providing a caching advantage. + -- 3. Improves performance by reducing the need to query the flows table directly. + PERFORM 1 FROM flows.partitioning_to_flow WHERE fk_flow = i_flow_id; + IF NOT FOUND THEN + status := 42; + status_text := 'Flow not found'; RETURN NEXT; RETURN; END IF; - SELECT id_flow - FROM flows.flows - WHERE fk_primary_partitioning = _fk_partitioning - INTO _fk_flow; + -- Determine if there are more checkpoints than the limit + IF i_checkpoints_limit IS NOT NULL THEN + SELECT count(*) > i_checkpoints_limit + FROM runs.checkpoints C + JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning + WHERE PF.fk_flow = i_flow_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + LIMIT i_checkpoints_limit + 1 OFFSET i_offset + INTO _has_more; + ELSE + _has_more := false; + END IF; + -- Retrieve the checkpoints and their associated measurements RETURN QUERY - SELECT 11 AS status, 'OK' AS status_text, - CP.id_checkpoint, CP.checkpoint_name, - CP.created_by AS author, CP.measured_by_atum_agent, - MD.measure_name, MD.measured_columns, - M.measurement_value, - CP.process_start_time AS checkpoint_start_time, CP.process_end_time AS checkpoint_end_time - FROM flows.partitioning_to_flow AS PF - JOIN runs.checkpoints AS CP - ON PF.fk_partitioning = CP.fk_partitioning - JOIN runs.measurements AS M - ON CP.id_checkpoint = M.fk_checkpoint - JOIN runs.measure_definitions AS MD - ON M.fk_measure_definition = MD.id_measure_definition - WHERE PF.fk_flow = _fk_flow - AND (i_checkpoint_name IS NULL OR CP.checkpoint_name = i_checkpoint_name) - ORDER BY CP.process_start_time, - CP.id_checkpoint - LIMIT nullif(i_limit, 0); -- NULL means no limit will be applied, it's same as LIMIT ALL - + WITH limited_checkpoints AS ( + SELECT C.id_checkpoint, + C.checkpoint_name, + C.created_by, + C.measured_by_atum_agent, + C.process_start_time, + C.process_end_time + FROM runs.checkpoints C + JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning + WHERE PF.fk_flow = i_flow_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + ORDER BY C.id_checkpoint, C.process_start_time + LIMIT i_checkpoints_limit OFFSET i_offset + ) + SELECT + 11 AS status, + 'OK' AS status_text, + LC.id_checkpoint, + LC.checkpoint_name, + LC.created_by AS author, + LC.measured_by_atum_agent, + MD.measure_name, + MD.measured_columns, + M.measurement_value, + LC.process_start_time AS checkpoint_start_time, + LC.process_end_time AS checkpoint_end_time, + _has_more AS has_more + FROM + limited_checkpoints LC + INNER JOIN + runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint + INNER JOIN + runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition + ORDER BY + LC.id_checkpoint, LC.process_start_time; END; $$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER; -GRANT EXECUTE ON FUNCTION flows.get_flow_checkpoints(JSONB, INT, TEXT) TO atum_owner; +GRANT EXECUTE ON FUNCTION flows.get_flow_checkpoints(BIGINT, INT, BIGINT, TEXT) TO atum_owner; diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala index 9895eb994..ed06fe224 100644 --- a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala @@ -1,19 +1,3 @@ -/* - * Copyright 2021 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package za.co.absa.atum.database.flows import za.co.absa.balta.DBTestSuite @@ -24,15 +8,8 @@ import java.time.OffsetDateTime import java.util.UUID import scala.util.Random - class GetFlowCheckpointsIntegrationTests extends DBTestSuite { - private val fncGetFlowCheckpoints = "flows.get_flow_checkpoints" - - case class MeasuredDetails ( - measureName: String, - measureColumns: Seq[String], - measurementValue: JsonBString - ) + private val fncGetFlowCheckpointsV2 = "flows.get_flow_checkpoints" private val partitioning = JsonBString( """ @@ -48,6 +25,12 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { |""".stripMargin ) + case class MeasuredDetails ( + measureName: String, + measureColumns: Seq[String], + measurementValue: JsonBString + ) + private val measurementCnt = JsonBString( """ |{ @@ -93,7 +76,8 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { |""".stripMargin ) - test("Testing get_flow_checkpoints, partitioning and flow exist, but there are no checkpoints") { + test("getFlowCheckpointsV2 should return all checkpoints for a given flow") { + val partitioningId: Long = Random.nextLong() table("runs.partitionings").insert( add("id_partitioning", partitioningId) @@ -104,10 +88,9 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { val flowId: Long = Random.nextLong() table("flows.flows").insert( add("id_flow", flowId) - .add("flow_name", "test_flow1") - .add("flow_description", "Test Flow 1") + .add("flow_name", "flowName") .add("from_pattern", false) - .add("created_by", "ObviouslySomeTest") + .add("created_by", "Joseph") .add("fk_primary_partitioning", partitioningId) ) @@ -117,14 +100,160 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { .add("created_by", "ObviouslySomeTest") ) - function(fncGetFlowCheckpoints) - .setParam("i_partitioning_of_flow", partitioning) - .execute { queryResult => - assert(!queryResult.hasNext) + // Insert checkpoints and measure definitions + val checkpointId1 = UUID.randomUUID() + val startTime = OffsetDateTime.parse("1993-02-14T10:00:00Z") + val endTime = OffsetDateTime.parse("2024-04-24T10:00:00Z") + table("runs.checkpoints").insert( + add("id_checkpoint", checkpointId1) + .add("fk_partitioning", partitioningId) + .add("checkpoint_name", "CheckpointNameCntAndAvg") + .add("measured_by_atum_agent", true) + .add("process_start_time", startTime) + .add("process_end_time", endTime) + .add("created_by", "Joseph") + ) + + val checkpointId2 = UUID.randomUUID() + val startTimeOther = OffsetDateTime.parse("1993-02-14T10:00:00Z") + val endTimeOther = OffsetDateTime.parse("2024-04-24T10:00:00Z") + table("runs.checkpoints").insert( + add("id_checkpoint", checkpointId2) + .add("fk_partitioning", partitioningId) + .add("checkpoint_name", "CheckpointNameOther") + .add("measured_by_atum_agent", true) + .add("process_start_time", startTimeOther) + .add("process_end_time", endTimeOther) + .add("created_by", "Joseph") + ) + + // Insert measure definitions and measurements + val measureDefinitionAvgId: Long = Random.nextLong() + table("runs.measure_definitions").insert( + add("id_measure_definition", measureDefinitionAvgId) + .add("fk_partitioning", partitioningId) + .add("measure_name", "avg") + .add("measured_columns", CustomDBType("""{"a","b"}""", "TEXT[]")) + .add("created_by", "Joseph") + ) + + val measureDefinitionCntId: Long = Random.nextLong() + table("runs.measure_definitions").insert( + add("id_measure_definition", measureDefinitionCntId) + .add("fk_partitioning", partitioningId) + .add("measure_name", "cnt") + .add("measured_columns", CustomDBType("""{"col1"}""", "TEXT[]")) + .add("created_by", "Joseph") + ) + + val measureDefinitionOtherId: Long = Random.nextLong() + table("runs.measure_definitions").insert( + add("id_measure_definition", measureDefinitionOtherId) + .add("fk_partitioning", partitioningId) + .add("measure_name", "sum") + .add("measured_columns", CustomDBType("""{"colOther"}""", "TEXT[]")) + .add("created_by", "Joseph") + ) + + table("runs.measurements").insert( + add("fk_measure_definition", measureDefinitionCntId) + .add("fk_checkpoint", checkpointId1) + .add("measurement_value", measurementCnt) + ) + + table("runs.measurements").insert( + add("fk_measure_definition", measureDefinitionAvgId) + .add("fk_checkpoint", checkpointId1) + .add("measurement_value", measurementAvg) + ) + + table("runs.measurements").insert( + add("fk_measure_definition", measureDefinitionOtherId) + .add("fk_checkpoint", checkpointId2) + .add("measurement_value", measurementSum) + ) + + // Actual test execution and assertions with limit and offset applied + val actualMeasures: Seq[MeasuredDetails] = function(fncGetFlowCheckpointsV2) + .setParam("i_flow_id", flowId) + .execute("checkpoint_name") { queryResult => + assert(queryResult.hasNext) + + val row1 = queryResult.next() + assert(row1.getInt("status").contains(11)) + assert(row1.getString("status_text").contains("OK")) + assert(row1.getUUID("id_checkpoint").contains(checkpointId1)) + assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) + assert(row1.getString("author").contains("Joseph")) + assert(row1.getBoolean("measured_by_atum_agent").contains(true)) + assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime)) + assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + + val measure1 = MeasuredDetails( + row1.getString("measure_name").get, + row1.getArray[String]("measured_columns").map(_.toList).get, + row1.getJsonB("measurement_value").get + ) + + val row2 = queryResult.next() + assert(row2.getInt("status").contains(11)) + assert(row2.getString("status_text").contains("OK")) + assert(row2.getUUID("id_checkpoint").contains(checkpointId1)) + assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) + assert(row2.getString("author").contains("Joseph")) + assert(row2.getBoolean("measured_by_atum_agent").contains(true)) + assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTimeOther)) + assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTimeOther)) + + val measure2 = MeasuredDetails( + row2.getString("measure_name").get, + row2.getArray[String]("measured_columns").map(_.toList).get, + row2.getJsonB("measurement_value").get + ) + + assert(queryResult.hasNext) + + val row3 = queryResult.next() + assert(row3.getInt("status").contains(11)) + assert(row3.getString("status_text").contains("OK")) + assert(row3.getUUID("id_checkpoint").contains(checkpointId2)) + assert(row3.getString("checkpoint_name").contains("CheckpointNameOther")) + assert(row3.getString("author").contains("Joseph")) + assert(row3.getBoolean("measured_by_atum_agent").contains(true)) + assert(row3.getOffsetDateTime("checkpoint_start_time").contains(startTimeOther)) + assert(row3.getOffsetDateTime("checkpoint_end_time").contains(endTimeOther)) + + val measure3 = MeasuredDetails( + row3.getString("measure_name").get, + row3.getArray[String]("measured_columns").map(_.toList).get, + row3.getJsonB("measurement_value").get + ) + + Seq(measure1, measure2, measure3) } + + // Assertions for measures + assert(actualMeasures.map(_.measureName).toSet == Set("avg", "cnt", "sum")) + assert(actualMeasures.map(_.measureColumns).toSet == Set(List("a", "b"), List("col1"), List("colOther"))) + + actualMeasures.foreach { currVal => + val currValStr = currVal.measurementValue.value + + currVal.measureName match { + case "cnt" => + assert(currValStr.contains(""""value": "3"""")) + case "avg" => + assert(currValStr.contains(""""value": "2.71"""")) + case "sum" => + assert(currValStr.contains(""""value": "3000"""")) + case other => + fail(s"Unexpected measure name: $other") + } + } } - test("Testing get_flow_checkpoints, partitioning, flow and checkpoints all exist") { + test("getFlowCheckpointsV2 should return limited with checkpoints for a given flow") { + val partitioningId: Long = Random.nextLong() table("runs.partitionings").insert( add("id_partitioning", partitioningId) @@ -135,10 +264,9 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { val flowId: Long = Random.nextLong() table("flows.flows").insert( add("id_flow", flowId) - .add("flow_name", "test_flow1") - .add("flow_description", "Test Flow 1") + .add("flow_name", "flowName") .add("from_pattern", false) - .add("created_by", "ObviouslySomeTest") + .add("created_by", "Joseph") .add("fk_primary_partitioning", partitioningId) ) @@ -148,39 +276,41 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { .add("created_by", "ObviouslySomeTest") ) - val checkpointId = UUID.randomUUID + // Insert checkpoints and measure definitions + val checkpointId1 = UUID.randomUUID() val startTime = OffsetDateTime.parse("1993-02-14T10:00:00Z") val endTime = OffsetDateTime.parse("2024-04-24T10:00:00Z") table("runs.checkpoints").insert( - add("id_checkpoint", checkpointId) + add("id_checkpoint", checkpointId1) .add("fk_partitioning", partitioningId) .add("checkpoint_name", "CheckpointNameCntAndAvg") .add("measured_by_atum_agent", true) .add("process_start_time", startTime) .add("process_end_time", endTime) - .add("created_by", "ObviouslySomeTest") + .add("created_by", "Joseph") ) - val checkpointOtherId = UUID.randomUUID + val checkpointId2 = UUID.randomUUID() val startTimeOther = OffsetDateTime.parse("1993-02-14T10:00:00Z") val endTimeOther = OffsetDateTime.parse("2024-04-24T10:00:00Z") table("runs.checkpoints").insert( - add("id_checkpoint", checkpointOtherId) + add("id_checkpoint", checkpointId2) .add("fk_partitioning", partitioningId) .add("checkpoint_name", "CheckpointNameOther") .add("measured_by_atum_agent", true) .add("process_start_time", startTimeOther) .add("process_end_time", endTimeOther) - .add("created_by", "ObviouslySomeTest") + .add("created_by", "Joseph") ) + // Insert measure definitions and measurements val measureDefinitionAvgId: Long = Random.nextLong() table("runs.measure_definitions").insert( add("id_measure_definition", measureDefinitionAvgId) .add("fk_partitioning", partitioningId) .add("measure_name", "avg") .add("measured_columns", CustomDBType("""{"a","b"}""", "TEXT[]")) - .add("created_by", "ObviouslySomeTest") + .add("created_by", "Joseph") ) val measureDefinitionCntId: Long = Random.nextLong() @@ -189,7 +319,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { .add("fk_partitioning", partitioningId) .add("measure_name", "cnt") .add("measured_columns", CustomDBType("""{"col1"}""", "TEXT[]")) - .add("created_by", "ObviouslySomeTest") + .add("created_by", "Joseph") ) val measureDefinitionOtherId: Long = Random.nextLong() @@ -198,41 +328,44 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { .add("fk_partitioning", partitioningId) .add("measure_name", "sum") .add("measured_columns", CustomDBType("""{"colOther"}""", "TEXT[]")) - .add("created_by", "ObviouslySomeTest") + .add("created_by", "Joseph") ) table("runs.measurements").insert( add("fk_measure_definition", measureDefinitionCntId) - .add("fk_checkpoint", checkpointId) + .add("fk_checkpoint", checkpointId1) .add("measurement_value", measurementCnt) ) table("runs.measurements").insert( add("fk_measure_definition", measureDefinitionAvgId) - .add("fk_checkpoint", checkpointId) + .add("fk_checkpoint", checkpointId1) .add("measurement_value", measurementAvg) ) table("runs.measurements").insert( add("fk_measure_definition", measureDefinitionOtherId) - .add("fk_checkpoint", checkpointOtherId) + .add("fk_checkpoint", checkpointId2) .add("measurement_value", measurementSum) ) - val actualMeasures: Seq[MeasuredDetails] = function(fncGetFlowCheckpoints) - .setParam("i_partitioning_of_flow", partitioning) - .setParam("i_checkpoint_name", "CheckpointNameCntAndAvg") - .execute { queryResult => + // Actual test execution and assertions + val actualMeasures: Seq[MeasuredDetails] = function(fncGetFlowCheckpointsV2) + .setParam("i_flow_id", flowId) + .setParam("i_checkpoints_limit", 2) + .setParam("i_offset", 0L) + .execute("checkpoint_name") { queryResult => assert(queryResult.hasNext) val row1 = queryResult.next() assert(row1.getInt("status").contains(11)) assert(row1.getString("status_text").contains("OK")) - assert(row1.getUUID("id_checkpoint").contains(checkpointId)) + assert(row1.getUUID("id_checkpoint").contains(checkpointId1)) assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) - assert(row1.getString("author").contains("ObviouslySomeTest")) + assert(row1.getString("author").contains("Joseph")) assert(row1.getBoolean("measured_by_atum_agent").contains(true)) assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime)) assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(queryResult.hasNext) val measure1 = MeasuredDetails( row1.getString("measure_name").get, @@ -243,12 +376,13 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { val row2 = queryResult.next() assert(row2.getInt("status").contains(11)) assert(row2.getString("status_text").contains("OK")) - assert(row2.getUUID("id_checkpoint").contains(checkpointId)) + assert(row2.getUUID("id_checkpoint").contains(checkpointId1)) assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) - assert(row1.getString("author").contains("ObviouslySomeTest")) - assert(row1.getBoolean("measured_by_atum_agent").contains(true)) - assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTime)) - assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTime)) + assert(row2.getString("author").contains("Joseph")) + assert(row2.getBoolean("measured_by_atum_agent").contains(true)) + assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTimeOther)) + assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTimeOther)) + assert(queryResult.hasNext) val measure2 = MeasuredDetails( row2.getString("measure_name").get, @@ -256,17 +390,61 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { row2.getJsonB("measurement_value").get ) + val row3 = queryResult.next() + assert(row3.getInt("status").contains(11)) + assert(row3.getString("status_text").contains("OK")) + assert(row3.getUUID("id_checkpoint").contains(checkpointId2)) + assert(row3.getString("checkpoint_name").contains("CheckpointNameOther")) + assert(row3.getString("author").contains("Joseph")) + assert(row3.getBoolean("measured_by_atum_agent").contains(true)) + assert(row3.getOffsetDateTime("checkpoint_start_time").contains(startTimeOther)) + assert(row3.getOffsetDateTime("checkpoint_end_time").contains(endTimeOther)) + + val measure3 = MeasuredDetails( + row3.getString("measure_name").get, + row3.getArray[String]("measured_columns").map(_.toList).get, + row3.getJsonB("measurement_value").get + ) + assert(!queryResult.hasNext) - Seq(measure1, measure2) + Seq(measure1, measure2, measure3) } - assert(actualMeasures.map(_.measureName).toSet == Set("avg", "cnt")) - assert(actualMeasures.map(_.measureColumns).toSet == Set(Seq("col1"), Seq("a", "b"))) + // Assertions for measures + assert(actualMeasures.map(_.measureName).toSet == Set("avg", "sum")) + assert(actualMeasures.map(_.measureColumns).toSet == Set(List("a", "b"), List("colOther"))) + actualMeasures.foreach { currVal => val currValStr = currVal.measurementValue.value - // Exact comparison is not trivial, we would need to deserialize (and potentially introduce some case classes - // for this) and modify the JSON strings - not worth it, this should be enough as a sanity check. - assert (currValStr.contains(""""value": "2.71"""") || currValStr.contains(""""value": "3"""")) + + currVal.measureName match { + case "avg" => + assert(currValStr.contains(""""value": "2.71"""")) + case "cnt" => + assert(currValStr.contains(""""value": "3"""")) + case "sum" => + assert(currValStr.contains(""""value": "3000"""")) + case other => + fail(s"Unexpected measure name: $other") + } } } + + test("getFlowCheckpointsV2 should return no flows when flow_id is not found") { + + // Create a non-existent flowId that doesn't exist in the database + val nonExistentFlowId: Long = Random.nextLong() + + // Execute the function with the non-existent flowId + val queryResult = function(fncGetFlowCheckpointsV2) + .setParam("i_flow_id", nonExistentFlowId) + .execute { queryResult => + val results = queryResult.next() + assert(results.getString("status_text").contains("Flow not found")) + assert(results.getInt("status").contains(42)) + assert(!queryResult.hasNext) + } + + } + } diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index c4ac42381..588779876 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -17,7 +17,8 @@ package za.co.absa.atum.server import za.co.absa.atum.server.api.controller._ -import za.co.absa.atum.server.api.database.flows.functions._ +import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints +import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings import za.co.absa.atum.server.api.database.{PostgresDatabaseProvider, TransactorProvider} import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.http.Server diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala index 5eaab9068..c0b5dff1f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala @@ -20,8 +20,8 @@ import io.circe.{Decoder, parser} import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.http.ApiPaths -import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.atum.server.model.SuccessResponse._ +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import za.co.absa.atum.server.model._ import zio._ diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala index 684c8e8ac..9e2410608 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala @@ -16,15 +16,19 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO} +import za.co.absa.atum.model.dto.CheckpointV2DTO import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.MultiSuccessResponse +import za.co.absa.atum.server.model.SuccessResponse.PaginatedResponse import zio.IO import zio.macros.accessible @accessible trait FlowController { - def getFlowCheckpointsV2( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] + def getFlowCheckpoints( + flowId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String], + ): IO[ErrorResponse, PaginatedResponse[CheckpointV2DTO]] + } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala index 2683b529a..5d673ff20 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala @@ -16,25 +16,26 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO} +import za.co.absa.atum.model.dto.CheckpointV2DTO import za.co.absa.atum.server.api.service.FlowService -import za.co.absa.atum.server.model.ErrorResponse -import za.co.absa.atum.server.model.SuccessResponse.MultiSuccessResponse +import za.co.absa.atum.server.model.{ErrorResponse, PaginatedResult} +import za.co.absa.atum.server.model.SuccessResponse.PaginatedResponse import zio._ class FlowControllerImpl(flowService: FlowService) extends FlowController with BaseController { // to be replaced (and moved to checkpointcontroller) with new implementation in #233 - override def getFlowCheckpointsV2( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = { - mapToMultiSuccessResponse( - serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]]( - flowService.getFlowCheckpoints(checkpointQueryDTO) - ) + override def getFlowCheckpoints( + flowId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[ErrorResponse, PaginatedResponse[CheckpointV2DTO]] = { + val flowData = serviceCall[PaginatedResult[CheckpointV2DTO], PaginatedResult[CheckpointV2DTO]]( + flowService.getFlowCheckpoints(flowId, limit, offset, checkpointName) ) + mapToPaginatedResponse(limit.get, offset.get, flowData) } - } object FlowControllerImpl { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala index 7d0c8e079..b24b06122 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala @@ -17,29 +17,31 @@ package za.co.absa.atum.server.api.database.flows.functions import doobie.implicits.toSqlInterpolator -import za.co.absa.atum.model.dto.CheckpointQueryDTO import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.flows.Flows -import za.co.absa.atum.server.model.{CheckpointFromDB, PartitioningForDB} +import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints.GetFlowCheckpointsArgs +import za.co.absa.atum.server.model.CheckpointItemFromDB import za.co.absa.db.fadb.DBSchema import za.co.absa.db.fadb.doobie.DoobieEngine import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling import zio._ + +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get import doobie.postgres.implicits._ -import za.co.absa.db.fadb.doobie.postgres.circe.implicits.{jsonbGet, jsonbPut} -import io.circe.syntax.EncoderOps -import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator -import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling + class GetFlowCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieMultipleResultFunctionWithAggStatus[CheckpointQueryDTO, CheckpointFromDB, Task](values => - Seq( - fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}", - fr"${values.limit}", - fr"${values.checkpointName}" - ) + extends DoobieMultipleResultFunctionWithAggStatus[GetFlowCheckpointsArgs, Option[CheckpointItemFromDB], Task](input => + Seq( + fr"${input.flowId}", + fr"${input.limit}", + fr"${input.offset}", + fr"${input.checkpointName}" ) + ) with StandardStatusHandling with ByFirstErrorStatusAggregator { @@ -52,11 +54,19 @@ class GetFlowCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task] "measured_columns", "measurement_value", "checkpoint_start_time", - "checkpoint_end_time" + "checkpoint_end_time", + "has_more" ) } object GetFlowCheckpoints { + case class GetFlowCheckpointsArgs( + flowId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ) + val layer: URLayer[PostgresDatabaseProvider, GetFlowCheckpoints] = ZLayer { for { dbProvider <- ZIO.service[PostgresDatabaseProvider] diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index c524d4828..6499d5145 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -132,6 +132,18 @@ trait Endpoints extends BaseEndpoints { .errorOutVariantPrepend(notFoundErrorOneOfVariant) } + protected val getFlowCheckpointsEndpointV2 + : PublicEndpoint[(Long, Option[Int], Option[Long], Option[String]), ErrorResponse, PaginatedResponse[CheckpointV2DTO], Any] = { + apiV2.get + .in(V2Paths.Flows / path[Long]("flowId") / V2Paths.Checkpoints) + .in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000))) + .in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L))) + .in(query[Option[String]]("checkpoint-name")) + .out(statusCode(StatusCode.Ok)) + .out(jsonBody[PaginatedResponse[CheckpointV2DTO]]) + .errorOutVariantPrepend(notFoundErrorOneOfVariant) + } + protected val getPartitioningByIdEndpointV2 : PublicEndpoint[Long, ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO], Any] = { apiV2.get diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index f8095ec4f..b010bf915 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -90,6 +90,14 @@ trait Routes extends Endpoints with ServerOptions { CheckpointController.getPartitioningCheckpoints(partitioningId, limit, offset, checkpointName) } ), + createServerEndpoint[ + (Long, Option[Int], Option[Long], Option[String]), + ErrorResponse, + PaginatedResponse[CheckpointV2DTO] + ](getFlowCheckpointsEndpointV2, { + case (flowId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => + FlowController.getFlowCheckpoints(flowId, limit, offset, checkpointName) + }), createServerEndpoint(getPartitioningByIdEndpointV2, PartitioningController.getPartitioningByIdV2), createServerEndpoint(getPartitioningMeasuresEndpointV2, PartitioningController.getPartitioningMeasuresV2), createServerEndpoint(getPartitioningMainFlowEndpointV2, PartitioningController.getPartitioningMainFlow), @@ -124,7 +132,8 @@ trait Routes extends Endpoints with ServerOptions { // getPartitioningEndpointV2, // getPartitioningMeasuresEndpointV2, // getFlowPartitioningsEndpointV2, - // getPartitioningMainFlowEndpointV2 + // getPartitioningMainFlowEndpointV2, + // getFlowCheckpointsEndpointV2, healthEndpoint ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None)) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala index 6f56aa145..1749d13be 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala @@ -16,13 +16,20 @@ package za.co.absa.atum.server.api.repository -import za.co.absa.atum.model.dto.CheckpointQueryDTO +import za.co.absa.atum.model.dto.CheckpointV2DTO import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.atum.server.model.CheckpointFromDB +import za.co.absa.atum.server.model.PaginatedResult import zio._ import zio.macros.accessible @accessible trait FlowRepository { - def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] + + def getFlowCheckpoints( + partitioningId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[DatabaseError, PaginatedResult[CheckpointV2DTO]] + } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala index 71152b335..d52ef71c3 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala @@ -16,25 +16,48 @@ package za.co.absa.atum.server.api.repository -import za.co.absa.atum.model.dto.CheckpointQueryDTO +import za.co.absa.atum.model.dto.CheckpointV2DTO import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.atum.server.model.CheckpointFromDB +import za.co.absa.atum.server.model.{CheckpointItemFromDB, PaginatedResult} +import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints.GetFlowCheckpointsArgs +import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import zio._ import zio.interop.catz.asyncInstance -class FlowRepositoryImpl(getFlowCheckpointsFn: GetFlowCheckpoints) extends FlowRepository with BaseRepository { +class FlowRepositoryImpl(getFlowCheckpointsFn: GetFlowCheckpoints) + extends FlowRepository with BaseRepository { - override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] = { - dbMultipleResultCallWithAggregatedStatus(getFlowCheckpointsFn(checkpointQueryDTO), "getFlowCheckpoints") - } + override def getFlowCheckpoints( + flowId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[DatabaseError, PaginatedResult[CheckpointV2DTO]] = { + dbMultipleResultCallWithAggregatedStatus( + getFlowCheckpointsFn(GetFlowCheckpointsArgs(flowId, limit, offset, checkpointName)), + "getFlowCheckpoints" + ) + .map(_.flatten) + .flatMap { checkpointItems => + ZIO + .fromEither(CheckpointItemFromDB.groupAndConvertItemsToCheckpointV2DTOs(checkpointItems)) + .mapBoth( + error => GeneralDatabaseError(error.getMessage), + checkpoints => + if (checkpointItems.nonEmpty && checkpointItems.head.hasMore) ResultHasMore(checkpoints) + else ResultNoMore(checkpoints) + ) + } + } } object FlowRepositoryImpl { val layer: URLayer[GetFlowCheckpoints, FlowRepository] = ZLayer { for { - getFlowCheckpoints <- ZIO.service[GetFlowCheckpoints] - } yield new FlowRepositoryImpl(getFlowCheckpoints) + getFlowCheckpointsV2 <- ZIO.service[GetFlowCheckpoints] + } yield new FlowRepositoryImpl(getFlowCheckpointsV2) } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index 45023a4e7..e0177509a 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -34,9 +34,7 @@ trait PartitioningRepository { def getPartitioningAdditionalData(partitioning: PartitioningDTO): IO[DatabaseError, InitialAdditionalDataDTO] - def getPartitioningAdditionalDataV2( - partitioningId: Long - ): IO[DatabaseError, AdditionalDataDTO] + def getPartitioningAdditionalDataV2(partitioningId: Long): IO[DatabaseError, AdditionalDataDTO] def createOrUpdateAdditionalData( partitioningId: Long, diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala index 72945d705..b3e9fc823 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala @@ -16,12 +16,20 @@ package za.co.absa.atum.server.api.service -import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO} +import za.co.absa.atum.model.dto.CheckpointV2DTO import za.co.absa.atum.server.api.exception.ServiceError +import za.co.absa.atum.server.model.PaginatedResult import zio._ import zio.macros.accessible @accessible trait FlowService { - def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] + + def getFlowCheckpoints( + flowId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[ServiceError, PaginatedResult[CheckpointV2DTO]] + } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala index 497d1c4d8..8ba8a6b9f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala @@ -18,25 +18,22 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError -import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.repository.FlowRepository -import za.co.absa.atum.server.model.CheckpointFromDB +import za.co.absa.atum.server.model.PaginatedResult import zio._ class FlowServiceImpl(flowRepository: FlowRepository) extends FlowService with BaseService { - override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] = { - for { - checkpointsFromDB <- repositoryCall( - flowRepository.getFlowCheckpoints(checkpointQueryDTO), - "getFlowCheckpoints" - ) - checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB => - ZIO - .fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) - .mapError(error => GeneralServiceError(error.getMessage)) - } - } yield checkpointDTOs + override def getFlowCheckpoints( + flowId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] + ): IO[ServiceError, PaginatedResult[CheckpointV2DTO]] = { + repositoryCall( + flowRepository.getFlowCheckpoints(flowId, limit, offset, checkpointName), + "getFlowCheckpoints" + ) } } diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala index 3d8ca67c9..3df68eb9b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemFromDB.scala @@ -16,9 +16,9 @@ package za.co.absa.atum.server.model + import io.circe.{DecodingFailure, Json} import za.co.absa.atum.model.dto.{CheckpointV2DTO, MeasureDTO, MeasureResultDTO, MeasurementDTO} - import java.time.ZonedDateTime import java.util.UUID diff --git a/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala b/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala index e3a2898e4..4adebf4ef 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/PaginatedResult.scala @@ -23,6 +23,7 @@ sealed trait PaginatedResult[R] { object PaginatedResult { case class ResultHasMore[R](data: Seq[R]) extends PaginatedResult[R] + case class ResultNoMore[R](data: Seq[R]) extends PaginatedResult[R] } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 7ac7ed79a..467317ac0 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -350,6 +350,7 @@ trait TestData { checkpointStartTime = Some(checkpointDTO1.processStartTime), checkpointEndTime = checkpointDTO1.processEndTime ) + protected val checkpointFromDB2: CheckpointFromDB = checkpointFromDB1 .copy( idCheckpoint = Some(checkpointDTO2.id), @@ -381,6 +382,19 @@ trait TestData { hasMore = true ) + protected val checkpointItemFromDB2: CheckpointItemFromDB = CheckpointItemFromDB( + idCheckpoint = checkpointV2DTO2.id, + checkpointName = checkpointV2DTO2.name, + author = checkpointV2DTO2.author, + measuredByAtumAgent = checkpointV2DTO2.measuredByAtumAgent, + measureName = checkpointV2DTO2.measurements.head.measure.measureName, + measuredColumns = checkpointV2DTO2.measurements.head.measure.measuredColumns, + measurementValue = checkpointV2DTO2.measurements.head.result.asJson, + checkpointStartTime = checkpointV2DTO2.processStartTime, + checkpointEndTime = checkpointV2DTO2.processEndTime, + hasMore = false + ) + protected def createAtumContextDTO(partitioningSubmitDTO: PartitioningSubmitDTO): AtumContextDTO = { val measures: Set[MeasureDTO] = Set(MeasureDTO("count", Seq("*"))) val additionalData: InitialAdditionalDataDTO = Map.empty diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala index 4d2f90fe3..33c0c8215 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala @@ -20,33 +20,41 @@ import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.ServiceError._ import za.co.absa.atum.server.api.service.FlowService -import za.co.absa.atum.server.model.InternalServerErrorResponse +import za.co.absa.atum.server.model.{NotFoundErrorResponse, Pagination} +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import zio._ import zio.test.Assertion.failsWithA import zio.test._ object FlowControllerUnitTests extends ZIOSpecDefault with TestData { private val flowServiceMock = mock(classOf[FlowService]) - when(flowServiceMock.getFlowCheckpoints(checkpointQueryDTO1)) - .thenReturn(ZIO.fail(GeneralServiceError("boom!"))) - when(flowServiceMock.getFlowCheckpoints(checkpointQueryDTO2)) - .thenReturn(ZIO.succeed(Seq(checkpointDTO2))) + when(flowServiceMock.getFlowCheckpoints(1L, Some(5), Some(2), None)) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointV2DTO1)))) + when(flowServiceMock.getFlowCheckpoints(2L, Some(5), Some(0), None)) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(checkpointV2DTO2)))) + when(flowServiceMock.getFlowCheckpoints(3L, Some(5), Some(0), None)) + .thenReturn(ZIO.fail(NotFoundServiceError("Flow not found"))) private val flowServiceMockLayer = ZLayer.succeed(flowServiceMock) override def spec: Spec[TestEnvironment with Scope, Any] = { suite("FlowControllerSuite")( - suite("GetFlowCheckpointsSuite")( - test("Returns expected InternalServerErrorResponse") { - assertZIO(FlowController.getFlowCheckpointsV2(checkpointQueryDTO1).exit)( - failsWithA[InternalServerErrorResponse] - ) + suite("GetFlowCheckpointsV2Suite")( + test("Returns expected Seq[CheckpointV2DTO] with Pagination indicating there is more data available") { + for { + result <- FlowController.getFlowCheckpoints(1L, Some(5), Some(2), None) + } yield assertTrue(result.data == Seq(checkpointV2DTO1) && result.pagination == Pagination(5, 2, hasMore = true)) }, - test("Returns expected CheckpointDTO") { + test("Returns expected Seq[CheckpointV2DTO] with Pagination indicating there is no more data available") { for { - result <- FlowController.getFlowCheckpointsV2(checkpointQueryDTO2) - } yield assertTrue(result.data == Seq(checkpointDTO2)) + result <- FlowController.getFlowCheckpoints(2L, Some(5), Some(0), None) + } yield assertTrue(result.data == Seq(checkpointV2DTO2) && result.pagination == Pagination(5, 0, hasMore = false)) + }, + test("Returns expected NotFoundServiceError when service returns NotFoundServiceError") { + assertZIO(FlowController.getFlowCheckpoints(3L, Some(5), Some(0), None).exit)( + failsWithA[NotFoundErrorResponse] + ) } ) ).provide( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpointsIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpointsIntegrationTests.scala index 67ad6c05a..f6798d7ae 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpointsIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpointsIntegrationTests.scala @@ -17,36 +17,33 @@ package za.co.absa.atum.server.api.database.flows.functions import za.co.absa.atum.server.ConfigProviderTest -import za.co.absa.atum.model.dto.{CheckpointQueryDTO, PartitionDTO, PartitioningDTO} import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.db.fadb.exceptions.DataNotFoundException import za.co.absa.db.fadb.status.FunctionStatus -import zio.interop.catz.asyncInstance -import zio.{Scope, ZIO} +import zio._ import zio.test._ +import zio.interop.catz.asyncInstance object GetFlowCheckpointsIntegrationTests extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { - val partitioningDTO1: PartitioningDTO = Seq( - PartitionDTO("stringA", "stringA"), - PartitionDTO("stringB", "stringB") - ) - suite("GetFlowCheckpointsIntegrationTests")( - test("Returns expected sequence of flow of Checkpoints with existing partitioning") { - val partitioningQueryDTO: CheckpointQueryDTO = CheckpointQueryDTO( - partitioning = partitioningDTO1, + test("Should return checkpoints with the correct flowId, limit, and offset") { + + // Define the GetFlowCheckpointsArgs DTO + val args = GetFlowCheckpoints.GetFlowCheckpointsArgs( + flowId = 1L, limit = Some(10), - checkpointName = Some("checkpointName") + offset = Some(0L), + checkpointName = Some("TestCheckpointName") ) for { getFlowCheckpoints <- ZIO.service[GetFlowCheckpoints] - result <- getFlowCheckpoints(partitioningQueryDTO) - } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) + result <- getFlowCheckpoints(args) + } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(42, "Flow not found")))) } ).provide( GetFlowCheckpoints.layer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsV2EndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsV2EndpointUnitTests.scala new file mode 100644 index 000000000..7d33bdf38 --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsV2EndpointUnitTests.scala @@ -0,0 +1,135 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.http + +import org.mockito.Mockito.{mock, when} +import sttp.client3.testing.SttpBackendStub +import sttp.client3.{UriContext, basicRequest} +import sttp.client3.circe.asJson +import sttp.model.StatusCode +import sttp.tapir.server.stub.TapirStubInterpreter +import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} +import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.controller.FlowController +import za.co.absa.atum.server.model.{NotFoundErrorResponse, Pagination} +import za.co.absa.atum.server.model.SuccessResponse.PaginatedResponse +import zio.{Scope, ZIO, ZLayer} +import zio.test.{Spec, TestEnvironment, ZIOSpecDefault, assertZIO} +import zio.test.Assertion.equalTo + +import java.util.UUID + +object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoints with TestData { + private val flowControllerMockV2 = mock(classOf[FlowController]) + private val uuid = UUID.randomUUID() + + when(flowControllerMockV2.getFlowCheckpoints(1L, Some(5), Some(0), None)) + .thenReturn(ZIO.succeed(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(5, 0, hasMore = true), uuid))) + when(flowControllerMockV2.getFlowCheckpoints(2L, Some(5), Some(0), None)) + .thenReturn(ZIO.succeed(PaginatedResponse(Seq(checkpointV2DTO2), Pagination(5, 0, hasMore = false), uuid))) + when(flowControllerMockV2.getFlowCheckpoints(3L, Some(5), Some(0), None)) + .thenReturn(ZIO.fail(NotFoundErrorResponse("Flow not found for a given ID"))) + + private val flowControllerMockLayerV2 = ZLayer.succeed(flowControllerMockV2) + + private val getFlowCheckpointServerEndpointV2 = getFlowCheckpointsEndpointV2.zServerLogic({ + case (flowId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => + FlowController.getFlowCheckpoints(flowId, limit, offset, checkpointName) + }) + + def spec: Spec[TestEnvironment with Scope, Any] = { + val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[FlowController])) + .whenServerEndpoint(getFlowCheckpointServerEndpointV2) + .thenRunLogic() + .backend() + + suite("GetFlowCheckpointsEndpointSuite")( + test("Returns an expected PaginatedResponse[CheckpointV2DTO] with more data available") { + val baseUri = uri"https://test.com/api/v2/flows/1/checkpoints?limit=5&offset=0" + val response = basicRequest + .get(baseUri) + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + assertZIO(body <&> statusCode)( + equalTo( + Right(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(5, 0, hasMore = true), uuid)), + StatusCode.Ok + ) + ) + }, + test("Returns an expected PaginatedResponse[CheckpointV2DTO] with no more data available") { + val baseUri = uri"https://test.com/api/v2/flows/2/checkpoints?limit=5&offset=0" + val response = basicRequest + .get(baseUri) + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .send(backendStub) + + val body = response.map(_.body) + val statusCode = response.map(_.code) + + println(s"body: $body and statusCode: $statusCode") + + assertZIO(body <&> statusCode)( + equalTo( + Right(PaginatedResponse(Seq(checkpointV2DTO2), Pagination(5, 0, hasMore = false), uuid)), + StatusCode.Ok + ) + ) + }, + test("Returns expected 404 when checkpoint data for a given ID doesn't exist") { + val baseUri = uri"https://test.com/api/v2/flows/3/checkpoints?limit=5&offset=0" + val response = basicRequest + .get(baseUri) + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.NotFound)) + }, + test("Returns expected 400 when limit is out of range") { + val baseUri = uri"https://test.com/api/v2/flows/1/checkpoints?limit=1005&offset=0" + val response = basicRequest + .get(baseUri) + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.BadRequest)) + }, + test("Returns expected 400 when offset is negative") { + val baseUri = uri"https://test.com/api/v2/flows/1/checkpoints?limit=-1&offset=0" + val response = basicRequest + .get(baseUri) + .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .send(backendStub) + + val statusCode = response.map(_.code) + + assertZIO(statusCode)(equalTo(StatusCode.BadRequest)) + } + ) + }.provide( + flowControllerMockLayerV2 + ) +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala index 7f34bab93..595eb56e5 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala @@ -17,9 +17,13 @@ package za.co.absa.atum.server.api.repository import org.mockito.Mockito.{mock, when} +import za.co.absa.atum.model.dto.CheckpointV2DTO import za.co.absa.atum.server.api.TestData +import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints.GetFlowCheckpointsArgs import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints -import za.co.absa.atum.server.api.exception.DatabaseError +import za.co.absa.atum.server.api.exception.DatabaseError.NotFoundDatabaseError +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} import zio._ import zio.interop.catz.asyncInstance import zio.test.Assertion.failsWithA @@ -28,36 +32,47 @@ import za.co.absa.db.fadb.status.{FunctionStatus, Row} object FlowRepositoryUnitTests extends ZIOSpecDefault with TestData { - private val getFlowCheckpointsMock = mock(classOf[GetFlowCheckpoints]) + private val getFlowCheckpointsV2Mock = mock(classOf[GetFlowCheckpoints]) - when(getFlowCheckpointsMock.apply(checkpointQueryDTO1)).thenReturn(ZIO.fail(new Exception("boom!"))) - when(getFlowCheckpointsMock.apply(checkpointQueryDTO2)) + when(getFlowCheckpointsV2Mock.apply(GetFlowCheckpointsArgs(1, Some(1), Some(1), None))) .thenReturn( ZIO.right( - Seq(Row(FunctionStatus(0, "success"), checkpointFromDB1), Row(FunctionStatus(0, "success"), checkpointFromDB2)) + Seq( + Row(FunctionStatus(11, "success"), Some(checkpointItemFromDB1)), + Row(FunctionStatus(11, "success"), Some(checkpointItemFromDB2)) + ) ) ) + when(getFlowCheckpointsV2Mock.apply(GetFlowCheckpointsArgs(2, Some(1), Some(1), None))) + .thenReturn(ZIO.right(Seq(Row(FunctionStatus(11, "success"), None)))) + when(getFlowCheckpointsV2Mock.apply(GetFlowCheckpointsArgs(3, None, None, None))) + .thenReturn(ZIO.fail(DataNotFoundException(FunctionStatus(42, "Flow not found")))) - private val getFlowCheckpointsMockLayer = ZLayer.succeed(getFlowCheckpointsMock) + private val getFlowCheckpointsV2MockLayer = ZLayer.succeed(getFlowCheckpointsV2Mock) override def spec: Spec[TestEnvironment with Scope, Any] = { suite("FlowRepositoryIntegrationSuite")( - suite("GetFlowCheckpointsSuite")( - test("Returns expected DatabaseError") { - assertZIO(FlowRepository.getFlowCheckpoints(checkpointQueryDTO1).exit)( - failsWithA[DatabaseError] - ) + suite("GetFlowCheckpointsV2Suite")( + test("Returns expected Right with CheckpointV2DTO") { + for { + result <- FlowRepository.getFlowCheckpoints(1, Some(1), Some(1), None) + } yield assertTrue(result == ResultHasMore(Seq(checkpointV2DTO1, checkpointV2DTO2))) }, - test("Returns expected Left with StatusException") { + test("Returns expected Right with CheckpointV2DTO") { for { - result <- FlowRepository.getFlowCheckpoints(checkpointQueryDTO2) - } yield assertTrue(result == Seq(checkpointFromDB1, checkpointFromDB2)) + result <- FlowRepository.getFlowCheckpoints(2, Some(1), Some(1), None) + } yield assertTrue(result == ResultNoMore(Seq.empty[CheckpointV2DTO])) + }, + test("Returns expected DatabaseError") { + assertZIO(FlowRepository.getFlowCheckpoints(3, None, None, None).exit)( + failsWithA[NotFoundDatabaseError] + ) } ) ).provide( FlowRepositoryImpl.layer, - getFlowCheckpointsMockLayer + getFlowCheckpointsV2MockLayer ) } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala index a3c856fec..c1bc930cb 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala @@ -18,9 +18,10 @@ package za.co.absa.atum.server.api.service import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData -import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError -import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.repository.FlowRepository +import za.co.absa.atum.server.model.PaginatedResult.ResultHasMore +import za.co.absa.atum.server.api.exception.DatabaseError.NotFoundDatabaseError +import za.co.absa.atum.server.api.exception.ServiceError.NotFoundServiceError import zio._ import zio.test.Assertion.failsWithA import zio.test._ @@ -28,27 +29,28 @@ import zio.test._ object FlowServiceUnitTests extends ZIOSpecDefault with TestData { private val flowRepositoryMock = mock(classOf[FlowRepository]) - when(flowRepositoryMock.getFlowCheckpoints(checkpointQueryDTO1)).thenReturn(ZIO.fail(GeneralDatabaseError("boom!"))) - when(flowRepositoryMock.getFlowCheckpoints(checkpointQueryDTO2)) - .thenReturn(ZIO.succeed(Seq(checkpointFromDB2))) + when(flowRepositoryMock.getFlowCheckpoints(1L, None, None, None)) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointV2DTO1)))) + when(flowRepositoryMock.getFlowCheckpoints(2L, None, None, None)) + .thenReturn(ZIO.fail(NotFoundDatabaseError("Flow not found"))) private val flowRepositoryMockLayer = ZLayer.succeed(flowRepositoryMock) override def spec: Spec[TestEnvironment with Scope, Any] = { suite("FlowServiceSuite")( - suite("GetFlowCheckpointsSuite")( - test("Returns expected ServiceError") { - assertZIO(FlowService.getFlowCheckpoints(checkpointQueryDTO1).exit)( - failsWithA[ServiceError] - ) - }, - test("Returns expected Seq[CheckpointDTO]") { + suite("GetFlowCheckpointsV2Suite")( + test("Returns expected PaginatedResult[CheckpointV2DTO]") { for { - result <- FlowService.getFlowCheckpoints(checkpointQueryDTO2) + result <- FlowService.getFlowCheckpoints(1L, None, None, None) } yield assertTrue { - result == Seq(checkpointDTO2) + result == ResultHasMore(Seq(checkpointV2DTO1)) } + }, + test("Returns expected ServiceError") { + assertZIO(FlowService.getFlowCheckpoints(2L, None, None, None).exit)( + failsWithA[NotFoundServiceError] + ) } ) ).provide(