Skip to content

Commit

Permalink
return HDF5File from sim workflow in instead of result_s3_path
Browse files Browse the repository at this point in the history
  • Loading branch information
jcschaff committed Jan 13, 2025
1 parent 562a897 commit dfe82c1
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 45 deletions.
32 changes: 9 additions & 23 deletions biosim_server/omex_sim/workflows/omex_sim_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@
from temporalio.common import RetryPolicy

from biosim_server.omex_sim.biosim1.models import BiosimSimulationRun, BiosimSimulationRunStatus, HDF5File, \
Hdf5DataValues, SourceOmex, BiosimSimulatorSpec
from biosim_server.omex_sim.workflows.biosim_activities import get_hdf5_metadata, get_hdf5_data
SourceOmex, BiosimSimulatorSpec
from biosim_server.omex_sim.workflows.biosim_activities import get_hdf5_metadata
from biosim_server.omex_sim.workflows.biosim_activities import get_sim_run, submit_biosim_sim, \
SubmitBiosimSimInput, GetSimRunInput, GetHdf5DataInput, GetHdf5MetadataInput
SubmitBiosimSimInput, GetSimRunInput, GetHdf5MetadataInput


class OmexSimWorkflowInput(BaseModel):
Expand All @@ -30,7 +30,7 @@ class OmexSimWorkflowOutput(BaseModel):
workflow_input: OmexSimWorkflowInput
workflow_status: OmexSimWorkflowStatus
biosim_run: BiosimSimulationRun | None = None
result_s3_path: str | None = None
hdf5_file: HDF5File | None = None


@workflow.defn
Expand Down Expand Up @@ -96,24 +96,10 @@ async def run(self, sim_input: OmexSimWorkflowInput) -> OmexSimWorkflowOutput:
retry_policy=RetryPolicy(maximum_attempts=100, maximum_interval=timedelta(seconds=5), backoff_coefficient=2.0),
)

workflow.logger.info(f"retrieved HDF5File for simulation_run_id: {self.sim_output.biosim_run.id}")

results_dict: dict[str, Hdf5DataValues] = {}
for group in hdf5_file.groups:
for dataset in group.datasets:
workflow.logger.info(f"getting data for dataset: {dataset.name}")
hdf5_data_values: Hdf5DataValues = await workflow.execute_activity(
get_hdf5_data,
args=[GetHdf5DataInput(simulation_run_id=self.sim_output.biosim_run.id, dataset_name=dataset.name)],
start_to_close_timeout=timedelta(seconds=60),
retry_policy=RetryPolicy(maximum_attempts=100, maximum_interval=timedelta(seconds=5), backoff_coefficient=2.0)
)
results_dict[dataset.name] = hdf5_data_values

workflow.logger.info(f"retrieved Simulation run data for simulation_run_id: {self.sim_output.biosim_run.id}")

# Simulate SLURM job monitoring (replace with actual monitoring activity)
# await workflow.sleep(1) # Simulating job run time
self.sim_output.result_s3_path = f"s3://bucket-name/results/{self.sim_output.workflow_input.simulator_spec.simulator}.output"
workflow.logger.info(
f"Simulation run metadata for simulation_run_id: {self.sim_output.biosim_run.id} is {hdf5_file.model_dump_json()}")

self.sim_output.hdf5_file = hdf5_file

self.sim_output.workflow_status = OmexSimWorkflowStatus.COMPLETED
return self.sim_output
41 changes: 20 additions & 21 deletions biosim_server/verify/workflows/omex_verify_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@
import logging
from datetime import timedelta
from enum import StrEnum
from typing import Any, Optional
from typing import Any, Optional, Coroutine

from pydantic import BaseModel
from temporalio import workflow
from temporalio.common import RetryPolicy
from temporalio.workflow import ChildWorkflowHandle

from biosim_server.omex_sim.biosim1.models import BiosimSimulatorSpec, SourceOmex, Hdf5DataValues
from biosim_server.omex_sim.workflows.omex_sim_workflow import OmexSimWorkflow, OmexSimWorkflowInput
from biosim_server.verify.workflows.activities import generate_statistics
from biosim_server.omex_sim.workflows.omex_sim_workflow import OmexSimWorkflow, OmexSimWorkflowInput, \
OmexSimWorkflowOutput


class OmexVerifyWorkflowStatus(StrEnum):
Expand Down Expand Up @@ -77,16 +76,14 @@ async def run(self, verify_input: OmexVerifyWorkflowInput) -> OmexVerifyWorkflow
workflow.logger.info("Main workflow started.")

# Launch child workflows for each simulator
child_workflows: list[Any] = []
child_workflows: list[
Coroutine[Any, Any, ChildWorkflowHandle[OmexSimWorkflowInput, OmexSimWorkflowOutput]]] = []
for simulator_spec in verify_input.requested_simulators:
child_workflows.append(
workflow.start_child_workflow(
OmexSimWorkflow.run,
args=[OmexSimWorkflowInput(
source_omex=verify_input.source_omex,
simulator_spec=simulator_spec)],
task_queue="verification_tasks",
execution_timeout=timedelta(minutes=10),
workflow.start_child_workflow(OmexSimWorkflow.run, # type: ignore
args=[OmexSimWorkflowInput(source_omex=verify_input.source_omex, simulator_spec=simulator_spec)],
result_type=OmexSimWorkflowOutput,
task_queue="verification_tasks", execution_timeout=timedelta(minutes=10),
)
)

Expand All @@ -99,7 +96,8 @@ async def run(self, verify_input: OmexVerifyWorkflowInput) -> OmexVerifyWorkflow
#

# Wait for all child workflows to complete
child_results: list[ChildWorkflowHandle[dict[str, str], Any]] = await asyncio.gather(*child_workflows)
child_results: list[ChildWorkflowHandle[OmexSimWorkflowInput, OmexSimWorkflowOutput]] = await asyncio.gather(
*child_workflows)
# print types of all members of child_results
for i in child_results:
workflow.logger.info(f"child_results member type is {type(i)}")
Expand All @@ -110,18 +108,19 @@ async def run(self, verify_input: OmexVerifyWorkflowInput) -> OmexVerifyWorkflow
real_results: list[str] = []
for i in child_results:
workflow.logger.info(f"child_results member type is {type(i)}")
a: ChildWorkflowHandle[dict[str,str], Any] = i
a = i
real_results.append(str(await a))
workflow.logger.info(f"real_results member type is {type(a.result())}")

# Generate comparison report
report_location = await workflow.execute_activity(
generate_statistics,
arg=real_results,
start_to_close_timeout=timedelta(seconds=10),
retry_policy=RetryPolicy(maximum_attempts=100, backoff_coefficient=2.0, maximum_interval=timedelta(seconds=10)),
)
workflow.logger.info(f"Report generated at: {report_location}")
# report_location = await workflow.execute_activity(
# generate_statistics,
# arg=real_results,
# result_type=GenerateStatisticsOutput,
# start_to_close_timeout=timedelta(seconds=10),
# retry_policy=RetryPolicy(maximum_attempts=100, backoff_coefficient=2.0, maximum_interval=timedelta(seconds=10)),
# )
# workflow.logger.info(f"Report generated at: {report_location}")

self.verify_output.workflow_status = OmexVerifyWorkflowStatus.COMPLETED
return self.verify_output
3 changes: 2 additions & 1 deletion tests/workflows/test_verify_workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,13 @@ async def test_sim_workflow(temporal_client: Client, temporal_verify_worker: Wor
simulatorVersion=sim_spec.version or "latest",
simulatorDigest=uuid.uuid4().hex,
status=BiosimSimulationRunStatus.SUCCEEDED),
result_s3_path='s3://bucket-name/results/vcell.output'
hdf5_file=None
)
if expected_results.biosim_run and workflow_handle_result.biosim_run:
expected_results.biosim_run.id = workflow_handle_result.biosim_run.id
expected_results.biosim_run.simulatorVersion = workflow_handle_result.biosim_run.simulatorVersion
expected_results.biosim_run.simulatorDigest = workflow_handle_result.biosim_run.simulatorDigest
expected_results.hdf5_file = workflow_handle_result.hdf5_file
assert workflow_handle_result == expected_results


Expand Down

0 comments on commit dfe82c1

Please sign in to comment.