Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage prediction #16

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 43 additions & 6 deletions mopp/modules/trim.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,38 @@
import subprocess
from pathlib import Path
from mopp.modules.utils import create_folder
from mopp.modules.utils import pool_processes
from mopp.modules.metadata import load_metadata


from multiprocessing import Pool


logger = logging.getLogger("mopp")


def trim_files(indir, outdir, md_path):


def run_trim_metars(args):
r1, outdir = args
return _run_trim_metars(r1, outdir)

def run_trim_paired(args):
r1, r2, outdir = args
return _run_trim_paired(r1, r2, outdir)

def rename_files(args):
outdir_trimmed, outdir_cat, identifier, omic = args
return _rename_files(outdir_trimmed, outdir_cat, identifier, omic)

def cat_paired(args):
outdir_trimmed, outdir_cat, identifier, omic = args
print(args)
return _cat_paired(outdir_trimmed, outdir_cat, identifier, omic)



def trim_files(indir, outdir, md_path, threads):
# load metadata into md_dict
md_dict = load_metadata(md_path)

Expand All @@ -19,18 +44,29 @@ def trim_files(indir, outdir, md_path):
create_folder(outdir_cat)
create_folder(outdir_trimmed)

# trim files
arg_list_metars = []
arg_list_trimpaired = []
arg_list_renamefiles = []
arg_list_catpaired =[]


for identifier, omic_dict in md_dict.items():
for omic in omic_dict.keys():
r1_file = Path(indir) / omic_dict[omic][0]
if omic == "metaRS":
_run_trim_metars(r1_file, outdir_trimmed)
_rename_files(outdir_trimmed, outdir_cat, identifier, omic)
arg_list_metars.append((r1_file, outdir_trimmed))
arg_list_renamefiles.append((outdir_trimmed, outdir_cat, identifier, omic))
else:
r2_file = Path(indir) / omic_dict[omic][1]
arg_list_trimpaired.append((r1_file, r2_file, outdir_trimmed))
arg_list_catpaired.append((outdir_trimmed, outdir_cat, identifier, omic))

pool_processes(threads, [[run_trim_metars, arg_list_metars],
[run_trim_paired, arg_list_trimpaired]])

pool_processes(threads, [[rename_files, arg_list_renamefiles],
[cat_paired, arg_list_catpaired]])

_run_trim_paired(r1_file, r2_file, outdir_trimmed)
_cat_paired(outdir_trimmed, outdir_cat, identifier, omic)


def _rename_files(indir, outdir, identifier, omic):
Expand All @@ -49,6 +85,7 @@ def _rename_files(indir, outdir, identifier, omic):


def _run_trim_paired(r1_file, r2_file, outdir):

commands = [
"trim_galore",
"--paired",
Expand Down
48 changes: 47 additions & 1 deletion mopp/modules/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import shutil
import subprocess
import os
from multiprocessing import Pool


def create_folder(current_dir):
Expand All @@ -16,6 +17,51 @@ def clear_folder(current_dir):
if item.is_dir():
shutil.rmtree(item)

def pool_processes(num_processes, function_list):
with Pool(processes=num_processes) as pool:
for func in function_list:
pool.map_async(func[0], func[1])
pool.close()
pool.join()

def get_directory_size(directory_path):
total_size = 0
try:
with os.scandir(directory_path) as entries:
for entry in entries:
if entry.is_file():
total_size += entry.stat().st_size
elif entry.is_dir():
total_size += get_directory_size(entry.path)
return total_size
except FileNotFoundError:
return None

def convert_size(size_bytes):
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024.0:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024.0


def get_available_storage(path="/"):
try:
total, used, free = shutil.disk_usage(path)
return free
except FileNotFoundError:
return None

def check_storage(input, multiplier=2):
input_size = get_directory_size(input)
available_space = get_available_storage()

ideal_space = input_size * multiplier

if ideal_space > available_space:
return convert_size(-(available_space-ideal_space))
else:
return 1


# import time
# import logging
Expand Down
22 changes: 18 additions & 4 deletions mopp/mopp.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from mopp.modules.coverages import calculate_genome_coverages
from mopp.modules.index import genome_extraction
from mopp.modules.features import ft_generation

from mopp.modules.utils import check_storage

logger = logging.getLogger("mopp")
timestamp = time.strftime("%Y-%m-%d_%H-%M-%S")
Expand Down Expand Up @@ -96,7 +96,20 @@ def workflow(
outdir_index_path = outdir_index / f"{prefix}_bt2index" / prefix
outdir_features = Path(output_dir) / "features"

trim_files(input_dir, outdir_trimmed, metadata)

###Storage prediction
storage_check = check_storage(input_dir)

if storage_check != 1:
print("WARNING: There may not be enough storage to complete this workflow. We recommend you free up ", storage_check , " of space.")
user_input = click.prompt("Would you still like to proceed? (Y/N)", type=str)
if user_input.lower() == "n":
return




trim_files(input_dir, outdir_trimmed, metadata, threads)
align_files(
outdir_trimmed, outdir_aligned_metaG, "*metaG*.fq.gz", index, threads
)
Expand All @@ -123,7 +136,8 @@ def workflow(
@click.option("-i", "--input_dir", required=True, help=DESC_INPUT)
@click.option("-o", "--output_dir", required=True, help=DESC_OUTPUT)
@click.option("-m", "--metadata", required=True, help=DESC_MD)
def trim(input_dir, output_dir, metadata):
@click.option("-t", "--threads", default=4, help=DESC_NTHREADS)
def trim(input_dir, output_dir, metadata, threads):
logger.setLevel(logging.INFO)
filer_handler = logging.FileHandler(f"mopp_{timestamp}.log")
filer_handler.setFormatter(formatter)
Expand All @@ -133,7 +147,7 @@ def trim(input_dir, output_dir, metadata):
logger.addHandler(stream_handler)

try:
trim_files(input_dir, output_dir, metadata)
trim_files(input_dir, output_dir, metadata, threads)
except Exception as e:
logger.error(f"An error occurred: {str(e)}", exc_info=True)

Expand Down