From 6f37267fde1c78d56b39e4fa60ff1049060c1a69 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Wed, 20 Nov 2024 11:39:58 +0100 Subject: [PATCH 01/19] tmp --- database/README.md | 2 +- .../flows/V0.2.0.57__get_flow_checkpoints.sql | 21 +- .../GetFlowCheckpointsIntegrationTests.scala | 253 +++++++++++++++++- 3 files changed, 261 insertions(+), 15 deletions(-) diff --git a/database/README.md b/database/README.md index ebdbe0801..923fd0c40 100644 --- a/database/README.md +++ b/database/README.md @@ -9,7 +9,7 @@ How to set up database for local testing docker run --name=atum_db -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=atum_db -p 5432:5432 -d postgres:16 # migrate scripts -sbt flywayMigrate +sbt flywayMigrate -Dflyway.baselineVersion=0.1.0.1 -Dflyway.baselineOnMigrate=true # kill & remove docker container (optional; only if using dockerized postgres instance) docker kill atum_db diff --git a/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql b/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql index 72df99e7f..001fc4284 100644 --- a/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql +++ b/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql @@ -23,13 +23,16 @@ CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints( OUT status_text TEXT, OUT id_checkpoint UUID, OUT checkpoint_name TEXT, - OUT author TEXT, + OUT checkpoint_author TEXT, OUT measured_by_atum_agent BOOLEAN, OUT measure_name TEXT, OUT measured_columns TEXT[], OUT measurement_value JSONB, OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE, + OUT id_partitioning BIGINT, + OUT o_partitioning JSONB, + OUT partitioning_author TEXT, OUT has_more BOOLEAN ) RETURNS SETOF record AS $$ @@ -61,7 +64,7 @@ $$ -- status_text - Status text -- id_checkpoint - ID of retrieved checkpoint -- checkpoint_name - Name of the retrieved checkpoint --- author - Author of the checkpoint +-- checkpoint_author - Author of the checkpoint -- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by Atum Agent -- (if false, data supplied manually) -- measure_name - measure name associated with a given checkpoint @@ -69,6 +72,9 @@ $$ -- measurement_value - measurement details associated with a given checkpoint -- checkpoint_start_time - Time of the checkpoint -- checkpoint_end_time - End time of the checkpoint computation +-- id_partitioning - ID of the partitioning +-- o_partitioning - Partitioning value +-- partitioning_author - Author of the partitioning -- has_more - flag indicating whether there are more checkpoints available, always `false` if `i_limit` is NULL -- -- Status codes: @@ -109,6 +115,7 @@ BEGIN RETURN QUERY WITH limited_checkpoints AS ( SELECT C.id_checkpoint, + C.fk_partitioning, C.checkpoint_name, C.created_by, C.measured_by_atum_agent, @@ -118,7 +125,7 @@ BEGIN JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning WHERE PF.fk_flow = i_flow_id AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) - ORDER BY C.id_checkpoint, C.process_start_time + ORDER BY C.process_start_time desc LIMIT i_checkpoints_limit OFFSET i_offset ) SELECT @@ -133,6 +140,9 @@ BEGIN M.measurement_value, LC.process_start_time AS checkpoint_start_time, LC.process_end_time AS checkpoint_end_time, + LC.fk_partitioning AS id_partitioning, + P.partitioning AS o_partitioning, + P.created_by AS partitioning_author, _has_more AS has_more FROM limited_checkpoints LC @@ -140,8 +150,9 @@ BEGIN 
runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint INNER JOIN runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition - ORDER BY - LC.id_checkpoint, LC.process_start_time; + INNER JOIN + runs.partitionings P ON LC.fk_partitioning = P.id_partitioning + ORDER BY LC.process_start_time desc; END; $$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER; diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala index ed06fe224..bffd38237 100644 --- a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala @@ -76,7 +76,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { |""".stripMargin ) - test("getFlowCheckpointsV2 should return all checkpoints for a given flow") { + ignore("getFlowCheckpointsV2 should return all checkpoints for a given flow") { val partitioningId: Long = Random.nextLong() table("runs.partitionings").insert( @@ -184,7 +184,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row1.getString("status_text").contains("OK")) assert(row1.getUUID("id_checkpoint").contains(checkpointId1)) assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) - assert(row1.getString("author").contains("Joseph")) + assert(row1.getString("checkpoint_author").contains("Joseph")) assert(row1.getBoolean("measured_by_atum_agent").contains(true)) assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime)) assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) @@ -200,7 +200,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row2.getString("status_text").contains("OK")) assert(row2.getUUID("id_checkpoint").contains(checkpointId1)) assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) - assert(row2.getString("author").contains("Joseph")) + assert(row2.getString("checkpoint_author").contains("Joseph")) assert(row2.getBoolean("measured_by_atum_agent").contains(true)) assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTimeOther)) assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTimeOther)) @@ -218,7 +218,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row3.getString("status_text").contains("OK")) assert(row3.getUUID("id_checkpoint").contains(checkpointId2)) assert(row3.getString("checkpoint_name").contains("CheckpointNameOther")) - assert(row3.getString("author").contains("Joseph")) + assert(row3.getString("checkpoint_author").contains("Joseph")) assert(row3.getBoolean("measured_by_atum_agent").contains(true)) assert(row3.getOffsetDateTime("checkpoint_start_time").contains(startTimeOther)) assert(row3.getOffsetDateTime("checkpoint_end_time").contains(endTimeOther)) @@ -252,7 +252,242 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { } } - test("getFlowCheckpointsV2 should return limited with checkpoints for a given flow") { + // ********* + test("getFlowCheckpointsV2 should return all checkpoints of a given name and a given flow") { + + val partitioningId: Long = Random.nextLong() + table("runs.partitionings").insert( + add("id_partitioning", partitioningId) + .add("partitioning", partitioning) + .add("created_by", "Joseph") + ) + + val flowId: Long = Random.nextLong() + 
table("flows.flows").insert( + add("id_flow", flowId) + .add("flow_name", "flowName") + .add("from_pattern", false) + .add("created_by", "Joseph") + .add("fk_primary_partitioning", partitioningId) + ) + + table("flows.partitioning_to_flow").insert( + add("fk_flow", flowId) + .add("fk_partitioning", partitioningId) + .add("created_by", "ObviouslySomeTest") + ) + + // Insert checkpoints and measure definitions + val checkpointId1 = UUID.fromString("e9efe108-d8fc-42ad-b367-aa1b17f6c450") +// val startTime = OffsetDateTime.parse("1993-02-14T10:00:00Z") + val startTime1 = OffsetDateTime.now() +// val endTime = OffsetDateTime.parse("2024-04-24T10:00:00Z") + val endTime1 = OffsetDateTime.now() + table("runs.checkpoints").insert( + add("id_checkpoint", checkpointId1) + .add("fk_partitioning", partitioningId) + .add("checkpoint_name", "CheckpointNameCntAndAvg") + .add("measured_by_atum_agent", true) + .add("process_start_time", startTime1) + .add("process_end_time", endTime1) + .add("created_by", "Joseph") + ) + + val checkpointId2 = UUID.randomUUID() + val startTimeOther = OffsetDateTime.parse("1993-02-14T10:00:00Z") + val endTimeOther = OffsetDateTime.parse("2024-04-24T10:00:00Z") + table("runs.checkpoints").insert( + add("id_checkpoint", checkpointId2) + .add("fk_partitioning", partitioningId) + .add("checkpoint_name", "CheckpointNameOther") + .add("measured_by_atum_agent", true) + .add("process_start_time", startTimeOther) + .add("process_end_time", endTimeOther) + .add("created_by", "Joseph") + ) + + val checkpointId3 = UUID.fromString("41ea35cd-6398-42ed-b6d3-a8d613176575") +// val startTime3 = OffsetDateTime.parse("1993-02-14T11:00:00Z") + val startTime3 = startTime1.plusHours(1L) +// val endTime3 = OffsetDateTime.parse("2024-04-24T11:00:00Z") + val endTime3 = endTime1.plusHours(1L) + table("runs.checkpoints").insert( + add("id_checkpoint", checkpointId3) + .add("fk_partitioning", partitioningId) + .add("checkpoint_name", "CheckpointNameCntAndAvg") + .add("measured_by_atum_agent", true) + .add("process_start_time", startTime3) + .add("process_end_time", endTime3) + .add("created_by", "Joseph") + ) + + // Insert measure definitions and measurements + val measureDefinitionAvgId: Long = Random.nextLong() + table("runs.measure_definitions").insert( + add("id_measure_definition", measureDefinitionAvgId) + .add("fk_partitioning", partitioningId) + .add("measure_name", "avg") + .add("measured_columns", CustomDBType("""{"a","b"}""", "TEXT[]")) + .add("created_by", "Joseph") + ) + + val measureDefinitionCntId: Long = Random.nextLong() + table("runs.measure_definitions").insert( + add("id_measure_definition", measureDefinitionCntId) + .add("fk_partitioning", partitioningId) + .add("measure_name", "cnt") + .add("measured_columns", CustomDBType("""{"col1"}""", "TEXT[]")) + .add("created_by", "Joseph") + ) + + val measureDefinitionOtherId: Long = Random.nextLong() + table("runs.measure_definitions").insert( + add("id_measure_definition", measureDefinitionOtherId) + .add("fk_partitioning", partitioningId) + .add("measure_name", "sum") + .add("measured_columns", CustomDBType("""{"colOther"}""", "TEXT[]")) + .add("created_by", "Joseph") + ) + + table("runs.measurements").insert( + add("fk_measure_definition", measureDefinitionCntId) + .add("fk_checkpoint", checkpointId1) + .add("measurement_value", measurementCnt) + ) + + table("runs.measurements").insert( + add("fk_measure_definition", measureDefinitionAvgId) + .add("fk_checkpoint", checkpointId1) + .add("measurement_value", measurementAvg) + ) + + val 
measureDefinitionCntId3: Long = Random.nextLong()
    table("runs.measurements").insert(
      add("fk_measure_definition", measureDefinitionCntId3)
        .add("fk_checkpoint", checkpointId3)
        .add("measurement_value", measurementCnt)
    )

    val measureDefinitionAvgId3: Long = Random.nextLong()
    table("runs.measurements").insert(
      add("fk_measure_definition", measureDefinitionAvgId3)
        .add("fk_checkpoint", checkpointId3)
        .add("measurement_value", measurementAvg)
    )

    table("runs.measurements").insert(
      add("fk_measure_definition", Random.nextLong())
        .add("fk_checkpoint", checkpointId2)
        .add("measurement_value", measurementSum)
    )

    // Actual test execution and assertions with the checkpoint name filter applied
    val actualMeasures: Seq[MeasuredDetails] = function(fncGetFlowCheckpointsV2)
      .setParam("i_flow_id", flowId)
      .setParam("i_checkpoint_name", "CheckpointNameCntAndAvg")
      .execute { queryResult =>
        assert(queryResult.hasNext)

        val row1 = queryResult.next()
        assert(row1.getInt("status").contains(11))
        assert(row1.getString("status_text").contains("OK"))
        assert(row1.getUUID("id_checkpoint").contains(checkpointId3))
        assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg"))
        assert(row1.getString("checkpoint_author").contains("Joseph"))
        assert(row1.getBoolean("measured_by_atum_agent").contains(true))
        assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime3))
        assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime3))
        assert(row1.getLong("id_partitioning").contains(partitioningId))
        assert(row1.getJsonB("o_partitioning").contains(partitioning))
        assert(row1.getString("partitioning_author").contains("Joseph"))

        val measure1 = MeasuredDetails(
          row1.getString("measure_name").get,
          row1.getArray[String]("measured_columns").map(_.toList).get,
          row1.getJsonB("measurement_value").get
        )

        val row2 = queryResult.next()
        assert(row2.getInt("status").contains(11))
        assert(row2.getString("status_text").contains("OK"))
        assert(row2.getUUID("id_checkpoint").contains(checkpointId3))
        assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg"))
        assert(row2.getString("checkpoint_author").contains("Joseph"))
        assert(row2.getBoolean("measured_by_atum_agent").contains(true))
        assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTime3))
        assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTime3))
        assert(row2.getLong("id_partitioning").contains(partitioningId))
        assert(row2.getJsonB("o_partitioning").contains(partitioning))
        assert(row2.getString("partitioning_author").contains("Joseph"))

        val measure2 = MeasuredDetails(
          row2.getString("measure_name").get,
          row2.getArray[String]("measured_columns").map(_.toList).get,
          row2.getJsonB("measurement_value").get
        )

        assert(queryResult.hasNext)

        val row3 = queryResult.next()
        assert(row3.getInt("status").contains(11))
        assert(row3.getString("status_text").contains("OK"))
        assert(row3.getUUID("id_checkpoint").contains(checkpointId1))
        assert(row3.getString("checkpoint_name").contains("CheckpointNameCntAndAvg"))
        assert(row3.getString("checkpoint_author").contains("Joseph"))
        assert(row3.getBoolean("measured_by_atum_agent").contains(true))
        assert(row3.getOffsetDateTime("checkpoint_start_time").contains(startTime1))
        assert(row3.getOffsetDateTime("checkpoint_end_time").contains(endTime1))
        assert(row3.getLong("id_partitioning").contains(partitioningId))
assert(row3.getJsonB("o_partitioning").contains(partitioning)) + assert(row3.getString("partitioning_author").contains("Joseph")) + + val measure3 = MeasuredDetails( + row3.getString("measure_name").get, + row3.getArray[String]("measured_columns").map(_.toList).get, + row3.getJsonB("measurement_value").get + ) + + val row4 = queryResult.next() + assert(row4.getInt("status").contains(11)) + assert(row4.getString("status_text").contains("OK")) + assert(row4.getUUID("id_checkpoint").contains(checkpointId1)) + assert(row4.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) + assert(row4.getString("checkpoint_author").contains("Joseph")) + assert(row4.getBoolean("measured_by_atum_agent").contains(true)) + assert(row4.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) + assert(row4.getOffsetDateTime("checkpoint_end_time").contains(endTime1)) + assert(row4.getLong("id_partitioning").contains(partitioningId)) + assert(row4.getJsonB("o_partitioning").contains(partitioning)) + assert(row4.getString("partitioning_author").contains("Joseph")) + + val measure4 = MeasuredDetails( + row4.getString("measure_name").get, + row4.getArray[String]("measured_columns").map(_.toList).get, + row4.getJsonB("measurement_value").get + ) + + Seq(measure1, measure2, measure3, measure4) + } + + // Assertions for measures + assert(actualMeasures.map(_.measureName).toSet == Set("avg", "cnt")) + assert(actualMeasures.map(_.measureColumns).toSet == Set(List("a", "b"), List("col1"))) + + actualMeasures.foreach { currVal => + val currValStr = currVal.measurementValue.value + + currVal.measureName match { + case "cnt" => + assert(currValStr.contains(""""value": "3"""")) + case "avg" => + assert(currValStr.contains(""""value": "2.71"""")) + case other => + fail(s"Unexpected measure name: $other") + } + } + } + + ignore("getFlowCheckpointsV2 should return limited with checkpoints for a given flow") { val partitioningId: Long = Random.nextLong() table("runs.partitionings").insert( @@ -361,7 +596,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row1.getString("status_text").contains("OK")) assert(row1.getUUID("id_checkpoint").contains(checkpointId1)) assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) - assert(row1.getString("author").contains("Joseph")) + assert(row1.getString("checkpoint_author").contains("Joseph")) assert(row1.getBoolean("measured_by_atum_agent").contains(true)) assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime)) assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) @@ -378,7 +613,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row2.getString("status_text").contains("OK")) assert(row2.getUUID("id_checkpoint").contains(checkpointId1)) assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) - assert(row2.getString("author").contains("Joseph")) + assert(row2.getString("checkpoint_author").contains("Joseph")) assert(row2.getBoolean("measured_by_atum_agent").contains(true)) assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTimeOther)) assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTimeOther)) @@ -395,7 +630,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row3.getString("status_text").contains("OK")) assert(row3.getUUID("id_checkpoint").contains(checkpointId2)) assert(row3.getString("checkpoint_name").contains("CheckpointNameOther")) - assert(row3.getString("author").contains("Joseph")) 
+ assert(row3.getString("checkpoint_author").contains("Joseph")) assert(row3.getBoolean("measured_by_atum_agent").contains(true)) assert(row3.getOffsetDateTime("checkpoint_start_time").contains(startTimeOther)) assert(row3.getOffsetDateTime("checkpoint_end_time").contains(endTimeOther)) @@ -430,7 +665,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { } } - test("getFlowCheckpointsV2 should return no flows when flow_id is not found") { + ignore("getFlowCheckpointsV2 should return no flows when flow_id is not found") { // Create a non-existent flowId that doesn't exist in the database val nonExistentFlowId: Long = Random.nextLong() From 68e56226fe250e578a4e46b358bf63d35c90c2d6 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Wed, 20 Nov 2024 13:04:42 +0100 Subject: [PATCH 02/19] fix test --- .../GetFlowCheckpointsIntegrationTests.scala | 47 +++++++++++-------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala index bffd38237..1c0a84bc6 100644 --- a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala @@ -1,5 +1,7 @@ package za.co.absa.atum.database.flows +import io.circe.Json +import io.circe.parser.parse import za.co.absa.balta.DBTestSuite import za.co.absa.balta.classes.JsonBString import za.co.absa.balta.classes.setter.CustomDBType @@ -76,7 +78,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { |""".stripMargin ) - ignore("getFlowCheckpointsV2 should return all checkpoints for a given flow") { + test("getFlowCheckpointsV2 should return all checkpoints for a given flow") { val partitioningId: Long = Random.nextLong() table("runs.partitionings").insert( @@ -252,7 +254,6 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { } } - // ********* test("getFlowCheckpointsV2 should return all checkpoints of a given name and a given flow") { val partitioningId: Long = Random.nextLong() @@ -279,10 +280,8 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { // Insert checkpoints and measure definitions val checkpointId1 = UUID.fromString("e9efe108-d8fc-42ad-b367-aa1b17f6c450") -// val startTime = OffsetDateTime.parse("1993-02-14T10:00:00Z") - val startTime1 = OffsetDateTime.now() -// val endTime = OffsetDateTime.parse("2024-04-24T10:00:00Z") - val endTime1 = OffsetDateTime.now() + val startTime1 = OffsetDateTime.parse("1993-02-14T10:00:00Z") + val endTime1 = OffsetDateTime.parse("2024-04-24T10:00:00Z") table("runs.checkpoints").insert( add("id_checkpoint", checkpointId1) .add("fk_partitioning", partitioningId) @@ -307,10 +306,8 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { ) val checkpointId3 = UUID.fromString("41ea35cd-6398-42ed-b6d3-a8d613176575") -// val startTime3 = OffsetDateTime.parse("1993-02-14T11:00:00Z") - val startTime3 = startTime1.plusHours(1L) -// val endTime3 = OffsetDateTime.parse("2024-04-24T11:00:00Z") - val endTime3 = endTime1.plusHours(1L) + val startTime3 = OffsetDateTime.parse("1993-02-14T11:00:00Z") + val endTime3 = OffsetDateTime.parse("2024-04-24T11:00:00Z") table("runs.checkpoints").insert( add("id_checkpoint", checkpointId3) .add("fk_partitioning", partitioningId) @@ -361,16 +358,14 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { 
.add("measurement_value", measurementAvg) ) - val measureDefinitionCntId3: Long = Random.nextLong() table("runs.measurements").insert( - add("fk_measure_definition", measureDefinitionCntId3) + add("fk_measure_definition", measureDefinitionCntId) .add("fk_checkpoint", checkpointId3) .add("measurement_value", measurementCnt) ) - val measureDefinitionAvgId3: Long = Random.nextLong() table("runs.measurements").insert( - add("fk_measure_definition", measureDefinitionAvgId3) + add("fk_measure_definition", measureDefinitionAvgId) .add("fk_checkpoint", checkpointId3) .add("measurement_value", measurementAvg) ) @@ -398,7 +393,9 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime3)) assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime3)) assert(row1.getLong("id_partitioning").contains(partitioningId)) - assert(row1.getJsonB("o_partitioning").contains(partitioning)) + val expectingPartitioningJson1 = parseJsonBStringOrThrow(partitioning) + val returnedPartitioningJson1 = parseJsonBStringOrThrow(row1.getJsonB("o_partitioning").get) + assert(expectingPartitioningJson1 == returnedPartitioningJson1) assert(row1.getString("partitioning_author").contains("Joseph")) val measure1 = MeasuredDetails( @@ -417,7 +414,9 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTime3)) assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTime3)) assert(row2.getLong("id_partitioning").contains(partitioningId)) - assert(row2.getJsonB("o_partitioning").contains(partitioning)) + val expectingPartitioningJson2 = parseJsonBStringOrThrow(partitioning) + val returnedPartitioningJson2 = parseJsonBStringOrThrow(row2.getJsonB("o_partitioning").get) + assert(expectingPartitioningJson2 == returnedPartitioningJson2) assert(row2.getString("partitioning_author").contains("Joseph")) val measure2 = MeasuredDetails( @@ -438,7 +437,9 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row3.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) assert(row3.getOffsetDateTime("checkpoint_end_time").contains(endTime1)) assert(row3.getLong("id_partitioning").contains(partitioningId)) - assert(row3.getJsonB("o_partitioning").contains(partitioning)) + val expectingPartitioningJson3 = parseJsonBStringOrThrow(partitioning) + val returnedPartitioningJson3 = parseJsonBStringOrThrow(row2.getJsonB("o_partitioning").get) + assert(expectingPartitioningJson3 == returnedPartitioningJson3) assert(row3.getString("partitioning_author").contains("Joseph")) val measure3 = MeasuredDetails( @@ -457,7 +458,9 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row4.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) assert(row4.getOffsetDateTime("checkpoint_end_time").contains(endTime1)) assert(row4.getLong("id_partitioning").contains(partitioningId)) - assert(row4.getJsonB("o_partitioning").contains(partitioning)) + val expectingPartitioningJson4 = parseJsonBStringOrThrow(partitioning) + val returnedPartitioningJson4 = parseJsonBStringOrThrow(row2.getJsonB("o_partitioning").get) + assert(expectingPartitioningJson4 == returnedPartitioningJson4) assert(row4.getString("partitioning_author").contains("Joseph")) val measure4 = MeasuredDetails( @@ -487,7 +490,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { } } - ignore("getFlowCheckpointsV2 should return limited with checkpoints for a given 
flow") { + test("getFlowCheckpointsV2 should return limited with checkpoints for a given flow") { val partitioningId: Long = Random.nextLong() table("runs.partitionings").insert( @@ -665,7 +668,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { } } - ignore("getFlowCheckpointsV2 should return no flows when flow_id is not found") { + test("getFlowCheckpointsV2 should return no flows when flow_id is not found") { // Create a non-existent flowId that doesn't exist in the database val nonExistentFlowId: Long = Random.nextLong() @@ -682,4 +685,8 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { } + private def parseJsonBStringOrThrow(jsonBString: JsonBString): Json = { + parse(jsonBString.value).getOrElse(throw new Exception("Failed to parse JsonBString to Json")) + } + } From e6a65b3ea3ea4dd2d24b8bcf73f14f91e3a2f297 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Thu, 21 Nov 2024 11:37:20 +0100 Subject: [PATCH 03/19] tests --- .../flows/V0.2.0.57__get_flow_checkpoints.sql | 25 +-- .../flows/V0.3.0.1__get_flow_checkpoints.sql | 167 ++++++++++++++++++ .../dto/CheckpointWithPartitioningDTO.scala | 41 +++++ .../api/controller/FlowController.scala | 4 +- .../api/controller/FlowControllerImpl.scala | 6 +- .../flows/functions/GetFlowCheckpoints.scala | 33 ++-- .../absa/atum/server/api/http/Endpoints.scala | 4 +- .../co/absa/atum/server/api/http/Routes.scala | 4 +- .../api/repository/FlowRepository.scala | 4 +- .../api/repository/FlowRepositoryImpl.scala | 8 +- .../atum/server/api/service/FlowService.scala | 4 +- .../server/api/service/FlowServiceImpl.scala | 2 +- ...CheckpointItemWithPartitioningFromDB.scala | 126 +++++++++++++ .../za/co/absa/atum/server/api/TestData.scala | 107 ++++++++++- .../controller/FlowControllerUnitTests.scala | 12 +- ...etFlowCheckpointsV2EndpointUnitTests.scala | 30 ++-- .../repository/FlowRepositoryUnitTests.scala | 9 +- .../api/service/FlowServiceUnitTests.scala | 4 +- ...tItemWithPartitioningFromDBUnitTests.scala | 70 ++++++++ 19 files changed, 582 insertions(+), 78 deletions(-) create mode 100644 database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql create mode 100644 model/src/main/scala/za/co/absa/atum/model/dto/CheckpointWithPartitioningDTO.scala create mode 100644 server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala create mode 100644 server/src/test/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDBUnitTests.scala diff --git a/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql b/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql index 001fc4284..2d1d1d3f1 100644 --- a/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql +++ b/database/src/main/postgres/flows/V0.2.0.57__get_flow_checkpoints.sql @@ -23,20 +23,17 @@ CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints( OUT status_text TEXT, OUT id_checkpoint UUID, OUT checkpoint_name TEXT, - OUT checkpoint_author TEXT, + OUT author TEXT, OUT measured_by_atum_agent BOOLEAN, OUT measure_name TEXT, OUT measured_columns TEXT[], OUT measurement_value JSONB, OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE, - OUT id_partitioning BIGINT, - OUT o_partitioning JSONB, - OUT partitioning_author TEXT, OUT has_more BOOLEAN ) RETURNS SETOF record AS $$ --------------------------------------------------------------------------------------------------------------------- + 
-------------------------------------------------------------------------------------------------------------------- -- -- Function: flows.get_flow_checkpoints(4) -- Retrieves all checkpoints (measures and their measurement details) related to a primary flow @@ -64,7 +61,7 @@ $$ -- status_text - Status text -- id_checkpoint - ID of retrieved checkpoint -- checkpoint_name - Name of the retrieved checkpoint --- checkpoint_author - Author of the checkpoint +-- author - Author of the checkpoint -- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by Atum Agent -- (if false, data supplied manually) -- measure_name - measure name associated with a given checkpoint @@ -72,9 +69,6 @@ $$ -- measurement_value - measurement details associated with a given checkpoint -- checkpoint_start_time - Time of the checkpoint -- checkpoint_end_time - End time of the checkpoint computation --- id_partitioning - ID of the partitioning --- o_partitioning - Partitioning value --- partitioning_author - Author of the partitioning -- has_more - flag indicating whether there are more checkpoints available, always `false` if `i_limit` is NULL -- -- Status codes: @@ -115,7 +109,6 @@ BEGIN RETURN QUERY WITH limited_checkpoints AS ( SELECT C.id_checkpoint, - C.fk_partitioning, C.checkpoint_name, C.created_by, C.measured_by_atum_agent, @@ -125,7 +118,7 @@ BEGIN JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning WHERE PF.fk_flow = i_flow_id AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) - ORDER BY C.process_start_time desc + ORDER BY C.id_checkpoint, C.process_start_time LIMIT i_checkpoints_limit OFFSET i_offset ) SELECT @@ -140,9 +133,6 @@ BEGIN M.measurement_value, LC.process_start_time AS checkpoint_start_time, LC.process_end_time AS checkpoint_end_time, - LC.fk_partitioning AS id_partitioning, - P.partitioning AS o_partitioning, - P.created_by AS partitioning_author, _has_more AS has_more FROM limited_checkpoints LC @@ -150,11 +140,10 @@ BEGIN runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint INNER JOIN runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition - INNER JOIN - runs.partitionings P ON LC.fk_partitioning = P.id_partitioning - ORDER BY LC.process_start_time desc; + ORDER BY + LC.id_checkpoint, LC.process_start_time; END; $$ -LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + LANGUAGE plpgsql VOLATILE SECURITY DEFINER; GRANT EXECUTE ON FUNCTION flows.get_flow_checkpoints(BIGINT, INT, BIGINT, TEXT) TO atum_owner; diff --git a/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql new file mode 100644 index 000000000..122e3c7cb --- /dev/null +++ b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql @@ -0,0 +1,167 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +DROP FUNCTION IF EXISTS flows.get_flow_checkpoints( + i_flow_id BIGINT, + i_checkpoints_limit INT, + i_offset BIGINT, + i_checkpoint_name TEXT +); + +CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints( + IN i_flow_id BIGINT, + IN i_checkpoints_limit INT DEFAULT 5, + IN i_offset BIGINT DEFAULT 0, + IN i_checkpoint_name TEXT DEFAULT NULL, + OUT status INTEGER, + OUT status_text TEXT, + OUT id_checkpoint UUID, + OUT checkpoint_name TEXT, + OUT checkpoint_author TEXT, + OUT measured_by_atum_agent BOOLEAN, + OUT measure_name TEXT, + OUT measured_columns TEXT[], + OUT measurement_value JSONB, + OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, + OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE, + OUT id_partitioning BIGINT, + OUT o_partitioning JSONB, + OUT partitioning_author TEXT, + OUT has_more BOOLEAN +) RETURNS SETOF record AS +$$ +-------------------------------------------------------------------------------------------------------------------- +-- +-- Function: flows.get_flow_checkpoints(4) +-- Retrieves all checkpoints (measures and their measurement details) related to a primary flow +-- associated with the input partitioning. +-- +-- Note: a single row returned from this function doesn't contain all data related to a single checkpoint - it only +-- represents one measure associated with a checkpoint. So even if only a single checkpoint would be retrieved, +-- this function can potentially return multiple rows. +-- +-- Note: checkpoints will be retrieved in ordered fashion, by checkpoint_time and id_checkpoint +-- +-- Parameters: +-- i_partitioning_of_flow - partitioning to use for identifying the flow associate with checkpoints +-- that will be retrieved +-- i_checkpoints_limit - (optional) maximum number of checkpoint to return, returns all of them if NULL +-- i_offset - (optional) offset for checkpoints pagination +-- i_checkpoint_name - (optional) if specified, returns data related to particular checkpoint's name +-- +-- Note: i_checkpoint_limit and i_offset are used for pagination purposes; +-- checkpoints are ordered by process_start_time in descending order +-- and then by id_checkpoint in ascending order +-- +-- Returns: +-- status - Status code +-- status_text - Status text +-- id_checkpoint - ID of retrieved checkpoint +-- checkpoint_name - Name of the retrieved checkpoint +-- checkpoint_author - Author of the checkpoint +-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by Atum Agent +-- (if false, data supplied manually) +-- measure_name - measure name associated with a given checkpoint +-- measured_columns - measure columns associated with a given checkpoint +-- measurement_value - measurement details associated with a given checkpoint +-- checkpoint_start_time - Time of the checkpoint +-- checkpoint_end_time - End time of the checkpoint computation +-- id_partitioning - ID of the partitioning +-- o_partitioning - Partitioning value +-- partitioning_author - Author of the partitioning +-- has_more - flag indicating whether there are more checkpoints available, always `false` if `i_limit` is NULL +-- +-- Status codes: +-- 11 - OK +-- 42 - Flow not found +--------------------------------------------------------------------------------------------------- +DECLARE + _has_more BOOLEAN; +BEGIN + -- Check if the flow exists by querying the partitioning_to_flow table. + -- Rationale: + -- This table is preferred over the flows table because: + -- 1. Every flow has at least one record in partitioning_to_flow. + -- 2. 
This table is used in subsequent queries, providing a caching advantage. + -- 3. Improves performance by reducing the need to query the flows table directly. + PERFORM 1 FROM flows.partitioning_to_flow WHERE fk_flow = i_flow_id; + IF NOT FOUND THEN + status := 42; + status_text := 'Flow not found'; + RETURN NEXT; + RETURN; + END IF; + + -- Determine if there are more checkpoints than the limit + IF i_checkpoints_limit IS NOT NULL THEN + SELECT count(*) > i_checkpoints_limit + FROM runs.checkpoints C + JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning + WHERE PF.fk_flow = i_flow_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + LIMIT i_checkpoints_limit + 1 OFFSET i_offset + INTO _has_more; + ELSE + _has_more := false; + END IF; + + -- Retrieve the checkpoints and their associated measurements + RETURN QUERY + WITH limited_checkpoints AS ( + SELECT C.id_checkpoint, + C.fk_partitioning, + C.checkpoint_name, + C.created_by, + C.measured_by_atum_agent, + C.process_start_time, + C.process_end_time + FROM runs.checkpoints C + JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning + WHERE PF.fk_flow = i_flow_id + AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) + ORDER BY C.process_start_time desc + LIMIT i_checkpoints_limit OFFSET i_offset + ) + SELECT + 11 AS status, + 'OK' AS status_text, + LC.id_checkpoint, + LC.checkpoint_name, + LC.created_by AS author, + LC.measured_by_atum_agent, + MD.measure_name, + MD.measured_columns, + M.measurement_value, + LC.process_start_time AS checkpoint_start_time, + LC.process_end_time AS checkpoint_end_time, + LC.fk_partitioning AS id_partitioning, + P.partitioning AS o_partitioning, + P.created_by AS partitioning_author, + _has_more AS has_more + FROM + limited_checkpoints LC + INNER JOIN + runs.measurements M ON LC.id_checkpoint = M.fk_checkpoint + INNER JOIN + runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition + INNER JOIN + runs.partitionings P ON LC.fk_partitioning = P.id_partitioning + ORDER BY LC.process_start_time desc; +END; +$$ +LANGUAGE plpgsql VOLATILE SECURITY DEFINER; + +GRANT EXECUTE ON FUNCTION flows.get_flow_checkpoints(BIGINT, INT, BIGINT, TEXT) TO atum_owner; diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointWithPartitioningDTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointWithPartitioningDTO.scala new file mode 100644 index 000000000..d72263201 --- /dev/null +++ b/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointWithPartitioningDTO.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.atum.model.dto + +import io.circe.{Decoder, Encoder} +import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder} + +import java.time.ZonedDateTime +import java.util.UUID + +case class CheckpointWithPartitioningDTO( + id: UUID, + name: String, + author: String, + measuredByAtumAgent: Boolean = false, + processStartTime: ZonedDateTime, + processEndTime: Option[ZonedDateTime], + measurements: Set[MeasurementDTO], + partitioning: PartitioningWithIdDTO +) + +object CheckpointWithPartitioningDTO { + + implicit val decodeCheckpointWithPartitioningDTO: Decoder[CheckpointWithPartitioningDTO] = deriveDecoder + implicit val encodeCheckpointWithPartitioningDTO: Encoder[CheckpointWithPartitioningDTO] = deriveEncoder + +} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala index e31686375..505656ae4 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import za.co.absa.atum.model.envelopes.ErrorResponse import za.co.absa.atum.model.envelopes.SuccessResponse.PaginatedResponse import zio.IO @@ -29,6 +29,6 @@ trait FlowController { limit: Option[Int], offset: Option[Long], checkpointName: Option[String], - ): IO[ErrorResponse, PaginatedResponse[CheckpointV2DTO]] + ): IO[ErrorResponse, PaginatedResponse[CheckpointWithPartitioningDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala index 727e57292..ebd700304 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import za.co.absa.atum.model.envelopes.ErrorResponse import za.co.absa.atum.model.envelopes.SuccessResponse.PaginatedResponse import za.co.absa.atum.server.api.service.FlowService @@ -31,8 +31,8 @@ class FlowControllerImpl(flowService: FlowService) extends FlowController with B limit: Option[Int], offset: Option[Long], checkpointName: Option[String] - ): IO[ErrorResponse, PaginatedResponse[CheckpointV2DTO]] = { - val flowData = serviceCall[PaginatedResult[CheckpointV2DTO], PaginatedResult[CheckpointV2DTO]]( + ): IO[ErrorResponse, PaginatedResponse[CheckpointWithPartitioningDTO]] = { + val flowData = serviceCall[PaginatedResult[CheckpointWithPartitioningDTO], PaginatedResult[CheckpointWithPartitioningDTO]]( flowService.getFlowCheckpoints(flowId, limit, offset, checkpointName) ) mapToPaginatedResponse(limit.get, offset.get, flowData) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala index b24b06122..4289b9664 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala +++ 
b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala @@ -20,51 +20,54 @@ import doobie.implicits.toSqlInterpolator import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.flows.Flows import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints.GetFlowCheckpointsArgs -import za.co.absa.atum.server.model.CheckpointItemFromDB +import za.co.absa.atum.server.model.CheckpointItemWithPartitioningFromDB import za.co.absa.db.fadb.DBSchema import za.co.absa.db.fadb.doobie.DoobieEngine import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling import zio._ - import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get import doobie.postgres.implicits._ - class GetFlowCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieMultipleResultFunctionWithAggStatus[GetFlowCheckpointsArgs, Option[CheckpointItemFromDB], Task](input => - Seq( - fr"${input.flowId}", - fr"${input.limit}", - fr"${input.offset}", - fr"${input.checkpointName}" + extends DoobieMultipleResultFunctionWithAggStatus[GetFlowCheckpointsArgs, Option[ + CheckpointItemWithPartitioningFromDB + ], Task](input => + Seq( + fr"${input.flowId}", + fr"${input.limit}", + fr"${input.offset}", + fr"${input.checkpointName}" + ) ) - ) with StandardStatusHandling with ByFirstErrorStatusAggregator { override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq( "id_checkpoint", "checkpoint_name", - "author", + "checkpoint_author", "measured_by_atum_agent", "measure_name", "measured_columns", "measurement_value", "checkpoint_start_time", "checkpoint_end_time", + "id_partitioning", + "o_partitioning", + "partitioning_author", "has_more" ) } object GetFlowCheckpoints { case class GetFlowCheckpointsArgs( - flowId: Long, - limit: Option[Int], - offset: Option[Long], - checkpointName: Option[String] + flowId: Long, + limit: Option[Int], + offset: Option[Long], + checkpointName: Option[String] ) val layer: URLayer[PostgresDatabaseProvider, GetFlowCheckpoints] = ZLayer { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index 2c078918f..8a138d4b2 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -133,14 +133,14 @@ trait Endpoints extends BaseEndpoints { } protected val getFlowCheckpointsEndpointV2 - : PublicEndpoint[(Long, Option[Int], Option[Long], Option[String]), ErrorResponse, PaginatedResponse[CheckpointV2DTO], Any] = { + : PublicEndpoint[(Long, Option[Int], Option[Long], Option[String]), ErrorResponse, PaginatedResponse[CheckpointWithPartitioningDTO], Any] = { apiV2.get .in(V2Paths.Flows / path[Long]("flowId") / V2Paths.Checkpoints) .in(query[Option[Int]]("limit").default(Some(10)).validateOption(Validator.inRange(1, 1000))) .in(query[Option[Long]]("offset").default(Some(0L)).validateOption(Validator.min(0L))) .in(query[Option[String]]("checkpoint-name")) .out(statusCode(StatusCode.Ok)) - .out(jsonBody[PaginatedResponse[CheckpointV2DTO]]) + .out(jsonBody[PaginatedResponse[CheckpointWithPartitioningDTO]]) 
.errorOutVariantPrepend(notFoundErrorOneOfVariant) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index cc13fffa6..00ade985b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -24,7 +24,7 @@ import sttp.tapir.server.http4s.ztapir.ZHttp4sServerInterpreter import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor import sttp.tapir.swagger.bundle.SwaggerInterpreter import sttp.tapir.ztapir._ -import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, PartitioningWithIdDTO} +import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, CheckpointWithPartitioningDTO, PartitioningWithIdDTO} import za.co.absa.atum.model.envelopes.{ErrorResponse, StatusResponse} import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController} import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig} @@ -93,7 +93,7 @@ trait Routes extends Endpoints with ServerOptions { createServerEndpoint[ (Long, Option[Int], Option[Long], Option[String]), ErrorResponse, - PaginatedResponse[CheckpointV2DTO] + PaginatedResponse[CheckpointWithPartitioningDTO] ](getFlowCheckpointsEndpointV2, { case (flowId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => FlowController.getFlowCheckpoints(flowId, limit, offset, checkpointName) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala index 1749d13be..ae4412e95 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepository.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.repository -import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.model.PaginatedResult import zio._ @@ -30,6 +30,6 @@ trait FlowRepository { limit: Option[Int], offset: Option[Long], checkpointName: Option[String] - ): IO[DatabaseError, PaginatedResult[CheckpointV2DTO]] + ): IO[DatabaseError, PaginatedResult[CheckpointWithPartitioningDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala index d52ef71c3..b8bb9dcbf 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala @@ -16,10 +16,10 @@ package za.co.absa.atum.server.api.repository -import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.atum.server.model.{CheckpointItemFromDB, PaginatedResult} +import za.co.absa.atum.server.model.{CheckpointItemWithPartitioningFromDB, PaginatedResult} import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints.GetFlowCheckpointsArgs import 
za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore} @@ -34,7 +34,7 @@ class FlowRepositoryImpl(getFlowCheckpointsFn: GetFlowCheckpoints) limit: Option[Int], offset: Option[Long], checkpointName: Option[String] - ): IO[DatabaseError, PaginatedResult[CheckpointV2DTO]] = { + ): IO[DatabaseError, PaginatedResult[CheckpointWithPartitioningDTO]] = { dbMultipleResultCallWithAggregatedStatus( getFlowCheckpointsFn(GetFlowCheckpointsArgs(flowId, limit, offset, checkpointName)), "getFlowCheckpoints" @@ -42,7 +42,7 @@ class FlowRepositoryImpl(getFlowCheckpointsFn: GetFlowCheckpoints) .map(_.flatten) .flatMap { checkpointItems => ZIO - .fromEither(CheckpointItemFromDB.groupAndConvertItemsToCheckpointV2DTOs(checkpointItems)) + .fromEither(CheckpointItemWithPartitioningFromDB.groupAndConvertItemsToCheckpointWithPartitioningDTOs(checkpointItems)) .mapBoth( error => GeneralDatabaseError(error.getMessage), checkpoints => diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala index b3e9fc823..73017fb96 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.service -import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.model.PaginatedResult import zio._ @@ -30,6 +30,6 @@ trait FlowService { limit: Option[Int], offset: Option[Long], checkpointName: Option[String] - ): IO[ServiceError, PaginatedResult[CheckpointV2DTO]] + ): IO[ServiceError, PaginatedResult[CheckpointWithPartitioningDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala index 8ba8a6b9f..66a520c28 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala @@ -29,7 +29,7 @@ class FlowServiceImpl(flowRepository: FlowRepository) extends FlowService with B limit: Option[Int], offset: Option[Long], checkpointName: Option[String] - ): IO[ServiceError, PaginatedResult[CheckpointV2DTO]] = { + ): IO[ServiceError, PaginatedResult[CheckpointWithPartitioningDTO]] = { repositoryCall( flowRepository.getFlowCheckpoints(flowId, limit, offset, checkpointName), "getFlowCheckpoints" diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala new file mode 100644 index 000000000..fe2ce31a8 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala @@ -0,0 +1,126 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package za.co.absa.atum.server.model

import io.circe.{DecodingFailure, Json}
import za.co.absa.atum.model.dto.{
  CheckpointWithPartitioningDTO,
  MeasureDTO,
  MeasureResultDTO,
  MeasurementDTO,
  PartitionDTO,
  PartitioningWithIdDTO
}

import java.time.ZonedDateTime
import java.util.UUID

case class CheckpointItemWithPartitioningFromDB(
  idCheckpoint: UUID,
  checkpointName: String,
  author: String,
  measuredByAtumAgent: Boolean,
  measureName: String,
  measuredColumns: Seq[String],
  measurementValue: Json, // JSON representation of `MeasureResultDTO`
  checkpointStartTime: ZonedDateTime,
  checkpointEndTime: Option[ZonedDateTime],
  idPartitioning: Long,
  partitioning: Json,
  partitioningAuthor: String,
  hasMore: Boolean
)

object CheckpointItemWithPartitioningFromDB {

  private def fromItemsToCheckpointWithPartitioningDTO(
    checkpointItems: Seq[CheckpointItemWithPartitioningFromDB]
  ): Either[Throwable, CheckpointWithPartitioningDTO] = {
    val measurementsOrErr = checkpointItems.map { checkpointItem =>
      checkpointItem.measurementValue.as[MeasureResultDTO].map { measureResult =>
        MeasurementDTO(
          measure = MeasureDTO(
            measureName = checkpointItem.measureName,
            measuredColumns = checkpointItem.measuredColumns
          ),
          result = measureResult
        )
      }
    }
    val partitioningOrErr: Either[DecodingFailure, PartitioningWithIdDTO] = {
      val decodingResult = checkpointItems.head.partitioning.as[PartitioningForDB]
      decodingResult match {
        case Left(decodingFailure) => Left(decodingFailure)
        case Right(partitioningForDB) =>
          val partitioningDTO = partitioningForDB.keys.map { key =>
            PartitionDTO(key, partitioningForDB.keysToValuesMap(key))
          }
          Right(
            PartitioningWithIdDTO(
              id = checkpointItems.head.idPartitioning,
              partitioning = partitioningDTO,
              author = checkpointItems.head.partitioningAuthor
            )
          )
      }
    }

    val measurementsErrors = measurementsOrErr.collect { case Left(err) => err }
    val errors = measurementsErrors ++ partitioningOrErr.left.toSeq

    if (errors.nonEmpty) {
      // report the first error of any kind; using only measurementsErrors here would
      // throw on an empty collection when solely the partitioning decoding failed
      Left(errors.head)
    } else {
      val measurements = measurementsOrErr.collect { case Right(measurement) => measurement }.toSet
      Right(
        CheckpointWithPartitioningDTO(
          id = checkpointItems.head.idCheckpoint,
          name = checkpointItems.head.checkpointName,
          author = checkpointItems.head.author,
          measuredByAtumAgent = checkpointItems.head.measuredByAtumAgent,
          processStartTime = checkpointItems.head.checkpointStartTime,
          processEndTime = checkpointItems.head.checkpointEndTime,
          measurements = measurements,
          partitioning = partitioningOrErr.toOption.get
        )
      )
    }
  }

  def groupAndConvertItemsToCheckpointWithPartitioningDTOs(
    checkpointItems: Seq[CheckpointItemWithPartitioningFromDB]
  ): Either[Throwable, Seq[CheckpointWithPartitioningDTO]] = {
    val groupedItems = checkpointItems.groupBy(_.idCheckpoint)
    val orderedIds = checkpointItems
      .sortBy(_.checkpointStartTime)(Ordering[ZonedDateTime].reverse)
      .map(_.idCheckpoint)
      .distinct

    val result = orderedIds.map { id =>
CheckpointItemWithPartitioningFromDB.fromItemsToCheckpointWithPartitioningDTO(groupedItems(id)) + } + + val errors = result.collect { case Left(err) => err } + if (errors.nonEmpty) { + Left(errors.head) + } else { + Right(result.collect { case Right(dto) => dto }) + } + } + +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 6d39bf2f4..cd9647c50 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -16,13 +16,13 @@ package za.co.absa.atum.server.api -import io.circe.parser +import io.circe.{Json, parser} import io.circe.syntax.EncoderOps import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue import za.co.absa.atum.model.dto._ import za.co.absa.atum.model.{ResultValueType, dto} import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings.GetFlowPartitioningsResult -import za.co.absa.atum.server.model.{CheckpointFromDB, CheckpointItemFromDB, MeasureFromDB, PartitioningFromDB} +import za.co.absa.atum.server.model.{CheckpointFromDB, CheckpointItemFromDB, CheckpointItemWithPartitioningFromDB, MeasureFromDB, PartitioningFromDB} import java.time.ZonedDateTime import java.util.{Base64, UUID} @@ -50,7 +50,7 @@ trait TestData { authorIfNew = "" ) - private val partitioningAsJson = parser + protected val partitioningAsJson: Json = parser .parse( """ |{ @@ -299,6 +299,29 @@ trait TestData { protected val checkpointV2DTO3: CheckpointV2DTO = checkpointV2DTO1.copy(id = UUID.randomUUID()) protected val checkpointV2DTO4: CheckpointV2DTO = checkpointV2DTO1.copy(id = UUID.randomUUID()) + protected val measurementValue1: Json = + parser + .parse( + """ + |{ + | "mainValue": { + | "value": "123", + | "valueType": "Long" + | }, + | "supportValues": { + | "key1": { + | "value": "123456789", + | "valueType": "Long" + | }, + | "key2": { + | "value": "12345.6789", + | "valueType": "BigDecimal" + | } + | } + |} + |""".stripMargin + ).toOption.get + // Checkpoint From DB protected val checkpointFromDB1: CheckpointFromDB = CheckpointFromDB( idCheckpoint = Some(checkpointDTO1.id), @@ -381,6 +404,84 @@ trait TestData { hasMore = false ) + protected val checkpointItemWithPartitioningFromDB1: CheckpointItemWithPartitioningFromDB = + CheckpointItemWithPartitioningFromDB( + idCheckpoint = checkpointItemFromDB1.idCheckpoint, + checkpointName = checkpointItemFromDB1.checkpointName, + author = checkpointItemFromDB1.author, + measuredByAtumAgent = checkpointItemFromDB1.measuredByAtumAgent, + measureName = checkpointItemFromDB1.measureName, + measuredColumns = checkpointItemFromDB1.measuredColumns, + measurementValue = checkpointItemFromDB1.measurementValue, + checkpointStartTime = checkpointItemFromDB1.checkpointStartTime, + checkpointEndTime = checkpointItemFromDB1.checkpointEndTime, + idPartitioning = partitioningFromDB1.id, + partitioning = partitioningFromDB1.partitioning, + partitioningAuthor = partitioningFromDB1.author, + hasMore = checkpointItemFromDB1.hasMore + ) + + protected val checkpointItemWithPartitioningFromDB2: CheckpointItemWithPartitioningFromDB = + CheckpointItemWithPartitioningFromDB( + idCheckpoint = checkpointItemFromDB2.idCheckpoint, + checkpointName = checkpointItemFromDB2.checkpointName, + author = checkpointItemFromDB2.author, + measuredByAtumAgent = checkpointItemFromDB2.measuredByAtumAgent, + measureName = checkpointItemFromDB2.measureName, + measuredColumns = 
checkpointItemFromDB2.measuredColumns, + measurementValue = checkpointItemFromDB2.measurementValue, + checkpointStartTime = checkpointItemFromDB2.checkpointStartTime, + checkpointEndTime = checkpointItemFromDB2.checkpointEndTime, + idPartitioning = partitioningFromDB1.id, + partitioning = partitioningFromDB1.partitioning, + partitioningAuthor = partitioningFromDB1.author, + hasMore = checkpointItemFromDB2.hasMore + ) + + protected val checkpointWithPartitioningDTO1: CheckpointWithPartitioningDTO = + CheckpointWithPartitioningDTO( + id = checkpointItemWithPartitioningFromDB1.idCheckpoint, + name = checkpointItemWithPartitioningFromDB1.checkpointName, + author = checkpointItemWithPartitioningFromDB1.author, + measuredByAtumAgent = checkpointItemWithPartitioningFromDB1.measuredByAtumAgent, + processStartTime = checkpointItemWithPartitioningFromDB1.checkpointStartTime, + processEndTime = checkpointItemWithPartitioningFromDB1.checkpointEndTime, + measurements = Seq( + MeasurementDTO( + measure = MeasureDTO( + measureName = checkpointItemWithPartitioningFromDB1.measureName, + measuredColumns = checkpointItemWithPartitioningFromDB1.measuredColumns + ), + result = checkpointItemWithPartitioningFromDB1.measurementValue.as[MeasureResultDTO].getOrElse( + throw new Exception("Failed to parse JSON") + ) + ) + ).toSet, + partitioning = partitioningWithIdDTO1 + ) + + protected val checkpointWithPartitioningDTO2: CheckpointWithPartitioningDTO = + CheckpointWithPartitioningDTO( + id = checkpointItemWithPartitioningFromDB2.idCheckpoint, + name = checkpointItemWithPartitioningFromDB2.checkpointName, + author = checkpointItemWithPartitioningFromDB2.author, + measuredByAtumAgent = checkpointItemWithPartitioningFromDB2.measuredByAtumAgent, + processStartTime = checkpointItemWithPartitioningFromDB2.checkpointStartTime, + processEndTime = checkpointItemWithPartitioningFromDB2.checkpointEndTime, + measurements = Seq( + MeasurementDTO( + measure = MeasureDTO( + measureName = checkpointItemWithPartitioningFromDB2.measureName, + measuredColumns = checkpointItemWithPartitioningFromDB2.measuredColumns + ), + result = checkpointItemWithPartitioningFromDB2.measurementValue.as[MeasureResultDTO].getOrElse( + throw new Exception("Failed to parse JSON") + ) + ) + ).toSet, + partitioning = partitioningWithIdDTO1 + ) + protected def createAtumContextDTO(partitioningSubmitDTO: PartitioningSubmitDTO): AtumContextDTO = { val measures: Set[MeasureDTO] = Set(MeasureDTO("count", Seq("*"))) val additionalData: Map[String, Option[String]] = Map.empty diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala index 5dcf9e87d..8db88a54d 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala @@ -30,9 +30,9 @@ object FlowControllerUnitTests extends ZIOSpecDefault with TestData { private val flowServiceMock = mock(classOf[FlowService]) when(flowServiceMock.getFlowCheckpoints(1L, Some(5), Some(2), None)) - .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointV2DTO1)))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointWithPartitioningDTO1)))) when(flowServiceMock.getFlowCheckpoints(2L, Some(5), Some(0), None)) - .thenReturn(ZIO.succeed(ResultNoMore(Seq(checkpointV2DTO2)))) + .thenReturn(ZIO.succeed(ResultNoMore(Seq(checkpointWithPartitioningDTO2)))) 
when(flowServiceMock.getFlowCheckpoints(3L, Some(5), Some(0), None)) .thenReturn(ZIO.fail(NotFoundServiceError("Flow not found"))) @@ -44,12 +44,16 @@ object FlowControllerUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Seq[CheckpointV2DTO] with Pagination indicating there is more data available") { for { result <- FlowController.getFlowCheckpoints(1L, Some(5), Some(2), None) - } yield assertTrue(result.data == Seq(checkpointV2DTO1) && result.pagination == Pagination(5, 2, hasMore = true)) + } yield assertTrue( + result.data == Seq(checkpointWithPartitioningDTO1) && result.pagination == Pagination(5, 2, hasMore = true) + ) }, test("Returns expected Seq[CheckpointV2DTO] with Pagination indicating there is no more data available") { for { result <- FlowController.getFlowCheckpoints(2L, Some(5), Some(0), None) - } yield assertTrue(result.data == Seq(checkpointV2DTO2) && result.pagination == Pagination(5, 0, hasMore = false)) + } yield assertTrue( + result.data == Seq(checkpointWithPartitioningDTO2) && result.pagination == Pagination(5, 0, hasMore = false) + ) }, test("Returns expected NotFoundServiceError when service returns NotFoundServiceError") { assertZIO(FlowController.getFlowCheckpoints(3L, Some(5), Some(0), None).exit)( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsV2EndpointUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsV2EndpointUnitTests.scala index 3bc315f02..50a97eac4 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsV2EndpointUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/GetFlowCheckpointsV2EndpointUnitTests.scala @@ -23,7 +23,7 @@ import sttp.client3.circe.asJson import sttp.model.StatusCode import sttp.tapir.server.stub.TapirStubInterpreter import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} -import za.co.absa.atum.model.dto.CheckpointV2DTO +import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.controller.FlowController import za.co.absa.atum.model.envelopes.{NotFoundErrorResponse, Pagination} @@ -39,18 +39,22 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin private val uuid = UUID.randomUUID() when(flowControllerMockV2.getFlowCheckpoints(1L, Some(5), Some(0), None)) - .thenReturn(ZIO.succeed(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(5, 0, hasMore = true), uuid))) + .thenReturn( + ZIO.succeed(PaginatedResponse(Seq(checkpointWithPartitioningDTO1), Pagination(5, 0, hasMore = true), uuid)) + ) when(flowControllerMockV2.getFlowCheckpoints(2L, Some(5), Some(0), None)) - .thenReturn(ZIO.succeed(PaginatedResponse(Seq(checkpointV2DTO2), Pagination(5, 0, hasMore = false), uuid))) + .thenReturn( + ZIO.succeed(PaginatedResponse(Seq(checkpointWithPartitioningDTO2), Pagination(5, 0, hasMore = false), uuid)) + ) when(flowControllerMockV2.getFlowCheckpoints(3L, Some(5), Some(0), None)) .thenReturn(ZIO.fail(NotFoundErrorResponse("Flow not found for a given ID"))) private val flowControllerMockLayerV2 = ZLayer.succeed(flowControllerMockV2) private val getFlowCheckpointServerEndpointV2 = getFlowCheckpointsEndpointV2.zServerLogic({ - case (flowId: Long, limit: Option[Int], offset: Option[Long], checkpointName: Option[String]) => - FlowController.getFlowCheckpoints(flowId, limit, offset, checkpointName) - }) + case (flowId: Long, limit: Option[Int], offset: Option[Long], checkpointName: 
Option[String]) => + FlowController.getFlowCheckpoints(flowId, limit, offset, checkpointName) + }) def spec: Spec[TestEnvironment with Scope, Any] = { val backendStub = TapirStubInterpreter(SttpBackendStub.apply(new RIOMonadError[FlowController])) @@ -63,7 +67,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin val baseUri = uri"https://test.com/api/v2/flows/1/checkpoints?limit=5&offset=0" val response = basicRequest .get(baseUri) - .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .response(asJson[PaginatedResponse[CheckpointWithPartitioningDTO]]) .send(backendStub) val body = response.map(_.body) @@ -71,7 +75,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin assertZIO(body <&> statusCode)( equalTo( - Right(PaginatedResponse(Seq(checkpointV2DTO1), Pagination(5, 0, hasMore = true), uuid)), + Right(PaginatedResponse(Seq(checkpointWithPartitioningDTO1), Pagination(5, 0, hasMore = true), uuid)), StatusCode.Ok ) ) @@ -80,7 +84,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin val baseUri = uri"https://test.com/api/v2/flows/2/checkpoints?limit=5&offset=0" val response = basicRequest .get(baseUri) - .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .response(asJson[PaginatedResponse[CheckpointWithPartitioningDTO]]) .send(backendStub) val body = response.map(_.body) @@ -90,7 +94,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin assertZIO(body <&> statusCode)( equalTo( - Right(PaginatedResponse(Seq(checkpointV2DTO2), Pagination(5, 0, hasMore = false), uuid)), + Right(PaginatedResponse(Seq(checkpointWithPartitioningDTO2), Pagination(5, 0, hasMore = false), uuid)), StatusCode.Ok ) ) @@ -99,7 +103,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin val baseUri = uri"https://test.com/api/v2/flows/3/checkpoints?limit=5&offset=0" val response = basicRequest .get(baseUri) - .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .response(asJson[PaginatedResponse[CheckpointWithPartitioningDTO]]) .send(backendStub) val statusCode = response.map(_.code) @@ -110,7 +114,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin val baseUri = uri"https://test.com/api/v2/flows/1/checkpoints?limit=1005&offset=0" val response = basicRequest .get(baseUri) - .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .response(asJson[PaginatedResponse[CheckpointWithPartitioningDTO]]) .send(backendStub) val statusCode = response.map(_.code) @@ -121,7 +125,7 @@ object GetFlowCheckpointsV2EndpointUnitTests extends ZIOSpecDefault with Endpoin val baseUri = uri"https://test.com/api/v2/flows/1/checkpoints?limit=-1&offset=0" val response = basicRequest .get(baseUri) - .response(asJson[PaginatedResponse[CheckpointV2DTO]]) + .response(asJson[PaginatedResponse[CheckpointWithPartitioningDTO]]) .send(backendStub) val statusCode = response.map(_.code) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala index 595eb56e5..e2eef59ec 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.server.api.repository import org.mockito.Mockito.{mock, when} -import za.co.absa.atum.model.dto.CheckpointV2DTO 
+import za.co.absa.atum.model.dto.CheckpointWithPartitioningDTO import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints.GetFlowCheckpointsArgs import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints @@ -38,8 +38,7 @@ object FlowRepositoryUnitTests extends ZIOSpecDefault with TestData { .thenReturn( ZIO.right( Seq( - Row(FunctionStatus(11, "success"), Some(checkpointItemFromDB1)), - Row(FunctionStatus(11, "success"), Some(checkpointItemFromDB2)) + Row(FunctionStatus(11, "success"), Some(checkpointItemWithPartitioningFromDB1)) ) ) ) @@ -57,12 +56,12 @@ object FlowRepositoryUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Right with CheckpointV2DTO") { for { result <- FlowRepository.getFlowCheckpoints(1, Some(1), Some(1), None) - } yield assertTrue(result == ResultHasMore(Seq(checkpointV2DTO1, checkpointV2DTO2))) + } yield assertTrue(result == ResultHasMore(Seq(checkpointWithPartitioningDTO1))) }, test("Returns expected Right with CheckpointV2DTO") { for { result <- FlowRepository.getFlowCheckpoints(2, Some(1), Some(1), None) - } yield assertTrue(result == ResultNoMore(Seq.empty[CheckpointV2DTO])) + } yield assertTrue(result == ResultNoMore(Seq.empty[CheckpointWithPartitioningDTO])) }, test("Returns expected DatabaseError") { assertZIO(FlowRepository.getFlowCheckpoints(3, None, None, None).exit)( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala index c1bc930cb..7dd687ea0 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala @@ -30,7 +30,7 @@ object FlowServiceUnitTests extends ZIOSpecDefault with TestData { private val flowRepositoryMock = mock(classOf[FlowRepository]) when(flowRepositoryMock.getFlowCheckpoints(1L, None, None, None)) - .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointV2DTO1)))) + .thenReturn(ZIO.succeed(ResultHasMore(Seq(checkpointWithPartitioningDTO1)))) when(flowRepositoryMock.getFlowCheckpoints(2L, None, None, None)) .thenReturn(ZIO.fail(NotFoundDatabaseError("Flow not found"))) @@ -44,7 +44,7 @@ object FlowServiceUnitTests extends ZIOSpecDefault with TestData { for { result <- FlowService.getFlowCheckpoints(1L, None, None, None) } yield assertTrue { - result == ResultHasMore(Seq(checkpointV2DTO1)) + result == ResultHasMore(Seq(checkpointWithPartitioningDTO1)) } }, test("Returns expected ServiceError") { diff --git a/server/src/test/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDBUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDBUnitTests.scala new file mode 100644 index 000000000..c2107f5ef --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDBUnitTests.scala @@ -0,0 +1,70 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.model + +import org.scalatest.funsuite.AnyFunSuiteLike +import za.co.absa.atum.server.api.TestData + +import java.time.ZonedDateTime +import java.util.UUID + +class CheckpointItemWithPartitioningFromDBUnitTests extends AnyFunSuiteLike with TestData { + + test("groupAndConvertItemsToCheckpointWithPartitioningDTOs should group by idCheckpoint and sort by checkpointStartTime in descending order") { + val checkpointItem1 = CheckpointItemWithPartitioningFromDB( + idCheckpoint = UUID.randomUUID(), + checkpointName = "checkpoint1", + author = "author1", + measuredByAtumAgent = true, + measureName = "measure1", + measuredColumns = Seq("col1"), + measurementValue = measurementValue1, + checkpointStartTime = ZonedDateTime.now().minusDays(1), + checkpointEndTime = Some(ZonedDateTime.now()), + idPartitioning = 1L, + partitioning = partitioningAsJson, + partitioningAuthor = "author1", + hasMore = false + ) + + val checkpointItem2 = CheckpointItemWithPartitioningFromDB( + idCheckpoint = UUID.randomUUID(), + checkpointName = "checkpoint2", + author = "author2", + measuredByAtumAgent = false, + measureName = "measure2", + measuredColumns = Seq("col2"), + measurementValue = measurementValue1, + checkpointStartTime = ZonedDateTime.now(), + checkpointEndTime = Some(ZonedDateTime.now().plusDays(1)), + idPartitioning = 2L, + partitioning = partitioningAsJson, + partitioningAuthor = "author2", + hasMore = true + ) + + val checkpointItems = Seq(checkpointItem1, checkpointItem2) + + val result = CheckpointItemWithPartitioningFromDB.groupAndConvertItemsToCheckpointWithPartitioningDTOs(checkpointItems) + + assert(result.isRight) + val checkpoints = result.toOption.get + + assert(checkpoints.size == 2) + assert(checkpoints.head.processStartTime.isAfter(checkpoints(1).processStartTime)) + } +} From df7119d077f506922bff478f23b7085313dab55d Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Thu, 21 Nov 2024 11:44:40 +0100 Subject: [PATCH 04/19] ignore compatibility test --- .../za/co/absa/atum/agent/AgentServerCompatibilityTests.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AgentServerCompatibilityTests.scala b/agent/src/test/scala/za/co/absa/atum/agent/AgentServerCompatibilityTests.scala index 992aabe12..591b5c779 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AgentServerCompatibilityTests.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AgentServerCompatibilityTests.scala @@ -22,10 +22,12 @@ import za.co.absa.atum.agent.model.AtumMeasure.RecordCount import za.co.absa.balta.DBTestSuite import za.co.absa.balta.classes.JsonBString import com.typesafe.config.{ConfigFactory, ConfigValueFactory} +import org.scalatest.Ignore import za.co.absa.atum.agent.dispatcher.HttpDispatcher import scala.collection.immutable.ListMap +@Ignore class AgentServerCompatibilityTests extends DBTestSuite { private val testDataForRDD = Seq( From ce05796d2d61620584827def72afdb4d827d525b Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Thu, 21 Nov 2024 11:50:17 +0100 Subject: [PATCH 05/19] ignore compatibility test --- .../za/co/absa/atum/agent/AgentServerCompatibilityTests.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AgentServerCompatibilityTests.scala b/agent/src/test/scala/za/co/absa/atum/agent/AgentServerCompatibilityTests.scala index 591b5c779..d720100f1 
100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AgentServerCompatibilityTests.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AgentServerCompatibilityTests.scala @@ -22,12 +22,10 @@ import za.co.absa.atum.agent.model.AtumMeasure.RecordCount import za.co.absa.balta.DBTestSuite import za.co.absa.balta.classes.JsonBString import com.typesafe.config.{ConfigFactory, ConfigValueFactory} -import org.scalatest.Ignore import za.co.absa.atum.agent.dispatcher.HttpDispatcher import scala.collection.immutable.ListMap -@Ignore class AgentServerCompatibilityTests extends DBTestSuite { private val testDataForRDD = Seq( @@ -42,7 +40,7 @@ class AgentServerCompatibilityTests extends DBTestSuite { .add(StructField("columnForSum", DoubleType)) // Need to add service & pg run in CI - test("Agent should be compatible with server") { + ignore("Agent should be compatible with server") { val expectedMeasurement = JsonBString( """{"mainValue": {"value": "4", "valueType": "Long"}, "supportValues": {}}""".stripMargin From 52a9243881b41a512f381f679385a9346028de32 Mon Sep 17 00:00:00 2001 From: salamonpavel Date: Fri, 22 Nov 2024 14:15:13 +0100 Subject: [PATCH 06/19] Update database/README.md Co-authored-by: David Benedeki <14905969+benedeki@users.noreply.github.com> --- database/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/database/README.md b/database/README.md index 923fd0c40..ebdbe0801 100644 --- a/database/README.md +++ b/database/README.md @@ -9,7 +9,7 @@ How to set up database for local testing docker run --name=atum_db -e POSTGRES_PASSWORD=postgres -e POSTGRES_DB=atum_db -p 5432:5432 -d postgres:16 # migrate scripts -sbt flywayMigrate -Dflyway.baselineVersion=0.1.0.1 -Dflyway.baselineOnMigrate=true +sbt flywayMigrate # kill & remove docker container (optional; only if using dockerized postgres instance) docker kill atum_db From 0ac613af22ef9d1b909021187fc2ca0c7102fb62 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 22 Nov 2024 14:23:19 +0100 Subject: [PATCH 07/19] o_partitioning renamings --- .../postgres/flows/V0.3.0.1__get_flow_checkpoints.sql | 6 +++--- .../flows/GetFlowCheckpointsIntegrationTests.scala | 8 ++++---- .../api/database/flows/functions/GetFlowCheckpoints.scala | 2 +- 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql index 122e3c7cb..b1946a6de 100644 --- a/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql +++ b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql @@ -38,7 +38,7 @@ CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints( OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE, OUT id_partitioning BIGINT, - OUT o_partitioning JSONB, + OUT partitioning JSONB, OUT partitioning_author TEXT, OUT has_more BOOLEAN ) RETURNS SETOF record AS @@ -80,7 +80,7 @@ $$ -- checkpoint_start_time - Time of the checkpoint -- checkpoint_end_time - End time of the checkpoint computation -- id_partitioning - ID of the partitioning --- o_partitioning - Partitioning value +-- partitioning - Partitioning value -- partitioning_author - Author of the partitioning -- has_more - flag indicating whether there are more checkpoints available, always `false` if `i_limit` is NULL -- @@ -148,7 +148,7 @@ BEGIN LC.process_start_time AS checkpoint_start_time, LC.process_end_time AS checkpoint_end_time, LC.fk_partitioning AS id_partitioning, - 
P.partitioning AS o_partitioning, + P.partitioning AS partitioning, P.created_by AS partitioning_author, _has_more AS has_more FROM diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala index 1c0a84bc6..173fd8258 100644 --- a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala @@ -394,7 +394,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime3)) assert(row1.getLong("id_partitioning").contains(partitioningId)) val expectingPartitioningJson1 = parseJsonBStringOrThrow(partitioning) - val returnedPartitioningJson1 = parseJsonBStringOrThrow(row1.getJsonB("o_partitioning").get) + val returnedPartitioningJson1 = parseJsonBStringOrThrow(row1.getJsonB("partitioning").get) assert(expectingPartitioningJson1 == returnedPartitioningJson1) assert(row1.getString("partitioning_author").contains("Joseph")) @@ -415,7 +415,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTime3)) assert(row2.getLong("id_partitioning").contains(partitioningId)) val expectingPartitioningJson2 = parseJsonBStringOrThrow(partitioning) - val returnedPartitioningJson2 = parseJsonBStringOrThrow(row2.getJsonB("o_partitioning").get) + val returnedPartitioningJson2 = parseJsonBStringOrThrow(row2.getJsonB("partitioning").get) assert(expectingPartitioningJson2 == returnedPartitioningJson2) assert(row2.getString("partitioning_author").contains("Joseph")) @@ -438,7 +438,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row3.getOffsetDateTime("checkpoint_end_time").contains(endTime1)) assert(row3.getLong("id_partitioning").contains(partitioningId)) val expectingPartitioningJson3 = parseJsonBStringOrThrow(partitioning) - val returnedPartitioningJson3 = parseJsonBStringOrThrow(row2.getJsonB("o_partitioning").get) + val returnedPartitioningJson3 = parseJsonBStringOrThrow(row2.getJsonB("partitioning").get) assert(expectingPartitioningJson3 == returnedPartitioningJson3) assert(row3.getString("partitioning_author").contains("Joseph")) @@ -459,7 +459,7 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row4.getOffsetDateTime("checkpoint_end_time").contains(endTime1)) assert(row4.getLong("id_partitioning").contains(partitioningId)) val expectingPartitioningJson4 = parseJsonBStringOrThrow(partitioning) - val returnedPartitioningJson4 = parseJsonBStringOrThrow(row2.getJsonB("o_partitioning").get) + val returnedPartitioningJson4 = parseJsonBStringOrThrow(row2.getJsonB("partitioning").get) assert(expectingPartitioningJson4 == returnedPartitioningJson4) assert(row4.getString("partitioning_author").contains("Joseph")) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala index 4289b9664..77858873c 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala @@ -56,7 +56,7 @@ class GetFlowCheckpoints(implicit schema: DBSchema, dbEngine: 
DoobieEngine[Task] "checkpoint_start_time", "checkpoint_end_time", "id_partitioning", - "o_partitioning", + "partitioning", "partitioning_author", "has_more" ) From e0f40a5a6c5feebf06001208ea33a679f648d3e0 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 22 Nov 2024 14:24:51 +0100 Subject: [PATCH 08/19] desc --- .../main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql index b1946a6de..ae1d45002 100644 --- a/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql +++ b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql @@ -132,7 +132,7 @@ BEGIN JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning WHERE PF.fk_flow = i_flow_id AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) - ORDER BY C.process_start_time desc + ORDER BY C.process_start_time DESC LIMIT i_checkpoints_limit OFFSET i_offset ) SELECT @@ -159,7 +159,7 @@ BEGIN runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition INNER JOIN runs.partitionings P ON LC.fk_partitioning = P.id_partitioning - ORDER BY LC.process_start_time desc; + ORDER BY LC.process_start_time DESC; END; $$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER; From 353af66fe97d82c41c0eb9f91d82c36fa528bf72 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 22 Nov 2024 14:33:15 +0100 Subject: [PATCH 09/19] orderedCheckpointIds --- .../server/model/CheckpointItemWithPartitioningFromDB.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala index fe2ce31a8..5158b10be 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala @@ -106,12 +106,12 @@ object CheckpointItemWithPartitioningFromDB { checkpointItems: Seq[CheckpointItemWithPartitioningFromDB] ): Either[Throwable, Seq[CheckpointWithPartitioningDTO]] = { val groupedItems = checkpointItems.groupBy(_.idCheckpoint) - val orderedIds = checkpointItems + val orderedCheckpointIds = checkpointItems .sortBy(_.checkpointStartTime)(Ordering[ZonedDateTime].reverse) .map(_.idCheckpoint) .distinct - val result = orderedIds.map { id => + val result = orderedCheckpointIds.map { id => CheckpointItemWithPartitioningFromDB.fromItemsToCheckpointWithPartitioningDTO(groupedItems(id)) } From 0c520c52ab643242d88c6fe95aa8eafc412ed617 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 22 Nov 2024 14:34:28 +0100 Subject: [PATCH 10/19] removed needless prefix --- .../server/model/CheckpointItemWithPartitioningFromDB.scala | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala index 5158b10be..73653938c 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala @@ -111,9 +111,7 @@ object CheckpointItemWithPartitioningFromDB { 
.map(_.idCheckpoint) .distinct - val result = orderedCheckpointIds.map { id => - CheckpointItemWithPartitioningFromDB.fromItemsToCheckpointWithPartitioningDTO(groupedItems(id)) - } + val result = orderedCheckpointIds.map { id => fromItemsToCheckpointWithPartitioningDTO(groupedItems(id)) } val errors = result.collect { case Left(err) => err } if (errors.nonEmpty) { From 7e4513ad3f68d8fdf48422de435fd700e1971b19 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Fri, 22 Nov 2024 14:38:27 +0100 Subject: [PATCH 11/19] refactor groupAndConvertItemsToCheckpointWithPartitioningDTOs --- .../model/CheckpointItemWithPartitioningFromDB.scala | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala index 73653938c..996bb207c 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala @@ -113,12 +113,8 @@ object CheckpointItemWithPartitioningFromDB { val result = orderedCheckpointIds.map { id => fromItemsToCheckpointWithPartitioningDTO(groupedItems(id)) } - val errors = result.collect { case Left(err) => err } - if (errors.nonEmpty) { - Left(errors.head) - } else { - Right(result.collect { case Right(dto) => dto }) - } + val error = result.collectFirst { case Left(err) => Left(err) } + error.getOrElse(Right(result.collect { case Right(dto) => dto })) } } From 30c0766c6624489a7ee167a47781870a3fd42dca Mon Sep 17 00:00:00 2001 From: salamonpavel Date: Fri, 22 Nov 2024 14:44:03 +0100 Subject: [PATCH 12/19] Update server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala Co-authored-by: David Benedeki <14905969+benedeki@users.noreply.github.com> --- ...CheckpointItemWithPartitioningFromDB.scala | 23 ++++++++----------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala index 996bb207c..76691b5ff 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala @@ -63,19 +63,16 @@ object CheckpointItemWithPartitioningFromDB { } val partitioningOrErr: Either[DecodingFailure, PartitioningWithIdDTO] = { val decodingResult = checkpointItems.head.partitioning.as[PartitioningForDB] - decodingResult match { - case Left(decodingFailure) => Left(decodingFailure) - case Right(partitioningForDB) => - val partitioningDTO = partitioningForDB.keys.map { key => - PartitionDTO(key, partitioningForDB.keysToValuesMap(key)) - } - Right( - PartitioningWithIdDTO( - id = checkpointItems.head.idPartitioning, - partitioning = partitioningDTO, - author = checkpointItems.head.partitioningAuthor - ) - ) + decodingResult.map{ partitioningForDB => + val partitioningDTO = partitioningForDB.keys.map { key => + PartitionDTO(key, partitioningForDB.keysToValuesMap(key)) + } + PartitioningWithIdDTO( + id = checkpointItems.head.idPartitioning, + partitioning = partitioningDTO, + author = checkpointItems.head.partitioningAuthor + ) + } } } From 93a64f087330ea41a08160d3e2db02a4e621d04f Mon Sep 17 00:00:00 2001 From: Pavel 
Salamon Date: Fri, 22 Nov 2024 14:48:54 +0100 Subject: [PATCH 13/19] refactor fromItemsToCheckpointWithPartitioningDTO --- ...CheckpointItemWithPartitioningFromDB.scala | 70 ++++++++++--------- 1 file changed, 36 insertions(+), 34 deletions(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala index 76691b5ff..8f4062ffc 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.model -import io.circe.{DecodingFailure, Json} +import io.circe.Json import za.co.absa.atum.model.dto.{ CheckpointWithPartitioningDTO, MeasureDTO, @@ -50,6 +50,26 @@ object CheckpointItemWithPartitioningFromDB { private def fromItemsToCheckpointWithPartitioningDTO( checkpointItems: Seq[CheckpointItemWithPartitioningFromDB] ): Either[Throwable, CheckpointWithPartitioningDTO] = { + for { + measurements <- extractMeasurements(checkpointItems) + partitioning <- extractPartitioning(checkpointItems) + } yield { + CheckpointWithPartitioningDTO( + id = checkpointItems.head.idCheckpoint, + name = checkpointItems.head.checkpointName, + author = checkpointItems.head.author, + measuredByAtumAgent = checkpointItems.head.measuredByAtumAgent, + processStartTime = checkpointItems.head.checkpointStartTime, + processEndTime = checkpointItems.head.checkpointEndTime, + measurements = measurements.toSet, + partitioning + ) + } + } + + private def extractMeasurements( + checkpointItems: Seq[CheckpointItemWithPartitioningFromDB] + ): Either[Throwable, Seq[MeasurementDTO]] = { val measurementsOrErr = checkpointItems.map { checkpointItem => checkpointItem.measurementValue.as[MeasureResultDTO].map { measureResult => MeasurementDTO( @@ -61,40 +81,22 @@ object CheckpointItemWithPartitioningFromDB { ) } } - val partitioningOrErr: Either[DecodingFailure, PartitioningWithIdDTO] = { - val decodingResult = checkpointItems.head.partitioning.as[PartitioningForDB] - decodingResult.map{ partitioningForDB => - val partitioningDTO = partitioningForDB.keys.map { key => - PartitionDTO(key, partitioningForDB.keysToValuesMap(key)) - } - PartitioningWithIdDTO( - id = checkpointItems.head.idPartitioning, - partitioning = partitioningDTO, - author = checkpointItems.head.partitioningAuthor - ) - } - } - - } - - val measurementsErrors = measurementsOrErr.collect { case Left(err) => err } - val errors = measurementsErrors ++ partitioningOrErr.left.toSeq + measurementsOrErr + .collectFirst { case Left(err) => Left(err) } + .getOrElse(Right(measurementsOrErr.collect { case Right(measurement) => measurement })) + } - if (errors.nonEmpty) { - Left(measurementsErrors.head) - } else { - val measurements = measurementsOrErr.collect { case Right(measurement) => measurement }.toSet - Right( - CheckpointWithPartitioningDTO( - id = checkpointItems.head.idCheckpoint, - name = checkpointItems.head.checkpointName, - author = checkpointItems.head.author, - measuredByAtumAgent = checkpointItems.head.measuredByAtumAgent, - processStartTime = checkpointItems.head.checkpointStartTime, - processEndTime = checkpointItems.head.checkpointEndTime, - measurements = measurements, - partitioningOrErr.toOption.get - ) + private def extractPartitioning( + checkpointItems: Seq[CheckpointItemWithPartitioningFromDB] + ): Either[Throwable, 
PartitioningWithIdDTO] = { + checkpointItems.head.partitioning.as[PartitioningForDB].map { partitioningForDB => + val partitioningDTO = partitioningForDB.keys.map { key => + PartitionDTO(key, partitioningForDB.keysToValuesMap(key)) + } + PartitioningWithIdDTO( + id = checkpointItems.head.idPartitioning, + partitioning = partitioningDTO, + author = checkpointItems.head.partitioningAuthor ) } } From b5122c865f6cf6496c5f2489e01a6c3ae4e27453 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Mon, 25 Nov 2024 09:33:22 +0100 Subject: [PATCH 14/19] switch order of actual and expected in asserts --- .../GetFlowCheckpointsIntegrationTests.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala index 173fd8258..a22b40136 100644 --- a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala +++ b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsIntegrationTests.scala @@ -393,9 +393,9 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime3)) assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime3)) assert(row1.getLong("id_partitioning").contains(partitioningId)) - val expectingPartitioningJson1 = parseJsonBStringOrThrow(partitioning) + val expectedPartitioningJson1 = parseJsonBStringOrThrow(partitioning) val returnedPartitioningJson1 = parseJsonBStringOrThrow(row1.getJsonB("partitioning").get) - assert(expectingPartitioningJson1 == returnedPartitioningJson1) + assert(returnedPartitioningJson1 == expectedPartitioningJson1) assert(row1.getString("partitioning_author").contains("Joseph")) val measure1 = MeasuredDetails( @@ -414,9 +414,9 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTime3)) assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTime3)) assert(row2.getLong("id_partitioning").contains(partitioningId)) - val expectingPartitioningJson2 = parseJsonBStringOrThrow(partitioning) + val expectedPartitioningJson2 = parseJsonBStringOrThrow(partitioning) val returnedPartitioningJson2 = parseJsonBStringOrThrow(row2.getJsonB("partitioning").get) - assert(expectingPartitioningJson2 == returnedPartitioningJson2) + assert(returnedPartitioningJson2 == expectedPartitioningJson2) assert(row2.getString("partitioning_author").contains("Joseph")) val measure2 = MeasuredDetails( @@ -437,9 +437,9 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { assert(row3.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) assert(row3.getOffsetDateTime("checkpoint_end_time").contains(endTime1)) assert(row3.getLong("id_partitioning").contains(partitioningId)) - val expectingPartitioningJson3 = parseJsonBStringOrThrow(partitioning) + val expectedPartitioningJson3 = parseJsonBStringOrThrow(partitioning) val returnedPartitioningJson3 = parseJsonBStringOrThrow(row2.getJsonB("partitioning").get) - assert(expectingPartitioningJson3 == returnedPartitioningJson3) + assert(returnedPartitioningJson3 == expectedPartitioningJson3) assert(row3.getString("partitioning_author").contains("Joseph")) val measure3 = MeasuredDetails( @@ -458,9 +458,9 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite { 
assert(row4.getOffsetDateTime("checkpoint_start_time").contains(startTime1)) assert(row4.getOffsetDateTime("checkpoint_end_time").contains(endTime1)) assert(row4.getLong("id_partitioning").contains(partitioningId)) - val expectingPartitioningJson4 = parseJsonBStringOrThrow(partitioning) + val expectedPartitioningJson4 = parseJsonBStringOrThrow(partitioning) val returnedPartitioningJson4 = parseJsonBStringOrThrow(row2.getJsonB("partitioning").get) - assert(expectingPartitioningJson4 == returnedPartitioningJson4) + assert(returnedPartitioningJson4 == expectedPartitioningJson4) assert(row4.getString("partitioning_author").contains("Joseph")) val measure4 = MeasuredDetails( From 728ee5d7d73a5d88302c80bb937255065ed88f22 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Tue, 26 Nov 2024 09:11:22 +0100 Subject: [PATCH 15/19] explanatory comment added --- .../server/model/CheckpointItemWithPartitioningFromDB.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala index 8f4062ffc..f75b1472c 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointItemWithPartitioningFromDB.scala @@ -40,7 +40,7 @@ case class CheckpointItemWithPartitioningFromDB( checkpointStartTime: ZonedDateTime, checkpointEndTime: Option[ZonedDateTime], idPartitioning: Long, - partitioning: Json, + partitioning: Json, // JSON representation of `PartitioningForDB` partitioningAuthor: String, hasMore: Boolean ) From 4d0175b4543f6b13bc061b5bb0c5fdeea1b329db Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Tue, 26 Nov 2024 10:18:40 +0100 Subject: [PATCH 16/19] i_latest_first param and conditional sorting --- .../flows/V0.3.0.1__get_flow_checkpoints.sql | 20 ++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql index ae1d45002..2bbca736a 100644 --- a/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql +++ b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql @@ -26,6 +26,7 @@ CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints( IN i_checkpoints_limit INT DEFAULT 5, IN i_offset BIGINT DEFAULT 0, IN i_checkpoint_name TEXT DEFAULT NULL, + IN i_latest_first BOOLEAN DEFAULT TRUE, OUT status INTEGER, OUT status_text TEXT, OUT id_checkpoint UUID, @@ -61,6 +62,7 @@ $$ -- i_checkpoints_limit - (optional) maximum number of checkpoint to return, returns all of them if NULL -- i_offset - (optional) offset for checkpoints pagination -- i_checkpoint_name - (optional) if specified, returns data related to particular checkpoint's name +-- i_latest_first - (optional) if true, checkpoints are ordered by process_start_time in descending order -- -- Note: i_checkpoint_limit and i_offset are used for pagination purposes; -- checkpoints are ordered by process_start_time in descending order @@ -132,7 +134,13 @@ BEGIN JOIN flows.partitioning_to_flow PF ON C.fk_partitioning = PF.fk_partitioning WHERE PF.fk_flow = i_flow_id AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) - ORDER BY C.process_start_time DESC + ORDER BY + CASE + WHEN i_latest_first THEN C.process_start_time + END DESC, + CASE + WHEN NOT i_latest_first THEN 
C.process_start_time + END LIMIT i_checkpoints_limit OFFSET i_offset ) SELECT @@ -159,9 +167,15 @@ BEGIN runs.measure_definitions MD ON M.fk_measure_definition = MD.id_measure_definition INNER JOIN runs.partitionings P ON LC.fk_partitioning = P.id_partitioning - ORDER BY LC.process_start_time DESC; + ORDER BY + CASE + WHEN i_latest_first THEN LC.process_start_time + END DESC, + CASE + WHEN NOT i_latest_first THEN LC.process_start_time + END; END; $$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER; -GRANT EXECUTE ON FUNCTION flows.get_flow_checkpoints(BIGINT, INT, BIGINT, TEXT) TO atum_owner; +GRANT EXECUTE ON FUNCTION flows.get_flow_checkpoints(BIGINT, INT, BIGINT, TEXT, BOOLEAN) TO atum_owner; From 877fac302e260d6c1060ef85a8ddab77a229a608 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Tue, 26 Nov 2024 10:22:37 +0100 Subject: [PATCH 17/19] remove .bloop --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 68eb03cb2..a864bff58 100644 --- a/.gitignore +++ b/.gitignore @@ -91,3 +91,4 @@ utils/resources/*.conf /server/certs/ /server/selfsigned.crt /server/selfsigned.p12 +/.bloop/ From 57c5bbe81ddc84b69f7cb58cb1cb03930c3cd4e2 Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Tue, 26 Nov 2024 11:13:40 +0100 Subject: [PATCH 18/19] explicit asc ordering --- .../main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql index 2bbca736a..9bb483ae7 100644 --- a/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql +++ b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql @@ -140,7 +140,7 @@ BEGIN END DESC, CASE WHEN NOT i_latest_first THEN C.process_start_time - END + END ASC LIMIT i_checkpoints_limit OFFSET i_offset ) SELECT @@ -173,7 +173,7 @@ BEGIN END DESC, CASE WHEN NOT i_latest_first THEN LC.process_start_time - END; + END ASC; END; $$ LANGUAGE plpgsql VOLATILE SECURITY DEFINER; From 12466a00f62b0aed919b865a7a217b55344faa3d Mon Sep 17 00:00:00 2001 From: Pavel Salamon Date: Tue, 26 Nov 2024 15:07:31 +0100 Subject: [PATCH 19/19] use coalesce --- .../postgres/flows/V0.3.0.1__get_flow_checkpoints.sql | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql index 9bb483ae7..925235bd0 100644 --- a/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql +++ b/database/src/main/postgres/flows/V0.3.0.1__get_flow_checkpoints.sql @@ -92,6 +92,7 @@ $$ --------------------------------------------------------------------------------------------------- DECLARE _has_more BOOLEAN; + _latest_first BOOLEAN := coalesce(i_latest_first, TRUE); BEGIN -- Check if the flow exists by querying the partitioning_to_flow table. 
-- Rationale: @@ -136,10 +137,10 @@ BEGIN AND (i_checkpoint_name IS NULL OR C.checkpoint_name = i_checkpoint_name) ORDER BY CASE - WHEN i_latest_first THEN C.process_start_time + WHEN _latest_first THEN C.process_start_time END DESC, CASE - WHEN NOT i_latest_first THEN C.process_start_time + WHEN NOT _latest_first THEN C.process_start_time END ASC LIMIT i_checkpoints_limit OFFSET i_offset ) @@ -169,10 +170,10 @@ BEGIN runs.partitionings P ON LC.fk_partitioning = P.id_partitioning ORDER BY CASE - WHEN i_latest_first THEN LC.process_start_time + WHEN _latest_first THEN LC.process_start_time END DESC, CASE - WHEN NOT i_latest_first THEN LC.process_start_time + WHEN NOT _latest_first THEN LC.process_start_time END ASC; END; $$
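
A minimal sketch of exercising the new i_latest_first flag end to end (the flow id 42, the page size, and the selected output columns are illustrative placeholders, not values taken from these patches; the function signature and OUT columns are as declared in V0.3.0.1__get_flow_checkpoints.sql above):

    -- Page through a flow's checkpoints oldest-first; passing NULL for
    -- i_latest_first falls back to TRUE via the coalesce() guard above.
    SELECT status, status_text, checkpoint_name, checkpoint_start_time, has_more
    FROM flows.get_flow_checkpoints(
        i_flow_id           => 42,    -- placeholder flow id
        i_checkpoints_limit => 10,
        i_offset            => 0,
        i_checkpoint_name   => NULL,  -- no checkpoint-name filter
        i_latest_first      => FALSE  -- ascending by process_start_time
    );

The paired sort keys (CASE WHEN _latest_first THEN ... END DESC followed by CASE WHEN NOT _latest_first THEN ... END ASC) work because the inactive CASE evaluates to NULL for every row, so exactly one of the two expressions drives the ordering on any given call. The trade-off is that a plain index on process_start_time cannot satisfy either sort direction through a CASE expression, which is presumably acceptable here given the per-flow, LIMIT-bounded result sets.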