Skip to content

Commit

Permalink
Updated FastQCJob to use Jinja2 template.
Browse files Browse the repository at this point in the history
MultiQCJob support removed.
  • Loading branch information
charles-cowart committed Jan 17, 2025
1 parent dd39ff0 commit 4fd1fd5
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 107 deletions.
125 changes: 38 additions & 87 deletions sequence_processing_pipeline/FastQCJob.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,19 @@
from os import listdir, makedirs
from os.path import exists, join, basename
from sequence_processing_pipeline.Job import Job
from sequence_processing_pipeline.PipelineError import (PipelineError,
JobFailedError)
from functools import partial
from jinja2 import Environment
from json import dumps
import logging
from os import listdir, makedirs
from os.path import join, basename
from sequence_processing_pipeline.Job import Job, KISSLoader
from sequence_processing_pipeline.PipelineError import (PipelineError,
JobFailedError)


class FastQCJob(Job):
def __init__(self, run_dir, output_path, raw_fastq_files_path,
processed_fastq_files_path, nprocs, nthreads, fastqc_path,
modules_to_load, qiita_job_id, queue_name, node_count,
wall_time_limit, jmem, pool_size, multiqc_config_file_path,
wall_time_limit, jmem, pool_size,
max_array_length, is_amplicon):
super().__init__(run_dir,
output_path,
Expand All @@ -36,16 +37,18 @@ def __init__(self, run_dir, output_path, raw_fastq_files_path,

self.job_script_path = join(self.output_path, f"{self.job_name}.sh")

self._file_check(multiqc_config_file_path)
self.multiqc_config_file_path = multiqc_config_file_path

self.project_names = []
self.commands, self.project_names = self._get_commands()
# for lists greater than n commands, chain the extra commands,
# distributing them evenly throughout the first n commands.
self.commands = self._group_commands(self.commands)
self.suffix = 'fastqc.html'

# for projects that use sequence_processing_pipeline as a dependency,
# jinja_env must be set to sequence_processing_pipeline's root path,
# rather than the project's root path.
self.jinja_env = Environment(loader=KISSLoader('templates'))

self._generate_job_script()

def _get_commands(self):
Expand Down Expand Up @@ -217,90 +220,38 @@ def run(self, callback=None):

logging.debug(job_info)

# If project-level reports were not needed, MultiQC could simply be
# given the path to the run-directory itself and it will discover all
# of the relevant data files. Confirmed that for a one-project sample-
# sheet, this produces and equivalent report.

for project in self.project_names:
# MultiQC doesn't like input paths that don't exist. Simply add
# all paths that do exist as input.
input_path_list = []
p_path = partial(join, self.output_path, 'fastqc')

for filter_type in ['bclconvert', 'trimmed_sequences',
'filtered_sequences', 'amplicon']:
input_path_list.append(p_path(project, filter_type))

input_path_list.append(p_path(project, 'Reports'))

p_path = partial(join, self.processed_fastq_files_path, project)
input_path_list.append(p_path('fastp_reports_dir', 'json'))

# I don't usually see a json directory associated with raw data.
# It looks to be metadata coming directly off the machine, in the
# few instances I've seen it in /sequencing...
p_path = partial(join, self.raw_fastq_files_path, project)
input_path_list.append(p_path('json'))

input_path_list = [x for x in input_path_list if exists(x)]

cmd_head = ['multiqc', '-c', self.multiqc_config_file_path,
'--fullnames', '--force']

# --interactive graphs is set to True in MultiQC configuration
# file and hence this switch was redunant and now removed.
cmd_tail = ['-o', join(self.output_path, 'multiqc', project)]

cmd = ' '.join(cmd_head + input_path_list + cmd_tail)

results = self._system_call(cmd, callback=callback)

if results['return_code'] != 0:
raise PipelineError("multiqc encountered an error")

if self._get_failed_indexes(job_info['job_id']):
# raise error if list isn't empty.
raise PipelineError("FastQCJob did not complete successfully.")

def _generate_job_script(self):
lines = []

details_file_name = f'{self.job_name}.array-details'
sh_details_fp = join(self.output_path, details_file_name)
# bypass generating job script for a force-fail job, since it is
# not needed.
if self.force_job_fail:
return None

lines.append("#!/bin/bash")
template = self.jinja_env.get_template("fastqc_job.sh")

job_name = f'{self.qiita_job_id}_{self.job_name}'
lines.append(f"#SBATCH --job-name {job_name}")
lines.append(f"#SBATCH -p {self.queue_name}")
lines.append(f"#SBATCH -N {self.node_count}")
lines.append(f"#SBATCH -n {self.nprocs}")
lines.append("#SBATCH --time %d" % self.wall_time_limit)
lines.append(f"#SBATCH --mem {self.jmem}")
lines.append("#SBATCH --array 1-%d%%%d" % (
len(self.commands), self.pool_size))

lines.append("set -x")
lines.append("set +e")
lines.append('date')
lines.append('hostname')
lines.append('echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}')
lines.append(f'cd {self.output_path}')

if self.modules_to_load:
lines.append("module load " + ' '.join(self.modules_to_load))

lines.append('offset=${SLURM_ARRAY_TASK_ID}')
lines.append('step=$(( $offset - 0 ))')
lines.append(f'cmd0=$(head -n $step {sh_details_fp} | tail -n 1)')
lines.append('eval $cmd0')

sentinel_file = f'{self.job_name}_$step.completed'
lines.append(f'echo "Cmd Completed: $cmd0" > logs/{sentinel_file}')

with open(self.job_script_path, 'w') as f:
f.write('\n'.join(lines))

with open(sh_details_fp, 'w') as f:
details_file_name = f'{self.job_name}.array-details'
array_details = join(self.output_path, details_file_name)
array_params = "1-%d%%%d" % (len(self.commands), self.pool_size)
modules_to_load = ' '.join(self.modules_to_load)

with open(self.job_script_path, mode="w", encoding="utf-8") as f:
f.write(template.render(job_name=job_name,
array_details=array_details,
queue_name=self.queue_name,
node_count=self.node_count,
nprocs=self.nprocs,
wall_time_limit=self.wall_time_limit,
mem_in_gb=self.jmem,
array_params=array_params,
output_path=self.output_path,
modules_to_load=modules_to_load))

# save the .details file as well
with open(array_details, 'w') as f:
f.write('\n'.join(self.commands))

return self.job_script_path
23 changes: 23 additions & 0 deletions sequence_processing_pipeline/templates/fastqc_job.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/bash
#SBATCH -J {{job_name}}
#SBATCH -p {{queue_name}}
#SBATCH -N {{node_count}}
#SBATCH -n {{nprocs}}
#SBATCH --time {{wall_time_limit}}
#SBATCH --mem {{mem_in_gb}}G
#SBATCH --array {{array_params}}
#SBATCH --constraint="intel"
set -x
set +e
date
hostname
echo ${SLURM_JOBID} ${SLURM_ARRAY_TASK_ID}
cd {{output_path}}
{% if modules_to_load is defined %}
module load {{modules_to_load}}
{% endif %}
offset=${SLURM_ARRAY_TASK_ID}
step=$(( $offset - 0 ))
cmd0=$(head -n $step {{array_details}} | tail -n 1)
eval $cmd0
echo "Cmd Completed: $cmd0" > logs/FastQCJob_$step.completed
26 changes: 6 additions & 20 deletions sequence_processing_pipeline/tests/test_FastQCJob.py
Original file line number Diff line number Diff line change
Expand Up @@ -576,28 +576,14 @@ def tearDown(self):

rmtree(zero_path)

def test_config_file_not_found(self):
with self.assertRaises(PipelineError) as e:
FastQCJob(self.qc_root_path, self.output_path,
self.raw_fastq_files_path.replace('/project1', ''),
self.processed_fastq_files_path, 16, 16,
'sequence_processing_pipeline/tests/bin/fastqc', [],
self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
('sequence_processing_pipeline/'
'not-multiqc-bclconvert-config.yaml'), 1000, False)

self.assertEqual(str(e.exception), "file 'sequence_processing_pipeline"
"/not-multiqc-bclconvert-config."
"yaml' does not exist.")

def test_generate_job_scripts(self):
job = FastQCJob(self.qc_root_path, self.output_path,
self.raw_fastq_files_path.replace('/project1', ''),
self.processed_fastq_files_path,
16, 16,
'sequence_processing_pipeline/tests/bin/fastqc', [],
self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
self.config_yml, 1000, False)
1000, False)

self.assertEqual(exists(join(job.output_path, 'FastQCJob.sh')), True)
self.assertEqual(exists(join(job.output_path,
Expand All @@ -610,7 +596,7 @@ def test_audit(self):
16, 16,
'sequence_processing_pipeline/tests/bin/fastqc', [],
self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
self.config_yml, 1000, False)
1000, False)

obs = job.audit(self.sample_ids)

Expand Down Expand Up @@ -1044,7 +1030,7 @@ def test_all_zero_files(self):
16, 16,
'sequence_processing_pipeline/tests/bin/fastqc', [],
self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
self.config_yml, 1000, False)
1000, False)

self.assertEqual(str(e.exception), "There are no fastq files for "
"FastQCJob to process in sequence"
Expand All @@ -1059,7 +1045,7 @@ def test_completed_file_generation(self):
16, 16,
'sequence_processing_pipeline/tests/bin/fastqc', [],
self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
self.config_yml, 1000, False)
1000, False)

my_path = join(self.output_path, 'FastQCJob', 'logs')

Expand All @@ -1079,7 +1065,7 @@ def test_completed_file_generation_some_failures(self):
16, 16,
'sequence_processing_pipeline/tests/bin/fastqc', [],
self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
self.config_yml, 1000, False)
1000, False)

my_path = join(self.output_path, 'FastQCJob', 'logs')

Expand Down Expand Up @@ -1115,7 +1101,7 @@ def test_error_msg_from_logs(self):
16, 16,
'sequence_processing_pipeline/tests/bin/fastqc', [],
self.qiita_job_id, 'queue_name', 4, 23, '8g', 30,
self.config_yml, 1000, False)
1000, False)

self.assertFalse(job is None)

Expand Down

0 comments on commit 4fd1fd5

Please sign in to comment.