Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#188: new Flow service in API v2 #195

Merged
merged 37 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
86397c2
#188: adding new controller / service / repository Flow, with server …
lsulak Apr 30, 2024
dab8751
#188: further PoC-ing
lsulak Apr 30, 2024
9678071
Merge remote-tracking branch 'refs/remotes/origin/master' into featur…
lsulak Apr 30, 2024
1fea27b
merge conflict resolution
lsulak Apr 30, 2024
61f8f47
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak May 7, 2024
c1c10ce
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak May 22, 2024
a8c8ead
#188: final implementation of desired functionality with adjusted dat…
lsulak May 23, 2024
95b4ba6
#188: implementing decoding of SupportValues as well, in measurements…
lsulak May 23, 2024
520cba8
#188: final refactoring
lsulak May 23, 2024
f840e5a
#188: tests, tests, tests
lsulak May 23, 2024
09249cb
Merge remote-tracking branch 'refs/remotes/origin/master' into featur…
lsulak May 23, 2024
4bf50a1
merge conflict resolution
lsulak May 23, 2024
163de13
#188: removing temporary code I used
lsulak May 23, 2024
5b21021
remove
lsulak May 23, 2024
dcac162
post-review improvements
lsulak May 24, 2024
9e0731b
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak May 27, 2024
c2a672e
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak May 28, 2024
f6d3400
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak May 28, 2024
52f3c29
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak Jun 5, 2024
54b8017
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak Jun 7, 2024
71ee83d
post-review improvement
lsulak Jun 7, 2024
4805305
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak Jun 12, 2024
52972c3
post-review improvement
lsulak Jun 12, 2024
e219d3f
Merge remote-tracking branch 'refs/remotes/origin/master' into featur…
lsulak Jun 13, 2024
bddf570
post-merge changes
lsulak Jun 13, 2024
1ce658b
#188: API changes in v2
lsulak Jun 13, 2024
0676967
#188: fix
lsulak Jun 13, 2024
d0ca3ae
#188: making APIv1 compatible again
lsulak Jun 13, 2024
b7310f7
#188: fixing mocked API tests
lsulak Jun 13, 2024
e346077
#188: fixing test data, removing duplicates - part of the recent post…
lsulak Jun 13, 2024
3c53a89
#188: fixing some ITs
lsulak Jun 13, 2024
464752a
#188: fixing all UTs for the server
lsulak Jun 13, 2024
92065f6
#188: optimization / refactoring
lsulak Jun 13, 2024
d6fe29d
#188: fixing the API & ITs
lsulak Jun 13, 2024
8d777f6
#188: moving the circe-related implicit conversions to a companion ob…
lsulak Jun 14, 2024
8e8b565
#188: one more integration test for the new API
lsulak Jun 14, 2024
38ca1bc
#188: not ITs, everything is being mocked and runs locally
lsulak Jun 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions database/src/main/postgres/flows/V1.9.1__get_flow_checkpoints.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
*/

CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints(
IN i_partitioning_of_flow JSONB,
IN i_limit INT DEFAULT 5,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_checkpoint UUID,
OUT checkpoint_name TEXT,
OUT measure_name TEXT,
OUT measured_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
IN i_partitioning_of_flow JSONB,
IN i_limit INT DEFAULT 5,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_checkpoint UUID,
OUT checkpoint_name TEXT,
OUT author TEXT,
OUT measured_by_atum_agent BOOLEAN,
OUT measure_name TEXT,
OUT measured_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
) RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
Expand All @@ -53,14 +55,17 @@ $$
-- specifying `i_checkpoint_name` parameter
--
-- Returns:
-- status - Status code
-- status_text - Status text
-- id_checkpoint - id of retrieved checkpoint
-- checkpoint_name - name of retrieved checkpoint
-- measure_name - measure name associated with a given checkpoint
-- measured_columns - measure columns associated with a given checkpoint
-- measurement_value - measurement details associated with a given checkpoint
-- checkpoint_time - time
-- status - Status code
-- status_text - Status text
-- id_checkpoint - ID of retrieved checkpoint
-- checkpoint_name - Name of the retrieved checkpoint
-- author - Author of the checkpoint
-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by Atum Agent
-- (if false, data supplied manually)
-- measure_name - measure name associated with a given checkpoint
-- measured_columns - measure columns associated with a given checkpoint
-- measurement_value - measurement details associated with a given checkpoint
-- checkpoint_time - time
--
-- Status codes:
-- 11 - OK
Expand Down Expand Up @@ -89,6 +94,7 @@ BEGIN
RETURN QUERY
SELECT 11 AS status, 'OK' AS status_text,
CP.id_checkpoint, CP.checkpoint_name,
CP.created_by AS author, CP.measured_by_atum_agent,
MD.measure_name, MD.measured_columns,
M.measurement_value,
CP.process_start_time AS checkpoint_start_time, CP.process_end_time AS checkpoint_end_time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite {
assert(row1.getString("status_text").contains("OK"))
assert(row1.getUUID("id_checkpoint").contains(checkpointId))
assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg"))
assert(row1.getString("author").contains("ObviouslySomeTest"))
assert(row1.getBoolean("measured_by_atum_agent").contains(true))
assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime))
assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime))

Expand All @@ -243,6 +245,8 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite {
assert(row2.getString("status_text").contains("OK"))
assert(row2.getUUID("id_checkpoint").contains(checkpointId))
assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg"))
assert(row1.getString("author").contains("ObviouslySomeTest"))
assert(row1.getBoolean("measured_by_atum_agent").contains(true))
assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTime))
assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTime))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package za.co.absa.atum.model.dto

import io.circe.{Decoder, Encoder}

case class MeasureResultDTO(
mainValue: MeasureResultDTO.TypedValue,
supportValues: Map[String, MeasureResultDTO.TypedValue] = Map.empty
Expand All @@ -36,4 +38,29 @@ object MeasureResultDTO {
case object Double extends ResultValueType
}


implicit val encodeResultValueType: Encoder[MeasureResultDTO.ResultValueType] = Encoder.encodeString.contramap {
case MeasureResultDTO.ResultValueType.String => "String"
case MeasureResultDTO.ResultValueType.Long => "Long"
case MeasureResultDTO.ResultValueType.BigDecimal => "BigDecimal"
case MeasureResultDTO.ResultValueType.Double => "Double"
}

implicit val decodeResultValueType: Decoder[MeasureResultDTO.ResultValueType] = Decoder.decodeString.emap {
case "String" => Right(MeasureResultDTO.ResultValueType.String)
case "Long" => Right(MeasureResultDTO.ResultValueType.Long)
case "BigDecimal" => Right(MeasureResultDTO.ResultValueType.BigDecimal)
case "Double" => Right(MeasureResultDTO.ResultValueType.Double)
case other => Left(s"Cannot decode $other as ResultValueType")
}

implicit val encodeTypedValue: Encoder[MeasureResultDTO.TypedValue] =
Encoder.forProduct2("value", "valueType")(tv => (tv.value, tv.valueType))

implicit val decodeTypedValue: Decoder[MeasureResultDTO.TypedValue] =
Decoder.forProduct2("value", "valueType")(MeasureResultDTO.TypedValue.apply)

implicit val decodeMeasureResultDTO: Decoder[MeasureResultDTO] =
Decoder.forProduct2("mainValue", "supportValues")(MeasureResultDTO.apply)

}
4 changes: 2 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ object Dependencies {
json4sJackson,
json4sNative,
circeCore,
circeParser
circeParser,
)
}

Expand Down Expand Up @@ -252,7 +252,7 @@ object Dependencies {
jsonSerdeDependencies(scalaVersion)
}

def databaseDependencies: Seq[ModuleID] = {
def databaseDependencies: Seq[ModuleID] = {
lazy val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalatest % Test
lazy val balta = "za.co.absa" %% "balta" % Versions.balta % Test

Expand Down
14 changes: 9 additions & 5 deletions server/src/main/scala/za/co/absa/atum/server/Constants.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,18 @@ object Constants {
final val V1 = "v1"
final val V2 = "v2"

final val CreatePartitioning = "createPartitioning"
final val CreateOrUpdateAdditionalData = "createOrUpdateAdditionalData"
final val CreateCheckpoint = "createCheckpoint"
final val GetPartitioningCheckpoints = "getPartitioningCheckpoints"
// todo to uppercase no hyphen for v1, backward compatibility
// todo - better API paths??
final val CreatePartitioning = "create-partitioning"
final val CreateCheckpoint = "create-checkpoint"

final val CreateOrUpdateAdditionalData = "upsert-additional-data"

final val GetPartitioningCheckpoints = "get-partitioning-checkpoints"
final val GetFlowCheckpoints = "get-flow-checkpoints"

final val Health = "health"
final val ZioMetrics = "zio-metrics"

}

final val SwaggerApiName = "Atum API"
Expand Down
9 changes: 7 additions & 2 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package za.co.absa.atum.server

import za.co.absa.atum.server.api.controller._
import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints
import za.co.absa.atum.server.api.database.{PostgresDatabaseProvider, TransactorProvider}
import za.co.absa.atum.server.api.database.runs.functions._
import za.co.absa.atum.server.api.http.Server
import za.co.absa.atum.server.api.repository.{CheckpointRepositoryImpl, PartitioningRepositoryImpl}
import za.co.absa.atum.server.api.service.{CheckpointServiceImpl, PartitioningServiceImpl}
import za.co.absa.atum.server.api.repository.{CheckpointRepositoryImpl, FlowRepositoryImpl, PartitioningRepositoryImpl}
import za.co.absa.atum.server.api.service.{CheckpointServiceImpl, FlowServiceImpl, PartitioningServiceImpl}
import za.co.absa.atum.server.aws.AwsSecretsProviderImpl
import za.co.absa.atum.server.config.JvmMonitoringConfig
import zio._
Expand All @@ -42,16 +43,20 @@ object Main extends ZIOAppDefault with Server {
.provide(
PartitioningControllerImpl.layer,
CheckpointControllerImpl.layer,
FlowControllerImpl.layer,
PartitioningServiceImpl.layer,
CheckpointServiceImpl.layer,
FlowServiceImpl.layer,
PartitioningRepositoryImpl.layer,
CheckpointRepositoryImpl.layer,
FlowRepositoryImpl.layer,
CreatePartitioningIfNotExists.layer,
GetPartitioningMeasures.layer,
GetPartitioningAdditionalData.layer,
CreateOrUpdateAdditionalData.layer,
GetPartitioningCheckpoints.layer,
WriteCheckpoint.layer,
GetFlowCheckpoints.layer,
PostgresDatabaseProvider.layer,
TransactorProvider.layer,
AwsSecretsProviderImpl.layer,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO}
import za.co.absa.atum.server.model.ErrorResponse.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.MultiSuccessResponse
import zio.IO
import zio.macros.accessible

@accessible
trait FlowController {
def getFlowCheckpointsV2(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO}
import za.co.absa.atum.server.api.service.FlowService
import za.co.absa.atum.server.model.ErrorResponse.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.MultiSuccessResponse
import zio._

class FlowControllerImpl(flowService: FlowService) extends FlowController with BaseController {

override def getFlowCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = {
mapToMultiSuccessResponse(
serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]](
flowService.getFlowCheckpoints(checkpointQueryDTO),
identity
)
)
}

}

object FlowControllerImpl {
val layer: URLayer[FlowService, FlowController] = ZLayer {
for {
flowService <- ZIO.service[FlowService]
} yield new FlowControllerImpl(flowService)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@

package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, CheckpointQueryDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.dto.{
AdditionalDataSubmitDTO,
AtumContextDTO,
CheckpointDTO,
CheckpointQueryDTO,
PartitioningSubmitDTO
}
import za.co.absa.atum.server.model.ErrorResponse.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
import zio.IO
Expand All @@ -36,5 +42,7 @@ trait PartitioningController {
additionalData: AdditionalDataSubmitDTO
): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO]]

def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]]
def getPartitioningCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
}

override def getPartitioningCheckpoints(
override def getPartitioningCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = {
mapToMultiSuccessResponse(
serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]](
partitioningService.getPartitioningCheckpoints(checkpointQueryDTO),
checkpoints => checkpoints
identity
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ import cats.Show
import cats.data.NonEmptyList
import doobie.{Get, Put}
import doobie.postgres.implicits._
import io.circe.{Decoder, Encoder}
import org.postgresql.jdbc.PgArray
import org.postgresql.util.PGobject
import za.co.absa.atum.model.dto.MeasureResultDTO

import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -155,31 +153,6 @@ object DoobieImplicits {
}
)
}

}

implicit val encodeResultValueType: Encoder[MeasureResultDTO.ResultValueType] = Encoder.encodeString.contramap {
case MeasureResultDTO.ResultValueType.String => "String"
case MeasureResultDTO.ResultValueType.Long => "Long"
case MeasureResultDTO.ResultValueType.BigDecimal => "BigDecimal"
case MeasureResultDTO.ResultValueType.Double => "Double"
}

implicit val decodeResultValueType: Decoder[MeasureResultDTO.ResultValueType] = Decoder.decodeString.emap {
case "String" => Right(MeasureResultDTO.ResultValueType.String)
case "Long" => Right(MeasureResultDTO.ResultValueType.Long)
case "BigDecimal" => Right(MeasureResultDTO.ResultValueType.BigDecimal)
case "Double" => Right(MeasureResultDTO.ResultValueType.Double)
case other => Left(s"Cannot decode $other as ResultValueType")
}

implicit val encodeTypedValue: Encoder[MeasureResultDTO.TypedValue] =
Encoder.forProduct2("value", "valueType")(tv => (tv.value, tv.valueType))

implicit val decodeTypedValue: Decoder[MeasureResultDTO.TypedValue] =
Decoder.forProduct2("value", "valueType")(MeasureResultDTO.TypedValue.apply)

implicit val decodeMeasureResultDTO: Decoder[MeasureResultDTO] =
Decoder.forProduct2("mainValue", "supportValues")(MeasureResultDTO.apply)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.database.flows

import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._

object Flows extends DBSchema
Loading
Loading