Skip to content

Commit

Permalink
Merge branch 'develop' into AN-146-batch-vm-cost
Browse files Browse the repository at this point in the history
  • Loading branch information
lucymcnatt authored Dec 9, 2024
2 parents b4e437b + e917e52 commit 757ffae
Show file tree
Hide file tree
Showing 9 changed files with 61 additions and 223 deletions.
108 changes: 0 additions & 108 deletions .github/workflows/combine_scalasteward_prs.yml

This file was deleted.

77 changes: 0 additions & 77 deletions .scala-steward.conf

This file was deleted.

2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ be found [here](https://cromwell.readthedocs.io/en/stable/backends/HPC/#optional
- Fixes the reference disk feature.
- Fixes pulling Docker image metadata from private GCR repositories.
- Fixed `google_project` and `google_compute_service_account` workflow options not taking effect when using GCP Batch backend
- Added a way to use a custom LogsPolicy for the job execution, setting `backend.providers.batch.config.batch.logs-policy` to "CLOUD_LOGGING" (default) keeps the current behavior, or, set it to "PATH" to save the logs into the mounted disk; at the end, this log file gets copied to the Google Cloud Storage bucket with "task.log" as the name.
- Added a way to use a custom LogsPolicy for the job execution, setting `backend.providers.batch.config.batch.logs-policy` to "CLOUD_LOGGING" (default) keeps the current behavior, or, set it to "PATH" to stream the logs to Google Cloud Storage.
- When "CLOUD_LOGGING" is used, many more Cromwell / WDL labels for workflow, root workflow, call, shard etc. are now assigned to GCP Batch log entries.
- Fixed subnet selection for networks that use custom subnet creation

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -607,14 +607,39 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
// not the `stderr` file contained memory retry error keys
val retryWithMoreMemoryKeys: Option[List[String]] = memoryRetryFactor.flatMap(_ => memoryRetryErrorKeys)

val targetLogFile = batchAttributes.logsPolicy match {
case GcpBatchLogsPolicy.CloudLogging => None
case GcpBatchLogsPolicy.Path =>
DefaultPathBuilder.build(
gcpBatchLogPath.pathAsString.replace(
gcpBatchLogPath.root.pathAsString,
GcpBatchAttachedDisk.GcsMountPoint + "/"
)
) match {
case Failure(exception) =>
throw new RuntimeException(
"Unable to use GcpBatchLogsPolicy.Path because the destination path could not be built, this is likely a programming error and a bug must be reported",
exception
)
case Success(path) =>
// remove trailing slash
val bucket = workflowPaths.workflowRoot.root.pathWithoutScheme.replace("/", "")

log.info(s"Batch logs for workflow $workflowId will be streamed to GCS at: $gcpBatchLogPath")

Some(
GcpBatchLogFile(gcsBucket = bucket, mountPath = GcpBatchAttachedDisk.GcsMountPoint, diskPath = path)
)
}
}

CreateBatchJobParameters(
jobDescriptor = jobDescriptor,
runtimeAttributes = runtimeAttributes,
dockerImage = jobDockerImage,
cloudWorkflowRoot = workflowPaths.workflowRoot,
cloudCallRoot = callRootPath,
commandScriptContainerPath = cmdInput.containerPath,
logGcsPath = gcpBatchLogPath,
inputOutputParameters = inputOutputParameters,
projectId = googleProject(jobDescriptor.workflowDescriptor),
computeServiceAccount = computeServiceAccount(jobDescriptor.workflowDescriptor),
Expand All @@ -632,7 +657,8 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
checkpointingConfiguration,
enableSshAccess = enableSshAccess,
vpcNetworkAndSubnetworkProjectLabels = data.vpcNetworkAndSubnetworkProjectLabels,
dockerhubCredentials = dockerhubCredentials
dockerhubCredentials = dockerhubCredentials,
targetLogFile = targetLogFile
)
case Some(other) =>
throw new RuntimeException(s"Unexpected initialization data: $other")
Expand Down Expand Up @@ -838,16 +864,6 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
contentType = plainTextContentType
)

val logFileOutput = GcpBatchFileOutput(
logFilename,
logGcsPath,
DefaultPathBuilder.get(logFilename),
workingDisk,
optional = true,
secondary = false,
contentType = plainTextContentType
)

val memoryRetryRCFileOutput = GcpBatchFileOutput(
memoryRetryRCFilename,
memoryRetryRCGcsPath,
Expand Down Expand Up @@ -888,8 +904,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
DetritusOutputParameters(
monitoringScriptOutputParameter = monitoringOutput,
rcFileOutputParameter = rcFileOutput,
memoryRetryRCFileOutputParameter = memoryRetryRCFileOutput,
logFileOutputParameter = logFileOutput
memoryRetryRCFileOutputParameter = memoryRetryRCFileOutput
)
)

Expand All @@ -907,10 +922,7 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
runtimeAttributes = runtimeAttributes,
batchAttributes = batchAttributes,
projectId = batchAttributes.project,
region = batchAttributes.location,
logfile = createParameters.commandScriptContainerPath.sibling(
batchParameters.detritusOutputParameters.logFileOutputParameter.name
)
region = batchAttributes.location
)

drsLocalizationManifestCloudPath = jobPaths.callExecutionRoot / GcpBatchJobPaths.DrsLocalizationManifestName
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ trait GcpBatchJobCachingActorHelper extends StandardCachingActorHelper {
lazy val gcpBatchLogPath: Path = gcpBatchCallPaths.batchLogPath
lazy val memoryRetryRCFilename: String = gcpBatchCallPaths.memoryRetryRCFilename
lazy val memoryRetryRCGcsPath: Path = gcpBatchCallPaths.memoryRetryRC

lazy val logFilename: String = "task.log"
lazy val logGcsPath: Path = gcpBatchCallPaths.callExecutionRoot.resolve(logFilename)

lazy val batchAttributes: GcpBatchConfigurationAttributes = batchConfiguration.batchAttributes

lazy val defaultLabels: Labels = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,9 @@ object GcpBatchRequestFactory {
/**
  * Groups the "detritus" (bookkeeping) file outputs that every Batch job delocalizes
  * alongside user-declared outputs.
  *
  * @param monitoringScriptOutputParameter optional output produced by the monitoring script, if one was configured.
  * @param rcFileOutputParameter           the return-code file written by the command script.
  * @param memoryRetryRCFileOutputParameter the return-code file used to decide whether to retry with more memory.
  */
case class DetritusOutputParameters(
  monitoringScriptOutputParameter: Option[GcpBatchFileOutput],
  rcFileOutputParameter: GcpBatchFileOutput,
  memoryRetryRCFileOutputParameter: GcpBatchFileOutput
) {
  // Flattened view of all detritus outputs; the optional monitoring output
  // contributes zero or one element via toList.
  def all: List[GcpBatchFileOutput] = memoryRetryRCFileOutputParameter ::
    rcFileOutputParameter ::
    monitoringScriptOutputParameter.toList
}
Expand All @@ -67,13 +65,21 @@ object GcpBatchRequestFactory {

case class CreateBatchDockerKeyAndToken(key: String, encryptedToken: String)

/**
  * Defines the values used for streaming the job logs to GCS when the
  * logs policy is set to PATH (instead of the default CLOUD_LOGGING).
  *
  * @param gcsBucket the Cloud Storage bucket where the log file should be streamed to
  *                  (bucket name only, without scheme or trailing slash).
  * @param mountPath the path inside the VM where the Cloud Storage bucket will be mounted.
  * @param diskPath  the path on the mounted disk where Batch should write the log file.
  */
case class GcpBatchLogFile(gcsBucket: String, mountPath: String, diskPath: Path)

case class CreateBatchJobParameters(jobDescriptor: BackendJobDescriptor,
runtimeAttributes: GcpBatchRuntimeAttributes,
dockerImage: String,
cloudWorkflowRoot: Path,
cloudCallRoot: Path,
commandScriptContainerPath: Path,
logGcsPath: Path,
inputOutputParameters: InputOutputParameters,
projectId: String,
computeServiceAccount: String,
Expand All @@ -91,7 +97,8 @@ object GcpBatchRequestFactory {
checkpointingConfiguration: CheckpointingConfiguration,
enableSshAccess: Boolean,
vpcNetworkAndSubnetworkProjectLabels: Option[VpcAndSubnetworkProjectLabelValues],
dockerhubCredentials: (String, String)
dockerhubCredentials: (String, String),
targetLogFile: Option[GcpBatchLogFile]
) {
def outputParameters = inputOutputParameters.fileOutputParameters
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.google.cloud.batch.v1.{
ComputeResource,
CreateJobRequest,
DeleteJobRequest,
GCS,
GetJobRequest,
Job,
JobName,
Expand All @@ -24,7 +25,7 @@ import com.google.cloud.batch.v1.{
import com.google.protobuf.Duration
import cromwell.backend.google.batch.io.GcpBatchAttachedDisk
import cromwell.backend.google.batch.models.GcpBatchConfigurationAttributes.GcsTransferConfiguration
import cromwell.backend.google.batch.models.{GcpBatchLogsPolicy, GcpBatchRequest, VpcAndSubnetworkProjectLabelValues}
import cromwell.backend.google.batch.models.{GcpBatchRequest, VpcAndSubnetworkProjectLabelValues}
import cromwell.backend.google.batch.runnable._
import cromwell.backend.google.batch.util.{BatchUtilityConversions, GcpBatchMachineConstraints}
import cromwell.core.labels.{Label, Labels}
Expand Down Expand Up @@ -228,7 +229,12 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
val networkInterface = createNetwork(data = data)
val networkPolicy = createNetworkPolicy(networkInterface.build())
val allDisks = toDisks(allDisksToBeMounted)
val allVolumes = toVolumes(allDisksToBeMounted)
val allVolumes = toVolumes(allDisksToBeMounted) ::: createParameters.targetLogFile.map { targetLogFile =>
Volume.newBuilder
.setGcs(GCS.newBuilder().setRemotePath(targetLogFile.gcsBucket))
.setMountPath(targetLogFile.mountPath)
.build()
}.toList

val containerSetup: List[Runnable] = containerSetupRunnables(allVolumes)
val localization: List[Runnable] = localizeRunnables(createParameters, allVolumes)
Expand Down Expand Up @@ -266,13 +272,14 @@ class GcpBatchRequestFactoryImpl()(implicit gcsTransferConfiguration: GcsTransfe
val locationPolicy = LocationPolicy.newBuilder.addAllAllowedLocations(zones.asJava).build
val allocationPolicy =
createAllocationPolicy(data, locationPolicy, instancePolicy.build, networkPolicy, gcpSa, accelerators)
val logsPolicy = data.gcpBatchParameters.batchAttributes.logsPolicy match {
case GcpBatchLogsPolicy.CloudLogging =>
LogsPolicy.newBuilder.setDestination(Destination.CLOUD_LOGGING).build
case GcpBatchLogsPolicy.Path =>

val logsPolicy = data.createParameters.targetLogFile match {
case None => LogsPolicy.newBuilder.setDestination(Destination.CLOUD_LOGGING).build

case Some(targetLogFile) =>
LogsPolicy.newBuilder
.setDestination(Destination.PATH)
.setLogsPath(data.gcpBatchParameters.logfile.toString)
.setLogsPath(targetLogFile.diskPath.pathAsString)
.build
}

Expand Down
Loading

0 comments on commit 757ffae

Please sign in to comment.