diff --git a/src/ocrd_network/processing_server.py b/src/ocrd_network/processing_server.py index f60201311..d4a28e58e 100644 --- a/src/ocrd_network/processing_server.py +++ b/src/ocrd_network/processing_server.py @@ -228,6 +228,15 @@ def __init__(self, config_path: str, host: str, port: int) -> None: summary='Get information about a workflow run', ) + self.router.add_api_route( + path='/workflow/job-simple/{workflow_job_id}', + endpoint=self.get_workflow_info_simple, + methods=['GET'], + tags=['workflow', 'processing'], + status_code=status.HTTP_200_OK, + summary='Get simplified overall job status', + ) + self.router.add_api_route( path='/workflow', endpoint=self.upload_workflow, @@ -911,6 +920,41 @@ async def get_workflow_info(self, workflow_job_id) -> Dict: }) return res + """ + Simplified version of the `get_workflow_info` that returns a single state for the entire workflow. + - If a single processing job fails, the entire workflow job status is set to FAILED. + - If there are any processing jobs running, regardless of other states, such as QUEUED and CACHED, + the entire workflow job status is set to RUNNING. + - If all processing jobs has finished successfully, only then the workflow job status is set to SUCCESS + """ + async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, StateEnum]: + """ Return list of a workflow's processor jobs + """ + try: + workflow_job = await db_get_workflow_job(workflow_job_id) + except ValueError: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, + detail=f"Workflow-Job with id: {workflow_job_id} not found") + job_ids: List[str] = [job_id for lst in workflow_job.processing_job_ids.values() for job_id in lst] + jobs = await db_get_processing_jobs(job_ids) + + workflow_job_state = "UNSET" + success_jobs = 0 + for job in jobs: + if job.state == StateEnum.cached or job.state == StateEnum.queued: + continue + if job.state == StateEnum.failed or job.state == StateEnum.cancelled: + workflow_job_state = StateEnum.failed + break + if job.state == StateEnum.running: + workflow_job_state = StateEnum.running + if job.state == StateEnum.success: + success_jobs += 1 + # if all jobs succeeded + if len(job_ids) == success_jobs: + workflow_job_state = StateEnum.success + return {"wf_job_state": workflow_job_state} + async def upload_workflow(self, workflow: UploadFile) -> Dict: """ Store a script for a workflow in the database """ diff --git a/tests/network/test_processing_server.py b/tests/network/test_processing_server.py index 35e4d9aa4..6716ddf50 100644 --- a/tests/network/test_processing_server.py +++ b/tests/network/test_processing_server.py @@ -1,6 +1,8 @@ +from time import sleep from requests import get, post from ocrd_utils.config import config from ocrd_network import NETWORK_AGENT_WORKER +from ocrd_network.models import StateEnum PROCESSING_SERVER_URL = config.PROCESSING_SERVER_URL @@ -52,12 +54,28 @@ def test_processing_server_workflow_request(): path_to_mets = "/tmp/assets/kant_aufklaerung_1784/data/mets.xml" path_to_dummy_wf = "/tmp/assets/dummy-workflow.txt" - test_url = f"{PROCESSING_SERVER_URL}/workflow?mets_path={path_to_mets}&page_wise=False" + # submit the workflow job + test_url = f"{PROCESSING_SERVER_URL}/workflow/run?mets_path={path_to_mets}&page_wise=False" response = post( url=test_url, files={"workflow": open(path_to_dummy_wf, 'rb')} ) - assert response.status_code == 201, \ - f'Processing server: {test_url}, {response.status_code}' + assert response.status_code == 200, f"Processing server: {test_url}, {response.status_code}" + + wf_job_id = response.json()["job_id"] + assert wf_job_id - # TODO: Check workflow status here + # check simplified workflow status till timeout + tries = 50 + wait_between_tries = 30 + wf_job_state = None + test_url = f"{PROCESSING_SERVER_URL}/workflow/job-simple/{wf_job_id}" + while tries > 0: + sleep(wait_between_tries) + response = post(url=test_url) + assert response.status_code == 200, f"Processing server: {test_url}, {response.status_code}" + wf_job_state = response.json()["wf_job_state"] + if wf_job_state == StateEnum.success or wf_job_state == StateEnum.failed: + break + tries -= 1 + assert wf_job_state == "SUCCESS"