diff --git a/CHANGELOG.md b/CHANGELOG.md index 00f8ebf79d..36abc519ed 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,7 +33,7 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional - Fixes the reference disk feature. - Fixes pulling Docker image metadata from private GCR repositories. - Fixed `google_project` and `google_compute_service_account` workflow options not taking effect when using GCP Batch backend -- Added a way to use a custom LogsPolicy for the job execution, setting `backend.providers.batch.config.batch.logs-policy` to "CLOUD_LOGGING" (default) keeps the current behavior, or, set it to "PATH" to save the logs into the the mounted disk, at the end, this log file gets copied to the google cloud storage bucket with "task.log" as the name. +- Added a way to use a custom LogsPolicy for the job execution: setting `backend.providers.batch.config.batch.logs-policy` to "CLOUD_LOGGING" (default) keeps the current behavior, or set it to "PATH" to stream the logs to Google Cloud Storage. - When "CLOUD_LOGGING" is used, many more Cromwell / WDL labels for workflow, root workflow, call, shard etc. are now assigned to GCP Batch log entries. 
- Fixed subnet selection for networks that use custom subnet creation diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala index 7669b8ca43..58ab8c49dc 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala @@ -607,6 +607,32 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar // not the `stderr` file contained memory retry error keys val retryWithMoreMemoryKeys: Option[List[String]] = memoryRetryFactor.flatMap(_ => memoryRetryErrorKeys) + val targetLogFile = batchAttributes.logsPolicy match { + case GcpBatchLogsPolicy.CloudLogging => None + case GcpBatchLogsPolicy.Path => + DefaultPathBuilder.build( + gcpBatchLogPath.pathAsString.replace( + gcpBatchLogPath.root.pathAsString, + GcpBatchAttachedDisk.GcsMountPoint + "/" + ) + ) match { + case Failure(exception) => + throw new RuntimeException( + "Unable to use GcpBatchLogsPolicy.Path because the destination path could not be built, this is likely a programming error and a bug must be reported", + exception + ) + case Success(path) => + // remove trailing slash + val bucket = workflowPaths.workflowRoot.root.pathWithoutScheme.replace("/", "") + + log.info(s"Batch logs for workflow $workflowId will be streamed to GCS at: $gcpBatchLogPath") + + Some( + GcpBatchLogFile(gcsBucket = bucket, mountPath = GcpBatchAttachedDisk.GcsMountPoint, diskPath = path) + ) + } + } + CreateBatchJobParameters( jobDescriptor = jobDescriptor, runtimeAttributes = runtimeAttributes, @@ -614,7 +640,6 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar 
cloudWorkflowRoot = workflowPaths.workflowRoot, cloudCallRoot = callRootPath, commandScriptContainerPath = cmdInput.containerPath, - logGcsPath = gcpBatchLogPath, inputOutputParameters = inputOutputParameters, projectId = googleProject(jobDescriptor.workflowDescriptor), computeServiceAccount = computeServiceAccount(jobDescriptor.workflowDescriptor), @@ -632,7 +657,8 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar checkpointingConfiguration, enableSshAccess = enableSshAccess, vpcNetworkAndSubnetworkProjectLabels = data.vpcNetworkAndSubnetworkProjectLabels, - dockerhubCredentials = dockerhubCredentials + dockerhubCredentials = dockerhubCredentials, + targetLogFile = targetLogFile ) case Some(other) => throw new RuntimeException(s"Unexpected initialization data: $other") @@ -838,16 +864,6 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar contentType = plainTextContentType ) - val logFileOutput = GcpBatchFileOutput( - logFilename, - logGcsPath, - DefaultPathBuilder.get(logFilename), - workingDisk, - optional = true, - secondary = false, - contentType = plainTextContentType - ) - val memoryRetryRCFileOutput = GcpBatchFileOutput( memoryRetryRCFilename, memoryRetryRCGcsPath, @@ -888,8 +904,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar DetritusOutputParameters( monitoringScriptOutputParameter = monitoringOutput, rcFileOutputParameter = rcFileOutput, - memoryRetryRCFileOutputParameter = memoryRetryRCFileOutput, - logFileOutputParameter = logFileOutput + memoryRetryRCFileOutputParameter = memoryRetryRCFileOutput ) ) @@ -907,10 +922,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar runtimeAttributes = runtimeAttributes, batchAttributes = batchAttributes, projectId = batchAttributes.project, - region = batchAttributes.location, - logfile = createParameters.commandScriptContainerPath.sibling( - 
batchParameters.detritusOutputParameters.logFileOutputParameter.name - ) + region = batchAttributes.location ) drsLocalizationManifestCloudPath = jobPaths.callExecutionRoot / GcpBatchJobPaths.DrsLocalizationManifestName diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala index edb778d392..bc97ab24cd 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchJobCachingActorHelper.scala @@ -33,10 +33,6 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper { lazy val gcpBatchLogPath: Path = gcpBatchCallPaths.batchLogPath lazy val memoryRetryRCFilename: String = gcpBatchCallPaths.memoryRetryRCFilename lazy val memoryRetryRCGcsPath: Path = gcpBatchCallPaths.memoryRetryRC - - lazy val logFilename: String = "task.log" - lazy val logGcsPath: Path = gcpBatchCallPaths.callExecutionRoot.resolve(logFilename) - lazy val batchAttributes: GcpBatchConfigurationAttributes = batchConfiguration.batchAttributes lazy val defaultLabels: Labels = { diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala index 74447376cf..7c203a8c3a 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactory.scala @@ -41,11 +41,9 @@ object GcpBatchRequestFactory { case class DetritusOutputParameters( monitoringScriptOutputParameter: Option[GcpBatchFileOutput], 
rcFileOutputParameter: GcpBatchFileOutput, - memoryRetryRCFileOutputParameter: GcpBatchFileOutput, - logFileOutputParameter: GcpBatchFileOutput + memoryRetryRCFileOutputParameter: GcpBatchFileOutput ) { def all: List[GcpBatchFileOutput] = memoryRetryRCFileOutputParameter :: - logFileOutputParameter :: rcFileOutputParameter :: monitoringScriptOutputParameter.toList } @@ -67,13 +65,21 @@ object GcpBatchRequestFactory { case class CreateBatchDockerKeyAndToken(key: String, encryptedToken: String) + /** + * Defines the values used for streaming the job logs to GCS. + * + * @param gcsBucket the Cloud Storage bucket where the log file should be streamed to. + * @param mountPath the path where the Cloud Storage bucket will be mounted to. + * @param diskPath the path in the mounted disk where the log file should be written to. + */ + case class GcpBatchLogFile(gcsBucket: String, mountPath: String, diskPath: Path) + case class CreateBatchJobParameters(jobDescriptor: BackendJobDescriptor, runtimeAttributes: GcpBatchRuntimeAttributes, dockerImage: String, cloudWorkflowRoot: Path, cloudCallRoot: Path, commandScriptContainerPath: Path, - logGcsPath: Path, inputOutputParameters: InputOutputParameters, projectId: String, computeServiceAccount: String, @@ -91,7 +97,8 @@ object GcpBatchRequestFactory { checkpointingConfiguration: CheckpointingConfiguration, enableSshAccess: Boolean, vpcNetworkAndSubnetworkProjectLabels: Option[VpcAndSubnetworkProjectLabelValues], - dockerhubCredentials: (String, String) + dockerhubCredentials: (String, String), + targetLogFile: Option[GcpBatchLogFile] ) { def outputParameters = inputOutputParameters.fileOutputParameters } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala index 3c8186039c..0ad12904c0 100644 --- 
a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/api/GcpBatchRequestFactoryImpl.scala @@ -11,6 +11,7 @@ import com.google.cloud.batch.v1.{ ComputeResource, CreateJobRequest, DeleteJobRequest, + GCS, GetJobRequest, Job, JobName, @@ -24,7 +25,7 @@ import com.google.cloud.batch.v1.{ import com.google.protobuf.Duration import cromwell.backend.google.batch.io.GcpBatchAttachedDisk import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.GcsTransferConfiguration -import cromwell.backend.google.batch.models.{GcpBatchLogsPolicy, GcpBatchRequest, VpcAndSubnetworkProjectLabelValues} +import cromwell.backend.google.batch.models.{GcpBatchRequest, VpcAndSubnetworkProjectLabelValues} import cromwell.backend.google.batch.runnable._ import cromwell.backend.google.batch.util.{BatchUtilityConversions, GcpBatchMachineConstraints} import cromwell.core.labels.{Label, Labels} @@ -228,7 +229,12 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe val networkInterface = createNetwork(data = data) val networkPolicy = createNetworkPolicy(networkInterface.build()) val allDisks = toDisks(allDisksToBeMounted) - val allVolumes = toVolumes(allDisksToBeMounted) + val allVolumes = toVolumes(allDisksToBeMounted) ::: createParameters.targetLogFile.map { targetLogFile => + Volume.newBuilder + .setGcs(GCS.newBuilder().setRemotePath(targetLogFile.gcsBucket)) + .setMountPath(targetLogFile.mountPath) + .build() + }.toList val containerSetup: List[Runnable] = containerSetupRunnables(allVolumes) val localization: List[Runnable] = localizeRunnables(createParameters, allVolumes) @@ -266,13 +272,14 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe val locationPolicy = LocationPolicy.newBuilder.addAllAllowedLocations(zones.asJava).build val allocationPolicy = 
createAllocationPolicy(data, locationPolicy, instancePolicy.build, networkPolicy, gcpSa, accelerators) - val logsPolicy = data.gcpBatchParameters.batchAttributes.logsPolicy match { - case GcpBatchLogsPolicy.CloudLogging => - LogsPolicy.newBuilder.setDestination(Destination.CLOUD_LOGGING).build - case GcpBatchLogsPolicy.Path => + + val logsPolicy = data.createParameters.targetLogFile match { + case None => LogsPolicy.newBuilder.setDestination(Destination.CLOUD_LOGGING).build + + case Some(targetLogFile) => LogsPolicy.newBuilder .setDestination(Destination.PATH) - .setLogsPath(data.gcpBatchParameters.logfile.toString) + .setLogsPath(targetLogFile.diskPath.pathAsString) .build } diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/io/GcpBatchAttachedDisk.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/io/GcpBatchAttachedDisk.scala index d890b01640..f6d7a2f3c8 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/io/GcpBatchAttachedDisk.scala +++ b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/io/GcpBatchAttachedDisk.scala @@ -13,6 +13,9 @@ import wom.values._ import scala.util.Try object GcpBatchAttachedDisk { + // The mount point for the Cloud Storage bucket + val GcsMountPoint = "/mnt/disks/gcs" + def parse(s: String): Try[GcpBatchAttachedDisk] = { def sizeGbValidation(sizeGbString: String): ErrorOr[Int] = validateLong(sizeGbString).map(_.toInt) diff --git a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/CreateGcpBatchParameters.scala b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/CreateGcpBatchParameters.scala index 0c47b95fb7..456f7115ae 100644 --- a/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/CreateGcpBatchParameters.scala +++ 
b/supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/models/CreateGcpBatchParameters.scala @@ -1,12 +1,10 @@ package cromwell.backend.google.batch.models import cromwell.backend.BackendJobDescriptor -import cromwell.core.path.Path case class CreateGcpBatchParameters(jobDescriptor: BackendJobDescriptor, runtimeAttributes: GcpBatchRuntimeAttributes, batchAttributes: GcpBatchConfigurationAttributes, projectId: String, - region: String, - logfile: Path + region: String )