This repository has been archived by the owner on Jan 20, 2022. It is now read-only.

Add minimap2 to wdl #184

Open
Wants to merge 34 commits into main from rlim-add-minimap2-to-wdl
Commits (34):
5732343  transfer utils (rzlim08, Oct 20, 2021)
aeff3d1  remove unnecessary idseq-dag m8 files (rzlim08, Oct 21, 2021)
7154525  replace count_reads file (rzlim08, Oct 21, 2021)
435b1fc  linting (rzlim08, Oct 22, 2021)
1a724f1  add miniwdl code for diamond (rzlim08, Oct 22, 2021)
1963d55  linting (rzlim08, Oct 22, 2021)
f0d49e1  add miniwdl to main short-read-mngs wdl (rzlim08, Nov 12, 2021)
1d5d65a  transfer utils (rzlim08, Oct 20, 2021)
c54b40c  remove unnecessary idseq-dag m8 files (rzlim08, Oct 21, 2021)
7523b3a  replace count_reads file (rzlim08, Oct 21, 2021)
4cb6f50  linting (rzlim08, Oct 22, 2021)
ed4393a  add miniwdl code for diamond (rzlim08, Oct 22, 2021)
f9bbe7c  linting (rzlim08, Oct 22, 2021)
b129172  add miniwdl to main short-read-mngs wdl (rzlim08, Nov 12, 2021)
b826464  Merge branch 'rlim-add-minimap2-to-wdl' of github.com:chanzuckerberg/… (rzlim08, Nov 12, 2021)
7578f8b  add diamond to wdl (rzlim08, Nov 12, 2021)
476a1de  revert to using gsnap/rapsearch in local test (rzlim08, Nov 13, 2021)
1dae799  use prod database (rzlim08, Nov 17, 2021)
3a364a7  clean up and pass arguments to minimap2 (rzlim08, Nov 17, 2021)
b61f5e9  set bash options (rzlim08, Nov 18, 2021)
ee48647  change database and fix single end reads (rzlim08, Nov 18, 2021)
98dcdf0  fix diamond min_alignment_length (rzlim08, Nov 18, 2021)
085d239  Merge branch 'main' into rlim-add-minimap2-to-wdl (rzlim08, Nov 23, 2021)
413e0c5  update minimap2 database to newest (rzlim08, Nov 23, 2021)
2e7b15d  clean up alignment scalability code (rzlim08, Dec 1, 2021)
be26062  add trailing slash to chunks directory (rzlim08, Dec 1, 2021)
5a3d68f  clean up alignment scalability wdl (rzlim08, Dec 2, 2021)
edd3cd3  change deployment environment back to dev (rzlim08, Dec 2, 2021)
07a3572  add improved logging (rzlim08, Dec 10, 2021)
b5c9f27  linting (rzlim08, Dec 15, 2021)
9c1f4b3  Add minimap2/diamond local steps (#190) (rzlim08, Dec 20, 2021)
75fd49d  change output names to avoid loop in viz (rzlim08, Dec 21, 2021)
bbb9da1  fix viz (rzlim08, Dec 22, 2021)
ca5a808  unindent python3 code (rzlim08, Dec 23, 2021)
19 changes: 19 additions & 0 deletions short-read-mngs/Dockerfile
@@ -141,5 +141,24 @@ WORKDIR /
COPY idseq-dag /tmp/idseq-dag
RUN pip3 install /tmp/idseq-dag && rm -rf /tmp/idseq-dag

RUN apt-get -y update && apt-get install -y build-essential libz-dev git python3-pip cmake

WORKDIR /tmp
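# Build minimap2 via the minimap2-scatter fork and install the binary on PATH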
RUN git clone --recursive https://github.com/mlin/minimap2-scatter.git
WORKDIR /tmp/minimap2-scatter
RUN make minimap2
RUN mv /tmp/minimap2-scatter/minimap2/minimap2 /usr/local/bin/minimap2

WORKDIR /tmp
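# Build DIAMOND from the morsecodist fork (minimal-mods branch)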
RUN git clone https://github.com/morsecodist/diamond
WORKDIR /tmp/diamond
RUN git checkout minimal-mods
WORKDIR /tmp/diamond/build
RUN cmake -DCMAKE_BUILD_TYPE=Release ..
RUN make -j6
RUN mv diamond /usr/local/bin

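# Install s3parcp for parallel S3 transfers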
RUN curl -Ls https://github.com/chanzuckerberg/s3parcp/releases/download/v0.2.0-alpha/s3parcp_0.2.0-alpha_Linux_x86_64.tar.gz | tar -C /usr/bin -xz s3parcp

COPY idseq_utils /tmp/idseq_utils
RUN pip3 install /tmp/idseq_utils && rm -rf /tmp/idseq_utils
2 changes: 1 addition & 1 deletion short-read-mngs/idseq_utils/idseq_utils/__init__.py
@@ -1,2 +1,2 @@
-''' idseq_utils '''
+""" idseq_utils """
__version__ = "EXTERNALLY_MANAGED"
129 changes: 129 additions & 0 deletions short-read-mngs/idseq_utils/idseq_utils/batch_run_helpers.py
@@ -0,0 +1,129 @@
import time
import random
import boto3
import json
import requests
import logging
from botocore.exceptions import ClientError

log = logging.getLogger(__name__)

MAX_CHUNKS_IN_FLIGHT = 10


def get_batch_job_desc_bucket():
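    # Batch job descriptions are mirrored to an S3 bucket following the aegea
    # convention aegea-batch-jobs-<account id>; fall back to the EC2 instance
    # metadata service if the STS call fails.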
try:
account_id = boto3.client("sts").get_caller_identity()["Account"]
except ClientError:
account_id = requests.get(
"http://169.254.169.254/latest/dynamic/instance-identity/document"
).json()["accountId"]
return f"aegea-batch-jobs-{account_id}"


class BatchJobFailed(Exception):
pass


def _log_alignment_batch_job_status(
job_id, job_queue, job_definition, chunk_id, status, alignment_algorithm
):
log.info(
"alignment_batch_job_status",
extra={
"job_id": job_id,
"chunk_id": chunk_id,
"job_queue": job_queue,
"job_definition": job_definition,
"status": status,
"alignment_algorithm": alignment_algorithm,
},
)


def _get_job_status(job_id):
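    # Read the job's status from the job-description object mirrored to S3
    # rather than calling batch.describe_jobs on every poll.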
batch_job_desc_bucket = boto3.resource("s3").Bucket(get_batch_job_desc_bucket())
key = f"job_descriptions/{job_id}"
try:
job_desc_object = batch_job_desc_bucket.Object(key)
return json.loads(job_desc_object.get()["Body"].read())["status"]
except ClientError as e:
if e.response["Error"]["Code"] == "NoSuchKey":
            # Log that the object is missing so any issue with the S3 mechanism can be identified
            log.debug("missing_job_description_object", extra={"key": key})
# Return submitted because a missing job status probably means it hasn't been added yet
return "SUBMITTED"
else:
raise e


def _run_batch_job(job_name, job_queue, job_definition, environment, chunk_id, alignment_algorithm, retries):
client = boto3.client("batch")
response = client.submit_job(
jobName=job_name,
jobQueue=job_queue,
jobDefinition=job_definition,
containerOverrides={
"environment": environment,
},
retryStrategy={"attempts": retries},
)
job_id = response["jobId"]
_log_alignment_batch_job_status(
job_id, job_queue, job_definition, chunk_id, "SUBMITTED", alignment_algorithm
)

delay = 60 + random.randint(
-60 // 2, 60 // 2
) # Add some noise to de-synchronize chunks
status = "SUBMITTED"
    # the job this is monitoring has a timeout and the job this runs in has a timeout,
    # so this polling loop needs no timeout of its own
while True:
try:
status = _get_job_status(job_id)
except ClientError as e:
# If we get throttled, randomly wait to de-synchronize the requests
if e.response["Error"]["Code"] == "TooManyRequestsException":
                log.warning("describe_jobs_rate_limit_error", extra={"job_id": job_id})
# Possibly implement a backoff here if throttling becomes an issue
else:
log.error(
"unexpected_client_error_while_polling_job_status",
extra={"job_id": job_id},
)
raise e

if status == "SUCCEEDED":
_log_alignment_batch_job_status(
job_id, job_queue, job_definition, chunk_id, status, alignment_algorithm
)
return job_id
if status == "FAILED":
log.error(
"alignment_batch_job_failed",
extra={
"job_id": job_id,
"chunk_id": chunk_id,
"alignment_algorithm": alignment_algorithm,
},
)
_log_alignment_batch_job_status(
job_id, job_queue, job_definition, chunk_id, status, alignment_algorithm
)
raise BatchJobFailed("chunk alignment failed")
time.sleep(delay)


def _db_chunks(bucket: str, prefix):
s3_client = boto3.client("s3")
paginator = s3_client.get_paginator("list_objects_v2")
log.debug("db chunks")

result = paginator.paginate(
Bucket=bucket,
Prefix=prefix,
)

for page in result:
for obj in page["Contents"]:
yield obj["Key"]
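For orientation, a minimal sketch of how these helpers might be composed by a caller: one Batch job per database chunk from _db_chunks, dispatched through _run_batch_job, with concurrency bounded by MAX_CHUNKS_IN_FLIGHT. The function name run_alignment_chunks, the DB_CHUNK/CHUNK_ID environment variable names, and retries=2 are illustrative assumptions, not part of this pull request.

from concurrent.futures import ThreadPoolExecutor

def run_alignment_chunks(bucket, prefix, job_queue, job_definition, alignment_algorithm):
    # Illustrative sketch only; the real caller lives elsewhere in idseq_utils.
    chunk_keys = list(_db_chunks(bucket, prefix))
    with ThreadPoolExecutor(max_workers=MAX_CHUNKS_IN_FLIGHT) as executor:
        futures = []
        for chunk_id, chunk_key in enumerate(chunk_keys):
            # Each chunk gets its own Batch job; the chunk location is passed
            # via container environment overrides.
            environment = [
                {"name": "DB_CHUNK", "value": f"s3://{bucket}/{chunk_key}"},
                {"name": "CHUNK_ID", "value": str(chunk_id)},
            ]
            futures.append(
                executor.submit(
                    _run_batch_job,
                    job_name=f"{alignment_algorithm}-chunk-{chunk_id}",
                    job_queue=job_queue,
                    job_definition=job_definition,
                    environment=environment,
                    chunk_id=chunk_id,
                    alignment_algorithm=alignment_algorithm,
                    retries=2,
                )
            )
        # result() re-raises BatchJobFailed from any failed chunk.
        return [future.result() for future in futures]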