Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

get partitioning #275

Merged
merged 12 commits into from
Sep 27, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions .github/workflows/jacoco_report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ jobs:
- name: Add coverage to PR (model)
if: steps.jacocorun.outcome == 'success'
id: jacoco-model
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@v1.7.1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, I though this was done in earlier of your PRs 😄

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I have done it also here to see "more correct" numbers whilst working on the draft pr ...

with:
paths: ${{ github.workspace }}/model/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml
token: ${{ secrets.GITHUB_TOKEN }}
Expand All @@ -74,7 +74,7 @@ jobs:
- name: Add coverage to PR (agent)
if: steps.jacocorun.outcome == 'success'
id: jacoco-agent
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@v1.7.1
with:
paths: ${{ github.workspace }}/agent/target/spark3-jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml
token: ${{ secrets.GITHUB_TOKEN }}
Expand All @@ -85,7 +85,7 @@ jobs:
- name: Add coverage to PR (reader)
if: steps.jacocorun.outcome == 'success'
id: jacoco-reader
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@v1.7.1
with:
paths: ${{ github.workspace }}/reader/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml
token: ${{ secrets.GITHUB_TOKEN }}
Expand All @@ -96,7 +96,7 @@ jobs:
- name: Add coverage to PR (server)
if: steps.jacocorun.outcome == 'success'
id: jacoco-server
uses: madrapps/jacoco-report@v1.6.1
uses: madrapps/jacoco-report@v1.7.1
with:
paths: ${{ github.workspace }}/server/target/jvm-${{ env.scalaShort }}/jacoco/report/jacoco.xml
token: ${{ secrets.GITHUB_TOKEN }}
Expand Down
82 changes: 82 additions & 0 deletions database/src/main/postgres/runs/V1.9.12__get_partitioning.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.get_partitioning(
IN i_partitioning JSONB,
OUT status INTEGER,
OUT status_text TEXT,
OUT id BIGINT,
OUT o_partitioning JSONB,
OUT author TEXT
) RETURNS record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning(1)
-- Retrieves a partitioning by its JSONB representation.
--
-- Parameters:
-- i_partitioning - partitioning to search for, a valid example:
-- {
-- "keys": ["one", "two", "three"],
-- "version": 1,
-- "keysToValues": {
-- "one": "DatasetA",
-- "two": "Version1",
-- "three": "2022-12-20"
-- }
-- }
--
-- Returns:
-- status - status of the operation:
-- status_text - textual representation of the status
-- id - ID of the partitioning
-- o_partitioning - partitioning data
-- author - author of the partitioning
--
-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
BEGIN
-- Initialize status and status_text
status := 41;
status_text := 'Partitioning not found';

-- Retrieve partitioning ID
id := runs._get_id_partitioning(i_partitioning);

-- If ID is found, retrieve partitioning details
IF id IS NOT NULL THEN
SELECT GPBI.id, GPBI.partitioning, GPBI.author
INTO get_partitioning.id, get_partitioning.o_partitioning, get_partitioning.author
FROM runs.get_partitioning_by_id(id) AS GPBI;

-- Update status if partitioning is found
IF FOUND THEN
status := 11;
status_text := 'OK';
END IF;
END IF;

RETURN;
END;
$$
LANGUAGE plpgsql IMMUTABLE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning(i_partitioning JSONB) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.get_partitioning(i_partitioning JSONB) TO atum_user;
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.database.runs

import io.circe.Json
import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString

class GetPartitioningIntegrationTests extends DBTestSuite {

private val fnCreatePartitioning = "runs.create_partitioning"
private val fnGetPartitioning = "runs.get_partitioning"

private val partitioning1Value =
"""
|{
| "version":1,
| "keys":["key1","key2","key3","key4"],
| "keysToValues":{
| "key1":"valueX",
| "key2":"valueY",
| "key3":"valueZ",
| "key4":"valueA"
| }
|}
|""".stripMargin

private val partitioning1 = JsonBString(partitioning1Value)

private val partitioning2 = JsonBString(
"""
|{
| "version":1,
| "keys":["key1","key2","key3","key4"],
| "keysToValues":{
| "key1":"valueX",
| "key2":"valueX",
| "key3":"valueX",
| "key4":"valueX"
| }
|}
|""".stripMargin
)

test("Existing (correct) partitioning is returned") {
val partitioning1ID = function(fnCreatePartitioning)
.setParam("i_partitioning", partitioning1)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

function(fnCreatePartitioning)
.setParam("i_partitioning", partitioning2)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning_id")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
}

function(fnGetPartitioning)
.setParam("i_partitioning", partitioning1)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("OK"))
assert(row.getLong("id").contains(partitioning1ID))
assert {
val retrievedPartitioningAsJson = Json.fromString(row.getJsonB("o_partitioning").get.value)
val expectedPartitioningAsJson = Json.fromString(partitioning1Value)
retrievedPartitioningAsJson \\ "keysToValues" == expectedPartitioningAsJson \\ "keysToValues" &&
retrievedPartitioningAsJson \\ "keys" == expectedPartitioningAsJson \\ "keys"
}
assert(row.getString("author").contains("Fantômas"))
assert(!queryResult.hasNext)
}
}

test("Non-existent partitioning is not returned") {
function(fnGetPartitioning)
.setParam("i_partitioning", partitioning1)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(41))
assert(row.getString("status_text").contains("Partitioning not found"))
assert(row.getLong("id").isEmpty)
assert(row.getJsonB("o_partitioning").isEmpty)
assert(row.getString("author").isEmpty)
assert(!queryResult.hasNext)
}
}

}
1 change: 1 addition & 0 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ object Main extends ZIOAppDefault with Server {
GetPartitioningCheckpointV2.layer,
GetFlowCheckpoints.layer,
GetPartitioningById.layer,
GetPartitioning.layer,
PostgresDatabaseProvider.layer,
TransactorProvider.layer,
AwsSecretsProviderImpl.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

package za.co.absa.atum.server.api.controller

import io.circe.{Decoder, parser}
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.exception.ServiceError._
import za.co.absa.atum.server.api.http.ApiPaths
import za.co.absa.atum.server.model.SuccessResponse._
import za.co.absa.atum.server.model._
import zio._

import java.util.Base64

trait BaseController {

def serviceCall[A, B](
Expand Down Expand Up @@ -59,4 +62,10 @@ trait BaseController {
protected def createV2RootAnchoredResourcePath(parts: Seq[String]): IO[ErrorResponse, String] = {
ZIO.succeed(s"/${ApiPaths.Api}/${ApiPaths.V2}/${parts.mkString("/")}")
}

protected def base64Decode[T: Decoder](base64EncodedString: String): Either[io.circe.Error, T] = {
val decodedBytes = Base64.getDecoder.decode(base64EncodedString)
val decodedString = new String(decodedBytes, "UTF-8")
parser.decode[T](decodedString)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ trait PartitioningController {
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]]

def getPartitioningV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]]

def getPartitioningByIdV2(partitioningId: Long): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]]

def getPartitioningMeasuresV2(
partitioningId: Long
): IO[ErrorResponse, MultiSuccessResponse[MeasureDTO]]

def getPartitioning(
partitioning: String
): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.http.ApiPaths.V2Paths
import za.co.absa.atum.server.api.service.PartitioningService
import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse}
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
import za.co.absa.atum.server.model.SuccessResponse._
import za.co.absa.atum.server.model.{ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse}
import zio._

class PartitioningControllerImpl(partitioningService: PartitioningService)
Expand Down Expand Up @@ -69,12 +69,12 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
)
}
override def getPartitioningV2(
override def getPartitioningByIdV2(
partitioningId: Long
): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] = {
mapToSingleSuccessResponse(
serviceCall[PartitioningWithIdDTO, PartitioningWithIdDTO](
partitioningService.getPartitioning(partitioningId)
partitioningService.getPartitioningById(partitioningId)
)
)
}
Expand Down Expand Up @@ -115,6 +115,21 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
}

override def getPartitioning(
partitioning: String
): IO[ErrorResponse, SingleSuccessResponse[PartitioningWithIdDTO]] = {
for {
decodedPartitions <- ZIO
.fromEither(base64Decode[PartitioningDTO](partitioning))
.mapError(error => GeneralErrorResponse(error.getMessage))
response <-
mapToSingleSuccessResponse[PartitioningWithIdDTO](
serviceCall[PartitioningWithIdDTO, PartitioningWithIdDTO](
partitioningService.getPartitioning(decodedPartitions)
)
)
} yield response
}
}

object PartitioningControllerImpl {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.database.runs.functions

import doobie.implicits.toSqlInterpolator
import io.circe.syntax.EncoderOps
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import za.co.absa.atum.server.model.{PartitioningForDB, PartitioningFromDB}
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.doobie.DoobieEngine
import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling
import zio._

import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbGet

class GetPartitioning(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieSingleResultFunctionWithStatus[PartitioningForDB, Option[PartitioningFromDB], Task](partitioningForDB =>
Seq(fr"${partitioningForDB.asJson}")
) with StandardStatusHandling {
override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("id", "o_partitioning", "author")
}

object GetPartitioning {
val layer: URLayer[PostgresDatabaseProvider, GetPartitioning] = ZLayer {
for {
dbProvider <- ZIO.service[PostgresDatabaseProvider]
} yield new GetPartitioning()(Runs, dbProvider.dbEngine)
}
}
Loading
Loading