From bcc7cbbc9e044a96870894e8f41726a30619e692 Mon Sep 17 00:00:00 2001
From: jcschaff
Date: Tue, 21 Jan 2025 12:04:18 -0500
Subject: [PATCH] check for run_id not found and handle gracefully

---
 .../biosim1_client/biosim_service_rest.py     |   1 +
 biosim_server/common/biosim1_client/models.py |   1 +
 .../workflows/simulate/biosim_activities.py   |  23 +++-
 .../workflows/verify/runs_verify_workflow.py  |  28 +++-
 tests/api/test_main.py                        |  42 +++++-
 ...flows.py => test_omex_verify_workflows.py} |  66 +---------
 tests/workflows/test_runs_verify_workflow.py  | 123 ++++++++++++++++++
 7 files changed, 211 insertions(+), 73 deletions(-)
 rename tests/workflows/{test_verify_workflows.py => test_omex_verify_workflows.py} (62%)
 create mode 100644 tests/workflows/test_runs_verify_workflow.py

diff --git a/biosim_server/common/biosim1_client/biosim_service_rest.py b/biosim_server/common/biosim1_client/biosim_service_rest.py
index 8f3f130..9a596e9 100644
--- a/biosim_server/common/biosim1_client/biosim_service_rest.py
+++ b/biosim_server/common/biosim1_client/biosim_service_rest.py
@@ -18,6 +18,7 @@ class BiosimServiceRest(BiosimService):
 
     @override
     async def get_sim_run(self, simulation_run_id: str) -> BiosimSimulationRun:
+        """ raises ClientResponseError if the response status is not 2xx """
         api_base_url = os.environ.get('API_BASE_URL') or "https://api.biosimulations.org"
         assert (api_base_url is not None)
 
diff --git a/biosim_server/common/biosim1_client/models.py b/biosim_server/common/biosim1_client/models.py
index 4a96297..2771c53 100644
--- a/biosim_server/common/biosim1_client/models.py
+++ b/biosim_server/common/biosim1_client/models.py
@@ -61,6 +61,7 @@ class BiosimSimulationRunStatus(StrEnum):
     PROCESSING = 'PROCESSING',
     SUCCEEDED = 'SUCCEEDED',
     FAILED = 'FAILED',
+    RUN_ID_NOT_FOUND = 'RUN_ID_NOT_FOUND',
     UNKNOWN = 'UNKNOWN'
 
 
diff --git a/biosim_server/workflows/simulate/biosim_activities.py b/biosim_server/workflows/simulate/biosim_activities.py
index 83f0ef0..e89d22c 100644
--- a/biosim_server/workflows/simulate/biosim_activities.py
+++ b/biosim_server/workflows/simulate/biosim_activities.py
@@ -1,25 +1,42 @@
 import logging
 import os
+from typing import Optional
 
+from aiohttp import ClientResponseError
 from pydantic import BaseModel
 from temporalio import activity
 
 from biosim_server.common.biosim1_client import BiosimService, BiosimServiceRest, SourceOmex, BiosimSimulatorSpec, \
-    BiosimSimulationRun, HDF5File, Hdf5DataValues
+    BiosimSimulationRun, BiosimSimulationRunStatus, HDF5File, Hdf5DataValues
 from biosim_server.common.storage import FileService
 from biosim_server.dependencies import get_file_service, get_biosim_service
 
 
 class GetSimRunInput(BaseModel):
     biosim_run_id: str
+    abort_on_not_found: Optional[bool] = False
 
 
 @activity.defn
 async def get_sim_run(get_sim_run_input: GetSimRunInput) -> BiosimSimulationRun:
     activity.logger.setLevel(logging.INFO)
     biosim_service = BiosimServiceRest()
-    biosim_sim_run: BiosimSimulationRun = await biosim_service.get_sim_run(get_sim_run_input.biosim_run_id)
-    return biosim_sim_run
+    try:
+        biosim_sim_run: BiosimSimulationRun = await biosim_service.get_sim_run(get_sim_run_input.biosim_run_id)
+        return biosim_sim_run
+    except ClientResponseError as e:
+        if e.status == 404:
+            activity.logger.warn(f"Simulation run with id {get_sim_run_input.biosim_run_id} not found", exc_info=e)
+            if get_sim_run_input.abort_on_not_found:
+                # return a failed simulation run rather than raising an exception to avoid retrying the activity
+                return BiosimSimulationRun(
+                    id=get_sim_run_input.biosim_run_id,
+                    name="",
+                    simulator="",
+                    simulatorVersion="",
+                    status=BiosimSimulationRunStatus.RUN_ID_NOT_FOUND
+                )
+        raise e
 
 
 class SubmitBiosimSimInput(BaseModel):
diff --git a/biosim_server/workflows/verify/runs_verify_workflow.py b/biosim_server/workflows/verify/runs_verify_workflow.py
index 01d01b8..8640a62 100644
--- a/biosim_server/workflows/verify/runs_verify_workflow.py
+++ b/biosim_server/workflows/verify/runs_verify_workflow.py
@@ -5,9 +5,12 @@
 
 from pydantic import BaseModel
 from temporalio import workflow
+from temporalio.client import WorkflowFailureError
 from temporalio.common import RetryPolicy
+from temporalio.exceptions import ActivityError
 
-from biosim_server.common.biosim1_client import BiosimSimulationRun, BiosimSimulatorSpec, HDF5File
+from biosim_server.common.biosim1_client import BiosimSimulationRun, BiosimSimulatorSpec, HDF5File, \
+    BiosimSimulationRunStatus
 from biosim_server.workflows.simulate import GetSimRunInput, get_hdf5_metadata, GetHdf5MetadataInput, get_sim_run
 from biosim_server.workflows.verify import generate_statistics, GenerateStatisticsOutput, GenerateStatisticsInput, \
     SimulationRunInfo
@@ -18,6 +21,12 @@ class RunsVerifyWorkflowStatus(StrEnum):
     IN_PROGRESS = "IN_PROGRESS"
     COMPLETED = "COMPLETED"
     FAILED = "FAILED"
+    RUN_ID_NOT_FOUND = "RUN_ID_NOT_FOUND"
+
+    @property
+    def is_done(self) -> bool:
+        return self in [RunsVerifyWorkflowStatus.COMPLETED, RunsVerifyWorkflowStatus.FAILED,
+                        RunsVerifyWorkflowStatus.RUN_ID_NOT_FOUND]
 
 
 class RunsVerifyWorkflowInput(BaseModel):
@@ -34,6 +43,7 @@ class RunsVerifyWorkflowOutput(BaseModel):
     workflow_input: RunsVerifyWorkflowInput
     workflow_status: RunsVerifyWorkflowStatus
     timestamp: str
+    workflow_error: Optional[str] = None
     actual_simulators: Optional[list[BiosimSimulatorSpec]] = None
     workflow_run_id: Optional[str] = None
     workflow_results: Optional[GenerateStatisticsOutput] = None
@@ -61,13 +71,23 @@ async def run(self, verify_input: RunsVerifyWorkflowInput) -> RunsVerifyWorkflow
         workflow.logger.setLevel(level=logging.INFO)
         workflow.logger.info("Main workflow started.")
 
-        # verify biosimulation runs are valid and complete and retreive Simulation results metadata
+        # verify biosimulation runs are valid and complete and retrieve Simulation results metadata
         biosimulation_runs: list[BiosimSimulationRun] = []
         for biosimulation_run_id in verify_input.biosimulations_run_ids:
             biosimulation_run = await workflow.execute_activity(get_sim_run,
-                args=[GetSimRunInput(biosim_run_id=biosimulation_run_id)], start_to_close_timeout=timedelta(seconds=60),
-                retry_policy=RetryPolicy(maximum_attempts=3))
+                args=[GetSimRunInput(biosim_run_id=biosimulation_run_id,
+                                     abort_on_not_found=True)],
+                start_to_close_timeout=timedelta(seconds=60),
+                retry_policy=RetryPolicy(maximum_attempts=30))
             biosimulation_runs.append(biosimulation_run)
+            if biosimulation_run.status == BiosimSimulationRunStatus.RUN_ID_NOT_FOUND:
+                # important to update the state of the workflow before returning,
+                # so that subsequent workflow queries get the correct state
+                self.verify_output = RunsVerifyWorkflowOutput(workflow_id=workflow.info().workflow_id,
+                    workflow_input=verify_input, workflow_run_id=workflow.info().run_id,
+                    workflow_status=RunsVerifyWorkflowStatus.RUN_ID_NOT_FOUND, timestamp=str(workflow.now()),
+                    workflow_error=f"Simulation run with id {biosimulation_run_id} not found.")
+                return self.verify_output
 
         workflow.logger.info(f"verified access to completed run ids {verify_input.biosimulations_run_ids}.")
 
diff --git a/tests/api/test_main.py b/tests/api/test_main.py
index 84dfb3a..f24f392 100644
--- a/tests/api/test_main.py
+++ b/tests/api/test_main.py
@@ -14,7 +14,8 @@
 from biosim_server.config import get_settings
 from biosim_server.workflows.verify import OmexVerifyWorkflowInput, OmexVerifyWorkflowOutput, OmexVerifyWorkflowStatus, \
     RunsVerifyWorkflowInput, RunsVerifyWorkflowOutput, RunsVerifyWorkflowStatus
-from tests.workflows.test_verify_workflows import assert_omex_verify_results, assert_runs_verify_results
+from tests.workflows.test_omex_verify_workflows import assert_omex_verify_results
+from tests.workflows.test_runs_verify_workflow import assert_runs_verify_results
 
 
 @pytest.mark.asyncio
@@ -150,3 +151,42 @@ async def test_runs_verify_and_get_output(runs_verify_workflow_input: RunsVerify
         logging.info(f"polling, job status is: {output.workflow_status}")
 
     assert_runs_verify_results(observed_results=output, expected_results_template=runs_verify_workflow_output)
+
+
+@pytest.mark.asyncio
+async def test_runs_verify_not_found(runs_verify_workflow_input: RunsVerifyWorkflowInput,
+                                     runs_verify_workflow_output: RunsVerifyWorkflowOutput,
+                                     omex_test_file: Path,
+                                     file_service_local: FileServiceLocal,
+                                     temporal_client: Client,
+                                     temporal_verify_worker: Worker,
+                                     biosim_service_rest: BiosimServiceRest) -> None:
+    assert runs_verify_workflow_input.observables is not None
+    query_params: dict[str, float | str | list[str]] = {
+        "workflow_id_prefix": "verification-",
+        "biosimulations_run_ids": ["bad_run_id_1", "bad_run_id_2"],
+        "include_outputs": runs_verify_workflow_input.include_outputs,
+        "user_description": runs_verify_workflow_input.user_description,
+        "observables": runs_verify_workflow_input.observables,
+        "rel_tol": runs_verify_workflow_input.rel_tol,
+        "abs_tol": runs_verify_workflow_input.abs_tol
+    }
+
+    async with (AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as test_client):
+        with open(omex_test_file, "rb") as file:
+            response = await test_client.post("/verify_runs", params=query_params)
+        assert response.status_code == 200
+
+        output = RunsVerifyWorkflowOutput.model_validate(response.json())
+
+        # poll api until job is completed
+        while not output.workflow_status.is_done:
+            await asyncio.sleep(5)
+            response = await test_client.get(f"/verify_runs/{output.workflow_id}")
+            if response.status_code == 200:
+                output = RunsVerifyWorkflowOutput.model_validate(response.json())
+            logging.info(f"polling, job status is: {output.workflow_status}")
+
+        assert output.workflow_status == RunsVerifyWorkflowStatus.RUN_ID_NOT_FOUND
+        assert output.workflow_error in ["Simulation run with id bad_run_id_1 not found.",
+                                         "Simulation run with id bad_run_id_2 not found."]
diff --git a/tests/workflows/test_verify_workflows.py b/tests/workflows/test_omex_verify_workflows.py
similarity index 62%
rename from tests/workflows/test_verify_workflows.py
rename to tests/workflows/test_omex_verify_workflows.py
index 1cbd67a..f859338 100644
--- a/tests/workflows/test_verify_workflows.py
+++ b/tests/workflows/test_omex_verify_workflows.py
@@ -10,7 +10,7 @@
 from biosim_server.common.storage import FileServiceLocal, FileServiceS3
 from biosim_server.config import get_settings
 from biosim_server.workflows.verify import ComparisonStatistics, OmexVerifyWorkflow, OmexVerifyWorkflowInput, \
-    OmexVerifyWorkflowOutput, RunsVerifyWorkflow, RunsVerifyWorkflowInput, RunsVerifyWorkflowOutput
+    OmexVerifyWorkflowOutput
 from tests.fixtures.s3_fixtures import file_service_s3_test_base_path
 
 
@@ -71,25 +71,6 @@ async def test_omex_verify_workflow_mockS3(temporal_client: Client, temporal_ver
 
     assert_omex_verify_results(observed_results=observed_results, expected_results_template=omex_verify_workflow_output)
 
 
-@pytest.mark.asyncio
-async def test_run_verify_workflow(temporal_client: Client, temporal_verify_worker: Worker,
-                                   runs_verify_workflow_input: RunsVerifyWorkflowInput,
-                                   runs_verify_workflow_output: RunsVerifyWorkflowOutput,
-                                   biosim_service_rest: BiosimServiceRest, file_service_s3: FileServiceS3) -> None:
-    assert biosim_service_rest is not None
-
-    workflow_id = uuid.uuid4().hex
-
-    observed_results: RunsVerifyWorkflowOutput = await temporal_client.execute_workflow(
-        RunsVerifyWorkflow.run, args=[runs_verify_workflow_input],
-        # result_type=RunsVerifyWorkflowOutput,
-        id=workflow_id, task_queue="verification_tasks")
-
-    # with open(Path(__file__).parent / "fixtures" / "local_data" / "RunsVerifyWorkflowOutput_expected.json", "w") as f:
-    #     f.write(observed_results.model_dump_json())
-
-    assert_runs_verify_results(observed_results=observed_results, expected_results_template=runs_verify_workflow_output)
-
 def assert_omex_verify_results(observed_results: OmexVerifyWorkflowOutput,
                                expected_results_template: OmexVerifyWorkflowOutput) -> None:
@@ -135,48 +116,3 @@ def assert_omex_verify_results(observed_results: OmexVerifyWorkflowOutput,
 
     # compare everything else which has not been hardwired to match
     assert observed_results == expected_results
-
-
-def assert_runs_verify_results(observed_results: RunsVerifyWorkflowOutput,
-                               expected_results_template: RunsVerifyWorkflowOutput) -> None:
-
-    # customize expected results to match those things which vary between runs
-    expected_results = expected_results_template.model_copy(deep=True)
-    expected_results.workflow_id = observed_results.workflow_id
-    expected_results.workflow_run_id = observed_results.workflow_run_id
-    expected_results.timestamp = observed_results.timestamp
-    if expected_results.workflow_results and observed_results.workflow_results:
-        for i in range(len(expected_results.workflow_results.sims_run_info)):
-            expected_biosim_sim_run = expected_results.workflow_results.sims_run_info[i].biosim_sim_run
-            expected_hdf5_file = expected_results.workflow_results.sims_run_info[i].hdf5_file
-            result_biosim_sim_run = observed_results.workflow_results.sims_run_info[i].biosim_sim_run
-            result_hdf5_file = observed_results.workflow_results.sims_run_info[i].hdf5_file
-
-            expected_hdf5_file.uri = result_hdf5_file.uri
-            expected_hdf5_file.id = result_hdf5_file.id
-            expected_biosim_sim_run.id = result_biosim_sim_run.id
-            expected_biosim_sim_run.simulatorVersion = result_biosim_sim_run.simulatorVersion
-            expected_biosim_sim_run.simulatorDigest = result_biosim_sim_run.simulatorDigest
-
-    # compare the comparison statistics separately, seems not to be 100% deterministic
-    assert expected_results.workflow_results is not None
-    assert observed_results.workflow_results is not None
-    ds_names = list(expected_results.workflow_results.comparison_statistics.keys())
-    num_simulators = len(expected_results_template.workflow_input.biosimulations_run_ids)
-    for ds_name in ds_names:
-        for i in range(num_simulators):
-            for j in range(num_simulators):
-                assert observed_results.workflow_results is not None
-                observed_compare_i_j: ComparisonStatistics = \
-                    observed_results.workflow_results.comparison_statistics[ds_name][i][j]
-                expected_compare_i_j: ComparisonStatistics = \
-                    expected_results.workflow_results.comparison_statistics[ds_name][i][j]
-                # assert observed_compare_i_j.mse == expected_compare_i_j.mse
-                assert observed_compare_i_j.is_close == expected_compare_i_j.is_close
-                assert observed_compare_i_j.error_message == expected_compare_i_j.error_message
-                assert observed_compare_i_j.simulator_version_i == expected_compare_i_j.simulator_version_i
-                assert observed_compare_i_j.simulator_version_j == expected_compare_i_j.simulator_version_j
-    expected_results.workflow_results.comparison_statistics = observed_results.workflow_results.comparison_statistics
-
-    # compare everything else which has not been hardwired to match
-    assert observed_results == expected_results
diff --git a/tests/workflows/test_runs_verify_workflow.py b/tests/workflows/test_runs_verify_workflow.py
new file mode 100644
index 0000000..f451a0a
--- /dev/null
+++ b/tests/workflows/test_runs_verify_workflow.py
@@ -0,0 +1,123 @@
+import asyncio
+import logging
+import uuid
+
+import pytest
+from temporalio.client import Client, WorkflowHandle
+from temporalio.worker import Worker
+
+from biosim_server.common.biosim1_client import BiosimServiceRest
+from biosim_server.common.storage import FileServiceS3
+from biosim_server.workflows.verify import ComparisonStatistics, RunsVerifyWorkflow, RunsVerifyWorkflowInput, \
+    RunsVerifyWorkflowOutput, RunsVerifyWorkflowStatus
+
+
+@pytest.mark.asyncio
+async def test_run_verify_workflow(temporal_client: Client, temporal_verify_worker: Worker,
+                                   runs_verify_workflow_input: RunsVerifyWorkflowInput,
+                                   runs_verify_workflow_output: RunsVerifyWorkflowOutput,
+                                   biosim_service_rest: BiosimServiceRest, file_service_s3: FileServiceS3) -> None:
+    assert biosim_service_rest is not None
+
+    workflow_id = uuid.uuid4().hex
+
+    observed_results: RunsVerifyWorkflowOutput = await temporal_client.execute_workflow(
+        RunsVerifyWorkflow.run, args=[runs_verify_workflow_input],
+        # result_type=RunsVerifyWorkflowOutput,
+        id=workflow_id, task_queue="verification_tasks")
+
+    # with open(Path(__file__).parent / "fixtures" / "local_data" / "RunsVerifyWorkflowOutput_expected.json", "w") as f:
+    #     f.write(observed_results.model_dump_json())
+
+    assert_runs_verify_results(observed_results=observed_results, expected_results_template=runs_verify_workflow_output)
+
+
+@pytest.mark.asyncio
+async def test_run_verify_workflow_not_found_execute(temporal_client: Client, temporal_verify_worker: Worker,
+                                                     runs_verify_workflow_input: RunsVerifyWorkflowInput,
+                                                     runs_verify_workflow_output: RunsVerifyWorkflowOutput,
+                                                     biosim_service_rest: BiosimServiceRest, file_service_s3: FileServiceS3) -> None:
+    assert biosim_service_rest is not None
+
+    workflow_id = uuid.uuid4().hex
+
+    runs_verify_workflow_input = runs_verify_workflow_input.model_copy(deep=True)
+    runs_verify_workflow_input.biosimulations_run_ids[0] = "bad_id"
+
+    observed_results: RunsVerifyWorkflowOutput = await temporal_client.execute_workflow(
+        RunsVerifyWorkflow.run, args=[runs_verify_workflow_input], id=workflow_id, task_queue="verification_tasks")
+    assert observed_results.workflow_status == RunsVerifyWorkflowStatus.RUN_ID_NOT_FOUND
+    assert observed_results.workflow_error == "Simulation run with id bad_id not found."
+
+
+@pytest.mark.asyncio
+async def test_run_verify_workflow_not_found_poll(temporal_client: Client, temporal_verify_worker: Worker,
+                                                  runs_verify_workflow_input: RunsVerifyWorkflowInput,
+                                                  runs_verify_workflow_output: RunsVerifyWorkflowOutput,
+                                                  biosim_service_rest: BiosimServiceRest, file_service_s3: FileServiceS3) -> None:
+    assert biosim_service_rest is not None
+
+    workflow_id = uuid.uuid4().hex
+
+    runs_verify_workflow_input = runs_verify_workflow_input.model_copy(deep=True)
+    runs_verify_workflow_input.biosimulations_run_ids[0] = "bad_id"
+
+    handle = await temporal_client.start_workflow(
+        RunsVerifyWorkflow.run,
+        args=[runs_verify_workflow_input],
+        id=workflow_id, task_queue="verification_tasks")
+
+    # poll until the workflow completes using workflow query mechanism
+    observed_results: RunsVerifyWorkflowOutput = await handle.query("get_output", result_type=RunsVerifyWorkflowOutput)
+    while not observed_results.workflow_status.is_done:
+        await asyncio.sleep(1)
+        observed_results = await handle.query("get_output", result_type=RunsVerifyWorkflowOutput)
+        logging.log(logging.INFO, f"Workflow status: {observed_results.workflow_status}")
+
+    assert observed_results.workflow_status == RunsVerifyWorkflowStatus.RUN_ID_NOT_FOUND
+    assert observed_results.workflow_error == "Simulation run with id bad_id not found."
+
+
+def assert_runs_verify_results(observed_results: RunsVerifyWorkflowOutput,
+                               expected_results_template: RunsVerifyWorkflowOutput) -> None:
+
+    # customize expected results to match those things which vary between runs
+    expected_results = expected_results_template.model_copy(deep=True)
+    expected_results.workflow_id = observed_results.workflow_id
+    expected_results.workflow_run_id = observed_results.workflow_run_id
+    expected_results.timestamp = observed_results.timestamp
+    if expected_results.workflow_results and observed_results.workflow_results:
+        for i in range(len(expected_results.workflow_results.sims_run_info)):
+            expected_biosim_sim_run = expected_results.workflow_results.sims_run_info[i].biosim_sim_run
+            expected_hdf5_file = expected_results.workflow_results.sims_run_info[i].hdf5_file
+            result_biosim_sim_run = observed_results.workflow_results.sims_run_info[i].biosim_sim_run
+            result_hdf5_file = observed_results.workflow_results.sims_run_info[i].hdf5_file
+
+            expected_hdf5_file.uri = result_hdf5_file.uri
+            expected_hdf5_file.id = result_hdf5_file.id
+            expected_biosim_sim_run.id = result_biosim_sim_run.id
+            expected_biosim_sim_run.simulatorVersion = result_biosim_sim_run.simulatorVersion
+            expected_biosim_sim_run.simulatorDigest = result_biosim_sim_run.simulatorDigest
+
+    # compare the comparison statistics separately, seems not to be 100% deterministic
+    assert expected_results.workflow_results is not None
+    assert observed_results.workflow_results is not None
+    ds_names = list(expected_results.workflow_results.comparison_statistics.keys())
+    num_simulators = len(expected_results_template.workflow_input.biosimulations_run_ids)
+    for ds_name in ds_names:
+        for i in range(num_simulators):
+            for j in range(num_simulators):
+                assert observed_results.workflow_results is not None
+                observed_compare_i_j: ComparisonStatistics = \
+                    observed_results.workflow_results.comparison_statistics[ds_name][i][j]
+                expected_compare_i_j: ComparisonStatistics = \
+                    expected_results.workflow_results.comparison_statistics[ds_name][i][j]
+                # assert observed_compare_i_j.mse == expected_compare_i_j.mse
+                assert observed_compare_i_j.is_close == expected_compare_i_j.is_close
+                assert observed_compare_i_j.error_message == expected_compare_i_j.error_message
+                assert observed_compare_i_j.simulator_version_i == expected_compare_i_j.simulator_version_i
+                assert observed_compare_i_j.simulator_version_j == expected_compare_i_j.simulator_version_j
+    expected_results.workflow_results.comparison_statistics = observed_results.workflow_results.comparison_statistics
+
+    # compare everything else which has not been hardwired to match
+    assert observed_results == expected_results