Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get partitioning #275

Merged
merged 12 commits into from
Sep 27, 2024
Merged
82 changes: 82 additions & 0 deletions database/src/main/postgres/runs/V1.9.12__get_partitioning.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.get_partitioning(
IN i_partitioning JSONB,
OUT status INTEGER,
OUT status_text TEXT,
OUT id BIGINT,
OUT o_partitioning JSONB,
OUT author TEXT
) RETURNS record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning(1)
-- Retrieves a partitioning by its JSONB representation.
--
-- Parameters:
-- i_partitioning - partitioning to search for, a valid example:
-- {
-- "keys": ["one", "two", "three"],
-- "version": 1,
-- "keysToValues": {
-- "one": "DatasetA",
-- "two": "Version1",
-- "three": "2022-12-20"
-- }
-- }
--
-- Returns:
-- status - status of the operation:
-- status_text - textual representation of the status
-- id - ID of the partitioning
-- o_partitioning - partitioning data
-- author - author of the partitioning
--
-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
BEGIN
-- Initialize status and status_text
status := 41;
status_text := 'Partitioning not found';

-- Retrieve partitioning ID
id := runs._get_id_partitioning(i_partitioning);

-- If ID is found, retrieve partitioning details
IF id IS NOT NULL THEN
SELECT GPBI.id, GPBI.partitioning, GPBI.author
INTO get_partitioning.id, get_partitioning.o_partitioning, get_partitioning.author
FROM runs.get_partitioning_by_id(id) AS GPBI;

-- Update status if partitioning is found
IF FOUND THEN
status := 11;
status_text := 'OK';
END IF;
END IF;

RETURN;
END;
$$
LANGUAGE plpgsql IMMUTABLE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning(i_partitioning JSONB) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning(i_partitioning JSONB) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.database.runs

import io.circe.Json
import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString

class GetPartitioningIntegrationTests extends DBTestSuite {

private val fnCreatePartitioning = "runs.create_partitioning"
private val fnGetPartitioning = "runs.get_partitioning"

private val partitioning1Value =
"""
|{
| "version":1,
| "keys":["key1","key2","key3","key4"],
| "keysToValues":{
| "key1":"valueX",
| "key2":"valueY",
| "key3":"valueZ",
| "key4":"valueA"
| }
|}
|""".stripMargin

private val partitioning1 = JsonBString(partitioning1Value)

private val partitioning2 = JsonBString(
"""
|{
| "version":1,
| "keys":["key1","key2","key3","key4"],
| "keysToValues":{
| "key1":"valueX",
| "key2":"valueX",
| "key3":"valueX",
| "key4":"valueX"
| }
|}
|""".stripMargin
)

test("Existing (correct) partitioning is returned") {
val partitioning1ID = function(fnCreatePartitioning)
.setParam("i_partitioning", partitioning1)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

function(fnCreatePartitioning)
.setParam("i_partitioning", partitioning2)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
}

function(fnGetPartitioning)
.setParam("i_partitioning", partitioning1)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("OK"))
assert(row.getLong("id").contains(partitioning1ID))
assert {
val retrievedPartitioningAsJson = Json.fromString(row.getJsonB("o_partitioning").get.value)
val expectedPartitioningAsJson = Json.fromString(partitioning1Value)
retrievedPartitioningAsJson \\ "keysToValues" == expectedPartitioningAsJson \\ "keysToValues" &&
retrievedPartitioningAsJson \\ "keys" == expectedPartitioningAsJson \\ "keys"
}
assert(row.getString("author").contains("Fantômas"))
assert(!queryResult.hasNext)
}
}

test("Non-existent partitioning is not returned") {
function(fnGetPartitioning)
.setParam("i_partitioning", partitioning1)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(41))
assert(row.getString("status_text").contains("Partitioning not found"))
assert(row.getLong("id").isEmpty)
assert(row.getJsonB("o_partitioning").isEmpty)
assert(row.getString("author").isEmpty)
assert(!queryResult.hasNext)
}
}

}
45 changes: 0 additions & 45 deletions server/src/main/scala/za/co/absa/atum/server/Constants.scala

This file was deleted.

7 changes: 4 additions & 3 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package za.co.absa.atum.server

import za.co.absa.atum.server.api.controller._
import za.co.absa.atum.server.api.database.flows.functions.{GetFlowCheckpoints, GetFlowPartitionings}
import za.co.absa.atum.server.api.database.flows.functions._
import za.co.absa.atum.server.api.database.{PostgresDatabaseProvider, TransactorProvider}
import za.co.absa.atum.server.api.database.runs.functions._
import za.co.absa.atum.server.api.http.Server
import za.co.absa.atum.server.api.repository.{CheckpointRepositoryImpl, FlowRepositoryImpl, PartitioningRepositoryImpl}
import za.co.absa.atum.server.api.service.{CheckpointServiceImpl, FlowServiceImpl, PartitioningServiceImpl}
import za.co.absa.atum.server.api.repository._
import za.co.absa.atum.server.api.service._
import za.co.absa.atum.server.aws.AwsSecretsProviderImpl
import za.co.absa.atum.server.config.JvmMonitoringConfig
import zio._
Expand Down Expand Up @@ -63,6 +63,7 @@ object Main extends ZIOAppDefault with Server {
GetPartitioningCheckpointV2.layer,
GetFlowCheckpoints.layer,
GetPartitioningById.layer,
GetPartitioning.layer,
GetFlowPartitionings.layer,
PostgresDatabaseProvider.layer,
TransactorProvider.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package za.co.absa.atum.server.api.controller

import io.circe.{Decoder, parser}
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.exception.ServiceError._
import za.co.absa.atum.server.api.http.ApiPaths
Expand All @@ -24,6 +25,8 @@ import za.co.absa.atum.server.model.SuccessResponse._
import za.co.absa.atum.server.model._
import zio._

import java.util.Base64

trait BaseController {

def serviceCall[A, B](
Expand Down Expand Up @@ -71,4 +74,10 @@ trait BaseController {
protected def createV2RootAnchoredResourcePath(parts: Seq[String]): IO[ErrorResponse, String] = {
ZIO.succeed(s"/${ApiPaths.Api}/${ApiPaths.V2}/${parts.mkString("/")}")
}

protected def base64Decode[T: Decoder](base64EncodedString: String): Either[io.circe.Error, T] = {
val decodedBytes = Base64.getDecoder.decode(base64EncodedString)
val decodedString = new String(decodedBytes, "UTF-8")
parser.decode[T](decodedString)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import zio._

class FlowControllerImpl(flowService: FlowService) extends FlowController with BaseController {

// to be replaced (and moved to checkpointcontroller) with new implementation in #233
override def getFlowCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.model.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse}
import za.co.absa.atum.server.model.SuccessResponse._
import zio.IO
import zio.macros.accessible

Expand All @@ -41,12 +41,16 @@ trait PartitioningController {
additionalDataPatchDTO: AdditionalDataPatchDTO
): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]]

def getPartitioningV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]]
def getPartitioningByIdV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]]

def getPartitioningMeasuresV2(
partitioningId: Long
): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]]

def getPartitioning(
partitioning: String
): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]]

def getFlowPartitionings(
flowId: Long,
limit: Option[Int],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.http.ApiPaths.V2Paths
import za.co.absa.atum.server.api.service.PartitioningService
import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse, PaginatedResult}
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse}
import za.co.absa.atum.server.model.SuccessResponse._
import za.co.absa.atum.server.model.{ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse, PaginatedResult}
import zio._

class PartitioningControllerImpl(partitioningService: PartitioningService)
Expand Down Expand Up @@ -59,12 +59,12 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
)
}
override def getPartitioningV2(
override def getPartitioningByIdV2(
partitioningId: Long
): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] = {
mapToSingleSuccessResponse(
serviceCall[PartitioningWithIdDTO, PartitioningWithIdDTO](
partitioningService.getPartitioning(partitioningId)
partitioningService.getPartitioningById(partitioningId)
)
)
}
Expand Down Expand Up @@ -119,6 +119,21 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
}

override def getPartitioning(
partitioning: String
): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] = {
for {
decodedPartitions <- ZIO
.fromEither(base64Decode[PartitioningDTO](partitioning))
.mapError(error => GeneralErrorResponse(error.getMessage))
response <-
mapToSingleSuccessResponse[PartitioningWithIdDTO](
serviceCall[PartitioningWithIdDTO, PartitioningWithIdDTO](
partitioningService.getPartitioning(decodedPartitions)
)
)
} yield response
}
}

object PartitioningControllerImpl {
Expand Down
Loading
Loading