WX-927 GCP Batch: LogsPolicy.PATH now streams the logs to GCS #7529

Merged (4 commits) on Dec 4, 2024

Changes from all commits
CHANGELOG.md (1 addition, 1 deletion)

@@ -33,7 +33,7 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional
- Fixes the reference disk feature.
- Fixes pulling Docker image metadata from private GCR repositories.
- Fixed `google_project` and `google_compute_service_account` workflow options not taking effect when using GCP Batch backend
- Added a way to use a custom LogsPolicy for job execution: setting `backend.providers.batch.config.batch.logs-policy` to "CLOUD_LOGGING" (default) keeps the current behavior, while setting it to "PATH" saves the logs to the mounted disk; at the end, the log file is copied to the Google Cloud Storage bucket as "task.log".
- Added a way to use a custom LogsPolicy for job execution: setting `backend.providers.batch.config.batch.logs-policy` to "CLOUD_LOGGING" (default) keeps the current behavior, while setting it to "PATH" streams the logs to Google Cloud Storage.
- When "CLOUD_LOGGING" is used, many more Cromwell / WDL labels for workflow, root workflow, call, shard etc. are now assigned to GCP Batch log entries.
- Fixed subnet selection for networks that use custom subnet creation
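
For reference, the setting lands in the backend stanza of the Cromwell config. A minimal sketch, assuming the provider is named `batch` (every key other than `logs-policy` is a placeholder shown only for nesting):

```hocon
# Sketch only: enable GCS log streaming for the GCP Batch backend.
backend {
  providers {
    batch {
      config {
        batch {
          # "CLOUD_LOGGING" (default) keeps logs in Cloud Logging;
          # "PATH" streams them to the workflow's GCS execution bucket.
          logs-policy = "PATH"
        }
      }
    }
  }
}
```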

supportedBackends/google/batch/src/main/scala/cromwell/backend/google/batch/actors/GcpBatchAsyncBackendJobExecutionActor.scala

@@ -607,14 +607,39 @@
// not the `stderr` file contained memory retry error keys
val retryWithMoreMemoryKeys: Option[List[String]] = memoryRetryFactor.flatMap(_ => memoryRetryErrorKeys)

val targetLogFile = batchAttributes.logsPolicy match {
  case GcpBatchLogsPolicy.CloudLogging => None
  case GcpBatchLogsPolicy.Path =>
    DefaultPathBuilder.build(
      gcpBatchLogPath.pathAsString.replace(
        gcpBatchLogPath.root.pathAsString,
        GcpBatchAttachedDisk.GcsMountPoint + "/"
      )
    ) match {
      case Failure(exception) =>
        throw new RuntimeException(
          "Unable to use GcpBatchLogsPolicy.Path because the destination path could not be built, this is likely a programming error and a bug must be reported",
          exception
        )
      case Success(path) =>
        // remove trailing slash
        val bucket = workflowPaths.workflowRoot.root.pathWithoutScheme.replace("/", "")

        log.info(s"Batch logs for workflow $workflowId will be streamed to GCS at: $gcpBatchLogPath")

        Some(
          GcpBatchLogFile(gcsBucket = bucket, mountPath = GcpBatchAttachedDisk.GcsMountPoint, diskPath = path)
        )
    }
}
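
To make the rewrite above concrete: the cloud log path keeps its relative layout, but its GCS root is swapped for the in-container mount point. A minimal sketch with hypothetical bucket and call names (the real values come from Cromwell's path builders):

```scala
// Hypothetical values, for illustration only.
val logPath = "gs://my-bucket/workflow-root/call-task/task.batch.log" // gcpBatchLogPath.pathAsString
val root = "gs://my-bucket/"                                          // gcpBatchLogPath.root.pathAsString (assumed rendering)
val gcsMountPoint = "/mnt/disks/gcs"                                  // GcpBatchAttachedDisk.GcsMountPoint

// Same rewrite as above: the GCS root becomes the mounted-disk prefix.
val diskPath = logPath.replace(root, gcsMountPoint + "/")
// diskPath == "/mnt/disks/gcs/workflow-root/call-task/task.batch.log"
```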

CreateBatchJobParameters(
jobDescriptor = jobDescriptor,
runtimeAttributes = runtimeAttributes,
dockerImage = jobDockerImage,
cloudWorkflowRoot = workflowPaths.workflowRoot,
cloudCallRoot = callRootPath,
commandScriptContainerPath = cmdInput.containerPath,
logGcsPath = gcpBatchLogPath,
inputOutputParameters = inputOutputParameters,
projectId = googleProject(jobDescriptor.workflowDescriptor),
computeServiceAccount = computeServiceAccount(jobDescriptor.workflowDescriptor),
@@ -632,7 +657,8 @@
checkpointingConfiguration,
enableSshAccess = enableSshAccess,
vpcNetworkAndSubnetworkProjectLabels = data.vpcNetworkAndSubnetworkProjectLabels,
dockerhubCredentials = dockerhubCredentials
dockerhubCredentials = dockerhubCredentials,
targetLogFile = targetLogFile
)
case Some(other) =>
throw new RuntimeException(s"Unexpected initialization data: $other")
@@ -838,16 +864,6 @@
contentType = plainTextContentType
)

val logFileOutput = GcpBatchFileOutput(
logFilename,
logGcsPath,
DefaultPathBuilder.get(logFilename),
workingDisk,
optional = true,
secondary = false,
contentType = plainTextContentType
)

val memoryRetryRCFileOutput = GcpBatchFileOutput(
memoryRetryRCFilename,
memoryRetryRCGcsPath,
@@ -888,8 +904,7 @@
DetritusOutputParameters(
monitoringScriptOutputParameter = monitoringOutput,
rcFileOutputParameter = rcFileOutput,
memoryRetryRCFileOutputParameter = memoryRetryRCFileOutput,
logFileOutputParameter = logFileOutput
memoryRetryRCFileOutputParameter = memoryRetryRCFileOutput
)
)

@@ -907,10 +922,7 @@
runtimeAttributes = runtimeAttributes,
batchAttributes = batchAttributes,
projectId = batchAttributes.project,
region = batchAttributes.location,
logfile = createParameters.commandScriptContainerPath.sibling(
batchParameters.detritusOutputParameters.logFileOutputParameter.name
)
region = batchAttributes.location
)

drsLocalizationManifestCloudPath = jobPaths.callExecutionRoot / GcpBatchJobPaths.DrsLocalizationManifestName
GcpBatchJobCachingActorHelper.scala

@@ -33,10 +33,6 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper {
lazy val gcpBatchLogPath: Path = gcpBatchCallPaths.batchLogPath
lazy val memoryRetryRCFilename: String = gcpBatchCallPaths.memoryRetryRCFilename
lazy val memoryRetryRCGcsPath: Path = gcpBatchCallPaths.memoryRetryRC

lazy val logFilename: String = "task.log"
lazy val logGcsPath: Path = gcpBatchCallPaths.callExecutionRoot.resolve(logFilename)

lazy val batchAttributes: GcpBatchConfigurationAttributes = batchConfiguration.batchAttributes

lazy val defaultLabels: Labels = {
GcpBatchRequestFactory.scala

@@ -41,11 +41,9 @@ object GcpBatchRequestFactory {
case class DetritusOutputParameters(
monitoringScriptOutputParameter: Option[GcpBatchFileOutput],
rcFileOutputParameter: GcpBatchFileOutput,
memoryRetryRCFileOutputParameter: GcpBatchFileOutput,
logFileOutputParameter: GcpBatchFileOutput
memoryRetryRCFileOutputParameter: GcpBatchFileOutput
) {
def all: List[GcpBatchFileOutput] = memoryRetryRCFileOutputParameter ::
logFileOutputParameter ::
rcFileOutputParameter ::
monitoringScriptOutputParameter.toList
}
@@ -67,13 +65,21 @@

case class CreateBatchDockerKeyAndToken(key: String, encryptedToken: String)

/**
* Defines the values used for streaming the job logs to GCS.
*
* @param gcsBucket the Cloud Storage bucket where the log file should be streamed to.
* @param mountPath the path where the Cloud Storage bucket will be mounted to.
* @param diskPath the path in the mounted disk where the log file should be written to.
*/
case class GcpBatchLogFile(gcsBucket: String, mountPath: String, diskPath: Path)

case class CreateBatchJobParameters(jobDescriptor: BackendJobDescriptor,
runtimeAttributes: GcpBatchRuntimeAttributes,
dockerImage: String,
cloudWorkflowRoot: Path,
cloudCallRoot: Path,
commandScriptContainerPath: Path,
logGcsPath: Path,
inputOutputParameters: InputOutputParameters,
projectId: String,
computeServiceAccount: String,
@@ -91,7 +97,8 @@
checkpointingConfiguration: CheckpointingConfiguration,
enableSshAccess: Boolean,
vpcNetworkAndSubnetworkProjectLabels: Option[VpcAndSubnetworkProjectLabelValues],
dockerhubCredentials: (String, String)
dockerhubCredentials: (String, String),
targetLogFile: Option[GcpBatchLogFile]
) {
def outputParameters = inputOutputParameters.fileOutputParameters
}
GcpBatchRequestFactoryImpl.scala

@@ -11,6 +11,7 @@ import com.google.cloud.batch.v1.{
ComputeResource,
CreateJobRequest,
DeleteJobRequest,
GCS,
GetJobRequest,
Job,
JobName,
@@ -24,7 +25,7 @@ import com.google.cloud.batch.v1.{
import com.google.protobuf.Duration
import cromwell.backend.google.batch.io.GcpBatchAttachedDisk
import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.GcsTransferConfiguration
import cromwell.backend.google.batch.models.{GcpBatchLogsPolicy, GcpBatchRequest, VpcAndSubnetworkProjectLabelValues}
import cromwell.backend.google.batch.models.{GcpBatchRequest, VpcAndSubnetworkProjectLabelValues}
import cromwell.backend.google.batch.runnable._
import cromwell.backend.google.batch.util.{BatchUtilityConversions, GcpBatchMachineConstraints}
import cromwell.core.labels.{Label, Labels}
@@ -228,7 +229,12 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
val networkInterface = createNetwork(data = data)
val networkPolicy = createNetworkPolicy(networkInterface.build())
val allDisks = toDisks(allDisksToBeMounted)
val allVolumes = toVolumes(allDisksToBeMounted)
val allVolumes = toVolumes(allDisksToBeMounted) ::: createParameters.targetLogFile.map { targetLogFile =>
Volume.newBuilder
.setGcs(GCS.newBuilder().setRemotePath(targetLogFile.gcsBucket))
.setMountPath(targetLogFile.mountPath)
.build()
}.toList

val containerSetup: List[Runnable] = containerSetupRunnables(allVolumes)
val localization: List[Runnable] = localizeRunnables(createParameters, allVolumes)
@@ -266,13 +272,14 @@
val locationPolicy = LocationPolicy.newBuilder.addAllAllowedLocations(zones.asJava).build
val allocationPolicy =
createAllocationPolicy(data, locationPolicy, instancePolicy.build, networkPolicy, gcpSa, accelerators)
val logsPolicy = data.gcpBatchParameters.batchAttributes.logsPolicy match {
case GcpBatchLogsPolicy.CloudLogging =>
LogsPolicy.newBuilder.setDestination(Destination.CLOUD_LOGGING).build
case GcpBatchLogsPolicy.Path =>

val logsPolicy = data.createParameters.targetLogFile match {
case None => LogsPolicy.newBuilder.setDestination(Destination.CLOUD_LOGGING).build

case Some(targetLogFile) =>
LogsPolicy.newBuilder
.setDestination(Destination.PATH)
.setLogsPath(data.gcpBatchParameters.logfile.toString)
.setLogsPath(targetLogFile.diskPath.pathAsString)
.build
}

GcpBatchAttachedDisk.scala

@@ -13,6 +13,9 @@ import wom.values._
import scala.util.Try

object GcpBatchAttachedDisk {
// The mount point for the Cloud Storage bucket
val GcsMountPoint = "/mnt/disks/gcs"

def parse(s: String): Try[GcpBatchAttachedDisk] = {

def sizeGbValidation(sizeGbString: String): ErrorOr[Int] = validateLong(sizeGbString).map(_.toInt)
CreateGcpBatchParameters.scala

@@ -1,12 +1,10 @@
package cromwell.backend.google.batch.models

import cromwell.backend.BackendJobDescriptor
import cromwell.core.path.Path

case class CreateGcpBatchParameters(jobDescriptor: BackendJobDescriptor,
runtimeAttributes: GcpBatchRuntimeAttributes,
batchAttributes: GcpBatchConfigurationAttributes,
projectId: String,
region: String,
logfile: Path
region: String
)