Skip to content

Commit

Permalink
Merge branch 'master' into feature/#59-Create-adjust-endpoint-for-rea…
Browse files Browse the repository at this point in the history
…ding-checkpoint-defintion-of-partioning
  • Loading branch information
TebaleloS authored Nov 9, 2023
2 parents c6bd426 + 8ad1b36 commit 4bd6d03
Showing 1 changed file with 21 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import za.co.absa.fadb.slick.FaDbPostgresProfile.api._
import za.co.absa.fadb.slick.{SlickFunctionWithStatusSupport, SlickPgEngine}
import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling

import scala.reflect.ClassTag

class Runs (implicit dBEngine: SlickPgEngine) extends DBSchema{
import Runs._

Expand All @@ -35,33 +37,36 @@ class Runs (implicit dBEngine: SlickPgEngine) extends DBSchema{

object Runs {

private def scalaIterableToSQLArray(toConvert: Seq[Any]): String = {
SerializationUtils.asJson(toConvert)
.replace("[", "{")
.replace("]", "}")
private def nestedScalaSeqToPgArray[T <: AnyRef: ClassTag](toConvert: Seq[Seq[T]]): String = {
val scalaSeqJsonized = toConvert.map(scalaSeqToPgArray).mkString(",")
"{" + scalaSeqJsonized + "}"
}

private def scalaSeqToPgArray[T <: AnyRef: ClassTag](toConvert: Seq[T]): String = {
val scalaSeqJsonized = SerializationUtils.asJson(toConvert) // this also correctly escapes double quotes

// only the square brackets (indicating Seq, after the 'jsonization') have to be replaced with curly ones
"{" + scalaSeqJsonized.substring(1, scalaSeqJsonized.length - 1) + "}"
}

class WriteCheckpoint(implicit override val schema: DBSchema, override val dbEngine: SlickPgEngine)
extends DBSingleResultFunction[CheckpointDTO, Unit, SlickPgEngine]
with SlickFunctionWithStatusSupport[CheckpointDTO, Unit]
with StandardStatusHandling
{

/** Call the database function that create a checkpoint to the db **/
override protected def sql(values: CheckpointDTO): SQLActionBuilder = {
val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning)
val partitioningNormalized = SerializationUtils.asJson(partitioning)

val measureNames = scalaIterableToSQLArray(values.measurements.map(_.measure.measureName))
val controlColumns = scalaIterableToSQLArray(values.measurements.map(_.measure.controlColumns))
val measureNames = values.measurements.map(_.measure.measureName)
val measureNamesNormalized = scalaSeqToPgArray(measureNames)

val controlColumns = values.measurements.map(_.measure.controlColumns)
val controlColumnsNormalized = nestedScalaSeqToPgArray(controlColumns)

val measureResults = values.measurements.map(_.result)
val measureResultsAsJsonString = SerializationUtils.asJson(measureResults)
.replace("[", "{")
.replace("]", "}")
println(measureResults)
println(SerializationUtils.asJson(measureResults))
println(measureResultsAsJsonString)
val measureResultsNormalized = measureResults.map(SerializationUtils.asJson)

sql"""SELECT #$selectEntry
FROM #$functionName(
Expand All @@ -70,9 +75,9 @@ object Runs {
${values.name},
${values.processStartTime}::TIMESTAMPTZ,
${values.processEndTime}::TIMESTAMPTZ,
$measureNames::TEXT[],
$controlColumns::TEXT[][],
'{}'::JSONB[],
$measureNamesNormalized::TEXT[],
$controlColumnsNormalized::TEXT[][],
$measureResultsNormalized::JSONB[],
${values.author}
) #$alias;"""
}
Expand Down

0 comments on commit 4bd6d03

Please sign in to comment.