Add tellread (#149)
* initial add

* initial cleanup

* first pass at converting TELLREAD scripts

* Second pass at integrating tellread scripts

* third pass adding tellread

* fourth pass

* Fifth pass, tested on qiita-rc and then refactored.

* Manually merged with current master

* Manually merged with master

* Updates based on testing in qiita-rc

* flake8

* Small fixes

* Refactor KISSLoader to be more DRY.

* Pipeline.py updated to support changes in qp-klp

* Version 2.0 of TellSeq support.

Version 2.0 of TellSeq support removes the master tellread.sh script and its
drop-in replacement, TRConvertJob.py, in favor of Job() classes that wrap the
individual steps of the original script. These steps can be used in whole or
in part, and in varying order, in the refactored SPP plugin (qp-klp).

* Creation tests added for new TellReadJob() class.

* flake8

* New sample files added

* Added optional parameter to Pipeline() class.

Added an optional parameter to the Pipeline() class that overrides the values in the lane column of a sample-sheet's data section with the value provided by the user at submission time.
This functionality used to reside in the qp-klp plugin and is a common usage pattern.

* bugfix

* Fixes error

Fixes error found when post-processing adapter-trimmed fastq files.
All files were being moved into one of the project sub-folders, rather than into their associated folders.
This appears to be due to a recent implementation change.
All files are now moved into their correct folders.

* Rewrote test

* Updated branch to use new DFSheet() functionality

* Updated to recent changes in metapool

* Update from testing

* Updates to TRIntegrateJob based on testing

* Updated sample config file

* Replaced legacy exit check for tellread

* recent updates

* Updated tests

* Update setup.py to point to merged metapool updates

* New tests for slurm polling

* Updates

* Updates

* flake8

* flake8 post merger

* Fixed older test

* Minor update

* Remove lengthy comment

* fix test

* Updates based on feedback

* Update based on feedback

* Added renamed file

* Refactored sequence counting job

Request from Antonio to make TRNormCountsJob more generalized for
current and upcoming work.
TRNormCountsJob was replaced with SeqCountsJob, which:
 * takes a list of paths to fastq and/or fastq.gz files.
 * runs seqtk to count sequences and bases in parallel.
 * aggregates the counts from the log output into a json file.
(A rough sketch of this behavior appears after this list of changes.)

* Update test based on randomness in output generation

* Updates based on feedback

* Common parse_logs() method made default
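
The following is an editorial sketch of the SeqCountsJob behavior described in the "Refactored sequence counting job" entry above; it is not the plugin's actual implementation. It assumes seqtk >= 1.4 (as pinned in the README below), whose `size` subcommand prints the sequence and base counts for a fastq/fastq.gz file.

```python
# Illustrative sketch only, not the actual SeqCountsJob code.
# Assumes `seqtk size` (seqtk >= 1.4) prints "<num_seqs>\t<num_bases>".
from concurrent.futures import ThreadPoolExecutor
from json import dumps
from subprocess import run


def _count_one(fastq_path):
    result = run(['seqtk', 'size', fastq_path],
                 capture_output=True, text=True, check=True)
    num_seqs, num_bases = result.stdout.split()
    return fastq_path, {'seq_counts': int(num_seqs),
                        'base_counts': int(num_bases)}


def seq_counts_to_json(fastq_paths, max_workers=4):
    # count sequences and bases for each file in parallel and
    # aggregate the results into a single json document.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        counts = dict(executor.map(_count_one, fastq_paths))
    return dumps(counts, indent=2)
```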
charles-cowart authored Dec 3, 2024
1 parent 2c94f2a commit b6fb6ff
Showing 55 changed files with 2,610 additions and 368 deletions.
8 changes: 7 additions & 1 deletion README.md
@@ -14,7 +14,7 @@ git clone https://github.com/biocore/mg-scripts.git
Create a Python3 Conda environment in which to run the notebook:

```bash
conda create -n sp_pipeline 'python==3.9' numpy pandas click scipy matplotlib fastq-pair
conda create --yes -n spp 'python=3.9' scikit-learn pandas numpy nose pep8 flake8 matplotlib jupyter notebook 'seaborn>=0.7.1' pip openpyxl 'seqtk>=1.4' click scipy fastq-pair
```

Activate the Conda environment:
@@ -62,3 +62,9 @@ Please note that the setting 'minimap2_databases' is expected to be a list of pa
For NuQCJob, minimap2_databases is expected to be the path to a directory containing two subdirectories: 'metagenomic'
and 'metatranscriptomic'. Each directory should contain or symlink to the appropriate .mmi files needed for that Assay
type.

Additional TellSeq-related notes:
'spades-cloudspades-0.1', 'tellread-release-novaseqX', or similar directories must be placed in a location available to SPP, and their paths should be made known to SPP in the configuration files (see the examples for details).
Additional scripts found in sequence_processing_pipeline/contrib were contributed by Daniel and Omar and can be located and configured in the same way.

6 changes: 6 additions & 0 deletions sequence_processing_pipeline/Commands.py
@@ -22,8 +22,14 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
# is now the following:
# add one more level to account for project_names nested under ConvertJob
# dir.
# this will ignore the _I1_ reads that appear in the integrated result.
fastq_paths = glob.glob(data_location_path + '*/*/*.fastq.gz')

# case-specific filter for TellSeq output directories that also contain
# _I1_ files. Ensure paths are still sorted afterwards.
fastq_paths = [x for x in fastq_paths if '_I1_001.fastq.gz' not in x]
fastq_paths = sorted(fastq_paths)

# convert from GB and halve as we sum R1
max_size = (int(max_file_list_size_in_gb) * (2 ** 30) / 2)
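
As an editorial aside (not part of the diff): the halving above presumably reflects that each R1 file has an R2 mate of roughly the same size, so capping the summed R1 sizes at half the limit keeps each bin's R1+R2 total near the requested limit. A worked example:

```python
# e.g. a 10 GB limit caps each bin at 5 GiB of R1 data
max_file_list_size_in_gb = 10
max_size = int(max_file_list_size_in_gb) * (2 ** 30) / 2
assert max_size == 5 * (2 ** 30)
```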

9 changes: 8 additions & 1 deletion sequence_processing_pipeline/ConvertJob.py
@@ -156,7 +156,13 @@ def run(self, callback=None):
exec_from=self.log_path,
callback=callback)

self.copy_controls_between_projects()
# ConvertJob() is used to process Amplicon as well as Meta*Omic
# runs. Amplicon runs use a dummy sample-sheet generated by
# Pipeline(). For these types of sheets we can't copy controls
# between projects because demuxing is not performed here.
_, sheet_name = split(self.sample_sheet_path)
if sheet_name != 'dummy_sample_sheet.csv':
self.copy_controls_between_projects()

except JobFailedError as e:
# When a job has failed, parse the logs generated by this specific
@@ -169,6 +175,7 @@ def run(self, callback=None):
logging.info(f'Successful job: {job_info}')

def parse_logs(self):
# overrides Job.parse_logs() w/tailored parse for specific logs.
log_path = join(self.output_path, 'Logs')
errors = join(log_path, 'Errors.log')

17 changes: 0 additions & 17 deletions sequence_processing_pipeline/FastQCJob.py
@@ -6,7 +6,6 @@
from functools import partial
from json import dumps
import logging
import glob


class FastQCJob(Job):
@@ -305,19 +304,3 @@ def _generate_job_script(self):

with open(sh_details_fp, 'w') as f:
f.write('\n'.join(self.commands))

def parse_logs(self):
log_path = join(self.output_path, 'logs')
files = sorted(glob.glob(join(log_path, '*.out')))
msgs = []

for some_file in files:
with open(some_file, 'r') as f:
msgs += [line for line in f.readlines()
# note 'error' is not same
# requirement as found in QCJob.
# ('error:'). This is a very
# generalized filter.
if 'error' in line.lower()]

return [msg.strip() for msg in msgs]
187 changes: 126 additions & 61 deletions sequence_processing_pipeline/Job.py
@@ -1,3 +1,6 @@
from jinja2 import BaseLoader, TemplateNotFound
from os.path import getmtime
import pathlib
from itertools import zip_longest
from os import makedirs, walk
from os.path import basename, exists, split, join
@@ -9,6 +12,26 @@
import logging
from inspect import stack
import re
from collections import Counter
from glob import glob


# taken from https://jinja.palletsprojects.com/en/3.0.x/api/#jinja2.BaseLoader
class KISSLoader(BaseLoader):
def __init__(self, path):
# pin the loader's path to the location of sequence_processing_pipeline
# (the directory containing this file), joined with the relative path
# to the templates directory.
self.path = join(pathlib.Path(__file__).parent.resolve(), path)

def get_source(self, environment, template):
path = join(self.path, template)
if not exists(path):
raise TemplateNotFound(template)
mtime = getmtime(path)
with open(path) as f:
source = f.read()
return source, path, lambda: mtime == getmtime(path)
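
As an editorial aside, a BaseLoader subclass like this is typically handed to a jinja2 Environment; a minimal usage sketch follows (the template name and variables are hypothetical, not files from this commit):

```python
from jinja2 import Environment

env = Environment(loader=KISSLoader('templates'))
template = env.get_template('example_job.sh.jinja2')
script_text = template.render(job_name='example', queue_name='qiita')
```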


class Job:
@@ -32,6 +55,7 @@ class Job:
slurm_status_running)

polling_interval_in_seconds = 60
squeue_retry_in_seconds = 10

def __init__(self, root_dir, output_path, job_name, executable_paths,
max_array_length, modules_to_load=None):
@@ -103,7 +127,17 @@ def run(self):
raise PipelineError("Base class run() method not implemented.")

def parse_logs(self):
raise PipelineError("Base class parse_logs() method not implemented.")
# by default, look for anything to parse in the logs directory.
log_path = join(self.output_path, 'logs')
files = sorted(glob(join(log_path, '*')))
msgs = []

for some_file in files:
with open(some_file, 'r') as f:
msgs += [line for line in f.readlines()
if 'error:' in line.lower()]

return [msg.strip() for msg in msgs]

def _which(self, file_path, modules_to_load=None):
"""
@@ -212,6 +246,41 @@ def _system_call(self, cmd, allow_return_codes=[], callback=None):

return {'stdout': stdout, 'stderr': stderr, 'return_code': return_code}

def _query_slurm(self, job_ids):
# _query_slurm encapsulates the handling of squeue.
count = 0
while True:
result = self._system_call("squeue -t all -j "
f"{','.join(job_ids)} "
"-o '%i,%T'")

if result['return_code'] == 0:
# there was no issue w/squeue, break this loop and
# continue.
break
else:
# there was likely an intermittent issue w/squeue. Pause
# and wait before trying a few more times. If the problem
# persists then report the error and exit.
count += 1

if count > 3:
raise ExecFailedError(result['stderr'])

sleep(Job.squeue_retry_in_seconds)

lines = result['stdout'].split('\n')
lines.pop(0) # remove header
lines = [x.split(',') for x in lines if x != '']

jobs = {}
for job_id, state in lines:
# ensure job_id is of type string for downstream use.
job_id = str(job_id)
jobs[job_id] = state

return jobs
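
As an editorial illustration of what the squeue call above returns (the job-ids and states below are made up; for an array job, %i yields entries like '1234_1'):

```python
# squeue -o '%i,%T' prints a header row, then one "<job-id>,<STATE>"
# row per job or array task; parsed the same way as in _query_slurm.
stdout = "JOBID,STATE\n1234_1,COMPLETED\n1234_2,RUNNING\n"
lines = stdout.split('\n')
lines.pop(0)  # remove header
lines = [x.split(',') for x in lines if x != '']
jobs = {str(job_id): state for job_id, state in lines}
# jobs == {'1234_1': 'COMPLETED', '1234_2': 'RUNNING'}
```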

def wait_on_job_ids(self, job_ids, callback=None):
'''
Wait for the given job-ids to finish running before returning.
@@ -229,63 +298,27 @@ def wait_on_job_ids(self, job_ids, callback=None):
# ensure all ids are strings to ensure proper working w/join().
job_ids = [str(x) for x in job_ids]

def query_slurm(job_ids):
# internal function query_slurm encapsulates the handling of
# squeue.
count = 0
while True:
result = self._system_call("squeue -t all -j "
f"{','.join(job_ids)} "
"-o '%F,%A,%T'")

if result['return_code'] == 0:
# there was no issue w/squeue, break this loop and
# continue.
break
else:
# there was a likely intermittent issue w/squeue. Pause
# and wait before trying a few more times. If the problem
# persists then report the error and exit.
count += 1

if count > 3:
raise ExecFailedError(result['stderr'])

sleep(60)

lines = result['stdout'].split('\n')
lines.pop(0) # remove header
lines = [x.split(',') for x in lines if x != '']

jobs = {}
child_jobs = {}
for job_id, unique_id, state in lines:
jobs[unique_id] = state

if unique_id != job_id:
child_jobs[unique_id] = job_id # job is a child job

return jobs, child_jobs

while True:
jobs, child_jobs = query_slurm(job_ids)

for jid in job_ids:
logging.debug("JOB %s: %s" % (jid, jobs[jid]))
if callback is not None:
callback(jid=jid, status=jobs[jid])

children = [x for x in child_jobs if child_jobs[x] == jid]
if len(children) == 0:
logging.debug("\tNO CHILDREN")
for cid in children:
logging.debug("\tCHILD JOB %s: %s" % (cid, jobs[cid]))
status = [jobs[x] in Job.slurm_status_not_running for x in job_ids]

if set(status) == {True}:
# all jobs either completed successfully or terminated.
# Because _query_slurm only returns the state of the job-ids we
# specify, waiting is a simple check of whether any of those
# states are 'running' states or not.
jobs = self._query_slurm(job_ids)

# jobs will be a dict keyed on job-ids (or array-ids for jobs that
# are array-jobs). The value of jobs[id] will be a state, e.g.
# 'RUNNING', 'FAILED', or 'COMPLETED'.
states = [jobs[x] in Job.slurm_status_not_running for x in jobs]

if set(states) == {True}:
# if every job is in a non-running state (e.g. FAILED or
# COMPLETED), the set of these booleans collapses to {True},
# no matter how many array-jobs there were. Otherwise at
# least one job is still running.
break

logging.debug(f"sleeping {Job.polling_interval_in_seconds} "
"seconds...")
sleep(Job.polling_interval_in_seconds)

return jobs
@@ -345,16 +378,48 @@ def submit_job(self, script_path, job_parameters=None,
# to the user.
results = self.wait_on_job_ids([job_id], callback=callback)

job_result = {'job_id': job_id, 'job_state': results[job_id]}
if job_id in results:
# job is a non-array job
job_result = {'job_id': job_id, 'job_state': results[job_id]}
else:
# job is an array job
# assume all array jobs in this case will be associated w/job_id.
counts = Counter()
for array_id in results:
counts[results[array_id]] += 1

# for array jobs we don't return a string representing the state
# of a single job. Instead we return a dictionary mapping each
# unique state the array-jobs ended up in to the number of jobs
# in that state.
job_result = {'job_id': job_id, 'job_state': dict(counts)}

if callback is not None:
callback(jid=job_id, status=job_result['job_state'])
if isinstance(job_result['job_state'], dict):
# this is an array job
states = []
for key in counts:
states.append(f"{key}: {counts[key]}")

if job_result['job_state'] == 'COMPLETED':
return job_result
callback(jid=job_id, status=", ".join(states))

else:
# this is a standard job
callback(jid=job_id, status=job_result['job_state'])

if isinstance(job_result['job_state'], dict):
states = list(job_result['job_state'].keys())
if states == ['COMPLETED']:
return job_result
else:
raise JobFailedError(f"job {job_id} exited with jobs in the "
f"following states: {', '.join(states)}")
else:
raise JobFailedError(f"job {job_id} exited with status "
f"{job_result['job_state']}")
if job_result['job_state'] == 'COMPLETED':
return job_result
else:
raise JobFailedError(f"job {job_id} exited with status "
f"{job_result['job_state']}")

def _group_commands(self, cmds):
# break list of commands into chunks of max_array_length (Typically