Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/224 post partitioning #258

Merged
merged 11 commits into from
Sep 17, 2024
93 changes: 93 additions & 0 deletions database/src/main/postgres/runs/V1.9.7__create_partitioning.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.create_partitioning(
IN i_partitioning JSONB,
IN i_by_user TEXT,
IN i_parent_partitioning_id BIGINT = NULL,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_partitioning BIGINT
) RETURNS record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.create_partitioning(3)
-- Creates a partitioning entry
--
-- Parameters:
-- i_partitioning - partitioning to create or which existence to check
-- i_by_user - user behind the change
-- i_parent_partitioning_id - (optional) parent partitioning id
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
-- i_parent_partitioning_id - (optional) parent partitioning id
-- i_parent_partitioning_id - (optional) parent partitioning id, must already exist if specified

or something like that

--
-- Returns:
-- status - Status code
-- status_text - Status text
-- id_partitioning - id of the partitioning
--
-- Status codes:
-- 11 - Partitioning created
-- 12 - Partitioning created with parent partitioning
-- 31 - Partitioning already exists
-- 41 - Parent partitioning not found
--
-------------------------------------------------------------------------------
BEGIN
id_partitioning := runs._get_id_partitioning(i_partitioning, true);

IF id_partitioning IS NOT NULL THEN
status := 31;
status_text := 'Partitioning already exists';
RETURN;
benedeki marked this conversation as resolved.
Show resolved Hide resolved
END IF;

IF i_parent_partitioning_id IS NOT NULL THEN
PERFORM 1 FROM runs.partitionings P WHERE P.id_partitioning = i_parent_partitioning_id;
IF NOT FOUND THEN
status := 41;
status_text := 'Parent partitioning not found';
RETURN;
END IF;
END IF;

INSERT INTO runs.partitionings (partitioning, created_by)
VALUES (i_partitioning, i_by_user)
RETURNING partitionings.id_partitioning INTO create_partitioning.id_partitioning;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know that these were possible :)


PERFORM 1 FROM flows._create_flow(id_partitioning, i_by_user);
status := 11;
status_text := 'Partitioning created';

IF i_parent_partitioning_id IS NOT NULL THEN
PERFORM 1 FROM flows._add_to_parent_flows(i_parent_partitioning_id, id_partitioning, i_by_user);

-- copying measure definitions to establish continuity
INSERT INTO runs.measure_definitions(fk_partitioning, measure_name, measured_columns, created_by, created_at)
SELECT id_partitioning, CMD.measure_name, CMD.measured_columns, CMD.created_by, CMD.created_at
FROM runs.measure_definitions CMD
WHERE CMD.fk_partitioning = i_parent_partitioning_id;

status := 12;
status_text := 'Partitioning created with parent partitioning';
END IF;

RETURN;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.create_partitioning(JSONB, TEXT, BIGINT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.create_partitioning(JSONB, TEXT, BIGINT) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.database.runs

import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString

class CreatePartitioningIntegrationTests extends DBTestSuite{

private val fncCreatePartitioning = "runs.create_partitioning"

private val partitioning = JsonBString(
"""
|{
| "version": 1,
| "keys": ["key1", "key3", "key2", "key4"],
| "keysToValues": {
| "key1": "valueX",
| "key2": "valueY",
| "key3": "valueZ",
| "key4": "valueA"
| }
|}
|""".stripMargin
)

private val parentPartitioning = JsonBString(
"""
|{
| "version": 1,
| "keys": ["key1", "key3"],
| "keysToValues": {
| "key1": "valueX",
| "key3": "valueZ"
| }
|}
|""".stripMargin
)

test("Partitioning created") {
val partitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

table("runs.partitionings").where(add("id_partitioning", partitioningID)) {partitioningResult =>
val row = partitioningResult.next()
assert(row.getString("created_by").contains("Fantômas"))
assert(row.getOffsetDateTime("created_at").contains(now()))
}

val idFlow = table("flows.partitioning_to_flow").where(add("fk_partitioning", partitioningID)) { partToFlowResult =>
assert(partToFlowResult.hasNext)
val partToFlowRow = partToFlowResult.next()
val result = partToFlowRow.getLong("fk_flow")
assert(partToFlowRow.getString("created_by").contains("Fantômas"))
assert(!partToFlowResult.hasNext)
result.get
}

table("flows.flows").where(add("id_flow", idFlow)) {flowsResult =>
assert(flowsResult.hasNext)
val flowRow = flowsResult.next()
assert(flowRow.getString("flow_name").exists(_.startsWith("Custom flow #")))
assert(flowRow.getString("flow_description").contains(""))
assert(flowRow.getBoolean("from_pattern").contains(false))
assert(flowRow.getString("created_by").contains("Fantômas"))
assert(flowRow.getOffsetDateTime("created_at").contains(now()))
assert(!flowsResult.hasNext)
}
}

test("Partitioning created with parent partitioning that already exists") {
val parentPartitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", parentPartitioning)
.setParam("i_by_user", "Albert Einstein")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1
)
val partitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParam("i_parent_partitioning_id", parentPartitioningID)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(12))
assert(row.getString("status_text").contains("Partitioning created with parent partitioning"))
row.getLong("id_partitioning").get
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1
)
assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 2
)
}

test("Partitioning already exists") {
val partitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(31))
assert(row.getString("status_text").contains("Partitioning already exists"))
assert(row.getLong("id_partitioning").contains(partitioningID))
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 1
)
}

test("Partitioning exists, parent is not added") {
val partitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 1
)

function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParam("i_parent_partitioning_id", 123456789L)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(31))
assert(row.getString("status_text").contains("Partitioning already exists"))
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 1
)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.model.dto

import io.circe.generic.semiauto._
import io.circe._

case class PartitioningSubmitV2DTO(
partitioning: PartitioningDTO,
parentPartitioningId: Option[Long],
author: String
)

object PartitioningSubmitV2DTO {
implicit val decodePartitioningSubmitV2DTO: Decoder[PartitioningSubmitV2DTO] = deriveDecoder
implicit val encodePartitioningSubmitV2DTO: Encoder[PartitioningSubmitV2DTO] = deriveEncoder
}
1 change: 1 addition & 0 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ object Main extends ZIOAppDefault with Server {
CheckpointRepositoryImpl.layer,
FlowRepositoryImpl.layer,
CreatePartitioningIfNotExists.layer,
CreatePartitioning.layer,
GetPartitioningMeasures.layer,
GetPartitioningMeasuresById.layer,
GetPartitioningAdditionalData.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ trait PartitioningController {
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, AtumContextDTO]

def createPartitioningIfNotExistsV2(
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, SingleSuccessResponse[AtumContextDTO]]
def postPartitioning(
partitioningSubmitDTO: PartitioningSubmitV2DTO
): IO[ErrorResponse, (SingleSuccessResponse[PartitioningWithIdDTO], String)]

def getPartitioningAdditionalDataV2(
partitioningId: Long
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.http.ApiPaths.V2Paths
import za.co.absa.atum.server.api.service.PartitioningService
import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse}
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
Expand Down Expand Up @@ -49,12 +50,6 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
atumContextDTOEffect
}

override def createPartitioningIfNotExistsV2(
benedeki marked this conversation as resolved.
Show resolved Hide resolved
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, SingleSuccessResponse[AtumContextDTO]] = {
mapToSingleSuccessResponse(createPartitioningIfNotExistsV1(partitioningSubmitDTO))
}

override def getPartitioningCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = {
Expand Down Expand Up @@ -84,6 +79,21 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
}

override def postPartitioning(
partitioningSubmitDTO: PartitioningSubmitV2DTO
): IO[ErrorResponse, (SingleSuccessResponse[PartitioningWithIdDTO], String)] = {
for {
response <- mapToSingleSuccessResponse(
serviceCall[PartitioningWithIdDTO, PartitioningWithIdDTO](
partitioningService.createPartitioning(partitioningSubmitDTO)
)
)
uri <- createV2RootAnchoredResourcePath(
Seq(V2Paths.Partitionings, response.data.id.toString)
)
} yield (response, uri)
}

override def patchPartitioningAdditionalDataV2(
partitioningId: Long,
additionalDataPatchDTO: AdditionalDataPatchDTO
Expand Down
Loading
Loading