From 94c6330a5b556168317fefaee80a8d214ca8310d Mon Sep 17 00:00:00 2001 From: Andreas Neumann Date: Wed, 30 Oct 2019 15:24:48 +0100 Subject: [PATCH 1/3] Fixed some timeout issues on startup Fixed issues with PodIds containing . Exit JVM if an error is thrown or the main loop exits --- .../src/main/resources/application.conf | 4 +++- .../usi/helloworld/KeepAliveFramework.scala | 12 ++++++++++-- .../usi/helloworld/KeepAlivePodSpecHelper.scala | 11 +++++++---- mesos-client/src/main/resources/reference.conf | 15 ++++++++++++++- 4 files changed, 34 insertions(+), 8 deletions(-) diff --git a/examples/keep-alive-framework/src/main/resources/application.conf b/examples/keep-alive-framework/src/main/resources/application.conf index 100750e84..e6988e033 100644 --- a/examples/keep-alive-framework/src/main/resources/application.conf +++ b/examples/keep-alive-framework/src/main/resources/application.conf @@ -17,7 +17,9 @@ akka { loglevel = "INFO" logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" - + + logger-startup-timeout = 30s + http.client { # The time period within which the TCP connecting process must be completed. diff --git a/examples/keep-alive-framework/src/main/scala/com/mesosphere/usi/helloworld/KeepAliveFramework.scala b/examples/keep-alive-framework/src/main/scala/com/mesosphere/usi/helloworld/KeepAliveFramework.scala index 64d167939..db9e5fc2b 100644 --- a/examples/keep-alive-framework/src/main/scala/com/mesosphere/usi/helloworld/KeepAliveFramework.scala +++ b/examples/keep-alive-framework/src/main/scala/com/mesosphere/usi/helloworld/KeepAliveFramework.scala @@ -50,7 +50,7 @@ class KeepAliveFramework(settings: KeepAliveFrameWorkSettings, authorization: Op import TaskState._ def activeTask(status: TaskStatus) = Seq(TASK_STAGING, TASK_STARTING, TASK_RUNNING).contains(status.getState) // We're only interested in the bad task statuses for our pod - val failedTasks = taskStatuses.filterNot { case (id, status) => activeTask(status) } + val failedTasks = taskStatuses.filterNot { case (taskId, status) => activeTask(status) } if (failedTasks.nonEmpty) { logger.info(s"Restarting Pod $id") val newId = KeepAlivePodSpecHelper.createNewIncarnationId(id) @@ -110,6 +110,7 @@ class KeepAliveFramework(settings: KeepAliveFrameWorkSettings, authorization: Op // We let the framework run "forever" val result = Await.result(end, Duration.Inf) logger.warn(s"Framework finished with $result") + Await.result(system.terminate(), Duration.Inf) } object KeepAliveFramework { @@ -202,7 +203,14 @@ object KeepAliveFramework { MesosClientSettings.fromConfig(conf).withMasters(Seq(mesosUrl.toURL)), conf.getInt("keep-alive-framework.tasks-started"), mesosRole) - new KeepAliveFramework(settings, provider) + try { + new KeepAliveFramework(settings, provider) + } catch { + case _: Throwable => { + Await.result(system.terminate(), Duration.Inf) + sys.exit(1) + } + } case _ => sys.exit(1) } diff --git a/examples/keep-alive-framework/src/main/scala/com/mesosphere/usi/helloworld/KeepAlivePodSpecHelper.scala b/examples/keep-alive-framework/src/main/scala/com/mesosphere/usi/helloworld/KeepAlivePodSpecHelper.scala index 9bd180e40..9a53385b3 100644 --- a/examples/keep-alive-framework/src/main/scala/com/mesosphere/usi/helloworld/KeepAlivePodSpecHelper.scala +++ b/examples/keep-alive-framework/src/main/scala/com/mesosphere/usi/helloworld/KeepAlivePodSpecHelper.scala @@ -5,11 +5,12 @@ import java.util.UUID import com.mesosphere.usi.core.models.resources.{ResourceType, ScalarRequirement} import com.mesosphere.usi.core.models.template.{RunTemplate, SimpleRunTemplateFactory} import com.mesosphere.usi.core.models.{PodId, RunningPodSpec} +import com.typesafe.scalalogging.StrictLogging /** * This is a helper object that generates pod specs and snapshots. */ -object KeepAlivePodSpecHelper { +object KeepAlivePodSpecHelper extends StrictLogging { val runSpec: RunTemplate = SimpleRunTemplateFactory( resourceRequirements = List(ScalarRequirement(ResourceType.CPUS, 0.001), ScalarRequirement(ResourceType.MEM, 32)), @@ -18,8 +19,9 @@ object KeepAlivePodSpecHelper { ) def generatePodSpec(): RunningPodSpec = { - val podId = PodId(s"hello-world.${UUID.randomUUID()}.1") + val podId = PodId(s"hello-world_${UUID.randomUUID()}_1") + logger.info(s"Generating PodSpec for '${podId}'") val podSpec = RunningPodSpec(id = podId, runSpec = runSpec) podSpec } @@ -28,12 +30,13 @@ object KeepAlivePodSpecHelper { (1 to numberOfPods).map(_ => generatePodSpec())(collection.breakOut) def createNewIncarnationId(podId: PodId): PodId = { - val idAndIncarnation = """^(.+\..*)\.(\d+)$""".r + val idAndIncarnation = """^(.+_.*)_(\d+)$""".r val (podIdWithoutIncarnation, currentIncarnation) = podId.value match { case idAndIncarnation(id, inc) => id -> inc.toLong + case _ => throw new IllegalArgumentException(s"Failed to create new incarnation id for ${podId.value}") } - PodId(s"$podIdWithoutIncarnation.${currentIncarnation + 1}") + PodId(s"${podIdWithoutIncarnation}_${currentIncarnation + 1}") } } diff --git a/mesos-client/src/main/resources/reference.conf b/mesos-client/src/main/resources/reference.conf index cfa6b5ee3..8567080d9 100644 --- a/mesos-client/src/main/resources/reference.conf +++ b/mesos-client/src/main/resources/reference.conf @@ -33,4 +33,17 @@ akka { # outgoing connection might be closed if no call is issued to Mesos for a while idle-timeout = infinite } -} + + stream { + + # Default materializer settings + materializer { + + # Cleanup leaked publishers and subscribers when they are not used within a given + # deadline + subscription-timeout { + timeout = 25s + } + } + } +} \ No newline at end of file From da9ea231ef3800027a1416ead67c00330d57ba3c Mon Sep 17 00:00:00 2001 From: Tim Harper Date: Thu, 31 Oct 2019 18:34:06 -0600 Subject: [PATCH 2/3] Let's not put stream defaults in USI If we need timeouts they should be controlled by USI specific config, and not by changing Akka defaults. Doing the latter could lead to surprising behavior in frameworks that use Akka. Further, the override order for reference.conf files is non-deterministic. Depending on the packaging method used, one reference.conf file (from akka) could take precendence over another, and lead to non-deterministic config. Avoid. --- mesos-client/src/main/resources/reference.conf | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/mesos-client/src/main/resources/reference.conf b/mesos-client/src/main/resources/reference.conf index 8567080d9..b81b80c55 100644 --- a/mesos-client/src/main/resources/reference.conf +++ b/mesos-client/src/main/resources/reference.conf @@ -33,17 +33,4 @@ akka { # outgoing connection might be closed if no call is issued to Mesos for a while idle-timeout = infinite } - - stream { - - # Default materializer settings - materializer { - - # Cleanup leaked publishers and subscribers when they are not used within a given - # deadline - subscription-timeout { - timeout = 25s - } - } - } } \ No newline at end of file From b183d477725c6927a427558624744e14eb514044 Mon Sep 17 00:00:00 2001 From: Andreas Neumann Date: Mon, 4 Nov 2019 15:17:24 +0100 Subject: [PATCH 3/3] Create pool settings with timeout and log error if we take too long --- .../com/mesosphere/mesos/client/Session.scala | 38 ++++++++++++++----- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/mesos-client/src/main/scala/com/mesosphere/mesos/client/Session.scala b/mesos-client/src/main/scala/com/mesosphere/mesos/client/Session.scala index e33f2edac..603ab8e7c 100644 --- a/mesos-client/src/main/scala/com/mesosphere/mesos/client/Session.scala +++ b/mesos-client/src/main/scala/com/mesosphere/mesos/client/Session.scala @@ -4,14 +4,16 @@ import java.net.URL import akka.NotUsed import akka.actor.{Actor, ActorRef, ActorSystem, Props, Stash, Status} import akka.http.scaladsl.Http -import akka.http.scaladsl.model.headers.{Authorization, HttpCredentials} import akka.http.scaladsl.model._ +import akka.http.scaladsl.model.headers.{Authorization, HttpCredentials} import akka.http.scaladsl.settings.ConnectionPoolSettings import akka.stream.Materializer import akka.stream.scaladsl.Flow import akka.util.Timeout import com.typesafe.scalalogging.StrictLogging +import scala.concurrent.duration._ +import scala.concurrent.{Await, ExecutionContext, Future, TimeoutException} import scala.util.{Failure, Success} /** @@ -25,7 +27,7 @@ import scala.util.{Failure, Success} * @param authorization A [[CredentialsProvider]] if the connection is secured. */ case class Session(url: URL, streamId: String, authorization: Option[CredentialsProvider] = None)( - implicit askTimout: Timeout) { + implicit askTimout: Timeout) extends StrictLogging { /** * Construct a new [[HttpRequest]] for a serialized Mesos call and a set of authorization, ie session token. @@ -52,6 +54,28 @@ case class Session(url: URL, streamId: String, authorization: Option[Credentials Flow[Array[Byte]].map(createPostRequest(_, None)).via(connection) } + private def poolSettings():Future[ConnectionPoolSettings] = Future { + // Constructs the connection pool settings with defaults and overrides the max connections and pipelining limit so + // that only one request at a time is processed. See https://doc.akka.io/docs/akka-http/current/configuration.html + // for details. + // *IMPORTANT*: DO NOT CHANGE maxConnections OR pipeliningLimit! Otherwise, USI won't guarantee request order to Mesos! + ConnectionPoolSettings("").withMaxConnections(1).withPipeliningLimit(1) + }(ExecutionContext.global) + + private def poolSettingsWithTimeout():ConnectionPoolSettings = { + // We create the pool settings with a timeout: It seems by default, the creation of the settings requires a reverse + // dns lookup, which may take more than 5 seconds which triggers the akka streams subscription timeout. + // If we let the creation here take the full time, we won't start without any reasonable logging message + try { + Await.result(poolSettings(), 2.seconds) + } catch { + case e: TimeoutException => { + logger.error("Failed to create ConnectionPoolSettings. Is your reverse DNS lookup slow?", e) + throw e + } + } + } + /** * A connection flow factory that will create a flow that only processes one request at a time. * @@ -60,19 +84,15 @@ case class Session(url: URL, streamId: String, authorization: Option[Credentials * @return A connection flow for single requests. */ private def connection(implicit system: ActorSystem, mat: Materializer): Flow[HttpRequest, HttpResponse, NotUsed] = { - // Constructs the connection pool settings with defaults and overrides the max connections and pipelining limit so - // that only one request at a time is processed. See https://doc.akka.io/docs/akka-http/current/configuration.html - // for details. - // *IMPORTANT*: DO NOT CHANGE maxConnections OR pipeliningLimit! Otherwise, USI won't guarantee request order to Mesos! - val poolSettings = ConnectionPoolSettings("").withMaxConnections(1).withPipeliningLimit(1) + val settings = poolSettingsWithTimeout() Flow[HttpRequest] .map(_ -> NotUsed) .via(if (Session.isSecured(url)) { Http() - .newHostConnectionPoolHttps(host = url.getHost, port = Session.effectivePort(url), settings = poolSettings) + .newHostConnectionPoolHttps(host = url.getHost, port = Session.effectivePort(url), settings = settings) } else { - Http().newHostConnectionPool(host = url.getHost, port = Session.effectivePort(url), settings = poolSettings) + Http().newHostConnectionPool(host = url.getHost, port = Session.effectivePort(url), settings = settings) }) .map { case (Success(response), NotUsed) => response