
Add tellread #149

Merged
51 commits merged on Dec 3, 2024
Changes from 44 commits
Commits
09dc316
initial add
charles-cowart Aug 11, 2024
3406cbf
initial cleanup
charles-cowart Aug 11, 2024
c5540f7
first pass at converting TELLREAD scripts
charles-cowart Aug 12, 2024
5bac0ff
Second pass at integrating tellread scripts
charles-cowart Aug 12, 2024
16ec417
third pass adding tellread
charles-cowart Aug 12, 2024
74cab5d
fourth pass
charles-cowart Aug 14, 2024
a6bde1e
Fifth pass, tested on qiita-rc and then refactored.
charles-cowart Sep 5, 2024
1d431a3
Manually merged with current master
charles-cowart Sep 16, 2024
7a84cd0
Manually merged with master
charles-cowart Sep 16, 2024
d347217
Merge branch 'master' into add_tellread
charles-cowart Sep 16, 2024
0236403
Updates based on testing in qiita-rc
charles-cowart Sep 17, 2024
64583a2
flake8
charles-cowart Sep 17, 2024
e7e7c54
Small fixes
charles-cowart Sep 19, 2024
497738f
Refactor KISSLoader to be more DRY.
charles-cowart Sep 24, 2024
ca71c1d
Pipeline.py updated to support changes in qp-klp
charles-cowart Oct 1, 2024
6cdc7ba
Version 2.0 of TellSeq support.
charles-cowart Oct 1, 2024
6818d44
Creation tests added for new TellReadJob() class.
charles-cowart Oct 4, 2024
baf35ea
flake8
charles-cowart Oct 6, 2024
56fc5be
New sample files added
charles-cowart Oct 7, 2024
3372651
Added optional parameter to Pipeline() class.
charles-cowart Oct 9, 2024
d883b7b
bugfix
charles-cowart Oct 9, 2024
a075cd9
Fixes error
charles-cowart Oct 14, 2024
80d672c
Merge branch 'biocore:master' into add_tellread
charles-cowart Oct 17, 2024
62734d8
Rewrote test
charles-cowart Oct 17, 2024
e7f2d08
Merge branch 'biocore:master' into add_tellread
charles-cowart Nov 2, 2024
45131f1
Updated branch to use new DFSheet() functionality
charles-cowart Nov 2, 2024
4665ee8
Updated to recent changes in metapool
charles-cowart Nov 4, 2024
3542df3
Update from testing
charles-cowart Nov 7, 2024
c2c3b06
Updates to TRIntegrateJob based on testing
charles-cowart Nov 14, 2024
49f1673
Updated sample config file
charles-cowart Nov 14, 2024
efc0849
Replaced legacy exit check for tellread
charles-cowart Nov 14, 2024
4727865
recent updates
charles-cowart Nov 16, 2024
ba1399f
Updated tests
charles-cowart Nov 19, 2024
fd1809b
Update setup.py to point to merged metapool updates
charles-cowart Nov 19, 2024
96f3cff
New tests for slurm polling
charles-cowart Nov 21, 2024
84edad5
Updates
charles-cowart Nov 21, 2024
8691147
Updates
charles-cowart Nov 21, 2024
487fc0c
flake8
charles-cowart Nov 21, 2024
5b56fd6
Merge branch 'master' into add_tellread
charles-cowart Nov 21, 2024
4276fc7
flake8 post merger
charles-cowart Nov 21, 2024
77c10b9
Fixed older test
charles-cowart Nov 21, 2024
a67a8a8
Minor update
charles-cowart Nov 24, 2024
eb36001
Remove lengthy comment
charles-cowart Nov 24, 2024
d69a0c3
fix test
charles-cowart Nov 24, 2024
1649d64
Updates based on feedback
charles-cowart Nov 25, 2024
6569039
Update based on feedback
charles-cowart Nov 25, 2024
81922b5
Added renamed file
charles-cowart Nov 28, 2024
01d77d6
Refactored sequence counting job
charles-cowart Dec 2, 2024
b718e8b
Update test based on randomness in output generation
charles-cowart Dec 2, 2024
a0ffb81
Updates based on feedback
charles-cowart Dec 2, 2024
0b7ce90
Common parse_log() method made default
charles-cowart Dec 2, 2024
README.md: 8 changes (7 additions, 1 deletion)
@@ -14,7 +14,7 @@ git clone https://github.com/biocore/mg-scripts.git
Create a Python3 Conda environment in which to run the notebook:

```bash
conda create -n sp_pipeline 'python==3.9' numpy pandas click scipy matplotlib fastq-pair
conda create --yes -n spp python=${{ matrix.python-version }} scikit-learn pandas numpy nose pep8 flake8 matplotlib jupyter notebook 'seaborn>=0.7.1' pip openpyxl 'seqtk>=1.4' click scipy fastq-pair
```

Activate the Conda environment:
@@ -62,3 +62,9 @@ Please note that the setting 'minimap2_databases' is expected to be a list of pa
For NuQCJob, minimap2_databases is expected to be the path to a directory containing two subdirectories: 'metagenomic'
and 'metatranscriptomic'. Each directory should contain or symlink to the appropriate .mmi files needed for that Assay
type.

Additional TellSeq-related notes:
The 'spades-cloudspades-0.1' and 'tellread-release-novaseqX' directories (or similar) must be placed in a location accessible to SPP, and their paths must be declared in the configuration files (see the example configuration files for details).
The additional scripts found in sequence_processing_pipeline/contrib were contributed by Daniel and Omar; they can be located and configured in the same way.
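
As a rough illustration of wiring these paths in (the key names and the `configuration.json` filename below are assumptions made for the sketch; the real names live in the bundled example configuration files), SPP could be pointed at the TellSeq tooling like this:

```python
# Hypothetical sketch only: the key names and file name are illustrative,
# not the actual SPP configuration schema.
from json import load
from os.path import isdir

with open('configuration.json') as f:
    config = load(f)

# the cloudspades and tellread installs referenced above would be recorded
# as absolute paths somewhere in the configuration, e.g.:
for key in ('tellread_path', 'spades_cloudspades_path'):
    path = config.get(key)
    if path is None or not isdir(path):
        raise ValueError(f"'{key}' is missing or is not a directory")
```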

sequence_processing_pipeline/Commands.py: 5 changes (3 additions, 2 deletions)
@@ -22,7 +22,8 @@ def split_similar_size_bins(data_location_path, max_file_list_size_in_gb,
# is now the following:
# add one more level to account for project_names nested under ConvertJob
# dir.
fastq_paths = glob.glob(data_location_path + '*/*/*.fastq.gz')
# this will ignore the _I1_ reads that appear in the integrated result.
fastq_paths = glob.glob(data_location_path + '/*/*_R?_001.fastq.gz')

# convert from GB and halve as we sum R1
max_size = (int(max_file_list_size_in_gb) * (2 ** 30) / 2)
@@ -86,7 +87,7 @@ def demux(id_map, fp, out_d, task, maxtask):
"""Split infile data based in provided map"""
delimiter = '::MUX::'
mode = 'wt'
ext = '.fastq.gz'
ext = '_001.fastq.gz'
sep = '/'
rec = '@'

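As a quick illustration of the tightened glob above (filenames invented for the sketch), the `*_R?_001.fastq.gz` pattern keeps R1/R2 reads and skips the `_I1_` index reads that appear in the integrated result:

```python
# Sketch of the filename pattern used by split_similar_size_bins();
# the filenames are made up for illustration.
from fnmatch import fnmatch

names = ['sampleA_S1_L001_R1_001.fastq.gz',
         'sampleA_S1_L001_R2_001.fastq.gz',
         'sampleA_S1_L001_I1_001.fastq.gz']

matches = [n for n in names if fnmatch(n, '*_R?_001.fastq.gz')]
# matches == ['sampleA_S1_L001_R1_001.fastq.gz',
#             'sampleA_S1_L001_R2_001.fastq.gz']
```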
sequence_processing_pipeline/ConvertJob.py: 8 changes (7 additions, 1 deletion)
@@ -156,7 +156,13 @@ def run(self, callback=None):
exec_from=self.log_path,
callback=callback)

self.copy_controls_between_projects()
# ConvertJob() is used to process Amplicon as well as Meta*Omic
# runs. Amplicon runs use a dummy sample-sheet generated by
# Pipeline(). For these types of sheets we can't copy controls
# between projects because demuxing is not performed here.
_, sheet_name = split(self.sample_sheet_path)
if sheet_name != 'dummy_sample_sheet.csv':
self.copy_controls_between_projects()

except JobFailedError as e:
# When a job has failed, parse the logs generated by this specific
sequence_processing_pipeline/Job.py: 174 changes (114 additions, 60 deletions)
@@ -1,3 +1,6 @@
from jinja2 import BaseLoader, TemplateNotFound
from os.path import getmtime
import pathlib
from itertools import zip_longest
from os import makedirs, walk
from os.path import basename, exists, split, join
@@ -9,6 +12,25 @@
import logging
from inspect import stack
import re
from collections import Counter


# taken from https://jinja.palletsprojects.com/en/3.0.x/api/#jinja2.BaseLoader
class KISSLoader(BaseLoader):
def __init__(self, path):
# pin the path for loader to the location sequence_processing_pipeline
# (the location of this file), along w/the relative path to the
# templates directory.
self.path = join(pathlib.Path(__file__).parent.resolve(), path)

def get_source(self, environment, template):
path = join(self.path, template)
if not exists(path):
raise TemplateNotFound(template)
mtime = getmtime(path)
with open(path) as f:
source = f.read()
return source, path, lambda: mtime == getmtime(path)


class Job:
@@ -32,6 +54,7 @@ class Job:
slurm_status_running)

polling_interval_in_seconds = 60
squeue_retry_in_seconds = 10

def __init__(self, root_dir, output_path, job_name, executable_paths,
max_array_length, modules_to_load=None):
@@ -212,6 +235,41 @@ def _system_call(self, cmd, allow_return_codes=[], callback=None):

return {'stdout': stdout, 'stderr': stderr, 'return_code': return_code}

def _query_slurm(self, job_ids):
# query_slurm encapsulates the handling of squeue.
count = 0
while True:
result = self._system_call("squeue -t all -j "
f"{','.join(job_ids)} "
"-o '%i,%T'")

if result['return_code'] == 0:
# there was no issue w/squeue, break this loop and
# continue.
break
else:
# there was likely an intermittent issue w/squeue. Pause
# and wait before trying a few more times. If the problem
# persists then report the error and exit.
count += 1

if count > 3:
raise ExecFailedError(result['stderr'])

sleep(Job.squeue_retry_in_seconds)

lines = result['stdout'].split('\n')
lines.pop(0) # remove header
lines = [x.split(',') for x in lines if x != '']

jobs = {}
for job_id, state in lines:
# ensure unique_id is of type string for downstream use.
job_id = str(job_id)
jobs[job_id] = state

return jobs

def wait_on_job_ids(self, job_ids, callback=None):
'''
Wait for the given job-ids to finish running before returning.
@@ -229,63 +287,27 @@ def wait_on_job_ids(self, job_ids, callback=None):
# ensure all ids are strings to ensure proper working w/join().
job_ids = [str(x) for x in job_ids]

def query_slurm(job_ids):
# internal function query_slurm encapsulates the handling of
# squeue.
count = 0
while True:
result = self._system_call("squeue -t all -j "
f"{','.join(job_ids)} "
"-o '%F,%A,%T'")

if result['return_code'] == 0:
# there was no issue w/squeue, break this loop and
# continue.
break
else:
# there was a likely intermittent issue w/squeue. Pause
# and wait before trying a few more times. If the problem
# persists then report the error and exit.
count += 1

if count > 3:
raise ExecFailedError(result['stderr'])

sleep(60)

lines = result['stdout'].split('\n')
lines.pop(0) # remove header
lines = [x.split(',') for x in lines if x != '']

jobs = {}
child_jobs = {}
for job_id, unique_id, state in lines:
jobs[unique_id] = state

if unique_id != job_id:
child_jobs[unique_id] = job_id # job is a child job

return jobs, child_jobs

while True:
jobs, child_jobs = query_slurm(job_ids)

for jid in job_ids:
logging.debug("JOB %s: %s" % (jid, jobs[jid]))
if callback is not None:
callback(jid=jid, status=jobs[jid])

children = [x for x in child_jobs if child_jobs[x] == jid]
if len(children) == 0:
logging.debug("\tNO CHILDREN")
for cid in children:
logging.debug("\tCHILD JOB %s: %s" % (cid, jobs[cid]))
status = [jobs[x] in Job.slurm_status_not_running for x in job_ids]

if set(status) == {True}:
# all jobs either completed successfully or terminated.
# Because query_slurm only returns state on the job-ids we specify,
# the wait process is a simple check to see whether any of the
# states are 'running' states or not.
jobs = self._query_slurm(job_ids)

# jobs will be a dict of job-ids or array-ids for jobs that
# are array-jobs. the value of jobs[id] will be a state e.g.:
# 'RUNNING', 'FAILED', 'COMPLETED'.
states = [jobs[x] in Job.slurm_status_not_running for x in jobs]

if set(states) == {True}:
# if all the states are either FAILED or COMPLETED
# then the set of those states no matter how many
# array-jobs there were will ultimately be the set of
# {True}. If not then that means there are still jobs
# that are running.
break

logging.debug(f"sleeping {Job.polling_interval_in_seconds} "
"seconds...")
sleep(Job.polling_interval_in_seconds)

return jobs
@@ -345,16 +367,48 @@ def submit_job(self, script_path, job_parameters=None,
# to the user.
results = self.wait_on_job_ids([job_id], callback=callback)

job_result = {'job_id': job_id, 'job_state': results[job_id]}
if job_id in results:
# job is a non-array job
job_result = {'job_id': job_id, 'job_state': results[job_id]}
else:
# job is an array job
# assume all array jobs in this case will be associated w/job_id.
counts = Counter()
for array_id in results:
counts[results[array_id]] += 1

# for array jobs we won't be returning a string representing the
# state of a single job. Instead we're returning a dictionary of
# the number of unique states the set of array-jobs ended up in and
# the number for each one.
job_result = {'job_id': job_id, 'job_state': dict(counts)}

if callback is not None:
callback(jid=job_id, status=job_result['job_state'])
if isinstance(job_result['job_state'], dict):
# this is an array job
states = []
for key in counts:
states.append(f"{key}: {counts[key]}")

if job_result['job_state'] == 'COMPLETED':
return job_result
callback(jid=job_id, status=", ".join(states))

else:
# this is a standard job
callback(jid=job_id, status=job_result['job_state'])

if isinstance(job_result['job_state'], dict):
states = list(job_result['job_state'].keys())
if states == ['COMPLETED']:
return job_result
else:
raise JobFailedError(f"job {job_id} exited with jobs in the "
f"following states: {', '.join(states)}")
else:
raise JobFailedError(f"job {job_id} exited with status "
f"{job_result['job_state']}")
if job_result['job_state'] == 'COMPLETED':
return job_result
else:
raise JobFailedError(f"job {job_id} exited with status "
f"{job_result['job_state']}")

def _group_commands(self, cmds):
# break list of commands into chunks of max_array_length (Typically
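To make the new return shapes concrete, here is a small sketch (job-ids and states are invented) of what the polling code above produces for a plain job versus an array job: `_query_slurm()` returns a per-id state map, and `submit_job()` either reports that single state or tallies the per-task states with a `Counter`:

```python
# Illustrative only: ids and states are invented to mirror the logic above.
from collections import Counter

# plain job: the submitted job-id appears directly in the results
results = {'1234567': 'COMPLETED'}
job_result = {'job_id': '1234567', 'job_state': results['1234567']}
# -> {'job_id': '1234567', 'job_state': 'COMPLETED'}

# array job: squeue reports one row per array task ('<job-id>_<task-id>'),
# so the parent id is absent and the task states are tallied instead.
results = {'7654321_1': 'COMPLETED',
           '7654321_2': 'COMPLETED',
           '7654321_3': 'FAILED'}
counts = Counter(results.values())
job_result = {'job_id': '7654321', 'job_state': dict(counts)}
# -> {'job_id': '7654321', 'job_state': {'COMPLETED': 2, 'FAILED': 1}}
```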