From 08aaf031bbf5097de8a493d71b49dc5ef49926e5 Mon Sep 17 00:00:00 2001 From: Jessica Rowell Date: Sun, 12 Jan 2025 14:20:37 -0500 Subject: [PATCH 01/15] fix submission_report.csv to have a header and to correctly get target_db data --- bin/submission_new.py | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/bin/submission_new.py b/bin/submission_new.py index 86057166..bbfa74ea 100755 --- a/bin/submission_new.py +++ b/bin/submission_new.py @@ -54,7 +54,6 @@ def submission_main(): # Get all parameters from argparse parameters_class = GetParams() parameters = parameters_class.parameters - # Get list of all databases to submit to (or update) databases = [db for db in parameters if parameters[db] and db in ['biosample', 'sra', 'genbank', 'gisaid']] @@ -65,7 +64,10 @@ def submission_main(): config_dict = config_parser.load_config() # Read in metadata file - metadata_df = pd.read_csv(parameters['metadata_file'], sep='\t') + try: + metadata_df = pd.read_csv(parameters['metadata_file'], sep='\t') + except Exception as e: + raise ValueError(f"Failed to load metadata file: {parameters['metadata_file']}. Error: {e}") # Initialize the Sample object with parameters from argparse sample = Sample( @@ -358,16 +360,21 @@ def parse_report_xml(self, report_path): db = action.attrib.get('target_db') status = action.attrib.get('status') accession = action.attrib.get('accession') - message = action.find('Message').text if action.find('Message') is not None else "" - if db == 'biosample': + message = None + response = action.find('Response') + if response is not None: + message_element = response.find('Message') + if message_element is not None: + message = message_element.text + if db == 'BioSample': report_dict['biosample_status'] = status report_dict['biosample_accession'] = accession report_dict['biosample_message'] = message - elif db == 'sra': + elif db == 'SRA': report_dict['sra_status'] = status report_dict['sra_accession'] = accession report_dict['sra_message'] = message - elif db == 'genbank': + elif db == 'GenBank': report_dict['genbank_status'] = status if status == 'processed-ok': # Handle Genbank-specific logic (AccessionReport.tsv) @@ -378,9 +385,10 @@ def parse_report_xml(self, report_path): report_dict['genbank_message'] = message return report_dict def save_report_to_csv(self, report_dict, csv_file): + write_header = not os.path.exists(csv_file) or os.stat(csv_file).st_size == 0 with open(csv_file, 'a', newline='') as f: writer = csv.DictWriter(f, fieldnames=report_dict.keys()) - if not os.path.isfile(csv_file): + if write_header: writer.writeheader() # todo: need to use pandas to do this probably, not all keys are being written to the file writer.writerow(report_dict) print(f"Submission report saved to {csv_file}") From f14733ef2b7c2f27ad5f2f53b77942ac676079ab Mon Sep 17 00:00:00 2001 From: Jessica Rowell Date: Mon, 13 Jan 2025 23:55:24 -0500 Subject: [PATCH 02/15] initial attempt to add an update_submissions workflow to resubmit metadata --- bin/submission_new.py | 86 ++++++++++++++++++++++-- modules/local/initial_submission/main.nf | 3 +- subworkflows/local/submission.nf | 29 +++++--- 3 files changed, 102 insertions(+), 16 deletions(-) diff --git a/bin/submission_new.py b/bin/submission_new.py index bbfa74ea..b82ad227 100755 --- a/bin/submission_new.py +++ b/bin/submission_new.py @@ -48,6 +48,21 @@ def fetch_and_parse_report(submission_object, client, submission_id, submission_ print(f"No report found for submission {submission_id}") return None +def 
get_accessions(sample, report_df): + """ Returns a dict with available accessions for the input sample + """ + accessions = { + 'biosample': None, + 'sra': None, + 'genbank': None + } + sample_row = report_df[report_df['submission_name'] == sample] + if not sample_row.empty: + accessions['biosample'] = sample_row['biosample_accession'].values[0] if 'biosample_accession' in sample_row else None + accessions['sra'] = sample_row['sra_accession'].values[0] if 'sra_accession' in sample_row else None + accessions['genbank'] = sample_row['genbank_accession'].values[0] if 'genbank_accession' in sample_row else None + return accessions + def submission_main(): """ Main for initiating submission steps """ @@ -128,7 +143,7 @@ def submission_main(): if parameters['send_email']: genbank_submission.sendemail() - elif parameters['update']: + elif parameters['fetch']: start_time = time.time() timeout = 60 # time out after 60 seconds report_fetched = False # Flag to indicate if a report has been fetched @@ -153,6 +168,44 @@ def submission_main(): else: # If the while loop completes without a successful fetch print("Timeout occurred while trying to fetch and parse the report.") + + elif parameters['update']: + # Call and run the update submission script + #update_submission.submission_main() + # Load the report file + report_file = parameters["submission_report"] + try: + report_df = pd.read_csv(report_file) + except Exception as e: + raise ValueError(f"Failed to load CSV file: {report_file}. Error: {e}") + accessions_dict = get_accessions(sample.sample_id, report_df) + + # Prepare all submissions with the accessions + if accessions_dict['biosample'] and parameters['biosample'] and 'biosample' not in databases_to_skip: + biosample_submission = BiosampleSubmission(sample, parameters, config_dict, metadata_df, f"{parameters['output_dir']}/{parameters['submission_name']}/biosample", + parameters['submission_mode'], submission_dir, 'biosample', accessions_dict['biosample']) + if accessions_dict['sra'] and parameters['sra'] and 'sra' not in databases_to_skip: + sra_submission = SRASubmission(sample, parameters, config_dict, metadata_df, f"{parameters['output_dir']}/{parameters['submission_name']}/sra", + parameters['submission_mode'], submission_dir, 'sra', accessions_dict['sra']) + if accessions_dict['genbank'] and parameters['genbank'] and 'genbank' not in databases_to_skip: + genbank_submission = GenbankSubmission(sample, parameters, config_dict, metadata_df, f"{parameters['output_dir']}/{parameters['submission_name']}/genbank", + parameters['submission_mode'], submission_dir, 'genbank', accessions_dict['genbank']) + # Submit all prepared submissions and fetch report once + if parameters['biosample'] and 'biosample' not in databases_to_skip: + biosample_submission.submit() + if parameters['sra'] and 'sra' not in databases_to_skip: + sra_submission.submit() + if parameters['genbank'] and 'genbank' not in databases_to_skip: + # If user is submitting via FTP + if sample.ftp_upload: + genbank_submission.prepare_files_ftp_submission() # Prep files and run table2asn + genbank_submission.submit() + else: + # Otherwise, prepare manual submission + genbank_submission.prepare_files_manual_submission() # Prep files and run table2asn + # Send email if the user requests it + if parameters['send_email']: + genbank_submission.sendemail() class GetParams: """ Class constructor for getting all necessary parameters (input args from argparse and hard-coded ones) @@ -179,9 +232,11 @@ def get_args(): 
parser.add_argument("--submission_name", help='Name of the submission', required=True) parser.add_argument("--config_file", help="Name of the submission onfig file", required=True) parser.add_argument("--metadata_file", help="Name of the validated metadata tsv file", required=True) + parser.add_argument("--submission_report", help="Path to submission report csv file", required=False, default="submission_report.csv") parser.add_argument("--species", help="Type of organism data", required=True) parser.add_argument('--submit', action='store_true', help='Run the full submission process') - parser.add_argument('--update', action='store_true', help='Run the update process to fetch and parse report') + parser.add_argument('--fetch', action='store_true', help='Run the process to fetch and parse report') + parser.add_argument('--update', action='store_true', help='Run the update process to submit new data') # optional parameters parser.add_argument("-o", "--output_dir", type=str, default='submission_outputs', help="Output Directory for final Files, default is current directory") @@ -272,6 +327,9 @@ def validate_files(self): if missing_files: missing_files_per_database['genbank'] = missing_files return missing_files_per_database + # Function to add accession Ids to the sample info once assigned + def add_accession_id(self, accession_id): + self.accession_ids = accession_id class MetadataParser: def __init__(self, metadata_df, parameters): @@ -588,7 +646,7 @@ def add_attributes_block(self, submission): pass class BiosampleSubmission(XMLSubmission, Submission): - def __init__(self, sample, parameters, submission_config, metadata_df, output_dir, submission_mode, submission_dir, type): + def __init__(self, sample, parameters, submission_config, metadata_df, output_dir, submission_mode, submission_dir, type, accession_id = None): # Properly initialize the base classes XMLSubmission.__init__(self, sample, submission_config, metadata_df, output_dir, parameters) Submission.__init__(self, sample, parameters, submission_config, output_dir, submission_mode, submission_dir, type) @@ -596,6 +654,7 @@ def __init__(self, sample, parameters, submission_config, metadata_df, output_di parser = MetadataParser(metadata_df, parameters) self.top_metadata = parser.extract_top_metadata() self.biosample_metadata = parser.extract_biosample_metadata() + self.accession_id = accession_id os.makedirs(self.output_dir, exist_ok=True) # Generate the BioSample XML upon initialization self.xml_output_path = self.create_xml(output_dir) @@ -625,10 +684,20 @@ def add_action_block(self, submission): organismName = ET.SubElement(organism, 'OrganismName') organismName.text = self.safe_text(self.biosample_metadata['organism']) bioproject = ET.SubElement(biosample, 'BioProject') - primaryID = ET.SubElement(bioproject, 'PrimaryId') - primaryID.text = self.safe_text(self.top_metadata['ncbi-bioproject']) + primary_id = ET.SubElement(bioproject, 'PrimaryId') + primary_id.text = self.safe_text(self.top_metadata['ncbi-bioproject']) bs_package = ET.SubElement(biosample, 'Package') bs_package.text = self.safe_text(self.submission_config['BioSample_package']) + # Add conditional block for accession_id + if self.accession_id: + attribute_ref_id = ET.SubElement(biosample, 'AttributeRefId') + ref_id = ET.SubElement(attribute_ref_id, 'RefId') + primary_id = ET.SubElement(ref_id, 'PrimaryId', {'db': 'BioProject'}) + primary_id.text = self.safe_text(self.top_metadata['ncbi-bioproject']) + attribute_ref_id = ET.SubElement(biosample, 'AttributeRefId') + 
ref_id = ET.SubElement(attribute_ref_id, 'RefId') + primary_id = ET.SubElement(ref_id, 'PrimaryId', {'db': 'BioSample'}) + primary_id.text = self.accession_id def add_attributes_block(self, submission): biosample = submission.find(".//BioSample") attributes = ET.SubElement(biosample, 'Attributes') @@ -652,7 +721,7 @@ def update_report(self): self.fetch_report() class SRASubmission(XMLSubmission, Submission): - def __init__(self, sample, parameters, submission_config, metadata_df, output_dir, submission_mode, submission_dir, type): + def __init__(self, sample, parameters, submission_config, metadata_df, output_dir, submission_mode, submission_dir, type, accession_id = None): # Properly initialize the base classes XMLSubmission.__init__(self, sample, submission_config, metadata_df, output_dir, parameters) Submission.__init__(self, sample, parameters, submission_config, output_dir, submission_mode, submission_dir, type) @@ -660,6 +729,7 @@ def __init__(self, sample, parameters, submission_config, metadata_df, output_di parser = MetadataParser(metadata_df, parameters) self.top_metadata = parser.extract_top_metadata() self.sra_metadata = parser.extract_sra_metadata() + self.accession_id = accession_id os.makedirs(self.output_dir, exist_ok=True) # Generate the BioSample XML upon initialization self.xml_output_path = self.create_xml(output_dir) @@ -694,6 +764,7 @@ def add_attributes_block(self, submission): identifier = ET.SubElement(add_files, 'Identifier') identifier_spuid = ET.SubElement(identifier, 'SPUID', {'spuid_namespace': f"{spuid_namespace_value}_SRA"}) identifier_spuid.text = self.safe_text(self.top_metadata['ncbi-spuid']) + # todo: add attribute ref ID for BioSample def submit(self): # Create submit.ready file (without using Posix object because all files_to_submit need to be same type) @@ -714,7 +785,7 @@ def update_report(self): self.fetch_report() class GenbankSubmission(XMLSubmission, Submission): - def __init__(self, sample, parameters, submission_config, metadata_df, output_dir, submission_mode, submission_dir, type): + def __init__(self, sample, parameters, submission_config, metadata_df, output_dir, submission_mode, submission_dir, type, accession_id = None): # Properly initialize the base classes XMLSubmission.__init__(self, sample, submission_config, metadata_df, output_dir, parameters) Submission.__init__(self, sample, parameters, submission_config, output_dir, submission_mode, submission_dir, type) @@ -723,6 +794,7 @@ def __init__(self, sample, parameters, submission_config, metadata_df, output_di self.top_metadata = parser.extract_top_metadata() self.genbank_metadata = parser.extract_genbank_metadata() self.biosample_metadata = parser.extract_biosample_metadata() + self.accession_id = accession_id os.makedirs(self.output_dir, exist_ok=True) # Generate the GenBank XML upon initialization only if sample.ftp_upload is True if self.sample.ftp_upload: diff --git a/modules/local/initial_submission/main.nf b/modules/local/initial_submission/main.nf index 0caad752..43a6de21 100644 --- a/modules/local/initial_submission/main.nf +++ b/modules/local/initial_submission/main.nf @@ -15,6 +15,7 @@ process SUBMISSION { input: tuple val(meta), path(validated_meta_path), path(fasta_path), path(fastq_1), path(fastq_2), path(annotations_path) path submission_config + val submission_mode // define the command line arguments based on the value of params.submission_test_or_prod, params.send_submission_email def test_flag = params.submission_prod_or_test == 'test' ? 
'--test' : '' @@ -26,7 +27,7 @@ process SUBMISSION { script: """ submission_new.py \ - --submit \ + --$submission_mode \ --submission_name $meta.id \ --config_file $submission_config \ --metadata_file $validated_meta_path \ diff --git a/subworkflows/local/submission.nf b/subworkflows/local/submission.nf index 833a0515..7232c244 100644 --- a/subworkflows/local/submission.nf +++ b/subworkflows/local/submission.nf @@ -7,7 +7,7 @@ */ include { SUBMISSION } from '../../modules/local/initial_submission/main' -include { UPDATE_SUBMISSION } from '../../modules/local/update_submission/main' +//include { UPDATE_SUBMISSION } from '../../modules/local/update_submission/main' include { WAIT } from '../../modules/local/general_util/wait/main' include { MERGE_UPLOAD_LOG } from "../../modules/local/general_util/merge_upload_log/main" @@ -18,15 +18,28 @@ workflow INITIAL_SUBMISSION { wait_time main: - // submit the files to database of choice (after fixing config and getting wait time) - SUBMISSION ( submission_ch, submission_config ) - - // actual process to initiate wait - WAIT ( SUBMISSION.out.submission_files.collect(), wait_time ) + if ( params.update_submission == false ) { + // submit the files to database of choice + SUBMISSION ( submission_ch, submission_config, Channel.of("submit") ) + + // actual process to initiate wait + WAIT ( SUBMISSION.out.submission_files.collect(), wait_time ) - // process for updating the submitted samples - UPDATE_SUBMISSION ( WAIT.out, submission_ch, submission_config ) + // try to fetch & parse the report.xml + // todo: need to incorporate WAIT.out somehow + SUBMISSION ( submission_ch, submission_config, Channel.of("fetch") ) + } + // if params.update_submission is true, update an existing submission + else if ( params.update_submission == true ) { + // process for updating the submitted samples + SUBMISSION ( submission_ch, submission_config, Channel.of("update") ) + + // try to fetch & parse the report.xml + // todo: need to incorporate WAIT.out somehow + SUBMISSION ( submission_ch, submission_config, Channel.of("fetch") ) + } + emit: submission_files = UPDATE_SUBMISSION.out.submission_files //submission_log = UPDATE_SUBMISSION.out.submission_log From 33e8980f4b3239264ba735d1cdd7f62533cc6602 Mon Sep 17 00:00:00 2001 From: Jessica Rowell Date: Tue, 14 Jan 2025 00:19:40 -0500 Subject: [PATCH 03/15] put submission pieces into separate modules, can't call same module more than once --- modules/local/fetch_submission/main.nf | 49 ++++++++++++++++++++++++ modules/local/initial_submission/main.nf | 3 +- modules/local/update_submission/main.nf | 1 - subworkflows/local/submission.nf | 16 ++++---- 4 files changed, 59 insertions(+), 10 deletions(-) create mode 100644 modules/local/fetch_submission/main.nf diff --git a/modules/local/fetch_submission/main.nf b/modules/local/fetch_submission/main.nf new file mode 100644 index 00000000..fb31fce1 --- /dev/null +++ b/modules/local/fetch_submission/main.nf @@ -0,0 +1,49 @@ +/* +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + FETCH SUBMISSION +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +*/ +process FETCH_SUBMISSION { + + publishDir "$params.output_dir/$params.submission_output_dir", mode: 'copy', overwrite: params.overwrite_output + + conda (params.enable_conda ? params.env_yml : null) + container "${ workflow.containerEngine == 'singularity' && !task.ext.singularity_pull_docker_container ? 
+ 'staphb/tostadas:latest' : 'staphb/tostadas:latest' }" + + input: + val wait_time + tuple val(meta), path(validated_meta_path), path(fasta_path), path(fastq_1), path(fastq_2), path(annotations_path) + path submission_config + + // define the command line arguments based on the value of params.submission_test_or_prod, params.send_submission_email + def test_flag = params.submission_prod_or_test == 'test' ? '--test' : '' + def send_submission_email = params.send_submission_email == true ? '--send_email' : '' + def biosample = params.biosample == true ? '--biosample' : '' + def sra = params.sra == true ? '--sra' : '' + def genbank = params.genbank == true ? '--genbank' : '' + + script: + """ + submission_new.py \ + --fetch \ + --submission_name $meta.id \ + --config_file $submission_config \ + --metadata_file $validated_meta_path \ + --species $params.species \ + --output_dir . \ + ${fasta_path ? "--fasta_file $fasta_path" : ""} \ + ${annotations_path ? "--annotation_file $annotations_path" : ""} \ + ${fastq_1 ? "--fastq1 $fastq_1" : ""} \ + ${fastq_2 ? "--fastq2 $fastq_2" : ""} \ + --custom_metadata_file $params.custom_fields_file \ + --submission_mode $params.submission_mode \ + $test_flag \ + $send_submission_email \ + $genbank $sra $biosample + + """ + output: + path "${validated_meta_path.getBaseName()}", emit: submission_files + path "*.csv", emit: submission_log +} \ No newline at end of file diff --git a/modules/local/initial_submission/main.nf b/modules/local/initial_submission/main.nf index 43a6de21..0caad752 100644 --- a/modules/local/initial_submission/main.nf +++ b/modules/local/initial_submission/main.nf @@ -15,7 +15,6 @@ process SUBMISSION { input: tuple val(meta), path(validated_meta_path), path(fasta_path), path(fastq_1), path(fastq_2), path(annotations_path) path submission_config - val submission_mode // define the command line arguments based on the value of params.submission_test_or_prod, params.send_submission_email def test_flag = params.submission_prod_or_test == 'test' ? 
'--test' : '' @@ -27,7 +26,7 @@ process SUBMISSION { script: """ submission_new.py \ - --$submission_mode \ + --submit \ --submission_name $meta.id \ --config_file $submission_config \ --metadata_file $validated_meta_path \ diff --git a/modules/local/update_submission/main.nf b/modules/local/update_submission/main.nf index 844b1158..db59c979 100644 --- a/modules/local/update_submission/main.nf +++ b/modules/local/update_submission/main.nf @@ -12,7 +12,6 @@ process UPDATE_SUBMISSION { 'staphb/tostadas:latest' : 'staphb/tostadas:latest' }" input: - val wait_time tuple val(meta), path(validated_meta_path), path(fasta_path), path(fastq_1), path(fastq_2), path(annotations_path) path submission_config diff --git a/subworkflows/local/submission.nf b/subworkflows/local/submission.nf index 7232c244..72302e86 100644 --- a/subworkflows/local/submission.nf +++ b/subworkflows/local/submission.nf @@ -7,7 +7,8 @@ */ include { SUBMISSION } from '../../modules/local/initial_submission/main' -//include { UPDATE_SUBMISSION } from '../../modules/local/update_submission/main' +include { FETCH_SUBMISSION } from '../../modules/local/fetch_submission/main' +include { UPDATE_SUBMISSION } from '../../modules/local/update_submission/main' include { WAIT } from '../../modules/local/general_util/wait/main' include { MERGE_UPLOAD_LOG } from "../../modules/local/general_util/merge_upload_log/main" @@ -20,29 +21,30 @@ workflow INITIAL_SUBMISSION { main: if ( params.update_submission == false ) { // submit the files to database of choice - SUBMISSION ( submission_ch, submission_config, Channel.of("submit") ) + SUBMISSION ( submission_ch, submission_config ) // actual process to initiate wait WAIT ( SUBMISSION.out.submission_files.collect(), wait_time ) // try to fetch & parse the report.xml // todo: need to incorporate WAIT.out somehow - SUBMISSION ( submission_ch, submission_config, Channel.of("fetch") ) + // todo: this maybe doesn't need to take all the inputs from submission (or maybe doesn't need to be a separate module) + FETCH_SUBMISSION ( WAIT.out, submission_ch, submission_config ) } // if params.update_submission is true, update an existing submission else if ( params.update_submission == true ) { // process for updating the submitted samples - SUBMISSION ( submission_ch, submission_config, Channel.of("update") ) + UPDATE_SUBMISSION ( submission_ch, submission_config ) // try to fetch & parse the report.xml // todo: need to incorporate WAIT.out somehow - SUBMISSION ( submission_ch, submission_config, Channel.of("fetch") ) + FETCH_SUBMISSION ( WAIT.out, submission_ch, submission_config ) } emit: - submission_files = UPDATE_SUBMISSION.out.submission_files - //submission_log = UPDATE_SUBMISSION.out.submission_log + submission_files = SUBMISSION.out.submission_files + //submission_log = SUBMISSION.out.submission_log //to do: add GISAID module } From 83072f50349c83f5412faf61f54a8848d7e75204 Mon Sep 17 00:00:00 2001 From: Jessica Rowell Date: Tue, 14 Jan 2025 13:33:11 -0500 Subject: [PATCH 04/15] Fix error in the SRA submission.xml Attributes heading --- bin/submission_new.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/bin/submission_new.py b/bin/submission_new.py index b82ad227..d77a7eb7 100755 --- a/bin/submission_new.py +++ b/bin/submission_new.py @@ -747,10 +747,9 @@ def add_action_block(self, submission): def add_attributes_block(self, submission): add_files = submission.find(".//AddFiles") - attributes = ET.SubElement(add_files, 'Attributes') for attr_name, attr_value in 
self.sra_metadata.items(): if attr_value != "Not Provided": - attribute = ET.SubElement(attributes, 'Attribute', {'attribute_name': attr_name}) + attribute = ET.SubElement(add_files, 'Attribute', {'name': attr_name}) attribute.text = self.safe_text(attr_value) spuid_namespace_value = self.safe_text(self.top_metadata['ncbi-spuid_namespace']) attribute_ref_id_bioproject = ET.SubElement(add_files, "AttributeRefId", name="BioProject") From 79012303f6f7fdfed9aca569e36d6f756e991763 Mon Sep 17 00:00:00 2001 From: Jessica Rowell Date: Wed, 15 Jan 2025 14:29:20 -0500 Subject: [PATCH 05/15] a lot of code rearrangement and simplification to correct the submission report --- bin/submission_new.py | 158 ++++++++++++++++++++---------------------- 1 file changed, 75 insertions(+), 83 deletions(-) diff --git a/bin/submission_new.py b/bin/submission_new.py index d77a7eb7..8cf757c8 100755 --- a/bin/submission_new.py +++ b/bin/submission_new.py @@ -24,30 +24,6 @@ from email.mime.multipart import MIMEMultipart from email.mime.application import MIMEApplication -def fetch_and_parse_report(submission_object, client, submission_id, submission_dir, output_dir, type): - # Connect to the FTP/SFTP client - client.connect() - client.change_dir('submit') # Change to 'submit' directory - client.change_dir(submission_dir) # Change to Test or Prod - client.change_dir(f"{submission_id}_{type}") # Change to sample-specific directory - # Check if report.xml exists and download it - report_file = "report.xml" - if client.file_exists(report_file): - print(f"Report found at {report_file}") - report_local_path = os.path.join(output_dir, 'report.xml') - client.download_file(report_file, report_local_path) - # Parse the report.xml - parsed_report = submission_object.parse_report_xml(report_local_path) - # Save as CSV to top level sample submission folder - # output_dir = 'path/to/results/sample_name/database' and we want to save a report for all samples to 'path/to/results/' - report_filename = os.path.join(os.path.dirname(os.path.dirname(output_dir)), 'submission_report.csv') - print(f"save_report_to_csv inputs are: {parsed_report}, {report_filename}") - submission_object.save_report_to_csv(parsed_report, report_filename) - return parsed_report - else: - print(f"No report found for submission {submission_id}") - return None - def get_accessions(sample, report_df): """ Returns a dict with available accessions for the input sample """ @@ -154,6 +130,7 @@ def submission_main(): submission_objects = {'biosample': biosample_submission, 'sra': sra_submission, 'genbank': genbank_submission} else: submission_objects = {'biosample': biosample_submission, 'sra': sra_submission} + # Try fetching the report for db, submission_object in submission_objects.items(): print(f"Fetching report for {db}") if submission_object.fetch_report(): # Stop trying if the report is found locally @@ -168,6 +145,23 @@ def submission_main(): else: # If the while loop completes without a successful fetch print("Timeout occurred while trying to fetch and parse the report.") + + # Loop over submission dbs to parse the report.xmls + # todo: add error-handling + all_reports = pd.DataFrame() + for db, submission in submission_objects.items(): + report_xml_file = f"{parameters['output_dir']}/{parameters['submission_name']}/{db}/report.xml" + print(f"report path {report_xml_file}") + df = submission.parse_report_to_df(report_xml_file) + all_reports = pd.concat([all_reports, df], ignore_index=True) + # output_dir = submission_outputs/submission_name/ and we want to save 
a report for all samples to submission_outputs/submission_name/
+        report_csv_file = f"{parameters['output_dir']}/{parameters['submission_name']}/submission_report.csv"
+        if os.path.exists(report_csv_file):
+            # If file exists, append to it without writing the header
+            all_reports.to_csv(report_file, mode='a', header=False, index=False)
+        else:
+            # If file doesn't exist, write it with the header
+            all_reports.to_csv(report_file, mode='w', header=True, index=False)
     elif parameters['update']:
         # Call and run the update submission script
         #update_submission.submission_main()
         # Load the report file
@@ -395,14 +389,34 @@ def get_client(self):
             return FTPClient(self.submission_config)
         else:
             raise ValueError("Invalid submission mode: must be 'sftp' or 'ftp'")
-    def parse_report_xml(self, report_path):
-        # Parse the XML file and extract required information
-        tree = ET.parse(report_path)
-        root = tree.getroot()
-        report_dict = {
+    def fetch_report(self):
+        """ Fetches report.xml from the host site folder submit/<Test|Prod>/<sample_id>_<type>/ """
+        self.client.connect()
+        self.client.change_dir('submit') # Change to 'submit' directory
+        self.client.change_dir(self.submission_dir) # Change to Test or Prod
+        self.client.change_dir(f"{self.sample.sample_id}_{self.type}") # Change to sample-specific directory
+        # Check if report.xml exists and download it
+        report_file = os.path.join(self.output_dir, 'report.xml')
+        if os.path.exists(report_file):
+            return True # Indicate that the report already exists
+        if self.client.file_exists(report_file):
+            print(f"Report found at {report_file}")
+            report_local_path = os.path.join(self.output_dir, 'report.xml')
+            self.client.download_file(report_file, report_local_path)
+            return report_local_path
+        else:
+            print(f"No report found for submission {self.sample.sample_id}")
+            return False
+    def parse_report_to_df(self, report_path):
+        """
+        Parses report.xml file and consolidates all database entries into a single row
+        Returns a DataFrame with one row per submission_name.
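+        Columns cover the overall submission status and id, the per-database
+        status/accession/message fields for BioSample, SRA, and GenBank, the
+        GenBank release date, and the tracking location reported by NCBI.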
+ """ + # Initialize a dictionary to store consolidated data + consolidated_report = { 'submission_name': self.sample.sample_id, - 'submission_type': self.type, 'submission_status': None, + 'submission_id': None, 'biosample_status': None, 'biosample_accession': None, 'biosample_message': None, @@ -413,44 +427,39 @@ def parse_report_xml(self, report_path): 'genbank_accession': None, 'genbank_message': None, 'genbank_release_date': None, + 'tracking_location': None, } - for action in root.findall('Action'): - db = action.attrib.get('target_db') - status = action.attrib.get('status') - accession = action.attrib.get('accession') - message = None - response = action.find('Response') - if response is not None: - message_element = response.find('Message') - if message_element is not None: - message = message_element.text - if db == 'BioSample': - report_dict['biosample_status'] = status - report_dict['biosample_accession'] = accession - report_dict['biosample_message'] = message - elif db == 'SRA': - report_dict['sra_status'] = status - report_dict['sra_accession'] = accession - report_dict['sra_message'] = message - elif db == 'GenBank': - report_dict['genbank_status'] = status - if status == 'processed-ok': - # Handle Genbank-specific logic (AccessionReport.tsv) - accession_report = action.find('AccessionReport') - if accession_report is not None: - report_dict['genbank_accession'] = accession_report.find('Accession').text - report_dict['genbank_release_date'] = accession_report.find('ReleaseDate').text - report_dict['genbank_message'] = message - return report_dict - def save_report_to_csv(self, report_dict, csv_file): - write_header = not os.path.exists(csv_file) or os.stat(csv_file).st_size == 0 - with open(csv_file, 'a', newline='') as f: - writer = csv.DictWriter(f, fieldnames=report_dict.keys()) - if write_header: - writer.writeheader() # todo: need to use pandas to do this probably, not all keys are being written to the file - writer.writerow(report_dict) - print(f"Submission report saved to {csv_file}") + try: + # Parse the XML file + tree = ET.parse(report_path) + root = tree.getroot() + # Extract submission-wide attributes + consolidated_report['submission_status'] = root.get("status", None) + consolidated_report['submission_id'] = root.get("submission_id", None) + # Iterate over each element to extract database-specific data + for action in root.findall("Action"): + db = action.get("target_db", "").lower() + if db == "biosample": + consolidated_report['biosample_status'] = action.get("status", None) + consolidated_report['biosample_message'] = action.findtext("Response") + elif db == "sra": + consolidated_report['sra_status'] = action.get("status", None) + consolidated_report['sra_message'] = action.findtext("Response") + elif db == "genbank": + consolidated_report['genbank_status'] = action.get("status", None) + consolidated_report['genbank_message'] = action.findtext("Response") + consolidated_report['genbank_release_date'] = action.get("release_date", None) + # Add tracking location if available + tracking_location = root.find("Tracking/SubmissionLocation") + if tracking_location is not None: + consolidated_report['tracking_location'] = tracking_location.text + except FileNotFoundError: + print(f"Report not found: {report_path}") + except ET.ParseError: + print(f"Error parsing XML report: {report_path}") + return consolidated_report def submit_files(self, files, type): + """ Uploads a set of files to a host site at submit//sample_database/ """ sample_subtype_dir = 
f'{self.sample.sample_id}_{type}' # samplename_ (a unique submission dir) self.client.connect() self.client.change_dir('submit') # Change to 'submit' directory @@ -461,13 +470,6 @@ def submit_files(self, files, type): print(f"Submitted files for sample {self.sample.sample_id}") def close(self): self.client.close() - def fetch_report(self): - report_path = os.path.join(self.output_dir, 'report.xml') - if os.path.exists(report_path): - return True # Indicate that the report was found - else: - fetch_and_parse_report(self, self.client, self.sample.sample_id, self.submission_dir, self.output_dir, self.type) - return False # Indicate that the report was not fetched class SFTPClient: @@ -716,9 +718,6 @@ def submit(self): files_to_submit = [submit_ready_file, self.xml_output_path] self.submit_files(files_to_submit, 'biosample') print(f"Submitted sample {self.sample.sample_id} to BioSample") - # Trigger report fetching - def update_report(self): - self.fetch_report() class SRASubmission(XMLSubmission, Submission): def __init__(self, sample, parameters, submission_config, metadata_df, output_dir, submission_mode, submission_dir, type, accession_id = None): @@ -779,9 +778,6 @@ def submit(self): files_to_submit = [submit_ready_file, self.xml_output_path, fastq1, fastq2] self.submit_files(files_to_submit, 'sra') print(f"Submitted sample {self.sample.sample_id} to SRA") - # Trigger report fetching - def update_report(self): - self.fetch_report() class GenbankSubmission(XMLSubmission, Submission): def __init__(self, sample, parameters, submission_config, metadata_df, output_dir, submission_mode, submission_dir, type, accession_id = None): @@ -1205,10 +1201,6 @@ def submit(self): f"{self.output_dir}/authorset.sbt", f"{self.output_dir}/comment.cmt"] self.submit_files(files_to_submit, 'genbank') print(f"Submitted sample {self.sample.sample_id} to Genbank") - # Trigger report fetching - def update_report(self): - self.fetch_report() - if __name__ == "__main__": submission_main() From 562a4d08a08a752a1da4af5ec6e48778e0bfce5c Mon Sep 17 00:00:00 2001 From: Jessica Rowell Date: Wed, 15 Jan 2025 17:05:16 -0500 Subject: [PATCH 06/15] return the report as a df not a dict --- bin/submission_new.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bin/submission_new.py b/bin/submission_new.py index 8cf757c8..9b5668dd 100755 --- a/bin/submission_new.py +++ b/bin/submission_new.py @@ -457,7 +457,7 @@ def parse_report_to_df(self, report_path): print(f"Report not found: {report_path}") except ET.ParseError: print(f"Error parsing XML report: {report_path}") - return consolidated_report + return pd.DataFrame([consolidated_report]) def submit_files(self, files, type): """ Uploads a set of files to a host site at submit//sample_database/ """ sample_subtype_dir = f'{self.sample.sample_id}_{type}' # samplename_ (a unique submission dir) From bf1380b9edeac23c2bdc74153882437579940399 Mon Sep 17 00:00:00 2001 From: Jessica Rowell Date: Wed, 15 Jan 2025 17:06:08 -0500 Subject: [PATCH 07/15] shorten consolidated_report to report --- bin/submission_new.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/bin/submission_new.py b/bin/submission_new.py index 9b5668dd..458bc29f 100755 --- a/bin/submission_new.py +++ b/bin/submission_new.py @@ -413,7 +413,7 @@ def parse_report_to_df(self, report_path): Returns a DataFrame with one row per submission_name. 
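        Columns cover the overall submission status and id, the per-database
        status/accession/message fields for BioSample, SRA, and GenBank, the
        GenBank release date, and the tracking location reported by NCBI.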
""" # Initialize a dictionary to store consolidated data - consolidated_report = { + report = { 'submission_name': self.sample.sample_id, 'submission_status': None, 'submission_id': None, @@ -434,30 +434,30 @@ def parse_report_to_df(self, report_path): tree = ET.parse(report_path) root = tree.getroot() # Extract submission-wide attributes - consolidated_report['submission_status'] = root.get("status", None) - consolidated_report['submission_id'] = root.get("submission_id", None) + report['submission_status'] = root.get("status", None) + report['submission_id'] = root.get("submission_id", None) # Iterate over each element to extract database-specific data for action in root.findall("Action"): db = action.get("target_db", "").lower() if db == "biosample": - consolidated_report['biosample_status'] = action.get("status", None) - consolidated_report['biosample_message'] = action.findtext("Response") + report['biosample_status'] = action.get("status", None) + report['biosample_message'] = action.findtext("Response") elif db == "sra": - consolidated_report['sra_status'] = action.get("status", None) - consolidated_report['sra_message'] = action.findtext("Response") + report['sra_status'] = action.get("status", None) + report['sra_message'] = action.findtext("Response") elif db == "genbank": - consolidated_report['genbank_status'] = action.get("status", None) - consolidated_report['genbank_message'] = action.findtext("Response") - consolidated_report['genbank_release_date'] = action.get("release_date", None) + report['genbank_status'] = action.get("status", None) + report['genbank_message'] = action.findtext("Response") + report['genbank_release_date'] = action.get("release_date", None) # Add tracking location if available tracking_location = root.find("Tracking/SubmissionLocation") if tracking_location is not None: - consolidated_report['tracking_location'] = tracking_location.text + report['tracking_location'] = tracking_location.text except FileNotFoundError: print(f"Report not found: {report_path}") except ET.ParseError: print(f"Error parsing XML report: {report_path}") - return pd.DataFrame([consolidated_report]) + return pd.DataFrame([report]) def submit_files(self, files, type): """ Uploads a set of files to a host site at submit//sample_database/ """ sample_subtype_dir = f'{self.sample.sample_id}_{type}' # samplename_ (a unique submission dir) From 2c574d2b32a0e4c786a04e601b4fd18b0e5313c6 Mon Sep 17 00:00:00 2001 From: Jessica Rowell Date: Wed, 15 Jan 2025 17:12:42 -0500 Subject: [PATCH 08/15] corrected the reference to report_csv_file --- bin/submission_new.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/bin/submission_new.py b/bin/submission_new.py index 458bc29f..94292f3a 100755 --- a/bin/submission_new.py +++ b/bin/submission_new.py @@ -158,10 +158,10 @@ def submission_main(): report_csv_file = f"{parameters['output_dir']}/{parameters['submission_name']}/submission_report.csv" if os.path.exists(report_csv_file): # If file exists, append to it without writing the header - all_reports.to_csv(report_file, mode='a', header=False, index=False) + all_reports.to_csv(report_csv_file, mode='a', header=False, index=False) else: # If file doesn't exist, write it with the header - all_reports.to_csv(report_file, mode='w', header=True, index=False) + all_reports.to_csv(report_csv_file, mode='w', header=True, index=False) elif parameters['update']: # Call and run the update submission script From 3575da43b2de296d77953de6b5a6a2fbb4e4ed50 Mon Sep 17 00:00:00 2001 From: Jessica 
Rowell Date: Thu, 16 Jan 2025 10:16:18 -0500 Subject: [PATCH 09/15] fix the report fetching function _csv_file --- bin/submission_new.py | 33 ++++++++++++++++----------------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/bin/submission_new.py b/bin/submission_new.py index 94292f3a..d624eadf 100755 --- a/bin/submission_new.py +++ b/bin/submission_new.py @@ -151,7 +151,6 @@ def submission_main(): all_reports = pd.DataFrame() for db, submission in submission_objects.items(): report_xml_file = f"{parameters['output_dir']}/{parameters['submission_name']}/{db}/report.xml" - print(f"report path {report_xml_file}") df = submission.parse_report_to_df(report_xml_file) all_reports = pd.concat([all_reports, df], ignore_index=True) # output_dir = submission_outputs/submission_name/ and we want to save a report for all samples to submission_outputs/submission_name/ @@ -396,17 +395,17 @@ def fetch_report(self): self.client.change_dir(self.submission_dir) # Change to Test or Prod self.client.change_dir(f"{self.sample.sample_id}_{self.type}") # Change to sample-specific directory # Check if report.xml exists and download it - report_file = os.path.join(self.output_dir, 'report.xml') - if os.path.exists(report_file): - return True # Indicate that the report already exists - if self.client.file_exists(report_file): - print(f"Report found at {report_file}") - report_local_path = os.path.join(self.output_dir, 'report.xml') - self.client.download_file(report_file, report_local_path) - return report_local_path + report_ftp_path = f"{self.sample.sample_id}_{self.type}/report.xml" + report_local_path = os.path.join(self.output_dir, 'report.xml') + if os.path.exists(report_local_path): + return True # Report already exists, don't fetch + if self.client.file_exists('report.xml'): + print(f"Report found at {report_ftp_path}") + self.client.download_file('report.xml', report_local_path) + return report_local_path # Report fetched, return its path else: print(f"No report found for submission {self.sample.sample_id}") - return False + return False # Report not found, need to try again def parse_report_to_df(self, report_path): """ Parses report.xml file and consolidates all database entries into a single row @@ -502,12 +501,12 @@ def file_exists(self, file_path): return True except IOError: return False - def download_file(self, remote_path, local_path): + def download_file(self, remote_file, local_path): try: - self.sftp.get(remote_path, local_path) - print(f"Downloaded file from {remote_path} to {local_path}") + self.sftp.get(remote_file, local_path) + print(f"Downloaded {remote_file} to {local_path}") except Exception as e: - raise IOError(f"Failed to download {remote_path}: {e}") + raise IOError(f"Failed to download {remote_file}: {e}") def upload_file(self, file_path, destination_path): try: self.sftp.put(file_path, destination_path) @@ -553,10 +552,10 @@ def file_exists(self, file_path): return True else: return False - def download_file(self, remote_path, local_path): + def download_file(self, remote_file, local_path): with open(local_path, 'wb') as f: - self.ftp.retrbinary(f'RETR {remote_path}', f.write) - print(f"Downloaded file from {remote_path} to {local_path}") + self.ftp.retrbinary(f'RETR {remote_file}', f.write) + print(f"Downloaded file from {remote_file} to {local_path}") def upload_file(self, file_path, destination_path): try: if file_path.endswith(('.fasta', '.fastq', '.fna', '.fsa', '.gff', '.gff3', '.gz', 'xml', '.sqn', '.sbt', '.cmt')): From 
cb45a121ab601b807ad9a410e575e73a747f81a8 Mon Sep 17 00:00:00 2001 From: Jessica Rowell Date: Thu, 16 Jan 2025 14:29:02 -0500 Subject: [PATCH 10/15] correct and simplify the report fetching loop --- bin/submission_new.py | 71 +++++++++++++++++++++++-------------------- 1 file changed, 38 insertions(+), 33 deletions(-) diff --git a/bin/submission_new.py b/bin/submission_new.py index d624eadf..ff732b47 100755 --- a/bin/submission_new.py +++ b/bin/submission_new.py @@ -122,7 +122,7 @@ def submission_main(): elif parameters['fetch']: start_time = time.time() timeout = 60 # time out after 60 seconds - report_fetched = False # Flag to indicate if a report has been fetched + report_fetched = {db: False for db in ['biosample', 'sra', 'genbank']} # Track fetched status for each db while time.time() - start_time < timeout: # if user is submitting to genbank via ftp and provided the necessary files @@ -130,41 +130,48 @@ def submission_main(): submission_objects = {'biosample': biosample_submission, 'sra': sra_submission, 'genbank': genbank_submission} else: submission_objects = {'biosample': biosample_submission, 'sra': sra_submission} - # Try fetching the report + # Try fetching reports for all databases for db, submission_object in submission_objects.items(): - print(f"Fetching report for {db}") - if submission_object.fetch_report(): # Stop trying if the report is found locally - print(f"Report for {db} successfully fetched or already exists.") - report_fetched = True - break # Exit the for loop - # If report is not fetched, continue trying - time.sleep(3) - if report_fetched: - print("Exiting update loop as a report has been fetched or already exists.") - break # Exit the while loop + if not report_fetched[db]: # Only attempt fetching if the report has not been fetched + print(f"Fetching report for {db}") + fetched_path = submission_object.fetch_report() + if fetched_path: + print(f"Report for {db} successfully fetched or already exists.") + report_fetched[db] = True + else: + print(f"Failed to fetch report for {db}, retrying...") + time.sleep(3) # Prevent spamming the server + + # Exit the loop if all reports have been fetched + if all(report_fetched.values()): + print("All reports successfully fetched.") + break else: - # If the while loop completes without a successful fetch - print("Timeout occurred while trying to fetch and parse the report.") + # If the while loop completes without fetching all reports + print("Timeout occurred while trying to fetch all reports.") # Loop over submission dbs to parse the report.xmls # todo: add error-handling all_reports = pd.DataFrame() for db, submission in submission_objects.items(): report_xml_file = f"{parameters['output_dir']}/{parameters['submission_name']}/{db}/report.xml" + print(report_xml_file) # debug df = submission.parse_report_to_df(report_xml_file) + print(df) all_reports = pd.concat([all_reports, df], ignore_index=True) # output_dir = submission_outputs/submission_name/ and we want to save a report for all samples to submission_outputs/submission_name/ report_csv_file = f"{parameters['output_dir']}/{parameters['submission_name']}/submission_report.csv" - if os.path.exists(report_csv_file): - # If file exists, append to it without writing the header - all_reports.to_csv(report_csv_file, mode='a', header=False, index=False) - else: - # If file doesn't exist, write it with the header - all_reports.to_csv(report_csv_file, mode='w', header=True, index=False) - + print(report_csv_file) + try: + if os.path.exists(report_csv_file): + 
all_reports.to_csv(report_csv_file, mode='a', header=False, index=False) + else: + all_reports.to_csv(report_csv_file, mode='w', header=True, index=False) + print(f"Report table updated at: {report_csv_file}") + except Exception as e: + raise ValueError(f"Failed to save CSV file: {report_csv_file}. Error: {e}") + elif parameters['update']: - # Call and run the update submission script - #update_submission.submission_main() # Load the report file report_file = parameters["submission_report"] try: @@ -391,16 +398,15 @@ def get_client(self): def fetch_report(self): """ Fetches report.xml from the host site folder submit//sample_database/""" self.client.connect() - self.client.change_dir('submit') # Change to 'submit' directory - self.client.change_dir(self.submission_dir) # Change to Test or Prod - self.client.change_dir(f"{self.sample.sample_id}_{self.type}") # Change to sample-specific directory + # Navigate to submit// folder + self.client.change_dir(f"submit/{self.submission_dir}/{self.sample.sample_id}_{self.type}") # Check if report.xml exists and download it - report_ftp_path = f"{self.sample.sample_id}_{self.type}/report.xml" report_local_path = os.path.join(self.output_dir, 'report.xml') if os.path.exists(report_local_path): - return True # Report already exists, don't fetch - if self.client.file_exists('report.xml'): - print(f"Report found at {report_ftp_path}") + print(f"Report already exists locally: {report_local_path}") + return report_local_path + elif self.client.file_exists('report.xml'): + print(f"Report found on server. Downloading to: {report_local_path}.") self.client.download_file('report.xml', report_local_path) return report_local_path # Report fetched, return its path else: @@ -461,9 +467,8 @@ def submit_files(self, files, type): """ Uploads a set of files to a host site at submit//sample_database/ """ sample_subtype_dir = f'{self.sample.sample_id}_{type}' # samplename_ (a unique submission dir) self.client.connect() - self.client.change_dir('submit') # Change to 'submit' directory - self.client.change_dir(self.submission_dir) # Change to Test or Prod - self.client.change_dir(sample_subtype_dir) # Change to unique dir for sample_destination + # Navigate to submit// folder + self.client.change_dir(f"submit/{self.submission_dir}/{self.sample.sample_id}_{self.type}") for file_path in files: self.client.upload_file(file_path, f"{os.path.basename(file_path)}") print(f"Submitted files for sample {self.sample.sample_id}") From 7922cb3af665e69e5f23013cacde2bf9883a300e Mon Sep 17 00:00:00 2001 From: Jessica Rowell Date: Thu, 16 Jan 2025 14:29:22 -0500 Subject: [PATCH 11/15] correct csv output paths --- modules/local/fetch_submission/main.nf | 2 +- modules/local/initial_submission/main.nf | 2 +- modules/local/update_submission/main.nf | 2 +- subworkflows/local/submission.nf | 3 ++- 4 files changed, 5 insertions(+), 4 deletions(-) diff --git a/modules/local/fetch_submission/main.nf b/modules/local/fetch_submission/main.nf index fb31fce1..e5395a50 100644 --- a/modules/local/fetch_submission/main.nf +++ b/modules/local/fetch_submission/main.nf @@ -45,5 +45,5 @@ process FETCH_SUBMISSION { """ output: path "${validated_meta_path.getBaseName()}", emit: submission_files - path "*.csv", emit: submission_log + path "${validated_meta_path.getBaseName()}/*.csv", emit: submission_report } \ No newline at end of file diff --git a/modules/local/initial_submission/main.nf b/modules/local/initial_submission/main.nf index 0caad752..ad28b6fd 100644 --- a/modules/local/initial_submission/main.nf 
+++ b/modules/local/initial_submission/main.nf
@@ -45,5 +45,5 @@ process SUBMISSION {
     """
     output:
     path "${validated_meta_path.getBaseName()}", emit: submission_files
-    //path "*.csv", emit: submission_log
+    //path "${validated_meta_path.getBaseName()}/*.csv", emit: submission_report
 }
\ No newline at end of file
diff --git a/modules/local/update_submission/main.nf b/modules/local/update_submission/main.nf
index db59c979..ca596dba 100644
--- a/modules/local/update_submission/main.nf
+++ b/modules/local/update_submission/main.nf
@@ -44,5 +44,5 @@ process UPDATE_SUBMISSION {
     """
     output:
     path "${validated_meta_path.getBaseName()}", emit: submission_files
-    path "*.csv", emit: submission_log
+    //path "${validated_meta_path.getBaseName()}/*.csv", emit: submission_report
 }
\ No newline at end of file
diff --git a/subworkflows/local/submission.nf b/subworkflows/local/submission.nf
index 72302e86..0a94138d 100644
--- a/subworkflows/local/submission.nf
+++ b/subworkflows/local/submission.nf
@@ -34,7 +34,8 @@ workflow INITIAL_SUBMISSION {
     // if params.update_submission is true, update an existing submission
     else if ( params.update_submission == true ) {
-        // process for updating the submitted samples
+        // process for updating the submitted samples
+        // todo: update to take the csv output from FETCH_SUBMISSION
         UPDATE_SUBMISSION ( submission_ch, submission_config )
         // try to fetch & parse the report.xml

From 0a0abacca964eedb05d1e3767737857fc12348df Mon Sep 17 00:00:00 2001
From: Jessica Rowell
Date: Thu, 16 Jan 2025 14:49:44 -0500
Subject: [PATCH 12/15] report parsing was not fetching the message when it exists
---
 bin/submission_new.py | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/bin/submission_new.py b/bin/submission_new.py
index ff732b47..7a2dfe08 100755
--- a/bin/submission_new.py
+++ b/bin/submission_new.py
@@ -444,17 +444,27 @@ def parse_report_to_df(self, report_path):
         # Iterate over each element to extract database-specific data
         for action in root.findall("Action"):
             db = action.get("target_db", "").lower()
+            response = action.find("Response")
+            response_message = None
+            if response is not None:
+                # Extract message from the <Message> tag if present
+                message_tag = response.find("Message")
+                if message_tag is not None:
+                    response_message = message_tag.text.strip()
+                else:
+                    # Fallback to Response's own text or attributes
+                    response_message = response.get("status", "").strip() or response.text.strip()
             if db == "biosample":
                 report['biosample_status'] = action.get("status", None)
-                report['biosample_message'] = action.findtext("Response")
+                report['biosample_message'] = response_message
             elif db == "sra":
                 report['sra_status'] = action.get("status", None)
-                report['sra_message'] = action.findtext("Response")
+                report['sra_message'] = response_message
             elif db == "genbank":
                 report['genbank_status'] = action.get("status", None)
-                report['genbank_message'] = action.findtext("Response")
+                report['genbank_message'] = response_message
                 report['genbank_release_date'] = action.get("release_date", None)
-        # Add tracking location if available
+        # Add server location if available
         tracking_location = root.find("Tracking/SubmissionLocation")
         if tracking_location is not None:
             report['tracking_location'] = tracking_location.text

From 1440a8fe7119e5c01608129a34439e19008c6721 Mon Sep 17 00:00:00 2001
From: Jessica Rowell
Date: Fri, 17 Jan 2025 13:22:37 -0500
Subject: [PATCH 13/15] some fixes to allow fetch_submission to run independently
---
 conf/test_params.config | 4 ++-
modules/local/fetch_submission/main.nf | 2 +- modules/local/general_util/wait/main.nf | 2 +- nextflow.config | 5 +-- subworkflows/local/submission.nf | 44 ++++++++++++++++++++----- 5 files changed, 43 insertions(+), 14 deletions(-) diff --git a/conf/test_params.config b/conf/test_params.config index 11908be2..fba9573c 100644 --- a/conf/test_params.config +++ b/conf/test_params.config @@ -146,9 +146,11 @@ params { submission_mode = 'ftp' // 'ftp' or 'sftp' submission_output_dir = "submission_outputs" submission_prod_or_test = "test" // "prod" if submitting + submission_config = "${projectDir}/bin/config_files/mpxv_config.yaml" submission_wait_time = 380 send_submission_email = false - submission_config = "${projectDir}/bin/config_files/mpxv_config.yaml" + update_submission = false + fetch_reports_only = false // only try to fetch reports // batch_name = "batch1" // for update_submission: diff --git a/modules/local/fetch_submission/main.nf b/modules/local/fetch_submission/main.nf index e5395a50..7e651094 100644 --- a/modules/local/fetch_submission/main.nf +++ b/modules/local/fetch_submission/main.nf @@ -44,6 +44,6 @@ process FETCH_SUBMISSION { """ output: - path "${validated_meta_path.getBaseName()}", emit: submission_files + //path "${validated_meta_path.getBaseName()}", emit: submission_files path "${validated_meta_path.getBaseName()}/*.csv", emit: submission_report } \ No newline at end of file diff --git a/modules/local/general_util/wait/main.nf b/modules/local/general_util/wait/main.nf index 735d926c..6637ffeb 100644 --- a/modules/local/general_util/wait/main.nf +++ b/modules/local/general_util/wait/main.nf @@ -15,7 +15,7 @@ process WAIT { input: - val submission_signal + //val submission_signal val wait_time script: diff --git a/nextflow.config b/nextflow.config index cdcbe1f4..a3800460 100644 --- a/nextflow.config +++ b/nextflow.config @@ -60,11 +60,12 @@ params { biosample = true submission_mode = 'ftp' // 'ftp' or 'sftp' submission_output_dir = "submission_outputs" - submission_wait_time = 380 // time in seconds - submission_config = "${projectDir}/bin/config_files/.yaml" submission_prod_or_test = "test" // "prod" if submitting + submission_config = "${projectDir}/bin/config_files/.yaml" + submission_wait_time = 380 // time in seconds send_submission_email = false update_submission = false + fetch_reports_only = false // only try to fetch reports // general params help = false diff --git a/subworkflows/local/submission.nf b/subworkflows/local/submission.nf index 0a94138d..d16b0374 100644 --- a/subworkflows/local/submission.nf +++ b/subworkflows/local/submission.nf @@ -17,34 +17,60 @@ workflow INITIAL_SUBMISSION { submission_ch // meta.id, tsv, fasta, fastq1, fastq2, gff submission_config wait_time - + main: - if ( params.update_submission == false ) { + // Declare channels to dynamically handle conditional process outputs + Channel.empty().set { submission_files } // Default for SUBMISSION output + Channel.empty().set { update_files } // Default for UPDATE_SUBMISSION output + Channel.empty().set { fetched_reports } // Default for FETCH_SUBMISSION output + // actual process to initiate wait + //WAIT ( SUBMISSION.out.submission_files.collect(), wait_time ) + WAIT ( wait_time ) + + if ( params.fetch_reports_only == true ) { + // Check if submission folder exists and run report fetching module + submission_ch + .map { meta, validated_meta_path, fasta_path, fastq_1, fastq_2, annotations_path -> + def folder_path = file("${params.output_dir}/${params.submission_output_dir}/${meta.id}") + 
if (!folder_path.exists()) { + throw new IllegalStateException("Submission folder does not exist for ID: ${meta.id}") + } + // Return the tuple unchanged if the folder exists + return tuple(meta, validated_meta_path, fasta_path, fastq_1, fastq_2, annotations_path) + } + .set { valid_submission_ch } // Create a new validated channel + + FETCH_SUBMISSION ( WAIT.out, valid_submission_ch, submission_config ) + .set { fetched_reports } + } + + else if ( params.update_submission == false ) { // submit the files to database of choice SUBMISSION ( submission_ch, submission_config ) - - // actual process to initiate wait - WAIT ( SUBMISSION.out.submission_files.collect(), wait_time ) + .set { submission_files } // try to fetch & parse the report.xml - // todo: need to incorporate WAIT.out somehow // todo: this maybe doesn't need to take all the inputs from submission (or maybe doesn't need to be a separate module) FETCH_SUBMISSION ( WAIT.out, submission_ch, submission_config ) + .set { fetched_reports } } // if params.update_submission is true, update an existing submission else if ( params.update_submission == true ) { // process for updating the submitted samples - // todo: update to take the csv output from FETCH_SUBMISSION UPDATE_SUBMISSION ( submission_ch, submission_config ) + .set { update_files } // try to fetch & parse the report.xml - // todo: need to incorporate WAIT.out somehow FETCH_SUBMISSION ( WAIT.out, submission_ch, submission_config ) + .set { fetched_reports } } emit: - submission_files = SUBMISSION.out.submission_files + submission_files = submission_files + update_files = update_files + fetched_reports = fetched_reports + //submission_files = SUBMISSION.out.submission_files //submission_log = SUBMISSION.out.submission_log //to do: add GISAID module From 17ada80ee0f1598b32de98b6ac08f55e2f7028e6 Mon Sep 17 00:00:00 2001 From: Jessica Rowell Date: Mon, 27 Jan 2025 11:35:31 -0500 Subject: [PATCH 14/15] resolve path for the submission report in update_submissions --- modules/local/update_submission/main.nf | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/modules/local/update_submission/main.nf b/modules/local/update_submission/main.nf index ca596dba..7b7a2f4f 100644 --- a/modules/local/update_submission/main.nf +++ b/modules/local/update_submission/main.nf @@ -21,13 +21,17 @@ process UPDATE_SUBMISSION { def biosample = params.biosample == true ? '--biosample' : '' def sra = params.sra == true ? '--sra' : '' def genbank = params.genbank == true ? '--genbank' : '' + // get absolute path if relative dir passed + def resolved_output_dir = params.output_dir.startsWith('/') ? params.output_dir : "${baseDir}/${params.output_dir}" + script: """ submission_new.py \ --update \ --submission_name $meta.id \ - --config_file $submission_config \ + --submission_report ${resolved_output_dir}/${params.submission_output_dir}/${meta.id}/submission_report.csv \ + --config_file $submission_config \ --metadata_file $validated_meta_path \ --species $params.species \ --output_dir . 
\

From 7ade1baa65991fa9ec0e4615591860485b0150ab Mon Sep 17 00:00:00 2001
From: Jessica Rowell
Date: Fri, 31 Jan 2025 10:10:01 -0500
Subject: [PATCH 15/15] fix the update submissions workflow: stage the correct
 folders in workDir, reorganize submission_main into submit/update/fetch
 branches, and correct the submission report

---
 bin/submission_new.py                    | 122 ++++++++++++-----------
 modules/local/fetch_submission/main.nf   |   6 +-
 modules/local/initial_submission/main.nf |   2 +-
 modules/local/update_submission/main.nf  |   2 +-
 subworkflows/local/submission.nf         |  56 ++++++++---
 5 files changed, 110 insertions(+), 78 deletions(-)

diff --git a/bin/submission_new.py b/bin/submission_new.py
index 7a2dfe08..e487e7d5 100755
--- a/bin/submission_new.py
+++ b/bin/submission_new.py
@@ -87,60 +87,46 @@ def submission_main():
         submission_dir = 'Test'
     else:
         submission_dir = 'Prod'
+
+    # Initialize a dictionary to hold accessions if updating
+    accessions_dict = {'biosample':None, 'sra':None, 'genbank':None}
 
-    # Prepare all submissions first (so files are generated even if submission step fails)
-    if parameters['biosample'] and 'biosample' not in databases_to_skip:
-        biosample_submission = BiosampleSubmission(sample, parameters, config_dict, metadata_df, f"{parameters['output_dir']}/{parameters['submission_name']}/biosample",
-                                                   parameters['submission_mode'], submission_dir, 'biosample')
-    if parameters['sra'] and 'sra' not in databases_to_skip:
-        sra_submission = SRASubmission(sample, parameters, config_dict, metadata_df, f"{parameters['output_dir']}/{parameters['submission_name']}/sra",
-                                       parameters['submission_mode'], submission_dir, 'sra')
-    if parameters['genbank'] and 'genbank' not in databases_to_skip:
-        # Generates an XML if ftp_upload is True
-        genbank_submission = GenbankSubmission(sample, parameters, config_dict, metadata_df, f"{parameters['output_dir']}/{parameters['submission_name']}/genbank",
-                                               parameters['submission_mode'], submission_dir, 'genbank')
+    # Get workflow
+    # todo: error messaging here should probably be part of NF not this script
+    if parameters["submit"] and parameters["update"]:
+        raise ValueError("Only one of 'submit' or 'update' can be True, not both.")
 
-    # If submission mode
-    if parameters['submit']:
-        # Submit all prepared submissions and fetch report once
-        if parameters['biosample'] and 'biosample' not in databases_to_skip:
-            biosample_submission.submit()
-        if parameters['sra'] and 'sra' not in databases_to_skip:
-            sra_submission.submit()
-        if parameters['genbank'] and 'genbank' not in databases_to_skip:
-            # If user is submitting via FTP
-            if sample.ftp_upload:
-                genbank_submission.prepare_files_ftp_submission() # Prep files and run table2asn
-                genbank_submission.submit()
-            else:
-                # Otherwise, prepare manual submission
-                genbank_submission.prepare_files_manual_submission() # Prep files and run table2asn
-        # Send email if the user requests it
-        if parameters['send_email']:
-            genbank_submission.sendemail()
+    if parameters["submit"]:
+        workflow = "submit"
+    elif parameters["update"]:
+        workflow = "update"
+    elif parameters["fetch"]:
+        workflow = "fetch"
+    else:
+        raise ValueError("Please specify a workflow (submit, update, or fetch) to proceed.")
+    print(f"Workflow requested: {workflow}")
 
-    elif parameters['fetch']:
+    # Beginning of fetch workflow
+    if workflow == 'fetch':
+        # Fetch the reports from NCBI's ftp/sftp site
         start_time = time.time()
         timeout = 60 # time out after 60 seconds
         report_fetched = {db: False for db in ['biosample', 'sra', 'genbank']} # Track fetched status for each db
+        databases_to_fetch = [db for db in databases if db not in databases_to_skip] # List of databases to fetch reports for
         while time.time() - start_time < timeout:
-            # if user is submitting to genbank via ftp and provided the necessary files
-            if parameters['genbank'] and 'genbank' not in databases_to_skip and sample.ftp_upload:
-                submission_objects = {'biosample': biosample_submission, 'sra': sra_submission, 'genbank': genbank_submission}
-            else:
-                submission_objects = {'biosample': biosample_submission, 'sra': sra_submission}
-            # Try fetching reports for all databases
-            for db, submission_object in submission_objects.items():
+            # Try fetching reports for all applicable databases
+            for db in databases_to_fetch:
                 if not report_fetched[db]: # Only attempt fetching if the report has not been fetched
                     print(f"Fetching report for {db}")
-                    fetched_path = submission_object.fetch_report()
+                    # Instantiate a Submission object for this database type
+                    submission = Submission(sample, parameters, config_dict, f"{parameters['output_dir']}/{parameters['submission_name']}/{db}", parameters['submission_mode'], submission_dir, db)
+                    fetched_path = submission.fetch_report()
                     if fetched_path:
                         print(f"Report for {db} successfully fetched or already exists.")
                         report_fetched[db] = True
                     else:
                         print(f"Failed to fetch report for {db}, retrying...")
-            time.sleep(3) # Prevent spamming the server
+            time.sleep(3) # Because spamming servers isn't nice
 
             # Exit the loop if all reports have been fetched
             if all(report_fetched.values()):
@@ -153,11 +139,9 @@
         # Loop over submission dbs to parse the report.xmls
         # todo: add error-handling
         all_reports = pd.DataFrame()
-        for db, submission in submission_objects.items():
+        for db in databases_to_fetch:
             report_xml_file = f"{parameters['output_dir']}/{parameters['submission_name']}/{db}/report.xml"
-            print(report_xml_file) # debug
             df = submission.parse_report_to_df(report_xml_file)
-            print(df)
             all_reports = pd.concat([all_reports, df], ignore_index=True)
         # output_dir is submission_outputs/submission_name/, where the combined report for all samples is saved
         report_csv_file = f"{parameters['output_dir']}/{parameters['submission_name']}/submission_report.csv"
@@ -170,26 +154,41 @@
             print(f"Report table updated at: {report_csv_file}")
         except Exception as e:
             raise ValueError(f"Failed to save CSV file: {report_csv_file}. Error: {e}")
+        # End of the fetch workflow
 
-    elif parameters['update']:
-        # Load the report file
-        report_file = parameters["submission_report"]
-        try:
-            report_df = pd.read_csv(report_file)
-        except Exception as e:
-            raise ValueError(f"Failed to load CSV file: {report_file}. Error: {e}")
-        accessions_dict = get_accessions(sample.sample_id, report_df)
+    # Beginning of submit and update workflows
+    else:
+        # Load the report file to get the accession IDs if user is updating a submission
+        if workflow == 'update':
+            report_file = parameters["submission_report"]
+            try:
+                report_df = pd.read_csv(report_file)
+            except Exception as e:
+                raise ValueError(f"Failed to load CSV file: {report_file}. Error: {e}")
+            accessions_dict = get_accessions(sample.sample_id, report_df)
+            # Exit gracefully if accession IDs not found (cannot push update to NCBI without an accession ID)
+            databases_to_update = [db for db in databases if db not in databases_to_skip]
+            print(f"Update requested for databases {', '.join(databases_to_update)}")
+            if any(accessions_dict.get(db) is None for db in databases_to_update):
+                print(f"Error: Missing accession for one or more of {', '.join(databases_to_update)}. Exiting update workflow.")
+                sys.exit(1)
+            else:
+                print(f"Accessions found for {', '.join(accessions_dict.keys())}")
+                print(f"Accessions: {', '.join(f'{k}: {v}' for k, v in accessions_dict.items())}")
+            # Todo: this code requires the user to specify the exact database(s) they want to update
 
-        # Prepare all submissions with the accessions
-        if accessions_dict['biosample'] and parameters['biosample'] and 'biosample' not in databases_to_skip:
+        # Run the rest of the submission steps: prep the files, submit, and fetch the report once
+        if parameters['biosample'] and 'biosample' not in databases_to_skip:
             biosample_submission = BiosampleSubmission(sample, parameters, config_dict, metadata_df, f"{parameters['output_dir']}/{parameters['submission_name']}/biosample",
-                                                       parameters['submission_mode'], submission_dir, 'biosample', accessions_dict['biosample'])
-        if accessions_dict['sra'] and parameters['sra'] and 'sra' not in databases_to_skip:
+                                                       parameters['submission_mode'], submission_dir, 'biosample', accessions_dict['biosample'])
+        if parameters['sra'] and 'sra' not in databases_to_skip:
             sra_submission = SRASubmission(sample, parameters, config_dict, metadata_df, f"{parameters['output_dir']}/{parameters['submission_name']}/sra",
-                                           parameters['submission_mode'], submission_dir, 'sra', accessions_dict['sra'])
-        if accessions_dict['genbank'] and parameters['genbank'] and 'genbank' not in databases_to_skip:
+                                           parameters['submission_mode'], submission_dir, 'sra', accessions_dict['sra'])
+        if parameters['genbank'] and 'genbank' not in databases_to_skip:
+            # Generates an XML if ftp_upload is True
             genbank_submission = GenbankSubmission(sample, parameters, config_dict, metadata_df, f"{parameters['output_dir']}/{parameters['submission_name']}/genbank",
-                                                   parameters['submission_mode'], submission_dir, 'genbank', accessions_dict['genbank'])
+                                                   parameters['submission_mode'], submission_dir, 'genbank', accessions_dict['genbank'])
+
         # Submit all prepared submissions and fetch report once
         if parameters['biosample'] and 'biosample' not in databases_to_skip:
             biosample_submission.submit()
         if parameters['sra'] and 'sra' not in databases_to_skip:
             sra_submission.submit()
         if parameters['genbank'] and 'genbank' not in databases_to_skip:
             # If user is submitting via FTP
             if sample.ftp_upload:
                 genbank_submission.prepare_files_ftp_submission() # Prep files and run table2asn
                 genbank_submission.submit()
             else:
                 # Otherwise, prepare manual submission
                 genbank_submission.prepare_files_manual_submission() # Prep files and run table2asn
         # Send email if the user requests it
         if parameters['send_email']:
             genbank_submission.sendemail()
-
+    # End of submit and update workflows
 
 class GetParams:
     """ Class constructor for getting all necessary parameters (input args from argparse and hard-coded ones)
     """
@@ -472,7 +471,9 @@ def parse_report_to_df(self, report_path):
             print(f"Report not found: {report_path}")
         except ET.ParseError:
             print(f"Error parsing XML report: {report_path}")
-        return pd.DataFrame([report])
+        report = pd.DataFrame([report])
+        report = report.where(pd.notna(report), None)
+        return report
     def submit_files(self, files, type):
         """ Uploads a set of files to a host site at submit//sample_database/ """
         sample_subtype_dir = f'{self.sample.sample_id}_{type}' # samplename_ (a unique submission dir)
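A note on the parse_report_to_df change above: pd.read_csv materializes empty accession cells as NaN, and the update workflow's accessions_dict.get(db) is None guard only fires for a real None, so the where(pd.notna(...)) pass is what lets a missing accession be detected. A minimal sketch of the idiom, with made-up column values (the astype(object) step here is an extra safeguard to make the None upcast explicit; it is not part of the patch):

    import io
    import pandas as pd

    # Hypothetical two-column slice of submission_report.csv with a missing SRA accession
    csv_text = "biosample_accession,sra_accession\nSAMN00000001,\n"
    report = pd.read_csv(io.StringIO(csv_text))   # the empty cell comes back as NaN (float)

    # Same idiom as parse_report_to_df: keep real values, replace NaN with None
    report = report.astype(object).where(pd.notna(report), None)

    print(report["sra_accession"].values[0] is None)  # True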
diff --git a/modules/local/fetch_submission/main.nf b/modules/local/fetch_submission/main.nf
index 7e651094..759a60d1 100644
--- a/modules/local/fetch_submission/main.nf
+++ b/modules/local/fetch_submission/main.nf
@@ -13,7 +13,7 @@ process FETCH_SUBMISSION {
 
     input:
     val wait_time
-    tuple val(meta), path(validated_meta_path), path(fasta_path), path(fastq_1), path(fastq_2), path(annotations_path)
+    tuple val(meta), path(validated_meta_path), path(fasta_path), path(fastq_1), path(fastq_2), path(annotations_path), path(submission_folder)
     path submission_config
 
     // define the command line arguments based on the value of params.submission_test_or_prod, params.send_submission_email
@@ -24,7 +24,9 @@
     def genbank = params.genbank == true ? '--genbank' : ''
 
     script:
-    """
+    """
+    echo "Using submission folder: $submission_folder"
+    ls -lh $submission_folder
     submission_new.py \
         --fetch \
         --submission_name $meta.id \
diff --git a/modules/local/initial_submission/main.nf b/modules/local/initial_submission/main.nf
index ad28b6fd..a6c988d3 100644
--- a/modules/local/initial_submission/main.nf
+++ b/modules/local/initial_submission/main.nf
@@ -44,6 +44,6 @@ process SUBMISSION {
     """
 
     output:
-    path "${validated_meta_path.getBaseName()}", emit: submission_files
+    tuple val(meta), path("${validated_meta_path.getBaseName()}"), emit: submission_files
     //path "${validated_meta_path.getBaseName()}/*.csv", emit: submission_report
 }
\ No newline at end of file
diff --git a/modules/local/update_submission/main.nf b/modules/local/update_submission/main.nf
index 7b7a2f4f..6f469fcd 100644
--- a/modules/local/update_submission/main.nf
+++ b/modules/local/update_submission/main.nf
@@ -47,6 +47,6 @@ process UPDATE_SUBMISSION {
     """
 
     output:
-    path "${validated_meta_path.getBaseName()}", emit: submission_files
+    tuple val(meta), path("${validated_meta_path.getBaseName()}"), emit: submission_files
     //path "${validated_meta_path.getBaseName()}/*.csv", emit: submission_report
 }
\ No newline at end of file
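A note on the resolved_output_dir ternary that patch 14 introduced in UPDATE_SUBMISSION and that reappears in the subworkflow below: tasks execute inside per-task work directories, so a relative --output_dir string would not reliably point at the launch-time output location; anchoring it at baseDir keeps the submission_report.csv lookup stable. A rough Python equivalent of the Groovy ternary (the function name and paths are illustrative only):

    from pathlib import Path

    def resolve_output_dir(output_dir: str, base_dir: str) -> str:
        # Absolute paths pass through; relative ones are anchored at base_dir,
        # mirroring: output_dir.startsWith('/') ? output_dir : "${baseDir}/${output_dir}"
        p = Path(output_dir)
        return str(p if p.is_absolute() else Path(base_dir) / p)

    print(resolve_output_dir("submission_outputs", "/path/to/pipeline"))
    # /path/to/pipeline/submission_outputs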
diff --git a/subworkflows/local/submission.nf b/subworkflows/local/submission.nf
index d16b0374..aef1b242 100644
--- a/subworkflows/local/submission.nf
+++ b/subworkflows/local/submission.nf
@@ -27,30 +27,52 @@ workflow INITIAL_SUBMISSION {
     //WAIT ( SUBMISSION.out.submission_files.collect(), wait_time )
     WAIT ( wait_time )
 
+    def resolved_output_dir = params.output_dir.startsWith('/') ? params.output_dir : "${baseDir}/${params.output_dir}"
+    //def submission_folder = file("${resolved_output_dir}/${params.submission_output_dir}/${meta.id}")
+
     if ( params.fetch_reports_only == true ) {
         // Check if submission folder exists and run report fetching module
-        submission_ch
-            .map { meta, validated_meta_path, fasta_path, fastq_1, fastq_2, annotations_path ->
-                def folder_path = file("${params.output_dir}/${params.submission_output_dir}/${meta.id}")
-                if (!folder_path.exists()) {
-                    throw new IllegalStateException("Submission folder does not exist for ID: ${meta.id}")
-                }
-                // Return the tuple unchanged if the folder exists
-                return tuple(meta, validated_meta_path, fasta_path, fastq_1, fastq_2, annotations_path)
-            }
-            .set { valid_submission_ch } // Create a new validated channel
-
-        FETCH_SUBMISSION ( WAIT.out, valid_submission_ch, submission_config )
+        submission_ch = submission_ch
+            .map { meta, validated_meta_path, fasta_path, fastq_1, fastq_2, annotations_path ->
+                def submission_folder = file("${resolved_output_dir}/${params.submission_output_dir}/${meta.id}")
+                if (!submission_folder.exists()) {
+                    throw new IllegalStateException("Submission folder does not exist for ID: ${meta.id}")
+                }
+                return tuple(meta, validated_meta_path, fasta_path, fastq_1, fastq_2, annotations_path, submission_folder)
+            }
+        FETCH_SUBMISSION ( WAIT.out, submission_ch, submission_config )
             .set { fetched_reports }
     }
     else if ( params.update_submission == false ) {
         // submit the files to database of choice
         SUBMISSION ( submission_ch, submission_config )
             .set { submission_files }
 
-        // try to fetch & parse the report.xml
-        // todo: this maybe doesn't need to take all the inputs from submission (or maybe doesn't need to be a separate module)
-        FETCH_SUBMISSION ( WAIT.out, submission_ch, submission_config )
+        // Add the submission_files directory to the channel before passing to FETCH_SUBMISSION
+        submission_ch.join(submission_files)
+            .map { meta, validated_meta_path, fasta_path, fastq_1, fastq_2, annotations_path, submission_folder ->
+                return tuple(meta, validated_meta_path, fasta_path, fastq_1, fastq_2, annotations_path, submission_folder)
+            }
+            .set { submission_with_folder }
+
+        // Try to fetch & parse the report.xml
+        FETCH_SUBMISSION ( WAIT.out, submission_with_folder, submission_config )
             .set { fetched_reports }
     }
@@ -61,7 +83,13 @@
     else if ( params.update_submission == true ) {
         // process for updating the submitted samples
         UPDATE_SUBMISSION ( submission_ch, submission_config )
             .set { update_files }
 
-        // try to fetch & parse the report.xml
-        FETCH_SUBMISSION ( WAIT.out, submission_ch, submission_config )
+        // Join the updated submission folder (from UPDATE_SUBMISSION) onto the channel, keyed by meta
+        submission_ch.join(update_files)
+            .map { meta, validated_meta_path, fasta_path, fastq_1, fastq_2, annotations_path, submission_folder ->
+                return tuple(meta, validated_meta_path, fasta_path, fastq_1, fastq_2, annotations_path, submission_folder)
+            }
+            .set { update_with_folder }
+
+        // Try to fetch & parse the report.xml
+        FETCH_SUBMISSION ( WAIT.out, update_with_folder, submission_config )
             .set { fetched_reports }
     }
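For readers new to Nextflow channel operators: join pairs items from two channels by their first element (the meta value here), which is why SUBMISSION and UPDATE_SUBMISSION were changed to emit tuple val(meta), path(...) — without the key, a fetched folder could be paired with the wrong sample when channels emit out of order. A rough Python analogy with made-up sample IDs:

    # Rough Python analogy of Nextflow's channel join, keyed on the sample id
    inputs = {
        "sample1": ("meta1.tsv", "seq1.fasta"),
        "sample2": ("meta2.tsv", "seq2.fasta"),
    }
    folders = {  # arrives in a different order, as channels may
        "sample2": "submission_outputs/sample2",
        "sample1": "submission_outputs/sample1",
    }

    # Like join's default mode, only keys present on both sides are emitted
    joined = {k: inputs[k] + (folders[k],) for k in inputs.keys() & folders.keys()}
    for k, v in sorted(joined.items()):
        print(k, v)
    # sample1 ('meta1.tsv', 'seq1.fasta', 'submission_outputs/sample1')
    # sample2 ('meta2.tsv', 'seq2.fasta', 'submission_outputs/sample2')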