Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change SchedulerRequirement to SlurmRequirement; add qos and reservation #996

Merged
merged 5 commits into from
Feb 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions beeflow/common/config_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ def filepath_completion_input(*pargs, **kwargs):
default='', info='default account time limit (leave blank if none)')
VALIDATOR.option('job', 'default_partition', validator=lambda val: val.strip(), prompt=True,
default='', info='default partition to run jobs on (leave blank if none)')
VALIDATOR.option('job', 'default_qos', validator=lambda val: val.strip(), prompt=True,
default='', info='default qos to run jobs on (leave blank if none)')
VALIDATOR.option('job', 'default_reservation', validator=lambda val: val.strip(), prompt=True,
default='', info='default reservation to run jobs on (leave blank if none)')


def validate_chrun_opts(opts):
Expand Down
24 changes: 17 additions & 7 deletions beeflow/common/worker/slurm_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,15 @@
class BaseSlurmWorker(Worker):
"""Base slurm worker code."""

def __init__(self, default_account='', default_time_limit='', default_partition='', **kwargs):
def __init__(self, default_account='', default_time_limit='', default_partition='',
default_qos='', default_reservation='', **kwargs):
"""Initialize the base slurm worker."""
super().__init__(**kwargs)
self.default_account = default_account
self.default_time_limit = default_time_limit
self.default_partition = default_partition
self.default_qos = default_qos
self.default_reservation = default_reservation

def build_text(self, task):
"""Build text for task script."""
Expand All @@ -50,14 +53,17 @@ def build_text(self, task):
ntasks = task.get_requirement('beeflow:MPIRequirement', 'ntasks', default=nodes)
# Need to rethink the MPI version parameter
mpi_version = task.get_requirement('beeflow:MPIRequirement', 'mpiVersion', default='')
time_limit = task.get_requirement('beeflow:SchedulerRequirement', 'timeLimit',
time_limit = task.get_requirement('beeflow:SlurmRequirement', 'timeLimit',
default=self.default_time_limit)
time_limit = validation.time_limit(time_limit)
account = task.get_requirement('beeflow:SchedulerRequirement', 'account',
account = task.get_requirement('beeflow:SlurmRequirement', 'account',
default=self.default_account)
partition = task.get_requirement('beeflow:SchedulerRequirement',
'partition',
partition = task.get_requirement('beeflow:SlurmRequirement', 'partition',
default=self.default_partition)
qos = task.get_requirement('beeflow:SlurmRequirement', 'qos',
default=self.default_qos)
reservation = task.get_requirement('beeflow:SlurmRequirement', 'reservation',
default=self.default_reservation)

shell = task.get_requirement('beeflow:ScriptRequirement', 'shell', default="/bin/bash")
scripts_enabled = task.get_requirement('beeflow:ScriptRequirement', 'enabled',
Expand All @@ -81,9 +87,13 @@ def build_text(self, task):
if time_limit:
script.append(f'#SBATCH --time={time_limit}')
if account:
script.append(f'#SBATCH -A {account}')
script.append(f'#SBATCH --account {account}')
if partition:
script.append(f'#SBATCH -p {partition}')
script.append(f'#SBATCH --partition {partition}')
if qos:
script.append(f'#SBATCH --qos {qos}')
if reservation:
script.append(f'#SBATCH --reservation {reservation}')

# Return immediately on error
if shell == "/bin/bash":
Expand Down
3 changes: 2 additions & 1 deletion beeflow/task_manager/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ def worker_interface():
'runner_opts': bc.get('task_manager', 'runner_opts'),
}
# Job defaults
for default_key in ['default_account', 'default_time_limit', 'default_partition']:
for default_key in ['default_account', 'default_time_limit', 'default_partition',
'default_qos', 'default_reservation']:
worker_kwargs[default_key] = bc.get('job', default_key)
# Special slurm arguments
if wls == 'Slurm':
Expand Down
2 changes: 1 addition & 1 deletion beeflow/wf_manager/resources/wf_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def handle_state_change(self, state_update, task, wfi, db):

# If the job failed and it doesn't include a checkpoint-restart hint,
# then fail the entire workflow
if state_update.job_state == 'FAILED':
if state_update.job_state in ['FAILED', 'SUBMIT_FAIL']:
set_dependent_tasks_dep_fail(db, wfi, state_update.wf_id, task)
log.info("Workflow failed")
wf_id = wfi.workflow_id
Expand Down
2 changes: 2 additions & 0 deletions ci/bee_config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ setup =
default_account =
default_time_limit =
default_partition =
default_qos=
default_reservation=

[graphdb]
hostname = localhost
Expand Down
4 changes: 2 additions & 2 deletions coverage.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
22 changes: 13 additions & 9 deletions docs/sphinx/bee_cwl.rst
Original file line number Diff line number Diff line change
Expand Up @@ -154,25 +154,29 @@ filenames, the ``restart parameter`` will be added to the run command followed
by the path to the latest checkpoint file, and ``num_tries`` specifies the maximum
number of times the task will be restarted.

beeflow:SchedulerRequirement
beeflow:SlurmRequirement
----------------------------

This requirement is designed for specifying additional information that will be
passed to a scheduler such as Slurm on job submission. It currently supports
the following options:

passed to the Slurm scheduler during job submission. A default for each option
can be set for all workflows in the ``job`` section of the bee.conf configuration
file (for example ``default_account`` or ``default_qos``). Any option set under
``beeflow:SlurmRequirement`` in the CWL file overrides the corresponding default
in bee.conf. The currently supported options are:

* ``account`` - account name to run the job with (often used for charging).
* ``partition`` - partition to launch job on.
* ``qos`` - quality of service to use.
* ``reservation`` - reservation to use to launch job.
* ``timeLimit`` - time limit for the job, in a format accepted by Slurm (e.g. ``00:00:10``).
* ``account`` - may be useful if running jobs with different accounts (if you
want to run all workflows with the same account it's best to set this with
the ``default_account`` option under the ``job`` section in the bee.conf file).
* ``partition`` - partition to launch job with.

An example is shown below::

beeflow:SlurmRequirement:
timeLimit: 00:00:10
account: account12345
partition: scaling
partition: partition-a
qos: long
reservation: reservation-a

beeflow:ScriptRequirement
-------------------------
Expand Down
Loading