diff --git a/example/definitions/simple.yaml b/example/definitions/simple.yaml index a94bbbab..4604a531 100644 --- a/example/definitions/simple.yaml +++ b/example/definitions/simple.yaml @@ -210,6 +210,7 @@ spec: config: job_state: cursors_states_enabled: true + cursors_states_namespace: example-namespace job: buffer_size: 5 diff --git a/src/saturn_engine/stores/jobs_store.py b/src/saturn_engine/stores/jobs_store.py index d078e279..94836108 100644 --- a/src/saturn_engine/stores/jobs_store.py +++ b/src/saturn_engine/stores/jobs_store.py @@ -4,6 +4,7 @@ import time from datetime import datetime +import sqlalchemy as sa from sqlalchemy import select from sqlalchemy import union_all from sqlalchemy import update @@ -190,13 +191,19 @@ def fetch_cursors_states( fetch_stmts = [] for job, cursors in query.items(): fetch_stmts.append( - select(Job.name, JobCursorState) + select(JobCursorState, sa.func.coalesce(Job.name, job).label("name")) .join( - JobCursorState, + Job, Job.job_definition_name == JobCursorState.job_definition_name, + isouter=True, ) .where( - Job.name == job, + sa.or_( + Job.name == job, + sa.and_( + Job.name.is_(None), JobCursorState.job_definition_name == job + ), + ), JobCursorState.cursor.in_(cursors), ) ) diff --git a/tests/worker_manager/api/test_jobs.py b/tests/worker_manager/api/test_jobs.py index 4e28968a..a4101de9 100644 --- a/tests/worker_manager/api/test_jobs.py +++ b/tests/worker_manager/api/test_jobs.py @@ -649,7 +649,7 @@ def test_sync_states( }, }, orphan_job.name: {"cursors_states": {"a": {"x": 1}}}, - } + }, } }, )