diff --git a/src/specimen/classes/reports.py b/src/specimen/classes/reports.py index 418a928..e50d329 100644 --- a/src/specimen/classes/reports.py +++ b/src/specimen/classes/reports.py @@ -86,7 +86,7 @@ def visualise(self, color_palette: str = 'YlGn') -> tuple[matplotlib.figure.Figu (2) matplotlib.figure.Figure: Report for the creation origin. """ - # @TODO maybe change plot type, as with small numbers its barely visibale + # @TODO maybe change plot type, as with small numbers its barely visible def plot_origin(data, color_palette): # create colour gradient diff --git a/src/specimen/cmpb/workflow.py b/src/specimen/cmpb/workflow.py index 5491bec..79875c7 100644 --- a/src/specimen/cmpb/workflow.py +++ b/src/specimen/cmpb/workflow.py @@ -17,6 +17,7 @@ from cobra import Reaction,Model from libsbml import readSBML +import subprocess from refinegems.analysis import growth from refinegems.analysis.investigate import plot_rea_sbo_single @@ -99,11 +100,12 @@ def between_analysis(model: Model, cfg:dict, step:str): if cfg['general']['memote_always_on']: run_memote(model, 'html', return_res=False, - save_res=Path(cfg['general']["dir"], 'cmpb_out', 'misc','memote',f'{step}_.html'), + save_res=Path(cfg['general']["dir"], 'cmpb_out', 'misc','memote',f'{step}.html'), verbose=False) if cfg['general']['stats_always_on']: report = ModelInfoReport(model) - report.save(Path(cfg['general']["dir"],'cmpb_out', 'misc', 'stats',f'{step}_.html'), + Path(dir,"cmpb_out",'misc', 'stats', step).mkdir(parents=True, exist_ok=False) + report.save(Path(cfg['general']["dir"],'cmpb_out', 'misc', 'stats', step), color_palette=config['general']['colours']) @@ -113,7 +115,7 @@ def between_analysis(model: Model, cfg:dict, step:str): # load config # ----------- if not configpath: - config = save_cmpb_user_input(Path('cmpb_out', 'logs', 'config_user.yaml')) + config = save_cmpb_user_input() else: config = validate_config(configpath, 'cmpb') @@ -142,29 +144,28 @@ def between_analysis(model: Model, cfg:dict, step:str): # CarveMe ######### - # @TODO - # will come in a future update if not config['input']['modelpath']: - # run CarveMe - raise ValueError('Currently, CarveMe has not been included in the pipeline. Please use it separatly. This function will be provided in a future update.') - else: - current_modelpath = config['input']['modelpath'] + if config['carveme']['gram'] == "grampos" or config['carveme']['gram'] == "gramneg": + subprocess.run(["carve", config['cm-polish']['protein_fasta'], "--solver", "scip", '-u', config['carveme']['gram'], "-o", dir+"\cmpb_out\models\Draft.xml"]) + else: + subprocess.run(["carve", config['cm-polish']['protein_fasta'], "--solver", "scip", "-o", dir+"\cmpb_out\models\Draft.xml"]) + config['input']['modelpath'] = dir+'\cmpb_out\models\Draft.xml' + current_modelpath = config['input']['modelpath'] # CarveMe correction #################### - - current_libmodel = load_model(current_modelpath,'libsbml') + current_libmodel = load_model(str(current_modelpath),'libsbml') # check, if input is a CarveMe model - if 'CarveMe' in current_libmodel.getNotesString(): + if 'CarveMe' in current_libmodel.getAnnotationString(): Path(dir,"cmpb_out",'misc', 'wrong_annotations').mkdir(parents=True, exist_ok=False) current_libmodel = polish(current_libmodel, - email = config['cm-polish']['email'], - id_db = config['general']['namespace'], - refseq_gff = config['cm-polish']['refseq_gff'], - protein_fasta = config['cm-polish']['protein_fasta'], - lab_strain = config['cm-polish']['is_lab_strain'], - kegg_organism_id = config['cm-polish']['kegg_organism_id'], - path = Path(dir,'cmpb_out','misc','wrong_annotations')) + email = config['cm-polish']['email'], + id_db = config['general']['namespace'], + refseq_gff = config['general']['refseq_gff'], + protein_fasta = config['cm-polish']['protein_fasta'], + lab_strain = config['cm-polish']['is_lab_strain'], + kegg_organism_id = config['general']['kegg_organism_id'], + path = Path(dir,'cmpb_out','misc','wrong_annotations')) # rg correct charges current_libmodel, mult_charges_dict = correct_charges_modelseed(current_libmodel) mult_charges_tab = pd.DataFrame.from_dict(mult_charges_dict, orient='index') @@ -172,15 +173,15 @@ def between_analysis(model: Model, cfg:dict, step:str): # save model if config['general']['save_all_models']: - write_model_to_file(current_libmodel, Path(dir,'cmpb_out','models',f'{current_libmodel.getId()}_after_CarveMe_correction.xml')) + write_model_to_file(current_libmodel, str(Path(dir,'cmpb_out','models',f'{current_libmodel.getId()}_after_CarveMe_correction.xml'))) current_modelpath = Path(dir,'cmpb_out','models',f'{current_libmodel.getId()}_after_CarveMe_correction.xml') else: - write_model_to_file(current_libmodel, only_modelpath) + write_model_to_file(current_libmodel, str(only_modelpath)) current_modelpath = only_modelpath # growth test # ----------- - current_model = load_model(current_modelpath,'cobra') + current_model = load_model(str(current_modelpath),'cobra') between_growth_test(current_model,config,step='after_draft') between_analysis(current_model, config, step='after_draft') @@ -190,7 +191,7 @@ def between_analysis(model: Model, cfg:dict, step:str): # options: automatic/manual extension/manual input if config['gapfilling']['gap_fill_params']: - filename = Path(dir, 'cmpb', 'misc',f'{current_libmodel.getId()}_gap_analysis_{str(today)}') + filename = str(Path(dir, 'cmpb_out', 'misc',f'{current_libmodel.getId()}_gap_analysis_{str(today)}')) gapfilling_perfomed = True match config['gapfilling']['gap_fill_params']['db_to_compare']: @@ -230,10 +231,10 @@ def between_analysis(model: Model, cfg:dict, step:str): # save model if config['general']['save_all_models']: - write_model_to_file(current_libmodel, Path(dir,'cmpb_out','models',f'{current_libmodel.getId()}_autofilled_gaps.xml')) + write_model_to_file(current_libmodel, str(Path(dir,'cmpb_out','models',f'{current_libmodel.getId()}_autofilled_gaps.xml'))) current_modelpath = Path(dir,'cmpb_out','models',f'{current_libmodel.getId()}_autofilled_gaps.xml') else: - write_model_to_file(current_libmodel, current_modelpath) + write_model_to_file(current_libmodel, str(current_modelpath)) logging.info(f'Gaps were filled automatically in {current_libmodel.getId()}.') @@ -242,10 +243,10 @@ def between_analysis(model: Model, cfg:dict, step:str): # save model if config['general']['save_all_models']: - write_model_to_file(current_libmodel, Path(dir,'cmpb_out','models',f'{current_libmodel.getId()}_manually_filled_gaps.xml')) + write_model_to_file(current_libmodel, str(Path(dir,'cmpb_out','models',f'{current_libmodel.getId()}_manually_filled_gaps.xml'))) current_modelpath = Path(dir,'cmpb_out','models',f'{current_libmodel.getId()}_manually_filled_gaps.xml') else: - write_model_to_file(current_libmodel, current_modelpath) + write_model_to_file(current_libmodel, str(current_modelpath)) logging.info(f'Gaps were filled based on a manually curated file in {current_libmodel.getId()}.') @@ -262,37 +263,39 @@ def between_analysis(model: Model, cfg:dict, step:str): # ----------------- if config['kegg_pathway_groups']: current_libmodel, missing_list = kegg_pathways(current_modelpath) - with open(Path(dir, 'cmpb', 'misc', 'reac_wo_kegg_pathway_groups.txt'), 'w') as outfile: + with open(Path(dir, 'cmpb_out', 'misc', 'kegg_pathway', 'reac_wo_kegg_pathway_groups.txt'), 'w') as outfile: for line in missing_list: outfile.write(f'{line}\n') # save model if config['general']['save_all_models']: - write_model_to_file(current_libmodel, Path(dir,'cmpb_out','models',f'{current_libmodel.getId()}_added_KeggPathwayGroups.xml')) + write_model_to_file(current_libmodel, str(Path(dir,'cmpb_out','models',f'{current_libmodel.getId()}_added_KeggPathwayGroups.xml'))) current_modelpath = Path(dir,'cmpb_out','models',f'{current_libmodel.getId()}_added_KeggPathwayGroups.xml') else: - write_model_to_file(current_libmodel, current_modelpath) + write_model_to_file(current_libmodel, str(current_modelpath)) # SBOannotator # ------------ # @TODO # theoretically: something along the way: - libsbml_doc = readSBML(current_modelpath) - libsbml_model = libsbml_doc.getModel() + #libsbml_doc = readSBML(current_modelpath) + #libsbml_model = libsbml_doc.getModel() + """ current_libmodel = load_model(str(current_modelpath),'libsbml') if config['general']['save_all_models']: - libsbml_model = run_SBOannotator(libsbml_model) - write_model_to_file(libsbml_model,Path(dir,'cmpb','models', 'SBOannotated.xml')) - current_modelpath = Path(dir,'cmpb','models', 'SBOannotated.xml') + current_libmodel = run_SBOannotator(current_libmodel) + write_model_to_file(current_libmodel, str(Path(dir,'cmpb_out','models', 'SBOannotated.xml'))) + current_modelpath = Path(dir,'cmpb_out','models', 'SBOannotated.xml') else: - libsbml_model = run_SBOannotator(libsbml_model) - write_model_to_file(current_modelpath) + current_libmodel = run_SBOannotator(current_libmodel) + write_model_to_file(current_libmodel, str(current_modelpath)) + current_model = load_model(str(current_modelpath),'cobra') between_analysis(current_model,config,step='after_annotation') - + """ # model cleanup ############### - current_model = load_model(current_modelpath) - + current_model = load_model(str(current_modelpath), 'cobra') + # duplicates # ---------- match config['duplicates']['reactions']: @@ -338,16 +341,16 @@ def between_analysis(model: Model, cfg:dict, step:str): # save model if config['general']['save_all_models']: - write_model_to_file(current_model, Path(dir,'cmpb_out','models',f'{current_model.getId()}_after_duplicate_removal.xml')) - current_modelpath = Path(dir,'cmpb_out','models',f'{current_model.getId()}_after_duplicate_removal.xml') + write_model_to_file(current_model, str(Path(dir,'cmpb_out','models',f'{current_model.id}_after_duplicate_removal.xml'))) + current_modelpath = Path(dir,'cmpb_out','models',f'{current_model.id}_after_duplicate_removal.xml') else: - write_model_to_file(current_model, only_modelpath) + write_model_to_file(current_model, str(only_modelpath)) current_modelpath = only_modelpath # BOF # --- # BOFdat - optional - current_model = load_model(current_modelpath,'cobra') + current_model = load_model(str(current_modelpath),'cobra') if config['BOF']['run_bofdat']: check_bof = test_biomass_presence(current_model) if check_bof: @@ -367,7 +370,6 @@ def between_analysis(model: Model, cfg:dict, step:str): dna_weight_fraction = config['BOF']['bofdat_params']['dna_weight_fraction'], weight_frac = config['BOF']['bofdat_params']['weight_fraction']) current_model = check_normalise_biomass(current_model) - # check and normalise else: current_model = check_normalise_biomass(current_model) @@ -375,32 +377,33 @@ def between_analysis(model: Model, cfg:dict, step:str): # save # save model if config['general']['save_all_models']: - write_model_to_file(current_model, Path(dir,'cmpb_out','models',f'{current_model.getId()}_after_BOF.xml')) - current_modelpath = Path(dir,'cmpb_out','models',f'{current_model.getId()}_after_BOF.xml') + write_model_to_file(current_model, str(Path(dir,'cmpb_out','models',f'{current_model.id}_after_BOF.xml'))) + current_modelpath = Path(dir,'cmpb_out','models',f'{current_model.id}_after_BOF.xml') else: - write_model_to_file(current_model, current_modelpath) + write_model_to_file(current_model, str(current_modelpath)) # MCC # --- - current_model = perform_mcc(current_model, Path(dir,'cmpb','misc','mcc'),apply=True) + current_model = perform_mcc(current_model, Path(dir,'cmpb_out','misc','mcc'),apply=True) # save the final model if config['general']['save_all_models']: - write_model_to_file(current_model, Path(dir,'cmpb_out','models',f'{current_model.getId()}_final_model.xml')) - current_modelpath = Path(dir,'cmpb_out','models',f'{current_model.getId()}_final_model.xml') + write_model_to_file(current_model, str(Path(dir,'cmpb_out','models',f'{current_model.id}_final_model.xml'))) + current_modelpath = Path(dir,'cmpb_out','models',f'{current_model.id}_final_model.xml') else: - write_model_to_file(current_model, only_modelpath) + write_model_to_file(current_model, str(only_modelpath)) current_modelpath = only_modelpath # analysis ########## - current_model = load_model(current_modelpath,'cobra') - current_libmodel = load_model(current_modelpath,'libsbml') + current_model = load_model(str(current_modelpath),'cobra') + current_libmodel = load_model(str(current_modelpath),'libsbml') # stats # ----- stats_report = ModelInfoReport(current_model) - stats_report.save(Path(dir,'cmpb_out', 'misc','stats'), + Path(dir,'cmpb_out', 'misc','stats','final').mkdir(parents=True, exist_ok=False) + stats_report.save(Path(dir,'cmpb_out', 'misc','stats','final'), color_palette=config['general']['colours']) # kegg pathway @@ -426,7 +429,7 @@ def between_analysis(model: Model, cfg:dict, step:str): # ------------ media_list = growth.read_media_config(config['input']['mediapath']) auxo_report = growth.test_auxotrophies(current_model, media_list[0], media_list[1], config['general']['namespace']) - auxo_report.save(Path(dir,'cmpb_out','misc','auxotrophies'), + auxo_report.save(Path(dir,'cmpb_out','misc','auxotrophy'), color_palette=config['general']['colours']) diff --git a/src/specimen/data/config/cmpb_config.yaml b/src/specimen/data/config/cmpb_config.yaml index 834ee0c..6bb5921 100644 --- a/src/specimen/data/config/cmpb_config.yaml +++ b/src/specimen/data/config/cmpb_config.yaml @@ -39,6 +39,14 @@ general: # Part-specific options # --------------------- +# Build a model using CarveMe +carveme: + run_carve: False + # if CarveMe should be run, + # fill out the params below + protein_fasta: USER # FASTA used for CarveMe + gram: USER # grampos or gramneg? + # Polish a CarveMe model # Only neccessary, if the model will or has been build with CarveMe # Will only be used, if model is indeed a CarveMe model @@ -91,4 +99,4 @@ BOF: bofdat_params: full_genome_sequence: USER # Whole genome sequence dna_weight_fraction: USER # DNA weight fraction for the organism - weight_fraction: USER # Ezyme/ion weight fractions for the organism + weight_fraction: USER # Ezyme/ion weight fractions for the organism \ No newline at end of file diff --git a/src/specimen/util/set_up.py b/src/specimen/util/set_up.py index c60f1c9..6d3a684 100644 --- a/src/specimen/util/set_up.py +++ b/src/specimen/util/set_up.py @@ -33,9 +33,8 @@ HQTB_CONFIG_PATH_REQUIRED = ['annotated_genome','full_sequence','model','diamond', 'mnx_chem_prop', 'mnx_chem_xref','mnx_reac_prop','mnx_reac_xref', 'media_analysis'] #: :meta: -CMPB_CONFIG_PATHS_REQUIRED = ['annotated_genome','mediapath','modelpath','refseq_gff','protein_fasta', - 'biocyc_files','full_genome_sequence'] #: :meta: -CMPB_CONFIG_PATHS_OPTIONAL = [] # :meta: +CMPB_CONFIG_PATHS_REQUIRED = ['mediapath','protein_fasta','refseq_gff','annotated_genome'] #: :meta: +CMPB_CONFIG_PATHS_OPTIONAL = ['modelpath','biocyc_files','full_genome_sequence'] # :meta: PIPELINE_PATHS_OPTIONAL = {'hqtb':HQTB_CONFIG_PATH_OPTIONAL, 'cmpb':CMPB_CONFIG_PATHS_OPTIONAL} #: :meta: PIPELINE_PATHS_REQUIRED = {'hqtb':HQTB_CONFIG_PATH_REQUIRED, @@ -260,7 +259,7 @@ def dict_recursive_overwrite(dictA:dict, key:str=None) -> dict: raise TypeError(F'Missing a required argument in the config file ({key}).') elif dictA == 'USER': # @TODO - mes = F'Keyword USER detected in config ({key}). Either due to skipped options or missing required information.\nReminder: this may lead to downstream problems.' + mes = F'Keyword USER detected in config ({key}). Either due to skipped options or missing required information.\nReminder: This may lead to downstream problems.' logging.warning(mes) return None else: @@ -295,8 +294,8 @@ def dict_recursive_check(dictA:dict, key:str=None, # required file paths if key in PIPELINE_PATHS_REQUIRED[pipeline]: if isinstance(dictA,list): - for item in dictA: - if os.path.isfile(item): + for entry in dictA: + if os.path.isfile(entry): continue else: raise FileNotFoundError(F'Path does not exist: {dictA}') @@ -309,17 +308,22 @@ def dict_recursive_check(dictA:dict, key:str=None, if isinstance(dictA,str): if os.path.isfile(dictA): return - else: + elif not os.path.isfile(dictA): + mes = F'Path does not exist: {dictA}. \nReminder: It is optional, but it may lead to downstream problems.' + logging.warning(mes) + pass + else: raise FileNotFoundError(F'Path does not exist: {dictA}') if isinstance(dictA,list): for entry in dictA: - if dictA and os.path.isfile(dictA): + if entry and os.path.isfile(entry): return - elif not dictA: - # @TODO + elif not os.path.isfile(entry): + mes = F'Path does not exist: {entry}. \nReminder: It is optional, but it may lead to downstream problems.' + logging.warning(mes) pass else: - raise FileNotFoundError(F'Path does not exist: {dictA}') + raise FileNotFoundError(F'Path does not exist: {entry}') elif key in PIPELINE_DIR_PATHS: if dictA and os.path.exists(dictA): return @@ -453,7 +457,7 @@ def save_cmpb_user_input(configpath:Union[str,None]=None) -> dict: config['general']['memote_always_on'] = False # run stats always y/n - models_stats = click.prompt('Do you want to run memote after each step?', type=click.Choice(['y','n']), show_choices=True) + models_stats = click.prompt('Do you want to run stats after each step?', type=click.Choice(['y','n']), show_choices=True) match models_stats: case 'y': config['general']['stats_always_on'] = True @@ -464,8 +468,8 @@ def save_cmpb_user_input(configpath:Union[str,None]=None) -> dict: refseq = click.prompt('If you want to run a gap analysis with KEGG or have a CarveMe model, please enter the path to your refseq gff file', type=click.Path(exists=True)) config['general']['refseq_organism_id'] = refseq - kegg_org_id = click.prompt('If you want to run a gap analysis with KEGG, please enter the KEGG organism ID', type=click.Path(exists=True)) - config['general']['refseq_organism_id'] = kegg_org_id + kegg_org_id = click.prompt('If you want to run a gap analysis with KEGG, please enter the KEGG organism ID') + config['general']['kegg_organism_id'] = kegg_org_id # part-specific # ------------- @@ -473,6 +477,18 @@ def save_cmpb_user_input(configpath:Union[str,None]=None) -> dict: print('Part-specific options') print('------------') + # CarveMe + carve = click.prompt('Do you want to build a model using CarveMe', type=click.Choice(['y','n']), show_choices=True) + match carve: + case 'y': + config['carveme']['run_carve'] = True + prot_fa = click.prompt('Please enter the path to your protein fasta file', type=click.Path(exists=True)) + config['carveme']['protein_fasta'] = prot_fa + gram = click.prompt('Do you want to use a template specialized for gram-positive or gram-negative bacteria?', type=click.Choice(['grampos','gramneg','None']), show_choices=True) + config['carveme']['gram'] = gram + case 'n': + config['carveme']['run_carve'] = False + # model polish carveme = click.prompt('Is your draft model CarveMe-based or will CarveMe be run?', type=click.Choice(['y','n']), show_choices=True) if carveme == 'y': @@ -509,10 +525,14 @@ def save_cmpb_user_input(configpath:Union[str,None]=None) -> dict: Path3 = click.prompt('Enter path to protein FASTA file used as input for CarveMe', type=click.Path(exists=True)) config['gapfilling']['gap_fill_params']['biocyc_files'] = [Path0, Path1, Path2, Path3] -# @TODO: Funktioniert das so? Bei den anderen yes-no-Fragen hast du noch alles auf True oder False gesetzt. +# @TODO: Funktioniert das so? Bei den anderen yes-no-Fragen hast du noch alles auf True oder False gesetzt. -> Habs geändert, weil sonst steht in der config n und es wird trotzdem durchgeführt # kegg pathways as groups kegg_pw_groups = click.prompt('Do you want to add KEGG pathways as groups to the model?', type=click.Choice(['y','n']), show_choices=True) - config['kegg_pathway_groups'] = kegg_pw_groups + match kegg_pw_groups: + case 'y': + config['kegg_pathway_groups'] = True + case 'n': + config['kegg_pathway_groups'] = False # resolve duplicates @@ -542,7 +562,6 @@ def save_cmpb_user_input(configpath:Union[str,None]=None) -> dict: case 'n': config['BOF']['run_bofdat'] = False - # save config if configpath: pass