Skip to content

Commit

Permalink
more tests + add support for missing location
Browse files Browse the repository at this point in the history
undo accidental delete

fix

another fix

formatting

remove extra log
  • Loading branch information
lucymcnatt committed Dec 9, 2024
1 parent 2af8d35 commit 3134ed3
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 13 deletions.
5 changes: 5 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"files.watcherExclude": {
"**/target": true
}
}
32 changes: 32 additions & 0 deletions runConfigurations/Repo template_ Cromwell server.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Repo template: Cromwell server" type="Application" factoryName="Application">
<option name="ALTERNATIVE_JRE_PATH" value="$USER_HOME$/.sdkman/candidates/java/current" />
<envs>
<env name="CROMWELL_BUILD_CENTAUR_SLICK_PROFILE" value="slick.jdbc.MySQLProfile$" />
<env name="CROMWELL_BUILD_CENTAUR_JDBC_DRIVER" value="com.mysql.cj.jdbc.Driver" />
<env name="CROMWELL_BUILD_CENTAUR_JDBC_URL" value="jdbc:mysql://localhost:3306/cromwell_test?allowPublicKeyRetrieval=true&amp;useSSL=false&amp;rewriteBatchedStatements=true&amp;serverTimezone=UTC&amp;useInformationSchema=true" />
<env name="CROMWELL_BUILD_RESOURCES_DIRECTORY" value="target/ci/resources" />
<env name="CROMWELL_BUILD_PAPI_JSON_FILE" value="target/ci/resources/cromwell-centaur-service-account.json" />
<env name="CROMWELL_BUILD_CENTAUR_READ_LINES_LIMIT" value="128000" />
<env name="CROMWELL_BUILD_CENTAUR_256_BITS_KEY" value="AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=" />
<env name="GOOGLE_APPLICATION_CREDENTIALS" value="target/ci/resources/cromwell-centaur-service-account.json" />
</envs>
<option name="MAIN_CLASS_NAME" value="cromwell.CromwellApp" />
<module name="root.cromwell" />
<option name="PROGRAM_PARAMETERS" value="server" />
<option name="VM_PARAMETERS" value="-Dconfig.file=target/ci/resources/gcp_batch_application.conf" />
<extension name="net.ashald.envfile">
<option name="IS_ENABLED" value="false" />
<option name="IS_SUBST" value="false" />
<option name="IS_PATH_MACRO_SUPPORTED" value="false" />
<option name="IS_IGNORE_MISSING_FILES" value="false" />
<option name="IS_ENABLE_EXPERIMENTAL_INTEGRATIONS" value="false" />
<ENTRIES>
<ENTRY IS_ENABLED="true" PARSER="runconfig" IS_EXECUTABLE="false" />
</ENTRIES>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,18 @@ class GcpBatchAsyncBackendJobExecutionActor(override val standardParams: Standar
} yield status
}

override val pollingResultMonitorActor: Option[ActorRef] = Option(
context.actorOf(
BatchPollResultMonitorActor.props(serviceRegistryActor,
workflowDescriptor,
jobDescriptor,
validatedRuntimeAttributes,
platform,
jobLogger
)
)
)

override def isTerminal(runStatus: RunStatus): Boolean =
runStatus match {
case _: RunStatus.TerminalRunStatus => true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import cromwell.backend.google.batch.api.{BatchApiRequestManager, BatchApiRespon
import cromwell.backend.google.batch.models.{GcpBatchExitCode, RunStatus}
import cromwell.core.ExecutionEvent
import cromwell.services.cost.InstantiatedVmInfo
import cromwell.services.metadata.CallMetadataKeys

import scala.annotation.unused
import scala.concurrent.{ExecutionContext, Future, Promise}
Expand Down Expand Up @@ -146,9 +147,14 @@ object BatchRequestExecutor {
val machineType = instancePolicy.getMachineType
val preemtible = instancePolicy.getProvisioningModelValue == ProvisioningModel.PREEMPTIBLE.getNumber

// Each location can be a region or a zone. Only one region is supported, ex: "regions/us-central1"
val location = allocationPolicy.getLocation.getAllowedLocations(0)
val region = location.split("/").last
// location list = [regions/us-central1, zones/us-central1-b], region is the first element
val location = allocationPolicy.getLocation.getAllowedLocationsList.get(0)
val region =
if (location.isEmpty)
"us-central1"
else
location.split("/").last

val instantiatedVmInfo = Some(InstantiatedVmInfo(region, machineType, preemtible))

if (job.getStatus.getState == JobStatus.State.SUCCEEDED) {
Expand All @@ -167,12 +173,20 @@ object BatchRequestExecutor {
GcpBatchExitCode.fromEventMessage(e.name.toLowerCase)
}.headOption

private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] =
private def getEventList(events: List[StatusEvent]): List[ExecutionEvent] = {
val startedRegex = ".*SCHEDULED to RUNNING.*".r
val endedRegex = ".*RUNNING to.*".r // can be SUCCEEDED or FAILED
events.map { e =>
val time = java.time.Instant
.ofEpochSecond(e.getEventTime.getSeconds, e.getEventTime.getNanos.toLong)
.atOffset(java.time.ZoneOffset.UTC)
ExecutionEvent(name = e.getDescription, offsetDateTime = time)
val eventType = e.getDescription match {
case startedRegex() => CallMetadataKeys.VmStartTime
case endedRegex() => CallMetadataKeys.VmEndTime
case _ => e.getType
}
ExecutionEvent(name = eventType, offsetDateTime = time)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ import com.google.cloud.batch.v1.{
BatchServiceSettings,
GetJobRequest,
Job,
JobStatus
JobStatus,
StatusEvent
}
import com.google.cloud.batch.v1.AllocationPolicy.{
InstancePolicy,
InstancePolicyOrTemplate,
LocationPolicy,
ProvisioningModel
}
import com.google.cloud.batch.v1.JobStatus.State
import com.google.protobuf.Timestamp
import common.mock.MockSugar
import cromwell.backend.google.batch.api.BatchApiResponse
import cromwell.backend.google.batch.models.RunStatus
Expand All @@ -32,30 +35,57 @@ class BatchRequestExecutorSpec
with MockSugar
with PrivateMethodTester {

behavior of "BatchRequestExecutor"

it should "create instantiatedVmInfo correctly" in {

def setupBatchClient(machineType: String = "n1-standard-1",
location: String = "regions/us-central1",
jobState: State = JobStatus.State.SUCCEEDED
): BatchServiceClient = {
val instancePolicy = InstancePolicy
.newBuilder()
.setMachineType("n1-standard-1")
.setMachineType(machineType)
.setProvisioningModel(ProvisioningModel.PREEMPTIBLE)
.build()

val allocationPolicy = AllocationPolicy
.newBuilder()
.setLocation(LocationPolicy.newBuilder().addAllowedLocations("regions/us-central1"))
.setLocation(LocationPolicy.newBuilder().addAllowedLocations(location))
.addInstances(InstancePolicyOrTemplate.newBuilder().setPolicy(instancePolicy))
.build()

val jobStatus = JobStatus.newBuilder().setState(JobStatus.State.RUNNING).build()
val startStatusEvent = StatusEvent
.newBuilder()
.setType("STATUS_CHANGED")
.setEventTime(Timestamp.newBuilder().setSeconds(1).build())
.setDescription("Job state is set from SCHEDULED to RUNNING for job...")
.build()

val endStatusEvent = StatusEvent
.newBuilder()
.setType("STATUS_CHANGED")
.setEventTime(Timestamp.newBuilder().setSeconds(2).build())
.setDescription("Job state is set from RUNNING to SOME_OTHER_STATUS for job...")
.build()

val jobStatus = JobStatus
.newBuilder()
.setState(jobState)
.addStatusEvents(startStatusEvent)
.addStatusEvents(endStatusEvent)
.build()

val job = Job.newBuilder().setAllocationPolicy(allocationPolicy).setStatus(jobStatus).build()

val mockClient = mock[BatchServiceClient]
doReturn(job).when(mockClient).getJob(any[GetJobRequest])
doReturn(job).when(mockClient).getJob(any[String])

mockClient
}

behavior of "BatchRequestExecutor"

it should "create instantiatedVmInfo correctly" in {

val mockClient = setupBatchClient(jobState = JobStatus.State.RUNNING)
// Create the BatchRequestExecutor
val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build())

Expand All @@ -72,4 +102,90 @@ class BatchRequestExecutorSpec
case _ => fail("Expected RunStatus.Running with instantiatedVmInfo")
}
}

it should "create instantiatedVmInfo correctly with different location info" in {

val mockClient = setupBatchClient(location = "zones/us-central1-a", jobState = JobStatus.State.RUNNING)

// Create the BatchRequestExecutor
val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build())

// testing a private method see https://www.scalatest.org/user_guide/using_PrivateMethodTester
val internalGetHandler = PrivateMethod[BatchApiResponse.StatusQueried](Symbol("internalGetHandler"))
val result = batchRequestExecutor invokePrivate internalGetHandler(mockClient, GetJobRequest.newBuilder().build())

// Verify the instantiatedVmInfo
result.status match {
case RunStatus.Running(_, Some(instantiatedVmInfo)) =>
instantiatedVmInfo.region shouldBe "us-central1-a"
instantiatedVmInfo.machineType shouldBe "n1-standard-1"
instantiatedVmInfo.preemptible shouldBe true
case _ => fail("Expected RunStatus.Running with instantiatedVmInfo")
}
}

it should "create instantiatedVmInfo correctly with missing location info" in {

val mockClient = setupBatchClient(jobState = JobStatus.State.RUNNING)

// Create the BatchRequestExecutor
val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build())

// testing a private method see https://www.scalatest.org/user_guide/using_PrivateMethodTester
val internalGetHandler = PrivateMethod[BatchApiResponse.StatusQueried](Symbol("internalGetHandler"))
val result = batchRequestExecutor invokePrivate internalGetHandler(mockClient, GetJobRequest.newBuilder().build())

// Verify the instantiatedVmInfo
result.status match {
case RunStatus.Running(_, Some(instantiatedVmInfo)) =>
instantiatedVmInfo.region shouldBe "us-central1"
instantiatedVmInfo.machineType shouldBe "n1-standard-1"
instantiatedVmInfo.preemptible shouldBe true
case _ => fail("Expected RunStatus.Running with instantiatedVmInfo")
}
}

it should "send vmStartTime and vmEndTime metadata info when a workflow succeeds" in {

val mockClient = setupBatchClient()

// Create the BatchRequestExecutor
val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build())

// testing a private method see https://www.scalatest.org/user_guide/using_PrivateMethodTester
val internalGetHandler = PrivateMethod[BatchApiResponse.StatusQueried](Symbol("internalGetHandler"))
val result = batchRequestExecutor invokePrivate internalGetHandler(mockClient, GetJobRequest.newBuilder().build())

// Verify the events
result.status match {
case RunStatus.Success(events, _) =>
val eventNames = events.map(_.name)
val eventTimes = events.map(_.offsetDateTime.toString)
eventNames should contain theSameElementsAs List("vmStartTime", "vmEndTime")
eventTimes should contain theSameElementsAs List("1970-01-01T00:00:01Z", "1970-01-01T00:00:02Z")
case _ => fail("Expected RunStatus.Success with events")
}
}

it should "send vmStartTime and vmEndTime metadata info when a workflow fails" in {
val mockClient = setupBatchClient(jobState = JobStatus.State.FAILED)

// Create the BatchRequestExecutor
val batchRequestExecutor = new BatchRequestExecutor.CloudImpl(BatchServiceSettings.newBuilder().build())

// testing a private method see https://www.scalatest.org/user_guide/using_PrivateMethodTester
val internalGetHandler = PrivateMethod[BatchApiResponse.StatusQueried](Symbol("internalGetHandler"))
val result = batchRequestExecutor invokePrivate internalGetHandler(mockClient, GetJobRequest.newBuilder().build())

// Verify the events
result.status match {
case RunStatus.Failed(_, events, _) =>
val eventNames = events.map(_.name)
val eventTimes = events.map(_.offsetDateTime.toString)
eventNames should contain theSameElementsAs List("vmStartTime", "vmEndTime")
eventTimes should contain theSameElementsAs List("1970-01-01T00:00:01Z", "1970-01-01T00:00:02Z")
case _ => fail("Expected RunStatus.Success with events")
}
}

}

0 comments on commit 3134ed3

Please sign in to comment.