diff --git a/sequence_processing_pipeline/FastQCJob.py b/sequence_processing_pipeline/FastQCJob.py index f5a5555..4ac272c 100644 --- a/sequence_processing_pipeline/FastQCJob.py +++ b/sequence_processing_pipeline/FastQCJob.py @@ -1,18 +1,19 @@ -from os import listdir, makedirs -from os.path import exists, join, basename -from sequence_processing_pipeline.Job import Job -from sequence_processing_pipeline.PipelineError import (PipelineError, - JobFailedError) from functools import partial +from jinja2 import Environment from json import dumps import logging +from os import listdir, makedirs +from os.path import join, basename +from sequence_processing_pipeline.Job import Job, KISSLoader +from sequence_processing_pipeline.PipelineError import (PipelineError, + JobFailedError) class FastQCJob(Job): def __init__(self, run_dir, output_path, raw_fastq_files_path, processed_fastq_files_path, nprocs, nthreads, fastqc_path, modules_to_load, qiita_job_id, queue_name, node_count, - wall_time_limit, jmem, pool_size, multiqc_config_file_path, + wall_time_limit, jmem, pool_size, max_array_length, is_amplicon): super().__init__(run_dir, output_path, @@ -36,9 +37,6 @@ def __init__(self, run_dir, output_path, raw_fastq_files_path, self.job_script_path = join(self.output_path, f"{self.job_name}.sh") - self._file_check(multiqc_config_file_path) - self.multiqc_config_file_path = multiqc_config_file_path - self.project_names = [] self.commands, self.project_names = self._get_commands() # for lists greater than n commands, chain the extra commands, @@ -46,6 +44,11 @@ def __init__(self, run_dir, output_path, raw_fastq_files_path, self.commands = self._group_commands(self.commands) self.suffix = 'fastqc.html' + # for projects that use sequence_processing_pipeline as a dependency, + # jinja_env must be set to sequence_processing_pipeline's root path, + # rather than the project's root path. + self.jinja_env = Environment(loader=KISSLoader('templates')) + self._generate_job_script() def _get_commands(self): @@ -217,90 +220,38 @@ def run(self, callback=None): logging.debug(job_info) - # If project-level reports were not needed, MultiQC could simply be - # given the path to the run-directory itself and it will discover all - # of the relevant data files. Confirmed that for a one-project sample- - # sheet, this produces and equivalent report. - - for project in self.project_names: - # MultiQC doesn't like input paths that don't exist. Simply add - # all paths that do exist as input. - input_path_list = [] - p_path = partial(join, self.output_path, 'fastqc') - - for filter_type in ['bclconvert', 'trimmed_sequences', - 'filtered_sequences', 'amplicon']: - input_path_list.append(p_path(project, filter_type)) - - input_path_list.append(p_path(project, 'Reports')) - - p_path = partial(join, self.processed_fastq_files_path, project) - input_path_list.append(p_path('fastp_reports_dir', 'json')) - - # I don't usually see a json directory associated with raw data. - # It looks to be metadata coming directly off the machine, in the - # few instances I've seen it in /sequencing... - p_path = partial(join, self.raw_fastq_files_path, project) - input_path_list.append(p_path('json')) - - input_path_list = [x for x in input_path_list if exists(x)] - - cmd_head = ['multiqc', '-c', self.multiqc_config_file_path, - '--fullnames', '--force'] - - # --interactive graphs is set to True in MultiQC configuration - # file and hence this switch was redunant and now removed. - cmd_tail = ['-o', join(self.output_path, 'multiqc', project)] - - cmd = ' '.join(cmd_head + input_path_list + cmd_tail) - - results = self._system_call(cmd, callback=callback) - - if results['return_code'] != 0: - raise PipelineError("multiqc encountered an error") - if self._get_failed_indexes(job_info['job_id']): # raise error if list isn't empty. raise PipelineError("FastQCJob did not complete successfully.") def _generate_job_script(self): - lines = [] - - details_file_name = f'{self.job_name}.array-details' - sh_details_fp = join(self.output_path, details_file_name) + # bypass generating job script for a force-fail job, since it is + # not needed. + if self.force_job_fail: + return None - lines.append("#!/bin/bash") + template = self.jinja_env.get_template("fastqc_job.sh") job_name = f'{self.qiita_job_id}_{self.job_name}' - lines.append(f"#SBATCH --job-name {job_name}") - lines.append(f"#SBATCH -p {self.queue_name}") - lines.append(f"#SBATCH -N {self.node_count}") - lines.append(f"#SBATCH -n {self.nprocs}") - lines.append("#SBATCH --time %d" % self.wall_time_limit) - lines.append(f"#SBATCH --mem {self.jmem}") - lines.append("#SBATCH --array 1-%d%%%d" % ( - len(self.commands), self.pool_size)) - - lines.append("set -x") - lines.append("set +e") - lines.append('date') - lines.append('hostname') - lines.append('echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}') - lines.append(f'cd {self.output_path}') - - if self.modules_to_load: - lines.append("module load " + ' '.join(self.modules_to_load)) - - lines.append('offset=${SLURM_ARRAY_TASK_ID}') - lines.append('step=$(( $offset - 0 ))') - lines.append(f'cmd0=$(head -n $step {sh_details_fp} | tail -n 1)') - lines.append('eval $cmd0') - - sentinel_file = f'{self.job_name}_$step.completed' - lines.append(f'echo "Cmd Completed: $cmd0" > logs/{sentinel_file}') - - with open(self.job_script_path, 'w') as f: - f.write('\n'.join(lines)) - - with open(sh_details_fp, 'w') as f: + details_file_name = f'{self.job_name}.array-details' + array_details = join(self.output_path, details_file_name) + array_params = "1-%d%%%d" % (len(self.commands), self.pool_size) + modules_to_load = ' '.join(self.modules_to_load) + + with open(self.job_script_path, mode="w", encoding="utf-8") as f: + f.write(template.render(job_name=job_name, + array_details=array_details, + queue_name=self.queue_name, + node_count=self.node_count, + nprocs=self.nprocs, + wall_time_limit=self.wall_time_limit, + mem_in_gb=self.jmem, + array_params=array_params, + output_path=self.output_path, + modules_to_load=modules_to_load)) + + # save the .details file as well + with open(array_details, 'w') as f: f.write('\n'.join(self.commands)) + + return self.job_script_path diff --git a/sequence_processing_pipeline/templates/fastqc_job.sh b/sequence_processing_pipeline/templates/fastqc_job.sh new file mode 100644 index 0000000..8b79706 --- /dev/null +++ b/sequence_processing_pipeline/templates/fastqc_job.sh @@ -0,0 +1,23 @@ +#!/bin/bash +#SBATCH -J {{job_name}} +#SBATCH -p {{queue_name}} +#SBATCH -N {{node_count}} +#SBATCH -n {{nprocs}} +#SBATCH --time {{wall_time_limit}} +#SBATCH --mem {{mem_in_gb}}G +#SBATCH --array {{array_params}} +#SBATCH --constraint="intel" +set -x +set +e +date +hostname +echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID} +cd {{output_path}} +{% if modules_to_load is defined %} + module load {{modules_to_load}} +{% endif %} +offset=${SLURM_ARRAY_TASK_ID} +step=$(( $offset - 0 )) +cmd0=$(head -n $step {{array_details}} | tail -n 1) +eval $cmd0 +echo "Cmd Completed: $cmd0" > logs/FastQCJob_$step.completed \ No newline at end of file diff --git a/sequence_processing_pipeline/tests/test_FastQCJob.py b/sequence_processing_pipeline/tests/test_FastQCJob.py index a229129..93bd478 100644 --- a/sequence_processing_pipeline/tests/test_FastQCJob.py +++ b/sequence_processing_pipeline/tests/test_FastQCJob.py @@ -576,20 +576,6 @@ def tearDown(self): rmtree(zero_path) - def test_config_file_not_found(self): - with self.assertRaises(PipelineError) as e: - FastQCJob(self.qc_root_path, self.output_path, - self.raw_fastq_files_path.replace('/project1', ''), - self.processed_fastq_files_path, 16, 16, - 'sequence_processing_pipeline/tests/bin/fastqc', [], - self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - ('sequence_processing_pipeline/' - 'not-multiqc-bclconvert-config.yaml'), 1000, False) - - self.assertEqual(str(e.exception), "file 'sequence_processing_pipeline" - "/not-multiqc-bclconvert-config." - "yaml' does not exist.") - def test_generate_job_scripts(self): job = FastQCJob(self.qc_root_path, self.output_path, self.raw_fastq_files_path.replace('/project1', ''), @@ -597,7 +583,7 @@ def test_generate_job_scripts(self): 16, 16, 'sequence_processing_pipeline/tests/bin/fastqc', [], self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - self.config_yml, 1000, False) + 1000, False) self.assertEqual(exists(join(job.output_path, 'FastQCJob.sh')), True) self.assertEqual(exists(join(job.output_path, @@ -610,7 +596,7 @@ def test_audit(self): 16, 16, 'sequence_processing_pipeline/tests/bin/fastqc', [], self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - self.config_yml, 1000, False) + 1000, False) obs = job.audit(self.sample_ids) @@ -1044,7 +1030,7 @@ def test_all_zero_files(self): 16, 16, 'sequence_processing_pipeline/tests/bin/fastqc', [], self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - self.config_yml, 1000, False) + 1000, False) self.assertEqual(str(e.exception), "There are no fastq files for " "FastQCJob to process in sequence" @@ -1059,7 +1045,7 @@ def test_completed_file_generation(self): 16, 16, 'sequence_processing_pipeline/tests/bin/fastqc', [], self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - self.config_yml, 1000, False) + 1000, False) my_path = join(self.output_path, 'FastQCJob', 'logs') @@ -1079,7 +1065,7 @@ def test_completed_file_generation_some_failures(self): 16, 16, 'sequence_processing_pipeline/tests/bin/fastqc', [], self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - self.config_yml, 1000, False) + 1000, False) my_path = join(self.output_path, 'FastQCJob', 'logs') @@ -1115,7 +1101,7 @@ def test_error_msg_from_logs(self): 16, 16, 'sequence_processing_pipeline/tests/bin/fastqc', [], self.qiita_job_id, 'queue_name', 4, 23, '8g', 30, - self.config_yml, 1000, False) + 1000, False) self.assertFalse(job is None)