Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kill tasks during job prep #6535

Open
wants to merge 7 commits into
base: 8.4.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes.d/6535.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure tasks can be killed while in the preparing state.
46 changes: 27 additions & 19 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from collections import deque
from contextlib import suppress
import itertools
import logging
import os
from pathlib import Path
from queue import (
Expand Down Expand Up @@ -82,6 +83,7 @@
FLOW_NONE,
FlowMgr,
repr_flow_nums,
stringify_flow_nums,
)
from cylc.flow.host_select import (
HostSelectException,
Expand Down Expand Up @@ -1078,18 +1080,21 @@ def kill_tasks(
to_kill: List[TaskProxy] = []
unkillable: List[TaskProxy] = []
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
if itask.state_reset(is_held=True):
self.data_store_mgr.delta_task_state(itask)
if not itask.state(TASK_STATUS_PREPARING, *TASK_STATUSES_ACTIVE):
unkillable.append(itask)
continue
if itask.state_reset(is_held=True):
self.data_store_mgr.delta_task_state(itask)
if itask.state(TASK_STATUS_PREPARING):
self.task_job_mgr.kill_prep_task(itask)
else:
to_kill.append(itask)
if jobless:
# Directly set failed in sim mode:
self.task_events_mgr.process_message(
itask, 'CRITICAL', TASK_STATUS_FAILED,
flag=self.task_events_mgr.FLAG_RECEIVED
)
else:
unkillable.append(itask)
if warn and unkillable:
LOG.warning(
"Tasks not killable: "
Expand Down Expand Up @@ -1250,6 +1255,7 @@ def get_contact_data(self) -> Dict[str, str]:
"""
fields = workflow_files.ContactFileFields
proc = psutil.Process()
platform = get_platform()
# fmt: off
return {
fields.API:
Expand All @@ -1275,11 +1281,11 @@ def get_contact_data(self) -> Dict[str, str]:
fields.VERSION:
CYLC_VERSION,
fields.SCHEDULER_SSH_COMMAND:
str(get_platform()['ssh command']),
str(platform['ssh command']),
fields.SCHEDULER_CYLC_PATH:
str(get_platform()['cylc path']),
str(platform['cylc path']),
fields.SCHEDULER_USE_LOGIN_SHELL:
str(get_platform()['use login shell'])
str(platform['use login shell'])
}
# fmt: on

Expand Down Expand Up @@ -1531,22 +1537,24 @@ def start_job_submission(self, itasks: 'Iterable[TaskProxy]') -> bool:
self.task_job_mgr.task_remote_mgr.rsync_includes = (
self.config.get_validated_rsync_includes())

log = LOG.debug
if self.options.reftest or self.options.genref:
log = LOG.info

for itask in self.task_job_mgr.submit_task_jobs(
submitted = self.task_job_mgr.submit_task_jobs(
self.workflow,
itasks,
self.server.curve_auth,
self.server.client_pub_key_dir,
run_mode=self.get_run_mode()
):
if itask.flow_nums:
flow = ','.join(str(i) for i in itask.flow_nums)
else:
flow = FLOW_NONE
log(
)
if not submitted:
return False

log_lvl = logging.DEBUG
if self.options.reftest or self.options.genref:
log_lvl = logging.INFO

for itask in submitted:
flow = stringify_flow_nums(itask.flow_nums) or FLOW_NONE
LOG.log(
log_lvl,
f"{itask.identity} -triggered off "
f"{itask.state.get_resolved_dependencies()} in flow {flow}"
)
Expand Down
4 changes: 4 additions & 0 deletions cylc/flow/subprocctx.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from shlex import quote
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union


from cylc.flow.wallclock import get_current_time_string

if TYPE_CHECKING:
Expand Down Expand Up @@ -137,6 +138,9 @@
'mesg': mesg}
return ret.rstrip()

def __repr__(self) -> str:
    """Return a concise developer-facing representation.

    E.g. ``<SubProcContext jobs-submit>`` — class name plus the command
    key, for readable debug/log output.
    """
    return f"<{type(self).__name__} {self.cmd_key}>"

Check warning on line 142 in cylc/flow/subprocctx.py

View check run for this annotation

Codecov / codecov/patch

cylc/flow/subprocctx.py#L142

Added line #L142 was not covered by tests


class SubFuncContext(SubProcContext):
"""Represent the context of a Python function to run as a subprocess.
Expand Down
58 changes: 42 additions & 16 deletions cylc/flow/task_job_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@


if TYPE_CHECKING:
# BACK COMPAT: typing_extensions.Literal
# FROM: Python 3.7
# TO: Python 3.8
from typing_extensions import Literal

from cylc.flow.task_proxy import TaskProxy
from cylc.flow.workflow_db_mgr import WorkflowDatabaseManager

Expand Down Expand Up @@ -156,10 +161,10 @@ class TaskJobManager:

def __init__(self, workflow, proc_pool, workflow_db_mgr,
task_events_mgr, data_store_mgr, bad_hosts):
self.workflow = workflow
self.workflow: str = workflow
self.proc_pool = proc_pool
self.workflow_db_mgr: WorkflowDatabaseManager = workflow_db_mgr
self.task_events_mgr = task_events_mgr
self.task_events_mgr: TaskEventsManager = task_events_mgr
self.data_store_mgr = data_store_mgr
self.job_file_writer = JobFileWriter()
self.job_runner_mgr = self.job_file_writer.job_runner_mgr
Expand Down Expand Up @@ -196,6 +201,15 @@ def kill_task_jobs(
self._kill_task_jobs_callback_255
)

def kill_prep_task(self, itask: 'TaskProxy') -> None:
    """Kill a preparing task.

    Aborts job preparation for ``itask`` and routes it through the
    normal prep-failure pathway (``_prep_submit_task_job_error``), which
    logs the activity and records a submit-failed job entry in the DB.

    Args:
        itask: Task proxy currently in the preparing state.
    """
    # Stop the scheduler from sending this task back for job prep.
    itask.waiting_on_job_prep = False
    itask.local_job_file_path = None  # reset for retry
    # Ensure retry timers are configured before failure handling runs.
    self._set_retry_timers(itask)
    # Treat the kill like any other preparation failure so standard
    # submit-failed handling (events, DB, data store) applies.
    self._prep_submit_task_job_error(
        self.workflow, itask, '(killed in job prep)', ''
    )

def poll_task_jobs(self, workflow, itasks, msg=None):
"""Poll jobs of specified tasks.

Expand All @@ -220,14 +234,19 @@ def poll_task_jobs(self, workflow, itasks, msg=None):
self._poll_task_jobs_callback_255
)

def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
def prep_submit_task_jobs(
self,
workflow: str,
itasks: 'Iterable[TaskProxy]',
check_syntax: bool = True,
) -> 'Tuple[List[TaskProxy], List[TaskProxy]]':
"""Prepare task jobs for submit.

Prepare tasks where possible. Ignore tasks that are waiting for host
select command to complete. Bad host select command or error writing to
a job file will cause a bad task - leading to submission failure.

Return [list, list]: list of good tasks, list of bad tasks
Return (good_tasks, bad_tasks)
"""
prepared_tasks = []
bad_tasks = []
Expand All @@ -244,16 +263,16 @@ def prep_submit_task_jobs(self, workflow, itasks, check_syntax=True):
prepared_tasks.append(itask)
elif prep_task is False:
bad_tasks.append(itask)
return [prepared_tasks, bad_tasks]
return (prepared_tasks, bad_tasks)

def submit_task_jobs(
self,
workflow,
itasks,
itasks: 'Iterable[TaskProxy]',
curve_auth,
client_pub_key_dir,
run_mode: RunMode = RunMode.LIVE,
):
) -> 'List[TaskProxy]':
"""Prepare for job submission and submit task jobs.

Preparation (host selection, remote host init, and remote install)
Expand All @@ -264,7 +283,7 @@ def submit_task_jobs(
Once preparation has completed or failed, reset .waiting_on_job_prep in
task instances so the scheduler knows to stop sending them back here.

This method uses prep_submit_task_job() as helper.
This method uses prep_submit_task_jobs() as helper.

Return (list): list of tasks that attempted submission.
"""
Expand Down Expand Up @@ -327,7 +346,7 @@ def submit_livelike_task_jobs(
bc_mgr = self.task_events_mgr.broadcast_mgr
rtconf = bc_mgr.get_updated_rtconfig(itask)
try:
platform = get_platform(
platform = get_platform( # type: ignore[assignment]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in #6564

rtconf,
bad_hosts=self.bad_hosts
)
Expand Down Expand Up @@ -1029,7 +1048,7 @@ def _set_retry_timers(
def submit_nonlive_task_jobs(
self: 'TaskJobManager',
workflow: str,
itasks: 'List[TaskProxy]',
itasks: 'Iterable[TaskProxy]',
workflow_run_mode: RunMode,
) -> 'Tuple[List[TaskProxy], List[TaskProxy]]':
"""Identify task mode and carry out alternative submission
Expand Down Expand Up @@ -1152,7 +1171,7 @@ def _prep_submit_task_job(
workflow: str,
itask: 'TaskProxy',
check_syntax: bool = True
):
) -> 'Union[TaskProxy, None, Literal[False]]':
"""Prepare a task job submission.

Returns:
Expand Down Expand Up @@ -1217,7 +1236,7 @@ def _prep_submit_task_job(
else:
# host/platform select not ready
if host_n is None and platform_name is None:
return
return None
elif (
host_n is None
and rtconfig['platform']
Expand Down Expand Up @@ -1259,7 +1278,7 @@ def _prep_submit_task_job(
workflow, itask, '(platform not defined)', exc)
return False
else:
itask.platform = platform
itask.platform = platform # type: ignore[assignment]
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in #6564

# Retry delays, needed for the try_num
self._set_retry_timers(itask, rtconfig)

Expand Down Expand Up @@ -1292,7 +1311,13 @@ def _prep_submit_task_job(
itask.local_job_file_path = local_job_file_path
return itask

def _prep_submit_task_job_error(self, workflow, itask, action, exc):
def _prep_submit_task_job_error(
self,
workflow: str,
itask: 'TaskProxy',
action: str,
exc: Union[Exception, str],
) -> None:
"""Helper for self._prep_submit_task_job. On error."""
log_task_job_activity(
SubProcContext(self.JOBS_SUBMIT, action, err=exc, ret_code=1),
Expand All @@ -1306,11 +1331,12 @@ def _prep_submit_task_job_error(self, workflow, itask, action, exc):
# than submit-failed
# provide a dummy job config - this info will be added to the data
# store
try_num = itask.get_try_num()
itask.jobs.append({
'task_id': itask.identity,
'platform': itask.platform,
'submit_num': itask.submit_num,
'try_num': itask.get_try_num(),
'try_num': try_num,
})
# create a DB entry for the submit-failed job
self.workflow_db_mgr.put_insert_task_jobs(
Expand All @@ -1319,7 +1345,7 @@ def _prep_submit_task_job_error(self, workflow, itask, action, exc):
'flow_nums': serialise_set(itask.flow_nums),
'job_id': itask.summary.get('submit_method_id'),
'is_manual_submit': itask.is_manual_submit,
'try_num': itask.get_try_num(),
'try_num': try_num,
'time_submit': get_current_time_string(),
'platform_name': itask.platform['name'],
'job_runner_name': itask.summary['job_runner_name'],
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/task_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ class TaskProxy:
graph children: {msg: [(name, point), ...]}
.flow_nums:
flows I belong to (if empty, belongs to 'none' flow)
flow_wait:
.flow_wait:
wait for flow merge before spawning children
.waiting_on_job_prep:
True whilst task is awaiting job prep, reset to False once the
Expand Down Expand Up @@ -316,7 +316,7 @@ def __init__(
)

def __repr__(self) -> str:
return f"<{self.__class__.__name__} {self.identity}>"
return f"<{type(self).__name__} {self.identity} {self.state}>"

def __str__(self) -> str:
"""Stringify with tokens, state, submit_num, and flow_nums.
Expand Down
12 changes: 0 additions & 12 deletions tests/functional/cylc-kill/03-simulation/flow.cylc

This file was deleted.

47 changes: 47 additions & 0 deletions tests/functional/cylc-kill/04-handlers.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#!/usr/bin/env bash
# THIS FILE IS PART OF THE CYLC WORKFLOW ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

# Test event handlers when killing running/submitted/preparing tasks.
# Any downstream tasks that depend on the `:submit-fail`/`:fail` outputs
# SHOULD run.
# Handlers for the `submission failed`/`failed` events SHOULD run.

# Requires the `at` job runner so jobs can sit in the submitted state.
export REQUIRE_PLATFORM='runner:at'
# Load the functional-test helper functions (set_test_number, reftest_run, ...).
. "$(dirname "$0")/test_header"
set_test_number 5

# Create platform that ensures job will be in submitted state for long enough
# (at's 5-minute deferral means the job will not start before it is killed).
create_test_global_config '' "
[platforms]
[[old_street]]
job runner = at
job runner command template = at now + 5 minutes
hosts = localhost
install target = localhost
"

install_and_validate
# Run the workflow and compare task triggering against the reference log.
reftest_run

# The 'failed' event handler should have fired for task 1/a.
grep_workflow_log_ok "grep-a" "[(('event-handler-00', 'failed'), 1) out] 1/a" -F

# The 'submission failed' event handlers should have fired for 1/b and 1/c.
for task in b c; do
grep_workflow_log_ok "grep-${task}" \
"[(('event-handler-00', 'submission failed'), 1) out] 1/${task}" -F
done

purge
Loading
Loading