-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add refafilt script for filtering fasta files by sequence length
On branch dev modified: README.md modified: recentrifuge/__init__.py new file: refafilt modified: setup.py
- Loading branch information
Showing
4 changed files
with
338 additions
and
6 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,326 @@ | ||
#!/usr/bin/env python3 | ||
# | ||
# Copyright (C) 2017–2023, Jose Manuel Martí Martínez | ||
# | ||
# This program is free software: you can redistribute it and/or modify | ||
# it under the terms of the GNU Affero General Public License as | ||
# published by the Free Software Foundation, either version 3 of the | ||
# License, or (at your option) any later version. | ||
# | ||
# This program is distributed in the hope that it will be useful, | ||
# but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
# GNU Affero General Public License for more details. | ||
# | ||
# You should have received a copy of the GNU Affero General Public License | ||
# along with this program. If not, see <https://www.gnu.org/licenses/>. | ||
# | ||
""" | ||
Filter a FASTA file attending to the length of the sequences. | ||
""" | ||
# pylint: disable=no-name-in-module, not-an-iterable | ||
import argparse | ||
import collections | ||
import contextlib | ||
import gzip | ||
import pathlib | ||
import sys | ||
import time | ||
from typing import Callable, Dict, TextIO, Counter, TypeAlias | ||
|
||
from Bio.SeqIO.FastaIO import SimpleFastaParser | ||
import matplotlib.pyplot as plt | ||
import numpy as np | ||
|
||
from recentrifuge import __version__, __author__, __date__ | ||
from recentrifuge.config import LICENSE, GZEXT | ||
from recentrifuge.config import gray, red, green, cyan, magenta, blue, yellow | ||
|
||
Filename: TypeAlias = pathlib.Path | ||
|
||
def main(): | ||
"""Main entry point to script.""" | ||
|
||
def vprint(*arguments) -> None: | ||
"""Print only if verbose/debug mode is enabled""" | ||
if args.debug: | ||
print(*arguments) | ||
|
||
def configure_parser(): | ||
"""Argument Parser Configuration""" | ||
parser = argparse.ArgumentParser( | ||
description='Filter a FASTA file by sequence length', | ||
epilog=f'%(prog)s - Release {__version__} - {__date__}' + LICENSE, | ||
formatter_class=argparse.RawDescriptionHelpFormatter | ||
) | ||
parser.add_argument( | ||
'-d', '--debug', | ||
action='store_true', | ||
help='increase output verbosity and perform additional checks' | ||
) | ||
parser.add_argument( | ||
'-i', '--input', | ||
action='store', | ||
metavar='FILE', | ||
type=Filename, | ||
default=None, | ||
required=True, | ||
help='input FASTA file, which may be gzipped' | ||
) | ||
parser.add_argument( | ||
'-o', '--output_pass', | ||
action='store', | ||
metavar='FILE', | ||
type=Filename, | ||
default=None, | ||
required=False, | ||
help='output file for sequences passing the filter(s)' | ||
) | ||
parser.add_argument( | ||
'-s', '--output_short', | ||
action='store', | ||
metavar='FILE', | ||
type=Filename, | ||
default=None, | ||
required=False, | ||
help='output file for sequences too short' | ||
) | ||
parser.add_argument( | ||
'-l', '--output_long', | ||
action='store', | ||
metavar='FILE', | ||
type=Filename, | ||
default=None, | ||
required=False, | ||
help='output file for sequences too long' | ||
) | ||
parser.add_argument( | ||
'-p', '--output_plot', | ||
action='store', | ||
metavar='FILE', | ||
type=Filename, | ||
default=None, | ||
required=False, | ||
help='output PDF file for histogram plot' | ||
) | ||
parser.add_argument( | ||
'-f', '--filter_less_than', | ||
action='store', | ||
metavar='NUMBER', | ||
type=int, | ||
default=None, | ||
help='minimum length of the accepted sequences' | ||
) | ||
parser.add_argument( | ||
'-F', '--filter_more_than', | ||
action='store', | ||
metavar='NUMBER', | ||
type=int, | ||
default=None, | ||
help='maximum length of the accepted sequences' | ||
) | ||
parser.add_argument( | ||
'-m', '--maxreads', | ||
action='store', | ||
metavar='NUMBER', | ||
type=int, | ||
default=None, | ||
help=('maximum number of FASTA reads to process; ' | ||
'default: no maximum') | ||
) | ||
parser.add_argument( | ||
'-c', '--compress', | ||
action='store_true', | ||
default=False, | ||
help='the resulting FASTA files will be gzipped' | ||
) | ||
parser.add_argument( | ||
'-V', '--version', | ||
action='version', | ||
version=f'%(prog)s release {__version__} ({__date__})' | ||
) | ||
return parser | ||
|
||
def check_debug(): | ||
"""Check debugging mode""" | ||
if args.debug: | ||
print(blue('INFO:'), gray('Debugging mode activated')) | ||
print(blue('INFO:'), gray('Active parameters:')) | ||
for key, val in vars(args).items(): | ||
if val is not None and val is not False and val != []: | ||
print(gray(f'\t{key} ='), f'{val}') | ||
|
||
# timing initialization | ||
start_time: float = time.time() | ||
# Program header | ||
print(f'\n=-= {sys.argv[0]} =-= v{__version__} - {__date__}' | ||
f' =-= by {__author__} =-=\n') | ||
sys.stdout.flush() | ||
|
||
# Parse arguments and check debug | ||
argparser = configure_parser() | ||
args = argparser.parse_args() | ||
check_debug() | ||
compr = args.compress | ||
in_fasta: Filename = Filename(args.input) | ||
output_pass: Filename = args.output_pass | ||
if output_pass is None: | ||
output_pass = Filename(f'{in_fasta.stem}_passed.fa') | ||
vprint(blue('INFO:'), | ||
gray('Passing filter output file: '), f'{output_pass}') | ||
output_plot: Filename = args.output_plot | ||
if output_plot is None: | ||
output_plot = Filename(f'{in_fasta.stem}_hist.pdf') | ||
|
||
min_filt: bool | ||
if args.filter_less_than is None: | ||
min_filt: bool = False | ||
else: | ||
min_filt: bool = True | ||
min_len: int = args.filter_less_than | ||
output_short: Filename = args.output_short | ||
if output_short is None: | ||
output_short = Filename(f'{in_fasta.stem}_tooshort{min_len}.fa') | ||
vprint(blue('INFO:'), | ||
gray('Too short filter output: '), f'{output_short}') | ||
|
||
max_filt: bool | ||
if args.filter_more_than is None: | ||
max_filt: bool = False | ||
else: | ||
max_filt: bool = True | ||
max_len: int = args.filter_more_than | ||
output_long: Filename = args.output_long | ||
if output_long is None: | ||
output_long = Filename(f'{in_fasta.stem}_toolong{max_len}nt.fa') | ||
vprint(blue('INFO:'), | ||
gray('Too long filter output: '), f'{output_long}') | ||
|
||
def is_gzipped(fpath: Filename): | ||
"""Check if a file exists and is gzipped""" | ||
try: | ||
with open(fpath, 'rb') as ftest: | ||
return ftest.read(2) == b'\x1f\x8b' | ||
except FileNotFoundError: | ||
return False | ||
|
||
# Prepare output | ||
handler: Callable[..., TextIO] | ||
fext: Filename = Filename('.fa') | ||
if compr: | ||
handler = gzip.open | ||
fext += GZEXT | ||
else: | ||
handler = open | ||
stats: Counter[int] = collections.Counter() | ||
|
||
# Main loop: filter FASTA file | ||
i: int = 0 | ||
pass_lens: dict[int] = [] | ||
tiny_lens: dict[int] = [] | ||
long_lens: dict[int] = [] | ||
print(gray('Filtering FASTA file'), f'{in_fasta}', gray('...\nMseqs: '), | ||
end='') | ||
sys.stdout.flush() | ||
if is_gzipped(in_fasta): | ||
handler = gzip.open | ||
else: | ||
handler = open | ||
with (handler(in_fasta, 'rt') as fa, | ||
open(output_pass, 'w') as pass_file, | ||
open(output_short, 'w') if min_filt else contextlib.nullcontext( | ||
None) as short_file, | ||
open(output_long, 'w') if max_filt else contextlib.nullcontext( | ||
None) as long_file | ||
): | ||
for i, (title, seq) in enumerate(SimpleFastaParser(fa) | ||
): | ||
if args.maxreads and i >= args.maxreads: | ||
print(yellow(' [stopping by maxreads limit]!'), end='') | ||
break | ||
elif not i % 1000000: | ||
print(f'{i // 1000000:_d}', end='') | ||
sys.stdout.flush() | ||
elif not i % 100000: | ||
print('.', end='') | ||
sys.stdout.flush() | ||
seq_len : int = len(seq) | ||
if min_filt and seq_len < min_len: | ||
short_file.write(f'>{title}\n') | ||
short_file.write(seq + '\n') | ||
stats['seq_tiny'] += 1 | ||
stats['nt_tiny'] += seq_len | ||
tiny_lens.append(seq_len) | ||
elif max_filt and seq_len > max_len: | ||
long_file.write(f'>{title}\n') | ||
long_file.write(seq + '\n') | ||
stats['seq_long'] += 1 | ||
stats['nt_long'] += seq_len | ||
long_lens.append(seq_len) | ||
else: | ||
pass_file.write(f'>{title}\n') | ||
pass_file.write(seq + '\n') | ||
stats['seq_pass'] += 1 | ||
stats['nt_pass'] += seq_len | ||
pass_lens.append(seq_len) | ||
|
||
# Final statistics | ||
print(cyan(f' {i / 1e+6:.3g} Mseqs'), green('OK! ')) | ||
print(gray('Passed'), magenta(f'{stats["nt_pass"] / 1e+6:.3g}'), | ||
gray('Mnucs in'), stats['seq_pass'], gray('sequences '), | ||
magenta(f'({stats["seq_passed"]/i:.3%}).')) | ||
pass_lens_np = np.array(pass_lens) | ||
if pass_lens: | ||
print(gray('MIN length: '), f'{np.min(pass_lens_np):n}') | ||
print(gray('AVG length: '), f'{np.average(pass_lens_np):.2g}') | ||
print(gray('MAX length: '), f'{np.max(pass_lens_np, initial=0):n}') | ||
print('') | ||
|
||
tiny_lens_np = np.array(tiny_lens) | ||
if min_filt: | ||
print(gray('Too short'), magenta(f'{stats["nt_tiny"] / 1e+3:.3g}'), | ||
gray('Knucs in'), stats['seq_tiny'], gray('sequences '), | ||
magenta(f'({stats["seq_tiny"]/i:.3%}).')) | ||
if tiny_lens: | ||
print(gray('Too short MIN length: '), f'{np.min(tiny_lens_np):n}') | ||
print(gray('Too short AVG length: '), | ||
f'{np.average(tiny_lens_np):.2g}') | ||
print(gray('Too short MAX length: '), f'{np.max(tiny_lens_np):n}') | ||
print('') | ||
|
||
long_lens_np = np.array(long_lens) | ||
if max_filt: | ||
print(gray('Too long'), magenta(f'{stats["nt_long"] / 1e+6:.3g}'), | ||
gray('Mnucs in'), stats['seq_long'], gray('sequences '), | ||
magenta(f'({stats["seq_long"]/i:.3%}).')) | ||
if long_lens: | ||
print(gray('Too long MIN length: '), f'{np.min(long_lens_np):n}') | ||
print(gray('Too long AVG length: '), | ||
f'{np.average(long_lens_np):.2g}') | ||
print(gray('Too long MAX length: '), f'{np.max(long_lens_np):n}') | ||
print('') | ||
|
||
# Histogram | ||
lens_np: np.ndarray = np.concatenate( | ||
(pass_lens_np, tiny_lens_np, long_lens_np)) | ||
bins = np.logspace(np.floor(np.log10(np.min(lens_np))), | ||
np.ceil(np.log10(np.max(lens_np))), 100) | ||
plt.hist(lens_np, bins=bins) | ||
plt.title("Log histogram of reads by length in nucleotides") | ||
plt.yscale("log") | ||
plt.ylabel("Number of reads per bin") | ||
plt.xscale("log") | ||
plt.xlabel("Length of the reads in nucleotides (100 bins)") | ||
if min_filt: | ||
plt.axvline(x=min_len, color='r') | ||
if max_filt: | ||
plt.axvline(x=max_len, color='r') | ||
plt.savefig(output_plot) | ||
|
||
# Timing results | ||
print(gray('Total elapsed time:'), time.strftime( | ||
"%H:%M:%S", time.gmtime(time.time() - start_time))) | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters