diff --git a/babs/babs.py b/babs/babs.py
index 4f50cdb4..af1745a5 100644
--- a/babs/babs.py
+++ b/babs/babs.py
@@ -2,7 +2,6 @@
 
 import os
 import os.path as op
-# from re import L
 import subprocess
 import warnings
 import pandas as pd
@@ -23,7 +22,6 @@
 from babs.utils import (get_immediate_subdirectories,
                         check_validity_unzipped_input_dataset,
-                        if_input_ds_from_osf,
                         generate_cmd_set_envvar,
                         generate_cmd_filterfile,
                         generate_cmd_singularityRun_from_config,
                         generate_cmd_unzip_inputds,
@@ -40,10 +38,8 @@
                         get_list_sub_ses,
                         submit_one_job,
                         submit_one_test_job,
-                        create_job_status_csv,
                         read_job_status_csv,
                         report_job_status,
-                        request_job_status,
                         request_all_job_status,
                         calcu_runtime,
                         get_last_line,
@@ -55,7 +51,6 @@
                         get_git_show_ref_shasum,
                         ceildiv)
-# import pandas as pd
 
 
 # @build_doc
 class BABS():
@@ -483,11 +478,11 @@ def babs_bootstrap(self, input_ds,
         print("\nGenerating a template for job submission calls...")
         print("The template text file will be named as `submit_job_template.yaml`.")
         yaml_path = op.join(self.analysis_path, "code", "submit_job_template.yaml")
-        container.generate_job_submit_template(yaml_path, input_ds, self, system)
+        container.generate_job_submit_template(yaml_path, self, system)
 
         # also, generate template for testing job used by `babs-check-setup`:
         yaml_test_path = op.join(self.analysis_path, "code/check_setup", "submit_test_job_template.yaml")
-        container.generate_test_job_submit_template(yaml_test_path, self, system)
+        container.generate_job_submit_template(yaml_test_path, self, system, test=True)
 
         # datalad save:
         self.datalad_save(path=["code/submit_job_template.yaml",
@@ -888,7 +883,7 @@ def babs_check_setup(self, input_ds, flag_job_test):
               " in the designated environment"
               " and will make sure jobs can finish successfully on current cluster.")
 
-        _, job_id_str, log_filename = submit_one_test_job(self.analysis_path)
+        _, job_id_str, log_filename = submit_one_test_job(self.analysis_path, self.type_system)
         log_fn = op.join(self.analysis_path, "logs", log_filename)   # abs path
         o_fn = log_fn.replace(".*", ".o")
         # write this information in a YAML file:
@@ -913,7 +908,7 @@ def babs_check_setup(self, input_ds, flag_job_test):
                 time.sleep(60)   # Sleep for 60 seconds
 
                 # check the job status
-                df_all_job_status = request_all_job_status()
+                df_all_job_status = request_all_job_status(self.type_system)
                 d_now_str = str(datetime.now())
                 to_print = d_now_str + ": "
                 if job_id_str in df_all_job_status.index.to_list():
@@ -1051,6 +1046,7 @@ def babs_submit(self, count=1, df_job_specified=None):
             if not df_job["has_submitted"][i_job]:    # to run
                 job_id, _, log_filename = submit_one_job(self.analysis_path,
                                                          self.type_session,
+                                                         self.type_system,
                                                          sub, ses)
 
                 # assign into `df_job_updated`:
@@ -1100,6 +1096,7 @@ def babs_submit(self, count=1, df_job_specified=None):
                 job_id, _, log_filename = \
                     submit_one_job(self.analysis_path,
                                    self.type_session,
+                                   self.type_system,
                                    sub, ses)
 
                 # assign into `df_job_updated`:
@@ -1207,7 +1204,7 @@ def babs_status(self, flags_resubmit,
         df_job_updated = df_job.copy()
 
         # Get all jobs' status:
-        df_all_job_status = request_all_job_status()
+        df_all_job_status = request_all_job_status(self.type_system)
 
         # For jobs that have been submitted but not successful yet:
         # Update job status, and resubmit if requested:
@@ -1321,6 +1318,7 @@ def babs_status(self, flags_resubmit,
                         job_id_updated, _, log_filename = \
                             submit_one_job(self.analysis_path,
                                            self.type_session,
+                                           self.type_system,
                                            sub, ses)
                         # update fields:
                         df_job_updated.at[i_job, "job_id"] = job_id_updated
@@ -1337,8 +1335,11 @@ def babs_status(self, flags_resubmit,
                         df_job_updated.at[i_job, "job_state_category"] = state_category
                         df_job_updated.at[i_job, "job_state_code"] = state_code
                         # get the duration:
-                        duration = calcu_runtime(
-                            df_all_job_status.at[job_id_str, "JAT_start_time"])
+                        if "duration" in df_all_job_status:
+                            duration = df_all_job_status.at[job_id_str, "duration"]
+                        else:
+                            duration = calcu_runtime(
+                                df_all_job_status.at[job_id_str, "JAT_start_time"])
                         df_job_updated.at[i_job, "duration"] = duration
 
                         # do nothing else, just wait
@@ -1364,6 +1365,7 @@ def babs_status(self, flags_resubmit,
                     job_id_updated, _, log_filename = \
                         submit_one_job(self.analysis_path,
                                        self.type_session,
+                                       self.type_system,
                                        sub, ses)
                     # update fields:
                     df_job_updated.at[i_job, "job_id"] = job_id_updated
@@ -1402,6 +1404,7 @@ def babs_status(self, flags_resubmit,
                     job_id_updated, _, log_filename = \
                         submit_one_job(self.analysis_path,
                                        self.type_session,
+                                       self.type_system,
                                        sub, ses)
                     # update fields:
                     df_job_updated.at[i_job, "job_id"] = job_id_updated
@@ -1449,6 +1452,7 @@ def babs_status(self, flags_resubmit,
                     job_id_updated, _, log_filename = \
                         submit_one_job(self.analysis_path,
                                        self.type_session,
+                                       self.type_system,
                                        sub, ses)
 
                     # update fields:
@@ -1467,7 +1471,7 @@ def babs_status(self, flags_resubmit,
                 # message found in log files:
                 job_name = log_filename.split(".*")[0]
                 msg_job_account = \
-                    check_job_account(job_id_str, job_name, username_lowercase)
+                    check_job_account(job_id_str, job_name, username_lowercase, self.type_system)
                 df_job_updated.at[i_job, "job_account"] = msg_job_account
 
         # Done: submitted jobs that not 'is_done'
@@ -1542,6 +1546,7 @@ def babs_status(self, flags_resubmit,
                     job_id_updated, _, log_filename = \
                         submit_one_job(self.analysis_path,
                                        self.type_session,
+                                       self.type_system,
                                        sub, ses)
                     # update fields:
                     df_job_updated.at[i_job, "job_id"] = job_id_updated
@@ -2800,22 +2805,24 @@ def generate_bash_test_job(self, folder_check_setup,
         )
         proc_chmod_pyfile.check_returncode()
 
-    def generate_job_submit_template(self, yaml_path, input_ds, babs, system):
+    def generate_job_submit_template(self, yaml_path, babs, system, test=False):
         """
         This is to generate a YAML file that serves as a template
-        of job submission of one participant (or session).
+        of job submission for one participant (or session),
+        or for the test job used by `babs-check-setup`.
 
         Parameters:
         ------------
         yaml_path: str
             The path to the yaml file to be generated.
             It should be in the `analysis/code` folder. It has several fields:
             1) cmd_template; 2) job_name_template
-        input_ds: class `Input_ds`
-            input dataset(s) information
         babs: class `BABS`
             information about the BABS project
         system: class `System`
             information on cluster management system
+        test: bool
+            if True, generate the template for the test job
+            used by `babs-check-setup`.
""" # Section 1: Command for submitting the job: --------------------------- @@ -2823,8 +2830,9 @@ def generate_job_submit_template(self, yaml_path, input_ds, babs, system): if system.type == "sge": submit_head = "qsub -cwd" env_flags = "-v DSLOCKFILE=" + babs.analysis_path + "/.SGE_datalad_lock" - eo_args = "-e " + babs.analysis_path + "/logs " \ - + "-o " + babs.analysis_path + "/logs" + elif system.type == "slurm": + submit_head = "sbatch" + env_flags = "--export=DSLOCKFILE=" + babs.analysis_path + "/.SLURM_datalad_lock" else: warnings.warn("not supporting systems other than sge...") @@ -2834,95 +2842,55 @@ def generate_job_submit_template(self, yaml_path, input_ds, babs, system): # Write into the bash file: yaml_file = open(yaml_path, "a") # open in append mode - yaml_file.write("# '${sub_id}' and '${ses_id}' are placeholders." + "\n") - - # Variables to use: - # `dssource`: Input RIA: - dssource = babs.input_ria_url + "#" + babs.analysis_dataset_id - # `pushgitremote`: Output RIA: - pushgitremote = babs.output_ria_data_dir + if not test: + yaml_file.write("# '${sub_id}' and '${ses_id}' are placeholders." + "\n") + # Variables to use: + # `dssource`: Input RIA: + dssource = babs.input_ria_url + "#" + babs.analysis_dataset_id + # `pushgitremote`: Output RIA: + pushgitremote = babs.output_ria_data_dir # Generate the command: - # several rows in the text file; in between, to insert sub and ses id. - if babs.type_session == "single-ses": - cmd = submit_head + " " + env_flags \ - + " -N " + self.container_name[0:3] + "_" + "${sub_id}" - cmd += " " \ - + eo_args + " " \ - + babs.analysis_path + "/code/participant_job.sh" + " " \ - + dssource + " " \ - + pushgitremote + " " + "${sub_id}" - - elif babs.type_session == "multi-ses": - cmd = submit_head + " " + env_flags \ - + " -N " + self.container_name[0:3] + "_" + "${sub_id}_${ses_id}" - cmd += " " \ - + eo_args + " " \ - + babs.analysis_path + "/code/participant_job.sh" + " " \ - + dssource + " " \ - + pushgitremote + " " + "${sub_id} ${ses_id}" - - yaml_file.write("cmd_template: '" + cmd + "'" + "\n") - - # TODO: currently only support SGE. + # TODO: TBD: adding below to `dict_cluster_systems.yaml` or another YAML file? + if system.type == "sge": + name_flag_str = " -N " + if system.type == "slurm": + name_flag_str = " --job-name " # Section 2: Job name: --------------------------- - job_name = self.container_name[0:3] + "_" + "${sub_id}" - if babs.type_session == "multi-ses": - job_name += "_${ses_id}" - - yaml_file.write("job_name_template: '" + job_name + "'\n") - - yaml_file.close() - - def generate_test_job_submit_template(self, yaml_path, babs, system): - """ - This is to generate a YAML file that serves as a template - of *test* job submission, which will be used in `babs-check-setup`. - - Parameters: - ------------ - yaml_path: str - The path to the yaml file to be generated. - It should be in the `analysis/code/check_setup` folder. 
-            It has several fields: 1) cmd_template; 2) job_name_template
-        babs: class `BABS`
-            information about the BABS project
-        system: class `System`
-            information on cluster management system
-        """
+        # Job name:
+        if test:
+            job_name = self.container_name[0:3] + "_" + "test_job"
+        else:
+            job_name = self.container_name[0:3] + "_" + "${sub_id}"
+            if babs.type_session == "multi-ses":
+                job_name += "_${ses_id}"
 
-        # Section 1: Command for submitting the job: ---------------------------
-        # Flags when submitting the job:
+        # Now, we can define stdout and stderr file names/paths:
         if system.type == "sge":
-            submit_head = "qsub -cwd"
-            env_flags = "-v DSLOCKFILE=" + babs.analysis_path + "/.SGE_datalad_lock"
+            # sge clusters only need logs folder path; filename is not needed:
             eo_args = "-e " + babs.analysis_path + "/logs " \
                 + "-o " + babs.analysis_path + "/logs"
+        elif system.type == "slurm":
+            # slurm clusters also need exact filenames:
+            eo_args = "-e " + babs.analysis_path + f"/logs/{job_name}.e%A " \
+                + "-o " + babs.analysis_path + f"/logs/{job_name}.o%A"
+
+        # Generate the job submission command, with sub ID and ses ID as placeholders:
+        cmd = submit_head + " " + env_flags + name_flag_str + job_name + " " + eo_args + " "
+        if test:
+            cmd += babs.analysis_path + "/code/check_setup/call_test_job.sh"
         else:
-            warnings.warn("not supporting systems other than sge...")
-
-        # Check if the bash file already exist:
-        if op.exists(yaml_path):
-            os.remove(yaml_path)  # remove it
-
-        # Write into the bash file:
-        yaml_file = open(yaml_path, "a")   # open in append mode
-
-        # Generate the command:
-        cmd = submit_head + " " + env_flags \
-            + " -N " + self.container_name[0:3] + "_" + "test_job"
-        cmd += " " \
-            + eo_args + " " \
-            + babs.analysis_path + "/code/check_setup/call_test_job.sh"
+            # if test is False, the type of session will be checked
+            if babs.type_session == "single-ses":
+                cmd += babs.analysis_path + "/code/participant_job.sh" + " " \
+                    + dssource + " " \
+                    + pushgitremote + " " + "${sub_id}"
+            elif babs.type_session == "multi-ses":
+                cmd += babs.analysis_path + "/code/participant_job.sh" + " " \
+                    + dssource + " " \
+                    + pushgitremote + " " + "${sub_id} ${ses_id}"
 
         yaml_file.write("cmd_template: '" + cmd + "'" + "\n")
-
-        # TODO: currently only support SGE.
-
-        # Section 2: Job name: ---------------------------
-        job_name = self.container_name[0:3] + "_" + "test_job"
-
         yaml_file.write("job_name_template: '" + job_name + "'\n")
-
         yaml_file.close()
diff --git a/babs/dict_cluster_systems.yaml b/babs/dict_cluster_systems.yaml
index ff8bc04a..29392207 100644
--- a/babs/dict_cluster_systems.yaml
+++ b/babs/dict_cluster_systems.yaml
@@ -10,4 +10,11 @@ sge:
     soft_memory_limit: "-l s_vmem=$VALUE"    # "-l s_vmem=23.5G" on cubic
     temporary_disk_space: "-l tmpfree=$VALUE"    # "-l tmpfree=200G" on cubic
    number_of_cpus: "-pe threaded $VALUE" # "-pe threaded N" or a range: "-pe threaded N-M", N
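
For reference: with `test=False` on a slurm cluster, the revised `generate_job_submit_template` above would write a `code/submit_job_template.yaml` roughly like the sketch below for a multi-session project. The `/path/to/my_project/...` paths, the `abc` prefix (the first three characters of the container name), and the angle-bracket RIA placeholders are hypothetical examples, not values taken from this patch:

    cmd_template: 'sbatch --export=DSLOCKFILE=/path/to/my_project/analysis/.SLURM_datalad_lock --job-name abc_${sub_id}_${ses_id} -e /path/to/my_project/analysis/logs/abc_${sub_id}_${ses_id}.e%A -o /path/to/my_project/analysis/logs/abc_${sub_id}_${ses_id}.o%A /path/to/my_project/analysis/code/participant_job.sh <input_ria_url>#<analysis_dataset_id> <output_ria_data_dir> ${sub_id} ${ses_id}'
    job_name_template: 'abc_${sub_id}_${ses_id}'

Per the `# '${sub_id}' and '${ses_id}' are placeholders.` header comment the method writes, the IDs are filled in at submission time; Slurm expands `%A` in the `-e`/`-o` filenames to the job allocation number, so each submission gets its own pair of log files.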
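With `test=True` (the path used when bootstrapping the template for `babs-check-setup`), the same method would instead write `code/check_setup/submit_test_job_template.yaml` along these lines, again with hypothetical paths. Note that `dssource` and `pushgitremote` are deliberately skipped in this branch, since `call_test_job.sh` is invoked without arguments:

    cmd_template: 'sbatch --export=DSLOCKFILE=/path/to/my_project/analysis/.SLURM_datalad_lock --job-name abc_test_job -e /path/to/my_project/analysis/logs/abc_test_job.e%A -o /path/to/my_project/analysis/logs/abc_test_job.o%A /path/to/my_project/analysis/code/check_setup/call_test_job.sh'
    job_name_template: 'abc_test_job'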