From 19fa066b0e7ff9c4e8375182b44a814dc0d07703 Mon Sep 17 00:00:00 2001 From: Matthew de Detrich Date: Thu, 24 Oct 2024 13:26:19 +0200 Subject: [PATCH] Fix google cloud keep alive settings --- .../connectors/google/http/GoogleHttp.scala | 23 +++++++++++++++---- .../firebase/fcm/v1/impl/FcmSenderSpec.scala | 4 ++-- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala index 0bafea003..e90eab7eb 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala @@ -18,8 +18,9 @@ import pekko.actor.{ ClassicActorSystemProvider, ExtendedActorSystem, Scheduler import pekko.annotation.InternalApi import pekko.dispatch.ExecutionContexts import pekko.http.scaladsl.Http.HostConnectionPool -import pekko.http.scaladsl.model.headers.Authorization +import pekko.http.scaladsl.model.headers.{ Authorization, Connection } import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse } +import pekko.http.scaladsl.settings.ConnectionPoolSettings import pekko.http.scaladsl.unmarshalling.{ FromResponseUnmarshaller, Unmarshal } import pekko.http.scaladsl.{ Http, HttpExt } import pekko.stream.connectors.google.{ GoogleAttributes, GoogleSettings, RequestSettings, RetrySettings } @@ -27,6 +28,7 @@ import pekko.stream.connectors.google.util.Retry import pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, RetryFlow } import scala.concurrent.{ ExecutionContextExecutor, Future } +import scala.concurrent.duration.Duration import scala.util.{ Failure, Success, Try } @InternalApi @@ -48,14 +50,23 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A private implicit def system: ExtendedActorSystem = http.system private implicit def ec: ExecutionContextExecutor = system.dispatcher private implicit def scheduler: Scheduler = system.scheduler + private def defaultConnectionPoolSettingsWithInfKeepAlive = + ConnectionPoolSettings(system).withKeepAliveTimeout(Duration.Inf) + + private def setKeepAlive(request: HttpRequest): HttpRequest = + if (request.headers.exists { header => header.is("connection") || header.is("keep-alive") }) { + request.removeHeader("connection").removeHeader("keep-alive").addHeader(Connection("Keep-Alive")) + } else request.addHeader(Connection("Keep-Alive")) /** * Sends a single [[HttpRequest]] and returns the raw [[HttpResponse]]. */ def singleRawRequest(request: HttpRequest)(implicit settings: RequestSettings): Future[HttpResponse] = { - val requestWithStandardParams = addStandardQuery(request) - settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams)) { proxy => - http.singleRequest(requestWithStandardParams, proxy.connectionContext, proxy.poolSettings) + val requestWithStandardParams = addStandardQuery(setKeepAlive(request)) + settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams, http.defaultClientHttpsContext, + defaultConnectionPoolSettingsWithInfKeepAlive)) { proxy => + http.singleRequest(requestWithStandardParams, proxy.connectionContext, + proxy.poolSettings.withKeepAliveTimeout(Duration.Inf)) } } @@ -121,7 +132,7 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A else FlowWithContext[HttpRequest, Ctx] - val requestFlow = settings.requestSettings.forwardProxy match { + val requestFlow = (settings.requestSettings.forwardProxy match { case None if !https => http.cachedHostConnectionPool[Ctx](host, p) case Some(proxy) if !https => @@ -131,6 +142,8 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A case Some(proxy) if https => http.cachedHostConnectionPoolHttps[Ctx](host, p, proxy.connectionContext, proxy.poolSettings) case _ => throw new RuntimeException(s"illegal proxy settings with https=$https") + }).contramap[(HttpRequest, Ctx)] { case (request, context) => + (setKeepAlive(request), context) } val unmarshalFlow = Flow[(Try[HttpResponse], Ctx)].mapAsyncUnordered(parallelism) { diff --git a/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala b/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala index ca6c18820..feb4c05ba 100644 --- a/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala +++ b/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala @@ -83,8 +83,8 @@ class FcmSenderSpec val request: HttpRequest = captor.getValue Unmarshal(request.entity).to[FcmSend].futureValue shouldBe FcmSend(false, FcmNotification.empty) request.uri.toString should startWith("https://fcm.googleapis.com/v1/projects/projectId/messages:send") - request.headers.size shouldBe 1 - request.headers.head should matchPattern { case HttpHeader("authorization", "Bearer ") => } + request.headers.size shouldBe 2 + request.headers(1) should matchPattern { case HttpHeader("authorization", "Bearer ") => } } "parse the success response correctly" in {