From 3af294284b816abdaa59cbbd499fdc87d374e9c9 Mon Sep 17 00:00:00 2001 From: kz Date: Wed, 22 Nov 2023 12:16:55 -0800 Subject: [PATCH 1/2] multiprocessing for trimming --- mopp/modules/trim.py | 49 +++++++++++++++++++++++++++++++++++++------ mopp/modules/utils.py | 8 +++++++ mopp/mopp.py | 7 ++++--- 3 files changed, 55 insertions(+), 9 deletions(-) diff --git a/mopp/modules/trim.py b/mopp/modules/trim.py index c089068..b4f4308 100644 --- a/mopp/modules/trim.py +++ b/mopp/modules/trim.py @@ -2,13 +2,38 @@ import subprocess from pathlib import Path from mopp.modules.utils import create_folder +from mopp.modules.utils import pool_processes from mopp.modules.metadata import load_metadata +from multiprocessing import Pool + + logger = logging.getLogger("mopp") -def trim_files(indir, outdir, md_path): + + +def run_trim_metars(args): + r1, outdir = args + return _run_trim_metars(r1, outdir) + +def run_trim_paired(args): + r1, r2, outdir = args + return _run_trim_paired(r1, r2, outdir) + +def rename_files(args): + outdir_trimmed, outdir_cat, identifier, omic = args + return _rename_files(outdir_trimmed, outdir_cat, identifier, omic) + +def cat_paired(args): + outdir_trimmed, outdir_cat, identifier, omic = args + print(args) + return _cat_paired(outdir_trimmed, outdir_cat, identifier, omic) + + + +def trim_files(indir, outdir, md_path, threads): # load metadata into md_dict md_dict = load_metadata(md_path) @@ -19,18 +44,29 @@ def trim_files(indir, outdir, md_path): create_folder(outdir_cat) create_folder(outdir_trimmed) - # trim files + arg_list_metars = [] + arg_list_trimpaired = [] + arg_list_renamefiles = [] + arg_list_catpaired =[] + + for identifier, omic_dict in md_dict.items(): for omic in omic_dict.keys(): r1_file = Path(indir) / omic_dict[omic][0] if omic == "metaRS": - _run_trim_metars(r1_file, outdir_trimmed) - _rename_files(outdir_trimmed, outdir_cat, identifier, omic) + arg_list_metars.append((r1_file, outdir_trimmed)) + arg_list_renamefiles.append((outdir_trimmed, outdir_cat, identifier, omic)) else: r2_file = Path(indir) / omic_dict[omic][1] + arg_list_trimpaired.append((r1_file, r2_file, outdir_trimmed)) + arg_list_catpaired.append((outdir_trimmed, outdir_cat, identifier, omic)) + + pool_processes(threads, [[run_trim_metars, arg_list_metars], + [run_trim_paired, arg_list_trimpaired]]) + + pool_processes(threads, [[rename_files, arg_list_renamefiles], + [cat_paired, arg_list_catpaired]]) - _run_trim_paired(r1_file, r2_file, outdir_trimmed) - _cat_paired(outdir_trimmed, outdir_cat, identifier, omic) def _rename_files(indir, outdir, identifier, omic): @@ -49,6 +85,7 @@ def _rename_files(indir, outdir, identifier, omic): def _run_trim_paired(r1_file, r2_file, outdir): + commands = [ "trim_galore", "--paired", diff --git a/mopp/modules/utils.py b/mopp/modules/utils.py index 0228ec9..dce51e4 100644 --- a/mopp/modules/utils.py +++ b/mopp/modules/utils.py @@ -1,6 +1,7 @@ import shutil import subprocess +from multiprocessing import Pool def create_folder(current_dir): if current_dir.exists() and current_dir.is_dir(): @@ -16,6 +17,13 @@ def clear_folder(current_dir): if item.is_dir(): shutil.rmtree(item) +def pool_processes(num_processes, function_list): + with Pool(processes=num_processes) as pool: + for func in function_list: + pool.map_async(func[0], func[1]) + pool.close() + pool.join() + # import time # import logging diff --git a/mopp/mopp.py b/mopp/mopp.py index da32d7c..628a8c9 100644 --- a/mopp/mopp.py +++ b/mopp/mopp.py @@ -96,7 +96,7 @@ def workflow( outdir_index_path = outdir_index / f"{prefix}_bt2index" / prefix outdir_features = Path(output_dir) / "features" - trim_files(input_dir, outdir_trimmed, metadata) + trim_files(input_dir, outdir_trimmed, metadata, threads) align_files( outdir_trimmed, outdir_aligned_metaG, "*metaG*.fq.gz", index, threads ) @@ -123,7 +123,8 @@ def workflow( @click.option("-i", "--input_dir", required=True, help=DESC_INPUT) @click.option("-o", "--output_dir", required=True, help=DESC_OUTPUT) @click.option("-m", "--metadata", required=True, help=DESC_MD) -def trim(input_dir, output_dir, metadata): +@click.option("-t", "--threads", default=4, help=DESC_NTHREADS) +def trim(input_dir, output_dir, metadata, threads): logger.setLevel(logging.INFO) filer_handler = logging.FileHandler(f"mopp_{timestamp}.log") filer_handler.setFormatter(formatter) @@ -133,7 +134,7 @@ def trim(input_dir, output_dir, metadata): logger.addHandler(stream_handler) try: - trim_files(input_dir, output_dir, metadata) + trim_files(input_dir, output_dir, metadata, threads) except Exception as e: logger.error(f"An error occurred: {str(e)}", exc_info=True) From 1fc9fd44d24d1752753736d8c568743f0d45848e Mon Sep 17 00:00:00 2001 From: kz Date: Mon, 27 Nov 2023 12:01:47 -0800 Subject: [PATCH 2/2] storage prediction --- mopp/modules/utils.py | 42 ++++++++++++++++++++++++++++++++++++++++-- mopp/mopp.py | 15 ++++++++++++++- 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/mopp/modules/utils.py b/mopp/modules/utils.py index dce51e4..31a0c9c 100644 --- a/mopp/modules/utils.py +++ b/mopp/modules/utils.py @@ -1,8 +1,8 @@ import shutil -import subprocess - +import os from multiprocessing import Pool + def create_folder(current_dir): if current_dir.exists() and current_dir.is_dir(): clear_folder(current_dir) @@ -24,6 +24,44 @@ def pool_processes(num_processes, function_list): pool.close() pool.join() +def get_directory_size(directory_path): + total_size = 0 + try: + with os.scandir(directory_path) as entries: + for entry in entries: + if entry.is_file(): + total_size += entry.stat().st_size + elif entry.is_dir(): + total_size += get_directory_size(entry.path) + return total_size + except FileNotFoundError: + return None + +def convert_size(size_bytes): + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + if size_bytes < 1024.0: + return f"{size_bytes:.2f} {unit}" + size_bytes /= 1024.0 + + +def get_available_storage(path="/"): + try: + total, used, free = shutil.disk_usage(path) + return free + except FileNotFoundError: + return None + +def check_storage(input, multiplier=2): + input_size = get_directory_size(input) + available_space = get_available_storage() + + ideal_space = input_size * multiplier + + if ideal_space > available_space: + return convert_size(-(available_space-ideal_space)) + else: + return 1 + # import time # import logging diff --git a/mopp/mopp.py b/mopp/mopp.py index 628a8c9..6ff03a2 100644 --- a/mopp/mopp.py +++ b/mopp/mopp.py @@ -27,7 +27,7 @@ from mopp.modules.coverages import calculate_genome_coverages from mopp.modules.index import genome_extraction from mopp.modules.features import ft_generation - +from mopp.modules.utils import check_storage logger = logging.getLogger("mopp") timestamp = time.strftime("%Y-%m-%d_%H-%M-%S") @@ -96,6 +96,19 @@ def workflow( outdir_index_path = outdir_index / f"{prefix}_bt2index" / prefix outdir_features = Path(output_dir) / "features" + + ###Storage prediction + storage_check = check_storage(input_dir) + + if storage_check != 1: + print("WARNING: There may not be enough storage to complete this workflow. We recommend you free up ", storage_check , " of space.") + user_input = click.prompt("Would you still like to proceed? (Y/N)", type=str) + if user_input.lower() == "n": + return + + + + trim_files(input_dir, outdir_trimmed, metadata, threads) align_files( outdir_trimmed, outdir_aligned_metaG, "*metaG*.fq.gz", index, threads