Add check for duplicate records to MainVcfQc #705

Merged · 31 commits · Oct 28, 2024
Commits
968ab5b
Initial commit
kjaisingh Aug 6, 2024
239d7f9
Added branch name to dockstore file
kjaisingh Aug 6, 2024
aa921e2
Updated script definitions
kjaisingh Aug 6, 2024
2317dd7
Updated WDL to use custom script
kjaisingh Aug 6, 2024
bf4d7bc
Modified output file paths
kjaisingh Aug 7, 2024
f734014
Added more logging
kjaisingh Aug 7, 2024
e7e9ddc
Used dot notation
kjaisingh Aug 7, 2024
8d98cd1
Minor logging changes
kjaisingh Aug 7, 2024
cfe266e
Removed glob() references
kjaisingh Aug 7, 2024
9d4fcce
Standardized variable input structure
kjaisingh Aug 7, 2024
4ff559c
Removed direct python script call
kjaisingh Aug 7, 2024
b72e9d6
Renamed output files to be duplicate instead of duplicated
kjaisingh Aug 7, 2024
2b4c476
Removed call to MergeDuplicates
kjaisingh Aug 7, 2024
ae2f33e
Undo commenting out of MergeDuplicates
kjaisingh Aug 7, 2024
6a270e5
Removed direct reference to vcfs_for_qc
kjaisingh Aug 8, 2024
0f5a1b6
Minor changes
kjaisingh Aug 8, 2024
899ff79
Added option for both default and custom script
kjaisingh Aug 9, 2024
f9af767
Modified path to python scripts
kjaisingh Aug 9, 2024
e9e5872
Removed unused import
kjaisingh Aug 9, 2024
37e20c2
Resolved all flake8 linting errors
kjaisingh Aug 9, 2024
89cd795
Rolled back to solely use custom scripts for time being
kjaisingh Aug 9, 2024
f6a3025
Update wdl/MainVcfQc.wdl
kjaisingh Sep 25, 2024
8b5bd12
Updated files
kjaisingh Sep 25, 2024
3ae646f
Merge branch 'kj/701_vcf_qc_duplicates' of https://github.com/broadin…
kjaisingh Sep 25, 2024
b709ec9
Merge branch 'main' into kj/701_vcf_qc_duplicates
kjaisingh Sep 25, 2024
be4f722
Re-added extra line
kjaisingh Sep 26, 2024
ad609e1
Corrected line add to test docker image publishing
kjaisingh Sep 26, 2024
f5b2ed0
Corrected to use identify_duplicates in task
kjaisingh Sep 26, 2024
3fb16a4
Added root path for python files
kjaisingh Sep 26, 2024
577e6b1
Minor script update - use lowercase exact
kjaisingh Sep 26, 2024
25a1467
Removed dockstore push
kjaisingh Sep 26, 2024
131 changes: 131 additions & 0 deletions src/sv-pipeline/scripts/identify_duplicates.py
@@ -0,0 +1,131 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Identify and classify duplicated variants from an input VCF.
"""

from typing import List, Text, Optional
from collections import defaultdict
from itertools import groupby

import argparse
import sys
import pysam


def process_duplicates(vcf, fout):
# Initialize counters and buffers
counts = defaultdict(int)
exact_buffer = []
ins_buffer = []
current_chrom = None
current_pos = None

# Create output files
with open(f"{fout}_duplicate_records.tsv", 'w') as f_records, open(f"{fout}_duplicate_counts.tsv", 'w') as f_counts:
f_records.write("TYPE\tDUP_RECORDS\n")
f_counts.write("TYPE\tDUP_COUNTS\n")

# Iterate through all records
for record in vcf.fetch():
# Process current buffers if we've reached a new chrom or pos
if record.chrom != current_chrom or record.pos != current_pos:
process_buffers(exact_buffer, ins_buffer, counts, f_records)
exact_buffer = []
ins_buffer = []
current_chrom = record.chrom
current_pos = record.pos

# Update buffers with new record
exact_key = (
record.chrom,
record.pos,
record.stop,
record.info.get('SVTYPE'),
record.info.get('SVLEN'),
record.info.get('CHR2'),
record.info.get('END2'),
record.info.get('STRANDS'),
record.info.get('CPX_TYPE'),
record.info.get('CPX_INTERVALS')
)
exact_buffer.append((exact_key, record.id))

if record.info.get('SVTYPE') == 'INS':
insert_key = (
record.id,
record.info.get('SVLEN'),
record.alts[0]
)
ins_buffer.append(insert_key)

# Process remaining records in the buffer
process_buffers(exact_buffer, ins_buffer, counts, f_records)

# Write counts to file
for match_type in sorted(counts.keys()):
f_counts.write(f"{match_type}\t{counts[match_type]}\n")


def process_buffers(exact_buffer, ins_buffer, counts, f_records):
# Process exact matches
sorted_buffer = sorted(exact_buffer, key=lambda x: x[0])
exact_matches = {
key: [record for _, record in group] for key, group in groupby(sorted_buffer, key=lambda x: x[0])
}
for records in exact_matches.values():
if len(records) > 1:
counts['exact'] += 1
f_records.write(f"exact\t{','.join(sorted(records))}\n")

# Process insert matches
for i in range(len(ins_buffer)):
for j in range(i + 1, len(ins_buffer)):
rec1, rec2 = ins_buffer[i], ins_buffer[j]

# Size comparison
if rec1[1] == rec2[1]:
counts['ins_size_similarity_100'] += 1
f_records.write(f"ins_size_similarity_100\t{rec1[0]},{rec2[0]}\n")
elif abs(rec1[1] - rec2[1]) <= 0.5 * max(rec1[1], rec2[1]):
counts['ins_size_similarity_50'] += 1
f_records.write(f"ins_size_similarity_50\t{rec1[0]},{rec2[0]}\n")
else:
counts['ins_size_similarity_0'] += 1
f_records.write(f"ins_size_similarity_0\t{rec1[0]},{rec2[0]}\n")

# ALT comparison
if rec1[2] == rec2[2]:
counts['ins_alt_identical'] += 1
f_records.write(f"ins_alt_identical\t{rec1[0]},{rec2[0]}\n")
elif ('<INS>' in (rec1[2], rec2[2])) and ('<INS:' in (rec1[2] + rec2[2])):
counts['ins_alt_same_subtype'] += 1
f_records.write(f"ins_alt_same_subtype\t{rec1[0]},{rec2[0]}\n")
elif rec1[2].startswith('<INS:') and rec2[2].startswith('<INS:') and rec1[2] != rec2[2]:
counts['ins_alt_different_subtype'] += 1
f_records.write(f"ins_alt_different_subtype\t{rec1[0]},{rec2[0]}\n")


def _parse_arguments(argv: List[Text]) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Identify duplicated records from a sorted input VCF",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument('-v', '--vcf', required=True, help='Input VCF.')
parser.add_argument('-f', '--fout', required=True, help='Output file name.')
parsed_arguments = parser.parse_args(argv[1:])
return parsed_arguments


def main(argv: Optional[List[Text]] = None):
if argv is None:
argv = sys.argv
args = _parse_arguments(argv)

vcf = pysam.VariantFile(args.vcf)
process_duplicates(vcf, args.fout)


if __name__ == '__main__':
main()
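
For reference, a minimal standalone sketch (not part of this PR) of the exact-duplicate grouping performed in process_buffers(): records buffered at the same position whose full key tuples match are reported as one duplicate group. The key fields and record IDs below are made up.

from itertools import groupby

# (exact_key, record_id) pairs, as assembled in process_duplicates()
exact_buffer = [
    (("chr1", 1000, 1500, "DEL", 500, None, None, None, None, None), "var_A"),
    (("chr1", 1000, 1500, "DEL", 500, None, None, None, None, None), "var_B"),
    (("chr1", 1000, 2000, "DUP", 1000, None, None, None, None, None), "var_C"),
]

# Sort by the key tuple so identical keys are adjacent, then group them
sorted_buffer = sorted(exact_buffer, key=lambda x: x[0])
for key, group in groupby(sorted_buffer, key=lambda x: x[0]):
    ids = [record_id for _, record_id in group]
    if len(ids) > 1:
        print("exact\t" + ",".join(sorted(ids)))  # prints: exact	var_A,var_B
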
72 changes: 72 additions & 0 deletions src/sv-pipeline/scripts/merge_duplicates.py
@@ -0,0 +1,72 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Merge duplicate records and counts files across multiple TSVs.
"""

from typing import List, Text, Optional
from collections import defaultdict

import argparse
import sys
import csv


def merge_duplicates(record_files: List[str], count_files: List[str], fout: str):
# Merge records
with open(f"{fout}_duplicate_records.tsv", 'w', newline='') as out_records:
writer = csv.writer(out_records, delimiter='\t')
# Write header
writer.writerow(['TYPE', 'DUP_RECORDS'])
# Append records from each file
for record_file in record_files:
with open(record_file, 'r') as f:
reader = csv.reader(f, delimiter='\t')
next(reader)
for row in reader:
writer.writerow(row)

# Sum counts
counts = defaultdict(int)
for count_file in count_files:
with open(count_file, 'r') as f:
reader = csv.reader(f, delimiter='\t')
next(reader)
for row in reader:
counts[row[0]] += int(row[1])

# Merge counts
with open(f"{fout}_duplicate_counts.tsv", 'w', newline='') as out_counts:
writer = csv.writer(out_counts, delimiter='\t')

# Write header
writer.writerow(['TYPE', 'DUP_COUNTS'])

# Append each row from merged counts
for category, count in counts.items():
writer.writerow([category, count])


def _parse_arguments(argv: List[Text]) -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Merge duplicate records and counts files across multiple TSVs",
formatter_class=argparse.ArgumentDefaultsHelpFormatter
)
parser.add_argument('-r', '--records', nargs='+', required=True, help='Input duplicated record TSV files.')
parser.add_argument('-c', '--counts', nargs='+', required=True, help='Input duplicated counts TSV files.')
parser.add_argument('-f', '--fout', required=True, help='Output file name.')
parsed_arguments = parser.parse_args(argv[1:])
return parsed_arguments


def main(argv: Optional[List[Text]] = None):
if argv is None:
argv = sys.argv
args = _parse_arguments(argv)

merge_duplicates(args.records, args.counts, args.fout)


if __name__ == '__main__':
main()
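
As a quick illustration (not part of this PR) of the counts-merging behaviour in merge_duplicates(): per-shard counts are summed per TYPE, while records are simply concatenated. The shard contents below are hypothetical.

import csv
import io
from collections import defaultdict

# Hypothetical per-shard *_duplicate_counts.tsv contents
shard_counts = [
    "TYPE\tDUP_COUNTS\nexact\t2\nins_size_similarity_100\t1\n",
    "TYPE\tDUP_COUNTS\nexact\t3\nins_alt_identical\t4\n",
]

counts = defaultdict(int)
for text in shard_counts:
    reader = csv.reader(io.StringIO(text), delimiter='\t')
    next(reader)  # skip header, as merge_duplicates() does
    for row in reader:
        counts[row[0]] += int(row[1])

print(dict(counts))
# {'exact': 5, 'ins_size_similarity_100': 1, 'ins_alt_identical': 4}
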
125 changes: 125 additions & 0 deletions wdl/MainVcfQc.wdl
@@ -39,6 +39,8 @@ workflow MainVcfQc {
RuntimeAttr? runtime_override_plot_qc_per_sample
RuntimeAttr? runtime_override_plot_qc_per_family
RuntimeAttr? runtime_override_per_sample_benchmark_plot
RuntimeAttr? runtime_override_identify_duplicates
RuntimeAttr? runtime_override_merge_duplicates
RuntimeAttr? runtime_override_sanitize_outputs

# overrides for MiniTasks or Utils
@@ -275,6 +277,27 @@
}
}

# Identify all duplicates
scatter(vcf in vcfs_for_qc) {
call IdentifyDuplicates {
input:
prefix=prefix,
vcf=vcf,
sv_pipeline_qc_docker=sv_pipeline_qc_docker,
runtime_attr_override=runtime_override_identify_duplicates
}
}

# Merge duplicates
call MergeDuplicates {
input:
prefix=prefix,
tsv_records=IdentifyDuplicates.duplicate_records,
tsv_counts=IdentifyDuplicates.duplicate_counts,
sv_pipeline_qc_docker=sv_pipeline_qc_docker,
runtime_attr_override=runtime_override_merge_duplicates
}

# Sanitize all outputs
call SanitizeOutputs {
input:
@@ -296,6 +319,8 @@
output {
File sv_vcf_qc_output = SanitizeOutputs.vcf_qc_tarball
File vcf2bed_output = MergeVcf2Bed.merged_bed_file
File duplicate_records_output = MergeDuplicates.duplicate_records
File duplicate_counts_output = MergeDuplicates.duplicate_counts
}
}

@@ -858,3 +883,103 @@ task SanitizeOutputs {
}
}


# Identify all duplicates in a single file
task IdentifyDuplicates {
input {
String prefix
File vcf
String sv_pipeline_qc_docker
RuntimeAttr? runtime_attr_override
}

String vcf_basename = basename(vcf, ".vcf.gz")
String full_prefix = "~{prefix}.~{vcf_basename}"

RuntimeAttr runtime_default = object {
mem_gb: 3.75,
disk_gb: 10 + ceil(size(vcf, "GiB")),
cpu_cores: 1,
preemptible_tries: 1,
max_retries: 1,
boot_disk_gb: 10
}

RuntimeAttr runtime_override = select_first([runtime_attr_override, runtime_default])
runtime {
memory: "~{select_first([runtime_override.mem_gb, runtime_default.mem_gb])} GiB"
disks: "local-disk ~{select_first([runtime_override.disk_gb, runtime_default.disk_gb])} HDD"
cpu: select_first([runtime_override.cpu_cores, runtime_default.cpu_cores])
preemptible: select_first([runtime_override.preemptible_tries, runtime_default.preemptible_tries])
maxRetries: select_first([runtime_override.max_retries, runtime_default.max_retries])
docker: sv_pipeline_qc_docker
bootDiskSizeGb: select_first([runtime_override.boot_disk_gb, runtime_default.boot_disk_gb])
}

command <<<
set -euo pipefail

echo "Processing ~{vcf} into ~{full_prefix}..."

python /opt/sv-pipeline/scripts/identify_duplicates.py \
--vcf ~{vcf} \
--fout ~{full_prefix}

echo "Finishing processing VCF."
>>>

output {
File duplicate_records = "~{full_prefix}_duplicate_records.tsv"
File duplicate_counts = "~{full_prefix}_duplicate_counts.tsv"
}
}


# Aggregate distinct duplicate summary files
task MergeDuplicates {
input {
String prefix
Array[File] tsv_records
Array[File] tsv_counts
String sv_pipeline_qc_docker
RuntimeAttr? runtime_attr_override
}

RuntimeAttr runtime_default = object {
mem_gb: 3.75,
disk_gb: 5 + ceil(size(tsv_records, "GiB")) + ceil(size(tsv_counts, "GiB")),
cpu_cores: 1,
preemptible_tries: 1,
max_retries: 1,
boot_disk_gb: 10
}

RuntimeAttr runtime_override = select_first([runtime_attr_override, runtime_default])
runtime {
memory: "~{select_first([runtime_override.mem_gb, runtime_default.mem_gb])} GiB"
disks: "local-disk ~{select_first([runtime_override.disk_gb, runtime_default.disk_gb])} HDD"
cpu: select_first([runtime_override.cpu_cores, runtime_default.cpu_cores])
preemptible: select_first([runtime_override.preemptible_tries, runtime_default.preemptible_tries])
maxRetries: select_first([runtime_override.max_retries, runtime_default.max_retries])
docker: sv_pipeline_qc_docker
bootDiskSizeGb: select_first([runtime_override.boot_disk_gb, runtime_default.boot_disk_gb])
}

command <<<
set -euo pipefail

echo "Merging all TSV files into one..."

python /opt/sv-pipeline/scripts/merge_duplicates.py \
--records ~{sep=' ' tsv_records} \
--counts ~{sep=' ' tsv_counts} \
--fout "~{prefix}.agg"

echo "All TSVs processed."
>>>

output {
File duplicate_records = "~{prefix}.agg_duplicate_records.tsv"
File duplicate_counts = "~{prefix}.agg_duplicate_counts.tsv"
}
}
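
Outside of Cromwell, the scatter/gather above can be approximated locally with a small driver. This is a sketch under assumed local paths: the repository-relative script paths and shard VCF names are hypothetical, not the /opt/sv-pipeline paths baked into the docker image.

import subprocess

shard_vcfs = ["shard0.vcf.gz", "shard1.vcf.gz"]  # hypothetical shard VCFs
prefixes = []

# Scatter: one identify_duplicates.py run per shard VCF
for i, vcf in enumerate(shard_vcfs):
    prefix = f"cohort.shard{i}"
    subprocess.run(
        ["python", "src/sv-pipeline/scripts/identify_duplicates.py",
         "--vcf", vcf, "--fout", prefix],
        check=True)
    prefixes.append(prefix)

# Gather: merge the per-shard records and counts TSVs
subprocess.run(
    ["python", "src/sv-pipeline/scripts/merge_duplicates.py",
     "--records"] + [f"{p}_duplicate_records.tsv" for p in prefixes] +
    ["--counts"] + [f"{p}_duplicate_counts.tsv" for p in prefixes] +
    ["--fout", "cohort.agg"],
    check=True)
# Produces cohort.agg_duplicate_records.tsv and cohort.agg_duplicate_counts.tsv,
# mirroring the MergeDuplicates task outputs.
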