Skip to content

Commit

Permalink
#280: replacing HTTP backend, 'HttpURLConnectionBackend' doesn't supp…
Browse files Browse the repository at this point in the history
…ort PATCH; also updating config in our agent tests, also moving envelope related DTOs to model/ package
  • Loading branch information
lsulak committed Oct 2, 2024
1 parent 74f3837 commit ae9875d
Show file tree
Hide file tree
Showing 47 changed files with 100 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,18 @@
package za.co.absa.atum.agent.dispatcher

import com.typesafe.config.Config
import io.circe.syntax.EncoderOps
import org.apache.spark.internal.Logging
import sttp.capabilities
import sttp.client3._
import sttp.model.Uri
import sttp.client3.okhttp.OkHttpSyncBackend
import za.co.absa.atum.agent.exception.AtumAgentException.HttpException
import za.co.absa.atum.model.dto
import za.co.absa.atum.model.dto._
import za.co.absa.atum.model.envelopes.SuccessResponse.SingleSuccessResponse
import za.co.absa.atum.model.utils.DTOBase64Encoder.encodeDTO
import za.co.absa.atum.model.utils.JsonSyntaxExtensions._

import java.util.Base64

class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
import HttpDispatcher._
Expand All @@ -50,7 +51,7 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
.header("Content-Type", "application/json")
.response(asString)

private val backend = HttpURLConnectionBackend()
private val backend: SttpBackend[Identity, capabilities.WebSockets] = OkHttpSyncBackend()

logInfo("using http dispatcher")
logInfo(s"serverUrl $serverUrl")
Expand All @@ -67,7 +68,7 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {

val response = backend.send(request)

handleResponseBody(response).as[PartitioningWithIdDTO].id
handleResponseBody(response).as[SingleSuccessResponse[PartitioningWithIdDTO]].data.id
}

override protected[agent] def createPartitioning(partitioning: PartitioningSubmitDTO): AtumContextDTO = {
Expand Down Expand Up @@ -95,14 +96,15 @@ class HttpDispatcher(config: Config) extends Dispatcher(config) with Logging {
additionalDataPatchDTO: AdditionalDataPatchDTO
): AdditionalDataDTO = {
val partitioningId = getPartitioningId(partitioning)
log.debug(s"Got partitioning ID: '$partitioningId'")

val request = commonAtumRequest
.patch(createAdditionalDataEndpoint(partitioningId))
.body(additionalDataPatchDTO.asJsonString)

val response = backend.send(request)

handleResponseBody(response).as[AdditionalDataDTO]
handleResponseBody(response).as[SingleSuccessResponse[AdditionalDataDTO]].data
}

private def handleResponseBody(response: Response[Either[String, String]]): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.

atum.dispatcher.type="console"
# dispatcher to be used (http, console, capture)
atum.dispatcher.type="http"

# The REST API URI of the atum server
atum.dispatcher.http.url="http://localhost:8080"

# Maximum number of dispatch captures to keep in memory
#atum.dispatcher.capture.capture-limit=1000 # 0 means no limit

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.atum.server.model
package za.co.absa.atum.model.envelopes

import io.circe._
import io.circe.generic.semiauto._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.atum.server.model
package za.co.absa.atum.model.envelopes

sealed trait PaginatedResult[R] {
def data: Seq[R]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
* limitations under the License.
*/

package za.co.absa.atum.server.model
package za.co.absa.atum.model.envelopes

import io.circe.{Decoder, Encoder}
import io.circe.generic.semiauto.{deriveDecoder, deriveEncoder}
import io.circe.{Decoder, Encoder}

case class Pagination(
limit: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.atum.server.model
package za.co.absa.atum.model.envelopes

import java.util.UUID

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.atum.server.model
package za.co.absa.atum.model.envelopes

case class StatusResponse(status: String, message: String)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

package za.co.absa.atum.server.model
package za.co.absa.atum.model.envelopes

import io.circe._
import io.circe.generic.semiauto._
Expand Down
17 changes: 10 additions & 7 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ object Dependencies {

val sparkCommons = "0.6.1"

val sttp = "3.5.2"
val sttpClient = "3.5.2"
val sttpOkHttpBackend = "1.7.2"
val sttpCirceJson = "3.9.7"

val postgresql = "42.6.0"

Expand All @@ -56,7 +58,6 @@ object Dependencies {
val http4sBlazeBackend = "0.23.15"
val http4sPrometheus = "0.23.6"
val circeJson = "0.14.7"
val sttpCirceJson = "3.9.7"

val awssdk = "2.23.15"

Expand All @@ -83,7 +84,7 @@ object Dependencies {

Seq(
scalatest,
mockito,
mockito
)
}

Expand All @@ -97,7 +98,7 @@ object Dependencies {
Seq(
circeCore,
circeParser,
circeGeneric,
circeGeneric
)
}

Expand Down Expand Up @@ -198,7 +199,8 @@ object Dependencies {
lazy val sparkCommons = "za.co.absa" % s"spark-commons-spark${sparkMinorVersion}_$scalaMinorVersion" % Versions.sparkCommons
lazy val sparkCommonsTest = "za.co.absa" % s"spark-commons-test_$scalaMinorVersion" % Versions.sparkCommons % Test

lazy val sttp = "com.softwaremill.sttp.client3" %% "core" % Versions.sttp
lazy val sttpClient3 = "com.softwaremill.sttp.client3" %% "core" % Versions.sttpClient
lazy val sttpOkHttpBackend = "com.softwaremill.sttp.client3" %% "okhttp-backend" % Versions.sttpClient

lazy val logback = "ch.qos.logback" % "logback-classic" % Versions.logback

Expand All @@ -210,7 +212,8 @@ object Dependencies {
typeSafeConfig,
sparkCommons,
sparkCommonsTest,
sttp,
sttpClient3,
sttpOkHttpBackend,
logback,
nameOf
) ++
Expand Down Expand Up @@ -245,7 +248,7 @@ object Dependencies {
scalaTest,
balta,
circe,
parser,
parser
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@
package za.co.absa.atum.server.api.controller

import io.circe.{Decoder, parser}
import za.co.absa.atum.model.envelopes.{ConflictErrorResponse, ErrorInDataErrorResponse, ErrorResponse, InternalServerErrorResponse, NotFoundErrorResponse, PaginatedResult, Pagination}
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.exception.ServiceError._
import za.co.absa.atum.server.api.http.ApiPaths
import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore}
import za.co.absa.atum.server.model.SuccessResponse._
import za.co.absa.atum.model.envelopes.PaginatedResult.{ResultHasMore, ResultNoMore}
import za.co.absa.atum.model.envelopes.SuccessResponse._
import za.co.absa.atum.server.model._
import zio._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO}
import za.co.absa.atum.server.model.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse}
import za.co.absa.atum.model.envelopes.ErrorResponse
import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse}
import zio.IO
import zio.macros.accessible

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO}
import za.co.absa.atum.model.envelopes.{ErrorResponse, PaginatedResult}
import za.co.absa.atum.server.api.http.ApiPaths.V2Paths
import za.co.absa.atum.server.api.service.CheckpointService
import za.co.absa.atum.server.model.{ErrorResponse, PaginatedResult}
import za.co.absa.atum.server.model.SuccessResponse.{PaginatedResponse, SingleSuccessResponse}
import za.co.absa.atum.model.envelopes.SuccessResponse.{PaginatedResponse, SingleSuccessResponse}
import zio._

import java.util.UUID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO}
import za.co.absa.atum.server.model.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.MultiSuccessResponse
import za.co.absa.atum.model.envelopes.ErrorResponse
import za.co.absa.atum.model.envelopes.SuccessResponse.MultiSuccessResponse
import zio.IO
import zio.macros.accessible

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO}
import za.co.absa.atum.model.envelopes.ErrorResponse
import za.co.absa.atum.server.api.service.FlowService
import za.co.absa.atum.server.model.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.MultiSuccessResponse
import za.co.absa.atum.model.envelopes.SuccessResponse.MultiSuccessResponse
import zio._

class FlowControllerImpl(flowService: FlowService) extends FlowController with BaseController {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.model.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse._
import za.co.absa.atum.model.envelopes.ErrorResponse
import za.co.absa.atum.model.envelopes.SuccessResponse._
import zio.IO
import zio.macros.accessible

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,11 @@
package za.co.absa.atum.server.api.controller

import za.co.absa.atum.model.dto._
import za.co.absa.atum.model.envelopes.{ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse, PaginatedResult}
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.http.ApiPaths.V2Paths
import za.co.absa.atum.server.api.service.PartitioningService
import za.co.absa.atum.server.model.SuccessResponse._
import za.co.absa.atum.server.model.{ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse, PaginatedResult}
import za.co.absa.atum.model.envelopes.SuccessResponse._
import zio._

class PartitioningControllerImpl(partitioningService: PartitioningService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import za.co.absa.atum.server.model._
import sttp.tapir.typelevel.MatchType
import sttp.tapir.ztapir._
import sttp.tapir.{EndpointOutput, PublicEndpoint}
import za.co.absa.atum.model.envelopes.{BadRequestResponse, ConflictErrorResponse, ErrorInDataErrorResponse, ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse, NotFoundErrorResponse}
import za.co.absa.atum.server.api.http.ApiPaths._

import java.util.UUID
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ import sttp.tapir.generic.auto.schemaForCaseClass
import sttp.tapir.json.circe.jsonBody
import sttp.tapir.ztapir._
import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.model.{ErrorResponse, StatusResponse}
import za.co.absa.atum.server.model.SuccessResponse._
import za.co.absa.atum.model.envelopes.SuccessResponse._
import sttp.tapir.{PublicEndpoint, Validator, endpoint}
import za.co.absa.atum.server.api.http.ApiPaths.{Health, ZioMetrics, _}
import za.co.absa.atum.model.envelopes.{ErrorResponse, StatusResponse}
import za.co.absa.atum.server.api.http.ApiPaths._

import java.util.UUID

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor
import sttp.tapir.swagger.bundle.SwaggerInterpreter
import sttp.tapir.ztapir._
import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataPatchDTO, CheckpointV2DTO, PartitioningWithIdDTO}
import za.co.absa.atum.model.envelopes.{ErrorResponse, StatusResponse}
import za.co.absa.atum.server.api.controller.{CheckpointController, FlowController, PartitioningController}
import za.co.absa.atum.server.config.{HttpMonitoringConfig, JvmMonitoringConfig}
import za.co.absa.atum.server.model.{ErrorResponse, StatusResponse}
import za.co.absa.atum.server.model.SuccessResponse._
import za.co.absa.atum.model.envelopes.SuccessResponse._
import zio._
import zio.interop.catz._
import zio.metrics.connectors.prometheus.PrometheusPublisher
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import sttp.tapir.server.interceptor.decodefailure.DefaultDecodeFailureHandler.r
import sttp.tapir.server.interceptor.metrics.MetricsRequestInterceptor
import sttp.tapir.server.model.ValuedEndpointOutput
import sttp.tapir.ztapir.{headers, statusCode}
import za.co.absa.atum.server.model.BadRequestResponse
import za.co.absa.atum.model.envelopes.BadRequestResponse
import zio.interop.catz._

trait ServerOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package za.co.absa.atum.server.api.repository

import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO}
import za.co.absa.atum.model.envelopes.PaginatedResult
import za.co.absa.atum.server.api.exception.DatabaseError
import za.co.absa.atum.server.model.PaginatedResult
import zio._
import zio.macros.accessible

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,16 @@
package za.co.absa.atum.server.api.repository

import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO}
import za.co.absa.atum.model.envelopes.PaginatedResult
import za.co.absa.atum.server.api.database.runs.functions._
import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpointV2.GetPartitioningCheckpointV2Args
import za.co.absa.atum.server.api.database.runs.functions.GetPartitioningCheckpoints.GetPartitioningCheckpointsArgs
import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpointV2.WriteCheckpointArgs
import za.co.absa.atum.server.api.database.runs.functions.{WriteCheckpoint, WriteCheckpointV2}
import za.co.absa.atum.server.api.exception.DatabaseError
import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError
import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore}
import za.co.absa.atum.server.model.{CheckpointItemFromDB, PaginatedResult}
import za.co.absa.atum.model.envelopes.PaginatedResult.{ResultHasMore, ResultNoMore}
import za.co.absa.atum.server.model.CheckpointItemFromDB
import zio._
import zio.interop.catz.asyncInstance

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package za.co.absa.atum.server.api.repository

import za.co.absa.atum.model.dto._
import za.co.absa.atum.model.envelopes.PaginatedResult
import za.co.absa.atum.server.api.exception.DatabaseError
import za.co.absa.atum.server.model.PaginatedResult
import zio.IO
import zio.macros.accessible

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package za.co.absa.atum.server.api.repository

import za.co.absa.atum.model.dto._
import za.co.absa.atum.model.envelopes.PaginatedResult
import za.co.absa.atum.server.api.database.flows.functions.GetFlowPartitionings._
import za.co.absa.atum.server.api.database.runs.functions.CreateOrUpdateAdditionalData.CreateOrUpdateAdditionalDataArgs
import za.co.absa.atum.server.api.database.runs.functions._
Expand All @@ -26,7 +27,7 @@ import za.co.absa.atum.server.model._
import zio._
import zio.interop.catz.asyncInstance
import za.co.absa.atum.server.api.exception.DatabaseError.GeneralDatabaseError
import za.co.absa.atum.server.model.PaginatedResult.{ResultHasMore, ResultNoMore}
import za.co.absa.atum.model.envelopes.PaginatedResult.{ResultHasMore, ResultNoMore}

class PartitioningRepositoryImpl (
createPartitioningIfNotExistsFn: CreatePartitioningIfNotExists,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package za.co.absa.atum.server.api.service

import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointV2DTO}
import za.co.absa.atum.model.envelopes.PaginatedResult
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.model.PaginatedResult
import zio.IO
import zio.macros.accessible

Expand Down
Loading

0 comments on commit ae9875d

Please sign in to comment.