Skip to content

Commit

Permalink
Merge branch 'master' into feature/278-docker-zio-compatibility
Browse files Browse the repository at this point in the history
  • Loading branch information
lsulak authored Sep 27, 2024
2 parents 4f620f9 + ae026c8 commit 8532edf
Show file tree
Hide file tree
Showing 26 changed files with 725 additions and 311 deletions.
82 changes: 82 additions & 0 deletions database/src/main/postgres/runs/V1.9.12__get_partitioning.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.get_partitioning(
IN i_partitioning JSONB,
OUT status INTEGER,
OUT status_text TEXT,
OUT id BIGINT,
OUT o_partitioning JSONB,
OUT author TEXT
) RETURNS record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning(1)
-- Retrieves a partitioning by its JSONB representation.
--
-- Parameters:
-- i_partitioning - partitioning to search for, a valid example:
-- {
-- "keys": ["one", "two", "three"],
-- "version": 1,
-- "keysToValues": {
-- "one": "DatasetA",
-- "two": "Version1",
-- "three": "2022-12-20"
-- }
-- }
--
-- Returns:
-- status - status of the operation:
-- status_text - textual representation of the status
-- id - ID of the partitioning
-- o_partitioning - partitioning data
-- author - author of the partitioning
--
-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
BEGIN
-- Initialize status and status_text
status := 41;
status_text := 'Partitioning not found';

-- Retrieve partitioning ID
id := runs._get_id_partitioning(i_partitioning);

-- If ID is found, retrieve partitioning details
IF id IS NOT NULL THEN
SELECT GPBI.id, GPBI.partitioning, GPBI.author
INTO get_partitioning.id, get_partitioning.o_partitioning, get_partitioning.author
FROM runs.get_partitioning_by_id(id) AS GPBI;

-- Update status if partitioning is found
IF FOUND THEN
status := 11;
status_text := 'OK';
END IF;
END IF;

RETURN;
END;
$$
LANGUAGE plpgsql IMMUTABLE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning(i_partitioning JSONB) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning(i_partitioning JSONB) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.database.runs

import io.circe.Json
import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString

class GetPartitioningIntegrationTests extends DBTestSuite {

private val fnCreatePartitioning = "runs.create_partitioning"
private val fnGetPartitioning = "runs.get_partitioning"

private val partitioning1Value =
"""
|{
| "version":1,
| "keys":["key1","key2","key3","key4"],
| "keysToValues":{
| "key1":"valueX",
| "key2":"valueY",
| "key3":"valueZ",
| "key4":"valueA"
| }
|}
|""".stripMargin

private val partitioning1 = JsonBString(partitioning1Value)

private val partitioning2 = JsonBString(
"""
|{
| "version":1,
| "keys":["key1","key2","key3","key4"],
| "keysToValues":{
| "key1":"valueX",
| "key2":"valueX",
| "key3":"valueX",
| "key4":"valueX"
| }
|}
|""".stripMargin
)

test("Existing (correct) partitioning is returned") {
val partitioning1ID = function(fnCreatePartitioning)
.setParam("i_partitioning", partitioning1)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

function(fnCreatePartitioning)
.setParam("i_partitioning", partitioning2)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
}

function(fnGetPartitioning)
.setParam("i_partitioning", partitioning1)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("OK"))
assert(row.getLong("id").contains(partitioning1ID))
assert {
val retrievedPartitioningAsJson = Json.fromString(row.getJsonB("o_partitioning").get.value)
val expectedPartitioningAsJson = Json.fromString(partitioning1Value)
retrievedPartitioningAsJson \\ "keysToValues" == expectedPartitioningAsJson \\ "keysToValues" &&
retrievedPartitioningAsJson \\ "keys" == expectedPartitioningAsJson \\ "keys"
}
assert(row.getString("author").contains("Fantômas"))
assert(!queryResult.hasNext)
}
}

test("Non-existent partitioning is not returned") {
function(fnGetPartitioning)
.setParam("i_partitioning", partitioning1)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(41))
assert(row.getString("status_text").contains("Partitioning not found"))
assert(row.getLong("id").isEmpty)
assert(row.getJsonB("o_partitioning").isEmpty)
assert(row.getString("author").isEmpty)
assert(!queryResult.hasNext)
}
}

}
45 changes: 0 additions & 45 deletions server/src/main/scala/za/co/absa/atum/server/Constants.scala

This file was deleted.

7 changes: 4 additions & 3 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
package za.co.absa.atum.server

import za.co.absa.atum.server.api.controller._
import za.co.absa.atum.server.api.database.flows.functions.{GetFlowCheckpoints, GetFlowPartitionings}
import za.co.absa.atum.server.api.database.flows.functions._
import za.co.absa.atum.server.api.database.{PostgresDatabaseProvider, TransactorProvider}
import za.co.absa.atum.server.api.database.runs.functions._
import za.co.absa.atum.server.api.http.Server
import za.co.absa.atum.server.api.repository.{CheckpointRepositoryImpl, FlowRepositoryImpl, PartitioningRepositoryImpl}
import za.co.absa.atum.server.api.service.{CheckpointServiceImpl, FlowServiceImpl, PartitioningServiceImpl}
import za.co.absa.atum.server.api.repository._
import za.co.absa.atum.server.api.service._
import za.co.absa.atum.server.aws.AwsSecretsProviderImpl
import za.co.absa.atum.server.config.JvmMonitoringConfig
import zio._
Expand Down Expand Up @@ -63,6 +63,7 @@ object Main extends ZIOAppDefault with Server {
GetPartitioningCheckpointV2.layer,
GetFlowCheckpoints.layer,
GetPartitioningById.layer,
GetPartitioning.layer,
GetFlowPartitionings.layer,
PostgresDatabaseProvider.layer,
TransactorProvider.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package za.co.absa.atum.server.api.controller

import io.circe.{Decoder, parser}
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.exception.ServiceError._
import za.co.absa.atum.server.api.http.ApiPaths
Expand All @@ -24,6 +25,8 @@ import za.co.absa.atum.server.model.SuccessResponse._
import za.co.absa.atum.server.model._
import zio._

import java.util.Base64

trait BaseController {

def serviceCall[A, B](
Expand Down Expand Up @@ -71,4 +74,10 @@ trait BaseController {
protected def createV2RootAnchoredResourcePath(parts: Seq[String]): IO[ErrorResponse, String] = {
ZIO.succeed(s"/${ApiPaths.Api}/${ApiPaths.V2}/${parts.mkString("/")}")
}

protected def base64Decode[T: Decoder](base64EncodedString: String): Either[io.circe.Error, T] = {
val decodedBytes = Base64.getDecoder.decode(base64EncodedString)
val decodedString = new String(decodedBytes, "UTF-8")
parser.decode[T](decodedString)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import zio._

class FlowControllerImpl(flowService: FlowService) extends FlowController with BaseController {

// to be replaced (and moved to checkpointcontroller) with new implementation in #233
override def getFlowCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.model.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse}
import za.co.absa.atum.server.model.SuccessResponse._
import zio.IO
import zio.macros.accessible

Expand All @@ -41,12 +41,16 @@ trait PartitioningController {
additionalDataPatchDTO: AdditionalDataPatchDTO
): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataDTO]]

def getPartitioningV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]]
def getPartitioningByIdV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]]

def getPartitioningMeasuresV2(
partitioningId: Long
): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]]

def getPartitioning(
partitioning: String
): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]]

def getFlowPartitionings(
flowId: Long,
limit: Option[Int],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.http.ApiPaths.V2Paths
import za.co.absa.atum.server.api.service.PartitioningService
import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse, PaginatedResult}
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, PaginatedResponse, SingleSuccessResponse}
import za.co.absa.atum.server.model.SuccessResponse._
import za.co.absa.atum.server.model.{ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse, PaginatedResult}
import zio._

class PartitioningControllerImpl(partitioningService: PartitioningService)
Expand Down Expand Up @@ -59,12 +59,12 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
)
}
override def getPartitioningV2(
override def getPartitioningByIdV2(
partitioningId: Long
): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] = {
mapToSingleSuccessResponse(
serviceCall[PartitioningWithIdDTO, PartitioningWithIdDTO](
partitioningService.getPartitioning(partitioningId)
partitioningService.getPartitioningById(partitioningId)
)
)
}
Expand Down Expand Up @@ -119,6 +119,21 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
}

override def getPartitioning(
partitioning: String
): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] = {
for {
decodedPartitions <- ZIO
.fromEither(base64Decode[PartitioningDTO](partitioning))
.mapError(error => GeneralErrorResponse(error.getMessage))
response <-
mapToSingleSuccessResponse[PartitioningWithIdDTO](
serviceCall[PartitioningWithIdDTO, PartitioningWithIdDTO](
partitioningService.getPartitioning(decodedPartitions)
)
)
} yield response
}
}

object PartitioningControllerImpl {
Expand Down
Loading

0 comments on commit 8532edf

Please sign in to comment.