Skip to content

Commit

Permalink
google pub sub and cloud storage: Scala 3 (#3350)
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren authored Jan 30, 2025
1 parent d9565ac commit 698075d
Show file tree
Hide file tree
Showing 15 changed files with 79 additions and 50 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/check-build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ jobs:
# v4.1.1
uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11
with: # https://github.com/olafurpg/setup-scala#faster-checkout-of-big-repos
fetch-depth: 100
fetch-tags: true
# we don't know what commit the last tag was it's safer to get entire repo so previousStableVersion resolves
fetch-depth: 0

- name: Cache Coursier cache
# https://github.com/coursier/cache-action/releases
Expand Down
24 changes: 20 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,11 @@ lazy val googleCommon = alpakkaProject(
"google.common",
Dependencies.GoogleCommon,
Test / fork := true,
headerSources / excludeFilter := HiddenFileFilter || "JwtSprayJsonParser.scala"
headerSources / excludeFilter := HiddenFileFilter || "JwtSprayJsonParser.scala",
Scala3.settings,
mimaPreviousArtifacts :=
(if (scalaBinaryVersion.value == "3") Set.empty // No previous Scala 3 artifacts, drop once there are
else mimaPreviousArtifacts.value)
)

lazy val googleCloudBigQuery = alpakkaProject(
Expand Down Expand Up @@ -246,7 +250,11 @@ lazy val googleCloudPubSub = alpakkaProject(
Dependencies.GooglePubSub,
Test / fork := true,
// See docker-compose.yml gcloud-pubsub-emulator_prep
Test / envVars := Map("PUBSUB_EMULATOR_HOST" -> "localhost", "PUBSUB_EMULATOR_PORT" -> "8538")
Test / envVars := Map("PUBSUB_EMULATOR_HOST" -> "localhost", "PUBSUB_EMULATOR_PORT" -> "8538"),
Scala3.settings,
mimaPreviousArtifacts :=
(if (scalaBinaryVersion.value == "3") Set.empty // No previous Scala 3 artifacts, drop once there are
else mimaPreviousArtifacts.value)
).dependsOn(googleCommon)

lazy val googleCloudPubSubGrpc = alpakkaProject(
Expand All @@ -262,14 +270,22 @@ lazy val googleCloudPubSubGrpc = alpakkaProject(
"-Wconf:src=.+/akka-grpc/main/.+:s",
"-Wconf:src=.+/akka-grpc/test/.+:s"
),
compile / javacOptions := (compile / javacOptions).value.filterNot(_ == "-Xlint:deprecation")
compile / javacOptions := (compile / javacOptions).value.filterNot(_ == "-Xlint:deprecation"),
Scala3.settings,
mimaPreviousArtifacts :=
(if (scalaBinaryVersion.value == "3") Set.empty // No previous Scala 3 artifacts, drop once there are
else mimaPreviousArtifacts.value)
).enablePlugins(AkkaGrpcPlugin).dependsOn(googleCommon)

lazy val googleCloudStorage = alpakkaProject(
"google-cloud-storage",
"google.cloud.storage",
Test / fork := true,
Dependencies.GoogleStorage
Dependencies.GoogleStorage,
Scala3.settings,
mimaPreviousArtifacts :=
(if (scalaBinaryVersion.value == "3") Set.empty // No previous Scala 3 artifacts, drop once there are
else mimaPreviousArtifacts.value)
).dependsOn(googleCommon)

lazy val googleFcm = alpakkaProject("google-fcm", "google.firebase.fcm", Dependencies.GoogleFcm, Test / fork := true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private[pubsub] trait PubSubApi {
.mapMaterializedValue(_ => NotUsed)

private implicit val pullResponseUnmarshaller: FromResponseUnmarshaller[PullResponse] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case StatusCodes.Success(_) if response.entity.contentType == ContentTypes.`application/json` =>
Unmarshal(response.entity).to[PullResponse]
Expand Down Expand Up @@ -208,7 +208,7 @@ private[pubsub] trait PubSubApi {
.mapMaterializedValue(_ => NotUsed)

private implicit val acknowledgeResponseUnmarshaller: FromResponseUnmarshaller[Done] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case StatusCodes.Success(_) =>
response.discardEntityBytes().future
Expand Down Expand Up @@ -261,7 +261,7 @@ private[pubsub] trait PubSubApi {
publish(topic, parallelism, None)

private implicit val publishResponseUnmarshaller: FromResponseUnmarshaller[PublishResponse] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case StatusCodes.Success(_) if response.entity.contentType == ContentTypes.`application/json` =>
Unmarshal(response.entity).to[PublishResponse]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,15 @@ object PubSubConfig {
pullReturnImmediately = true,
pullMaxMessagesPerInternalBatch = 1000,
Some(
GoogleSettings().copy(
projectId = projectId,
credentials =
ServiceAccountCredentials(projectId, clientEmail, privateKey, Seq("https://www.googleapis.com/auth/pubsub"))
)
GoogleSettings
.apply()
.copy(
projectId = projectId,
credentials = ServiceAccountCredentials(projectId,
clientEmail,
privateKey,
Seq("https://www.googleapis.com/auth/pubsub"))
)
)
)

Expand All @@ -85,11 +89,15 @@ object PubSubConfig {
pullReturnImmediately = pullReturnImmediately,
pullMaxMessagesPerInternalBatch = pullMaxMessagesPerInternalBatch,
Some(
GoogleSettings().copy(
projectId = projectId,
credentials =
ServiceAccountCredentials(projectId, clientEmail, privateKey, Seq("https://www.googleapis.com/auth/pubsub"))
)
GoogleSettings
.apply()
.copy(
projectId = projectId,
credentials = ServiceAccountCredentials(projectId,
clientEmail,
privateKey,
Seq("https://www.googleapis.com/auth/pubsub"))
)
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,21 @@ import akka.stream.alpakka.googlecloud.storage._
import spray.json.{DefaultJsonProtocol, JsObject, JsValue, RootJsonFormat, RootJsonReader}

import scala.util.Try
import spray.json.enrichAny

@akka.annotation.InternalApi
object Formats extends DefaultJsonProtocol {

private final case class CustomerEncryption(encryptionAlgorithm: String, keySha256: String)
private implicit val customerEncryptionJsonFormat: RootJsonFormat[CustomerEncryption] = jsonFormat2(
CustomerEncryption
CustomerEncryption.apply
)

private final case class Owner(entity: String, entityId: Option[String])
private implicit val OwnerJsonFormat: RootJsonFormat[Owner] = jsonFormat2(Owner)
private implicit val OwnerJsonFormat: RootJsonFormat[Owner] = jsonFormat2(Owner.apply)

private final case class ProjectTeam(projectNumber: String, team: String)
private implicit val ProjectTeamJsonFormat: RootJsonFormat[ProjectTeam] = jsonFormat2(ProjectTeam)
private implicit val ProjectTeamJsonFormat: RootJsonFormat[ProjectTeam] = jsonFormat2(ProjectTeam.apply)

private final case class ObjectAccessControls(kind: String,
id: String,
Expand All @@ -40,7 +41,7 @@ object Formats extends DefaultJsonProtocol {
projectTeam: ProjectTeam,
etag: String)
private implicit val ObjectAccessControlsJsonFormat: RootJsonFormat[ObjectAccessControls] = jsonFormat13(
ObjectAccessControls
ObjectAccessControls.apply
)

/**
Expand Down Expand Up @@ -73,7 +74,7 @@ object Formats extends DefaultJsonProtocol {
)

private implicit val storageObjectReadOnlyJson: RootJsonFormat[StorageObjectReadOnlyJson] = jsonFormat18(
StorageObjectReadOnlyJson
StorageObjectReadOnlyJson.apply
)

// private sub class of StorageObjectJson used to workaround 22 field jsonFormat issue
Expand All @@ -95,7 +96,7 @@ object Formats extends DefaultJsonProtocol {
)

private implicit val storageObjectWritableJson: RootJsonFormat[StorageObjectWriteableJson] = jsonFormat14(
StorageObjectWriteableJson
StorageObjectWriteableJson.apply
)

private implicit object StorageObjectJsonFormat extends RootJsonFormat[StorageObjectJson] {
Expand Down Expand Up @@ -130,7 +131,7 @@ object Formats extends DefaultJsonProtocol {
items: Option[List[StorageObjectJson]]
)

private implicit val bucketInfoJsonFormat: RootJsonFormat[BucketInfoJson] = jsonFormat6(BucketInfoJson)
private implicit val bucketInfoJsonFormat: RootJsonFormat[BucketInfoJson] = jsonFormat6(BucketInfoJson.apply)

/**
* Google API rewrite response object
Expand All @@ -146,7 +147,9 @@ object Formats extends DefaultJsonProtocol {
resource: Option[StorageObjectJson]
)

private implicit val rewriteResponseFormat: RootJsonFormat[RewriteResponseJson] = jsonFormat6(RewriteResponseJson)
private implicit val rewriteResponseFormat: RootJsonFormat[RewriteResponseJson] = jsonFormat6(
RewriteResponseJson.apply
)

/**
* Google API bucket response object
Expand All @@ -162,7 +165,7 @@ object Formats extends DefaultJsonProtocol {
etag: String
)

implicit val bucketInfoFormat: RootJsonFormat[BucketInfo] = jsonFormat2(BucketInfo)
implicit val bucketInfoFormat: RootJsonFormat[BucketInfo] = jsonFormat2(BucketInfo.apply)

implicit object BucketListResultReads extends RootJsonReader[BucketListResult] {
override def read(json: JsValue): BucketListResult = {
Expand All @@ -177,7 +180,7 @@ object Formats extends DefaultJsonProtocol {
}

private implicit val bucketListResultJsonReads: RootJsonFormat[BucketListResultJson] = jsonFormat4(
BucketListResultJson
BucketListResultJson.apply
)

implicit object RewriteResponseReads extends RootJsonReader[RewriteResponse] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ import scala.concurrent.Future
metadata: Option[Map[String, String]] = None): Sink[ByteString, Future[StorageObject]] =
Sink
.fromMaterializer { (mat, attr) =>
implicit val settings = {
implicit val settings: GoogleSettings = {
val s = resolveSettings(mat, attr)
s.copy(requestSettings = s.requestSettings.copy(uploadChunkSize = chunkSize))
}
Expand All @@ -158,7 +158,7 @@ import scala.concurrent.Future
}: PartialFunction[HttpResponse, Future[StorageObject]]
}.withDefaultRetry

ResumableUpload[StorageObject](request).addAttributes(GoogleAttributes.settings(settings))
ResumableUpload[StorageObject](request)(um).addAttributes(GoogleAttributes.settings(settings))
}
.mapMaterializedValue(_.flatten)

Expand Down Expand Up @@ -236,7 +236,7 @@ import scala.concurrent.Future
getBucketPath(bucket) / "o" / objectName

implicit def unmarshaller[T: FromEntityUnmarshaller]: Unmarshaller[HttpResponse, T] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response match {
case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() =>
Unmarshal(entity).to[T]
Expand All @@ -248,7 +248,7 @@ import scala.concurrent.Future
}.withDefaultRetry

implicit def optionUnmarshaller[T: FromEntityUnmarshaller]: Unmarshaller[HttpResponse, Option[T]] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response match {
case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() =>
Unmarshal(entity).to[T].map(Some(_))
Expand Down Expand Up @@ -324,7 +324,7 @@ import scala.concurrent.Future
)
}

GoogleSettings(
GoogleSettings.apply(
legacySettings.projectId,
credentials,
settings.requestSettings
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class GCSExtSpec extends AnyFlatSpec with Matchers with LogCapturing {
).asJava
)

implicit val system = ActorSystem.create("gcs", config)
implicit val system: ActorSystem = ActorSystem.create("gcs", config)
val ext = GCSExt(system)

ext.settings.endpointUrl shouldBe endpointUrl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class GCStorageExtSpec extends AnyFlatSpec with Matchers with LogCapturing {
"alpakka.google.cloud.storage.token-scope" -> tokenScope
).asJava
)
implicit val system = ActorSystem.create("gcStorage", config)
implicit val system: ActorSystem = ActorSystem.create("gcStorage", config)
@nowarn("msg=deprecated")
val ext = GCStorageExt(system)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ object GoogleSettings {

}

final case class GoogleSettings @InternalApi private (projectId: String,
credentials: Credentials,
requestSettings: RequestSettings) {
final case class GoogleSettings @InternalApi private[akka] (projectId: String,
credentials: Credentials,
requestSettings: RequestSettings) {
def getProjectId = projectId
def getCredentials = credentials
def getRequestSettings = requestSettings
Expand Down Expand Up @@ -125,7 +125,7 @@ object RequestSettings {
apply(userIp.toScala, quotaUser.toScala, prettyPrint, chunkSize, retrySettings, forwardProxy.toScala)
}

final case class RequestSettings @InternalApi private (
final case class RequestSettings @InternalApi private[akka] (
userIp: Option[String],
quotaUser: Option[String],
prettyPrint: Boolean,
Expand Down Expand Up @@ -247,7 +247,7 @@ object ForwardProxy {
credentials: Option[BasicHttpCredentials],
trustPem: Option[String])(implicit system: ClassicActorSystemProvider): ForwardProxy = {
ForwardProxy(
trustPem.fold(Http(system).defaultClientHttpsContext)(ForwardProxyHttpsContext(_)),
trustPem.fold(Http(system.classicSystem).defaultClientHttpsContext)(ForwardProxyHttpsContext(_)),
ForwardProxyPoolSettings(scheme, host, port, credentials)(system.classicSystem)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ private[alpakka] object ResumableUpload {
import implicits._

implicit val um: FromResponseUnmarshaller[Uri] = Unmarshaller.withMaterializer {
implicit ec => implicit mat => response: HttpResponse =>
implicit ec => implicit mat => (response: HttpResponse) =>
response.discardEntityBytes().future.map { _ =>
response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri)
}
Expand All @@ -107,7 +107,7 @@ private[alpakka] object ResumableUpload {
)(implicit mat: Materializer): Flow[Either[T, MaybeLast[Chunk]], Try[Option[T]], NotUsed] = {
implicit val system: ActorSystem = mat.system

val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case PermanentRedirect =>
response.discardEntityBytes().future.map(_ => None)
Expand Down Expand Up @@ -145,7 +145,7 @@ private[alpakka] object ResumableUpload {
import implicits._

implicit val um: FromResponseUnmarshaller[Either[T, Long]] = Unmarshaller.withMaterializer {
implicit ec => implicit mat => response: HttpResponse =>
implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case OK | Created => Unmarshal(response).to[T].map(Left(_))
case PermanentRedirect =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object Credentials {
*/
def apply(c: Config)(implicit system: ClassicActorSystemProvider): Credentials = c.getString("provider") match {
case "application-default" =>
val log = Logging(system.classicSystem, getClass)
val log = Logging(system.classicSystem, classOf[Credentials])
try {
val creds = parseServiceAccount(c)
log.info("Using service account credentials")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,5 +65,5 @@ private[auth] object GoogleOAuth2 {
}

final case class JwtClaimContent(scope: String)
implicit val jwtClaimContentFormat: JsonFormat[JwtClaimContent] = jsonFormat1(JwtClaimContent)
implicit val jwtClaimContentFormat: JsonFormat[JwtClaimContent] = jsonFormat1(JwtClaimContent.apply)
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ import akka.stream.alpakka.google.util.Retry
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

final case class GoogleOAuth2Exception private (override val info: ErrorInfo) extends ExceptionWithErrorInfo(info)
final case class GoogleOAuth2Exception private[akka] (override val info: ErrorInfo) extends ExceptionWithErrorInfo(info)

private[google] object GoogleOAuth2Exception {

private val internalFailure = "internal_failure"
private final case class OAuth2ErrorResponse(error: Option[String], error_description: Option[String])
private implicit val oAuth2ErrorResponseFormat: RootJsonFormat[OAuth2ErrorResponse] = jsonFormat2(OAuth2ErrorResponse)
private implicit val oAuth2ErrorResponseFormat: RootJsonFormat[OAuth2ErrorResponse] = jsonFormat2(
OAuth2ErrorResponse.apply
)

implicit val unmarshaller: FromResponseUnmarshaller[Throwable] =
Unmarshaller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ object `X-Upload-Content-Type` extends ModeledCustomHeaderCompanion[`X-Upload-Co
)
}

final case class `X-Upload-Content-Type` private (contentType: ContentType)
final case class `X-Upload-Content-Type` private[akka] (contentType: ContentType)
extends ModeledCustomHeader[`X-Upload-Content-Type`]
with XUploadContentType {
override def value(): String = contentType.toString()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class OAuth2CredentialsSpec
implicit val settings: RequestSettings = GoogleSettings().requestSettings
implicit val clock: Clock = Clock.systemUTC()

final object AccessTokenProvider {
object AccessTokenProvider {
@volatile var accessTokenPromise: Promise[AccessToken] = Promise.failed(new RuntimeException)
}

Expand Down

0 comments on commit 698075d

Please sign in to comment.