patch additional data (#253)
salamonpavel authored Sep 3, 2024
1 parent 270c943 commit c27c776
Showing 22 changed files with 371 additions and 143 deletions.
@@ -14,80 +14,81 @@
  */

 CREATE OR REPLACE FUNCTION runs.create_or_update_additional_data(
-    IN  i_partitioning      JSONB,
-    IN  i_additional_data   HSTORE,
-    IN  i_by_user           TEXT,
-    OUT status              INTEGER,
-    OUT status_text         TEXT,
-    OUT id_additional_data  BIGINT
-) RETURNS record AS
+    IN  i_partitioning_id   BIGINT,
+    IN  i_additional_data   HSTORE,
+    IN  i_by_user           TEXT,
+    OUT status              INTEGER,
+    OUT status_text         TEXT,
+    OUT o_ad_name           TEXT,
+    OUT o_ad_value          TEXT,
+    OUT o_ad_author         TEXT
+) RETURNS SETOF record AS
 $$
 -------------------------------------------------------------------------------
 --
 -- Function: runs.create_or_update_additional_data(3)
 --      Adds the additional data for the input partitioning. If additional data of a given name already
 --      exists for such partitioning, the value is updated and the old value is moved to the
 --      additional data history table.
+--      The function returns all actual additional data of the partitioning.
 --
 -- Parameters:
---      i_partitioning      - partitioning to add the additional data for
+--      i_partitioning_id   - id of the partitioning to add the additional data for
 --      i_additional_data   - set of key/value pairs representing names and values of the additional data
 --      i_by_user           - user behind the change (the author of AD records if there is something to upsert)
 --
 -- Returns:
 --      status              - Status code
 --      status_text         - Status text
---      id_additional_data  - id of the data added
+--      o_ad_name           - Name of the additional data
+--      o_ad_value          - Value of the additional data
+--      o_ad_author         - Author of the additional data
 --
 -- Status codes:
---      11 - Additional data have been added
---      12 - Additional data have been upserted
+--      11 - Additional data have been updated, added or both
 --      14 - No changes in additional data (this is when they already existed)
 --      41 - Partitioning not found
 --
 -------------------------------------------------------------------------------
 DECLARE
-    _fk_partitioning    BIGINT;
     _records_updated    BOOLEAN;
 BEGIN

-    _fk_partitioning := runs._get_id_partitioning(i_partitioning, true);
-
-    IF _fk_partitioning IS NULL THEN
+    PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id;
+    IF NOT FOUND THEN
         status := 41;
         status_text := 'Partitioning not found';
+        RETURN NEXT;
         RETURN;
     END IF;

     -- 1. (backup) get records that already exist but whose values differ,
     --    then insert them into the AD history table and
     --    then update the actual AD table with the new values
-    _records_updated := runs._update_existing_additional_data(_fk_partitioning, i_additional_data, i_by_user);
+    _records_updated := runs._update_existing_additional_data(i_partitioning_id, i_additional_data, i_by_user);

     -- 2. (insert) get records that do not exist yet and insert them into the AD table
     --    (their original rows were previously saved in step 1)
     INSERT INTO runs.additional_data (fk_partitioning, ad_name, ad_value, created_by)
-    SELECT _fk_partitioning, ad_input.key, ad_input.value, i_by_user
+    SELECT i_partitioning_id, ad_input.key, ad_input.value, i_by_user
     FROM each(i_additional_data) AS ad_input
     ON CONFLICT (fk_partitioning, ad_name) DO NOTHING;

-    IF _records_updated THEN
-        status := 12;
-        status_text := 'Additional data have been upserted';
-    ELSE
-        IF found THEN
-            status := 11;
-            status_text := 'Additional data have been added';
-        ELSE
-            status := 14;
-            status_text := 'No changes in additional data';
-        END IF;
+    -- 3. return the updated additional data (all records, not only the updated/added ones)
+    IF NOT _records_updated AND NOT FOUND THEN
+        RETURN QUERY
+        SELECT 14, 'No changes in additional data', GPAD.ad_name, GPAD.ad_value, GPAD.ad_author
+        FROM runs.get_partitioning_additional_data(i_partitioning_id) AS GPAD;
+        RETURN;
+    ELSE
+        RETURN QUERY
+        SELECT 11, 'Additional data have been updated, added or both', GPAD.ad_name, GPAD.ad_value, GPAD.ad_author
+        FROM runs.get_partitioning_additional_data(i_partitioning_id) AS GPAD;
+        RETURN;
     END IF;
-
-    RETURN;
 END;
 $$
 LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

-ALTER FUNCTION runs.create_or_update_additional_data(JSONB, HSTORE, TEXT) OWNER TO atum_owner;
-GRANT EXECUTE ON FUNCTION runs.create_or_update_additional_data(JSONB, HSTORE, TEXT) TO atum_user;
+ALTER FUNCTION runs.create_or_update_additional_data(BIGINT, HSTORE, TEXT) OWNER TO atum_owner;
+GRANT EXECUTE ON FUNCTION runs.create_or_update_additional_data(BIGINT, HSTORE, TEXT) TO atum_user;
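For a quick sanity check, the reworked function can be exercised directly in psql; a minimal sketch, assuming the partitioning with the given id exists and the caller has EXECUTE rights (the id 42 and the hstore literal are made-up values):

-- Hypothetical smoke test for the new signature; id and key/value pairs are made up.
SELECT status, status_text, o_ad_name, o_ad_value, o_ad_author
FROM runs.create_or_update_additional_data(
    42::BIGINT,
    'PrimaryOwner=>TechnicalManagerA, IsDatasetInDatalake=>true'::HSTORE,
    'MikeRusty'
);
-- One row per additional data entry of the partitioning, each row carrying
-- the aggregated status (11 or 14; a single row with 41 if the id does not exist).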
@@ -20,7 +20,7 @@ import za.co.absa.balta.DBTestSuite
 import za.co.absa.balta.classes.JsonBString
 import za.co.absa.balta.classes.setter.CustomDBType

-class CreateOrUpdateAdditionalDataIntegrationTests extends DBTestSuite{
+class CreateOrUpdateAdditionalDataIntegrationTests extends DBTestSuite {

   private val fncCreateOrUpdateAdditionalData = "runs.create_or_update_additional_data"

@@ -70,15 +70,35 @@ class CreateOrUpdateAdditionalDataIntegrationTests extends DBTestSuite {
     )

     function(fncCreateOrUpdateAdditionalData)
-      .setParam("i_partitioning", partitioning)
+      .setParam("i_partitioning_id", fkPartitioning)
       .setParam("i_additional_data", inputADToUpsert)
       .setParam("i_by_user", "MikeRusty")
       .execute { queryResult =>
         assert(queryResult.hasNext)
         val row = queryResult.next()

-        assert(row.getInt("status").contains(12))
-        assert(row.getString("status_text").contains("Additional data have been upserted"))
+        assert(row.getInt("status").contains(11))
+        assert(row.getString("status_text").contains("Additional data have been updated, added or both"))
+        assert(row.getString("o_ad_name").contains("PrimaryOwner"))
+        assert(row.getString("o_ad_value").contains("TechnicalManagerA"))
+        assert(row.getString("o_ad_author").contains("SuperTool"))
+
+        assert(queryResult.hasNext)
+        val row2 = queryResult.next()
+
+        assert(row2.getInt("status").contains(11))
+        assert(row2.getString("status_text").contains("Additional data have been updated, added or both"))
+        assert(row2.getString("o_ad_name").contains("SecondaryOwner"))
+        assert(row2.getString("o_ad_value").contains("AnalystNew"))
+        assert(row2.getString("o_ad_author").contains("MikeRusty"))
+
+        assert(queryResult.hasNext)
+        val row3 = queryResult.next()
+        assert(row3.getInt("status").contains(11))
+        assert(row3.getString("status_text").contains("Additional data have been updated, added or both"))
+        assert(row3.getString("o_ad_name").contains("IsDatasetInDatalake"))
+        assert(row3.getString("o_ad_value").contains("true"))
+        assert(row3.getString("o_ad_author").contains("MikeRusty"))

         assert(!queryResult.hasNext)
       }
@@ -134,17 +154,15 @@ class CreateOrUpdateAdditionalDataIntegrationTests extends DBTestSuite {
     )

     function(fncCreateOrUpdateAdditionalData)
-      .setParam("i_partitioning", partitioning)
+      .setParam("i_partitioning_id", fkPartitioning)
       .setParam("i_additional_data", inputADToUpsert)
       .setParam("i_by_user", "MikeRusty")
       .execute { queryResult =>
         assert(queryResult.hasNext)
         val row = queryResult.next()

         assert(row.getInt("status").contains(11))
-        assert(row.getString("status_text").contains("Additional data have been added"))
-
-        assert(!queryResult.hasNext)
+        assert(row.getString("status_text").contains("Additional data have been updated, added or both"))
       }

     assert(table("runs.additional_data").count() == 5)
@@ -199,7 +217,7 @@ class CreateOrUpdateAdditionalDataIntegrationTests extends DBTestSuite {
     )

     function(fncCreateOrUpdateAdditionalData)
-      .setParam("i_partitioning", partitioning)
+      .setParam("i_partitioning_id", fkPartitioning)
       .setParam("i_additional_data", inputADToUpsert)
       .setParam("i_by_user", "MikeRusty")
       .execute { queryResult =>
@@ -208,8 +226,6 @@ class CreateOrUpdateAdditionalDataIntegrationTests extends DBTestSuite {

         assert(row.getInt("status").contains(14))
         assert(row.getString("status_text").contains("No changes in additional data"))
-
-        assert(!queryResult.hasNext)
       }

     assert(table("runs.additional_data").count(add("fk_partitioning", fkPartitioning)) == 2)
@@ -228,7 +244,7 @@ class CreateOrUpdateAdditionalDataIntegrationTests extends DBTestSuite {
     )

     function(fncCreateOrUpdateAdditionalData)
-      .setParam("i_partitioning", partitioning)
+      .setParam("i_partitioning_id", 0L)
       .setParam("i_additional_data", inputADToInsert)
       .setParam("i_by_user", "MikeRusty")
       .execute { queryResult =>
@@ -20,7 +20,8 @@ import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
 import io.circe.{Decoder, Encoder}

 case class AdditionalDataPatchDTO(
-  data: Map[String, AdditionalDataItemDTO]
+  byUser: String,
+  data: Map[String, String]
 )

 object AdditionalDataPatchDTO {
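For illustration, a patch body modeled by this DTO serializes as shown below; a minimal sketch using the derived circe codecs from the companion object, with made-up keys and values:

// Hypothetical usage of the new DTO; keys and values are made-up examples.
import io.circe.syntax._
import za.co.absa.atum.model.dto.AdditionalDataPatchDTO

object PatchPayloadExample extends App {
  val patch = AdditionalDataPatchDTO(
    byUser = "MikeRusty",
    data = Map("PrimaryOwner" -> "TechnicalManagerA", "IsDatasetInDatalake" -> "true")
  )

  // Prints: {"byUser":"MikeRusty","data":{"PrimaryOwner":"TechnicalManagerA","IsDatasetInDatalake":"true"}}
  println(patch.asJson.noSpaces)
}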
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2021 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.atum.model.dto
+
+import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
+import io.circe.{Decoder, Encoder}
+
+case class AdditionalDataPatchItemDTO(
+  value: String,
+  author: String
+)
+
+object AdditionalDataPatchItemDTO {
+  implicit val encoderAdditionalDataPatchItem: Encoder[AdditionalDataPatchItemDTO] = deriveEncoder
+  implicit val decoderAdditionalDataPatchItem: Decoder[AdditionalDataPatchItemDTO] = deriveDecoder
+}
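A quick round-trip through the derived codecs behaves as expected; the sample values here are made up:

// Hypothetical round-trip for the new DTO; sample values are made up.
import io.circe.parser.decode
import za.co.absa.atum.model.dto.AdditionalDataPatchItemDTO

object PatchItemExample extends App {
  val json = """{"value":"TechnicalManagerA","author":"SuperTool"}"""
  // Decodes via the derived decoder from the companion object.
  println(decode[AdditionalDataPatchItemDTO](json))
  // Prints: Right(AdditionalDataPatchItemDTO(TechnicalManagerA,SuperTool))
}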
@@ -19,8 +19,7 @@ package za.co.absa.atum.server.api.controller
 import za.co.absa.atum.server.api.exception.ServiceError
 import za.co.absa.atum.server.api.exception.ServiceError._
 import za.co.absa.atum.server.api.http.ApiPaths
-import za.co.absa.atum.server.model.{ConflictErrorResponse, ErrorResponse, InternalServerErrorResponse, NotFoundErrorResponse}
-import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
+import za.co.absa.atum.server.model.SuccessResponse._
+import za.co.absa.atum.server.model._
 import zio._

@@ -36,9 +36,10 @@ trait PartitioningController {
     partitioningId: Long
   ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]]

-  def createOrUpdateAdditionalDataV2(
-    additionalData: AdditionalDataSubmitDTO
-  ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO]]
+  def patchPartitioningAdditionalDataV2(
+    partitioningId: Long,
+    additionalDataPatchDTO: AdditionalDataPatchDTO
+  ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]]

   def getPartitioningCheckpointsV2(
     checkpointQueryDTO: CheckpointQueryDTO
@@ -55,17 +55,6 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
     mapToSingleSuccessResponse(createPartitioningIfNotExistsV1(partitioningSubmitDTO))
   }

-  override def createOrUpdateAdditionalDataV2(
-    additionalData: AdditionalDataSubmitDTO
-  ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO]] = {
-    mapToSingleSuccessResponse(
-      serviceCall[Unit, AdditionalDataSubmitDTO](
-        partitioningService.createOrUpdateAdditionalData(additionalData),
-        _ => additionalData
-      )
-    )
-  }
-
   override def getPartitioningCheckpointsV2(
     checkpointQueryDTO: CheckpointQueryDTO
   ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = {
@@ -94,6 +83,17 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
       )
     )
   }
+
+  override def patchPartitioningAdditionalDataV2(
+    partitioningId: Long,
+    additionalDataPatchDTO: AdditionalDataPatchDTO
+  ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]] = {
+    mapToSingleSuccessResponse(
+      serviceCall[AdditionalDataDTO, AdditionalDataDTO](
+        partitioningService.patchAdditionalData(partitioningId, additionalDataPatchDTO)
+      )
+    )
+  }
 }

 object PartitioningControllerImpl {
@@ -17,31 +17,42 @@
 package za.co.absa.atum.server.api.database.runs.functions

 import doobie.implicits.toSqlInterpolator
-import za.co.absa.atum.model.dto.AdditionalDataSubmitDTO
 import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
 import za.co.absa.atum.server.api.database.runs.Runs
-import za.co.absa.atum.server.model.PartitioningForDB
+import za.co.absa.atum.server.model.AdditionalDataItemFromDB
 import za.co.absa.db.fadb.DBSchema
-import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus
+import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus
 import za.co.absa.db.fadb.doobie.DoobieEngine
 import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling
 import zio._
-import io.circe.syntax._
-
 import doobie.postgres.implicits._
-import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut
+import za.co.absa.atum.model.dto.AdditionalDataPatchDTO
+import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs
+import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstRowStatusAggregator

 class CreateOrUpdateAdditionalData(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
-    extends DoobieSingleResultFunctionWithStatus[AdditionalDataSubmitDTO, Unit, Task](values =>
+    extends DoobieMultipleResultFunctionWithAggStatus[CreateOrUpdateAdditionalDataArgs, Option[
+      AdditionalDataItemFromDB
+    ], Task](args =>
       Seq(
-        fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}",
-        fr"${values.additionalData.map { case (k, v) => (k, v.orNull) }}",
-        fr"${values.author}"
+        fr"${args.partitioningId}",
+        fr"${args.additionalData.data}",
+        fr"${args.additionalData.byUser}"
       )
     )
     with StandardStatusHandling
+    with ByFirstRowStatusAggregator {
+
+  override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("o_ad_name", "o_ad_value", "o_ad_author")
+}

 object CreateOrUpdateAdditionalData {
+  case class CreateOrUpdateAdditionalDataArgs(
+    partitioningId: Long,
+    additionalData: AdditionalDataPatchDTO
+  )
+
   val layer: URLayer[PostgresDatabaseProvider, CreateOrUpdateAdditionalData] = ZLayer {
     for {
       dbProvider <- ZIO.service[PostgresDatabaseProvider]
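For orientation, fa-db composes its statement from fieldsToSelect plus the argument fragments above, so the query this class issues should look roughly like the sketch below; this is an assumption based on fa-db's usual SELECT ... FROM schema.function(...) convention, not captured output:

-- Approximate shape of the generated query (an assumption, not verified output).
SELECT status, status_text, o_ad_name, o_ad_value, o_ad_author
FROM runs.create_or_update_additional_data(?, ?, ?);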
@@ -80,13 +80,16 @@ trait Endpoints extends BaseEndpoints {
       .errorOutVariantPrepend(notFoundErrorOneOfVariant)
   }

-  protected val createOrUpdateAdditionalDataEndpointV2
-    : PublicEndpoint[AdditionalDataSubmitDTO, ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO], Any] = {
-    apiV2.post
-      .in(CreateOrUpdateAdditionalData)
-      .in(jsonBody[AdditionalDataSubmitDTO])
+  protected val patchPartitioningAdditionalDataEndpointV2
+    : PublicEndpoint[(Long, AdditionalDataPatchDTO), ErrorResponse, SingleSuccessResponse[
+      AdditionalDataDTO
+    ], Any] = {
+    apiV2.patch
+      .in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.AdditionalData)
+      .in(jsonBody[AdditionalDataPatchDTO])
       .out(statusCode(StatusCode.Ok))
-      .out(jsonBody[SingleSuccessResponse[AdditionalDataSubmitDTO]])
+      .out(jsonBody[SingleSuccessResponse[AdditionalDataDTO]])
+      .errorOutVariantPrepend(notFoundErrorOneOfVariant)
   }

   protected val getPartitioningCheckpointEndpointV2
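Concretely, the new route is exercised with a PATCH like the sketch below; the host, port, partitioning id and payload are made up, and sttp client3 is merely one convenient client, since any HTTP client issuing the same request works:

// Hypothetical client call against a locally running server; id 42 is made up.
import sttp.client3._

object PatchAdditionalDataExample extends App {
  val backend = HttpURLConnectionBackend()
  val response = basicRequest
    .patch(uri"http://localhost:8080/api/v2/partitionings/42/additional-data")
    .contentType("application/json")
    .body("""{"byUser":"MikeRusty","data":{"PrimaryOwner":"TechnicalManagerA"}}""")
    .send(backend)

  // On success (200) the body is a SingleSuccessResponse[AdditionalDataDTO] rendered as JSON.
  println(response.code)
  println(response.body)
}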