diff --git a/.vscode/launch.json b/.vscode/launch.json
index 5e7a0d84..15932eb4 100644
--- a/.vscode/launch.json
+++ b/.vscode/launch.json
@@ -18,28 +18,28 @@
             // "--project-root",
             // "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data/test_babs_multi-ses_toybidsapp",
             // ],
-            "args": [
-                "--where_project",
-                "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data",
-                "--project_name",
-                "test_babs_multi-ses_toybidsapp",   // "test_babs_multi-ses_fmriprep",
-                "--input",
-                "BIDS",
-                "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data/w2nu3",
-                // "https://osf.io/w2nu3/",
-                "--container_ds",
-                "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data/toybidsapp-container-docker",
-                "--container_name",
-                "toybidsapp-0-0-7",   // "fmriprep-20-2-3",
-                "--container_config_yaml_file",
-                "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/babs/notebooks/example_container_toybidsapp.yaml",
-                // "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/babs/notebooks/example_container_fmriprep.yaml",
-                "--type_session",
-                "multi-ses",
-                "--type_system",
-                "sge",
-                // "--keep-if-failed"
-            ]
+            // "args": [
+            //     "--where_project",
+            //     "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data",
+            //     "--project_name",
+            //     "test_babs_multi-ses_toybidsapp",   // "test_babs_multi-ses_fmriprep",
+            //     "--input",
+            //     "BIDS",
+            //     "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data/w2nu3",
+            //     // "https://osf.io/w2nu3/",
+            //     "--container_ds",
+            //     "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data/toybidsapp-container-docker",
+            //     "--container_name",
+            //     "toybidsapp-0-0-7",   // "fmriprep-20-2-3",
+            //     "--container_config_yaml_file",
+            //     "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/babs/notebooks/example_container_toybidsapp.yaml",
+            //     // "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/babs/notebooks/example_container_fmriprep.yaml",
+            //     "--type_session",
+            //     "multi-ses",
+            //     "--type_system",
+            //     "sge",
+            //     // "--keep-if-failed"
+            // ]
             // "args": [
             //     "--where_project", "/cbica/projects/BABS/data",
             //     // "--project_name", "test_babs_multi-ses_toybidsapp",
@@ -56,6 +56,13 @@
             //     "--type_session", "multi-ses",
             //     "--type_system", "sge"
             // ]
+            "args": [
+                "--project-root",
+                "/home/faird/zhaoc/data/test_babs_multi-ses_toybidsapp",
+                "--container-config-yaml-file",
+                "/home/faird/zhaoc/babs_tests/notebooks/bidsapp-toybidsapp-0-0-7_task-rawBIDS_system-slurm_cluster-MSI_egConfig.yaml",
+                "--job-account"
+            ]
             // "args": [
             //     "--project-root",
             //     "/cbica/projects/BABS/data/test_babs_multi-ses_toybidsapp",
diff --git a/babs/babs.py b/babs/babs.py
index af1745a5..1a25b894 100644
--- a/babs/babs.py
+++ b/babs/babs.py
@@ -290,8 +290,10 @@ def babs_bootstrap(self, input_ds,
         if system.type == "sge":
             gitignore_file.write("\n.SGE_datalad_lock")
         elif system.type == "slurm":
-            # TODO: add command for `slurm`!!!
-            print("Not supported yet... To work on...")
To work on...") + gitignore_file.write("\n.SLURM_datalad_lock") + else: + warnings.warn("Not supporting systems other than SGE or Slurm" + + " for '.gitignore'.") # not to track lock file: gitignore_file.write("\n" + "code/babs_proj_config.yaml.lock") # not to track `job_status.csv`: @@ -1336,8 +1338,13 @@ def babs_status(self, flags_resubmit, df_job_updated.at[i_job, "job_state_code"] = state_code # get the duration: if "duration" in df_all_job_status: + # e.g., slurm `squeue` automatically returns the duration, + # so no need to calcu again. duration = df_all_job_status.at[job_id_str, "duration"] else: + # This duration time may be slightly longer than actual + # time, as this is using current time, instead of + # the time when `qstat`/requesting job queue. duration = calcu_runtime( df_all_job_status.at[job_id_str, "JAT_start_time"]) df_job_updated.at[i_job, "duration"] = duration @@ -1471,7 +1478,8 @@ def babs_status(self, flags_resubmit, # message found in log files: job_name = log_filename.split(".*")[0] msg_job_account = \ - check_job_account(job_id_str, job_name, username_lowercase, self.type_system) + check_job_account(job_id_str, job_name, + username_lowercase, self.type_system) df_job_updated.at[i_job, "job_account"] = msg_job_account # Done: submitted jobs that not 'is_done' diff --git a/babs/utils.py b/babs/utils.py index 0b3014e8..ff5ad49c 100644 --- a/babs/utils.py +++ b/babs/utils.py @@ -1644,7 +1644,7 @@ def report_job_status(df, analysis_path, config_msg_alert): def request_all_job_status(type_system): """ This is to get all jobs' status - using `qstat` for SGE clusters and squeue for Slurm + using `qstat` for SGE clusters and `squeue` for Slurm Parameters: -------------- @@ -1693,46 +1693,82 @@ def _request_all_job_status_sge(): def _request_all_job_status_slurm(): """ This is to get all jobs' status for Slurm + by calling `squeue`. """ username = get_username() - sacct_proc = subprocess.run( + squeue_proc = subprocess.run( ["squeue", "-u", username, "-o", "%.18i %.9P %.8j %.8u %.2t %T %.10M"], stdout=subprocess.PIPE ) - std = sacct_proc.stdout.decode('utf-8') + std = squeue_proc.stdout.decode('utf-8') - sacct_out_df = _parsing_sacct_out(std) - return sacct_out_df + squeue_out_df = _parsing_squeue_out(std) + return squeue_out_df -def _parsing_sacct_out(sacct_std): - header_l = sacct_std.splitlines()[0].split() - datarows = sacct_std.splitlines()[1:] - - dict_ind = {"jobid": 0, "st": 4, "state": 5, "time": 6} - dict_val = dict((key, []) for key in dict_ind) - - - for fld in ["jobid", "st", "state", "time"]: - if header_l[dict_ind[fld]].lower() != fld: - raise Exception(f"error in the squeue output, expected {fld} and got {header_l[dict_ind[fld]].lower()}") - - for row in datarows: - if "." 
-            for key, ind in dict_ind.items():
-                dict_val[key].append(row.split()[ind])
-
-    # renaming the keys
-    dict_val["JB_job_number"] = dict_val.pop("jobid")
-    dict_val["@state"] =dict_val.pop("state")
-    dict_val["duration"] =dict_val.pop("time")
-
+def _parsing_squeue_out(squeue_std):
+    """
+    This is to parse printed messages from `squeue` on Slurm clusters
+    and to convert Slurm codes to SGE codes
 
-    state_slurm2sge = {"R": "r", "PD": "qw"}
-    dict_val["state"] = [state_slurm2sge.get(sl_st, "NA") for sl_st in dict_val.pop("st")]
+    Parameters
+    -------------
+    squeue_std: str
+        Standard output from running command `squeue` in terminal
 
-    df = pd.DataFrame(data=dict_val)
-    if dict_val["JB_job_number"]:
+    Returns
+    -----------
+    df: pd.DataFrame
+        Job status based on `squeue` printed messages.
+        If there is no job in the queue, df will be an empty DataFrame
+        (i.e., Columns: [], Index: [])
+    """
+    # Sanity check: if there is no job in queue:
+    if len(squeue_std.splitlines()) <= 1:
+        # there is only a header, no job is in queue:
+        df = pd.DataFrame(data=[])   # empty dataframe
+    else:   # there are job(s) in queue (e.g., pending or running)
+        header_l = squeue_std.splitlines()[0].split()
+        datarows = squeue_std.splitlines()[1:]
+
+        # column index of these column names:
+        # NOTE: this is hard coded! Please check out `_request_all_job_status_slurm()`
+        #   for the format of printed messages from `squeue`
+        dict_ind = {"jobid": 0, "st": 4, "state": 5, "time": 6}
+        # initialize a dict for holding the values from all jobs:
+        # ROADMAP: pd.DataFrame is probably more memory efficient than dicts
+        dict_val = dict((key, []) for key in dict_ind)
+
+        # sanity check: these fields show up in the header we got:
+        for fld in ["jobid", "st", "state", "time"]:
+            if header_l[dict_ind[fld]].lower() != fld:
+                raise Exception("error in the `squeue` output,"
+                                + f" expected {fld} and got {header_l[dict_ind[fld]].lower()}")
+
+        for row in datarows:
+            if "." not in row.split()[0]:
+                for key, ind in dict_ind.items():
+                    dict_val[key].append(row.split()[ind])
+        # e.g.: dict_val: {'jobid': ['157414586', '157414584'],
+        #   'st': ['PD', 'R'], 'state': ['PENDING', 'RUNNING'], 'time': ['0:00', '0:52']}
+
+        # Renaming the keys, to be consistent with results obtained from SGE clusters:
+        dict_val["JB_job_number"] = dict_val.pop("jobid")
+        # change to lowercase, and rename the key:
+        dict_val["@state"] = [x.lower() for x in dict_val.pop("state")]
+        dict_val["duration"] = dict_val.pop("time")
+        # e.g.,: dict_val: {'st': ['PD', 'R'], 'JB_job_number': ['157414586', '157414584'],
+        #   '@state': ['pending', 'running'], 'duration': ['0:00', '0:52']}
+        # NOTE: the 'duration' format might be slightly different from the results of
+        #   function `calcu_runtime()` used for SGE clusters.
+
+        # job state mapping from Slurm to SGE:
+        state_slurm2sge = {"R": "r", "PD": "qw"}
+        dict_val["state"] = [state_slurm2sge.get(sl_st, "NA") for sl_st in dict_val.pop("st")]
+        # e.g.,: dict_val: {'JB_job_number': ['157414586', '157414584'],
+        #   '@state': ['pending', 'running'], 'duration': ['0:00', '0:52'], 'state': ['qw', 'r']}
+
+        df = pd.DataFrame(data=dict_val)
         df = df.set_index('JB_job_number')
 
     return df
@@ -1749,13 +1785,23 @@ def calcu_runtime(start_time_str):
         Can be got via `df.at['2820901', 'JAT_start_time']`
         Example on CUBIC: ''
 
-    TODO: add type_system
-
     Returns:
     -----------------
     duration_time_str: str
        Duration time of running.
        Format: '0:00:05.050744' (i.e., ~5sec), '2 days, 0:00:00'
+
+    Notes:
+    ---------
+    TODO: add type_system if needed
+    Currently we don't need to add `type_system`: whether 'duration' was already returned
+    is checked before this function is called.
+    However, the format of the duration returned by Slurm clusters may be slightly different
+    from what we calculate here. See the examples in function `_parsing_squeue_out()`.
+
+    This duration may be slightly longer than the actual runtime,
+    because it uses the current time, instead of the time
+    when `qstat` was called to request the job queue.
     """
     # format of time in the job status requested:
     format_job_status = '%Y-%m-%dT%H:%M:%S'   # format in `qstat`
@@ -2005,28 +2051,88 @@ def _check_job_account_slurm(job_id_str, job_name, username_lowercase):
     """
     get information for a finished job in Slurm by calling `sacct`
     """
-    proc_qacct = subprocess.run(
+    msg_no_sacct = "BABS: sacct doesn't provide information about the job."
+    if_no_sacct = False
+    msg_more_than_one = "BABS: sacct detects more than one job for this job ID."
+
+    len_char_jobid = 20
+    len_char_jobname = 50
+
+    the_delimiter = "!"   # use a special delimiter for easy parsing
+    # ^^ if parsing with the default delimiter (e.g., whitespace),
+    #   a State like "CANCELLED by 78382" would also be split into separate fields...
+    proc_sacct = subprocess.run(
         ["sacct", "-u", username_lowercase,
-         "-j", job_id_str],
+         "-j", job_id_str,
+         "--format=JobID%" + str(len_char_jobid) + ","
+         + "JobName%" + str(len_char_jobname) + ",State%30,ExitCode%15",
+         # ^^ specific format: column names and the number of chars
+         #   e.g., '--format=JobID%20,JobName%50,State%30,ExitCode%15'
+         "--parsable2",   # Output will be delimited without a delimiter at the end.
+         "--delimiter=" + the_delimiter],
        stdout=subprocess.PIPE
    )
+    # ref: https://slurm.schedmd.com/sacct.html
 
-    proc_qacct.check_returncode()
-    msg_l = proc_qacct.stdout.decode('utf-8').split("\n")
-    msg_head = msg_l[0].split()
-    if "State" not in msg_head or "JobID" not in msg_head or "JobName" not in msg_head:
-        return "sacct doesn't provide information about the job"
-
-    st_ind = msg_head.index("State")
-    jobid_ind = msg_head.index("JobID")
-    jobnm_ind = msg_head.index("JobName")
-    job_saact = msg_l[2].split()   # the 2nd row should have the main job
+    proc_sacct.check_returncode()
+    # even if the job does not exist, there will still be a printed message from `sacct`,
+    #   at least a header. So `check_returncode()` should always succeed.
+    msg_l = proc_sacct.stdout.decode('utf-8').split("\n")
+    msg_head = msg_l[0].split(the_delimiter)   # list of column names
 
-    if job_saact[jobid_ind] != job_id_str or job_saact[jobnm_ind] != job_name:
-        return "sacct doesn't have the info for the specific job or the format is different"
+    # Check if there is any problem when calling `sacct` for this job:
+    if "State" not in msg_head or "JobID" not in msg_head or "JobName" not in msg_head:
+        if_no_sacct = True
+    if len(msg_l) <= 2 or msg_l[2] == '':
+        # if there is only a header (len <= 2, or the 3rd element is empty):
+        if_no_sacct = True
 
-    return "sacct state: " + job_saact[st_ind]
+    if if_no_sacct:   # there is no information about this job in sacct:
+        warnings.warn("`sacct` did not provide information about job " + job_id_str
+                      + ", " + job_name)
+        print("Hint: check if the job is still in the queue,"
+              " e.g., in state of pending, running, etc")
+        print("Hint: check if the username used for submitting this job"
+              + " was not current username '" + username_lowercase + "'")
+        msg_toreturn = msg_no_sacct
+    else:
+        # create a pd.DataFrame for printed messages from `sacct`:
+        df = pd.DataFrame(data=[], columns=msg_head)
+        msg_l_jobs = msg_l[1:]   # only keep the rows for jobs
+        # ^^ NOTE: if using `--parsable2` and `--delimiter`, there is no 2nd line of "----" dashes
+        for i_row in range(0, len(msg_l_jobs)):
+            if msg_l_jobs[i_row] == '':   # empty
+                pass
+            else:
+                # add to df:
+                df.loc[len(df)] = msg_l_jobs[i_row].split(the_delimiter)
+
+        # find the row that matches the job id and job name
+        #   i.e., without '.batch' or '.extern'; usually it is the first line:
+        temp = df.index[(df["JobID"] == job_id_str)
+                        & (df["JobName"] == job_name)].tolist()
+        if len(temp) == 0:   # did not find the job:
+            warnings.warn("`sacct` did not provide information about job " + job_id_str
+                          + ", " + job_name)
+            print("Hint: check if the job is still in the queue,"
+                  " e.g., in state of pending, running, etc")
+            print("Hint: check if the username used for submitting this job"
+                  + " was not current username '" + username_lowercase + "'")
+            print("Hint: check if the job ID is more than " + str(len_char_jobid) + " chars,"
+                  " or job name is more than " + str(len_char_jobname) + " chars.")
+            msg_toreturn = msg_no_sacct
+        elif len(temp) > 1:   # more than one matched:
+            warnings.warn("`sacct` detects more than one job for this job "
+                          + job_id_str
+                          + ", " + job_name)
+            print("Hint: check if the job ID is more than " + str(len_char_jobid) + " chars,"
+                  " or job name is more than " + str(len_char_jobname) + " chars.")
+            msg_toreturn = msg_more_than_one
+        else:   # expected, only one:
+            msg_toreturn = "sacct: state: " \
+                + df.loc[temp[0], "State"]   # `temp[0]`: the first and only element of list `temp`
+    return msg_toreturn
 
 
 def _check_job_account_sge(job_id_str, job_name, username_lowercase):
     """
diff --git a/notebooks/testing_babs_run.py b/notebooks/testing_babs_run.py
index c3c4850f..b79f1ffe 100644
--- a/notebooks/testing_babs_run.py
+++ b/notebooks/testing_babs_run.py
@@ -11,10 +11,10 @@
 
 # ++++++++++++++++++++++++++++++++++++++++++++++++
 flag_instance = "toybidsapp"
-type_session = "single-ses"
+type_session = "multi-ses"
 count = 1
 
-flag_where = "cubic"   # "cubic" or "local"
+flag_where = "msi"   # "cubic" or "local" or "msi"
 # ++++++++++++++++++++++++++++++++++++++++++++++++
 
 # where:
@@ -22,6 +22,8 @@
     where_project = "/cbica/projects/BABS/data"
 elif flag_where == "local":
     where_project = "/Users/chenyzh/Desktop/Research/Satterthwaite_Lab/datalad_wrapper/data"
+elif flag_where == "msi":
where_project = "/home/faird/zhaoc/data" else: raise Exception("not valid `flag_where`!") @@ -36,6 +38,11 @@ else: raise Exception("not valid `flag_instance`!") +project_root = op.join(where_project, project_name) + +print("--project-root:") +print(project_root) + babs_project = op.join(where_project, project_name) # babs_submit_main()