From dfe82c1a628f64134ff3dde2d87974b8976c1734 Mon Sep 17 00:00:00 2001 From: jcschaff Date: Mon, 13 Jan 2025 17:18:39 -0500 Subject: [PATCH] return HDF5File from sim workflow instead of result_s3_path --- .../omex_sim/workflows/omex_sim_workflow.py | 32 ++++----------- .../verify/workflows/omex_verify_workflow.py | 41 +++++++++---------- tests/workflows/test_verify_workflows.py | 3 +- 3 files changed, 31 insertions(+), 45 deletions(-) diff --git a/biosim_server/omex_sim/workflows/omex_sim_workflow.py b/biosim_server/omex_sim/workflows/omex_sim_workflow.py index 260f8a0..29a0885 100644 --- a/biosim_server/omex_sim/workflows/omex_sim_workflow.py +++ b/biosim_server/omex_sim/workflows/omex_sim_workflow.py @@ -7,10 +7,10 @@ from temporalio.common import RetryPolicy from biosim_server.omex_sim.biosim1.models import BiosimSimulationRun, BiosimSimulationRunStatus, HDF5File, \ - Hdf5DataValues, SourceOmex, BiosimSimulatorSpec -from biosim_server.omex_sim.workflows.biosim_activities import get_hdf5_metadata, get_hdf5_data + SourceOmex, BiosimSimulatorSpec +from biosim_server.omex_sim.workflows.biosim_activities import get_hdf5_metadata from biosim_server.omex_sim.workflows.biosim_activities import get_sim_run, submit_biosim_sim, \ - SubmitBiosimSimInput, GetSimRunInput, GetHdf5DataInput, GetHdf5MetadataInput + SubmitBiosimSimInput, GetSimRunInput, GetHdf5MetadataInput class OmexSimWorkflowInput(BaseModel): @@ -30,7 +30,7 @@ class OmexSimWorkflowOutput(BaseModel): workflow_input: OmexSimWorkflowInput workflow_status: OmexSimWorkflowStatus biosim_run: BiosimSimulationRun | None = None - result_s3_path: str | None = None + hdf5_file: HDF5File | None = None @workflow.defn @@ -96,24 +96,10 @@ async def run(self, sim_input: OmexSimWorkflowInput) -> OmexSimWorkflowOutput: retry_policy=RetryPolicy(maximum_attempts=100, maximum_interval=timedelta(seconds=5), backoff_coefficient=2.0), ) - workflow.logger.info(f"retrieved HDF5File for simulation_run_id: 
{self.sim_output.biosim_run.id}") - - results_dict: dict[str, Hdf5DataValues] = {} - for group in hdf5_file.groups: - for dataset in group.datasets: - workflow.logger.info(f"getting data for dataset: {dataset.name}") - hdf5_data_values: Hdf5DataValues = await workflow.execute_activity( - get_hdf5_data, - args=[GetHdf5DataInput(simulation_run_id=self.sim_output.biosim_run.id, dataset_name=dataset.name)], - start_to_close_timeout=timedelta(seconds=60), - retry_policy=RetryPolicy(maximum_attempts=100, maximum_interval=timedelta(seconds=5), backoff_coefficient=2.0) - ) - results_dict[dataset.name] = hdf5_data_values - - workflow.logger.info(f"retrieved Simulation run data for simulation_run_id: {self.sim_output.biosim_run.id}") - - # Simulate SLURM job monitoring (replace with actual monitoring activity) - # await workflow.sleep(1) # Simulating job run time - self.sim_output.result_s3_path = f"s3://bucket-name/results/{self.sim_output.workflow_input.simulator_spec.simulator}.output" + workflow.logger.info( + f"Simulation run metadata for simulation_run_id: {self.sim_output.biosim_run.id} is {hdf5_file.model_dump_json()}") + + self.sim_output.hdf5_file = hdf5_file + self.sim_output.workflow_status = OmexSimWorkflowStatus.COMPLETED return self.sim_output diff --git a/biosim_server/verify/workflows/omex_verify_workflow.py b/biosim_server/verify/workflows/omex_verify_workflow.py index 5b2ad2f..bdd151c 100644 --- a/biosim_server/verify/workflows/omex_verify_workflow.py +++ b/biosim_server/verify/workflows/omex_verify_workflow.py @@ -2,16 +2,15 @@ import logging from datetime import timedelta from enum import StrEnum -from typing import Any, Optional +from typing import Any, Optional, Coroutine from pydantic import BaseModel from temporalio import workflow -from temporalio.common import RetryPolicy from temporalio.workflow import ChildWorkflowHandle from biosim_server.omex_sim.biosim1.models import BiosimSimulatorSpec, SourceOmex, Hdf5DataValues -from 
biosim_server.omex_sim.workflows.omex_sim_workflow import OmexSimWorkflow, OmexSimWorkflowInput -from biosim_server.verify.workflows.activities import generate_statistics +from biosim_server.omex_sim.workflows.omex_sim_workflow import OmexSimWorkflow, OmexSimWorkflowInput, \ + OmexSimWorkflowOutput class OmexVerifyWorkflowStatus(StrEnum): @@ -77,16 +76,14 @@ async def run(self, verify_input: OmexVerifyWorkflowInput) -> OmexVerifyWorkflow workflow.logger.info("Main workflow started.") # Launch child workflows for each simulator - child_workflows: list[Any] = [] + child_workflows: list[ + Coroutine[Any, Any, ChildWorkflowHandle[OmexSimWorkflowInput, OmexSimWorkflowOutput]]] = [] for simulator_spec in verify_input.requested_simulators: child_workflows.append( - workflow.start_child_workflow( - OmexSimWorkflow.run, - args=[OmexSimWorkflowInput( - source_omex=verify_input.source_omex, - simulator_spec=simulator_spec)], - task_queue="verification_tasks", - execution_timeout=timedelta(minutes=10), + workflow.start_child_workflow(OmexSimWorkflow.run, # type: ignore + args=[OmexSimWorkflowInput(source_omex=verify_input.source_omex, simulator_spec=simulator_spec)], + result_type=OmexSimWorkflowOutput, + task_queue="verification_tasks", execution_timeout=timedelta(minutes=10), ) ) @@ -99,7 +96,8 @@ async def run(self, verify_input: OmexVerifyWorkflowInput) -> OmexVerifyWorkflow # # Wait for all child workflows to complete - child_results: list[ChildWorkflowHandle[dict[str, str], Any]] = await asyncio.gather(*child_workflows) + child_results: list[ChildWorkflowHandle[OmexSimWorkflowInput, OmexSimWorkflowOutput]] = await asyncio.gather( + *child_workflows) # print types of all members of child_results for i in child_results: workflow.logger.info(f"child_results member type is {type(i)}") @@ -110,18 +108,19 @@ async def run(self, verify_input: OmexVerifyWorkflowInput) -> OmexVerifyWorkflow real_results: list[str] = [] for i in child_results: workflow.logger.info(f"child_results 
member type is {type(i)}") - a: ChildWorkflowHandle[dict[str,str], Any] = i + a = i real_results.append(str(await a)) workflow.logger.info(f"real_results member type is {type(a.result())}") # Generate comparison report - report_location = await workflow.execute_activity( - generate_statistics, - arg=real_results, - start_to_close_timeout=timedelta(seconds=10), - retry_policy=RetryPolicy(maximum_attempts=100, backoff_coefficient=2.0, maximum_interval=timedelta(seconds=10)), - ) - workflow.logger.info(f"Report generated at: {report_location}") + # report_location = await workflow.execute_activity( + # generate_statistics, + # arg=real_results, + # result_type=GenerateStatisticsOutput, + # start_to_close_timeout=timedelta(seconds=10), + # retry_policy=RetryPolicy(maximum_attempts=100, backoff_coefficient=2.0, maximum_interval=timedelta(seconds=10)), + # ) + # workflow.logger.info(f"Report generated at: {report_location}") self.verify_output.workflow_status = OmexVerifyWorkflowStatus.COMPLETED return self.verify_output diff --git a/tests/workflows/test_verify_workflows.py b/tests/workflows/test_verify_workflows.py index 17be733..5a770ef 100644 --- a/tests/workflows/test_verify_workflows.py +++ b/tests/workflows/test_verify_workflows.py @@ -49,12 +49,13 @@ async def test_sim_workflow(temporal_client: Client, temporal_verify_worker: Wor simulatorVersion=sim_spec.version or "latest", simulatorDigest=uuid.uuid4().hex, status=BiosimSimulationRunStatus.SUCCEEDED), - result_s3_path='s3://bucket-name/results/vcell.output' + hdf5_file=None ) if expected_results.biosim_run and workflow_handle_result.biosim_run: expected_results.biosim_run.id = workflow_handle_result.biosim_run.id expected_results.biosim_run.simulatorVersion = workflow_handle_result.biosim_run.simulatorVersion expected_results.biosim_run.simulatorDigest = workflow_handle_result.biosim_run.simulatorDigest + expected_results.hdf5_file = workflow_handle_result.hdf5_file assert workflow_handle_result == 
expected_results