Commit

Merge branch 'master' into feature/211-release-notes-entry-enforcement
benedeki authored Jun 14, 2024
2 parents ac0738e + 470e091 commit 829b97c
Showing 43 changed files with 914 additions and 233 deletions.
46 changes: 26 additions & 20 deletions database/src/main/postgres/flows/V1.9.1__get_flow_checkpoints.sql
@@ -15,18 +15,20 @@
*/

CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints(
IN i_partitioning_of_flow JSONB,
IN i_limit INT DEFAULT 5,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_checkpoint UUID,
OUT checkpoint_name TEXT,
OUT measure_name TEXT,
OUT measured_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
IN i_partitioning_of_flow JSONB,
IN i_limit INT DEFAULT 5,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_checkpoint UUID,
OUT checkpoint_name TEXT,
OUT author TEXT,
OUT measured_by_atum_agent BOOLEAN,
OUT measure_name TEXT,
OUT measured_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
) RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
@@ -53,14 +55,17 @@ $$
-- specifying `i_checkpoint_name` parameter
--
-- Returns:
-- status - Status code
-- status_text - Status text
-- id_checkpoint - id of retrieved checkpoint
-- checkpoint_name - name of retrieved checkpoint
-- measure_name - measure name associated with a given checkpoint
-- measured_columns - measure columns associated with a given checkpoint
-- measurement_value - measurement details associated with a given checkpoint
-- checkpoint_time - time
-- status - Status code
-- status_text - Status text
-- id_checkpoint - ID of retrieved checkpoint
-- checkpoint_name - Name of the retrieved checkpoint
-- author - Author of the checkpoint
-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by Atum Agent
-- (if false, data supplied manually)
-- measure_name - Measure name associated with the given checkpoint
-- measured_columns - Measured columns associated with the given checkpoint
-- measurement_value - Measurement details associated with the given checkpoint
-- checkpoint_start_time - Time the checkpoint processing started
-- checkpoint_end_time - Time the checkpoint processing ended
--
-- Status codes:
-- 11 - OK
@@ -89,6 +94,7 @@ BEGIN
RETURN QUERY
SELECT 11 AS status, 'OK' AS status_text,
CP.id_checkpoint, CP.checkpoint_name,
CP.created_by AS author, CP.measured_by_atum_agent,
MD.measure_name, MD.measured_columns,
M.measurement_value,
CP.process_start_time AS checkpoint_start_time, CP.process_end_time AS checkpoint_end_time
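
For orientation, a hedged sketch of how the extended function could be invoked from psql; the partitioning JSON shape and values below are assumptions for illustration, not taken from this commit:

-- Illustrative call only: the partitioning JSON is an assumed shape, not defined in this diff.
SELECT status, status_text, checkpoint_name, author, measured_by_atum_agent,
       measure_name, measured_columns, measurement_value,
       checkpoint_start_time, checkpoint_end_time
FROM flows.get_flow_checkpoints(
    '{"version": 1, "keys": ["country"], "keysToValuesMap": {"country": "ZA"}}'::jsonb,  -- i_partitioning_of_flow
    5,      -- i_limit: return at most 5 checkpoints
    NULL    -- i_checkpoint_name: NULL returns checkpoints of any name
);

The two new output columns map to CP.created_by (returned as author) and CP.measured_by_atum_agent in the query body above.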
@@ -229,6 +229,8 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite
assert(row1.getString("status_text").contains("OK"))
assert(row1.getUUID("id_checkpoint").contains(checkpointId))
assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg"))
assert(row1.getString("author").contains("ObviouslySomeTest"))
assert(row1.getBoolean("measured_by_atum_agent").contains(true))
assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime))
assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime))

@@ -243,6 +245,8 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite
assert(row2.getString("status_text").contains("OK"))
assert(row2.getUUID("id_checkpoint").contains(checkpointId))
assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg"))
assert(row1.getString("author").contains("ObviouslySomeTest"))
assert(row1.getBoolean("measured_by_atum_agent").contains(true))
assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTime))
assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTime))

@@ -16,6 +16,8 @@

package za.co.absa.atum.model.dto

import io.circe.{Decoder, Encoder}

case class MeasureResultDTO(
mainValue: MeasureResultDTO.TypedValue,
supportValues: Map[String, MeasureResultDTO.TypedValue] = Map.empty
@@ -36,4 +38,29 @@ object MeasureResultDTO {
case object Double extends ResultValueType
}


implicit val encodeResultValueType: Encoder[MeasureResultDTO.ResultValueType] = Encoder.encodeString.contramap {
case MeasureResultDTO.ResultValueType.String => "String"
case MeasureResultDTO.ResultValueType.Long => "Long"
case MeasureResultDTO.ResultValueType.BigDecimal => "BigDecimal"
case MeasureResultDTO.ResultValueType.Double => "Double"
}

implicit val decodeResultValueType: Decoder[MeasureResultDTO.ResultValueType] = Decoder.decodeString.emap {
case "String" => Right(MeasureResultDTO.ResultValueType.String)
case "Long" => Right(MeasureResultDTO.ResultValueType.Long)
case "BigDecimal" => Right(MeasureResultDTO.ResultValueType.BigDecimal)
case "Double" => Right(MeasureResultDTO.ResultValueType.Double)
case other => Left(s"Cannot decode $other as ResultValueType")
}

implicit val encodeTypedValue: Encoder[MeasureResultDTO.TypedValue] =
Encoder.forProduct2("value", "valueType")(tv => (tv.value, tv.valueType))

implicit val decodeTypedValue: Decoder[MeasureResultDTO.TypedValue] =
Decoder.forProduct2("value", "valueType")(MeasureResultDTO.TypedValue.apply)

implicit val decodeMeasureResultDTO: Decoder[MeasureResultDTO] =
Decoder.forProduct2("mainValue", "supportValues")(MeasureResultDTO.apply)

}
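
A minimal sketch of exercising these codecs, assuming the value field of TypedValue is a String (its definition sits outside this hunk) and that the implicits above are in scope:

import io.circe.parser.decode
import io.circe.syntax._
import za.co.absa.atum.model.dto.MeasureResultDTO
import za.co.absa.atum.model.dto.MeasureResultDTO._

// Encode a typed value and decode it back (values are illustrative).
val typedValue = TypedValue("42", ResultValueType.Long)
val json       = typedValue.asJson.noSpaces   // {"value":"42","valueType":"Long"}
val roundTrip  = decode[TypedValue](json)     // Right(TypedValue("42", ResultValueType.Long))

// A full MeasureResultDTO decodes the same way via decodeMeasureResultDTO.
val dto = decode[MeasureResultDTO](
  """{"mainValue":{"value":"1.5","valueType":"Double"},"supportValues":{}}"""
)

Moving these codecs out of the server's DoobieImplicits (see the removal further down) and into the model keeps JSON serialization next to the DTOs it describes.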
4 changes: 2 additions & 2 deletions project/Dependencies.scala
@@ -126,7 +126,7 @@ object Dependencies {
json4sJackson,
json4sNative,
circeCore,
circeParser
circeParser,
)
}

@@ -252,7 +252,7 @@ object Dependencies {
jsonSerdeDependencies(scalaVersion)
}

def databaseDependencies: Seq[ModuleID] = {
def databaseDependencies: Seq[ModuleID] = {
lazy val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalatest % Test
lazy val balta = "za.co.absa" %% "balta" % Versions.balta % Test

14 changes: 9 additions & 5 deletions server/src/main/scala/za/co/absa/atum/server/Constants.scala
@@ -24,14 +24,18 @@ object Constants {
final val V1 = "v1"
final val V2 = "v2"

final val CreatePartitioning = "createPartitioning"
final val CreateOrUpdateAdditionalData = "createOrUpdateAdditionalData"
final val CreateCheckpoint = "createCheckpoint"
final val GetPartitioningCheckpoints = "getPartitioningCheckpoints"
// TODO: for v1, keep the original casing without hyphens, for backward compatibility
// TODO: revisit the API path naming
final val CreatePartitioning = "create-partitioning"
final val CreateCheckpoint = "create-checkpoint"

final val CreateOrUpdateAdditionalData = "upsert-additional-data"

final val GetPartitioningCheckpoints = "get-partitioning-checkpoints"
final val GetFlowCheckpoints = "get-flow-checkpoints"

final val Health = "health"
final val ZioMetrics = "zio-metrics"

}

final val SwaggerApiName = "Atum API"
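
For illustration, a hedged sketch of how these constants might be combined into route paths; the actual endpoint wiring lives in the server's HTTP layer and is not shown in this diff:

// Illustrative only (assumes the constants above are in scope); real route assembly happens in the endpoint definitions.
val v2FlowCheckpointsPath  = s"/$V2/$GetFlowCheckpoints"   // "/v2/get-flow-checkpoints"
val v2CreateCheckpointPath = s"/$V2/$CreateCheckpoint"     // "/v2/create-checkpoint"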
9 changes: 7 additions & 2 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
@@ -17,11 +17,12 @@
package za.co.absa.atum.server

import za.co.absa.atum.server.api.controller._
import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints
import za.co.absa.atum.server.api.database.{PostgresDatabaseProvider, TransactorProvider}
import za.co.absa.atum.server.api.database.runs.functions._
import za.co.absa.atum.server.api.http.Server
import za.co.absa.atum.server.api.repository.{CheckpointRepositoryImpl, PartitioningRepositoryImpl}
import za.co.absa.atum.server.api.service.{CheckpointServiceImpl, PartitioningServiceImpl}
import za.co.absa.atum.server.api.repository.{CheckpointRepositoryImpl, FlowRepositoryImpl, PartitioningRepositoryImpl}
import za.co.absa.atum.server.api.service.{CheckpointServiceImpl, FlowServiceImpl, PartitioningServiceImpl}
import za.co.absa.atum.server.aws.AwsSecretsProviderImpl
import za.co.absa.atum.server.config.JvmMonitoringConfig
import zio._
@@ -42,16 +43,20 @@ object Main extends ZIOAppDefault with Server {
.provide(
PartitioningControllerImpl.layer,
CheckpointControllerImpl.layer,
FlowControllerImpl.layer,
PartitioningServiceImpl.layer,
CheckpointServiceImpl.layer,
FlowServiceImpl.layer,
PartitioningRepositoryImpl.layer,
CheckpointRepositoryImpl.layer,
FlowRepositoryImpl.layer,
CreatePartitioningIfNotExists.layer,
GetPartitioningMeasures.layer,
GetPartitioningAdditionalData.layer,
CreateOrUpdateAdditionalData.layer,
GetPartitioningCheckpoints.layer,
WriteCheckpoint.layer,
GetFlowCheckpoints.layer,
PostgresDatabaseProvider.layer,
TransactorProvider.layer,
AwsSecretsProviderImpl.layer,
@@ -0,0 +1,28 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO}
import za.co.absa.atum.server.model.ErrorResponse.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.MultiSuccessResponse
import zio.IO
import zio.macros.accessible

@accessible
trait FlowController {
def getFlowCheckpointsV2(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]]
}
@@ -0,0 +1,46 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO}
import za.co.absa.atum.server.api.service.FlowService
import za.co.absa.atum.server.model.ErrorResponse.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.MultiSuccessResponse
import zio._

class FlowControllerImpl(flowService: FlowService) extends FlowController with BaseController {

override def getFlowCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = {
mapToMultiSuccessResponse(
serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]](
flowService.getFlowCheckpoints(checkpointQueryDTO),
identity
)
)
}

}

object FlowControllerImpl {
val layer: URLayer[FlowService, FlowController] = ZLayer {
for {
flowService <- ZIO.service[FlowService]
} yield new FlowControllerImpl(flowService)
}
}
@@ -16,7 +16,13 @@

package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, CheckpointQueryDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.dto.{
AdditionalDataSubmitDTO,
AtumContextDTO,
CheckpointDTO,
CheckpointQueryDTO,
PartitioningSubmitDTO
}
import za.co.absa.atum.server.model.ErrorResponse.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
import zio.IO
@@ -36,5 +42,7 @@ trait PartitioningController {
additionalData: AdditionalDataSubmitDTO
): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO]]

def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]]
def getPartitioningCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]]
}
@@ -66,13 +66,13 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
}

override def getPartitioningCheckpoints(
override def getPartitioningCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = {
mapToMultiSuccessResponse(
serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]](
partitioningService.getPartitioningCheckpoints(checkpointQueryDTO),
checkpoints => checkpoints
identity
)
)
}
@@ -20,10 +20,8 @@ import cats.Show
import cats.data.NonEmptyList
import doobie.{Get, Put}
import doobie.postgres.implicits._
import io.circe.{Decoder, Encoder}
import org.postgresql.jdbc.PgArray
import org.postgresql.util.PGobject
import za.co.absa.atum.model.dto.MeasureResultDTO

import scala.util.{Failure, Success, Try}

@@ -155,31 +153,6 @@ object DoobieImplicits {
}
)
}

}

implicit val encodeResultValueType: Encoder[MeasureResultDTO.ResultValueType] = Encoder.encodeString.contramap {
case MeasureResultDTO.ResultValueType.String => "String"
case MeasureResultDTO.ResultValueType.Long => "Long"
case MeasureResultDTO.ResultValueType.BigDecimal => "BigDecimal"
case MeasureResultDTO.ResultValueType.Double => "Double"
}

implicit val decodeResultValueType: Decoder[MeasureResultDTO.ResultValueType] = Decoder.decodeString.emap {
case "String" => Right(MeasureResultDTO.ResultValueType.String)
case "Long" => Right(MeasureResultDTO.ResultValueType.Long)
case "BigDecimal" => Right(MeasureResultDTO.ResultValueType.BigDecimal)
case "Double" => Right(MeasureResultDTO.ResultValueType.Double)
case other => Left(s"Cannot decode $other as ResultValueType")
}

implicit val encodeTypedValue: Encoder[MeasureResultDTO.TypedValue] =
Encoder.forProduct2("value", "valueType")(tv => (tv.value, tv.valueType))

implicit val decodeTypedValue: Decoder[MeasureResultDTO.TypedValue] =
Decoder.forProduct2("value", "valueType")(MeasureResultDTO.TypedValue.apply)

implicit val decodeMeasureResultDTO: Decoder[MeasureResultDTO] =
Decoder.forProduct2("mainValue", "supportValues")(MeasureResultDTO.apply)

}
@@ -0,0 +1,22 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.database.flows

import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._

object Flows extends DBSchema