Skip to content

Commit

Permalink
add simple wf status check
Browse files Browse the repository at this point in the history
  • Loading branch information
MehmedGIT committed Jan 25, 2024
1 parent 1fe5155 commit a57b8ea
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 4 deletions.
44 changes: 44 additions & 0 deletions src/ocrd_network/processing_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,15 @@ def __init__(self, config_path: str, host: str, port: int) -> None:
summary='Get information about a workflow run',
)

self.router.add_api_route(
path='/workflow/job-simple/{workflow_job_id}',
endpoint=self.get_workflow_info_simple,
methods=['GET'],
tags=['workflow', 'processing'],
status_code=status.HTTP_200_OK,
summary='Get simplified overall job status',
)

self.router.add_api_route(
path='/workflow',
endpoint=self.upload_workflow,
Expand Down Expand Up @@ -911,6 +920,41 @@ async def get_workflow_info(self, workflow_job_id) -> Dict:
})
return res

"""
Simplified version of the `get_workflow_info` that returns a single state for the entire workflow.
- If a single processing job fails, the entire workflow job status is set to FAILED.
- If there are any processing jobs running, regardless of other states, such as QUEUED and CACHED,
the entire workflow job status is set to RUNNING.
- If all processing jobs has finished successfully, only then the workflow job status is set to SUCCESS
"""
async def get_workflow_info_simple(self, workflow_job_id) -> Dict[str, StateEnum]:
""" Return list of a workflow's processor jobs
"""
try:
workflow_job = await db_get_workflow_job(workflow_job_id)
except ValueError:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow-Job with id: {workflow_job_id} not found")
job_ids: List[str] = [job_id for lst in workflow_job.processing_job_ids.values() for job_id in lst]
jobs = await db_get_processing_jobs(job_ids)

workflow_job_state = "UNSET"
success_jobs = 0
for job in jobs:
if job.state == StateEnum.cached or job.state == StateEnum.queued:
continue
if job.state == StateEnum.failed or job.state == StateEnum.cancelled:
workflow_job_state = StateEnum.failed
break
if job.state == StateEnum.running:
workflow_job_state = StateEnum.running
if job.state == StateEnum.success:
success_jobs += 1
# if all jobs succeeded
if len(job_ids) == success_jobs:
workflow_job_state = StateEnum.success
return {"wf_job_state": workflow_job_state}

async def upload_workflow(self, workflow: UploadFile) -> Dict:
""" Store a script for a workflow in the database
"""
Expand Down
26 changes: 22 additions & 4 deletions tests/network/test_processing_server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from time import sleep
from requests import get, post
from ocrd_utils.config import config
from ocrd_network import NETWORK_AGENT_WORKER
from ocrd_network.models import StateEnum

PROCESSING_SERVER_URL = config.PROCESSING_SERVER_URL

Expand Down Expand Up @@ -52,12 +54,28 @@ def test_processing_server_workflow_request():
path_to_mets = "/tmp/assets/kant_aufklaerung_1784/data/mets.xml"
path_to_dummy_wf = "/tmp/assets/dummy-workflow.txt"

test_url = f"{PROCESSING_SERVER_URL}/workflow?mets_path={path_to_mets}&page_wise=False"
# submit the workflow job
test_url = f"{PROCESSING_SERVER_URL}/workflow/run?mets_path={path_to_mets}&page_wise=False"
response = post(
url=test_url,
files={"workflow": open(path_to_dummy_wf, 'rb')}
)
assert response.status_code == 201, \
f'Processing server: {test_url}, {response.status_code}'
assert response.status_code == 200, f"Processing server: {test_url}, {response.status_code}"

wf_job_id = response.json()["job_id"]
assert wf_job_id

# TODO: Check workflow status here
# check simplified workflow status till timeout
tries = 50
wait_between_tries = 30
wf_job_state = None
test_url = f"{PROCESSING_SERVER_URL}/workflow/job-simple/{wf_job_id}"
while tries > 0:
sleep(wait_between_tries)
response = post(url=test_url)
assert response.status_code == 200, f"Processing server: {test_url}, {response.status_code}"
wf_job_state = response.json()["wf_job_state"]
if wf_job_state == StateEnum.success or wf_job_state == StateEnum.failed:
break
tries -= 1
assert wf_job_state == "SUCCESS"

0 comments on commit a57b8ea

Please sign in to comment.