Skip to content

Commit

Permalink
merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
TebaleloS committed Aug 27, 2024
2 parents 3c34522 + 696e77f commit f559e27
Show file tree
Hide file tree
Showing 23 changed files with 591 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.get_partitioning_additional_data(
IN i_partitioning_id BIGINT,
OUT status INTEGER,
OUT status_text TEXT,
OUT ad_name TEXT,
OUT ad_value TEXT,
OUT ad_author TEXT
) RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_additional_data(1)
-- Returns additional data for the given partitioning
--
-- Parameters:
-- i_partitioning_id - id of the partitioning for requested additional data
--
-- Returns:
-- status - Status code
-- status_text - Status message
-- ad_name - Name of the additional data
-- ad_value - Value of the additional data
-- ad_author - Author of the additional data
--
-- Status codes:
-- 11 - OK
-- 16 - No additional data found
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------

BEGIN
PERFORM 1 FROM runs.partitionings WHERE id_partitioning = i_partitioning_id;
IF NOT FOUND THEN
status := 41;
status_text := 'Partitioning not found';
RETURN NEXT;
RETURN;
END IF;

status = 11;
status_text = 'OK';

RETURN QUERY
SELECT status, status_text, ad.ad_name, ad.ad_value, ad.created_by
FROM runs.additional_data AS ad
WHERE ad.fk_partitioning = i_partitioning_id;

IF NOT FOUND THEN
status := 16;
status_text := 'No additional data found';
RETURN NEXT;
RETURN;
END IF;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_additional_data(BIGINT) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning_additional_data(BIGINT) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.database.runs

import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString

class GetPartitioningAdditionalDataV2IntegrationTests extends DBTestSuite {

private val fncGetPartitioningAdditionalData = "runs.get_partitioning_additional_data"

private val partitioning1 = JsonBString(
"""
|{
| "version": 1,
| "keys": ["keyX", "keyY", "keyZ"],
| "keysToValues": {
| "keyX": "value1",
| "keyZ": "value3",
| "keyY": "value2"
| }
|}
|""".stripMargin
)

private val partitioning2 = JsonBString(
"""
|{
| "version": 1,
| "keys": ["key1", "key3", "key2", "key4"],
| "keysToValues": {
| "key1": "valueX",
| "key2": "valueY",
| "key3": "valueZ",
| "key4": "valueA"
| }
|}
|""".stripMargin
)

test("Get partitioning additional data returns additional data for partitioning with additional data") {
table("runs.partitionings").insert(
add("partitioning", partitioning1)
.add("created_by", "Joseph")
)

table("runs.partitionings").insert(
add("partitioning", partitioning2)
.add("created_by", "Daniel")
)

val fkPartitioning1: Long = table("runs.partitionings").fieldValue("partitioning", partitioning1, "id_partitioning").get.get
val fkPartitioning2: Long = table("runs.partitionings").fieldValue("partitioning", partitioning2, "id_partitioning").get.get

table("runs.additional_data").insert(
add("fk_partitioning", fkPartitioning1)
.add("created_by", "Joseph")
.add("ad_name", "ad_1")
.add("ad_value", "This is the additional data for Joseph")
)

table("runs.additional_data").insert(
add("fk_partitioning", fkPartitioning1)
.add("created_by", "Joseph")
.add("ad_name", "ad_2")
.add("ad_value", "This is the additional data for Joseph")
)

table("runs.additional_data").insert(
add("fk_partitioning", fkPartitioning2)
.add("created_by", "Daniel")
.add("ad_name", "ad_3")
.add("ad_value", "This is the additional data for Daniel")
)

function(fncGetPartitioningAdditionalData)
.setParam("i_partitioning_id", fkPartitioning1)
.execute { queryResult =>
val results = queryResult.next()
assert(results.getInt("status").contains(11))
assert(results.getString("status_text").contains("OK"))
assert(results.getString("ad_name").contains("ad_1"))
assert(results.getString("ad_value").contains("This is the additional data for Joseph"))
assert(results.getString("ad_author").contains("Joseph"))

val results2 = queryResult.next()
assert(results2.getInt("status").contains(11))
assert(results2.getString("status_text").contains("OK"))
assert(results2.getString("ad_name").contains("ad_2"))
assert(results2.getString("ad_value").contains("This is the additional data for Joseph"))
assert(results2.getString("ad_author").contains("Joseph"))

assert(!queryResult.hasNext)
}

table("runs.additional_data").where(add("fk_partitioning", fkPartitioning1)) { additionalDataResult =>
assert(additionalDataResult.hasNext)
val row = additionalDataResult.next()
assert(row.getString("ad_name").contains("ad_1"))
assert(row.getString("ad_value").contains("This is the additional data for Joseph"))
assert(row.getString("created_by").contains("Joseph"))
}

}

test("Get partitioning additional data should return no records for partitioning without additional data") {
table("runs.partitionings").insert(
add("partitioning", partitioning2)
.add("created_by", "Joseph")
)

val fkPartitioning: Long = table("runs.partitionings").fieldValue("partitioning", partitioning2, "id_partitioning").get.get

function(fncGetPartitioningAdditionalData)
.setParam("i_partitioning_id", fkPartitioning)
.execute { queryResult =>
val result = queryResult.next()
assert(result.getInt("status").contains(16))
assert(result.getString("status_text").contains("No additional data found"))
assert(!queryResult.hasNext)
}

table("runs.additional_data").where(add("fk_partitioning", fkPartitioning)) { additionalDataResult =>
assert(!additionalDataResult.hasNext)
}
}

test("Get partitioning additional data should return error status code on non existing partitioning") {
function(fncGetPartitioningAdditionalData)
.setParam("i_partitioning_id", 0L)
.execute { queryResult =>
val results = queryResult.next()
assert(results.getInt("status").contains(41))
assert(results.getString("status_text").contains("Partitioning not found"))
assert(!queryResult.hasNext)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}

case class AdditionalDataItemDTO(
value: String,
value: Option[String],
author: String
)

Expand Down
1 change: 1 addition & 0 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ object Main extends ZIOAppDefault with Server {
CreatePartitioningIfNotExists.layer,
GetPartitioningMeasures.layer,
GetPartitioningAdditionalData.layer,
GetPartitioningAdditionalDataV2.layer,
CreateOrUpdateAdditionalData.layer,
GetPartitioningCheckpoints.layer,
WriteCheckpoint.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ trait BaseController {

def serviceCall[A, B](
serviceCall: IO[ServiceError, A],
onSuccessFnc: A => B
onSuccessFnc: A => B = identity[A] _
): IO[ErrorResponse, B] = {

serviceCall
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ class CheckpointControllerImpl(checkpointService: CheckpointService) extends Che
): IO[ErrorResponse, SingleSuccessResponse[CheckpointV2DTO]] = {
mapToSingleSuccessResponse(
serviceCall[CheckpointV2DTO, CheckpointV2DTO](
checkpointService.getCheckpointV2(partitioningId, checkpointId),
identity
checkpointService.getCheckpointV2(partitioningId, checkpointId)
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ class FlowControllerImpl(flowService: FlowService) extends FlowController with B
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = {
mapToMultiSuccessResponse(
serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]](
flowService.getFlowCheckpoints(checkpointQueryDTO),
identity
flowService.getFlowCheckpoints(checkpointQueryDTO)
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,7 @@

package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{
AdditionalDataSubmitDTO,
AtumContextDTO,
CheckpointDTO,
CheckpointQueryDTO,
PartitioningSubmitDTO,
PartitioningWithIdDTO
}
import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.model.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
import zio.IO
Expand All @@ -39,6 +32,10 @@ trait PartitioningController {
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, SingleSuccessResponse[AtumContextDTO]]

def getPartitioningAdditionalDataV2(
partitioningId: Long
): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]]

def createOrUpdateAdditionalDataV2(
additionalData: AdditionalDataSubmitDTO
): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,20 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = {
mapToMultiSuccessResponse(
serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]](
partitioningService.getPartitioningCheckpoints(checkpointQueryDTO),
identity
partitioningService.getPartitioningCheckpoints(checkpointQueryDTO)
)
)
}

override def getPartitioningAdditionalDataV2(
partitioningId: Long
): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]] = {
mapToSingleSuccessResponse(
serviceCall[AdditionalDataDTO, AdditionalDataDTO](
partitioningService.getPartitioningAdditionalDataV2(partitioningId)
)
)
}
override def getPartitioningV2(
partitioningId: Long
): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.database.runs.functions

import doobie.implicits.toSqlInterpolator
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import za.co.absa.atum.server.model.AdditionalDataItemFromDB
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus
import za.co.absa.db.fadb.doobie.DoobieEngine
import zio.{Task, URLayer, ZIO, ZLayer}

import za.co.absa.atum.server.api.database.DoobieImplicits.getMapWithOptionStringValues
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling

class GetPartitioningAdditionalDataV2(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[Long, Option[AdditionalDataItemFromDB], Task](input =>
Seq(fr"$input"), Some("get_partitioning_additional_data")
)
with StandardStatusHandling
with ByFirstErrorStatusAggregator {

override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("ad_name", "ad_value", "ad_author")
}

object GetPartitioningAdditionalDataV2 {
val layer: URLayer[PostgresDatabaseProvider, GetPartitioningAdditionalDataV2] = ZLayer {
for {
dbProvider <- ZIO.service[PostgresDatabaseProvider]
} yield new GetPartitioningAdditionalDataV2()(Runs, dbProvider.dbEngine)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,15 @@ trait Endpoints extends BaseEndpoints {
.out(jsonBody[SingleSuccessResponse[AtumContextDTO]])
}

protected val getPartitioningAdditionalDataEndpointV2
: PublicEndpoint[Long, ErrorResponse, SingleSuccessResponse[AdditionalDataDTO], Any] = {
apiV2.get
.in(V2Paths.Partitionings / path[Long]("partitioningId") / V2Paths.AdditionalData)
.out(statusCode(StatusCode.Ok))
.out(jsonBody[SingleSuccessResponse[AdditionalDataDTO]])
.errorOutVariantPrepend(notFoundErrorOneOfVariant)
}

protected val createOrUpdateAdditionalDataEndpointV2
: PublicEndpoint[AdditionalDataSubmitDTO, ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO], Any] = {
apiV2.post
Expand Down
Loading

0 comments on commit f559e27

Please sign in to comment.