Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix google cloud keep alive settings #876

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,17 @@ import pekko.actor.{ ClassicActorSystemProvider, ExtendedActorSystem, Scheduler
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import pekko.http.scaladsl.Http.HostConnectionPool
import pekko.http.scaladsl.model.headers.Authorization
import pekko.http.scaladsl.model.headers.{ Authorization, Connection }
import pekko.http.scaladsl.model.{ HttpRequest, HttpResponse }
import pekko.http.scaladsl.settings.ConnectionPoolSettings
import pekko.http.scaladsl.unmarshalling.{ FromResponseUnmarshaller, Unmarshal }
import pekko.http.scaladsl.{ Http, HttpExt }
import pekko.stream.connectors.google.{ GoogleAttributes, GoogleSettings, RequestSettings, RetrySettings }
import pekko.stream.connectors.google.util.Retry
import pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, RetryFlow }

import scala.concurrent.{ ExecutionContextExecutor, Future }
import scala.concurrent.duration.Duration
import scala.util.{ Failure, Success, Try }

@InternalApi
Expand All @@ -48,14 +50,23 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
private implicit def system: ExtendedActorSystem = http.system
private implicit def ec: ExecutionContextExecutor = system.dispatcher
private implicit def scheduler: Scheduler = system.scheduler
private def defaultConnectionPoolSettingsWithInfKeepAlive =
ConnectionPoolSettings(system).withKeepAliveTimeout(Duration.Inf)

private def setKeepAlive(request: HttpRequest): HttpRequest =
if (request.headers.exists { header => header.is("connection") || header.is("keep-alive") }) {
request.removeHeader("connection").removeHeader("keep-alive").addHeader(Connection("Keep-Alive"))
} else request.addHeader(Connection("Keep-Alive"))

/**
* Sends a single [[HttpRequest]] and returns the raw [[HttpResponse]].
*/
def singleRawRequest(request: HttpRequest)(implicit settings: RequestSettings): Future[HttpResponse] = {
val requestWithStandardParams = addStandardQuery(request)
settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams)) { proxy =>
http.singleRequest(requestWithStandardParams, proxy.connectionContext, proxy.poolSettings)
val requestWithStandardParams = addStandardQuery(setKeepAlive(request))
settings.forwardProxy.fold(http.singleRequest(requestWithStandardParams, http.defaultClientHttpsContext,
defaultConnectionPoolSettingsWithInfKeepAlive)) { proxy =>
http.singleRequest(requestWithStandardParams, proxy.connectionContext,
proxy.poolSettings.withKeepAliveTimeout(Duration.Inf))
}
}

Expand Down Expand Up @@ -121,7 +132,7 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
else
FlowWithContext[HttpRequest, Ctx]

val requestFlow = settings.requestSettings.forwardProxy match {
val requestFlow = (settings.requestSettings.forwardProxy match {
case None if !https =>
http.cachedHostConnectionPool[Ctx](host, p)
case Some(proxy) if !https =>
Expand All @@ -131,6 +142,8 @@ private[connectors] final class GoogleHttp private (val http: HttpExt) extends A
case Some(proxy) if https =>
http.cachedHostConnectionPoolHttps[Ctx](host, p, proxy.connectionContext, proxy.poolSettings)
case _ => throw new RuntimeException(s"illegal proxy settings with https=$https")
}).contramap[(HttpRequest, Ctx)] { case (request, context) =>
(setKeepAlive(request), context)
}

val unmarshalFlow = Flow[(Try[HttpResponse], Ctx)].mapAsyncUnordered(parallelism) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ class FcmSenderSpec
val request: HttpRequest = captor.getValue
Unmarshal(request.entity).to[FcmSend].futureValue shouldBe FcmSend(false, FcmNotification.empty)
request.uri.toString should startWith("https://fcm.googleapis.com/v1/projects/projectId/messages:send")
request.headers.size shouldBe 1
request.headers.head should matchPattern { case HttpHeader("authorization", "Bearer <no-token>") => }
request.headers.size shouldBe 2
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we have added an extra keep alive header we know return 2 headers, but we still only check the first header for what we expect

request.headers(1) should matchPattern { case HttpHeader("authorization", "Bearer <no-token>") => }
}

"parse the success response correctly" in {
Expand Down