From 17156aafe30d40b1da311196b7eb84ba69c3cd29 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Fri, 14 Feb 2025 16:35:57 +0000
Subject: [PATCH 1/3] Simplify poll handling of prematurely deleted job log dir
---
cylc/flow/job_runner_mgr.py | 4 +++
cylc/flow/task_job_mgr.py | 31 ++++++++++----------
tests/integration/test_task_job_mgr.py | 39 +++++++++++++++++---------
tests/unit/test_job_runner_mgr.py | 2 +-
4 files changed, 46 insertions(+), 30 deletions(-)
diff --git a/cylc/flow/job_runner_mgr.py b/cylc/flow/job_runner_mgr.py
index f9646e27f1..247a699f80 100644
--- a/cylc/flow/job_runner_mgr.py
+++ b/cylc/flow/job_runner_mgr.py
@@ -445,7 +445,11 @@ def _jobs_poll_status_files(self, job_log_root, job_log_dir):
# If the log directory has been deleted prematurely, return a task
# failure and an explanation:
if not os.path.exists(os.path.join(job_log_root, ctx.job_log_dir)):
+ # The job may still be in the job runner and may yet succeed,
+ # but we assume it failed & exited because it's the best we
+ # can do as it is no longer possible to poll it.
ctx.run_status = 1
+ ctx.job_runner_exit_polled = 1
ctx.run_signal = JOB_FILES_REMOVED_MESSAGE
return ctx
try:
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index 96af05b487..adcb875b7c 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -159,7 +159,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr,
self.workflow = workflow
self.proc_pool = proc_pool
self.workflow_db_mgr: WorkflowDatabaseManager = workflow_db_mgr
- self.task_events_mgr = task_events_mgr
+ self.task_events_mgr: TaskEventsManager = task_events_mgr
self.data_store_mgr = data_store_mgr
self.job_file_writer = JobFileWriter()
self.job_runner_mgr = self.job_file_writer.job_runner_mgr
@@ -753,16 +753,6 @@ def _manip_task_jobs_callback(
or (ctx.ret_code and ctx.ret_code != 255)
):
LOG.error(ctx)
- # A polling task lets us know that a task has failed because it's
- # log folder has been deleted whilst the task was active:
- if (
- getattr(ctx, 'out', None)
- and JOB_FILES_REMOVED_MESSAGE in ctx.out
- ):
- LOG.error(
- f'Task {ctx.cmd[-1]} failed because task log directory'
- f'\n{"/".join(ctx.cmd[-2:])}\nhas been removed.'
- )
# A dict for easy reference of (CYCLE, NAME, SUBMIT_NUM) -> TaskProxy
#
# Note for "reload": A TaskProxy instance may be replaced on reload, so
@@ -845,7 +835,13 @@ def _poll_task_job_callback_255(self, workflow, itask, cmd_ctx, line):
)
self.poll_task_jobs(workflow, [itask])
- def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
+ def _poll_task_job_callback(
+ self,
+ workflow: str,
+ itask: 'TaskProxy',
+ cmd_ctx: SubProcContext,
+ line: str,
+ ):
"""Helper for _poll_task_jobs_callback, on one task job."""
ctx = SubProcContext(self.JOBS_POLL, None)
ctx.out = line
@@ -872,6 +868,13 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
log_lvl = DEBUG if (
itask.platform.get('communication method') == 'poll'
) else INFO
+
+ if jp_ctx.run_signal == JOB_FILES_REMOVED_MESSAGE:
+ LOG.error(
+ f"platform: {itask.platform['name']} - job log directory "
+ f"{job_tokens.relative_id} no longer exists"
+ )
+
if jp_ctx.run_status == 1 and jp_ctx.run_signal in ["ERR", "EXIT"]:
# Failed normally
self.task_events_mgr.process_message(
@@ -879,9 +882,7 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line):
elif jp_ctx.run_status == 1 and jp_ctx.job_runner_exit_polled == 1:
# Failed by a signal, and no longer in job runner
self.task_events_mgr.process_message(
- itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag)
- self.task_events_mgr.process_message(
- itask, log_lvl, FAIL_MESSAGE_PREFIX + jp_ctx.run_signal,
+ itask, log_lvl, f"{FAIL_MESSAGE_PREFIX}{jp_ctx.run_signal}",
jp_ctx.time_run_exit,
flag)
elif jp_ctx.run_status == 1: # noqa: SIM114
diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py
index 26b9355c0f..090a3540b4 100644
--- a/tests/integration/test_task_job_mgr.py
+++ b/tests/integration/test_task_job_mgr.py
@@ -15,14 +15,18 @@
# along with this program. If not, see .
from contextlib import suppress
+import json
import logging
-from types import SimpleNamespace
from typing import Any as Fixture
+from unittest.mock import Mock
from cylc.flow import CYLC_LOG
from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE
from cylc.flow.scheduler import Scheduler
-from cylc.flow.task_state import TASK_STATUS_RUNNING
+from cylc.flow.task_state import (
+ TASK_STATUS_FAILED,
+ TASK_STATUS_RUNNING,
+)
async def test_run_job_cmd_no_hosts_error(
@@ -238,23 +242,30 @@ async def test_broadcast_platform_change(
async def test_poll_job_deleted_log_folder(
- one_conf, flow, scheduler, start, caplog
+ one_conf, flow, scheduler, start, log_filter
):
"""Capture a task error caused by polling finding the job log dir deleted.
https://github.com/cylc/cylc-flow/issues/6425
"""
- ctx = SimpleNamespace()
- ctx.out = JOB_FILES_REMOVED_MESSAGE
- ctx.ret_code = None
- ctx.cmd = ['foo', 'bar']
-
- schd = scheduler(flow(one_conf), run_mode='live', paused_start=False)
+ response = {
+ 'run_signal': JOB_FILES_REMOVED_MESSAGE,
+ 'run_status': 1,
+ 'job_runner_exit_polled': 1,
+ }
+ schd: Scheduler = scheduler(flow(one_conf))
async with start(schd):
- schd.task_job_mgr._manip_task_jobs_callback(ctx, '', [], '')
+ itask = schd.pool.get_tasks()[0]
+ itask.submit_num = 1
+ job_id = itask.tokens.duplicate(job='01').relative_id
+ schd.task_job_mgr._poll_task_job_callback(
+ schd.workflow,
+ itask,
+ cmd_ctx=Mock(),
+ line=f'2025-02-13T12:08:30Z|{job_id}|{json.dumps(response)}',
+ )
+ assert itask.state(TASK_STATUS_FAILED)
- assert (
- 'Task bar failed because task log directory'
- '\nfoo/bar\nhas been removed.'
- in caplog.messages
+ assert log_filter(
+ logging.ERROR, f"job log directory {job_id} no longer exists"
)
diff --git a/tests/unit/test_job_runner_mgr.py b/tests/unit/test_job_runner_mgr.py
index 2d6fc6e606..40c69f66dd 100644
--- a/tests/unit/test_job_runner_mgr.py
+++ b/tests/unit/test_job_runner_mgr.py
@@ -66,6 +66,7 @@ def test__job_poll_status_files_deleted_logdir():
ctx = jrm._jobs_poll_status_files('foo', 'bar')
assert ctx.run_signal == JOB_FILES_REMOVED_MESSAGE
assert ctx.run_status == 1
+ assert ctx.job_runner_exit_polled == 1
def test__job_poll_status_files_ioerror(tmp_path, capsys):
@@ -75,4 +76,3 @@ def test__job_poll_status_files_ioerror(tmp_path, capsys):
jrm._jobs_poll_status_files(str(tmp_path), 'sub')
cap = capsys.readouterr()
assert '[Errno 2] No such file or directory' in cap.err
-
From aa4312f3e0ea6a58e37d15b179511d27a0390625 Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Fri, 14 Feb 2025 16:54:28 +0000
Subject: [PATCH 2/3] Mypy
---
cylc/flow/task_job_mgr.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py
index adcb875b7c..6df4e9f4ea 100644
--- a/cylc/flow/task_job_mgr.py
+++ b/cylc/flow/task_job_mgr.py
@@ -44,6 +44,7 @@
Optional,
Tuple,
Union,
+ cast,
)
from cylc.flow import LOG
@@ -1270,7 +1271,8 @@ def _prep_submit_task_job(
workflow, itask, '(platform not defined)', exc)
return False
else:
- itask.platform = platform
+ # (platform is not None here as subshell eval has finished)
+ itask.platform = cast('dict', platform)
# Retry delays, needed for the try_num
self._set_retry_timers(itask, rtconfig)
From 63fbf9c2ac2a24f90c54df433997120066d10d6f Mon Sep 17 00:00:00 2001
From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com>
Date: Fri, 14 Feb 2025 19:13:37 +0000
Subject: [PATCH 3/3] Simplify too-flaky test
---
.../cylc-poll/16-execution-time-limit.t | 55 ++++---------------
.../16-execution-time-limit/flow.cylc | 12 +---
2 files changed, 12 insertions(+), 55 deletions(-)
diff --git a/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t b/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t
index a771131869..a6ef36c951 100755
--- a/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t
+++ b/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t
@@ -14,12 +14,12 @@
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
-#-------------------------------------------------------------------------------
+
# Test execution time limit polling.
export REQUIRE_PLATFORM='loc:* comms:poll runner:background'
. "$(dirname "$0")/test_header"
-#-------------------------------------------------------------------------------
-set_test_number 4
+
+set_test_number 5
create_test_global_config '' "
[platforms]
[[$CYLC_TEST_PLATFORM]]
@@ -28,51 +28,16 @@ create_test_global_config '' "
execution time limit polling intervals = PT5S
"
install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}"
-#-------------------------------------------------------------------------------
+
run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}"
workflow_run_ok "${TEST_NAME_BASE}-run" \
cylc play --reference-test -v --no-detach "${WORKFLOW_NAME}" --timestamp
-#-------------------------------------------------------------------------------
-# shellcheck disable=SC2317
-cmp_times () {
- # Test if the times $1 and $2 are within $3 seconds of each other.
- python3 -u - "$@" <<'__PYTHON__'
-import sys
-from metomi.isodatetime.parsers import TimePointParser
-parser = TimePointParser()
-time_1 = parser.parse(sys.argv[1])
-time_2 = parser.parse(sys.argv[2])
-if abs((time_1 - time_2).get_seconds()) > int(sys.argv[3]):
- sys.exit("abs(predicted - actual) > tolerance: %s" % sys.argv[1:])
-__PYTHON__
-}
-time_offset () {
- # Add an ISO8601 duration to an ISO8601 date-time.
- python3 -u - "$@" <<'__PYTHON__'
-import sys
-from metomi.isodatetime.parsers import TimePointParser, DurationParser
-print(
- TimePointParser().parse(sys.argv[1]) + DurationParser().parse(sys.argv[2]))
-__PYTHON__
-}
-#-------------------------------------------------------------------------------
+
LOG="${WORKFLOW_RUN_DIR}/log/scheduler/log"
-# Test logging of the "next job poll" message when task starts.
-TEST_NAME="${TEST_NAME_BASE}-log-entry"
-LINE="$(grep '\[1/foo.* execution timeout=None, polling intervals=' "${LOG}")"
-run_ok "${TEST_NAME}" grep -q 'health: execution timeout=None, polling intervals=' <<< "${LINE}"
-# Determine poll times.
-PREDICTED_POLL_TIME=$(time_offset \
- "$(cut -d ' ' -f 1 <<< "${LINE}")" \
- "PT10S") # PT5S time limit + PT5S polling interval
-ACTUAL_POLL_TIME=$(sed -n \
- 's|\(.*\) DEBUG - \[1/foo.* (polled)failed .*|\1|p' "${LOG}")
-# Test execution timeout polling.
-# Main loop is roughly 1 second, but integer rounding may give an apparent 2
-# seconds delay, so set threshold as 2 seconds.
-run_ok "${TEST_NAME_BASE}-poll-time" \
- cmp_times "${PREDICTED_POLL_TIME}" "${ACTUAL_POLL_TIME}" '10'
-#-------------------------------------------------------------------------------
+log_scan "${TEST_NAME_BASE}-log" "${LOG}" 1 0 \
+ "\[1/foo/01:submitted\] => running" \
+ "\[1/foo/01:running\] poll now, (next in PT5S" \
+ "\[1/foo/01:running\] (polled)failed/XCPU"
+
purge
-exit
diff --git a/tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc b/tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc
index 3fa9301a5a..1f06427c26 100644
--- a/tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc
+++ b/tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc
@@ -13,14 +13,6 @@
[runtime]
[[foo]]
platform = {{ environ['CYLC_TEST_PLATFORM'] }}
- init-script = cylc__job__disable_fail_signals ERR EXIT
- script = """
- cylc__job__wait_cylc_message_started
- # give it a while for the started message to get picked up by
- # the scheduler
- sleep 10
- exit 1
- """
- [[[job]]]
- execution time limit = PT5S
+ script = sleep 20
+ execution time limit = PT10S
[[bar]]