Skip to content

Commit

Permalink
Feature/235 get flow partitionings (#267)
Browse files Browse the repository at this point in the history
get flow partitionings
  • Loading branch information
salamonpavel authored Sep 26, 2024
1 parent 43ec036 commit db4a867
Show file tree
Hide file tree
Showing 19 changed files with 778 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ CREATE OR REPLACE FUNCTION flows._add_to_parent_flows(
IN i_fk_partitioning BIGINT,
IN i_by_user TEXT,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_flow BIGINT
OUT status_text TEXT
) RETURNS record AS
$$
-------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION flows.get_flow_partitionings(
    IN  i_flow_id       BIGINT,
    IN  i_limit         INT DEFAULT 5,
    IN  i_offset        BIGINT DEFAULT 0,
    OUT status          INTEGER,
    OUT status_text     TEXT,
    OUT id              BIGINT,
    OUT partitioning    JSONB,
    OUT author          TEXT,
    OUT has_more        BOOLEAN
) RETURNS SETOF record AS
-------------------------------------------------------------------------------
--
-- Function: flows.get_flow_partitionings(3)
--      Retrieves one page of the partitionings associated with the input flow.
--
--      Note: rows are ordered by id_partitioning first and by created_at (descending) second,
--            both taken from the runs.partitionings table.
--
-- Parameters:
--      i_flow_id           - flow id to use for identifying the partitionings that will be retrieved
--      i_limit             - (optional) maximum number of partitionings to return, default is 5;
--                            when NULL, no limit is applied and has_more is always false
--      i_offset            - (optional) number of partitionings to skip (pagination), default is 0
--
-- Returns:
--      status              - Status code
--      status_text         - Status text
--      id                  - ID of retrieved partitioning
--      partitioning        - Partitioning value
--      author              - Author of the partitioning
--      has_more            - Flag indicating if there are more partitionings available
--                            beyond the returned page (i.e. after i_offset + i_limit)
--
-- Status codes:
--      11 - OK
--      41 - Flow not found
--
-------------------------------------------------------------------------------
$$
DECLARE
    _has_more BOOLEAN;
BEGIN
    -- A missing flow is reported as a single status-only row (all data columns NULL).
    PERFORM 1 FROM flows.flows WHERE id_flow = i_flow_id;
    IF NOT FOUND THEN
        status := 41;
        status_text := 'Flow not found';
        RETURN NEXT;
        RETURN;
    END IF;

    IF i_limit IS NOT NULL THEN
        -- Probe for one row past the requested page. LIMIT/OFFSET have to be applied to the
        -- rows being counted (inside the subquery); applied to the aggregate itself they would
        -- act on the single count(*) row, ignoring the offset and yielding NULL for i_offset > 0.
        SELECT count(*) > i_limit
        INTO _has_more
        FROM (
            SELECT 1
            FROM flows.partitioning_to_flow PTF
            WHERE PTF.fk_flow = i_flow_id
            LIMIT i_limit + 1 OFFSET i_offset
        ) AS page_probe;
    ELSE
        -- Without a limit everything from i_offset onwards is returned, so nothing is left.
        _has_more := false;
    END IF;

    RETURN QUERY
        SELECT
            11 AS status,
            'OK' AS status_text,
            P.id_partitioning,
            P.partitioning,
            P.created_by,
            _has_more
        FROM
            runs.partitionings P INNER JOIN
            flows.partitioning_to_flow PF ON PF.fk_partitioning = P.id_partitioning
        WHERE
            PF.fk_flow = i_flow_id
        ORDER BY
            P.id_partitioning,
            P.created_at DESC
        LIMIT i_limit OFFSET i_offset;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

GRANT EXECUTE ON FUNCTION flows.get_flow_partitionings(BIGINT, INT, BIGINT) TO atum_owner;
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.database.flows

import io.circe.Json
import io.circe.parser.parse
import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString

/**
 * Integration tests for the `flows.get_flow_partitionings` database function.
 *
 * The tests insert partitionings directly into `runs.partitionings`, create flows through
 * `flows._create_flow`, link a partitioning to its parent's flows through
 * `flows._add_to_parent_flows`, and then verify the rows, the status columns and the
 * `has_more` flag returned for different limits.
 */
class GetFlowPartitioningsIntegrationTests extends DBTestSuite {

  private val getFlowPartitioningsFn = "flows.get_flow_partitionings"
  private val createFlowFn = "flows._create_flow"
  private val addToParentFlowsFn = "flows._add_to_parent_flows"

  private val partitioningsTable = "runs.partitionings"

  private val partitioning1 = JsonBString(
    """
      |{
      | "version": 1,
      | "keys": ["keyA", "keyB", "keyC"],
      | "keysToValues": {
      | "keyA": "valueA",
      | "keyB": "valueB",
      | "keyC": "valueC"
      | }
      |}
      |""".stripMargin
  )

  // Uses a prefix of partitioning1's keys; serves as its parent in the test below.
  private val partitioning1Parent = JsonBString(
    """
      |{
      | "version": 1,
      | "keys": ["keyA", "keyB"],
      | "keysToValues": {
      | "keyA": "valueA",
      | "keyB": "valueB"
      | }
      |}
      |""".stripMargin
  )

  private val partitioning2 = JsonBString(
    """
      |{
      | "version": 1,
      | "keys": ["keyD", "keyE", "keyF"],
      | "keysToValues": {
      | "keyD": "valueD",
      | "keyE": "valueE",
      | "keyF": "valueF"
      | }
      |}
      |""".stripMargin
  )

  // Flow ids are captured inside the `execute` callbacks, hence the mutable fields.
  private var flowIdOfPartitioning1: Long = _
  private var flowIdOfParentPartitioning1: Long = _
  private var flowIdOfPartitioning2: Long = _

  test("Returns partitioning(s) for a given flow") {
    table(partitioningsTable).insert(add("partitioning", partitioning1).add("created_by", "Joseph"))
    table(partitioningsTable).insert(add("partitioning", partitioning1Parent).add("created_by", "Joseph"))
    table(partitioningsTable).insert(add("partitioning", partitioning2).add("created_by", "Joseph"))

    val partId1: Long = table(partitioningsTable)
      .fieldValue("partitioning", partitioning1, "id_partitioning").get.get

    val partId1Parent: Long = table(partitioningsTable)
      .fieldValue("partitioning", partitioning1Parent, "id_partitioning").get.get

    val partId2: Long = table(partitioningsTable)
      .fieldValue("partitioning", partitioning2, "id_partitioning").get.get

    function(createFlowFn)
      .setParam("i_fk_partitioning", partId1)
      .setParam("i_by_user", "Joseph")
      .execute { queryResult =>
        flowIdOfPartitioning1 = queryResult.next().getLong("id_flow").get
      }

    function(createFlowFn)
      .setParam("i_fk_partitioning", partId1Parent)
      .setParam("i_by_user", "Joseph")
      .execute { queryResult =>
        flowIdOfParentPartitioning1 = queryResult.next().getLong("id_flow").get
      }

    function(createFlowFn)
      .setParam("i_fk_partitioning", partId2)
      .setParam("i_by_user", "Joseph")
      .execute { queryResult =>
        flowIdOfPartitioning2 = queryResult.next().getLong("id_flow").get
      }

    // After this call the parent's flow is expected to contain both partitioning1 and its parent.
    function(addToParentFlowsFn)
      .setParam("i_fk_parent_partitioning", partId1Parent)
      .setParam("i_fk_partitioning", partId1)
      .setParam("i_by_user", "Joseph")
      .execute { queryResult =>
        val result1 = queryResult.next()
        assert(result1.getInt("status").get == 11)
        assert(result1.getString("status_text").get == "Partitioning added to flows")
      }

    // The flow of partitioning1 holds a single partitioning, so a limit of 1 exhausts it.
    function(getFlowPartitioningsFn)
      .setParam("i_flow_id", flowIdOfPartitioning1)
      .setParam("i_limit", 1)
      .execute { queryResult =>
        val result1 = queryResult.next()
        assert(result1.getInt("status").get == 11)
        assert(result1.getString("status_text").get == "OK")
        assert(result1.getLong("id").get == partId1)
        val expectedPartitioningJson = parseJsonBStringOrThrow(partitioning1)
        val returnedPartitioningJson = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get)
        assert(expectedPartitioningJson == returnedPartitioningJson)
        assert(!result1.getBoolean("has_more").get)
        assert(!queryResult.hasNext)
      }

    function(getFlowPartitioningsFn)
      .setParam("i_flow_id", flowIdOfParentPartitioning1)
      .setParam("i_limit", 1) // limit is set to 1, so only one partitioning should be returned and more data available
      .execute { queryResult =>
        val result1 = queryResult.next()
        assert(result1.getInt("status").get == 11)
        assert(result1.getString("status_text").get == "OK")
        assert(result1.getLong("id").get == partId1)
        val expectedPartitioningJson1 = parseJsonBStringOrThrow(partitioning1)
        val returnedPartitioningJson1 = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get)
        assert(expectedPartitioningJson1 == returnedPartitioningJson1)
        assert(result1.getBoolean("has_more").get)
        assert(!queryResult.hasNext)
      }

    function(getFlowPartitioningsFn)
      .setParam("i_flow_id", flowIdOfParentPartitioning1)
      .setParam("i_limit", 2) // limit is set to 2, so both partitionings should be returned and no more data available
      .execute { queryResult =>
        val result1 = queryResult.next()
        assert(result1.getInt("status").get == 11)
        assert(result1.getString("status_text").get == "OK")
        assert(result1.getLong("id").get == partId1)
        val expectedPartitioningJson1 = parseJsonBStringOrThrow(partitioning1)
        val returnedPartitioningJson1 = parseJsonBStringOrThrow(result1.getJsonB("partitioning").get)
        assert(expectedPartitioningJson1 == returnedPartitioningJson1)
        assert(!result1.getBoolean("has_more").get)
        assert(queryResult.hasNext)
        val result2 = queryResult.next()
        assert(result2.getLong("id").get == partId1Parent)
        val expectedPartitioningJson2 = parseJsonBStringOrThrow(partitioning1Parent)
        val returnedPartitioningJson2 = parseJsonBStringOrThrow(result2.getJsonB("partitioning").get)
        assert(expectedPartitioningJson2 == returnedPartitioningJson2)
        assert(!result2.getBoolean("has_more").get)
        assert(!queryResult.hasNext)
      }
  }

  test("Fails for non-existent flow") {
    function(getFlowPartitioningsFn)
      .setParam("i_flow_id", 999999L) // Long, matching the BIGINT i_flow_id parameter
      .setParam("i_limit", 1)
      .execute { queryResult =>
        // A missing flow is reported as exactly one status-only row.
        val result1 = queryResult.next()
        assert(result1.getInt("status").get == 41)
        assert(result1.getString("status_text").get == "Flow not found")
        assert(!queryResult.hasNext)
      }
  }

  /** Parses the JSONB payload into circe `Json` so comparisons ignore formatting; throws if it is not valid JSON. */
  private def parseJsonBStringOrThrow(jsonBString: JsonBString): Json = {
    parse(jsonBString.value).getOrElse(throw new Exception("Failed to parse JsonBString to Json"))
  }

}
3 changes: 2 additions & 1 deletion server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package za.co.absa.atum.server

import za.co.absa.atum.server.api.controller._
import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints
import za.co.absa.atum.server.api.database.flows.functions.{GetFlowCheckpoints, GetFlowPartitionings}
import za.co.absa.atum.server.api.database.{PostgresDatabaseProvider, TransactorProvider}
import za.co.absa.atum.server.api.database.runs.functions._
import za.co.absa.atum.server.api.http.Server
Expand Down Expand Up @@ -63,6 +63,7 @@ object Main extends ZIOAppDefault with Server {
GetPartitioningCheckpointV2.layer,
GetFlowCheckpoints.layer,
GetPartitioningById.layer,
GetFlowPartitionings.layer,
PostgresDatabaseProvider.layer,
TransactorProvider.layer,
AwsSecretsProviderImpl.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.model.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse}
import zio.IO
import zio.macros.accessible

Expand Down Expand Up @@ -46,4 +46,10 @@ trait PartitioningController {
/**
 * Returns the [[MeasureDTO]]s for the partitioning with the given ID, wrapped in a
 * multi-item success response.
 *
 * @param partitioningId ID of the partitioning whose measures are requested
 * @return an effect yielding the measures, or failing with an [[ErrorResponse]]
 */
def getPartitioningMeasuresV2(
partitioningId: Long
): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]]

/**
 * Returns one page of the partitionings that belong to the given flow.
 *
 * @param flowId ID of the flow whose partitionings are requested
 * @param limit  optional maximum number of partitionings in the page
 * @param offset optional number of partitionings to skip before the page starts
 * @return an effect yielding a paginated response of [[PartitioningWithIdDTO]], or failing
 *         with an [[ErrorResponse]]
 *
 * NOTE(review): how `None` for `limit`/`offset` is treated is decided by the implementation —
 * confirm callers always supply both or that the implementation applies defaults.
 */
def getFlowPartitionings(
flowId: Long,
limit: Option[Int],
offset: Option[Long]
): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.http.ApiPaths.V2Paths
import za.co.absa.atum.server.api.service.PartitioningService
import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse}
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse, PaginatedResult}
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse}
import zio._

class PartitioningControllerImpl(partitioningService: PartitioningService)
Expand Down Expand Up @@ -105,6 +105,20 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
}

/**
 * Returns one page of the partitionings that belong to the given flow.
 *
 * Missing pagination parameters fall back to a limit of 5 and an offset of 0 (the same
 * defaults the `flows.get_flow_partitionings` DB function declares) instead of throwing a
 * `NoSuchElementException` from `Option.get`. The resolved values are used both for the
 * service call and for the pagination metadata of the response, so the two always agree.
 *
 * @param flowId ID of the flow whose partitionings are requested
 * @param limit  optional maximum number of partitionings in the page
 * @param offset optional number of partitionings to skip before the page starts
 * @return an effect yielding the paginated response, or failing with an [[ErrorResponse]]
 */
override def getFlowPartitionings(
  flowId: Long,
  limit: Option[Int],
  offset: Option[Long]
): IO[ErrorResponse, PaginatedResponse[PartitioningWithIdDTO]] = {
  // NOTE(review): defaults mirror the DB function's declared defaults — confirm they also
  // match the defaults configured on the HTTP endpoint.
  val effectiveLimit: Int = limit.getOrElse(5)
  val effectiveOffset: Long = offset.getOrElse(0L)

  mapToPaginatedResponse(
    effectiveLimit,
    effectiveOffset,
    serviceCall[PaginatedResult[PartitioningWithIdDTO], PaginatedResult[PartitioningWithIdDTO]](
      partitioningService.getFlowPartitionings(flowId, Some(effectiveLimit), Some(effectiveOffset))
    )
  )
}

}

object PartitioningControllerImpl {
Expand Down
Loading

0 comments on commit db4a867

Please sign in to comment.