Skip to content

Commit

Permalink
check for run_id not found and handle gracefully
Browse files Browse the repository at this point in the history
  • Loading branch information
jcschaff committed Jan 21, 2025
1 parent 8eb620c commit bcc7cbb
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 73 deletions.
1 change: 1 addition & 0 deletions biosim_server/common/biosim1_client/biosim_service_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
class BiosimServiceRest(BiosimService):
@override
async def get_sim_run(self, simulation_run_id: str) -> BiosimSimulationRun:
""" raises ClientResponseError if the response status is not 2xx """
api_base_url = os.environ.get('API_BASE_URL') or "https://api.biosimulations.org"
assert (api_base_url is not None)

Expand Down
1 change: 1 addition & 0 deletions biosim_server/common/biosim1_client/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ class BiosimSimulationRunStatus(StrEnum):
PROCESSING = 'PROCESSING',
SUCCEEDED = 'SUCCEEDED',
FAILED = 'FAILED',
RUN_ID_NOT_FOUND = 'RUN_ID_NOT_FOUND',
UNKNOWN = 'UNKNOWN'


Expand Down
23 changes: 20 additions & 3 deletions biosim_server/workflows/simulate/biosim_activities.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,42 @@
import logging
import os
from typing import Optional

from aiohttp import ClientResponseError
from pydantic import BaseModel
from temporalio import activity

from biosim_server.common.biosim1_client import BiosimService, BiosimServiceRest, SourceOmex, BiosimSimulatorSpec, \
BiosimSimulationRun, HDF5File, Hdf5DataValues
BiosimSimulationRun, BiosimSimulationRunStatus, HDF5File, Hdf5DataValues
from biosim_server.common.storage import FileService
from biosim_server.dependencies import get_file_service, get_biosim_service


class GetSimRunInput(BaseModel):
    """Input for the get_sim_run Temporal activity."""
    # ID of the simulation run on the biosimulations API
    biosim_run_id: str
    # When True, a 404 from the API is converted into a result with
    # status RUN_ID_NOT_FOUND instead of raising, so the activity is
    # not retried for a run id that will never exist.
    abort_on_not_found: Optional[bool] = False


@activity.defn
async def get_sim_run(get_sim_run_input: GetSimRunInput) -> BiosimSimulationRun:
    """Temporal activity: fetch a BiosimSimulationRun by its run id.

    Raises ClientResponseError for non-2xx responses from the biosimulations
    API, except that a 404 with abort_on_not_found=True is mapped to a
    BiosimSimulationRun with status RUN_ID_NOT_FOUND so Temporal does not
    keep retrying a run id that will never exist.
    """
    activity.logger.setLevel(logging.INFO)
    biosim_service = BiosimServiceRest()
    try:
        biosim_sim_run: BiosimSimulationRun = await biosim_service.get_sim_run(get_sim_run_input.biosim_run_id)
        return biosim_sim_run
    except ClientResponseError as e:
        if e.status == 404:
            # Logger.warn is a deprecated alias; use warning() instead
            activity.logger.warning(
                f"Simulation run with id {get_sim_run_input.biosim_run_id} not found", exc_info=e)
            if get_sim_run_input.abort_on_not_found:
                # return a failed simulation run rather than raising an exception
                # to avoid retrying the activity
                return BiosimSimulationRun(
                    id=get_sim_run_input.biosim_run_id,
                    name="",
                    simulator="",
                    simulatorVersion="",
                    status=BiosimSimulationRunStatus.RUN_ID_NOT_FOUND
                )
        raise e


class SubmitBiosimSimInput(BaseModel):
Expand Down
28 changes: 24 additions & 4 deletions biosim_server/workflows/verify/runs_verify_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@

from pydantic import BaseModel
from temporalio import workflow
from temporalio.client import WorkflowFailureError
from temporalio.common import RetryPolicy
from temporalio.exceptions import ActivityError

from biosim_server.common.biosim1_client import BiosimSimulationRun, BiosimSimulatorSpec, HDF5File
from biosim_server.common.biosim1_client import BiosimSimulationRun, BiosimSimulatorSpec, HDF5File, \
BiosimSimulationRunStatus
from biosim_server.workflows.simulate import GetSimRunInput, get_hdf5_metadata, GetHdf5MetadataInput, get_sim_run
from biosim_server.workflows.verify import generate_statistics, GenerateStatisticsOutput, GenerateStatisticsInput, \
SimulationRunInfo
Expand All @@ -18,6 +21,12 @@ class RunsVerifyWorkflowStatus(StrEnum):
IN_PROGRESS = "IN_PROGRESS"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
RUN_ID_NOT_FOUND = "RUN_ID_NOT_FOUND"

@property
def is_done(self) -> bool:
return self in [RunsVerifyWorkflowStatus.COMPLETED, RunsVerifyWorkflowStatus.FAILED,
RunsVerifyWorkflowStatus.RUN_ID_NOT_FOUND]


class RunsVerifyWorkflowInput(BaseModel):
Expand All @@ -34,6 +43,7 @@ class RunsVerifyWorkflowOutput(BaseModel):
workflow_input: RunsVerifyWorkflowInput
workflow_status: RunsVerifyWorkflowStatus
timestamp: str
workflow_error: Optional[str] = None
actual_simulators: Optional[list[BiosimSimulatorSpec]] = None
workflow_run_id: Optional[str] = None
workflow_results: Optional[GenerateStatisticsOutput] = None
Expand Down Expand Up @@ -61,13 +71,23 @@ async def run(self, verify_input: RunsVerifyWorkflowInput) -> RunsVerifyWorkflow
workflow.logger.setLevel(level=logging.INFO)
workflow.logger.info("Main workflow started.")

# verify biosimulation runs are valid and complete and retreive Simulation results metadata
# verify biosimulation runs are valid and complete and retrieve Simulation results metadata
biosimulation_runs: list[BiosimSimulationRun] = []
for biosimulation_run_id in verify_input.biosimulations_run_ids:
biosimulation_run = await workflow.execute_activity(get_sim_run,
args=[GetSimRunInput(biosim_run_id=biosimulation_run_id)], start_to_close_timeout=timedelta(seconds=60),
retry_policy=RetryPolicy(maximum_attempts=3))
args=[GetSimRunInput(biosim_run_id=biosimulation_run_id,
abort_on_not_found=True)],
start_to_close_timeout=timedelta(seconds=60),
retry_policy=RetryPolicy(maximum_attempts=30))
biosimulation_runs.append(biosimulation_run)
if biosimulation_run.status == BiosimSimulationRunStatus.RUN_ID_NOT_FOUND:
# important to update the state of the workflow before returning,
# so that subsequent workflow queries get the correct state
self.verify_output = RunsVerifyWorkflowOutput(workflow_id=workflow.info().workflow_id,
workflow_input=verify_input, workflow_run_id=workflow.info().run_id,
workflow_status=RunsVerifyWorkflowStatus.RUN_ID_NOT_FOUND, timestamp=str(workflow.now()),
workflow_error=f"Simulation run with id {biosimulation_run_id} not found.")
return self.verify_output

workflow.logger.info(f"verified access to completed run ids {verify_input.biosimulations_run_ids}.")

Expand Down
42 changes: 41 additions & 1 deletion tests/api/test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
from biosim_server.config import get_settings
from biosim_server.workflows.verify import OmexVerifyWorkflowInput, OmexVerifyWorkflowOutput, OmexVerifyWorkflowStatus, \
RunsVerifyWorkflowInput, RunsVerifyWorkflowOutput, RunsVerifyWorkflowStatus
from tests.workflows.test_verify_workflows import assert_omex_verify_results, assert_runs_verify_results
from tests.workflows.test_omex_verify_workflows import assert_omex_verify_results
from tests.workflows.test_runs_verify_workflow import assert_runs_verify_results


@pytest.mark.asyncio
Expand Down Expand Up @@ -150,3 +151,42 @@ async def test_runs_verify_and_get_output(runs_verify_workflow_input: RunsVerify
logging.info(f"polling, job status is: {output.workflow_status}")

assert_runs_verify_results(observed_results=output, expected_results_template=runs_verify_workflow_output)


@pytest.mark.asyncio
async def test_runs_verify_not_found(runs_verify_workflow_input: RunsVerifyWorkflowInput,
                                     runs_verify_workflow_output: RunsVerifyWorkflowOutput,
                                     omex_test_file: Path,
                                     file_service_local: FileServiceLocal,
                                     temporal_client: Client,
                                     temporal_verify_worker: Worker,
                                     biosim_service_rest: BiosimServiceRest) -> None:
    """Submitting unknown run ids should drive the workflow to the
    RUN_ID_NOT_FOUND terminal status with a descriptive workflow_error."""
    assert runs_verify_workflow_input.observables is not None
    query_params: dict[str, float | str | list[str]] = {
        "workflow_id_prefix": "verification-",
        "biosimulations_run_ids": ["bad_run_id_1", "bad_run_id_2"],
        "include_outputs": runs_verify_workflow_input.include_outputs,
        "user_description": runs_verify_workflow_input.user_description,
        "observables": runs_verify_workflow_input.observables,
        "rel_tol": runs_verify_workflow_input.rel_tol,
        "abs_tol": runs_verify_workflow_input.abs_tol
    }

    # NOTE: no OMEX upload is needed for /verify_runs; the previous
    # `with open(omex_test_file, "rb")` was a leftover from the omex test
    # and the opened file was never used.
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as test_client:
        response = await test_client.post("/verify_runs", params=query_params)
        assert response.status_code == 200

        output = RunsVerifyWorkflowOutput.model_validate(response.json())

        # poll api until job reaches a terminal state
        while not output.workflow_status.is_done:
            await asyncio.sleep(5)
            response = await test_client.get(f"/verify_runs/{output.workflow_id}")
            if response.status_code == 200:
                output = RunsVerifyWorkflowOutput.model_validate(response.json())
            logging.info(f"polling, job status is: {output.workflow_status}")

        assert output.workflow_status == RunsVerifyWorkflowStatus.RUN_ID_NOT_FOUND
        # the workflow writes the message with a trailing period
        # (f"Simulation run with id {run_id} not found.") — match it exactly,
        # otherwise this membership assert can never pass
        assert output.workflow_error in ["Simulation run with id bad_run_id_1 not found.",
                                         "Simulation run with id bad_run_id_2 not found."]
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from biosim_server.common.storage import FileServiceLocal, FileServiceS3
from biosim_server.config import get_settings
from biosim_server.workflows.verify import ComparisonStatistics, OmexVerifyWorkflow, OmexVerifyWorkflowInput, \
OmexVerifyWorkflowOutput, RunsVerifyWorkflow, RunsVerifyWorkflowInput, RunsVerifyWorkflowOutput
OmexVerifyWorkflowOutput
from tests.fixtures.s3_fixtures import file_service_s3_test_base_path


Expand Down Expand Up @@ -71,25 +71,6 @@ async def test_omex_verify_workflow_mockS3(temporal_client: Client, temporal_ver
assert_omex_verify_results(observed_results=observed_results, expected_results_template=omex_verify_workflow_output)


@pytest.mark.asyncio
async def test_run_verify_workflow(temporal_client: Client, temporal_verify_worker: Worker,
runs_verify_workflow_input: RunsVerifyWorkflowInput,
runs_verify_workflow_output: RunsVerifyWorkflowOutput,
biosim_service_rest: BiosimServiceRest, file_service_s3: FileServiceS3) -> None:
assert biosim_service_rest is not None

workflow_id = uuid.uuid4().hex

observed_results: RunsVerifyWorkflowOutput = await temporal_client.execute_workflow(
RunsVerifyWorkflow.run, args=[runs_verify_workflow_input],
# result_type=RunsVerifyWorkflowOutput,
id=workflow_id, task_queue="verification_tasks")

# with open(Path(__file__).parent / "fixtures" / "local_data" / "RunsVerifyWorkflowOutput_expected.json", "w") as f:
# f.write(observed_results.model_dump_json())

assert_runs_verify_results(observed_results=observed_results, expected_results_template=runs_verify_workflow_output)


def assert_omex_verify_results(observed_results: OmexVerifyWorkflowOutput,
expected_results_template: OmexVerifyWorkflowOutput) -> None:
Expand Down Expand Up @@ -135,48 +116,3 @@ def assert_omex_verify_results(observed_results: OmexVerifyWorkflowOutput,

# compare everything else which has not been hardwired to match
assert observed_results == expected_results


def assert_runs_verify_results(observed_results: RunsVerifyWorkflowOutput,
                               expected_results_template: RunsVerifyWorkflowOutput) -> None:
    """Assert that an observed RunsVerifyWorkflowOutput matches an expected
    template, ignoring fields that legitimately vary between runs (workflow
    ids, timestamps, HDF5 uris/ids, simulator versions/digests) and comparing
    the comparison statistics field-by-field rather than wholesale.
    """

    # customize expected results to match those things which vary between runs
    expected_results = expected_results_template.model_copy(deep=True)
    expected_results.workflow_id = observed_results.workflow_id
    expected_results.workflow_run_id = observed_results.workflow_run_id
    expected_results.timestamp = observed_results.timestamp
    if expected_results.workflow_results and observed_results.workflow_results:
        # patch per-simulation fields in the expected copy with the observed
        # values so the final whole-object equality check ignores them
        for i in range(len(expected_results.workflow_results.sims_run_info)):
            expected_biosim_sim_run = expected_results.workflow_results.sims_run_info[i].biosim_sim_run
            expected_hdf5_file = expected_results.workflow_results.sims_run_info[i].hdf5_file
            result_biosim_sim_run = observed_results.workflow_results.sims_run_info[i].biosim_sim_run
            result_hdf5_file = observed_results.workflow_results.sims_run_info[i].hdf5_file

            expected_hdf5_file.uri = result_hdf5_file.uri
            expected_hdf5_file.id = result_hdf5_file.id
            expected_biosim_sim_run.id = result_biosim_sim_run.id
            expected_biosim_sim_run.simulatorVersion = result_biosim_sim_run.simulatorVersion
            expected_biosim_sim_run.simulatorDigest = result_biosim_sim_run.simulatorDigest

    # compare the comparison statistics separately, seems not to be 100% deterministic
    assert expected_results.workflow_results is not None
    assert observed_results.workflow_results is not None
    ds_names = list(expected_results.workflow_results.comparison_statistics.keys())
    num_simulators = len(expected_results_template.workflow_input.biosimulations_run_ids)
    # comparison_statistics is dataset-name -> num_simulators x num_simulators
    # matrix of pairwise ComparisonStatistics
    for ds_name in ds_names:
        for i in range(num_simulators):
            for j in range(num_simulators):
                assert observed_results.workflow_results is not None
                observed_compare_i_j: ComparisonStatistics = \
                    observed_results.workflow_results.comparison_statistics[ds_name][i][j]
                expected_compare_i_j: ComparisonStatistics = \
                    expected_results.workflow_results.comparison_statistics[ds_name][i][j]
                # mse intentionally not compared (not deterministic across runs)
                # assert observed_compare_i_j.mse == expected_compare_i_j.mse
                assert observed_compare_i_j.is_close == expected_compare_i_j.is_close
                assert observed_compare_i_j.error_message == expected_compare_i_j.error_message
                assert observed_compare_i_j.simulator_version_i == expected_compare_i_j.simulator_version_i
                assert observed_compare_i_j.simulator_version_j == expected_compare_i_j.simulator_version_j
    # statistics already verified field-by-field above; copy them over so the
    # final whole-object equality does not re-compare them (e.g. mse)
    expected_results.workflow_results.comparison_statistics = observed_results.workflow_results.comparison_statistics

    # compare everything else which has not been hardwired to match
    assert observed_results == expected_results
Loading

0 comments on commit bcc7cbb

Please sign in to comment.