Skip to content

Commit

Permalink
incorporating statusCode dbCallWithStatus
Browse files Browse the repository at this point in the history
  • Loading branch information
TebaleloS committed Jul 19, 2024
1 parent 075c621 commit 933688b
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import za.co.absa.atum.model.dto.CheckpointQueryDTO
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import za.co.absa.atum.server.model.{CheckpointFromDB, PartitioningForDB}
import za.co.absa.db.fadb.{DBSchema, status}
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.doobie.DoobieEngine
import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus
import zio._
Expand All @@ -33,9 +33,8 @@ import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get
import doobie.postgres.implicits._
import doobie.postgres.circe.jsonb.implicits.jsonbPut
import doobie.postgres.circe.json.implicits.jsonGet
import za.co.absa.db.fadb.status.aggregation.implementations.{ByFirstErrorStatusAggregator, ByFirstRowStatusAggregator}
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling
import za.co.absa.db.fadb.status.{FailedOrRow, FailedOrRows}

class GetPartitioningCheckpoints (implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[CheckpointQueryDTO, CheckpointFromDB, Task] (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,27 @@ trait BaseRepository {
case Left(statusException) =>
ZIO.logError(
s"Exception caused by operation: '$operationName': " +
s"(${statusException.status.statusCode}) ${statusException.status.statusText}"
s"(${statusException.status.statusCode}), ${statusException.status.statusText}"
)
case Right(_) => ZIO.logDebug(s"Operation '$operationName' succeeded in database")
}
.mapError(error => DatabaseError(error.getMessage))
.mapError {
case statusException: StatusException =>
DatabaseError(
s"Exception caused by operation: '$operationName': " +
s"(${statusException.status.statusCode}) ${statusException.status.statusText}"
)
case error =>
DatabaseError(s"Operation '$operationName' failed with unexpected error: ${error.getMessage}")
}
.tapError(error => ZIO.logError(s"Operation '$operationName' failed: ${error.message}"))
.flatMap {
case Left(_) => fail(DatabaseError("Failed to execute operation in database"))
case Left(statusException) => fail(DatabaseError(
s"Failed to execute operation in database: " +
s"(${statusException.status.statusCode}) ${statusException.status.statusText}"
))
case Right(value) => succeed(value)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,19 @@
package za.co.absa.atum.server.api.repository

import shapeless.syntax.std.product.productOps
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataSubmitDTO, CheckpointQueryDTO, MeasureDTO, PartitioningDTO, PartitioningSubmitDTO}
import za.co.absa.atum.server.api.database.runs.functions.{CreateOrUpdateAdditionalData, CreatePartitioningIfNotExists, GetPartitioningAdditionalData, GetPartitioningCheckpoints, GetPartitioningMeasures}
import za.co.absa.atum.model.dto.{
AdditionalDataDTO,
AdditionalDataSubmitDTO,
CheckpointQueryDTO,
MeasureDTO,
PartitioningDTO,
PartitioningSubmitDTO}
import za.co.absa.atum.server.api.database.runs.functions.{
CreateOrUpdateAdditionalData,
CreatePartitioningIfNotExists,
GetPartitioningAdditionalData,
GetPartitioningCheckpoints,
GetPartitioningMeasures}
import za.co.absa.atum.server.api.exception.DatabaseError
import za.co.absa.atum.server.model.CheckpointFromDB
import zio._
Expand Down Expand Up @@ -54,9 +65,18 @@ class PartitioningRepositoryImpl(
override def getPartitioningAdditionalData(partitioning: PartitioningDTO):
IO[DatabaseError, AdditionalDataDTO] = {
dbCallWithStatus(getPartitioningAdditionalDataFn(partitioning)
.map(_.toMap.getOrElse("additional_data", null)), "getPartitioningAdditionalData"
)
.map(_.toMap.getOrElse("additional_data", null)), "getPartitioningAdditionalData")
}
// override def getPartitioningAdditionalData(partitioning: PartitioningDTO): IO[DatabaseError, AdditionalDataDTO] = {
// dbCallWithStatus(getPartitioningAdditionalDataFn(partitioning)
// .map { data =>
// implicitly[ToMap[FailedOrRows[(String, Option[String])]]]
// .apply(data)
// .get(Symbol("additional_data"))
// .getOrElse(null)
// }, "getPartitioningAdditionalData"
// )
// }

override def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO):
IO[DatabaseError, Seq[CheckpointFromDB]] = {
Expand Down

0 comments on commit 933688b

Please sign in to comment.