Skip to content

Commit

Permalink
Fixing V1.8.3__get_partitioning_checkpoints.sql fields
Browse files Browse the repository at this point in the history
  • Loading branch information
TebaleloS committed May 31, 2024
1 parent 46da906 commit f5e3d4c
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints(
OUT id_checkpoint UUID,
OUT checkpoint_name TEXT,
OUT author TEXT,
OUT measuredByAtumAgent BOOLEAN,
OUT measured_by_atum_agent BOOLEAN,
OUT measure_name TEXT,
OUT measured_columns TEXT[],
OUT measured_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package za.co.absa.atum.server.api.database.runs.functions
import doobie.Fragment
import doobie.implicits.toSqlInterpolator
import doobie.util.Read
import play.api.libs.json.Json
import za.co.absa.atum.model.dto.CheckpointQueryDTO
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
Expand All @@ -28,13 +29,14 @@ import za.co.absa.fadb.doobie.DoobieEngine
import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunction
import zio._
import zio.interop.catz._

import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get
import doobie.postgres.circe.jsonb.implicits.jsonbGet
import doobie.postgres.implicits._
import doobie.postgres.circe.jsonb.implicits._
import io.circe.syntax.EncoderOps
import io.circe.generic.auto._
import io.circe.Json
import io.circe.parser._


class GetPartitioningCheckpoints (implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunction[CheckpointQueryDTO, CheckpointFromDB, Task] {
Expand All @@ -52,17 +54,19 @@ class GetPartitioningCheckpoints (implicit schema: DBSchema, dbEngine: DoobieEng
)

override def sql(values: CheckpointQueryDTO)(implicit read: Read[CheckpointFromDB]): Fragment = {
val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson
val partitioningNormalized = partitioning.noSpaces
val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning)
val partitioningNormalized = Json.toJson(partitioning).toString

sql"""SELECT ${Fragment.const(selectEntry)}
FROM ${Fragment.const(functionName)}(
$partitioningNormalized,
${
import za.co.absa.atum.server.api.database.DoobieImplicits.Jsonb.jsonbPutUsingString
partitioningNormalized
},
${values.limit},
${values.checkpointName},
${values.checkpointName}
) AS ${Fragment.const(alias)};"""
}

}

object GetPartitioningCheckpoints {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@

package za.co.absa.atum.server.api.repository

import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataSubmitDTO, CheckpointQueryDTO, MeasureDTO, PartitioningDTO, PartitioningSubmitDTO}
import za.co.absa.atum.server.api.database.runs.functions.{CreateOrUpdateAdditionalData, CreatePartitioningIfNotExists, GetPartitioningAdditionalData, GetPartitioningCheckpoints, GetPartitioningMeasures}
import za.co.absa.atum.model.dto.{
AdditionalDataDTO, AdditionalDataSubmitDTO,
CheckpointQueryDTO, MeasureDTO, PartitioningDTO,
PartitioningSubmitDTO}
import za.co.absa.atum.server.api.database.runs.functions.{
CreateOrUpdateAdditionalData,
CreatePartitioningIfNotExists,
GetPartitioningAdditionalData,
GetPartitioningCheckpoints,
GetPartitioningMeasures }
import za.co.absa.atum.server.api.exception.DatabaseError
import za.co.absa.atum.server.model.CheckpointMeasurements
import za.co.absa.atum.server.model.CheckpointFromDB
import za.co.absa.fadb.exceptions.StatusException
import zio._
import zio.prelude.ZivariantOps
Expand Down Expand Up @@ -57,7 +65,7 @@ class PartitioningRepositoryImpl(
}

override def getPartitioningCheckpoints(partitioningName: CheckpointQueryDTO):
IO[DatabaseError, Seq[CheckpointMeasurements]] = {
IO[DatabaseError, Seq[CheckpointFromDB]] = {
dbCall(getPartitioningCheckpointsFn(partitioningName), "getPartitioningCheckpoints")
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.repository.PartitioningRepository
import za.co.absa.fadb.exceptions.StatusException
import za.co.absa.atum.server.api.exception.DatabaseError
import za.co.absa.atum.server.model.CheckpointFromDB
import za.co.absa.atum.server.model.CheckpointFromDBObject
import zio._

class PartitioningServiceImpl(partitioningRepository: PartitioningRepository)
Expand Down Expand Up @@ -56,21 +56,21 @@ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository)
}
}

override def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] = {

override def getPartitioningCheckpoints(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ServiceError, Seq[CheckpointDTO]] = {
for {
checkpointsFromDB <- repositoryCall(
partitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO), "getPartitioningCheckpoints"
)
checkpointDTOs <- ZIO.foreach(checkpointsFromDB) {
checkpointFromDB =>
ZIO.fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB))
.mapError(error => ServiceError(error.getMessage))
ZIO.fromEither(CheckpointFromDBObject.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB))
.mapError(error => ServiceError(error.getMessage))
}
} yield checkpointDTOs

}

}

object PartitioningServiceImpl {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ case class CheckpointFromDB(
checkpointEndTime: Option[ZonedDateTime]
)

object CheckpointFromDB {
object CheckpointFromDBObject {

def toCheckpointDTO(partitioning: PartitioningDTO, checkpointQueryResult: CheckpointFromDB
): Either[DecodingFailure, CheckpointDTO] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,23 +147,23 @@ trait TestData {


// CheckpointMeasurement DTO
protected val CheckpointFromDB1: CheckpointFromDB = CheckpointFromDB(
protected val checkpointFromDB1: CheckpointFromDB = CheckpointFromDB(
idCheckpoint = UUID.randomUUID(),
checkpointName = "name",
author = "author",
measureName = measureDTO1.measureName,
measuredColumns = measureDTO1.measuredColumns,
measuredColumns = Seq(measureDTO1.measuredColumns.toString()),
measurementValue = defaultJson,
checkpointStartTime = ZonedDateTime.now(),
checkpointEndTime = Some(ZonedDateTime.now())
)

protected val CheckpointFromDB2: CheckpointFromDB = CheckpointFromDB(
protected val checkpointFromDB2: CheckpointFromDB = CheckpointFromDB(
idCheckpoint = UUID.randomUUID(),
checkpointName = "name",
author = "author",
measureName = measureDTO2.measureName,
measuredColumns = measureDTO2.measuredColumns,
measuredColumns = Seq(measureDTO2.measuredColumns.toString()),
measurementValue = defaultJson,
checkpointStartTime = ZonedDateTime.now(),
checkpointEndTime = Some(ZonedDateTime.now())
Expand Down Expand Up @@ -210,6 +210,7 @@ trait TestData {
id = UUID.randomUUID(),
name = "name",
author = "author",
measuredByAtumAgent = true,
partitioning = Seq.empty,
processStartTime = ZonedDateTime.now(),
processEndTime = None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import za.co.absa.atum.server.model.CheckpointFromDB
import zio.test.Assertion.failsWithA
import zio.{Scope, ZIO}
import zio.test._

import doobie.postgres.implicits._
import doobie.postgres.circe.jsonb.implicits.jsonbGet
import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence._
Expand All @@ -18,8 +19,8 @@ object GetPartitioningCheckpointsIntegrationTests extends ConfigProviderTest {
override def spec: Spec[TestEnvironment with Scope, Any] = {

val partitioningDTO1: PartitioningDTO = Seq(
PartitionDTO("string1", "string1"),
PartitionDTO("string2", "string2")
PartitionDTO("stringA", "stringA"),
PartitionDTO("stringB", "stringB")
)

suite("GetPartitioningCheckpointsIntegrationTests")(
Expand All @@ -29,7 +30,7 @@ object GetPartitioningCheckpointsIntegrationTests extends ConfigProviderTest {
limit = Some(10),
checkpointName = Some("checkpointName")
)
// Read[CheckpointMeasurements] implicit validation
// Read[CheckpointFromDB] implicit validation
Read[CheckpointFromDB]

for {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ package za.co.absa.atum.server.api.repository
import org.junit.runner.RunWith
import org.mockito.Mockito.{mock, when}
import za.co.absa.atum.model.dto.{AdditionalDataDTO, MeasureDTO}
import za.co.absa.atum.server.api.database.runs.functions.{CreateOrUpdateAdditionalData, CreatePartitioningIfNotExists, GetPartitioningAdditionalData, GetPartitioningCheckpoints, GetPartitioningMeasures}
import za.co.absa.atum.server.api.database.runs.functions.{
CreateOrUpdateAdditionalData,
CreatePartitioningIfNotExists,
GetPartitioningAdditionalData,
GetPartitioningCheckpoints,
GetPartitioningMeasures}
import za.co.absa.atum.server.api.exception.DatabaseError
import za.co.absa.atum.server.api.TestData
import za.co.absa.atum.server.model.CheckpointMeasurements
import za.co.absa.atum.server.model.CheckpointFromDB
import za.co.absa.fadb.exceptions.ErrorInDataException
import za.co.absa.fadb.status.FunctionStatus
import zio._
Expand Down Expand Up @@ -75,7 +80,7 @@ class PartitioningRepositoryIntegrationTests extends ZIOSpecDefault with TestDat
// Get Partitioning Checkpoints Mocks
private val getPartitioningCheckpointsMock = mock(classOf[GetPartitioningCheckpoints])

when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO1)).thenReturn(ZIO.succeed(Seq(checkpointMeasurements1)))
when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO1)).thenReturn(ZIO.succeed(Seq(checkpointFromDB1)))
when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO2)).thenReturn(ZIO.fail(DatabaseError("boom!")))
when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO3)).thenReturn(ZIO.succeed(Seq.empty))

Expand Down Expand Up @@ -150,7 +155,7 @@ class PartitioningRepositoryIntegrationTests extends ZIOSpecDefault with TestDat
test("Returns expected Seq") {
for {
result <- PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO1)
} yield assertTrue(result.isInstanceOf[Seq[CheckpointMeasurements]] && result == Seq(checkpointMeasurements1))
} yield assertTrue(result.isInstanceOf[Seq[CheckpointFromDB]] && result == Seq(checkpointFromDB1))
},
test("Returns expected DatabaseError") {
assertZIO(PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO2).exit)(
Expand All @@ -160,7 +165,7 @@ class PartitioningRepositoryIntegrationTests extends ZIOSpecDefault with TestDat
test("Returns expected Seq.empty") {
for {
result <- PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO3)
} yield assertTrue(result.isInstanceOf[Seq[CheckpointMeasurements]] && result.isEmpty)
} yield assertTrue(result.isInstanceOf[Seq[CheckpointFromDB]] && result.isEmpty)
}
)
).provide(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package za.co.absa.atum.server.api.service


import org.junit.runner.RunWith
import org.mockito.Mockito.{mock, when}
import za.co.absa.atum.model.dto.{AdditionalDataDTO, CheckpointDTO, MeasureDTO}
import za.co.absa.atum.server.api.TestData
import za.co.absa.atum.server.api.exception.{DatabaseError, ServiceError}
import za.co.absa.atum.server.api.repository.PartitioningRepository
import za.co.absa.atum.server.model.CheckpointMeasurements
import za.co.absa.fadb.exceptions.ErrorInDataException
import za.co.absa.fadb.status.FunctionStatus
import zio.test.Assertion.failsWithA
Expand Down Expand Up @@ -58,7 +58,7 @@ class PartitioningServiceIntegrationTests extends ZIOSpecDefault with TestData {
.thenReturn(ZIO.fail(DatabaseError("boom!")))

when(partitioningRepositoryMock.getPartitioningCheckpoints(checkpointQueryDTO1))
.thenReturn(ZIO.succeed(Seq(checkpointMeasurements1, checkpointMeasurements2)))
.thenReturn(ZIO.succeed(Seq(checkpointFromDB1, checkpointFromDB2)))
when(partitioningRepositoryMock.getPartitioningCheckpoints(checkpointQueryDTO2))
.thenReturn(ZIO.fail(DatabaseError("boom!")))

Expand Down Expand Up @@ -128,14 +128,14 @@ class PartitioningServiceIntegrationTests extends ZIOSpecDefault with TestData {
)
}
),

suite("GetPartitioningCheckpointsSuite")(
test("Returns expected Right with Seq[CheckpointMeasurements]") {
test("Returns expected Right with Seq[CheckpointDTO]") {
for {
result <- PartitioningService.getPartitioningCheckpoints(checkpointQueryDTO1)
} yield assertTrue{
println("Results: ", result)
result.isInstanceOf[Seq[CheckpointDTO]]
result == Seq(checkpointMeasurements1, checkpointMeasurements2)
result == Seq(checkpointDTO1, checkpointDTO2)
}
},
test("Returns expected ServiceError") {
Expand Down

0 comments on commit f5e3d4c

Please sign in to comment.