diff --git a/short-read-mngs/Dockerfile b/short-read-mngs/Dockerfile index 7ba339c5..f84798c8 100644 --- a/short-read-mngs/Dockerfile +++ b/short-read-mngs/Dockerfile @@ -141,5 +141,24 @@ WORKDIR / COPY idseq-dag /tmp/idseq-dag RUN pip3 install /tmp/idseq-dag && rm -rf /tmp/idseq-dag +RUN apt-get -y update && apt-get install -y build-essential libz-dev git python3-pip cmake + +WORKDIR /tmp +RUN git clone --recursive https://github.com/mlin/minimap2-scatter.git +WORKDIR /tmp/minimap2-scatter +RUN make minimap2 +RUN mv /tmp/minimap2-scatter/minimap2/minimap2 /usr/local/bin/minimap2 + +WORKDIR /tmp +RUN git clone https://github.com/morsecodist/diamond +WORKDIR /tmp/diamond +RUN git checkout minimal-mods +WORKDIR /tmp/diamond/build +RUN cmake -DCMAKE_BUILD_TYPE=Release .. +RUN make -j6 +RUN mv diamond /usr/local/bin + +RUN curl -Ls https://github.com/chanzuckerberg/s3parcp/releases/download/v0.2.0-alpha/s3parcp_0.2.0-alpha_Linux_x86_64.tar.gz | tar -C /usr/bin -xz s3parcp + COPY idseq_utils /tmp/idseq_utils RUN pip3 install /tmp/idseq_utils && rm -rf /tmp/idseq_utils \ No newline at end of file diff --git a/short-read-mngs/idseq_utils/idseq_utils/__init__.py b/short-read-mngs/idseq_utils/idseq_utils/__init__.py index 040faf6c..618c0680 100644 --- a/short-read-mngs/idseq_utils/idseq_utils/__init__.py +++ b/short-read-mngs/idseq_utils/idseq_utils/__init__.py @@ -1,2 +1,2 @@ -''' idseq_utils ''' +""" idseq_utils """ __version__ = "EXTERNALLY_MANAGED" diff --git a/short-read-mngs/idseq_utils/idseq_utils/batch_run_helpers.py b/short-read-mngs/idseq_utils/idseq_utils/batch_run_helpers.py new file mode 100644 index 00000000..ebee2d0f --- /dev/null +++ b/short-read-mngs/idseq_utils/idseq_utils/batch_run_helpers.py @@ -0,0 +1,129 @@ +import time +import random +import boto3 +import json +import requests +import logging +from botocore.exceptions import ClientError + +log = logging.getLogger(__name__) + +MAX_CHUNKS_IN_FLIGHT = 10 + + +def get_batch_job_desc_bucket(): + try: + account_id = boto3.client("sts").get_caller_identity()["Account"] + except ClientError: + account_id = requests.get( + "http://169.254.169.254/latest/dynamic/instance-identity/document" + ).json()["accountId"] + return f"aegea-batch-jobs-{account_id}" + + +class BatchJobFailed(Exception): + pass + + +def _log_alignment_batch_job_status( + job_id, job_queue, job_definition, chunk_id, status, alignment_algorithm +): + log.info( + "alignment_batch_job_status", + extra={ + "job_id": job_id, + "chunk_id": chunk_id, + "job_queue": job_queue, + "job_definition": job_definition, + "status": status, + "alignment_algorithm": alignment_algorithm, + }, + ) + + +def _get_job_status(job_id): + batch_job_desc_bucket = boto3.resource("s3").Bucket(get_batch_job_desc_bucket()) + key = f"job_descriptions/{job_id}" + try: + job_desc_object = batch_job_desc_bucket.Object(key) + return json.loads(job_desc_object.get()["Body"].read())["status"] + except ClientError as e: + if e.response["Error"]["Code"] == "NoSuchKey": + # Warn that the object is missing so any issue with the s3 mechanism can be identified + log.debug("missing_job_description_ojbect", extra={key: key}) + # Return submitted because a missing job status probably means it hasn't been added yet + return "SUBMITTED" + else: + raise e + + +def _run_batch_job(job_name, job_queue, job_definition, environment, chunk_id, alignment_algorithm, retries): + client = boto3.client("batch") + response = client.submit_job( + jobName=job_name, + jobQueue=job_queue, + jobDefinition=job_definition, + 
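+        # "environment" is the list of {"name": ..., "value": ...} pairs assembled by the callers
+        # in run_diamond.py / run_minimap2.py (DB_CHUNK, OUTPUT_DIR, and one QUERY_<i> per input fasta).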
containerOverrides={ + "environment": environment, + }, + retryStrategy={"attempts": retries}, + ) + job_id = response["jobId"] + _log_alignment_batch_job_status( + job_id, job_queue, job_definition, chunk_id, "SUBMITTED", alignment_algorithm + ) + + delay = 60 + random.randint( + -60 // 2, 60 // 2 + ) # Add some noise to de-synchronize chunks + status = "SUBMITTED" + # the job this is monitoring has an timeout and the job this runs in has a timeout + while True: + try: + status = _get_job_status(job_id) + except ClientError as e: + # If we get throttled, randomly wait to de-synchronize the requests + if e.response["Error"]["Code"] == "TooManyRequestsException": + log.warn("describe_jobs_rate_limit_error", extra={"job_id": job_id}) + # Possibly implement a backoff here if throttling becomes an issue + else: + log.error( + "unexpected_client_error_while_polling_job_status", + extra={"job_id": job_id}, + ) + raise e + + if status == "SUCCEEDED": + _log_alignment_batch_job_status( + job_id, job_queue, job_definition, chunk_id, status, alignment_algorithm + ) + return job_id + if status == "FAILED": + log.error( + "alignment_batch_job_failed", + extra={ + "job_id": job_id, + "chunk_id": chunk_id, + "alignment_algorithm": alignment_algorithm, + }, + ) + _log_alignment_batch_job_status( + job_id, job_queue, job_definition, chunk_id, status, alignment_algorithm + ) + raise BatchJobFailed("chunk alignment failed") + time.sleep(delay) + + +def _db_chunks(bucket: str, prefix): + s3_client = boto3.client("s3") + paginator = s3_client.get_paginator("list_objects_v2") + log.debug("db chunks") + + result = paginator.paginate( + Bucket=bucket, + Prefix=prefix, + ) + + for page in result: + for obj in page["Contents"]: + yield obj["Key"] diff --git a/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py b/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py new file mode 100644 index 00000000..55677869 --- /dev/null +++ b/short-read-mngs/idseq_utils/idseq_utils/diamond_scatter.py @@ -0,0 +1,247 @@ +import os +import shutil +import sys +import errno + +from argparse import ArgumentParser +from glob import glob +from multiprocessing import Pool +from os.path import abspath, basename, join +from subprocess import run, PIPE +from tempfile import NamedTemporaryFile, TemporaryDirectory +from typing import Iterable + +from Bio import SeqIO +from Bio.Seq import Seq +from Bio.SeqRecord import SeqRecord + + +class DiamondBlastXException(Exception): + pass + + +class DiamondJoinException(Exception): + pass + +################################################################################################################ +# +# Diamond +# +################################################################################################################ + + +def diamond_makedb(cwd: str, reference_fasta: str, database: str): + cmd = [ + "diamond", + "makedb", + "--in", + reference_fasta, + "--db", + database, + ] + run(cmd, check=True, cwd=cwd, stdout=PIPE, stderr=PIPE) + + +def diamond_blastx( + cwd: str, + par_tmpdir: str, + block_size: float, + database: str, + out: str, + queries: Iterable[str], + chunk=False, + join_chunks=0, +): + cmd = [ + "diamond", + "blastx", + "--multiprocessing", + "--parallel-tmpdir", + par_tmpdir, + "--block-size", + str(block_size), + "--db", + database, + "--out", + out, + ] + for query in queries: + cmd += ["--query", query] + if chunk: + cmd += ["--single-chunk"] + if join_chunks > 0: + cmd += ["--join-chunks", str(join_chunks)] + res = run(cmd, cwd=cwd, stdout=PIPE, 
stderr=PIPE) + if res.returncode != 0: + for line in res.stderr.decode().split("\n"): + print(line) + raise DiamondBlastXException(f"Command {' '.join(cmd)} failed with error: {res.stderr.decode()}") + + +################################################################################################################ +# +# Main +# +################################################################################################################ + + +def _consume_iter(iterable: Iterable, n: int): + for i, e in enumerate(iterable): + yield e + if i == n - 1: + break + + +def align_chunk(ref_chunk: int, start: int, size: int, query_chunk: int): + return f"{ref_chunk} {start} {size} # query_chunk={query_chunk}\n" + + +def zero_pad(n: int, m: int): + tagged = str(n) + return ("0" * (m - len(tagged))) + tagged + + +def make_par_dir(cwd: str, par_tmpdir: str): + os.mkdir(join(cwd, par_tmpdir)) + p_dir = join(cwd, par_tmpdir, "parallelizer") + os.mkdir(p_dir) + with open(join(p_dir, "command"), "w"): + pass + with open(join(p_dir, "register"), "w"): + pass + with open(join(p_dir, "workers"), "w"): + pass + + +def make_db(reference_fasta: str, output_dir: str, chunks: int): + os.mkdir(output_dir) + chunk_size = (sum(1 for _ in SeqIO.parse(reference_fasta, "fasta")) // chunks) + 1 + seqs = SeqIO.parse(reference_fasta, "fasta") + for i in range(chunks): + print(f"STARTING CHUNK {i}") + fasta_name = f"{i}.fasta" + SeqIO.write(_consume_iter(seqs, chunk_size), fasta_name, "fasta") + n_seqs = n_letters = 0 + for seq in SeqIO.parse(fasta_name, "fasta"): + n_seqs += 1 + n_letters += len(seq.seq) + db_name = f"{i}-{n_seqs}-{n_letters}" + print(f"INDEXING CHUNK {i}") + diamond_makedb(".", fasta_name, join(output_dir, db_name)) + os.remove(fasta_name) + print(f"COMPLETED CHUNK {i}") + + +def blastx_chunk(db_chunk: str, output_dir: str, *query: str): + try: + os.mkdir(output_dir) + except OSError as e: + if e.errno != errno.EEXIST: + raise + chunk, n_seqs, n_letters = basename(db_chunk)[:-5].split("-") + with TemporaryDirectory() as tmp_dir: + make_par_dir(tmp_dir, "par-tmp") + with open(join(tmp_dir, "par-tmp", f"align_todo_{zero_pad(0, 6)}"), "w") as f: + f.writelines([align_chunk(int(chunk), 0, int(n_seqs), 0)]) + diamond_blastx( + cwd=tmp_dir, + par_tmpdir="par-tmp", + block_size=int(n_letters) / 1e9, + database=abspath(db_chunk), + out="out.tsv", + chunk=True, + queries=(abspath(q) for q in query), + ) + ref_block_name = f"ref_block_{zero_pad(0, 6)}_{zero_pad(int(chunk), 6)}" + ref_dict_name = f"ref_dict_{zero_pad(0, 6)}_{zero_pad(int(chunk), 6)}" + shutil.copy( + join(tmp_dir, "par-tmp", ref_block_name), join(output_dir, ref_block_name) + ) + shutil.copy( + join(tmp_dir, "par-tmp", ref_dict_name), join(output_dir, ref_dict_name) + ) + + +def mock_reference_fasta(chunks: int, chunk_size: int): + letters = chunk = i = 0 + while chunk < chunks: + n = 100 + letters += n + if letters > (1 + chunk) * chunk_size: + chunk += 1 + yield SeqRecord(Seq("M" * n), "A") + i += 1 + + +def blastx_join(chunk_dir: str, out: str, *query: str): + with TemporaryDirectory() as tmp_dir: + make_par_dir(tmp_dir, "par-tmp") + with open(join(tmp_dir, "par-tmp", f"join_todo_{zero_pad(0, 6)}"), "w") as f: + f.write("TOKEN\n") + + for f in os.listdir(chunk_dir): + shutil.copy(join(chunk_dir, f), join(tmp_dir, "par-tmp", f)) + + chunks = len(os.listdir(chunk_dir)) // 2 + with NamedTemporaryFile() as ref_fasta, NamedTemporaryFile() as db: + # make fake db to appease diamond + SeqIO.write(SeqRecord(Seq("M"), "ID"), ref_fasta.name, "fasta") 
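+            # The single-record database built here exists only because diamond requires a --db
+            # argument; the join step works from the ref_block_*/ref_dict_* chunk files copied into par-tmp.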
+ diamond_makedb(tmp_dir, ref_fasta.name, db.name) + diamond_blastx( + cwd=tmp_dir, + par_tmpdir="par-tmp", + block_size=1, + database=db.name, + out=out, + join_chunks=chunks, + queries=(abspath(q) for q in query), + ) + + with open(out, "w") as out_f: + for out_chunk in glob(join(tmp_dir, f"{out}_*")): + with open(out_chunk) as out_chunk_f: + out_f.writelines(out_chunk_f) + os.remove(out_chunk) + + +if __name__ == "__main__": + parser = ArgumentParser() + subparsers = parser.add_subparsers(title="commands", dest="command") + + make_db_parser = subparsers.add_parser("make-db") + make_db_parser.add_argument("--db", required=True) + make_db_parser.add_argument("--in", required=True) + make_db_parser.add_argument("--chunks", type=int, required=True) + + blastx_chunk_parser = subparsers.add_parser("blastx-chunk") + blastx_chunk_parser.add_argument("--db", required=True) + blastx_chunk_parser.add_argument("--out-dir", required=True) + blastx_chunk_parser.add_argument("--query", required=True, action="append") + + blastx_chunks_parser = subparsers.add_parser("blastx-chunks") + blastx_chunks_parser.add_argument("--db-dir", required=True) + blastx_chunks_parser.add_argument("--out-dir", required=True) + blastx_chunks_parser.add_argument("--query", required=True, action="append") + + blastx_join_parser = subparsers.add_parser("blastx-join") + blastx_join_parser.add_argument("--chunk-dir", required=True) + blastx_join_parser.add_argument("--out", required=True) + blastx_join_parser.add_argument("--query", required=True, action="append") + + args = parser.parse_args(sys.argv[1:]) + if args.command == "make-db": + make_db(args.__getattribute__("in"), args.db, args.chunks) + elif args.command == "blastx-chunk": + blastx_chunk(args.db, args.out_dir, *args.query) + elif args.command == "blastx-chunks": + + def _blastx_chunk(db): + print(f"STARTING: {db}") + res = blastx_chunk(join(args.db_dir, db), args.out_dir, *args.query) + print(f"FINISHING: {db}") + return res + + with Pool(48) as p: + p.map(_blastx_chunk, os.listdir(args.db_dir)) + elif args.command == "blastx-join": + blastx_join(args.chunk_dir, args.out, *args.query) diff --git a/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py b/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py new file mode 100644 index 00000000..46ec694f --- /dev/null +++ b/short-read-mngs/idseq_utils/idseq_utils/minimap2_scatter.py @@ -0,0 +1,159 @@ +import os +import shutil +import sys +import errno +from argparse import ArgumentParser +from os.path import abspath, basename, join +from subprocess import run, PIPE +from tempfile import TemporaryDirectory + + +class Minimap2MergeException(Exception): + pass +################################################################################################################ +# +# Minimap2 +# +################################################################################################################ + + +# TODO: add minimap2 make database here + + +def minimap2_alignment(cwd, par_tmpdir, cpus, database, out, queries): + cmd = [ + "/usr/local/bin/minimap2", + "-cx", + "sr", + f"-t {cpus}", + "--split-map", + out, + f"{database}", + ] + for q in queries: + print(q) + cmd += [q] + res = run(cmd, cwd=cwd, stdout=PIPE, stderr=PIPE) + if res.returncode != 0: + for line in res.stderr.decode().split("\n"): + print(line) + raise Exception(f"Command failed: {' '.join(cmd)}") + + +def minimap2_merge_cmd(cwd, par_tmpdir, chunks, queries): + cmd = ["minimap2", "-cx", "sr", "--split-merge", "-o", 
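+        # --split-merge (provided by the minimap2-scatter fork built in the Dockerfile) joins the
+        # per-chunk intermediates written by --split-map; the next element is the merged PAF path.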
f"{par_tmpdir}/out.paf"] + for query in queries: + cmd += [query] + + cmd += [ + ".", + ] + for chunk in chunks: + cmd += [f"{chunk}"] + + res = run(cmd, cwd=cwd, stdout=PIPE, stderr=PIPE) + if res.returncode != 0: + for line in res.stderr.decode().split("\n"): + print(line) + raise Minimap2MergeException(f"Command {' '.join(cmd)} failed with result: {res.stderr.decode()}") + + +################################################################################################################ +# +# Main +# +################################################################################################################ + + +def zero_pad(n: int, i: int): + tagged = str(n) + return ("0" * (i - len(tagged))) + tagged + + +def make_par_dir(cwd: str, par_tmpdir: str): + os.mkdir(join(cwd, par_tmpdir)) + p_dir = join(cwd, par_tmpdir, "parallelizer") + os.mkdir(p_dir) + with open(join(p_dir, "command"), "w"): + pass + with open(join(p_dir, "register"), "w"): + pass + with open(join(p_dir, "workers"), "w"): + pass + + +def minimap2_chunk(db_chunk: str, output_dir: str, *query: str): + """ + Run a single chunk of the database using minimap2-scatter + This is no longer used, we should consider removing + """ + + # make output directory + try: + os.mkdir(output_dir) + except OSError as e: + if e.errno != errno.EEXIST: + raise + + # get chunk + + # for diamond: chunk, n_seqs, n_letters = basename(db_chunk)[:-5].split("-") + chunk = basename(db_chunk).split("_")[-1] # example: nt.part_001 + + # Don't really understand this temp dir thing? + with TemporaryDirectory() as tmp_dir: + make_par_dir(tmp_dir, "par-tmp") + with open(join(tmp_dir, "par-tmp", f"align_todo_{zero_pad(0, 6)}"), "w") as f: + f.writelines([f"Aligning chunk f{chunk}"]) + + minimap2_alignment( + cwd=tmp_dir, + par_tmpdir="par-tmp", + cpus=7, + database=abspath(db_chunk), + out=f"intermediate{chunk}", + queries=[abspath(q) for q in query], + ) + shutil.copy( + join(tmp_dir, f"intermediate{chunk}"), + join(output_dir, f"intermediate{chunk}"), + ) + + +def minimap2_merge(chunk_dir, out, *query): + chunk_dir = abspath(chunk_dir) + with TemporaryDirectory() as tmp_dir: + make_par_dir(tmp_dir, "par-tmp") + with open(join(tmp_dir, "par-tmp", f"join_todo_{zero_pad(0, 6)}"), "w") as f: + f.write("TOKEN\n") + chunks = [] + query_tmp = [] + for q in query: + shutil.copy(q, join(tmp_dir, basename(q))) + query_tmp.append(join(tmp_dir, basename(q))) + for f in os.listdir(chunk_dir): + shutil.copy(join(chunk_dir, f), join(tmp_dir, "par-tmp", f)) + chunks.append(join(tmp_dir, "par-tmp", f)) + minimap2_merge_cmd(tmp_dir, "par-tmp", chunks, query_tmp) + shutil.copy(join(tmp_dir, "par-tmp", "out.paf"), out) + + +if __name__ == "__main__": + parser = ArgumentParser() + subparsers = parser.add_subparsers(title="commands", dest="command") + + minimap2_chunk_parser = subparsers.add_parser("minimap2-chunk") + minimap2_chunk_parser.add_argument("--db", required=True) + minimap2_chunk_parser.add_argument("--out-dir", required=True) + minimap2_chunk_parser.add_argument("--query", required=True, action="append") + + minimap2_join_parser = subparsers.add_parser("minimap2-merge") + minimap2_join_parser.add_argument("--chunk-dir", required=True) + minimap2_join_parser.add_argument("--out", required=True) + minimap2_join_parser.add_argument("--query", required=True, action="append") + + args = parser.parse_args(sys.argv[1:]) + if args.command == "minimap2-chunk": + minimap2_chunk(args.db, args.out_dir, *args.query) + elif args.command == "minimap2-merge": + 
minimap2_merge(args.chunk_dir, args.out, *args.query) diff --git a/short-read-mngs/idseq_utils/idseq_utils/paf2blast6.py b/short-read-mngs/idseq_utils/idseq_utils/paf2blast6.py new file mode 100644 index 00000000..bae18f34 --- /dev/null +++ b/short-read-mngs/idseq_utils/idseq_utils/paf2blast6.py @@ -0,0 +1,105 @@ +import pandas as pd +import subprocess +import os.path +import shlex +import sys +import math +import re + +nonmatch_pattern = re.compile(r"NM:i:(\d+);") +cigar_pattern = re.compile(r"cg:Z:([A-Za-z0-9]+)") + + +class QualityCalculations: + def __init__(self, genome_size=832400799511): + self._k = 0.1 + self._lambda = 1.58 + self.genome_size = genome_size + + def calc_bitscore(self, alen, nonmatch): + score = alen - 2 * nonmatch + return (score * self._lambda - math.log(self._k)) / math.log(2.0) + + def calc_evalue(self, alen, nonmatch): + score = alen - 2 * nonmatch + return self._k * alen * self.genome_size * math.exp(-self._lambda * score) + + def calc_gap_openings(self, cigar): + go = 0 + for char in cigar: + if char == "I" or char == "D": + go += 1 + return go + + +def standardize_paf(paf_file): + """paf files can have variable number of optional key-value pairs""" + out = subprocess.run(shlex.split("sed 's/\t/;/13g' -i {}".format(paf_file))) + + assert out.returncode == 0 + + +def main(paf_file): + name = os.path.basename(paf_file.replace(".paf", "")) + standardize_paf(paf_file) + df = pd.read_csv( + paf_file, + delimiter="\t", + header=None, + names=[ + "qname", + "qlen", + "qstart", + "qend", + "strand", + "tname", + "tlen", + "tstart", + "tend", + "nmatch", + "alen", + "mapq", + "other", + ], + ) + df["nonmatch"] = df.other.map( + lambda x: int(re.search(nonmatch_pattern, x).group(1)) + ) + qc = QualityCalculations() + df["gap_openings"] = df.other.map( + lambda x: qc.calc_gap_openings(re.search(cigar_pattern, x).group(1)) + ) + df["bitscore"] = [ + qc.calc_bitscore(a, n) for a, n in zip(df["alen"], df["nonmatch"]) + ] + df["evalue"] = [qc.calc_evalue(a, n) for a, n in zip(df["alen"], df["nonmatch"])] + df["percent_ident"] = [ + (nmatch / a) * 100 for a, nmatch in zip(df["alen"], df["nmatch"]) + ] + df = df.round({"bitscore": 3, "percent_ident": 3}) + blast = df.loc[ + :, + [ + "qname", + "tname", + "percent_ident", + "alen", + "nonmatch", + "gap_openings", + "qstart", + "qend", + "tstart", + "tend", + "evalue", + "bitscore", + ], + ] + blast["qstart"] = blast["qstart"] + 1 + blast["tstart"] = blast["tstart"] + 1 + + blast.to_csv(f"{name}_frompaf.m8", sep="\t", index=None, header=False) + + +if __name__ == "__main__": + paf_file = sys.argv[1] + main(paf_file) diff --git a/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py new file mode 100644 index 00000000..0d844a6c --- /dev/null +++ b/short-read-mngs/idseq_utils/idseq_utils/run_diamond.py @@ -0,0 +1,79 @@ +import os +import re +from os.path import basename, join +from subprocess import run +import logging +from urllib.parse import urlparse +from multiprocessing import Pool + +from idseq_utils.diamond_scatter import blastx_join +from idseq_utils.batch_run_helpers import _run_batch_job, _db_chunks + +log = logging.getLogger(__name__) + +MAX_CHUNKS_IN_FLIGHT = 10 + + +def _run_chunk( + chunk_id: int, input_dir: str, chunk_dir: str, db_chunk: str, *queries: str +): + deployment_environment = os.environ["DEPLOYMENT_ENVIRONMENT"] + priority_name = os.environ.get("PRIORITY_NAME", "normal") + alignment_algorithm = "diamond" + provisioning_model = "EC2" + pattern = 
r"s3://.+/samples/([0-9]+)/([0-9]+)/" + m = re.match(pattern, input_dir) + if m: + project_id, sample_id = m.group(1), m.group(2) + else: + project_id, sample_id = "0", "0" + + job_name = (f"idseq-{deployment_environment}-{alignment_algorithm}-" + f"project-{project_id}-sample-{sample_id}-part-{chunk_id}") + job_queue = f"idseq-{deployment_environment}-{alignment_algorithm}-{provisioning_model}-{priority_name}" + job_definition = f"idseq-{deployment_environment}-{alignment_algorithm}" + + environment = [ + { + "name": "DB_CHUNK", + "value": db_chunk, + }, + { + "name": "OUTPUT_DIR", + "value": chunk_dir, + }, + ] + + for i, query in enumerate(queries): + environment.append( + { + "name": f"QUERY_{i}", + "value": join(input_dir, basename(query)), + } + ) + + _run_batch_job( + job_name=job_name, + job_queue=job_queue, + job_definition=job_definition, + environment=environment, + chunk_id=chunk_id, + alignment_algorithm=alignment_algorithm, + retries=2, + ) + + +def run_diamond( + input_dir: str, chunk_dir: str, db_path: str, result_path: str, *queries: str +): + parsed_url = urlparse(db_path, allow_fragments=False) + bucket = parsed_url.netloc + prefix = parsed_url.path.lstrip("/") + chunks = ( + [chunk_id, input_dir, chunk_dir, f"s3://{bucket}/{db_chunk}", *queries] + for chunk_id, db_chunk in enumerate(_db_chunks(bucket, prefix)) + ) + with Pool(MAX_CHUNKS_IN_FLIGHT) as p: + p.starmap(_run_chunk, chunks) + run(["s3parcp", "--recursive", chunk_dir, "chunks"], check=True) + blastx_join("chunks", result_path, *queries) diff --git a/short-read-mngs/idseq_utils/idseq_utils/run_minimap2.py b/short-read-mngs/idseq_utils/idseq_utils/run_minimap2.py new file mode 100644 index 00000000..8a5d4c6c --- /dev/null +++ b/short-read-mngs/idseq_utils/idseq_utils/run_minimap2.py @@ -0,0 +1,85 @@ +import os +import re +from os.path import basename, join +from subprocess import run +import logging +from urllib.parse import urlparse +from multiprocessing import Pool +from idseq_utils.minimap2_scatter import minimap2_merge +from idseq_utils.batch_run_helpers import _run_batch_job, _db_chunks + +log = logging.getLogger(__name__) + +ALIGNMENT_ALGORITHM = "minimap2" +MAX_CHUNKS_IN_FLIGHT = 5 + + +def _run_chunk( + chunk_id: int, input_dir: str, chunk_dir: str, extra_args: str, db_chunk: str, *queries: str +): + deployment_environment = os.environ["DEPLOYMENT_ENVIRONMENT"] + priority_name = os.environ.get("PRIORITY_NAME", "normal") + alignment_algorithm = "minimap2" + provisioning_model = "EC2" + pattern = r"s3://.+/samples/([0-9]+)/([0-9]+)/" + m = re.match(pattern, input_dir) + if m: + project_id, sample_id = m.group(1), m.group(2) + else: + project_id, sample_id = "0", "0" + + job_name = ( + f"idseq-{deployment_environment}-{alignment_algorithm}-" + f"project-{project_id}-sample-{sample_id}-part-{chunk_id}" + ) + job_queue = f"idseq-{deployment_environment}-{alignment_algorithm}-{provisioning_model}-{priority_name}" + job_definition = f"idseq-{deployment_environment}-{alignment_algorithm}" + print(job_name) + environment = [ + { + "name": "DB_CHUNK", + "value": db_chunk, + }, + { + "name": "OUTPUT_DIR", + "value": chunk_dir, + }, + { + "name": "MINIMAP2_ARGS", + "value": extra_args + } + ] + + for i, query in enumerate(queries): + environment.append( + { + "name": f"QUERY_{i}", + "value": join(input_dir, basename(query)), + } + ) + + _run_batch_job( + job_name=job_name, + job_queue=job_queue, + job_definition=job_definition, + environment=environment, + chunk_id=chunk_id, + 
alignment_algorithm=alignment_algorithm, + retries=2, + ) + + +def run_minimap2( + input_dir: str, chunk_dir: str, db_path: str, result_path: str, minimap2_args: str, *queries: str +): + parsed_url = urlparse(db_path, allow_fragments=False) + bucket = parsed_url.netloc + prefix = parsed_url.path.lstrip("/") + chunks = ( + [chunk_id, input_dir, chunk_dir, minimap2_args, f"s3://{bucket}/{db_chunk}", *queries] + for chunk_id, db_chunk in enumerate(_db_chunks(bucket, prefix)) + ) + with Pool(MAX_CHUNKS_IN_FLIGHT) as p: + p.starmap(_run_chunk, chunks) + run(["s3parcp", "--recursive", chunk_dir, "chunks"], check=True) + minimap2_merge("chunks", result_path, *queries) diff --git a/short-read-mngs/non_host_alignment.wdl b/short-read-mngs/non_host_alignment.wdl index a5eb3707..be20daad 100644 --- a/short-read-mngs/non_host_alignment.wdl +++ b/short-read-mngs/non_host_alignment.wdl @@ -148,6 +148,241 @@ task GenerateAnnotatedFasta { } } +task RunAlignment_minimap2_out { + input { + String s3_wd_uri + Array[File]+ fastas + String db_path + String minimap2_args + String docker_image_id + Boolean? run_locally = false + File? local_minimap2_index + String prefix + } + + command <<< + set -euxo pipefail + + if [[ "~{run_locally}" == true ]]; then + minimap2 ~{minimap2_args} "~{local_minimap2_index}" "~{sep=' ' fastas}" > "~{prefix}.paf" + else + export DEPLOYMENT_ENVIRONMENT=dev + python3 <>> + output { + File out_paf = "~{prefix}.paf" + File out_m8 = "~{prefix}.m8" + } + + runtime { + docker: docker_image_id + } +} +task RunAlignment_diamond_out { + input { + String docker_image_id + String s3_wd_uri + Array[File]+ fastas + String db_path + String diamond_args + Boolean? run_locally = false + File? local_diamond_index + String prefix + } + + command <<< + set -euxo pipefail + if [[ "~{run_locally}" == true ]]; then + diamond makedb --in "~{local_diamond_index}" -d reference + diamond blastx -d reference -q "~{sep=' ' fastas}" -o "~{prefix}.m8" + else + export DEPLOYMENT_ENVIRONMENT=dev + python3 <>> + + output { + File out_m8 = "~{prefix}.m8" + } + + runtime { + docker: docker_image_id + } +} + +task RunCallHitsMinimap2 { + input { + File m8_file + File lineage_db + File taxon_blacklist + File deuterostome_db + File accession2taxid + File duplicate_cluster_size + String prefix + Int min_read_length = 36 + String docker_image_id + String count_type = "NT" + } + + command <<< + set -euxo pipefail + python3 <>> + + output { + File deduped_out_m8 = "~{prefix}.deduped.m8" + File hitsummary = "~{prefix}.hitsummary.tab" + File counts_json = "~{prefix}_counts_with_dcr.json" + } + + runtime { + docker: docker_image_id + } +} +task RunCallHitsDiamond { + input { + File m8_file + File lineage_db + File taxon_blacklist + File deuterostome_db + File accession2taxid + File duplicate_cluster_size + String prefix + Int min_read_length = 0 + String docker_image_id + String count_type = "NR" + } + + command <<< + set -euxo pipefail + python3 <>> + + output { + File deduped_out_m8 = "~{prefix}.deduped.m8" + File hitsummary = "~{prefix}.hitsummary.tab" + File counts_json = "~{prefix}_counts_with_dcr.json" + } + + runtime { + docker: docker_image_id + } +} + +task RunCleanOutputs { + input { + File gsnap_m8 + File gsnap_deduped_m8 + File gsnap_hitsummary_tab + File gsnap_counts_with_dcr_json + File rapsearch2_m8 + File rapsearch2_deduped_m8 + File rapsearch2_hitsummary_tab + File rapsearch2_counts_with_dcr_json + } + command <<< + set -euxo pipefail + echo "Dummy task to clean outputs for pipeline viz" + cp "~{gsnap_m8}" 
"out_gsnap.m8" + cp "~{gsnap_deduped_m8}" "out_gsnap.deduped.m8" + cp "~{gsnap_hitsummary_tab}" "out_gsnap.hitsummary.tab" + cp "~{gsnap_counts_with_dcr_json}" "out_gsnap_counts_with_dcr.json" + cp "~{rapsearch2_m8}" "out_rapsearch2.m8" + cp "~{rapsearch2_deduped_m8}" "out_rapsearch2.deduped.m8" + cp "~{rapsearch2_hitsummary_tab}" "out_rapsearch2.hitsummary.tab" + cp "~{rapsearch2_counts_with_dcr_json}" "out_rapsearch_counts_with_dcr.json" + >>> + output { + File out_gsnap_m8 = "out_gsnap.m8" + File out_gsnap_deduped_m8 = "out_gsnap.deduped.m8" + File out_gsnap_hitsummary_tab = "out_gsnap.hitsummary.tab" + File out_gsnap_counts_with_dcr_json = "out_gsnap_counts_with_dcr.json" + File out_rapsearch2_m8 = "out_rapsearch2.m8" + File out_rapsearch2_deduped_m8 = "out_rapsearch2.deduped.m8" + File out_rapsearch2_hitsummary_tab = "out_rapsearch2.hitsummary.tab" + File out_rapsearch2_counts_with_dcr_json = "out_rapsearch_counts_with_dcr.json" + } +} + workflow idseq_non_host_alignment { input { String docker_image_id @@ -162,45 +397,106 @@ workflow idseq_non_host_alignment { File accession2taxid_db = "s3://idseq-public-references/alignment_data/2021-01-22/accession2taxid.db" File taxon_blacklist = "s3://idseq-public-references/taxonomy/2021-01-22/taxon_blacklist.txt" String index_dir_suffix = index_version + Int min_read_length = 36 File deuterostome_db = "s3://idseq-public-references/taxonomy/2021-01-22/deuterostome_taxids.txt" Boolean use_deuterostome_filter = true Boolean use_taxon_whitelist = false + Boolean alignment_scalability = false File? local_gsnap_index + File? minimap2_local_db_path + File? diamond_local_db_path String? local_gsnap_genome_name File? local_rapsearch2_index - } + String alignment_input_dir = "s3://idseq-samples-development/samples/alignment-scalability-test/combined-test/1/" + String minimap2_chunk_dir = "s3://idseq-samples-development/samples/alignment-scalability-test/combined-test/1/minimap2-chunks/" + String diamond_chunk_dir = "s3://idseq-samples-development/samples/alignment-scalability-test/combined-test/1/diamond-chunks/" + String minimap2_db = "s3://idseq-public-references/minimap2-test/2021-01-22/nt_k12_w8_20/" + String diamond_db = "s3://idseq-public-references/diamond-test/2021-01-22/" + String minimap2_args = "-cx sr --secondary=yes" + String diamond_args = "" + String minimap2_prefix = "gsnap" + String diamond_prefix = "rapsearch2" - call RunAlignment_gsnap_out { - input: - docker_image_id = docker_image_id, - s3_wd_uri = s3_wd_uri, - host_filter_out_gsnap_filter_fa = select_all([host_filter_out_gsnap_filter_1_fa, host_filter_out_gsnap_filter_2_fa, host_filter_out_gsnap_filter_merged_fa]), - duplicate_cluster_sizes_tsv = duplicate_cluster_sizes_tsv, - lineage_db = lineage_db, - accession2taxid_db = accession2taxid_db, - taxon_blacklist = taxon_blacklist, - deuterostome_db = deuterostome_db, - index_dir_suffix = index_dir_suffix, - use_deuterostome_filter = use_deuterostome_filter, - use_taxon_whitelist = use_taxon_whitelist, - run_locally = defined(local_gsnap_index), - index = local_gsnap_index, - genome_name = local_gsnap_genome_name - } - - call RunAlignment_rapsearch2_out { - input: - docker_image_id = docker_image_id, + } + if (!alignment_scalability){ + call RunAlignment_gsnap_out { + input: + docker_image_id = docker_image_id, + s3_wd_uri = s3_wd_uri, + host_filter_out_gsnap_filter_fa = select_all([host_filter_out_gsnap_filter_1_fa, host_filter_out_gsnap_filter_2_fa, host_filter_out_gsnap_filter_merged_fa]), + duplicate_cluster_sizes_tsv = 
duplicate_cluster_sizes_tsv, + lineage_db = lineage_db, + accession2taxid_db = accession2taxid_db, + taxon_blacklist = taxon_blacklist, + deuterostome_db = deuterostome_db, + index_dir_suffix = index_dir_suffix, + use_deuterostome_filter = use_deuterostome_filter, + use_taxon_whitelist = use_taxon_whitelist, + run_locally = defined(local_gsnap_index), + index = local_gsnap_index, + genome_name = local_gsnap_genome_name + } + call RunAlignment_rapsearch2_out { + input: + docker_image_id = docker_image_id, + s3_wd_uri = s3_wd_uri, + host_filter_out_gsnap_filter_fa = select_all([host_filter_out_gsnap_filter_1_fa, host_filter_out_gsnap_filter_2_fa, host_filter_out_gsnap_filter_merged_fa]), + duplicate_cluster_sizes_tsv = duplicate_cluster_sizes_tsv, + lineage_db = lineage_db, + accession2taxid_db = accession2taxid_db, + taxon_blacklist = taxon_blacklist, + index_dir_suffix = index_dir_suffix, + use_taxon_whitelist = use_taxon_whitelist, + run_locally = defined(local_rapsearch2_index), + index = local_rapsearch2_index + } + } + if (alignment_scalability) { + call RunAlignment_minimap2_out { + input: + docker_image_id = docker_image_id, + s3_wd_uri = s3_wd_uri, + fastas = [select_first([host_filter_out_gsnap_filter_merged_fa, host_filter_out_gsnap_filter_1_fa])], #select_all([host_filter_out_gsnap_filter_1_fa, host_filter_out_gsnap_filter_2_fa]), + db_path = minimap2_db, + minimap2_args = minimap2_args, + run_locally = defined(local_gsnap_index), + local_minimap2_index = minimap2_local_db_path, + prefix= minimap2_prefix + } + call RunCallHitsMinimap2{ + input: + m8_file = RunAlignment_minimap2_out.out_m8, + lineage_db = lineage_db, + duplicate_cluster_size = duplicate_cluster_sizes_tsv, + taxon_blacklist = taxon_blacklist, + deuterostome_db = deuterostome_db, + accession2taxid = accession2taxid_db, + prefix = minimap2_prefix, + min_read_length = min_read_length, + docker_image_id = docker_image_id + } + call RunAlignment_diamond_out { + input: + fastas = [select_first([host_filter_out_gsnap_filter_merged_fa, host_filter_out_gsnap_filter_1_fa])], #select_all([host_filter_out_gsnap_filter_1_fa, host_filter_out_gsnap_filter_2_fa]), s3_wd_uri = s3_wd_uri, - host_filter_out_gsnap_filter_fa = select_all([host_filter_out_gsnap_filter_1_fa, host_filter_out_gsnap_filter_2_fa, host_filter_out_gsnap_filter_merged_fa]), - duplicate_cluster_sizes_tsv = duplicate_cluster_sizes_tsv, - lineage_db = lineage_db, - accession2taxid_db = accession2taxid_db, - taxon_blacklist = taxon_blacklist, - index_dir_suffix = index_dir_suffix, - use_taxon_whitelist = use_taxon_whitelist, + db_path = diamond_db, + diamond_args = diamond_args, + prefix = diamond_prefix, run_locally = defined(local_rapsearch2_index), - index = local_rapsearch2_index + local_diamond_index = diamond_local_db_path, + docker_image_id = docker_image_id + } + call RunCallHitsDiamond { + input: + m8_file = RunAlignment_diamond_out.out_m8, + lineage_db = lineage_db, + duplicate_cluster_size = duplicate_cluster_sizes_tsv, + taxon_blacklist = taxon_blacklist, + deuterostome_db = deuterostome_db, + accession2taxid = accession2taxid_db, + prefix = diamond_prefix, + docker_image_id = docker_image_id + } } call CombineTaxonCounts { @@ -208,8 +504,8 @@ workflow idseq_non_host_alignment { docker_image_id = docker_image_id, s3_wd_uri = s3_wd_uri, counts_json_files = [ - RunAlignment_gsnap_out.gsnap_counts_with_dcr_json, - RunAlignment_rapsearch2_out.rapsearch2_counts_with_dcr_json + select_first([RunAlignment_gsnap_out.gsnap_counts_with_dcr_json, 
RunCallHitsMinimap2.counts_json]), + select_first([RunAlignment_rapsearch2_out.rapsearch2_counts_with_dcr_json, RunCallHitsDiamond.counts_json]) ] } @@ -218,28 +514,41 @@ workflow idseq_non_host_alignment { docker_image_id = docker_image_id, s3_wd_uri = s3_wd_uri, host_filter_out_gsnap_filter_fa = select_all([host_filter_out_gsnap_filter_1_fa, host_filter_out_gsnap_filter_2_fa, host_filter_out_gsnap_filter_merged_fa]), - gsnap_m8 = RunAlignment_gsnap_out.gsnap_m8, - gsnap_deduped_m8 = RunAlignment_gsnap_out.gsnap_deduped_m8, - gsnap_hitsummary_tab = RunAlignment_gsnap_out.gsnap_hitsummary_tab, - gsnap_counts_with_dcr_json = RunAlignment_gsnap_out.gsnap_counts_with_dcr_json, - rapsearch2_m8 = RunAlignment_rapsearch2_out.rapsearch2_m8, - rapsearch2_deduped_m8 = RunAlignment_rapsearch2_out.rapsearch2_deduped_m8, - rapsearch2_hitsummary_tab = RunAlignment_rapsearch2_out.rapsearch2_hitsummary_tab, - rapsearch2_counts_with_dcr_json = RunAlignment_rapsearch2_out.rapsearch2_counts_with_dcr_json, + gsnap_m8 = select_first([RunAlignment_gsnap_out.gsnap_m8, RunAlignment_minimap2_out.out_m8]), + gsnap_deduped_m8 = select_first([RunAlignment_gsnap_out.gsnap_deduped_m8, RunCallHitsMinimap2.deduped_out_m8]), + gsnap_hitsummary_tab = select_first([RunAlignment_gsnap_out.gsnap_hitsummary_tab, RunCallHitsMinimap2.hitsummary]), + gsnap_counts_with_dcr_json = select_first([RunAlignment_gsnap_out.gsnap_counts_with_dcr_json, RunCallHitsMinimap2.counts_json]), + rapsearch2_m8 = select_first([RunAlignment_rapsearch2_out.rapsearch2_m8, RunAlignment_diamond_out.out_m8]), + rapsearch2_deduped_m8 = select_first([RunAlignment_rapsearch2_out.rapsearch2_deduped_m8, RunCallHitsDiamond.deduped_out_m8]), + rapsearch2_hitsummary_tab = select_first([RunAlignment_rapsearch2_out.rapsearch2_hitsummary_tab, RunCallHitsDiamond.hitsummary]), + rapsearch2_counts_with_dcr_json = select_first([RunAlignment_rapsearch2_out.rapsearch2_counts_with_dcr_json, RunCallHitsDiamond.counts_json]), idseq_dedup_out_duplicate_clusters_csv = idseq_dedup_out_duplicate_clusters_csv, duplicate_cluster_sizes_tsv = duplicate_cluster_sizes_tsv } + + call RunCleanOutputs { + input: + gsnap_m8 = select_first([RunAlignment_gsnap_out.gsnap_m8, RunAlignment_minimap2_out.out_m8]), + gsnap_deduped_m8 = select_first([RunAlignment_gsnap_out.gsnap_deduped_m8, RunCallHitsMinimap2.deduped_out_m8]), + gsnap_hitsummary_tab = select_first([RunAlignment_gsnap_out.gsnap_hitsummary_tab, RunCallHitsMinimap2.hitsummary]), + gsnap_counts_with_dcr_json = select_first([RunAlignment_gsnap_out.gsnap_counts_with_dcr_json, RunCallHitsMinimap2.counts_json]), + rapsearch2_m8 = select_first([RunAlignment_rapsearch2_out.rapsearch2_m8, RunAlignment_diamond_out.out_m8]), + rapsearch2_deduped_m8 = select_first([RunAlignment_rapsearch2_out.rapsearch2_deduped_m8, RunCallHitsDiamond.deduped_out_m8]), + rapsearch2_hitsummary_tab = select_first([RunAlignment_rapsearch2_out.rapsearch2_hitsummary_tab, RunCallHitsDiamond.hitsummary]), + rapsearch2_counts_with_dcr_json = select_first([RunAlignment_rapsearch2_out.rapsearch2_counts_with_dcr_json, RunCallHitsDiamond.counts_json]) + + } output { - File gsnap_out_gsnap_m8 = RunAlignment_gsnap_out.gsnap_m8 - File gsnap_out_gsnap_deduped_m8 = RunAlignment_gsnap_out.gsnap_deduped_m8 - File gsnap_out_gsnap_hitsummary_tab = RunAlignment_gsnap_out.gsnap_hitsummary_tab - File gsnap_out_gsnap_counts_with_dcr_json = RunAlignment_gsnap_out.gsnap_counts_with_dcr_json + File gsnap_out_gsnap_m8 = RunCleanOutputs.out_gsnap_m8 + File gsnap_out_gsnap_deduped_m8 = 
RunCleanOutputs.out_gsnap_deduped_m8 + File gsnap_out_gsnap_hitsummary_tab = RunCleanOutputs.out_gsnap_hitsummary_tab + File gsnap_out_gsnap_counts_with_dcr_json = RunCleanOutputs.out_gsnap_counts_with_dcr_json File? gsnap_out_count = RunAlignment_gsnap_out.output_read_count - File rapsearch2_out_rapsearch2_m8 = RunAlignment_rapsearch2_out.rapsearch2_m8 - File rapsearch2_out_rapsearch2_deduped_m8 = RunAlignment_rapsearch2_out.rapsearch2_deduped_m8 - File rapsearch2_out_rapsearch2_hitsummary_tab = RunAlignment_rapsearch2_out.rapsearch2_hitsummary_tab - File rapsearch2_out_rapsearch2_counts_with_dcr_json = RunAlignment_rapsearch2_out.rapsearch2_counts_with_dcr_json + File rapsearch2_out_rapsearch2_m8 = RunCleanOutputs.out_rapsearch2_m8 + File rapsearch2_out_rapsearch2_deduped_m8 = RunCleanOutputs.out_rapsearch2_deduped_m8 + File rapsearch2_out_rapsearch2_hitsummary_tab = RunCleanOutputs.out_rapsearch2_hitsummary_tab + File rapsearch2_out_rapsearch2_counts_with_dcr_json = RunCleanOutputs.out_rapsearch2_counts_with_dcr_json File? rapsearch2_out_count = RunAlignment_rapsearch2_out.output_read_count File taxon_count_out_taxon_counts_with_dcr_json = CombineTaxonCounts.taxon_counts_with_dcr_json File? taxon_count_out_count = CombineTaxonCounts.output_read_count diff --git a/short-read-mngs/test/local_test_viral_minimap2.yml b/short-read-mngs/test/local_test_viral_minimap2.yml new file mode 100644 index 00000000..176fd8e3 --- /dev/null +++ b/short-read-mngs/test/local_test_viral_minimap2.yml @@ -0,0 +1,32 @@ +# local_test_viral.yml +# Boilerplate local_driver.wdl input YAML for use with `miniwdl run` so that this file, +# docker_image_id, fastqs_0, and fastqs_1 are the only inputs required on the command line. +# For testing purposes, uses reference databases with only viral sequences, instead of full NR/NT +# databases, to reduce download and processing burden. +# Note: this YAML file doesn't show all possible optional inputs to the workflow, only ones that +# must be set or overridden for the above-mentioned purposes. See the help message printed by +# `miniwdl run local_driver.wdl` to see them all. 
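+# Example invocation (image tag and fastq paths are placeholders; adjust to your checkout):
+#   miniwdl run short-read-mngs/local_driver.wdl docker_image_id=idseq-short-read-mngs \
+#     fastqs_0=sample_R1.fastq.gz fastqs_1=sample_R2.fastq.gz \
+#     -i short-read-mngs/test/local_test_viral_minimap2.yml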
+host_filter.file_ext: fastq +host_filter.nucleotide_type: DNA +host_filter.host_genome: human +host_filter.star_genome: s3://idseq-public-references/host_filter/ercc/2018-02-15-utc-1518652800-unixtime__2018-02-15-utc-1518652800-unixtime/STAR_genome.tar +host_filter.bowtie2_genome: s3://idseq-public-references/host_filter/ercc/2018-02-15-utc-1518652800-unixtime__2018-02-15-utc-1518652800-unixtime/bowtie2_genome.tar +host_filter.gsnap_genome: s3://idseq-public-references/test/gsnap/ERCC_gsnap2017-11-15_k16.tar +host_filter.human_star_genome: s3://idseq-public-references/host_filter/ercc/2018-02-15-utc-1518652800-unixtime__2018-02-15-utc-1518652800-unixtime/STAR_genome.tar +host_filter.human_bowtie2_genome: s3://idseq-public-references/host_filter/ercc/2018-02-15-utc-1518652800-unixtime__2018-02-15-utc-1518652800-unixtime/bowtie2_genome.tar +host_filter.adapter_fasta: https://raw.githubusercontent.com/broadinstitute/viral-pipelines/master/test/input/clipDb.fasta +host_filter.max_input_fragments: 9000 +host_filter.max_subsample_fragments: 9000 +non_host_rapsearch2_index: s3://idseq-public-references/test/viral-alignment-indexes/viral_nr +non_host_gsnap_index: s3://idseq-public-references/test/viral-alignment-indexes/viral_nt +non_host_alignment.accession2taxid_db: s3://idseq-public-references/mini-database/alignment_indexes/2020-08-20-viral/viral_accessions2taxid.db +non_host_alignment.alignment_scalability: true +non_host_alignment.minimap2_local_db_path: s3://idseq-public-references/test/viral-alignment-indexes/viral_nt +non_host_alignment.diamond_local_db_path: s3://idseq-public-references/test/viral-alignment-indexes/viral_nr +postprocess.nt_db: s3://idseq-public-references/test/viral-alignment-indexes/viral_nt +postprocess.nt_loc_db: s3://idseq-public-references/test/viral-alignment-indexes/viral_nt_loc.db +postprocess.nr_db: s3://idseq-public-references/test/viral-alignment-indexes/viral_nr +postprocess.nr_loc_db: s3://idseq-public-references/test/viral-alignment-indexes/viral_nr_loc.db +experimental.nt_db: s3://idseq-public-references/test/viral-alignment-indexes/viral_nt +experimental.nt_loc_db: s3://idseq-public-references/test/viral-alignment-indexes/viral_nt_loc.db +experimental.nt_info_db: s3://idseq-public-references/test/viral-alignment-indexes/viral_nt_info.db diff --git a/short-read-mngs/test/test_short_read_mngs.py b/short-read-mngs/test/test_short_read_mngs.py index 5d00906e..58c27421 100644 --- a/short-read-mngs/test/test_short_read_mngs.py +++ b/short-read-mngs/test/test_short_read_mngs.py @@ -29,7 +29,7 @@ def test_bench3_viral(short_read_mngs_bench3_viral_outputs): taxon_counts = json.load(infile)["pipeline_output"]["taxon_counts_attributes"] taxa = set(entry["tax_id"] for entry in taxon_counts) - assert abs(len(taxa) - 149) < 5 + assert abs(len(taxa) - 156) < 5 for filename in outp["outputs"]: if filename.endswith(".fasta"):