diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/GcpBatchBackendLifecycleActorFactory.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/GcpBatchBackendLifecycleActorFactory.scala index c1585ffe9f..847c97b09b 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/GcpBatchBackendLifecycleActorFactory.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/GcpBatchBackendLifecycleActorFactory.scala @@ -7,6 +7,9 @@ import com.google.cloud.batch.v1.BatchServiceSettings import com.google.common.collect.ImmutableMap import com.typesafe.scalalogging.StrictLogging import cromwell.backend._ +import cromwell.backend.google.batch.GcpBatchBackendLifecycleActorFactory.{ + preemptionCountKey +} import cromwell.backend.google.batch.actors._ import cromwell.backend.google.batch.api.request.{BatchRequestExecutor, RequestHandler} import cromwell.backend.google.batch.authentication.GcpBatchDockerCredentials @@ -30,6 +33,7 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String, ) extends StandardLifecycleActorFactory with GcpPlatform { + override val requestedKeyValueStoreKeys: Seq[String] = Seq(preemptionCountKey) import GcpBatchBackendLifecycleActorFactory._ override def jobIdKey: String = "__gcp_batch" @@ -133,6 +137,7 @@ class GcpBatchBackendLifecycleActorFactory(override val name: String, } object GcpBatchBackendLifecycleActorFactory extends StrictLogging { + val preemptionCountKey = "PreemptionCount" private[batch] def robustBuildAttributes(buildAttributes: () => GcpBatchConfigurationAttributes, maxAttempts: Int = 3, diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala index 7669b8ca43..0f06a53fdd 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala @@ -14,13 +14,16 @@ import cromwell.backend.async.{ AbortedExecutionHandle, ExecutionHandle, FailedNonRetryableExecutionHandle, + FailedRetryableExecutionHandle, PendingExecutionHandle } +import cromwell.backend.google.batch.GcpBatchBackendLifecycleActorFactory import cromwell.backend.google.batch.api.GcpBatchRequestFactory._ import cromwell.backend.google.batch.io._ import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.GcsTransferConfiguration import cromwell.backend.google.batch.models.GcpBatchJobPaths.GcsTransferLibraryName import cromwell.backend.google.batch.models.RunStatus.TerminalRunStatus +import cromwell.backend.google.batch.models.{GcpBatchExitCode, RunStatus} import cromwell.backend.google.batch.models._ import cromwell.backend.google.batch.monitoring.{BatchInstrumentation, CheckpointingConfiguration, MonitoringImage} import cromwell.backend.google.batch.runnable.WorkflowOptionKeys @@ -46,7 +49,7 @@ import cromwell.filesystems.gcs.GcsPath import cromwell.filesystems.http.HttpPath import cromwell.filesystems.sra.SraPath import cromwell.services.instrumentation.CromwellInstrumentation -import cromwell.services.keyvalue.KeyValueServiceActor.KvJobKey +import cromwell.services.keyvalue.KeyValueServiceActor.{KvJobKey, KvPair, ScopedKey} import cromwell.services.metadata.CallMetadataKeys import mouse.all._ import shapeless.Coproduct @@ -175,6 +178,15 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar override def dockerImageUsed: Option[String] = Option(jobDockerImage) + override lazy val preemptible: Int = jobDescriptor.prefetchedKvStoreEntries.get(GcpBatchBackendLifecycleActorFactory.preemptionCountKey) match { + case Some(KvPair(_, v)) => + Try(v.toInt) match { + case Success(m) => m + case Failure(_) => 0 + } + case _ => runtimeAttributes.preemptible + } + override def tryAbort(job: StandardAsyncJob): Unit = abortJob(workflowId = workflowId, jobName = JobName.parse(job.jobId), @@ -619,6 +631,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar projectId = googleProject(jobDescriptor.workflowDescriptor), computeServiceAccount = computeServiceAccount(jobDescriptor.workflowDescriptor), googleLabels = backendLabels ++ customLabels, + preemptible = preemptible, batchTimeout = batchConfiguration.batchTimeout, jobShell = batchConfiguration.jobShell, privateDockerKeyAndEncryptedToken = dockerKeyAndToken, @@ -825,7 +838,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar override def executeAsync(): Future[ExecutionHandle] = { // Want to force runtimeAttributes to evaluate so we can fail quickly now if we need to: - def evaluateRuntimeAttributes = Future.fromTry(Try(runtimeAttributes)) + def evaluateRuntimeAttributes = Future.fromTry(Try(runtimeAttributes.copy(preemptible = preemptible))) def generateInputOutputParameters: Future[InputOutputParameters] = Future.fromTry(Try { val rcFileOutput = GcpBatchFileOutput( @@ -896,7 +909,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar }) val runBatchResponse = for { - _ <- evaluateRuntimeAttributes + runtimeAttributes <- evaluateRuntimeAttributes _ <- uploadScriptFile() customLabels <- Future.fromTry(GcpLabel.fromWorkflowOptions(workflowDescriptor.workflowOptions)) batchParameters <- generateInputOutputParameters @@ -1046,14 +1059,23 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar // returnCode is provided by cromwell, so far, this is empty for all the tests I ran override def handleExecutionFailure(runStatus: RunStatus, returnCode: Option[Int]): Future[ExecutionHandle] = { def handleFailedRunStatus(runStatus: RunStatus.UnsuccessfulRunStatus): ExecutionHandle = - FailedNonRetryableExecutionHandle( - StandardException( + if (runStatus.exitCode == Some(GcpBatchExitCode.VMPreemption)) { + FailedRetryableExecutionHandle( + StandardException( + message = runStatus.prettyPrintedError, + jobTag = jobTag), + returnCode, + Option(Seq(KvPair(ScopedKey(workflowId, futureKvJobKey, GcpBatchBackendLifecycleActorFactory.preemptionCountKey), "0"))) + ) + } else { + FailedNonRetryableExecutionHandle( + StandardException( message = runStatus.prettyPrintedError, - jobTag = jobTag - ), - returnCode, - None - ) + jobTag = jobTag), + returnCode, + None + ) + } Future.fromTry { Try { diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala index edb778d392..9d7189a14b 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala @@ -25,6 +25,8 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper { batchConfiguration.runtimeConfig ) + val preemptible: Int + lazy val workingDisk: GcpBatchAttachedDisk = runtimeAttributes.disks.find(_.name == GcpBatchWorkingDisk.Name).get lazy val callRootPath: Path = gcpBatchCallPaths.callExecutionRoot @@ -75,9 +77,10 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper { .get(WorkflowOptionKeys.GoogleProject) .getOrElse(batchAttributes.project) - Map[String, String]( + Map[String, Any]( GcpBatchMetadataKeys.GoogleProject -> googleProject, - GcpBatchMetadataKeys.ExecutionBucket -> initializationData.workflowPaths.executionRootString + GcpBatchMetadataKeys.ExecutionBucket -> initializationData.workflowPaths.executionRootString, + "preemptible" -> preemptible ) ++ originalLabelEvents } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala index 74447376cf..eaa7873a8f 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala @@ -78,6 +78,7 @@ object GcpBatchRequestFactory { projectId: String, computeServiceAccount: String, googleLabels: Seq[GcpLabel], + preemptible: Int, batchTimeout: FiniteDuration, jobShell: String, privateDockerKeyAndEncryptedToken: Option[CreateBatchDockerKeyAndToken],