Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#188: new Flow service in API v2 #195

Merged
merged 37 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
86397c2
#188: adding new controller / service / repository Flow, with server …
lsulak Apr 30, 2024
dab8751
#188: further PoC-ing
lsulak Apr 30, 2024
9678071
Merge remote-tracking branch 'refs/remotes/origin/master' into featur…
lsulak Apr 30, 2024
1fea27b
merge conflict resolution
lsulak Apr 30, 2024
61f8f47
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak May 7, 2024
c1c10ce
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak May 22, 2024
a8c8ead
#188: final implementation of desired functionality with adjusted dat…
lsulak May 23, 2024
95b4ba6
#188: implementing decoding of SupportValues as well, in measurements…
lsulak May 23, 2024
520cba8
#188: final refactoring
lsulak May 23, 2024
f840e5a
#188: tests, tests, tests
lsulak May 23, 2024
09249cb
Merge remote-tracking branch 'refs/remotes/origin/master' into featur…
lsulak May 23, 2024
4bf50a1
merge conflict resolution
lsulak May 23, 2024
163de13
#188: removing temporary code I used
lsulak May 23, 2024
5b21021
remove
lsulak May 23, 2024
dcac162
post-review improvements
lsulak May 24, 2024
9e0731b
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak May 27, 2024
c2a672e
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak May 28, 2024
f6d3400
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak May 28, 2024
52f3c29
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak Jun 5, 2024
54b8017
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak Jun 7, 2024
71ee83d
post-review improvement
lsulak Jun 7, 2024
4805305
Merge branch 'master' into feature/188-server-part-of-get-flow-checkp…
lsulak Jun 12, 2024
52972c3
post-review improvement
lsulak Jun 12, 2024
e219d3f
Merge remote-tracking branch 'refs/remotes/origin/master' into featur…
lsulak Jun 13, 2024
bddf570
post-merge changes
lsulak Jun 13, 2024
1ce658b
#188: API changes in v2
lsulak Jun 13, 2024
0676967
#188: fix
lsulak Jun 13, 2024
d0ca3ae
#188: making APIv1 compatible again
lsulak Jun 13, 2024
b7310f7
#188: fixing mocked API tests
lsulak Jun 13, 2024
e346077
#188: fixing test data, removing duplicates - part of the recent post…
lsulak Jun 13, 2024
3c53a89
#188: fixing some ITs
lsulak Jun 13, 2024
464752a
#188: fixing all UTs for the server
lsulak Jun 13, 2024
92065f6
#188: optimization / refactoring
lsulak Jun 13, 2024
d6fe29d
#188: fixing the API & ITs
lsulak Jun 13, 2024
8d777f6
#188: moving the circe-related implicit conversions to a companion ob…
lsulak Jun 14, 2024
8e8b565
#188: one more integration test for the new API
lsulak Jun 14, 2024
38ca1bc
#188: not ITs, everything is being mocked and runs locally
lsulak Jun 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 26 additions & 20 deletions database/src/main/postgres/flows/V1.9.1__get_flow_checkpoints.sql
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
*/

CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints(
IN i_partitioning_of_flow JSONB,
IN i_limit INT DEFAULT 5,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_checkpoint UUID,
OUT checkpoint_name TEXT,
OUT measure_name TEXT,
OUT measure_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
IN i_partitioning_of_flow JSONB,
IN i_limit INT DEFAULT 5,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_checkpoint UUID,
OUT checkpoint_name TEXT,
OUT author TEXT,
OUT measured_by_atum_agent BOOLEAN,
OUT measure_name TEXT,
OUT measured_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
) RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
Expand All @@ -53,14 +55,17 @@ $$
-- specifying `i_checkpoint_name` parameter
--
-- Returns:
-- status - Status code
-- status_text - Status text
-- id_checkpoint - id of retrieved checkpoint
-- checkpoint_name - name of retrieved checkpoint
-- measure_name - measure name associated with a given checkpoint
-- measure_columns - measure columns associated with a given checkpoint
-- measurement_value - measurement details associated with a given checkpoint
-- checkpoint_time - time
-- status - Status code
-- status_text - Status text
-- id_checkpoint - ID of retrieved checkpoint
-- checkpoint_name - Name of the retrieved checkpoint
-- author - Author of the checkpoint
-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by Atum Agent
-- (if false, data supplied manually)
-- measure_name - measure name associated with a given checkpoint
-- measured_columns - measure columns associated with a given checkpoint
-- measurement_value - measurement details associated with a given checkpoint
-- checkpoint_time - time
--
-- Status codes:
-- 11 - OK
Expand Down Expand Up @@ -89,6 +94,7 @@ BEGIN
RETURN QUERY
SELECT 11 AS status, 'OK' AS status_text,
CP.id_checkpoint, CP.checkpoint_name,
CP.created_by AS author, CP.measured_by_atum_agent,
MD.measure_name, MD.measured_columns,
M.measurement_value,
CP.process_start_time AS checkpoint_start_time, CP.process_end_time AS checkpoint_end_time
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,12 +229,14 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite {
assert(row1.getString("status_text").contains("OK"))
assert(row1.getUUID("id_checkpoint").contains(checkpointId))
assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg"))
assert(row1.getString("author").contains("ObviouslySomeTest"))
assert(row1.getBoolean("measured_by_atum_agent").contains(true))
assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime))
assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime))

val measure1 = MeasuredDetails(
row1.getString("measure_name").get,
row1.getArray[String]("measure_columns").map(_.toList).get,
row1.getArray[String]("measured_columns").map(_.toList).get,
row1.getJsonB("measurement_value").get
)

Expand All @@ -243,12 +245,14 @@ class GetFlowCheckpointsIntegrationTests extends DBTestSuite {
assert(row2.getString("status_text").contains("OK"))
assert(row2.getUUID("id_checkpoint").contains(checkpointId))
assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg"))
assert(row1.getString("author").contains("ObviouslySomeTest"))
assert(row1.getBoolean("measured_by_atum_agent").contains(true))
assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTime))
assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTime))

val measure2 = MeasuredDetails(
row2.getString("measure_name").get,
row2.getArray[String]("measure_columns").map(_.toList).get,
row2.getArray[String]("measured_columns").map(_.toList).get,
row2.getJsonB("measurement_value").get
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.model.dto

case class CheckpointQueryDTO(
partitioning: PartitioningDTO,
limit: Option[Int],
checkpointName: Option[String]
)
12 changes: 10 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ object Dependencies {
val balta = "0.1.0"

val jacksonModuleScala = "2.14.2"
val circeVersion = "0.14.5"

val specs2 = "4.10.0"
val typesafeConfig = "1.4.2"
Expand Down Expand Up @@ -115,12 +116,17 @@ object Dependencies {
lazy val json4sJackson = "org.json4s" %% "json4s-jackson" % json4sVersion
lazy val json4sNative = "org.json4s" %% "json4s-native" % json4sVersion % Provided

lazy val circeCore = "io.circe" %% "circe-core" % Versions.circeVersion
lazy val circeParser = "io.circe" %% "circe-parser" % Versions.circeVersion

Seq(
jacksonModuleScala,
json4sExt,
json4sCore,
json4sJackson,
json4sNative
json4sNative,
circeCore,
circeParser,
)
}

Expand Down Expand Up @@ -163,6 +169,7 @@ object Dependencies {

// Fa-db
lazy val faDbDoobie = faDbOrg %% "doobie" % Versions.fadb
lazy val pgCirceDoobie = "org.tpolecat" %% "doobie-postgres-circe" % "1.0.0-RC2"

// aws
lazy val awsSecretsManagerSdk = awsSdkOrg % "secretsmanager" % Versions.awssdk
Expand All @@ -175,6 +182,7 @@ object Dependencies {

Seq(
faDbDoobie,
pgCirceDoobie,
zioCore,
zioMacros,
zioLogging,
Expand Down Expand Up @@ -244,7 +252,7 @@ object Dependencies {
jsonSerdeDependencies(scalaVersion)
}

def databaseDependencies: Seq[ModuleID] = {
def databaseDependencies: Seq[ModuleID] = {
lazy val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalatest % Test
lazy val balta = "za.co.absa" %% "balta" % Versions.balta % Test

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ object Constants {

final val Api = "api"
final val V1 = "v1"
final val V2 = "v2"

final val CreatePartitioning = "createPartitioning"
final val CreateOrUpdateAdditionalData = "createOrUpdateAdditionalData"
final val CreateCheckpoint = "createCheckpoint"
final val GetFlowCheckpoints = "getFlowCheckpoints"
Copy link
Collaborator

@salamonpavel salamonpavel Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these v2 endpoint paths should follow this naming convention (kebab-case) 'get-flow-checkpoints'. Of course it's a topic for a team discussion.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay. Once your envelope PR and FOrgiveness' PR will be merged to master, thus propagated here, I'll change it


final val Health = "health"
final val ZioMetrics = "zio-metrics"
Expand Down
9 changes: 7 additions & 2 deletions server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package za.co.absa.atum.server

import za.co.absa.atum.server.api.controller._
import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints
import za.co.absa.atum.server.api.database.{PostgresDatabaseProvider, TransactorProvider}
import za.co.absa.atum.server.api.database.runs.functions._
import za.co.absa.atum.server.api.http.Server
import za.co.absa.atum.server.api.repository.{CheckpointRepositoryImpl, PartitioningRepositoryImpl}
import za.co.absa.atum.server.api.service.{CheckpointServiceImpl, PartitioningServiceImpl}
import za.co.absa.atum.server.api.repository.{CheckpointRepositoryImpl, FlowRepositoryImpl, PartitioningRepositoryImpl}
import za.co.absa.atum.server.api.service.{CheckpointServiceImpl, FlowServiceImpl, PartitioningServiceImpl}
import za.co.absa.atum.server.aws.AwsSecretsProviderImpl
import za.co.absa.atum.server.config.JvmMonitoringConfig
import zio._
Expand All @@ -42,15 +43,19 @@ object Main extends ZIOAppDefault with Server {
.provide(
PartitioningControllerImpl.layer,
CheckpointControllerImpl.layer,
FlowControllerImpl.layer,
PartitioningServiceImpl.layer,
CheckpointServiceImpl.layer,
FlowServiceImpl.layer,
PartitioningRepositoryImpl.layer,
CheckpointRepositoryImpl.layer,
FlowRepositoryImpl.layer,
CreatePartitioningIfNotExists.layer,
GetPartitioningMeasures.layer,
GetPartitioningAdditionalData.layer,
CreateOrUpdateAdditionalData.layer,
WriteCheckpoint.layer,
GetFlowCheckpoints.layer,
PostgresDatabaseProvider.layer,
TransactorProvider.layer,
AwsSecretsProviderImpl.layer,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,21 @@ import zio._

trait BaseController {

def serviceCall[A, B](
serviceCall: IO[ServiceError, A],
onSuccessFnc: A => B
): IO[ErrorResponse, B] = {

serviceCall
.mapError { serviceError: ServiceError =>
InternalServerErrorResponse(serviceError.message)
}
.flatMap {
result => ZIO.succeed(onSuccessFnc(result))
}

}

def serviceCallWithStatus[A, B](
serviceCall: IO[ServiceError, Either[StatusException, A]],
onSuccessFnc: A => B
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{CheckpointQueryDTO, CheckpointDTO}
import za.co.absa.atum.server.model.ErrorResponse
import zio.IO
import zio.macros.accessible

@accessible
trait FlowController {
def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, Seq[CheckpointDTO]]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I noticed we don't have any documentation in our traits for Controllers/Services/Repositories. I am wondering whether it would be useful to introduce it ....

Copy link
Collaborator Author

@lsulak lsulak Jun 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know. I find it quite useless, since all this is I think quite obvious, naming and implementation wise. If you disagree let's initiate the conversation about it on our Curation tech meeting / chat though :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's bring this topic up on one of upcoming team meetings then.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

okay

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{CheckpointQueryDTO, CheckpointDTO}
import za.co.absa.atum.server.api.service.FlowService
import za.co.absa.atum.server.model.ErrorResponse
import zio._

class FlowControllerImpl(flowService: FlowService)
extends FlowController with BaseController {

override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, Seq[CheckpointDTO]] = {
serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]](
flowService.getFlowCheckpoints(checkpointQueryDTO),
r => r
TebaleloS marked this conversation as resolved.
Show resolved Hide resolved
)
}

}

object FlowControllerImpl {
val layer: URLayer[FlowService, FlowController] = ZLayer {
for {
flowService <- ZIO.service[FlowService]
} yield new FlowControllerImpl(flowService)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package za.co.absa.atum.server.api.database

import cats.Show
import cats.data.NonEmptyList

import doobie.{Get, Put}
import doobie.postgres.implicits._

import doobie.{Get, Put}
import io.circe.{Decoder, Encoder}
import org.postgresql.jdbc.PgArray
import org.postgresql.util.PGobject
import za.co.absa.atum.model.dto.MeasureResultDTO

import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -155,7 +155,29 @@ object DoobieImplicits {
}
)
}
}

implicit val encodeResultValueType: Encoder[MeasureResultDTO.ResultValueType] = Encoder.encodeString.contramap {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the motivation to place json related encoders/decoders alongside doobie implicits?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I understand the question, those JSON related SerDe code was already there & I needed those MeasureResult DTOs to be serialized/deserialized as well

Copy link
Collaborator

@salamonpavel salamonpavel Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DoobieImplicits object is there for defining Put/Get/Read/Write instances for Doobie. Then we have PlayJsonImplicits for Reads/Writes/Format type classes for Play Json. And what you have defined is actually related to Circe.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Aaah. Yes, I'm sorry I understand now. I'll move them to CirceImplicits.scala

I know that I could move them directly to CheckpointFromDB.scala, but I anticipate that @TebaleloS will create a bunch of them later as well, so it might be a good idea for them to be centralized

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's customary to place them in companion objects.

Copy link
Collaborator Author

@lsulak lsulak Jun 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved into companion object of a given DTO, thanks for the recommendation

case MeasureResultDTO.ResultValueType.String => "String"
case MeasureResultDTO.ResultValueType.Long => "Long"
case MeasureResultDTO.ResultValueType.BigDecimal => "BigDecimal"
case MeasureResultDTO.ResultValueType.Double => "Double"
}

implicit val decodeResultValueType: Decoder[MeasureResultDTO.ResultValueType] = Decoder.decodeString.emap {
case "String" => Right(MeasureResultDTO.ResultValueType.String)
case "Long" => Right(MeasureResultDTO.ResultValueType.Long)
case "BigDecimal" => Right(MeasureResultDTO.ResultValueType.BigDecimal)
case "Double" => Right(MeasureResultDTO.ResultValueType.Double)
case other => Left(s"Cannot decode $other as ResultValueType")
}

implicit val encodeTypedValue: Encoder[MeasureResultDTO.TypedValue] =
Encoder.forProduct2("value", "valueType")(tv => (tv.value, tv.valueType))

implicit val decodeTypedValue: Decoder[MeasureResultDTO.TypedValue] =
Decoder.forProduct2("value", "valueType")(MeasureResultDTO.TypedValue.apply)

implicit val decodeMeasureResultDTO: Decoder[MeasureResultDTO] =
Decoder.forProduct2("mainValue", "supportValues")(MeasureResultDTO.apply)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.server.api.database.flows

import za.co.absa.fadb.DBSchema
import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._

object Flows extends DBSchema
Loading
Loading