diff --git a/changes.d/6577.fix.md b/changes.d/6577.fix.md
new file mode 100644
index 00000000000..4f5b1ee0b05
--- /dev/null
+++ b/changes.d/6577.fix.md
@@ -0,0 +1 @@
+Fixed a bug where prematurely deleting the job log directory left tasks permanently stuck in the submitted or running state.
diff --git a/cylc/flow/job_runner_mgr.py b/cylc/flow/job_runner_mgr.py
index 558a3d158b2..247a699f80c 100644
--- a/cylc/flow/job_runner_mgr.py
+++ b/cylc/flow/job_runner_mgr.py
@@ -46,6 +46,9 @@
from cylc.flow.parsec.OrderedDict import OrderedDict
+JOB_FILES_REMOVED_MESSAGE = 'ERR_JOB_FILES_REMOVED'
+
+
class JobPollContext():
"""Context object for a job poll."""
CONTEXT_ATTRIBUTES = (
@@ -439,6 +442,16 @@ def _filter_submit_output(cls, st_file_path, job_runner, out, err):
def _jobs_poll_status_files(self, job_log_root, job_log_dir):
"""Helper 1 for self.jobs_poll(job_log_root, job_log_dirs)."""
ctx = JobPollContext(job_log_dir)
+ # If the log directory has been deleted prematurely, return a task
+ # failure and an explanation:
+ if not os.path.exists(os.path.join(job_log_root, ctx.job_log_dir)):
+ # The job may still be in the job runner and may yet succeed,
+ # but we assume it failed & exited because it's the best we
+ # can do as it is no longer possible to poll it.
+ ctx.run_status = 1
+ ctx.job_runner_exit_polled = 1
+ ctx.run_signal = JOB_FILES_REMOVED_MESSAGE
+ return ctx
try:
with open(
os.path.join(job_log_root, ctx.job_log_dir, JOB_LOG_STATUS)
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 9ce496c8fd2..7779d05c019 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -44,6 +44,7 @@
Optional,
Tuple,
Union,
+ cast,
)
from cylc.flow import LOG
@@ -60,7 +61,7 @@
is_remote_platform,
)
from cylc.flow.job_file import JobFileWriter
-from cylc.flow.job_runner_mgr import JobPollContext
+from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE, JobPollContext
from cylc.flow.pathutil import get_remote_workflow_run_job_dir
from cylc.flow.platforms import (
get_host_from_platform,
@@ -864,7 +865,13 @@ def _poll_task_job_callback_255(self, workflow, itask, cmd_ctx, line):
)
self.poll_task_jobs(workflow, [itask])
- def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
+ def _poll_task_job_callback(
+ self,
+ workflow: str,
+ itask: 'TaskProxy',
+ cmd_ctx: SubProcContext,
+ line: str,
+ ):
"""Helper for _poll_task_jobs_callback, on one task job."""
ctx = SubProcContext(self.JOBS_POLL, None)
ctx.out = line
@@ -891,6 +898,13 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
log_lvl = DEBUG if (
itask.platform.get('communication method') == 'poll'
) else INFO
+
+ if jp_ctx.run_signal == JOB_FILES_REMOVED_MESSAGE:
+ LOG.error(
+ f"platform: {itask.platform['name']} - job log directory "
+ f"{job_tokens.relative_id} no longer exists"
+ )
+
if jp_ctx.run_status == 1 and jp_ctx.run_signal in ["ERR", "EXIT"]:
# Failed normally
self.task_events_mgr.process_message(
@@ -898,9 +912,7 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
elif jp_ctx.run_status == 1 and jp_ctx.job_runner_exit_polled == 1:
# Failed by a signal, and no longer in job runner
self.task_events_mgr.process_message(
- itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag)
- self.task_events_mgr.process_message(
- itask, log_lvl, FAIL_MESSAGE_PREFIX + jp_ctx.run_signal,
+ itask, log_lvl, f"{FAIL_MESSAGE_PREFIX}{jp_ctx.run_signal}",
jp_ctx.time_run_exit,
flag)
elif jp_ctx.run_status == 1: # noqa: SIM114
@@ -1288,7 +1300,8 @@ def _prep_submit_task_job(
workflow, itask, '(platform not defined)', exc)
return False
else:
- itask.platform = platform # type: ignore[assignment]
+ # (platform is not None here as subshell eval has finished)
+ itask.platform = cast('dict', platform)
# Retry delays, needed for the try_num
self._set_retry_timers(itask, rtconfig)
diff --git a/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t b/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t
index a7711318690..a6ef36c9512 100755
--- a/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t
+++ b/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t
@@ -14,12 +14,12 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-#-------------------------------------------------------------------------------
+
# Test execution time limit polling.
export REQUIRE_PLATFORM='loc:* comms:poll runner:background'
. "$(dirname "$0")/test_header"
-#-------------------------------------------------------------------------------
-set_test_number 4
+
+set_test_number 5
create_test_global_config '' "
[platforms]
[[$CYLC_TEST_PLATFORM]]
@@ -28,51 +28,16 @@ create_test_global_config '' "
execution time limit polling intervals = PT5S
"
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
-#-------------------------------------------------------------------------------
+
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" \
cylc play --reference-test -v --no-detach "${WORKFLOW_NAME}" --timestamp
-#-------------------------------------------------------------------------------
-# shellcheck disable=SC2317
-cmp_times () {
- # Test if the times $1 and $2 are within $3 seconds of each other.
- python3 -u - "$@" <<'__PYTHON__'
-import sys
-from metomi.isodatetime.parsers import TimePointParser
-parser = TimePointParser()
-time_1 = parser.parse(sys.argv[1])
-time_2 = parser.parse(sys.argv[2])
-if abs((time_1 - time_2).get_seconds()) > int(sys.argv[3]):
- sys.exit("abs(predicted - actual) > tolerance: %s" % sys.argv[1:])
-__PYTHON__
-}
-time_offset () {
- # Add an ISO8601 duration to an ISO8601 date-time.
- python3 -u - "$@" <<'__PYTHON__'
-import sys
-from metomi.isodatetime.parsers import TimePointParser, DurationParser
-print(
- TimePointParser().parse(sys.argv[1]) + DurationParser().parse(sys.argv[2]))
-__PYTHON__
-}
-#-------------------------------------------------------------------------------
+
LOG="${WORKFLOW_RUN_DIR}/log/scheduler/log"
-# Test logging of the "next job poll" message when task starts.
-TEST_NAME="${TEST_NAME_BASE}-log-entry"
-LINE="$(grep '\[1/foo.* execution timeout=None, polling intervals=' "${LOG}")"
-run_ok "${TEST_NAME}" grep -q 'health: execution timeout=None, polling intervals=' <<< "${LINE}"
-# Determine poll times.
-PREDICTED_POLL_TIME=$(time_offset \
- "$(cut -d ' ' -f 1 <<< "${LINE}")" \
- "PT10S") # PT5S time limit + PT5S polling interval
-ACTUAL_POLL_TIME=$(sed -n \
- 's|\(.*\) DEBUG - \[1/foo.* (polled)failed .*|\1|p' "${LOG}")
-# Test execution timeout polling.
-# Main loop is roughly 1 second, but integer rounding may give an apparent 2
-# seconds delay, so set threshold as 2 seconds.
-run_ok "${TEST_NAME_BASE}-poll-time" \
- cmp_times "${PREDICTED_POLL_TIME}" "${ACTUAL_POLL_TIME}" '10'
-#-------------------------------------------------------------------------------
+log_scan "${TEST_NAME_BASE}-log" "${LOG}" 1 0 \
+ "\[1/foo/01:submitted\] => running" \
+ "\[1/foo/01:running\] poll now, (next in PT5S" \
+ "\[1/foo/01:running\] (polled)failed/XCPU"
+
purge
-exit
diff --git a/tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc b/tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc
index 3fa9301a5a8..1f06427c26c 100644
--- a/tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc
+++ b/tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc
@@ -13,14 +13,6 @@
[runtime]
[[foo]]
platform = {{ environ['CYLC_TEST_PLATFORM'] }}
- init-script = cylc__job__disable_fail_signals ERR EXIT
- script = """
- cylc__job__wait_cylc_message_started
- # give it a while for the started message to get picked up by
- # the scheduler
- sleep 10
- exit 1
- """
- [[[job]]]
- execution time limit = PT5S
+ script = sleep 20
+ execution time limit = PT10S
[[bar]]
diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py
index eda704a2d39..799499eda3b 100644
--- a/tests/integration/test_task_job_mgr.py
+++ b/tests/integration/test_task_job_mgr.py
@@ -15,12 +15,18 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from contextlib import suppress
+import json
import logging
from typing import Any as Fixture
+from unittest.mock import Mock
from cylc.flow import CYLC_LOG
+from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE
from cylc.flow.scheduler import Scheduler
-from cylc.flow.task_state import TASK_STATUS_RUNNING
+from cylc.flow.task_state import (
+ TASK_STATUS_FAILED,
+ TASK_STATUS_RUNNING,
+)
async def test_run_job_cmd_no_hosts_error(
@@ -229,3 +235,33 @@ async def test_broadcast_platform_change(
assert schd.pool.get_tasks()[0].platform['name'] == 'foo'
# ... and that remote init failed because all hosts bad:
assert log_filter(regex=r"platform: foo .*\(no hosts were reachable\)")
+
+
+async def test_poll_job_deleted_log_folder(
+ one_conf, flow, scheduler, start, log_filter
+):
+ """A poll that finds the job log dir deleted fails the task and logs it.
+
+ https://github.com/cylc/cylc-flow/issues/6425
+ """
+ response = {
+ 'run_signal': JOB_FILES_REMOVED_MESSAGE,
+ 'run_status': 1,
+ 'job_runner_exit_polled': 1,
+ }
+ schd: Scheduler = scheduler(flow(one_conf))
+ async with start(schd):
+ itask = schd.pool.get_tasks()[0]
+ itask.submit_num = 1
+ job_id = itask.tokens.duplicate(job='01').relative_id
+ schd.task_job_mgr._poll_task_job_callback(
+ schd.workflow,
+ itask,
+ cmd_ctx=Mock(),
+ line=f'2025-02-13T12:08:30Z|{job_id}|{json.dumps(response)}',
+ )
+ assert itask.state(TASK_STATUS_FAILED)
+
+ assert log_filter(
+ logging.ERROR, f"job log directory {job_id} no longer exists"
+ )
diff --git a/tests/unit/test_job_runner_mgr.py b/tests/unit/test_job_runner_mgr.py
new file mode 100644
index 00000000000..40c69f66dd8
--- /dev/null
+++ b/tests/unit/test_job_runner_mgr.py
@@ -0,0 +1,78 @@
+# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
+# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+
+from cylc.flow.job_runner_mgr import (
+ JobRunnerManager, JOB_FILES_REMOVED_MESSAGE)
+
+jrm = JobRunnerManager()
+
+
+SAMPLE_STATUS = """
+ignore me, I have no = sign
+CYLC_JOB_RUNNER_NAME=pbs
+CYLC_JOB_ID=2361713
+CYLC_JOB_RUNNER_SUBMIT_TIME=2025-01-28T14:46:04Z
+CYLC_JOB_PID=2361713
+CYLC_JOB_INIT_TIME=2025-01-28T14:46:05Z
+CYLC_MESSAGE=2025-01-28T14:46:05Z|INFO|sleep 31
+CYLC_JOB_RUNNER_EXIT_POLLED=2025-01-28T14:46:08Z
+CYLC_JOB_EXIT=SUCCEEDED
+CYLC_JOB_EXIT_TIME=2025-01-28T14:46:38Z
+"""
+
+
+def test__job_poll_status_files(tmp_path):
+ """Good path: a valid job.status file exists and is parsed."""
+ (tmp_path / 'sub').mkdir()
+ (tmp_path / 'sub' / 'job.status').write_text(SAMPLE_STATUS)
+ ctx = jrm._jobs_poll_status_files(str(tmp_path), 'sub')
+ assert ctx.job_runner_name == 'pbs'
+ assert ctx.job_id == '2361713'
+ assert ctx.job_runner_exit_polled == 1
+ assert ctx.pid == '2361713'
+ assert ctx.time_submit_exit == '2025-01-28T14:46:04Z'
+ assert ctx.time_run == '2025-01-28T14:46:05Z'
+ assert ctx.time_run_exit == '2025-01-28T14:46:38Z'
+ assert ctx.run_status == 0
+ assert ctx.messages == ['2025-01-28T14:46:05Z|INFO|sleep 31']
+
+
+def test__job_poll_status_files_task_failed(tmp_path):
+ """Failure path: CYLC_JOB_EXIT=FOO gives run_status 1, run_signal FOO."""
+ (tmp_path / 'sub').mkdir()
+ (tmp_path / 'sub' / 'job.status').write_text("CYLC_JOB_EXIT=FOO")
+ ctx = jrm._jobs_poll_status_files(str(tmp_path), 'sub')
+ assert ctx.run_status == 1
+ assert ctx.run_signal == 'FOO'
+
+
+def test__job_poll_status_files_deleted_logdir(tmp_path):
+ """The log dir has been deleted whilst the task is still active.
+ The poll context reports failure; tmp_path guarantees the dir is absent.
+ """
+ ctx = jrm._jobs_poll_status_files(str(tmp_path), 'bar')
+ assert ctx.run_signal == JOB_FILES_REMOVED_MESSAGE
+ assert ctx.run_status == 1
+ assert ctx.job_runner_exit_polled == 1
+
+
+def test__job_poll_status_files_ioerror(tmp_path, capsys):
+ """The log dir exists but job.status does not: error goes to stderr.
+ """
+ (tmp_path / 'sub').mkdir()
+ jrm._jobs_poll_status_files(str(tmp_path), 'sub')
+ cap = capsys.readouterr()
+ assert '[Errno 2] No such file or directory' in cap.err