Skip to content

Commit

Permalink
Merge pull request #92 from djarecka/slurm
Browse files Browse the repository at this point in the history
[ENH] adding slurm as a system type
  • Loading branch information
Chenying Zhao authored May 19, 2023
2 parents 986c7be + 6643251 commit 2e11bc5
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 137 deletions.
164 changes: 66 additions & 98 deletions babs/babs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import os
import os.path as op
# from re import L
import subprocess
import warnings
import pandas as pd
Expand All @@ -23,7 +22,6 @@

from babs.utils import (get_immediate_subdirectories,
check_validity_unzipped_input_dataset,
if_input_ds_from_osf,
generate_cmd_set_envvar,
generate_cmd_filterfile,
generate_cmd_singularityRun_from_config, generate_cmd_unzip_inputds,
Expand All @@ -40,10 +38,8 @@
get_list_sub_ses,
submit_one_job,
submit_one_test_job,
create_job_status_csv,
read_job_status_csv,
report_job_status,
request_job_status,
request_all_job_status,
calcu_runtime,
get_last_line,
Expand All @@ -55,7 +51,6 @@
get_git_show_ref_shasum,
ceildiv)

# import pandas as pd

# @build_doc
class BABS():
Expand Down Expand Up @@ -483,11 +478,11 @@ def babs_bootstrap(self, input_ds,
print("\nGenerating a template for job submission calls...")
print("The template text file will be named as `submit_job_template.yaml`.")
yaml_path = op.join(self.analysis_path, "code", "submit_job_template.yaml")
container.generate_job_submit_template(yaml_path, input_ds, self, system)
container.generate_job_submit_template(yaml_path, self, system)

# also, generate template for testing job used by `babs-check-setup`:
yaml_test_path = op.join(self.analysis_path, "code/check_setup", "submit_test_job_template.yaml")
container.generate_test_job_submit_template(yaml_test_path, self, system)
container.generate_job_submit_template(yaml_test_path, self, system, test=True)

# datalad save:
self.datalad_save(path=["code/submit_job_template.yaml",
Expand Down Expand Up @@ -888,7 +883,7 @@ def babs_check_setup(self, input_ds, flag_job_test):
" in the designated environment"
" and will make sure jobs can finish successfully on current cluster.")

_, job_id_str, log_filename = submit_one_test_job(self.analysis_path)
_, job_id_str, log_filename = submit_one_test_job(self.analysis_path, self.type_system)
log_fn = op.join(self.analysis_path, "logs", log_filename) # abs path
o_fn = log_fn.replace(".*", ".o")
# write this information in a YAML file:
Expand All @@ -913,7 +908,7 @@ def babs_check_setup(self, input_ds, flag_job_test):
time.sleep(60) # Sleep for 60 seconds

# check the job status
df_all_job_status = request_all_job_status()
df_all_job_status = request_all_job_status(self.type_system)
d_now_str = str(datetime.now())
to_print = d_now_str + ": "
if job_id_str in df_all_job_status.index.to_list():
Expand Down Expand Up @@ -1051,6 +1046,7 @@ def babs_submit(self, count=1, df_job_specified=None):
if not df_job["has_submitted"][i_job]: # to run
job_id, _, log_filename = submit_one_job(self.analysis_path,
self.type_session,
self.type_system,
sub, ses)

# assign into `df_job_updated`:
Expand Down Expand Up @@ -1100,6 +1096,7 @@ def babs_submit(self, count=1, df_job_specified=None):
job_id, _, log_filename = \
submit_one_job(self.analysis_path,
self.type_session,
self.type_system,
sub, ses)

# assign into `df_job_updated`:
Expand Down Expand Up @@ -1207,7 +1204,7 @@ def babs_status(self, flags_resubmit,
df_job_updated = df_job.copy()

# Get all jobs' status:
df_all_job_status = request_all_job_status()
df_all_job_status = request_all_job_status(self.type_system)

# For jobs that have been submitted but not successful yet:
# Update job status, and resubmit if requested:
Expand Down Expand Up @@ -1321,6 +1318,7 @@ def babs_status(self, flags_resubmit,
job_id_updated, _, log_filename = \
submit_one_job(self.analysis_path,
self.type_session,
self.type_system,
sub, ses)
# update fields:
df_job_updated.at[i_job, "job_id"] = job_id_updated
Expand All @@ -1337,8 +1335,11 @@ def babs_status(self, flags_resubmit,
df_job_updated.at[i_job, "job_state_category"] = state_category
df_job_updated.at[i_job, "job_state_code"] = state_code
# get the duration:
duration = calcu_runtime(
df_all_job_status.at[job_id_str, "JAT_start_time"])
if "duration" in df_all_job_status:
duration = df_all_job_status.at[job_id_str, "duration"]
else:
duration = calcu_runtime(
df_all_job_status.at[job_id_str, "JAT_start_time"])
df_job_updated.at[i_job, "duration"] = duration

# do nothing else, just wait
Expand All @@ -1364,6 +1365,7 @@ def babs_status(self, flags_resubmit,
job_id_updated, _, log_filename = \
submit_one_job(self.analysis_path,
self.type_session,
self.type_system,
sub, ses)
# update fields:
df_job_updated.at[i_job, "job_id"] = job_id_updated
Expand Down Expand Up @@ -1402,6 +1404,7 @@ def babs_status(self, flags_resubmit,
job_id_updated, _, log_filename = \
submit_one_job(self.analysis_path,
self.type_session,
self.type_system,
sub, ses)
# update fields:
df_job_updated.at[i_job, "job_id"] = job_id_updated
Expand Down Expand Up @@ -1449,6 +1452,7 @@ def babs_status(self, flags_resubmit,
job_id_updated, _, log_filename = \
submit_one_job(self.analysis_path,
self.type_session,
self.type_system,
sub, ses)

# update fields:
Expand All @@ -1467,7 +1471,7 @@ def babs_status(self, flags_resubmit,
# message found in log files:
job_name = log_filename.split(".*")[0]
msg_job_account = \
check_job_account(job_id_str, job_name, username_lowercase)
check_job_account(job_id_str, job_name, username_lowercase, self.type_system)
df_job_updated.at[i_job, "job_account"] = msg_job_account
# Done: submitted jobs that are not 'is_done'

Expand Down Expand Up @@ -1542,6 +1546,7 @@ def babs_status(self, flags_resubmit,
job_id_updated, _, log_filename = \
submit_one_job(self.analysis_path,
self.type_session,
self.type_system,
sub, ses)
# update fields:
df_job_updated.at[i_job, "job_id"] = job_id_updated
Expand Down Expand Up @@ -2800,31 +2805,34 @@ def generate_bash_test_job(self, folder_check_setup,
)
proc_chmod_pyfile.check_returncode()

def generate_job_submit_template(self, yaml_path, input_ds, babs, system):
def generate_job_submit_template(self, yaml_path, babs, system, test=False):
"""
This is to generate a YAML file that serves as a template
of job submission of one participant (or session).
of job submission of one participant (or session),
or test job submission in `babs-check-setup`.
Parameters:
-------------
yaml_path: str
The path to the yaml file to be generated. It should be in the `analysis/code` folder.
It has several fields: 1) cmd_template; 2) job_name_template
input_ds: class `Input_ds`
input dataset(s) information
babs: class `BABS`
information about the BABS project
system: class `System`
information on cluster management system
test: bool
flag to set to True if generating the test job submit template
for `babs-check-setup`.
"""

# Section 1: Command for submitting the job: ---------------------------
# Flags when submitting the job:
if system.type == "sge":
submit_head = "qsub -cwd"
env_flags = "-v DSLOCKFILE=" + babs.analysis_path + "/.SGE_datalad_lock"
eo_args = "-e " + babs.analysis_path + "/logs " \
+ "-o " + babs.analysis_path + "/logs"
elif system.type == "slurm":
submit_head = "sbatch"
env_flags = "--export=DSLOCKFILE=" + babs.analysis_path + "/.SLURM_datalad_lock"
else:
warnings.warn("not supporting systems other than sge...")

Expand All @@ -2834,95 +2842,55 @@ def generate_job_submit_template(self, yaml_path, input_ds, babs, system):

# Write into the bash file:
yaml_file = open(yaml_path, "a") # open in append mode
yaml_file.write("# '${sub_id}' and '${ses_id}' are placeholders." + "\n")

# Variables to use:
# `dssource`: Input RIA:
dssource = babs.input_ria_url + "#" + babs.analysis_dataset_id
# `pushgitremote`: Output RIA:
pushgitremote = babs.output_ria_data_dir
if not test:
yaml_file.write("# '${sub_id}' and '${ses_id}' are placeholders." + "\n")
# Variables to use:
# `dssource`: Input RIA:
dssource = babs.input_ria_url + "#" + babs.analysis_dataset_id
# `pushgitremote`: Output RIA:
pushgitremote = babs.output_ria_data_dir

# Generate the command:
# several rows in the text file; in between, to insert sub and ses id.
if babs.type_session == "single-ses":
cmd = submit_head + " " + env_flags \
+ " -N " + self.container_name[0:3] + "_" + "${sub_id}"
cmd += " " \
+ eo_args + " " \
+ babs.analysis_path + "/code/participant_job.sh" + " " \
+ dssource + " " \
+ pushgitremote + " " + "${sub_id}"

elif babs.type_session == "multi-ses":
cmd = submit_head + " " + env_flags \
+ " -N " + self.container_name[0:3] + "_" + "${sub_id}_${ses_id}"
cmd += " " \
+ eo_args + " " \
+ babs.analysis_path + "/code/participant_job.sh" + " " \
+ dssource + " " \
+ pushgitremote + " " + "${sub_id} ${ses_id}"

yaml_file.write("cmd_template: '" + cmd + "'" + "\n")

# TODO: currently only support SGE.
# TODO: TBD: adding below to `dict_cluster_systems.yaml` or another YAML file?
if system.type == "sge":
name_flag_str = " -N "
if system.type == "slurm":
name_flag_str = " --job-name "

# Section 2: Job name: ---------------------------
job_name = self.container_name[0:3] + "_" + "${sub_id}"
if babs.type_session == "multi-ses":
job_name += "_${ses_id}"

yaml_file.write("job_name_template: '" + job_name + "'\n")

yaml_file.close()

def generate_test_job_submit_template(self, yaml_path, babs, system):
"""
This is to generate a YAML file that serves as a template
of *test* job submission, which will be used in `babs-check-setup`.
Parameters:
------------
yaml_path: str
The path to the yaml file to be generated.
It should be in the `analysis/code/check_setup` folder.
It has several fields: 1) cmd_template; 2) job_name_template
babs: class `BABS`
information about the BABS project
system: class `System`
information on cluster management system
"""
# Job name:
if test:
job_name = self.container_name[0:3] + "_" + "test_job"
else:
job_name = self.container_name[0:3] + "_" + "${sub_id}"
if babs.type_session == "multi-ses":
job_name += "_${ses_id}"

# Section 1: Command for submitting the job: ---------------------------
# Flags when submitting the job:
# Now, we can define stdout and stderr file names/paths:
if system.type == "sge":
submit_head = "qsub -cwd"
env_flags = "-v DSLOCKFILE=" + babs.analysis_path + "/.SGE_datalad_lock"
# sge clusters only need logs folder path; filename is not needed:
eo_args = "-e " + babs.analysis_path + "/logs " \
+ "-o " + babs.analysis_path + "/logs"
elif system.type == "slurm":
# slurm clusters also need exact filenames:
eo_args = "-e " + babs.analysis_path + f"/logs/{job_name}.e%A " \
+ "-o " + babs.analysis_path + f"/logs/{job_name}.o%A"

# Generate the job submission command, with sub ID and ses ID as placeholders:
cmd = submit_head + " " + env_flags + name_flag_str + job_name + " " + eo_args + " "
if test:
cmd += babs.analysis_path + "/code/check_setup/call_test_job.sh"
else:
warnings.warn("not supporting systems other than sge...")

# Check if the YAML file already exists:
if op.exists(yaml_path):
os.remove(yaml_path) # remove it

# Write into the YAML file:
yaml_file = open(yaml_path, "a") # open in append mode

# Generate the command:
cmd = submit_head + " " + env_flags \
+ " -N " + self.container_name[0:3] + "_" + "test_job"
cmd += " " \
+ eo_args + " " \
+ babs.analysis_path + "/code/check_setup/call_test_job.sh"
# if test is False, the type of session will be checked
if babs.type_session == "single-ses":
cmd += babs.analysis_path + "/code/participant_job.sh" + " " \
+ dssource + " " \
+ pushgitremote + " " + "${sub_id}"
elif babs.type_session == "multi-ses":
cmd += babs.analysis_path + "/code/participant_job.sh" + " " \
+ dssource + " " \
+ pushgitremote + " " + "${sub_id} ${ses_id}"

yaml_file.write("cmd_template: '" + cmd + "'" + "\n")

# TODO: currently only support SGE.

# Section 2: Job name: ---------------------------
job_name = self.container_name[0:3] + "_" + "test_job"

yaml_file.write("job_name_template: '" + job_name + "'\n")

yaml_file.close()
9 changes: 8 additions & 1 deletion babs/dict_cluster_systems.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,11 @@ sge:
soft_memory_limit: "-l s_vmem=$VALUE" # "-l s_vmem=23.5G" on cubic
temporary_disk_space: "-l tmpfree=$VALUE" # "-l tmpfree=200G" on cubic
number_of_cpus: "-pe threaded $VALUE" # "-pe threaded N" or a range: "-pe threaded N-M", N<M on cubic
hard_runtime_limit: "-l h_rt=$VALUE" # "-l h_rt=24:00:00" on cubic
hard_runtime_limit: "-l h_rt=$VALUE" # "-l h_rt=24:00:00" on cubic
slurm:
interpreting_shell: ""
hard_memory_limit: "--mem=$VALUE"
soft_memory_limit: ""
temporary_disk_space: "--tmp=$VALUE" # "--tmp=20g" on MSI
number_of_cpus: "--cpus-per-task=$VALUE"
hard_runtime_limit: "--time=$VALUE"
Loading

0 comments on commit 2e11bc5

Please sign in to comment.