From 17156aafe30d40b1da311196b7eb84ba69c3cd29 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Fri, 14 Feb 2025 16:35:57 +0000 Subject: [PATCH 1/3] Simplify poll handling of prematurely deleted job log dir --- cylc/flow/job_runner_mgr.py | 4 +++ cylc/flow/task_job_mgr.py | 31 ++++++++++---------- tests/integration/test_task_job_mgr.py | 39 +++++++++++++++++--------- tests/unit/test_job_runner_mgr.py | 2 +- 4 files changed, 46 insertions(+), 30 deletions(-) diff --git a/cylc/flow/job_runner_mgr.py b/cylc/flow/job_runner_mgr.py index f9646e27f1..247a699f80 100644 --- a/cylc/flow/job_runner_mgr.py +++ b/cylc/flow/job_runner_mgr.py @@ -445,7 +445,11 @@ def _jobs_poll_status_files(self, job_log_root, job_log_dir): # If the log directory has been deleted prematurely, return a task # failure and an explanation: if not os.path.exists(os.path.join(job_log_root, ctx.job_log_dir)): + # The job may still be in the job runner and may yet succeed, + # but we assume it failed & exited because it's the best we + # can do as it is no longer possible to poll it. ctx.run_status = 1 + ctx.job_runner_exit_polled = 1 ctx.run_signal = JOB_FILES_REMOVED_MESSAGE return ctx try: diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index 96af05b487..adcb875b7c 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -159,7 +159,7 @@ def __init__(self, workflow, proc_pool, workflow_db_mgr, self.workflow = workflow self.proc_pool = proc_pool self.workflow_db_mgr: WorkflowDatabaseManager = workflow_db_mgr - self.task_events_mgr = task_events_mgr + self.task_events_mgr: TaskEventsManager = task_events_mgr self.data_store_mgr = data_store_mgr self.job_file_writer = JobFileWriter() self.job_runner_mgr = self.job_file_writer.job_runner_mgr @@ -753,16 +753,6 @@ def _manip_task_jobs_callback( or (ctx.ret_code and ctx.ret_code != 255) ): LOG.error(ctx) - # A polling task lets us know that a task has failed because it's - # log folder has been deleted whilst the task was active: - if ( - getattr(ctx, 'out', None) - and JOB_FILES_REMOVED_MESSAGE in ctx.out - ): - LOG.error( - f'Task {ctx.cmd[-1]} failed because task log directory' - f'\n{"/".join(ctx.cmd[-2:])}\nhas been removed.' - ) # A dict for easy reference of (CYCLE, NAME, SUBMIT_NUM) -> TaskProxy # # Note for "reload": A TaskProxy instance may be replaced on reload, so @@ -845,7 +835,13 @@ def _poll_task_job_callback_255(self, workflow, itask, cmd_ctx, line): ) self.poll_task_jobs(workflow, [itask]) - def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line): + def _poll_task_job_callback( + self, + workflow: str, + itask: 'TaskProxy', + cmd_ctx: SubProcContext, + line: str, + ): """Helper for _poll_task_jobs_callback, on one task job.""" ctx = SubProcContext(self.JOBS_POLL, None) ctx.out = line @@ -872,6 +868,13 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line): log_lvl = DEBUG if ( itask.platform.get('communication method') == 'poll' ) else INFO + + if jp_ctx.run_signal == JOB_FILES_REMOVED_MESSAGE: + LOG.error( + f"platform: {itask.platform['name']} - job log directory " + f"{job_tokens.relative_id} no longer exists" + ) + if jp_ctx.run_status == 1 and jp_ctx.run_signal in ["ERR", "EXIT"]: # Failed normally self.task_events_mgr.process_message( @@ -879,9 +882,7 @@ def _poll_task_job_callback(self, workflow, itask, cmd_ctx, line): elif jp_ctx.run_status == 1 and jp_ctx.job_runner_exit_polled == 1: # Failed by a signal, and no longer in job runner self.task_events_mgr.process_message( - itask, log_lvl, TASK_OUTPUT_FAILED, jp_ctx.time_run_exit, flag) - self.task_events_mgr.process_message( - itask, log_lvl, FAIL_MESSAGE_PREFIX + jp_ctx.run_signal, + itask, log_lvl, f"{FAIL_MESSAGE_PREFIX}{jp_ctx.run_signal}", jp_ctx.time_run_exit, flag) elif jp_ctx.run_status == 1: # noqa: SIM114 diff --git a/tests/integration/test_task_job_mgr.py b/tests/integration/test_task_job_mgr.py index 26b9355c0f..090a3540b4 100644 --- a/tests/integration/test_task_job_mgr.py +++ b/tests/integration/test_task_job_mgr.py @@ -15,14 +15,18 @@ # along with this program. If not, see . from contextlib import suppress +import json import logging -from types import SimpleNamespace from typing import Any as Fixture +from unittest.mock import Mock from cylc.flow import CYLC_LOG from cylc.flow.job_runner_mgr import JOB_FILES_REMOVED_MESSAGE from cylc.flow.scheduler import Scheduler -from cylc.flow.task_state import TASK_STATUS_RUNNING +from cylc.flow.task_state import ( + TASK_STATUS_FAILED, + TASK_STATUS_RUNNING, +) async def test_run_job_cmd_no_hosts_error( @@ -238,23 +242,30 @@ async def test_broadcast_platform_change( async def test_poll_job_deleted_log_folder( - one_conf, flow, scheduler, start, caplog + one_conf, flow, scheduler, start, log_filter ): """Capture a task error caused by polling finding the job log dir deleted. https://github.com/cylc/cylc-flow/issues/6425 """ - ctx = SimpleNamespace() - ctx.out = JOB_FILES_REMOVED_MESSAGE - ctx.ret_code = None - ctx.cmd = ['foo', 'bar'] - - schd = scheduler(flow(one_conf), run_mode='live', paused_start=False) + response = { + 'run_signal': JOB_FILES_REMOVED_MESSAGE, + 'run_status': 1, + 'job_runner_exit_polled': 1, + } + schd: Scheduler = scheduler(flow(one_conf)) async with start(schd): - schd.task_job_mgr._manip_task_jobs_callback(ctx, '', [], '') + itask = schd.pool.get_tasks()[0] + itask.submit_num = 1 + job_id = itask.tokens.duplicate(job='01').relative_id + schd.task_job_mgr._poll_task_job_callback( + schd.workflow, + itask, + cmd_ctx=Mock(), + line=f'2025-02-13T12:08:30Z|{job_id}|{json.dumps(response)}', + ) + assert itask.state(TASK_STATUS_FAILED) - assert ( - 'Task bar failed because task log directory' - '\nfoo/bar\nhas been removed.' - in caplog.messages + assert log_filter( + logging.ERROR, f"job log directory {job_id} no longer exists" ) diff --git a/tests/unit/test_job_runner_mgr.py b/tests/unit/test_job_runner_mgr.py index 2d6fc6e606..40c69f66dd 100644 --- a/tests/unit/test_job_runner_mgr.py +++ b/tests/unit/test_job_runner_mgr.py @@ -66,6 +66,7 @@ def test__job_poll_status_files_deleted_logdir(): ctx = jrm._jobs_poll_status_files('foo', 'bar') assert ctx.run_signal == JOB_FILES_REMOVED_MESSAGE assert ctx.run_status == 1 + assert ctx.job_runner_exit_polled == 1 def test__job_poll_status_files_ioerror(tmp_path, capsys): @@ -75,4 +76,3 @@ def test__job_poll_status_files_ioerror(tmp_path, capsys): jrm._jobs_poll_status_files(str(tmp_path), 'sub') cap = capsys.readouterr() assert '[Errno 2] No such file or directory' in cap.err - From aa4312f3e0ea6a58e37d15b179511d27a0390625 Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Fri, 14 Feb 2025 16:54:28 +0000 Subject: [PATCH 2/3] Mypy --- cylc/flow/task_job_mgr.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cylc/flow/task_job_mgr.py b/cylc/flow/task_job_mgr.py index adcb875b7c..6df4e9f4ea 100644 --- a/cylc/flow/task_job_mgr.py +++ b/cylc/flow/task_job_mgr.py @@ -44,6 +44,7 @@ Optional, Tuple, Union, + cast, ) from cylc.flow import LOG @@ -1270,7 +1271,8 @@ def _prep_submit_task_job( workflow, itask, '(platform not defined)', exc) return False else: - itask.platform = platform + # (platform is not None here as subshell eval has finished) + itask.platform = cast('dict', platform) # Retry delays, needed for the try_num self._set_retry_timers(itask, rtconfig) From 63fbf9c2ac2a24f90c54df433997120066d10d6f Mon Sep 17 00:00:00 2001 From: Ronnie Dutta <61982285+MetRonnie@users.noreply.github.com> Date: Fri, 14 Feb 2025 19:13:37 +0000 Subject: [PATCH 3/3] Simplify too-flaky test --- .../cylc-poll/16-execution-time-limit.t | 55 ++++--------------- .../16-execution-time-limit/flow.cylc | 12 +--- 2 files changed, 12 insertions(+), 55 deletions(-) diff --git a/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t b/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t index a771131869..a6ef36c951 100755 --- a/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t +++ b/tests/flakyfunctional/cylc-poll/16-execution-time-limit.t @@ -14,12 +14,12 @@ # # You should have received a copy of the GNU General Public License # along with this program. If not, see . -#------------------------------------------------------------------------------- + # Test execution time limit polling. export REQUIRE_PLATFORM='loc:* comms:poll runner:background' . "$(dirname "$0")/test_header" -#------------------------------------------------------------------------------- -set_test_number 4 + +set_test_number 5 create_test_global_config '' " [platforms] [[$CYLC_TEST_PLATFORM]] @@ -28,51 +28,16 @@ create_test_global_config '' " execution time limit polling intervals = PT5S " install_workflow "${TEST_NAME_BASE}" "${TEST_NAME_BASE}" -#------------------------------------------------------------------------------- + run_ok "${TEST_NAME_BASE}-validate" cylc validate "${WORKFLOW_NAME}" workflow_run_ok "${TEST_NAME_BASE}-run" \ cylc play --reference-test -v --no-detach "${WORKFLOW_NAME}" --timestamp -#------------------------------------------------------------------------------- -# shellcheck disable=SC2317 -cmp_times () { - # Test if the times $1 and $2 are within $3 seconds of each other. - python3 -u - "$@" <<'__PYTHON__' -import sys -from metomi.isodatetime.parsers import TimePointParser -parser = TimePointParser() -time_1 = parser.parse(sys.argv[1]) -time_2 = parser.parse(sys.argv[2]) -if abs((time_1 - time_2).get_seconds()) > int(sys.argv[3]): - sys.exit("abs(predicted - actual) > tolerance: %s" % sys.argv[1:]) -__PYTHON__ -} -time_offset () { - # Add an ISO8601 duration to an ISO8601 date-time. - python3 -u - "$@" <<'__PYTHON__' -import sys -from metomi.isodatetime.parsers import TimePointParser, DurationParser -print( - TimePointParser().parse(sys.argv[1]) + DurationParser().parse(sys.argv[2])) -__PYTHON__ -} -#------------------------------------------------------------------------------- + LOG="${WORKFLOW_RUN_DIR}/log/scheduler/log" -# Test logging of the "next job poll" message when task starts. -TEST_NAME="${TEST_NAME_BASE}-log-entry" -LINE="$(grep '\[1/foo.* execution timeout=None, polling intervals=' "${LOG}")" -run_ok "${TEST_NAME}" grep -q 'health: execution timeout=None, polling intervals=' <<< "${LINE}" -# Determine poll times. -PREDICTED_POLL_TIME=$(time_offset \ - "$(cut -d ' ' -f 1 <<< "${LINE}")" \ - "PT10S") # PT5S time limit + PT5S polling interval -ACTUAL_POLL_TIME=$(sed -n \ - 's|\(.*\) DEBUG - \[1/foo.* (polled)failed .*|\1|p' "${LOG}") -# Test execution timeout polling. -# Main loop is roughly 1 second, but integer rounding may give an apparent 2 -# seconds delay, so set threshold as 2 seconds. -run_ok "${TEST_NAME_BASE}-poll-time" \ - cmp_times "${PREDICTED_POLL_TIME}" "${ACTUAL_POLL_TIME}" '10' -#------------------------------------------------------------------------------- +log_scan "${TEST_NAME_BASE}-log" "${LOG}" 1 0 \ + "\[1/foo/01:submitted\] => running" \ + "\[1/foo/01:running\] poll now, (next in PT5S" \ + "\[1/foo/01:running\] (polled)failed/XCPU" + purge -exit diff --git a/tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc b/tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc index 3fa9301a5a..1f06427c26 100644 --- a/tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc +++ b/tests/flakyfunctional/cylc-poll/16-execution-time-limit/flow.cylc @@ -13,14 +13,6 @@ [runtime] [[foo]] platform = {{ environ['CYLC_TEST_PLATFORM'] }} - init-script = cylc__job__disable_fail_signals ERR EXIT - script = """ - cylc__job__wait_cylc_message_started - # give it a while for the started message to get picked up by - # the scheduler - sleep 10 - exit 1 - """ - [[[job]]] - execution time limit = PT5S + script = sleep 20 + execution time limit = PT10S [[bar]]