From 20fd5f9f85d4fb1af92d23e8c12d43d7c02b8f13 Mon Sep 17 00:00:00 2001 From: danlu1 Date: Tue, 1 Oct 2024 16:51:02 +0000 Subject: [PATCH 01/40] make table update cohort specific and allow saving files to staging --- scripts/table_updates/config.json | 5 +- scripts/table_updates/update_data_table.py | 116 +++++++++++++-------- scripts/table_updates/utilities.py | 101 +++++++++++++++++- 3 files changed, 174 insertions(+), 48 deletions(-) diff --git a/scripts/table_updates/config.json b/scripts/table_updates/config.json index 328da408..2893b67c 100644 --- a/scripts/table_updates/config.json +++ b/scripts/table_updates/config.json @@ -21,5 +21,6 @@ }, "main_genie_release_version": "16.6-consortium", "main_genie_data_release_files": "syn16804261", - "main_genie_sample_mapping_table": "syn7434273" -} + "main_genie_sample_mapping_table": "syn7434273", + "staging_project": "syn63600793" +} \ No newline at end of file diff --git a/scripts/table_updates/update_data_table.py b/scripts/table_updates/update_data_table.py index 82cd5eb9..c1a27d86 100644 --- a/scripts/table_updates/update_data_table.py +++ b/scripts/table_updates/update_data_table.py @@ -28,7 +28,11 @@ "primary": ("syn23285911", "table_type='data'"), "irr": ("syn21446696", "table_type='data' and double_curated is true"), } - + # This contains external tables with redacted + TABLE_INFO["redacted"] = ( + "syn21446696", + "table_type='data' and double_curated is false", + ) def get_main_genie_clinical_sample_file( syn: synapseclient.Synapse, @@ -70,9 +74,23 @@ def get_main_genie_clinical_sample_file( return clinical_df[["SAMPLE_ID", "SEQ_YEAR"]] -def _store_data(syn, table_id, label_data, table_type, logger, dry_run): +def _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run, production, config): + """Helper function to store data to each table in the master table. + + Args: + syn (synapseclient.Synapse): Synapse client connection + master_table (pandas.DataFrame): Table of all of the primary or irr BPC tables + label_data (pandas.DataFrame): The uploaded data + table_type (string): Table type, primary or irr + cohort (string): Cohort name + logger (logging.Logger): The custom logger. Optional. + dry_run (bool): The dry run flag. If True, perform a dry run. + production (bool): If True, save the output to the production environment. 
+ config (dict): config read in + """ table_schema = syn.get(table_id) logger.info(f"Updating table: {table_schema.name} {table_id}") + # subset columns for the uploaded data form_label = table_schema.form_label[0] table_columns = syn.getColumns(table_schema.columnIds) table_columns = [col["name"] for col in list(table_columns)] @@ -81,6 +99,7 @@ def _store_data(syn, table_id, label_data, table_type, logger, dry_run): ) # variable in dd but not in data table_columns.append("redcap_repeat_instrument") temp_data = label_data[table_columns] + # subset the uploaded data based on form_label if form_label == "non-repeating": temp_data = temp_data[temp_data.redcap_repeat_instrument.isnull()] else: @@ -104,23 +123,26 @@ def _store_data(syn, table_id, label_data, table_type, logger, dry_run): # remove .0 from all columns temp_data = temp_data.applymap(lambda x: float_to_int(x)) # update table - table_query = syn.tableQuery("SELECT * from %s" % table_id) - if table_type == "irr": - # check for exsiting id to update for new data only - existing_records = list(set(table_query.asDataFrame()["record_id"])) - temp_data = temp_data[~temp_data["record_id"].isin(existing_records)] - if not dry_run: - if table_type == "primary": - table = syn.delete(table_query.asRowSet()) # wipe the table - table = syn.store(Table(table_schema, temp_data)) - else: - temp_data.to_csv(table_id + "_temp.csv") + store_cohort_data(syn, table_type, temp_data, table_schema, cohort, dry_run, production, config) + +def store_data(syn, master_table, label_data, table_type, cohort, logger, dry_run, production, config): + """Store data to each table in the master table. -def store_data(syn, master_table, label_data, table_type, logger, dry_run): + Args: + syn (synapseclient.Synapse): Synapse client connection + master_table (pandas.DataFrame): Table of all of the primary or irr BPC tables + label_data (pandas.DataFrame): The uploaded data + table_type (string): Table type, primary or irr + cohort (string): Cohort name + logger (logging.Logger): The custom logger. Optional. + dry_run (bool): The dry run flag. If True, perform a dry run. + production (bool): If True, save the output to the production environment. + config (dict): config read in + """ logger.info("Updating data for %s tables..." % table_type) for table_id in master_table["id"]: - _store_data(syn, table_id, label_data, table_type, logger, dry_run) + _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run, production, config) def get_phi_cutoff(unit): @@ -218,7 +240,19 @@ def _redact_table(df, interval_cols_info): return df, record_to_redact -def update_redact_table(syn, redacted_table_info, full_data_table_info, logger): +def update_redact_table(syn, redacted_table_info, full_data_table_info, cohort, logger, dry_run, production, config): + """Update redacted table + + Args: + syn (synapseclient.Synapse): Synapse client connection + redacted_table_info (pandas.DataFrame): Table of all of the redacted tables + full_data_table_info (pandas.DataFrame): Table of all of the primary or irr BPC tables + cohort (string): Cohort name + logger (logging.Logger): The custom logger. Optional. + dry_run (bool): The dry run flag. If True, perform a dry run. + production (bool): If True, save the output to the production environment. 
+ config (dict): config read in + """ interval_cols_info = download_synapse_table(syn, "syn23281483", "") # Create new master table master_table = redacted_table_info.merge( @@ -235,14 +269,13 @@ def update_redact_table(syn, redacted_table_info, full_data_table_info, logger): master_table["name"] == "Cancer Panel Test", "id_full" ].values[0] curation_info = syn.tableQuery( - "SELECT record_id, curation_dt FROM %s" % curation_table_id + f"SELECT record_id, curation_dt FROM {curation_table_id} where cohort = '{cohort}'" ).asDataFrame() patient_info = syn.tableQuery( - "SELECT record_id, birth_year, hybrid_death_ind FROM %s" % patient_table_id + f"SELECT record_id, birth_year, hybrid_death_ind FROM {patient_table_id} where cohort = '{cohort}'" ).asDataFrame() sample_info = syn.tableQuery( - "SELECT record_id, cpt_genie_sample_id, age_at_seq_report FROM %s" - % sample_table_id + f"SELECT record_id, cpt_genie_sample_id, age_at_seq_report FROM {sample_table_id} where cohort = '{cohort}'" ).asDataFrame() patient_curation_info = patient_info.merge( curation_info, how="left", on="record_id" @@ -268,17 +301,16 @@ def update_redact_table(syn, redacted_table_info, full_data_table_info, logger): for _, row in master_table.iterrows(): if row["name"] != "Patient Characteristics": table_id = row["id_full"] - df = syn.tableQuery("SELECT * FROM %s" % table_id).asDataFrame() + df = syn.tableQuery(f"SELECT * FROM {table_id} where cohort = '{cohort}'").asDataFrame() new_df, new_record_to_redact = _redact_table(df, interval_cols_info) new_df.reset_index(drop=True, inplace=True) record_to_redact = record_to_redact + new_record_to_redact table_schema = syn.get(row["id_redacted"]) logger.info("Updating table: %s" % table_schema.name) - table_query = syn.tableQuery("SELECT * from %s" % row["id_redacted"]) - table = syn.delete(table_query.asRowSet()) # wipe the table - table = syn.store(Table(table_schema, new_df)) - # Modify patient table - df = syn.tableQuery("SELECT * FROM %s" % patient_table_id).asDataFrame() + store_cohort_redacted_data(syn, new_df, table_schema, cohort, dry_run, production, config) + + # Modify patient table separately + df = syn.tableQuery(f"SELECT * FROM {patient_table_id} where cohort = '{cohort}'").asDataFrame() new_df, new_record_to_redact = _redact_table(df, interval_cols_info) new_df.reset_index(drop=True, inplace=True) record_to_redact = record_to_redact + new_record_to_redact @@ -293,16 +325,18 @@ def update_redact_table(syn, redacted_table_info, full_data_table_info, logger): master_table["name"] == "Patient Characteristics", "id_redacted" ].values[0] table_schema = syn.get(redacted_patient_id) - table_query = syn.tableQuery("SELECT * from %s" % redacted_patient_id) - table = syn.delete(table_query.asRowSet()) # wipe the table - table = syn.store(Table(table_schema, new_df)) + # rename the table to distiguish it from full data patient table in Staging project + if not production: + table_schema["name"] = "Patient Characteristics Redacted" + store_cohort_redacted_data(syn, new_df, table_schema, cohort, dry_run, production, config) + # Update redacted column in full data patient table - logger.info("Updating redacted column in the internal table...") + logger.info("Updating redacted column in the Sage internal table...") full_pt_id = master_table.loc[ master_table["name"] == "Patient Characteristics", "id_full" ].values[0] full_pt_schema = syn.get(full_pt_id) - pt_dat_query = syn.tableQuery("SELECT cohort, record_id FROM %s" % full_pt_id) + pt_dat_query = syn.tableQuery(f"SELECT 
cohort, record_id FROM {full_pt_id} where cohort = '{cohort}'") pt_dat = pt_dat_query.asDataFrame() pt_dat.index = pt_dat.index.map(str) pt_dat["index"] = pt_dat.index @@ -310,8 +344,7 @@ def update_redact_table(syn, redacted_table_info, full_data_table_info, logger): result = pandas.merge(pt_dat, info_to_update, on=["cohort", "record_id"]) result.index = result["index"] result = result[["redacted"]] - syn.store(Table(full_pt_schema, result, etag=pt_dat_query.etag)) - + update_redacted_column(syn, full_pt_schema, pt_dat, result, pt_dat_query, dry_run, production, config) def custom_fix_for_cancer_panel_test_table( syn: synapseclient.Synapse, @@ -399,6 +432,8 @@ def main(): parser.add_argument( "-p", "--project_config", default="config.json", help="Project config file" ) + parser.add_argument("-c", "--cohort", default="", help="Cohort name for which the tables should be updated") + parser.add_argument("-pd", "--production", action="store_true", help="Save output to production folder") parser.add_argument("-m", "--message", default="", help="Version comment") parser.add_argument("-d", "--dry_run", action="store_true", help="dry run flag") @@ -406,6 +441,8 @@ def main(): table_type = args.table synapse_config = args.synapse_config project_config = args.project_config + cohort = args.cohort + production = args.production comment = args.message dry_run = args.dry_run @@ -426,26 +463,17 @@ def main(): # This is the internal tables with non redacted table_id, condition = list(TABLE_INFO[table_type]) master_table = download_synapse_table(syn, table_id, condition) - # This contains external tables with redacted - TABLE_INFO["redacted"] = ( - "syn21446696", - "table_type='data' and double_curated is false", - ) # download data files # TODO: find the cohort that has new data # This is a mapping to all the intake data. 
e.g: ProstateBPCIntake_data
     # found here: https://www.synapse.org/Synapse:syn23286928
     cohort_info_selected = config[table_type]
-    cohort_data_list = []
-    for cohort in cohort_info_selected:
-        df = get_data(syn, cohort_info_selected[cohort], cohort)
-        cohort_data_list.append(df)
-    label_data = pandas.concat(cohort_data_list, axis=0, ignore_index=True)
+    label_data = get_data(syn, cohort_info_selected[cohort], cohort)
     label_data["redacted"] = numpy.nan

     # update data tables
-    store_data(syn, master_table, label_data, table_type, logger, dry_run)
+    store_data(syn, master_table, label_data, table_type, cohort, logger, dry_run, production, config)
     if not dry_run:
         custom_fix_for_cancer_panel_test_table(syn, master_table, logger, config)
         if table_type == "primary":
diff --git a/scripts/table_updates/utilities.py b/scripts/table_updates/utilities.py
index e8d725be..9871392d 100644
--- a/scripts/table_updates/utilities.py
+++ b/scripts/table_updates/utilities.py
@@ -3,7 +3,7 @@

 import pandas
 import synapseclient
-from synapseclient import Schema, Column, Table
+from synapseclient import Schema, Table, as_table_columns

 def _is_float(val):
     """Check if the value is float
@@ -134,4 +134,101 @@ def revert_table_version(syn, table_id):
     temp_data = old_data[table_columns]
     table_query = syn.tableQuery("SELECT * from %s" % table_id)
     syn.delete(table_query.asRowSet())
-    syn.store(Table(table_schema, temp_data)) \ No newline at end of file
+    syn.store(Table(table_schema, temp_data))
+
+
+def store_cohort_data(syn, table_type, data, schema, cohort, dry_run, production, config):
+    """Truncate and save record for a cohort
+
+    Args:
+        syn (synapseclient.Synapse): Synapse client connection
+        table_type (string): Table type, primary or irr
+        data (pandas.DataFrame): The data to be stored
+        schema (Schema): The table schema of the table in the production project
+        cohort (string): Cohort name
+        dry_run (bool): The dry run flag. If True, perform a dry run.
+        production (bool): If True, save the output to the production environment.
+        config (dict): config read in
+    """
+    table_query = syn.tableQuery(f"SELECT * FROM {schema.id} where cohort = '{cohort}'")
+
+    if table_type == "irr":
+        # check for exsiting id to update for new data only
+        existing_records = list(set(table_query.asDataFrame()["record_id"]))
+        data = data[~data["record_id"].isin(existing_records)]
+    if not dry_run:
+        # save to synapse
+        if table_type == "primary":
+            # wipe out cohort data
+            syn.delete(table_query)
+        if production:
+            # save to production project
+            syn.store(Table(schema, data))
+        else:
+            # save to staging project
+            table_schema = Schema(name=schema.name, columns=as_table_columns(data), primary_key= schema.primary_key, data_type= schema.data_type, table_type = schema.table_type, form = schema.form, form_label = schema.form_label, parent=config['staging_project'])
+            syn.store(Table(table_schema, data))
+    else:
+        # save to local
+        data.to_csv(schema.id + "_temp.csv")
+
+
+def store_cohort_redacted_data(syn, data, schema, cohort, dry_run, production, config):
+    """Store redacted tables for a cohort
+
+    Args:
+        syn (synapseclient.Synapse): Synapse client connection
+        data (pandas.DataFrame): The data to be stored
+        schema (Schema): The table schema of the table in the production project
+        cohort (string): Cohort name
+        dry_run (bool): The dry run flag. If True, perform a dry run.
+        production (bool): If True, save the output to the production environment.
+        config (dict): config read in
+    """
+    table_query = syn.tableQuery(f"SELECT * FROM {schema.id} where cohort = '{cohort}'")
+
+    if not dry_run:
+        # save to synapse
+        # wipe out cohort data
+        syn.delete(table_query)
+        if production:
+            # save to production project
+            syn.store(Table(schema, data))
+        else:
+            # save to staging project
+            table_schema = Schema(name=schema.name, columns=as_table_columns(data), primary_key= schema.primary_key, data_type= schema.data_type, table_type = schema.table_type, form = schema.form, form_label = schema.form_label, parent=config['staging_project'])
+            syn.store(Table(table_schema, data))
+    else:
+        # save to local
+        data.to_csv(schema.id + "_temp.csv")
+
+
+def update_redacted_column(syn, schema, data, redacted_column, table_query, dry_run, production, config):
+    """Update redacted column in a table
+
+    Args:
+        syn (synapseclient.Synapse): Synapse client connection
+        schema (Schema): The table schema of the table in the production project
+        data (pandas.DataFrame): The data to be stored
+        redacted_column (pandas.Series): The redacted column
+        table_query (synapseclient.table.CsvFileTable): The synapseclient.table.CsvFileTable object
+        dry_run (bool): The dry run flag. If True, perform a dry run.
+        production (bool): If True, save the output to the production environment.
+        config (dict): config read in
+    """
+    if not dry_run:
+        # save to synapse
+        if production:
+            # save to production project
+            syn.store(Table(schema, redacted_column, etag = table_query.etag))
+        else:
+            # save to staging project
+            # get the table id
+            table_id = syn.findEntityId(schema.name, parent=config['staging_project'])
+            schema = syn.get(table_id)
+            table_query = syn.tableQuery(f"SELECT cohort, record_id FROM {table_id}")
+            syn.store(Table(schema, redacted_column, etag= table_query.etag))
+    else:
+        # save to local
+        data["redacted"] = redacted_column
+        data.to_csv(schema.id + "_temp.csv")
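The heart of the patch above is a cohort-scoped truncate-and-load: rather than wiping a whole Synapse table, only the selected cohort's rows are deleted and re-stored. A minimal sketch of that pattern using the same synapseclient calls as the patch (the syn handle, table ID, cohort name, and DataFrame are placeholders, not values from this changeset):

from synapseclient import Table

def replace_cohort_rows(syn, table_id, cohort, new_data):
    """Wipe only one cohort's rows, then store that cohort's refreshed rows."""
    # rows belonging to other cohorts are left untouched
    query = syn.tableQuery(f"SELECT * FROM {table_id} where cohort = '{cohort}'")
    syn.delete(query)
    syn.store(Table(syn.get(table_id), new_data))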
From 80bcfec7b4ce0b98544262b9fb27c8db49e0b7be Mon Sep 17 00:00:00 2001
From: danlu1
Date: Wed, 2 Oct 2024 23:30:51 +0000
Subject: [PATCH 02/40] utilize Staging projects and tables so no custom table
 creation scripts needed
---
 scripts/table_updates/update_data_table.py | 60 +++++++------
 scripts/table_updates/utilities.py         | 98 +---------------------
 2 files changed, 37 insertions(+), 121 deletions(-)

diff --git a/scripts/table_updates/update_data_table.py b/scripts/table_updates/update_data_table.py
index c1a27d86..610dab35 100644
--- a/scripts/table_updates/update_data_table.py
+++ b/scripts/table_updates/update_data_table.py
@@ -19,20 +19,20 @@
 import json
 import math

-import pandas
 import numpy
-
+import pandas
 from utilities import *

 TABLE_INFO = {
+    "production" : {
     "primary": ("syn23285911", "table_type='data'"),
     "irr": ("syn21446696", "table_type='data' and double_curated is true"),
+    "redacted" : ("syn21446696", "table_type='data' and double_curated is false")
+}, "staging" : {
+    "primary": ("syn63616766", "table_type='data'"),
+    "redacted" : ("syn63617582", "table_type='data' and double_curated is false")
+}
 }
-    # This contains external tables with redacted
-    TABLE_INFO["redacted"] = (
-        "syn21446696",
-        "table_type='data' and double_curated is false",
-    )

 def get_main_genie_clinical_sample_file(
     syn: synapseclient.Synapse,
@@ -74,7 +74,7 @@ def get_main_genie_clinical_sample_file(
     return clinical_df[["SAMPLE_ID", "SEQ_YEAR"]]

-def _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run, production, config):
+def _store_data(syn, table_id, label_data, table_type, cohort, 
logger, dry_run): """Helper function to store data to each table in the master table. Args: @@ -123,10 +123,19 @@ def _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run, # remove .0 from all columns temp_data = temp_data.applymap(lambda x: float_to_int(x)) # update table - store_cohort_data(syn, table_type, temp_data, table_schema, cohort, dry_run, production, config) - + table_query = syn.tableQuery(f"SELECT * FROM {table_schema.id} where cohort = '{cohort}'") + if table_type == "irr": + # check for exsiting id to update for new data only + existing_records = list(set(table_query.asDataFrame()["record_id"])) + temp_data = temp_data[~temp_data["record_id"].isin(existing_records)] + if not dry_run: + if table_type == "primary": + table = syn.delete(table_query) # wipe the cohort data + table = syn.store(Table(table_schema, temp_data)) + else: + temp_data.to_csv(table_id + "_temp.csv") -def store_data(syn, master_table, label_data, table_type, cohort, logger, dry_run, production, config): +def store_data(syn, master_table, label_data, table_type, cohort, logger, dry_run): """Store data to each table in the master table. Args: @@ -142,8 +151,7 @@ def store_data(syn, master_table, label_data, table_type, cohort, logger, dry_ru """ logger.info("Updating data for %s tables..." % table_type) for table_id in master_table["id"]: - _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run, production, config) - + _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run) def get_phi_cutoff(unit): switcher = {"day": math.floor(89 * 365), "month": math.floor(89 * 12), "year": 89} @@ -240,7 +248,7 @@ def _redact_table(df, interval_cols_info): return df, record_to_redact -def update_redact_table(syn, redacted_table_info, full_data_table_info, cohort, logger, dry_run, production, config): +def update_redact_table(syn, redacted_table_info, full_data_table_info, cohort, logger, dry_run): """Update redacted table Args: @@ -307,9 +315,11 @@ def update_redact_table(syn, redacted_table_info, full_data_table_info, cohort, record_to_redact = record_to_redact + new_record_to_redact table_schema = syn.get(row["id_redacted"]) logger.info("Updating table: %s" % table_schema.name) - store_cohort_redacted_data(syn, new_df, table_schema, cohort, dry_run, production, config) + table_query = syn.tableQuery(f"SELECT * from {table_schema.id} where cohort = '{cohort}'") + table = syn.delete(table_query) # wipe the table + table = syn.store(Table(table_schema, new_df)) - # Modify patient table separately + # Modify patient table df = syn.tableQuery(f"SELECT * FROM {patient_table_id} where cohort = '{cohort}'").asDataFrame() new_df, new_record_to_redact = _redact_table(df, interval_cols_info) new_df.reset_index(drop=True, inplace=True) @@ -325,11 +335,9 @@ def update_redact_table(syn, redacted_table_info, full_data_table_info, cohort, master_table["name"] == "Patient Characteristics", "id_redacted" ].values[0] table_schema = syn.get(redacted_patient_id) - # rename the table to distiguish it from full data patient table in Staging project - if not production: - table_schema["name"] = "Patient Characteristics Redacted" - store_cohort_redacted_data(syn, new_df, table_schema, cohort, dry_run, production, config) - + table_query = syn.tableQuery(f"SELECT * from {redacted_patient_id} where cohort = '{cohort}'") + table = syn.delete(table_query) # wipe the table + table = syn.store(Table(table_schema, new_df)) # Update redacted column in full data patient table 
logger.info("Updating redacted column in the Sage internal table...") full_pt_id = master_table.loc[ @@ -344,7 +352,8 @@ def update_redact_table(syn, redacted_table_info, full_data_table_info, cohort, result = pandas.merge(pt_dat, info_to_update, on=["cohort", "record_id"]) result.index = result["index"] result = result[["redacted"]] - update_redacted_column(syn, full_pt_schema, pt_dat, result, pt_dat_query, dry_run, production, config) + syn.store(Table(full_pt_schema, result, etag=pt_dat_query.etag)) + def custom_fix_for_cancer_panel_test_table( syn: synapseclient.Synapse, @@ -461,9 +470,12 @@ def main(): # get master table # This is the internal tables with non redacted + if production: + TABLE_INFO = TABLE_INFO["production"] + else: + TABLE_INFO = TABLE_INFO["staging"] table_id, condition = list(TABLE_INFO[table_type]) master_table = download_synapse_table(syn, table_id, condition) - # download data files # TODO: find the cohort that has new data # This is a mapping to all the intake data. e.g: ProstateBPCIntake_data @@ -473,7 +485,7 @@ def main(): label_data["redacted"] = numpy.nan # update data tables - store_data(syn, master_table, label_data, table_type, cohort, logger, dry_run, production, config) + store_data(syn, master_table, label_data, table_type, cohort, logger, dry_run) if not dry_run: custom_fix_for_cancer_panel_test_table(syn, master_table, logger, config) if table_type == "primary": diff --git a/scripts/table_updates/utilities.py b/scripts/table_updates/utilities.py index 9871392d..2a4c8264 100644 --- a/scripts/table_updates/utilities.py +++ b/scripts/table_updates/utilities.py @@ -5,6 +5,7 @@ import synapseclient from synapseclient import Schema, Table, as_table_columns + def _is_float(val): """Check if the value is float @@ -135,100 +136,3 @@ def revert_table_version(syn, table_id): table_query = syn.tableQuery("SELECT * from %s" % table_id) syn.delete(table_query.asRowSet()) syn.store(Table(table_schema, temp_data)) - - -def store_cohort_data(syn, table_type, data, schema, cohort, dry_run, production, config): - """Truncate and save record for a cohort - - Args: - syn (synapseclient.Synapse): Synapse client connection - table_type (string): Table type, primary or irr - data (pandas.DataFrame): The data to be stored - schema (Schema): The table schema of the table in the production project - cohort (string): Cohort name - dry_run (bool): The dry run flag. If True, perform a dry run. - production (bool): If True, save the output to the production environment. 
- config (dict): config read in - """ - table_query = syn.tableQuery(f"SELECT * FROM {schema.id} where cohort = '{cohort}'") - - if table_type == "irr": - # check for exsiting id to update for new data only - existing_records = list(set(table_query.asDataFrame()["record_id"])) - data = data[~data["record_id"].isin(existing_records)] - if not dry_run: - # save to synapse - if table_type == "primary": - # wipe out cohort data - syn.delete(table_query) - if production: - # save to production project - syn.store(Table(schema, data)) - else: - # save to staging project - table_schema = Schema(name=schema.name, columns=as_table_columns(data), primary_key= schema.primary_key, data_type= schema.data_type, table_type = schema.table_type, form = schema.form, form_label = schema.form_label, parent=config['staging_project']) - syn.store(Table(table_schema, data)) - else: - # save to local - data.to_csv(schema.id + "_temp.csv") - - -def store_cohort_redacted_data(syn, data, schema, cohort, dry_run, production, config): - """Store redacted tables for a cohort - - Args: - syn (synapseclient.Synapse): Synapse client connection - data (pandas.DataFrame): The data to be stored - schema (Schema): The table schema of the table in the production project - cohort (string): Cohort name - dry_run (bool): The dry run flag. If True, perform a dry run. - production (bool): If True, save the output to the production environment. - config (dict): config read in - """ - table_query = syn.tableQuery(f"SELECT * FROM {schema.id} where cohort = '{cohort}'") - - if not dry_run: - # save to synapse - # wipe out cohort data - syn.delete(table_query) - if production: - # save to production project - syn.store(Table(schema, data)) - else: - # save to staging project - table_schema = Schema(name=schema.name, columns=as_table_columns(data), primary_key= schema.primary_key, data_type= schema.data_type, table_type = schema.table_type, form = schema.form, form_label = schema.form_label, parent=config['staging_project']) - syn.store(Table(table_schema, data)) - else: - # save to local - data.to_csv(schema.id + "_temp.csv") - - -def update_redacted_column(syn, schema, data, redacted_column, table_query, dry_run, production, config): - """Update redacted column in a table - - Args: - syn (synapseclient.Synapse): Synapse client connection - schema (Schema): The table schema of the table in the production project - data (pandas.DataFrame): The data to be stored - redacted_column (pandas.Series): The redacted column - table_query (synapseclient.table.CsvFileTable): The synapseclient.table.CsvFileTable object - dry_run (bool): The dry run flag. If True, perform a dry run. - production (bool): If True, save the output to the production environment. 
- config (dict): config read in - """ - if not dry_run: - # save to synapse - if production: - # save to production project - syn.store(Table(schema, redacted_column, etag = table_query.etag)) - else: - # save to staging project - # get the table id - table_id = syn.findEntityId(schema.name, parent=config['staging_project']) - schema = syn.get(table_id) - table_query = syn.tableQuery(f"SELECT cohort, record_id FROM {table_id}") - syn.store(Table(schema, redacted_column, etag= table_query.etag)) - else: - # save to local - data["redacted"] = redacted_column - data.to_csv(schema.id + "_temp.csv") From d27734afc6e0d91dbb4d6230a294cdedb7c90a5a Mon Sep 17 00:00:00 2001 From: danlu1 Date: Wed, 2 Oct 2024 23:33:58 +0000 Subject: [PATCH 03/40] remove unwanted line in config file --- scripts/table_updates/config.json | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/scripts/table_updates/config.json b/scripts/table_updates/config.json index 2893b67c..52f01a57 100644 --- a/scripts/table_updates/config.json +++ b/scripts/table_updates/config.json @@ -21,6 +21,5 @@ }, "main_genie_release_version": "16.6-consortium", "main_genie_data_release_files": "syn16804261", - "main_genie_sample_mapping_table": "syn7434273", - "staging_project": "syn63600793" + "main_genie_sample_mapping_table": "syn7434273" } \ No newline at end of file From b7281cdd7f1b05d39c2baa53781631328f9b1bf0 Mon Sep 17 00:00:00 2001 From: danlu1 Date: Wed, 2 Oct 2024 23:37:28 +0000 Subject: [PATCH 04/40] update docstrings --- scripts/table_updates/config.json | 2 +- scripts/table_updates/update_data_table.py | 9 +-------- scripts/table_updates/utilities.py | 2 +- 3 files changed, 3 insertions(+), 10 deletions(-) diff --git a/scripts/table_updates/config.json b/scripts/table_updates/config.json index 52f01a57..328da408 100644 --- a/scripts/table_updates/config.json +++ b/scripts/table_updates/config.json @@ -22,4 +22,4 @@ "main_genie_release_version": "16.6-consortium", "main_genie_data_release_files": "syn16804261", "main_genie_sample_mapping_table": "syn7434273" -} \ No newline at end of file +} diff --git a/scripts/table_updates/update_data_table.py b/scripts/table_updates/update_data_table.py index 610dab35..5381d09b 100644 --- a/scripts/table_updates/update_data_table.py +++ b/scripts/table_updates/update_data_table.py @@ -85,8 +85,6 @@ def _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run): cohort (string): Cohort name logger (logging.Logger): The custom logger. Optional. dry_run (bool): The dry run flag. If True, perform a dry run. - production (bool): If True, save the output to the production environment. - config (dict): config read in """ table_schema = syn.get(table_id) logger.info(f"Updating table: {table_schema.name} {table_id}") @@ -146,8 +144,6 @@ def store_data(syn, master_table, label_data, table_type, cohort, logger, dry_ru cohort (string): Cohort name logger (logging.Logger): The custom logger. Optional. dry_run (bool): The dry run flag. If True, perform a dry run. - production (bool): If True, save the output to the production environment. - config (dict): config read in """ logger.info("Updating data for %s tables..." 
% table_type) for table_id in master_table["id"]: @@ -248,7 +244,7 @@ def _redact_table(df, interval_cols_info): return df, record_to_redact -def update_redact_table(syn, redacted_table_info, full_data_table_info, cohort, logger, dry_run): +def update_redact_table(syn, redacted_table_info, full_data_table_info, cohort, logger): """Update redacted table Args: @@ -257,9 +253,6 @@ def update_redact_table(syn, redacted_table_info, full_data_table_info, cohort, full_data_table_info (pandas.DataFrame): Table of all of the primary or irr BPC tables cohort (string): Cohort name logger (logging.Logger): The custom logger. Optional. - dry_run (bool): The dry run flag. If True, perform a dry run. - production (bool): If True, save the output to the production environment. - config (dict): config read in """ interval_cols_info = download_synapse_table(syn, "syn23281483", "") # Create new master table diff --git a/scripts/table_updates/utilities.py b/scripts/table_updates/utilities.py index 2a4c8264..c1b98175 100644 --- a/scripts/table_updates/utilities.py +++ b/scripts/table_updates/utilities.py @@ -135,4 +135,4 @@ def revert_table_version(syn, table_id): temp_data = old_data[table_columns] table_query = syn.tableQuery("SELECT * from %s" % table_id) syn.delete(table_query.asRowSet()) - syn.store(Table(table_schema, temp_data)) + syn.store(Table(table_schema, temp_data)) \ No newline at end of file From f049c3fe1bee890e0444eed5e573d553ec34ef8a Mon Sep 17 00:00:00 2001 From: Dan Lu <90745557+danlu1@users.noreply.github.com> Date: Wed, 2 Oct 2024 16:38:06 -0700 Subject: [PATCH 05/40] Update utilities.py --- scripts/table_updates/utilities.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/table_updates/utilities.py b/scripts/table_updates/utilities.py index c1b98175..ff89bd94 100644 --- a/scripts/table_updates/utilities.py +++ b/scripts/table_updates/utilities.py @@ -3,7 +3,7 @@ import pandas import synapseclient -from synapseclient import Schema, Table, as_table_columns +from synapseclient import Schema, Table def _is_float(val): @@ -135,4 +135,4 @@ def revert_table_version(syn, table_id): temp_data = old_data[table_columns] table_query = syn.tableQuery("SELECT * from %s" % table_id) syn.delete(table_query.asRowSet()) - syn.store(Table(table_schema, temp_data)) \ No newline at end of file + syn.store(Table(table_schema, temp_data)) From 7eb0d36efbd8e2db2498fcdd015a54671174710f Mon Sep 17 00:00:00 2001 From: Dan Lu <90745557+danlu1@users.noreply.github.com> Date: Wed, 2 Oct 2024 16:38:25 -0700 Subject: [PATCH 06/40] Update utilities.py From 936c578937a7fa112b011dd2c60b4d822180861c Mon Sep 17 00:00:00 2001 From: danlu1 Date: Wed, 2 Oct 2024 23:55:38 +0000 Subject: [PATCH 07/40] reformat code --- scripts/table_updates/update_data_table.py | 62 +++++++++++++++------- 1 file changed, 44 insertions(+), 18 deletions(-) diff --git a/scripts/table_updates/update_data_table.py b/scripts/table_updates/update_data_table.py index 5381d09b..d409c695 100644 --- a/scripts/table_updates/update_data_table.py +++ b/scripts/table_updates/update_data_table.py @@ -24,16 +24,18 @@ from utilities import * TABLE_INFO = { - "production" : { - "primary": ("syn23285911", "table_type='data'"), - "irr": ("syn21446696", "table_type='data' and double_curated is true"), - "redacted" : ("syn21446696", "table_type='data' and double_curated is false") -}, "staging" : { - "primary": ("syn63616766", "table_type='data'"), - "redacted" : ("syn63617582", "table_type='data' and double_curated is false") -} + 
"production": { + "primary": ("syn23285911", "table_type='data'"), + "irr": ("syn21446696", "table_type='data' and double_curated is true"), + "redacted": ("syn21446696", "table_type='data' and double_curated is false"), + }, + "staging": { + "primary": ("syn63616766", "table_type='data'"), + "redacted": ("syn63617582", "table_type='data' and double_curated is false"), + }, } + def get_main_genie_clinical_sample_file( syn: synapseclient.Synapse, release: str, @@ -121,7 +123,9 @@ def _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run): # remove .0 from all columns temp_data = temp_data.applymap(lambda x: float_to_int(x)) # update table - table_query = syn.tableQuery(f"SELECT * FROM {table_schema.id} where cohort = '{cohort}'") + table_query = syn.tableQuery( + f"SELECT * FROM {table_schema.id} where cohort = '{cohort}'" + ) if table_type == "irr": # check for exsiting id to update for new data only existing_records = list(set(table_query.asDataFrame()["record_id"])) @@ -133,6 +137,7 @@ def _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run): else: temp_data.to_csv(table_id + "_temp.csv") + def store_data(syn, master_table, label_data, table_type, cohort, logger, dry_run): """Store data to each table in the master table. @@ -149,6 +154,7 @@ def store_data(syn, master_table, label_data, table_type, cohort, logger, dry_ru for table_id in master_table["id"]: _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run) + def get_phi_cutoff(unit): switcher = {"day": math.floor(89 * 365), "month": math.floor(89 * 12), "year": 89} return switcher.get(unit, "Invalid unit") @@ -302,18 +308,24 @@ def update_redact_table(syn, redacted_table_info, full_data_table_info, cohort, for _, row in master_table.iterrows(): if row["name"] != "Patient Characteristics": table_id = row["id_full"] - df = syn.tableQuery(f"SELECT * FROM {table_id} where cohort = '{cohort}'").asDataFrame() + df = syn.tableQuery( + f"SELECT * FROM {table_id} where cohort = '{cohort}'" + ).asDataFrame() new_df, new_record_to_redact = _redact_table(df, interval_cols_info) new_df.reset_index(drop=True, inplace=True) record_to_redact = record_to_redact + new_record_to_redact table_schema = syn.get(row["id_redacted"]) logger.info("Updating table: %s" % table_schema.name) - table_query = syn.tableQuery(f"SELECT * from {table_schema.id} where cohort = '{cohort}'") + table_query = syn.tableQuery( + f"SELECT * from {table_schema.id} where cohort = '{cohort}'" + ) table = syn.delete(table_query) # wipe the table table = syn.store(Table(table_schema, new_df)) # Modify patient table - df = syn.tableQuery(f"SELECT * FROM {patient_table_id} where cohort = '{cohort}'").asDataFrame() + df = syn.tableQuery( + f"SELECT * FROM {patient_table_id} where cohort = '{cohort}'" + ).asDataFrame() new_df, new_record_to_redact = _redact_table(df, interval_cols_info) new_df.reset_index(drop=True, inplace=True) record_to_redact = record_to_redact + new_record_to_redact @@ -328,8 +340,10 @@ def update_redact_table(syn, redacted_table_info, full_data_table_info, cohort, master_table["name"] == "Patient Characteristics", "id_redacted" ].values[0] table_schema = syn.get(redacted_patient_id) - table_query = syn.tableQuery(f"SELECT * from {redacted_patient_id} where cohort = '{cohort}'") - table = syn.delete(table_query) # wipe the table + table_query = syn.tableQuery( + f"SELECT * from {redacted_patient_id} where cohort = '{cohort}'" + ) + table = syn.delete(table_query) # wipe the table table = 
syn.store(Table(table_schema, new_df)) # Update redacted column in full data patient table logger.info("Updating redacted column in the Sage internal table...") @@ -337,7 +351,9 @@ def update_redact_table(syn, redacted_table_info, full_data_table_info, cohort, master_table["name"] == "Patient Characteristics", "id_full" ].values[0] full_pt_schema = syn.get(full_pt_id) - pt_dat_query = syn.tableQuery(f"SELECT cohort, record_id FROM {full_pt_id} where cohort = '{cohort}'") + pt_dat_query = syn.tableQuery( + f"SELECT cohort, record_id FROM {full_pt_id} where cohort = '{cohort}'" + ) pt_dat = pt_dat_query.asDataFrame() pt_dat.index = pt_dat.index.map(str) pt_dat["index"] = pt_dat.index @@ -434,8 +450,18 @@ def main(): parser.add_argument( "-p", "--project_config", default="config.json", help="Project config file" ) - parser.add_argument("-c", "--cohort", default="", help="Cohort name for which the tables should be updated") - parser.add_argument("-pd", "--production", action="store_true", help="Save output to production folder") + parser.add_argument( + "-c", + "--cohort", + default="", + help="Cohort name for which the tables should be updated", + ) + parser.add_argument( + "-pd", + "--production", + action="store_true", + help="Save output to production folder", + ) parser.add_argument("-m", "--message", default="", help="Version comment") parser.add_argument("-d", "--dry_run", action="store_true", help="dry run flag") @@ -463,7 +489,7 @@ def main(): # get master table # This is the internal tables with non redacted - if production: + if production: TABLE_INFO = TABLE_INFO["production"] else: TABLE_INFO = TABLE_INFO["staging"] From 993c066da961846f12f8dbf9ec27562af038871a Mon Sep 17 00:00:00 2001 From: danlu1 Date: Thu, 3 Oct 2024 00:03:12 +0000 Subject: [PATCH 08/40] remove changes in utilities --- scripts/table_updates/utilities.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/table_updates/utilities.py b/scripts/table_updates/utilities.py index ff89bd94..4302801b 100644 --- a/scripts/table_updates/utilities.py +++ b/scripts/table_updates/utilities.py @@ -135,4 +135,4 @@ def revert_table_version(syn, table_id): temp_data = old_data[table_columns] table_query = syn.tableQuery("SELECT * from %s" % table_id) syn.delete(table_query.asRowSet()) - syn.store(Table(table_schema, temp_data)) + syn.store(Table(table_schema, temp_data)) \ No newline at end of file From 01398af49d3b1fc7cdea1c8d388e32e28513693e Mon Sep 17 00:00:00 2001 From: danlu1 Date: Thu, 3 Oct 2024 04:31:57 +0000 Subject: [PATCH 09/40] add missing parameter --- scripts/table_updates/update_data_table.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/scripts/table_updates/update_data_table.py b/scripts/table_updates/update_data_table.py index d409c695..ce0af96b 100644 --- a/scripts/table_updates/update_data_table.py +++ b/scripts/table_updates/update_data_table.py @@ -23,7 +23,7 @@ import pandas from utilities import * -TABLE_INFO = { +TABLES = { "production": { "primary": ("syn23285911", "table_type='data'"), "irr": ("syn21446696", "table_type='data' and double_curated is true"), @@ -439,7 +439,7 @@ def main(): description="Update data tables on Synapse for BPC databases" ) parser.add_argument( - "table", type=str, help="Specify table type to run", choices=TABLE_INFO.keys() + "table", type=str, help="Specify table type to run", choices=TABLES["production"].keys() ) parser.add_argument( "-s", @@ -490,9 +490,9 @@ def main(): # get master table # This is the internal tables 
with non redacted if production: - TABLE_INFO = TABLE_INFO["production"] + TABLE_INFO = TABLES["production"] else: - TABLE_INFO = TABLE_INFO["staging"] + TABLE_INFO = TABLES["staging"] table_id, condition = list(TABLE_INFO[table_type]) master_table = download_synapse_table(syn, table_id, condition) # download data files @@ -511,7 +511,7 @@ def main(): table_id, condition = list(TABLE_INFO["redacted"]) redacted_table_info = download_synapse_table(syn, table_id, condition) logger.info("Updating redacted tables...") - update_redact_table(syn, redacted_table_info, master_table, logger) + update_redact_table(syn, redacted_table_info, master_table, cohort, logger) logger.info("Updating version for redacted tables") for table_id in redacted_table_info["id"]: update_version(syn, table_id, comment) From b270a4b74c81b3a022b4caccaefe31b823bb004f Mon Sep 17 00:00:00 2001 From: danlu1 Date: Thu, 3 Oct 2024 18:32:28 +0000 Subject: [PATCH 10/40] add type hint and update docstring --- scripts/table_updates/update_data_table.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/scripts/table_updates/update_data_table.py b/scripts/table_updates/update_data_table.py index ce0af96b..43419c30 100644 --- a/scripts/table_updates/update_data_table.py +++ b/scripts/table_updates/update_data_table.py @@ -76,12 +76,12 @@ def get_main_genie_clinical_sample_file( return clinical_df[["SAMPLE_ID", "SEQ_YEAR"]] -def _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run): +def _store_data(syn: synapseclient.Synapse, table_id: str, label_data: pandas.DataFrame, table_type: str, cohort: str, logger: logging.Logger, dry_run: bool): """Helper function to store data to each table in the master table. Args: syn (synapseclient.Synapse): Synapse client connection - master_table (pandas.DataFrame): Table of all of the primary or irr BPC tables + table_id (string): The table id label_data (pandas.DataFrame): The uploaded data table_type (string): Table type, primary or irr cohort (string): Cohort name @@ -138,7 +138,7 @@ def _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run): temp_data.to_csv(table_id + "_temp.csv") -def store_data(syn, master_table, label_data, table_type, cohort, logger, dry_run): +def store_data(syn: synapseclient.Synapse, master_table: pandas.DataFrame, label_data: pandas.DataFrame, table_type: str, cohort: str, logger: logging.Logger, dry_run: bool): """Store data to each table in the master table. 
Args: @@ -154,7 +154,6 @@ def store_data(syn, master_table, label_data, table_type, cohort, logger, dry_ru for table_id in master_table["id"]: _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run) - def get_phi_cutoff(unit): switcher = {"day": math.floor(89 * 365), "month": math.floor(89 * 12), "year": 89} return switcher.get(unit, "Invalid unit") @@ -250,7 +249,7 @@ def _redact_table(df, interval_cols_info): return df, record_to_redact -def update_redact_table(syn, redacted_table_info, full_data_table_info, cohort, logger): +def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas.DataFrame, full_data_table_info: pandas.DataFrame, cohort: str, logger: logging.Logger): """Update redacted table Args: From a57182ba5b7cc14727bd9faaa229d1409c93ea55 Mon Sep 17 00:00:00 2001 From: danlu1 Date: Thu, 3 Oct 2024 22:04:15 +0000 Subject: [PATCH 11/40] add new parameters to nextflow script --- modules/update_data_table.nf | 16 +++++++++++----- scripts/table_updates/update_data_table.py | 18 ++++++++++++++++++ 2 files changed, 29 insertions(+), 5 deletions(-) diff --git a/modules/update_data_table.nf b/modules/update_data_table.nf index 95147a74..0d9216a3 100644 --- a/modules/update_data_table.nf +++ b/modules/update_data_table.nf @@ -9,8 +9,10 @@ process update_data_table { input: val previous - val comment + val cohort val production + val comment + val dry_run output: stdout @@ -19,13 +21,17 @@ process update_data_table { if (production) { """ cd /root/scripts/ - python update_data_table.py -p /root/scripts/config.json -m "$comment" primary + python update_data_table.py -p /root/scripts/config.json -c $cohort -m "$comment" primary -pd """ - } - else { + } else if (dry_run){ + """ + cd /root/scripts/ + python update_data_table.py -p /root/scripts/config.json -c $cohort -m "$comment" primary -d + """ + } else { """ cd /root/scripts/ - python update_data_table.py -p /root/scripts/config.json -m "$comment" primary -d + python update_data_table.py -p /root/scripts/config.json -c $cohort -m "$comment" primary """ } } diff --git a/scripts/table_updates/update_data_table.py b/scripts/table_updates/update_data_table.py index 43419c30..33587d1e 100644 --- a/scripts/table_updates/update_data_table.py +++ b/scripts/table_updates/update_data_table.py @@ -79,6 +79,16 @@ def get_main_genie_clinical_sample_file( def _store_data(syn: synapseclient.Synapse, table_id: str, label_data: pandas.DataFrame, table_type: str, cohort: str, logger: logging.Logger, dry_run: bool): """Helper function to store data to each table in the master table. + Before uploading data to the Synapse table, the provided label data is filtered + based on matching columns between the label data and the table schema, as well + as the form_label. Data cleansing, including the removal of rows with no data + and the conversion of numeric values to integers, is applied to the label data. + + When table_type is set to 'primary', existing data for the cohort is wiped, and + new data is inserted. When table_type is set to 'irr', only records that do not + already exist in the table are added. The dry_run flag can be used to toggle + between uploading the table to Synapse or saving it locally. 
+
     Args:
         syn (synapseclient.Synapse): Synapse client connection
         table_id (string): The table id
@@ -252,6 +262,14 @@ def _redact_table(df, interval_cols_info):
 def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas.DataFrame, full_data_table_info: pandas.DataFrame, cohort: str, logger: logging.Logger):
     """Update redacted table

+    Before uploading data to the Synapse table, records are identified for redaction
+    based on criteria such as birth year, sequencing age, vital status, and interval
+    fields. The redacted data is then stored in the BPC internal tables.
+
+    A special case applies to the Patient Characteristics table: flagged records are
+    updated, with the birth_year field cleared. Additionally, the "redacted" column
+    in this table is updated within the Sage Internal project.
+
     Args:
         syn (synapseclient.Synapse): Synapse client connection
         redacted_table_info (pandas.DataFrame): Table of all of the redacted tables
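The irr behavior documented in the docstring above -- append only records whose record_id is not already in the table -- reduces to a single boolean mask. A small self-contained illustration (the record IDs here are made up for the example):

import pandas as pd

def keep_new_records(existing: pd.DataFrame, incoming: pd.DataFrame) -> pd.DataFrame:
    # mirrors the irr branch of _store_data: drop rows whose record_id already exists
    existing_ids = set(existing["record_id"])
    return incoming[~incoming["record_id"].isin(existing_ids)]

existing = pd.DataFrame({"record_id": ["GENIE-001", "GENIE-002"]})
incoming = pd.DataFrame({"record_id": ["GENIE-002", "GENIE-003"]})
print(keep_new_records(existing, incoming))  # only GENIE-003 remains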
From f40edfabe0ded8baa410a97d401f0409847dc28f Mon Sep 17 00:00:00 2001
From: Dan Lu <90745557+danlu1@users.noreply.github.com>
Date: Thu, 3 Oct 2024 15:38:22 -0700
Subject: [PATCH 12/40] Update README.md

Update README file to reflect new parameters in update_data_table.py
---
 scripts/table_updates/README.md | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/scripts/table_updates/README.md b/scripts/table_updates/README.md
index 7b0cfcb5..bacf9e0b 100644
--- a/scripts/table_updates/README.md
+++ b/scripts/table_updates/README.md
@@ -39,6 +39,14 @@ Usage
 ### Update the Synapse Tables with data
 #### Primary Case Tables
-    python update_data_table.py -m [version_comment] primary
-#### IRR Case Tables
+##### 1. dry-run (Save output to local)
+    python update_data_table.py -c [cohort_name] -m [version_comment] primary -d
+
+##### 2. production (Save output to production projects)
+    python update_data_table.py -c [cohort_name] -m [version_comment] primary -pd
+
+##### 3. staging (Save output to staging projects)
+    python update_data_table.py -c [cohort_name] -m [version_comment] primary
+
+#### IRR Case Tables (Deprecated)
     python update_data_table.py -m [version_comment] irr

From 39fe512d340578ebc7284470b3bd9253cb27ceef Mon Sep 17 00:00:00 2001
From: danlu1
Date: Tue, 8 Oct 2024 17:08:32 +0000
Subject: [PATCH 13/40] add NA list to asDataFrame function in
 download_synapse_table
---
 scripts/table_updates/tests/test_utilities.py | 101 +++++++++++++++
 scripts/table_updates/update_data_table.py    |  42 +++----
 scripts/table_updates/utilities.py            | 121 +++++++++++++-----
 3 files changed, 206 insertions(+), 58 deletions(-)
 create mode 100644 scripts/table_updates/tests/test_utilities.py

diff --git a/scripts/table_updates/tests/test_utilities.py b/scripts/table_updates/tests/test_utilities.py
new file mode 100644
index 00000000..f36d1519
--- /dev/null
+++ b/scripts/table_updates/tests/test_utilities.py
@@ -0,0 +1,101 @@
+import pdb
+from unittest.mock import MagicMock, create_autospec, patch
+
+import numpy as np
+import pandas as pd
+import pytest
+import synapseclient
+from scripts.table_updates import utilities
+from synapseclient import Schema, Table
+
+
+@pytest.fixture(scope="session")
+def syn():
+    return create_autospec(synapseclient.Synapse)
+
+def test_download_with_select(syn):
+    select = "col1"
+    df = pd.DataFrame({
+        'col1': ['value1', 'value2']
+    })
+    schema = synapseclient.table.Schema(
+        name="test_table",
+        parent="syn123",
+        column_names=["col1", "col2"],
+        column_types=["STRING", "INTEGER"],
+    )
+    syn.tableQuery = MagicMock(return_value = Table(schema, df))
+    result = utilities.download_synapse_table(syn, "syn123456", select)
+
+    syn.tableQuery.assert_called_once_with("SELECT col1 from syn123456")
+    pd.testing.assert_frame_equal(result, df)
+
+def test_download_without_condition(syn):
+    df = pd.DataFrame({
+        'col1': ['value1', 'value2'],
+        'col2': [1, 2]
+    })
+    schema = synapseclient.table.Schema(
+        name="test_table",
+        parent="syn123",
+        column_names=["col1", "col2"],
+        column_types=["STRING", "INTEGER"],
+    )
+    syn.tableQuery = MagicMock(return_value = Table(schema, df))
+    result = utilities.download_synapse_table(syn, "syn123456", condition = "")
+
+    syn.tableQuery.assert_called_once_with("SELECT * from syn123456")
+    pd.testing.assert_frame_equal(result, df)
+
+def test_download_with_condition(syn):
+    condition = "col1 = 'value1'"
+    df = pd.DataFrame({
+        'col1': ['value1'],
+        'col2': [1]
+    })
+    schema = synapseclient.table.Schema(
+        name="test_table",
+        parent="syn123",
+        column_names=["col1", "col2"],
+        column_types=["STRING", "INTEGER"],
+    )
+    syn.tableQuery = MagicMock(return_value = Table(schema, df))
+    result = utilities.download_synapse_table(syn, "syn123456", condition = condition)
+
+    syn.tableQuery.assert_called_once_with("SELECT * from syn123456 WHERE col1 = 'value1'")
+    pd.testing.assert_frame_equal(result, df)
+
+def test_download_with_na_values(syn):
+    df = pd.DataFrame({
+        'col1': ["NA", "value1", "None"],
+        'col2': [1, 2, 3]
+    })
+    schema = synapseclient.table.Schema(
+        name="test_table",
+        parent="syn123",
+        column_names=["col1", "col2"],
+        column_types=["STRING", "INTEGER"],
+    )
+    syn.tableQuery = MagicMock(return_value = Table(schema, df))
+    result = utilities.download_synapse_table(syn, "syn123456", condition = "")
+
+    syn.tableQuery.assert_called_once_with("SELECT * from syn123456")
+    # "NA" is in the NA list and becomes NaN, while "None" is not, so it survives as a string
+    pd.testing.assert_frame_equal(result, pd.DataFrame({
+        'col1': [np.nan, "value1", "None"],
+        'col2': [1, 
2, 3] + })) + +def test_download_with_empty_table(syn): + df = pd.DataFrame(columns = ["col1", "col2"]) + schema = synapseclient.table.Schema( + name="test_table", + parent="syn123", + column_names=["col1", "col2"], + column_types=["STRING", "INTEGER"], + ) + syn.tableQuery = MagicMock(return_value = Table(schema, df)) + result = utilities.download_synapse_table(syn, "syn123456", condition = "") + + syn.tableQuery.assert_called_once_with("SELECT * from syn123456") + pd.testing.assert_frame_equal(result, df) \ No newline at end of file diff --git a/scripts/table_updates/update_data_table.py b/scripts/table_updates/update_data_table.py index 33587d1e..c89aa38a 100644 --- a/scripts/table_updates/update_data_table.py +++ b/scripts/table_updates/update_data_table.py @@ -54,9 +54,7 @@ def get_main_genie_clinical_sample_file( Returns: pandas.DataFrame: the read in clinical file as dataframe """ - release_files = syn.tableQuery( - f"SELECT * FROM {release_files_table_synid}" - ).asDataFrame() + release_files = download_synapse_table(syn, release_files_table_synid) clinical_link_synid = release_files[ (release_files["release"] == release) & (release_files["name"] == "data_clinical_sample.txt") @@ -277,7 +275,7 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. cohort (string): Cohort name logger (logging.Logger): The custom logger. Optional. """ - interval_cols_info = download_synapse_table(syn, "syn23281483", "") + interval_cols_info = download_synapse_table(syn, "syn23281483") # Create new master table master_table = redacted_table_info.merge( full_data_table_info, on="name", suffixes=("_redacted", "_full") @@ -292,15 +290,11 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. sample_table_id = master_table.loc[ master_table["name"] == "Cancer Panel Test", "id_full" ].values[0] - curation_info = syn.tableQuery( - f"SELECT record_id, curation_dt FROM {curation_table_id} where cohort = '{cohort}'" - ).asDataFrame() - patient_info = syn.tableQuery( - f"SELECT record_id, birth_year, hybrid_death_ind FROM {patient_table_id} where cohort = '{cohort}'" - ).asDataFrame() - sample_info = syn.tableQuery( - f"SELECT record_id, cpt_genie_sample_id, age_at_seq_report FROM {sample_table_id} where cohort = '{cohort}'" - ).asDataFrame() + # download tables + condition = f"cohort = '{cohort}'" + curation_info = download_synapse_table(syn, curation_table_id, "record_id, curation_dt", condition) + patient_info = download_synapse_table(syn, patient_table_id, "record_id, birth_year, hybrid_death_ind", condition) + sample_info = download_synapse_table(syn, sample_table_id, "record_id, cpt_genie_sample_id, age_at_seq_report", condition) patient_curation_info = patient_info.merge( curation_info, how="left", on="record_id" ) @@ -325,9 +319,7 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. for _, row in master_table.iterrows(): if row["name"] != "Patient Characteristics": table_id = row["id_full"] - df = syn.tableQuery( - f"SELECT * FROM {table_id} where cohort = '{cohort}'" - ).asDataFrame() + df = download_synapse_table(syn, table_id, condition = condition) new_df, new_record_to_redact = _redact_table(df, interval_cols_info) new_df.reset_index(drop=True, inplace=True) record_to_redact = record_to_redact + new_record_to_redact @@ -340,9 +332,7 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. 
table = syn.store(Table(table_schema, new_df)) # Modify patient table - df = syn.tableQuery( - f"SELECT * FROM {patient_table_id} where cohort = '{cohort}'" - ).asDataFrame() + df = download_synapse_table(syn, patient_table_id, condition = condition) new_df, new_record_to_redact = _redact_table(df, interval_cols_info) new_df.reset_index(drop=True, inplace=True) record_to_redact = record_to_redact + new_record_to_redact @@ -371,7 +361,7 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. pt_dat_query = syn.tableQuery( f"SELECT cohort, record_id FROM {full_pt_id} where cohort = '{cohort}'" ) - pt_dat = pt_dat_query.asDataFrame() + pt_dat = download_synapse_table(syn, full_pt_id, "cohort, record_id", condition = condition) pt_dat.index = pt_dat.index.map(str) pt_dat["index"] = pt_dat.index info_to_update = new_df[["cohort", "record_id", "redacted"]] @@ -408,7 +398,7 @@ def custom_fix_for_cancer_panel_test_table( ].values[0] cpt_table_schema = syn.get(cpt_table_id) cpt_dat_query = syn.tableQuery("SELECT cpt_genie_sample_id FROM %s" % cpt_table_id) - cpt_dat = cpt_dat_query.asDataFrame() + cpt_dat = download_synapse_table(syn, cpt_table_id, select = "cpt_genie_sample_id") cpt_dat.index = cpt_dat.index.map(str) cpt_dat["index"] = cpt_dat.index genie_sample_dat = get_main_genie_clinical_sample_file( @@ -434,11 +424,9 @@ def custom_fix_for_cancer_panel_test_table( "SELECT cpt_sample_type FROM %s WHERE cpt_sample_type in (1,2,3,4,5,6,7)" % cpt_table_id ) - cpt_dat = cpt_dat_query.asDataFrame() + cpt_dat = download_synapse_table(syn, cpt_table_id, "cpt_sample_type", "cpt_sample_type in (1,2,3,4,5,6,7)") cpt_dat["cpt_sample_type"] = pandas.to_numeric(cpt_dat["cpt_sample_type"]) - sample_type_mapping = syn.tableQuery( - f"SELECT * FROM {config['main_genie_sample_mapping_table']}" - ).asDataFrame() + sample_type_mapping = download_synapse_table(syn, config['main_genie_sample_mapping_table']) sample_type_mapping_dict = sample_type_mapping.set_index("CODE").to_dict()[ "DESCRIPTION" ] @@ -511,7 +499,7 @@ def main(): else: TABLE_INFO = TABLES["staging"] table_id, condition = list(TABLE_INFO[table_type]) - master_table = download_synapse_table(syn, table_id, condition) + master_table = download_synapse_table(syn, table_id, condition = condition) # download data files # TODO: find the cohort that has new data # This is a mapping to all the intake data. 
e.g: ProstateBPCIntake_data @@ -526,7 +514,7 @@ def main(): custom_fix_for_cancer_panel_test_table(syn, master_table, logger, config) if table_type == "primary": table_id, condition = list(TABLE_INFO["redacted"]) - redacted_table_info = download_synapse_table(syn, table_id, condition) + redacted_table_info = download_synapse_table(syn, table_id, condition = condition) logger.info("Updating redacted tables...") update_redact_table(syn, redacted_table_info, master_table, cohort, logger) logger.info("Updating version for redacted tables") diff --git a/scripts/table_updates/utilities.py b/scripts/table_updates/utilities.py index 4302801b..a12b5e91 100644 --- a/scripts/table_updates/utilities.py +++ b/scripts/table_updates/utilities.py @@ -20,13 +20,14 @@ def _is_float(val): return True except ValueError: return False - + + def float_to_int(val): """Convert float type to integer if the value is integer - + Args: val: a value to be checked - + Returns: reformatted value """ @@ -36,57 +37,107 @@ def float_to_int(val): return str(int(val)) return val + def check_empty_row(row, cols_to_skip): """ Check if the row of data is empty with given columns to skip """ return row.drop(cols_to_skip).isnull().all() -def download_synapse_table(syn, table_id, condition): + +def download_synapse_table(syn, table_id: str, select: str = "*", condition: str = ""): """Download Synapse Table with the given table ID and condition - + Args: syn: Synapse credential table_id: Synapse ID of a table + select: Columns to be selected condition: additional condition for querying the table - + Returns: Dataframe: synapse table """ if condition: - condition = " WHERE "+condition - synapse_table = syn.tableQuery("SELECT * from %s%s" % (table_id,condition)) - synapse_table = synapse_table.asDataFrame() - return(synapse_table) + condition = " WHERE " + condition + synapse_table = syn.tableQuery(f"SELECT {select} from {table_id}{condition}") + na_values = [ + "-1.#IND", + "1.#QNAN", + "1.#IND", + "-1.#QNAN", + "#N/A N/A", + "#N/A", + "N/A", + "n/a", + "NA", + "", + "#NA", + "NULL", + "null", + "NaN", + "-NaN", + "nan", + "-nan", + "" + ] + synapse_table = synapse_table.asDataFrame(na_values=na_values, keep_default_na=False) + return synapse_table + def get_data(syn, label_data_id, cohort): """Download csv file from Synapse and add cohort column - + Args: syn (Object): Synapse credential label_data_id (String): Synapse ID of a csv file cohort: cohort value to be added as a column - + Returns: Dataframe: label data """ - na_values = ["", "#N/A", "#N/A N/A", "#NA", "-1.#IND", "-1.#QNAN", "-NaN", "-nan", "1.#IND", "1.#QNAN", "", "N/A", "NA", "NULL", "NaN", "n/a", "nan", "null"] - label_data = pandas.read_csv(syn.get(label_data_id).path, low_memory=False, na_values=na_values, keep_default_na=False) - label_data['cohort'] = cohort - return(label_data) + na_values = [ + "-1.#IND", + "1.#QNAN", + "1.#IND", + "-1.#QNAN", + "#N/A N/A", + "#N/A", + "N/A", + "n/a", + "NA", + "", + "#NA", + "NULL", + "null", + "NaN", + "-NaN", + "nan", + "-nan", + "" + ] + label_data = pandas.read_csv( + syn.get(label_data_id).path, + low_memory=False, + na_values=na_values, + keep_default_na=False, + ) + label_data["cohort"] = cohort + return label_data + def setup_custom_logger(name): """Set up customer logger Args: - name (String): Name of the logger - + name (String): Name of the logger + Returns: logger """ - formatter = logging.Formatter(fmt='%(asctime)s %(levelname)-8s %(message)s', - datefmt='%Y-%m-%d %H:%M:%S') - handler = 
logging.FileHandler('log.txt', mode='w') + formatter = logging.Formatter( + fmt="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%d %H:%M:%S" + ) + handler = logging.FileHandler("log.txt", mode="w") handler.setFormatter(formatter) screen_handler = logging.StreamHandler(stream=sys.stdout) screen_handler.setFormatter(formatter) @@ -94,14 +145,15 @@ def setup_custom_logger(name): logger.setLevel(logging.DEBUG) logger.addHandler(handler) logger.addHandler(screen_handler) - return(logger) + return logger + def synapse_login(synapse_config): """Log into Synapse Args: synapse_config (String): File path to the Synapse config file - + Returns: Synapse object """ @@ -110,14 +162,19 @@ def synapse_login(synapse_config): except Exception: syn = synapseclient.Synapse(configPath=synapse_config, silent=True) syn.login() - return(syn) + return syn + def update_version(syn, table_id, comment): """ Update the table version with given table ID and comment """ - syn.restPOST("/entity/%s/table/snapshot" % table_id, body='{"snapshotComment":"%s"}' % comment) - + syn.restPOST( + "/entity/%s/table/snapshot" % table_id, + body='{"snapshotComment":"%s"}' % comment, + ) + + def revert_table_version(syn, table_id): """Revert table data to previous version @@ -127,12 +184,14 @@ def revert_table_version(syn, table_id): """ table_schema = syn.get(table_id) table_columns = syn.getColumns(table_schema.columnIds) - table_columns = [col['name'] for col in list(table_columns)] - previous_version_num = table_schema.versionNumber-1 - old_data = syn.tableQuery('SELECT * FROM %s.%s' % (table_id, previous_version_num)).asDataFrame() + table_columns = [col["name"] for col in list(table_columns)] + previous_version_num = table_schema.versionNumber - 1 + old_data = syn.tableQuery( + "SELECT * FROM %s.%s" % (table_id, previous_version_num) + ).asDataFrame() old_data = old_data.reset_index(drop=True) - table_columns = list(set(table_columns) & set(old_data.columns)) + table_columns = list(set(table_columns) & set(old_data.columns)) temp_data = old_data[table_columns] table_query = syn.tableQuery("SELECT * from %s" % table_id) syn.delete(table_query.asRowSet()) - syn.store(Table(table_schema, temp_data)) \ No newline at end of file + syn.store(Table(table_schema, temp_data)) From 19afbf54a2c4c0cf3ce8e12aa890e29092b50510 Mon Sep 17 00:00:00 2001 From: danlu1 Date: Mon, 14 Oct 2024 18:25:59 +0000 Subject: [PATCH 14/40] remove dry-run --- modules/update_data_table.nf | 5 ----- 1 file changed, 5 deletions(-) diff --git a/modules/update_data_table.nf b/modules/update_data_table.nf index 0d9216a3..cccfb5a2 100644 --- a/modules/update_data_table.nf +++ b/modules/update_data_table.nf @@ -23,11 +23,6 @@ process update_data_table { cd /root/scripts/ python update_data_table.py -p /root/scripts/config.json -c $cohort -m "$comment" primary -pd """ - } else if (dry_run){ - """ - cd /root/scripts/ - python update_data_table.py -p /root/scripts/config.json -c $cohort -m "$comment" primary -d - """ } else { """ cd /root/scripts/ From 8966b3be7397383ca1957801e496e913376719bf Mon Sep 17 00:00:00 2001 From: danlu1 Date: Mon, 14 Oct 2024 19:01:30 +0000 Subject: [PATCH 15/40] update synapseclient version --- scripts/table_updates/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/table_updates/requirements.txt b/scripts/table_updates/requirements.txt index 7103bdc4..b4425edf 100644 --- a/scripts/table_updates/requirements.txt +++ b/scripts/table_updates/requirements.txt @@ -1 +1 @@ -synapseclient[pandas] == 
2.7.2 +synapseclient[pandas] == 4.6.0 From 705617b8304d96e5f57c343158df63ba5ca32e11 Mon Sep 17 00:00:00 2001 From: danlu1 Date: Mon, 14 Oct 2024 19:02:06 +0000 Subject: [PATCH 16/40] add return type hint --- scripts/table_updates/utilities.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/table_updates/utilities.py b/scripts/table_updates/utilities.py index a12b5e91..529530f5 100644 --- a/scripts/table_updates/utilities.py +++ b/scripts/table_updates/utilities.py @@ -45,7 +45,7 @@ def check_empty_row(row, cols_to_skip): return row.drop(cols_to_skip).isnull().all() -def download_synapse_table(syn, table_id: str, select: str = "*", condition: str = ""): +def download_synapse_table(syn, table_id: str, select: str = "*", condition: str = "") -> pandas.DataFrame: """Download Synapse Table with the given table ID and condition Args: @@ -55,7 +55,7 @@ def download_synapse_table(syn, table_id: str, select: str = "*", condition: str condition: additional condition for querying the table Returns: - Dataframe: synapse table + A Pandas dataframe of the Synapse table """ if condition: condition = " WHERE " + condition From 57e796ff9dd454dd09248a99be11d132897a82f0 Mon Sep 17 00:00:00 2001 From: danlu1 Date: Mon, 14 Oct 2024 19:03:19 +0000 Subject: [PATCH 17/40] add test cases --- scripts/table_updates/tests/test_utilities.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/scripts/table_updates/tests/test_utilities.py b/scripts/table_updates/tests/test_utilities.py index f36d1519..2d810c74 100644 --- a/scripts/table_updates/tests/test_utilities.py +++ b/scripts/table_updates/tests/test_utilities.py @@ -5,15 +5,15 @@ import pandas as pd import pytest import synapseclient -from scripts.table_updates import utilities from synapseclient import Schema, Table +from table_updates import utilities @pytest.fixture(scope="session") def syn(): return create_autospec(synapseclient.Synapse) -def test_download_with_select(syn): +def test_download_synapse_table_with_selected_columns(syn): select = "col1" df = pd.DataFrame({ 'col1': ['value1', 'value2'] @@ -30,7 +30,7 @@ def test_download_with_select(syn): syn.tableQuery.assert_called_once_with("SELECT col1 from syn123456") pd.testing.assert_frame_equal(result, df) -def test_download_without_condition(syn): +def test_download_synapse_table_without_condition(syn): df = pd.DataFrame({ 'col1': ['value1', 'value2'], 'col2': [1, 2] @@ -47,7 +47,7 @@ def test_download_without_condition(syn): syn.tableQuery.assert_called_once_with("SELECT * from syn123456") pd.testing.assert_frame_equal(result, df) -def test_download_with_condition(syn): +def test_download_synapse_table_with_condition(syn): condition = "col1 = 'value1'" df = pd.DataFrame({ 'col1': ['value1'], @@ -65,7 +65,7 @@ def test_download_with_condition(syn): syn.tableQuery.assert_called_once_with("SELECT * from syn123456 WHERE col1 = 'value1'") pd.testing.assert_frame_equal(result, df) -def test_download_with_na_values(syn): +def test_download_synapse_table_with_na_values(syn): df = pd.DataFrame({ 'col1': ["NA", "value1", "None"], 'col2': [1, 2, 3] @@ -86,7 +86,7 @@ def test_download_with_na_values(syn): 'col2': [1, 2, 3] })) -def test_download_with_empty_table(syn): +def test_download_synapse_table_with_empty_table(syn): df = pd.DataFrame(columns = ["col1", "col2"]) schema = synapseclient.table.Schema( name="test_table", From ee37d5a76c061f6c94906fe6ce54645cc8434bf3 Mon Sep 17 00:00:00 2001 From: danlu1 Date: Mon, 14 Oct 2024 20:51:21 +0000 Subject: [PATCH 
18/40] remove unused values

---
 modules/update_data_table.nf | 1 -
 1 file changed, 1 deletion(-)

diff --git a/modules/update_data_table.nf b/modules/update_data_table.nf
index cccfb5a2..5e2b0cb4 100644
--- a/modules/update_data_table.nf
+++ b/modules/update_data_table.nf
@@ -12,7 +12,6 @@ process update_data_table {
         val cohort
         val production
         val comment
-        val dry_run
 
     output:
        stdout

From deca7e7af68d399c6f7b4006d156aba69070 Mon Sep 17 00:00:00 2001
From: danlu1 
Date: Fri, 18 Oct 2024 07:54:51 +0000
Subject: [PATCH 19/40] add cohort parameter to update_data_table

---
 main.nf | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/main.nf b/main.nf
index 324dac60..232c6634 100644
--- a/main.nf
+++ b/main.nf
@@ -92,7 +92,7 @@ workflow BPC_PIPELINE {
         run_quac_upload_report_warning(run_quac_upload_report_error.out, ch_cohort, params.production)
         merge_and_uncode_rca_uploads(run_quac_upload_report_warning.out, ch_cohort, params.production)
         // remove_patients_from_merged(merge_and_uncode_rca_uploads.out, ch_cohort, params.production)
-        update_data_table(merge_and_uncode_rca_uploads.out, ch_comment, params.production)
+        update_data_table(merge_and_uncode_rca_uploads.out, ch_cohort, ch_comment, params.production)
         update_date_tracking_table(update_data_table.out, ch_cohort, ch_comment, params.production)
         run_quac_table_report(update_date_tracking_table.out, ch_cohort, params.production)
         run_quac_comparison_report(run_quac_table_report.out, ch_cohort, params.production)

From 7825ebc2aa9eb24fdd40e4dde93494916df4bde8 Mon Sep 17 00:00:00 2001
From: danlu1 
Date: Fri, 18 Oct 2024 22:01:59 +0000
Subject: [PATCH 20/40] upgrade python version for synapseclient 4.6

---
 scripts/table_updates/Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/table_updates/Dockerfile b/scripts/table_updates/Dockerfile
index 5138a2c8..20b10228 100644
--- a/scripts/table_updates/Dockerfile
+++ b/scripts/table_updates/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.8
+FROM python:3.9
 
 WORKDIR /root/scripts

From 992f506038ac771cfd8f9729125c294ca059547d Mon Sep 17 00:00:00 2001
From: danlu1 
Date: Wed, 23 Oct 2024 22:48:11 +0000
Subject: [PATCH 21/40] bump Python version to 3.11

---
 scripts/table_updates/Dockerfile | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/scripts/table_updates/Dockerfile b/scripts/table_updates/Dockerfile
index 20b10228..3789640e 100644
--- a/scripts/table_updates/Dockerfile
+++ b/scripts/table_updates/Dockerfile
@@ -1,4 +1,4 @@
-FROM python:3.9
+FROM python:3.11
 
 WORKDIR /root/scripts

From 2df21900e7ad4e674ac2a3f4210e665e5a21c614 Mon Sep 17 00:00:00 2001
From: danlu1 
Date: Wed, 23 Oct 2024 23:03:31 +0000
Subject: [PATCH 22/40] pulled changes from develop and add cohort param to update_data_table in main.nf

---
 main.nf | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/main.nf b/main.nf
index 3008a7a3..45a0e635 100644
--- a/main.nf
+++ b/main.nf
@@ -101,7 +101,7 @@ workflow BPC_PIPELINE {
         run_quac_upload_report_warning(run_quac_upload_report_error.out, ch_cohort, params.production)
         merge_and_uncode_rca_uploads(run_quac_upload_report_warning.out, ch_cohort, params.production)
         // remove_patients_from_merged(merge_and_uncode_rca_uploads.out, ch_cohort, params.production)
-        update_data_table(merge_and_uncode_rca_uploads.out, ch_comment, params.production)
+        update_data_table(merge_and_uncode_rca_uploads.out, ch_cohort, ch_comment, params.production)
         update_date_tracking_table(update_data_table.out, ch_cohort, ch_comment, params.production)
         run_quac_table_report(update_date_tracking_table.out, ch_cohort, params.production)
         run_quac_comparison_report(run_quac_table_report.out, ch_cohort, params.production)

From 9521330689b1252cf18eb5801110d54c10695c1a Mon Sep 17 00:00:00 2001
From: danlu1 
Date: Wed, 23 Oct 2024 23:31:46 +0000
Subject: [PATCH 23/40] add default value description to download_synapse_table

---
 scripts/table_updates/utilities.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/scripts/table_updates/utilities.py b/scripts/table_updates/utilities.py
index 529530f5..ef4f598a 100644
--- a/scripts/table_updates/utilities.py
+++ b/scripts/table_updates/utilities.py
@@ -51,8 +51,8 @@ def download_synapse_table(syn, table_id: str, select: str = "*", condition: str
     Args:
         syn: Synapse credential
         table_id: Synapse ID of a table
-        select: Columns to be selected
-        condition: additional condition for querying the table
+        select: Columns to be selected. Defaults to all columns.
+        condition: Additional condition for querying the table. Defaults to all rows.
 
     Returns:
         A Pandas dataframe of the Synapse table

From e2651f46bf9902fd1d260b40abc575fccea07ee3 Mon Sep 17 00:00:00 2001
From: danlu1 
Date: Wed, 23 Oct 2024 23:32:21 +0000
Subject: [PATCH 24/40] bump checkout action version

---
 .github/workflows/build-docker-images.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/build-docker-images.yml b/.github/workflows/build-docker-images.yml
index fe118f5f..24291021 100644
--- a/.github/workflows/build-docker-images.yml
+++ b/.github/workflows/build-docker-images.yml
@@ -23,7 +23,7 @@ jobs:
 
     steps:
       - name: Checkout repository
-        uses: actions/checkout@v3
+        uses: actions/checkout@v4
        with:
          fetch-depth: 2

From 70a3a0e63527da544ccbfc2ebdee9d82f0eb5055 Mon Sep 17 00:00:00 2001
From: Dan Lu <90745557+danlu1@users.noreply.github.com>
Date: Wed, 23 Oct 2024 21:11:30 -0700
Subject: [PATCH 25/40] Update build-docker-images.yml test

---
 .github/workflows/build-docker-images.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.github/workflows/build-docker-images.yml b/.github/workflows/build-docker-images.yml
index 24291021..e417fab5 100644
--- a/.github/workflows/build-docker-images.yml
+++ b/.github/workflows/build-docker-images.yml
@@ -37,7 +37,7 @@ jobs:
         id: check_changes
         run: |
           # Determine the correct DIFF_BASE
-          if [ "${{ github.ref_name }}" = "develop" ]; then
+          if [ "${{ github.ref_name }}" == "develop" ]; then
             # On the develop branch, compare with the previous commit (HEAD^)
             DIFF_BASE="HEAD^"
           else

From fe717c66d6bc59555792a8ba0df7b08b858e4556 Mon Sep 17 00:00:00 2001
From: danlu1 
Date: Thu, 24 Oct 2024 04:42:47 +0000
Subject: [PATCH 26/40] test to create docker image

---
 .github/workflows/build-docker-images.yml     | 2 +-
 scripts/table_updates/tests/test_utilities.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/build-docker-images.yml b/.github/workflows/build-docker-images.yml
index e417fab5..24291021 100644
--- a/.github/workflows/build-docker-images.yml
+++ b/.github/workflows/build-docker-images.yml
@@ -37,7 +37,7 @@ jobs:
         id: check_changes
         run: |
           # Determine the correct DIFF_BASE
-          if [ "${{ github.ref_name }}" == "develop" ]; then
+          if [ "${{ github.ref_name }}" = "develop" ]; then
             # On the develop branch, compare with the previous commit (HEAD^)
             DIFF_BASE="HEAD^"
           else

diff --git a/scripts/table_updates/tests/test_utilities.py b/scripts/table_updates/tests/test_utilities.py
index 2d810c74..954f7486 100644
--- 
a/scripts/table_updates/tests/test_utilities.py +++ b/scripts/table_updates/tests/test_utilities.py @@ -13,7 +13,7 @@ def syn(): return create_autospec(synapseclient.Synapse) -def test_download_synapse_table_with_selected_columns(syn): +def test_download_synapse_table_with_selected_single_column(syn): select = "col1" df = pd.DataFrame({ 'col1': ['value1', 'value2'] From e53df57e7b206b1eaf6cc3c0a1b3ddf756cddc1b Mon Sep 17 00:00:00 2001 From: danlu1 Date: Thu, 24 Oct 2024 17:17:36 +0000 Subject: [PATCH 27/40] add table_update docker to nextflow pipeline --- main.nf | 7 ++++--- modules/update_data_table.nf | 4 ++-- nextflow.config | 1 + nextflow_schema.json | 7 ++++++- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/main.nf b/main.nf index 45a0e635..0f47f65d 100644 --- a/main.nf +++ b/main.nf @@ -93,9 +93,10 @@ workflow BPC_PIPELINE { ch_comment = Channel.value(params.comment) if (params.step == "update_potential_phi_fields_table") { - update_potential_phi_fields_table(ch_comment, params.production) - // validate_data.out.view() - } else if (params.step == "genie_bpc_pipeline"){ + update_potential_phi_fields_table(ch_comment, params.production)// validate_data.out.view() + } else if (params.step == "update_data_table") { + update_data_table("default", ch_cohort, ch_comment, params.production) + } else if (params.step == "genie_bpc_pipeline"){ update_potential_phi_fields_table(ch_comment, params.production) run_quac_upload_report_error(update_potential_phi_fields_table.out, ch_cohort) run_quac_upload_report_warning(run_quac_upload_report_error.out, ch_cohort, params.production) diff --git a/modules/update_data_table.nf b/modules/update_data_table.nf index 5e2b0cb4..000fe425 100644 --- a/modules/update_data_table.nf +++ b/modules/update_data_table.nf @@ -2,16 +2,16 @@ Update Synapse tables with merged and uncoded data. 
*/ process update_data_table { + container "$params.table_updates_docker" - container 'sagebionetworks/genie-bpc-pipeline-table-updates' secret 'SYNAPSE_AUTH_TOKEN' debug true input: val previous val cohort - val production val comment + val production output: stdout diff --git a/nextflow.config b/nextflow.config index 04a029c8..ad699829 100644 --- a/nextflow.config +++ b/nextflow.config @@ -53,6 +53,7 @@ profiles { params { // docker image parameters, see nextflow_schema.json for details references_docker = "sagebionetworks/genie-bpc-pipeline-references" + table_updates_docker = "sagebionetworks/genie-bpc-pipeline-table-updates" } } } diff --git a/nextflow_schema.json b/nextflow_schema.json index 197ed039..d5a111d2 100644 --- a/nextflow_schema.json +++ b/nextflow_schema.json @@ -50,13 +50,18 @@ "description": "Available BPC steps", "enum": [ "update_potential_phi_fields_table", - "genie_bpc_pipeline" + "genie_bpc_pipeline", + "update_data_table" ] }, "references_docker":{ "type": "string", "description": "Name of docker to use in processes in scripts/references" }, + "table_updates_docker":{ + "type": "string", + "description": "Name of docker to use in processes in scripts/table_updates" + }, "schema_ignore_params": { "type": "string", "description": "Put parameters to ignore for validation here separated by comma", From b89e6081bc9742320c7cf50d16a0bfea0a17e016 Mon Sep 17 00:00:00 2001 From: danlu1 Date: Thu, 24 Oct 2024 17:18:03 +0000 Subject: [PATCH 28/40] re-organize and add test cases for download_synapse_table --- scripts/table_updates/tests/test_utilities.py | 114 ++++++------------ 1 file changed, 40 insertions(+), 74 deletions(-) diff --git a/scripts/table_updates/tests/test_utilities.py b/scripts/table_updates/tests/test_utilities.py index 954f7486..e3638877 100644 --- a/scripts/table_updates/tests/test_utilities.py +++ b/scripts/table_updates/tests/test_utilities.py @@ -1,4 +1,3 @@ -import pdb from unittest.mock import MagicMock, create_autospec, patch import numpy as np @@ -13,89 +12,56 @@ def syn(): return create_autospec(synapseclient.Synapse) -def test_download_synapse_table_with_selected_single_column(syn): - select = "col1" - df = pd.DataFrame({ - 'col1': ['value1', 'value2'] - }) +@pytest.fixture(scope="session") +def table_schema(): schema = synapseclient.table.Schema( name="test_table", parent="syn123", column_names=["col1", "col2"], column_types=["STRING", "INTEGER"], ) - syn.tableQuery = MagicMock(return_value = Table(schema, df)) - result = utilities.download_synapse_table(syn, "syn123456", select) + return schema - syn.tableQuery.assert_called_once_with("SELECT col1 from syn123456") - pd.testing.assert_frame_equal(result, df) -def test_download_synapse_table_without_condition(syn): - df = pd.DataFrame({ - 'col1': ['value1', 'value2'], - 'col2': [1, 2] - }) - schema = synapseclient.table.Schema( - name="test_table", - parent="syn123", - column_names=["col1", "col2"], - column_types=["STRING", "INTEGER"], - ) - syn.tableQuery = MagicMock(return_value = Table(schema, df)) - result = utilities.download_synapse_table(syn, "syn123456", condition = "") +@pytest.mark.parametrize( + "query_return_df,select,query,expected_df", + [ + (pd.DataFrame({'col1': ['value1', 'value2']}), "col1", "SELECT col1 from syn123456",pd.DataFrame({'col1': ['value1', 'value2']})), + (pd.DataFrame({'col1': ['value1', 'value2'],'col2': [1, 2]}), "col1,col2", "SELECT col1,col2 from syn123456",pd.DataFrame({'col1': ['value1', 'value2'],'col2': [1, 2]})), + (pd.DataFrame({'col1': ["NA", "value1", 
"None"],'col2': [1, 2, 3]}),"*","SELECT * from syn123456",pd.DataFrame({'col1': [np.nan, "value1", "None"],'col2': [1, 2, 3]})), + (pd.DataFrame(columns = ["col1", "col2"]),"*","SELECT * from syn123456",pd.DataFrame(columns = ["col1", "col2"])), + ], + ids = ["selected_single_column","selected_multiple_column","pull_table_with_na_values_all_columns","pull_empty_table_all_columns"], +) +def test_download_synapse_table_default_condition(syn, table_schema, query_return_df, select, query, expected_df): + syn.tableQuery = MagicMock(return_value = Table(table_schema, query_return_df)) + result = utilities.download_synapse_table(syn, "syn123456", select) - syn.tableQuery.assert_called_once_with("SELECT * from syn123456") - pd.testing.assert_frame_equal(result, df) + # validate + syn.tableQuery.assert_called_once_with(query) + pd.testing.assert_frame_equal(result, expected_df) -def test_download_synapse_table_with_condition(syn): - condition = "col1 = 'value1'" - df = pd.DataFrame({ - 'col1': ['value1'], - 'col2': [1] - }) - schema = synapseclient.table.Schema( - name="test_table", - parent="syn123", - column_names=["col1", "col2"], - column_types=["STRING", "INTEGER"], - ) - syn.tableQuery = MagicMock(return_value = Table(schema, df)) +@pytest.mark.parametrize( + "query_return_df,condition,query,expected_df", + [ + (pd.DataFrame({'col1': ['value1'],'col2': [1]}), "col1 = 'value1'", "SELECT * from syn123456 WHERE col1 = 'value1'",pd.DataFrame({'col1': ['value1'],'col2': [1]})), + (pd.DataFrame({'col1': ["NA", "value1", "None"],'col2': [1, 1, 1]}), "col2 = 1","SELECT * from syn123456 WHERE col2 = 1",pd.DataFrame({'col1': [np.nan, "value1", "None"],'col2': [1, 1, 1]})), + ], + ids = ["selected_row_all_columns","pull_table_with_na_values_all_columns"], +) +def test_download_synapse_table_with_condition(syn, table_schema, query_return_df, condition, query,expected_df): + syn.tableQuery = MagicMock(return_value = Table(table_schema, query_return_df)) result = utilities.download_synapse_table(syn, "syn123456", condition = condition) - syn.tableQuery.assert_called_once_with("SELECT * from syn123456 WHERE col1 = 'value1'") - pd.testing.assert_frame_equal(result, df) - -def test_download_synapse_table_with_na_values(syn): - df = pd.DataFrame({ - 'col1': ["NA", "value1", "None"], - 'col2': [1, 2, 3] - }) - schema = synapseclient.table.Schema( - name="test_table", - parent="syn123", - column_names=["col1", "col2"], - column_types=["STRING", "INTEGER"], - ) - syn.tableQuery = MagicMock(return_value = Table(schema, df)) - result = utilities.download_synapse_table(syn, "syn123456", condition = "") - - syn.tableQuery.assert_called_once_with("SELECT * from syn123456") - # Unlike None is not converted to nan - pd.testing.assert_frame_equal(result, pd.DataFrame({ - 'col1': [np.nan, "value1", "None"], - 'col2': [1, 2, 3] - })) - -def test_download_synapse_table_with_empty_table(syn): - df = pd.DataFrame(columns = ["col1", "col2"]) - schema = synapseclient.table.Schema( - name="test_table", - parent="syn123", - column_names=["col1", "col2"], - column_types=["STRING", "INTEGER"], - ) - syn.tableQuery = MagicMock(return_value = Table(schema, df)) - result = utilities.download_synapse_table(syn, "syn123456", condition = "") + # validate + syn.tableQuery.assert_called_once_with(query) + pd.testing.assert_frame_equal(result, expected_df) - syn.tableQuery.assert_called_once_with("SELECT * from syn123456") - pd.testing.assert_frame_equal(result, df) \ No newline at end of file +def 
test_download_empty_synapse_table_with_condition(syn, table_schema, ): + syn.tableQuery = MagicMock(return_value = Table(table_schema, pd.DataFrame(columns = ["col1", "col2"]))) + result = utilities.download_synapse_table(syn, "syn123456", condition = "col2 = 1") + + # validate + syn.tableQuery.assert_called_once_with("SELECT * from syn123456 WHERE col2 = 1") + pd.testing.assert_frame_equal(result, pd.DataFrame(columns = ["col1", "col2"])) + \ No newline at end of file From 7d2a00012e4806272e0d9890cd07c3bde5c9bb4a Mon Sep 17 00:00:00 2001 From: danlu1 Date: Thu, 24 Oct 2024 18:28:46 +0000 Subject: [PATCH 29/40] add contributing instructions --- CONTRIBUTING.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 364f8103..d24a58a7 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,5 +1,28 @@ # genie-bpc-pipeline: Contributing Guidelines +## Getting started +1. [Clone the repository](https://help.github.com/articles/cloning-a-repository/) to your local machine so you can begin making changes. +2. On your local machine make sure you have the latest version of the `develop` branch: + + ``` + git checkout develop + git pull origin develop + ``` +3. Create a feature branch off the `develop` branch and work on it. The branch should be named the same as the JIRA issue you are working on in **lowercase** (e.g., `gen-1234-{feature-here}`). Make sure the branch name as informative as possible. + ``` + git checkout develop + git checkout -b gen-1234-{feature-here} + ``` +4. Once you have made your additions or changes, make sure you write tests and run the [comparison scripts](https://github.com/Sage-Bionetworks/Genie_processing/blob/create_generalized_comparison_script/utility_scripts/compare_between_two_synapse_entities.py) to ensure changes are expected. +5. At this point, you have only created the branch locally, you need to push this to your fork on GitHub. + + ``` + git add your file + git commit -m"your commit information" + git push --set-upstream origin SYNPY-1234-{feature-here} + ``` +6. Create a pull request from the feature branch to the develop branch. An Github action will be triggered to create a docker image for the branch, you can check it [here](https://github.com/Sage-Bionetworks/genie-bpc-pipeline/pkgs/container/genie-bpc-pipeline). 
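+   For example, once that image is built you can pull it and try the branch locally (a sketch: the exact tag naming is an assumption here, so confirm it on the package page linked above):
+
+   ```
+   docker pull ghcr.io/sage-bionetworks/genie-bpc-pipeline:gen-1234-{feature-here}
+   ```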
+ ## Nextflow Pipeline contribution Here is how to contribute to the nextflow workflow of the genie-bpc-pipeline From f711e296e2acee2d193d7aa3aa1e0044718df86e Mon Sep 17 00:00:00 2001 From: danlu1 Date: Thu, 24 Oct 2024 19:25:47 +0000 Subject: [PATCH 30/40] add arg names to download_synapse_table --- scripts/table_updates/update_data_table.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/scripts/table_updates/update_data_table.py b/scripts/table_updates/update_data_table.py index c89aa38a..85007c3c 100644 --- a/scripts/table_updates/update_data_table.py +++ b/scripts/table_updates/update_data_table.py @@ -54,7 +54,7 @@ def get_main_genie_clinical_sample_file( Returns: pandas.DataFrame: the read in clinical file as dataframe """ - release_files = download_synapse_table(syn, release_files_table_synid) + release_files = download_synapse_table(syn, table_id = release_files_table_synid) clinical_link_synid = release_files[ (release_files["release"] == release) & (release_files["name"] == "data_clinical_sample.txt") @@ -136,7 +136,7 @@ def _store_data(syn: synapseclient.Synapse, table_id: str, label_data: pandas.Da ) if table_type == "irr": # check for exsiting id to update for new data only - existing_records = list(set(table_query.asDataFrame()["record_id"])) + existing_records = list(set(download_synapse_table(syn, table_id = table_schema.id, condition= f"cohort = '{cohort}'")["record_id"])) temp_data = temp_data[~temp_data["record_id"].isin(existing_records)] if not dry_run: if table_type == "primary": @@ -275,7 +275,7 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. cohort (string): Cohort name logger (logging.Logger): The custom logger. Optional. """ - interval_cols_info = download_synapse_table(syn, "syn23281483") + interval_cols_info = download_synapse_table(syn, table_id = "syn23281483") # Create new master table master_table = redacted_table_info.merge( full_data_table_info, on="name", suffixes=("_redacted", "_full") @@ -292,9 +292,9 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. ].values[0] # download tables condition = f"cohort = '{cohort}'" - curation_info = download_synapse_table(syn, curation_table_id, "record_id, curation_dt", condition) - patient_info = download_synapse_table(syn, patient_table_id, "record_id, birth_year, hybrid_death_ind", condition) - sample_info = download_synapse_table(syn, sample_table_id, "record_id, cpt_genie_sample_id, age_at_seq_report", condition) + curation_info = download_synapse_table(syn, table_id = curation_table_id, select = "record_id, curation_dt", condition = condition) + patient_info = download_synapse_table(syn, table_id = patient_table_id, select = "record_id, birth_year, hybrid_death_ind", condition = condition) + sample_info = download_synapse_table(syn, table_id = sample_table_id, select = "record_id, cpt_genie_sample_id, age_at_seq_report", condition = condition) patient_curation_info = patient_info.merge( curation_info, how="left", on="record_id" ) @@ -332,7 +332,7 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. 
table = syn.store(Table(table_schema, new_df)) # Modify patient table - df = download_synapse_table(syn, patient_table_id, condition = condition) + df = download_synapse_table(syn, table_id = patient_table_id, condition = condition) new_df, new_record_to_redact = _redact_table(df, interval_cols_info) new_df.reset_index(drop=True, inplace=True) record_to_redact = record_to_redact + new_record_to_redact @@ -361,7 +361,7 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. pt_dat_query = syn.tableQuery( f"SELECT cohort, record_id FROM {full_pt_id} where cohort = '{cohort}'" ) - pt_dat = download_synapse_table(syn, full_pt_id, "cohort, record_id", condition = condition) + pt_dat = download_synapse_table(syn, table_id = full_pt_id, select = "cohort, record_id", condition = condition) pt_dat.index = pt_dat.index.map(str) pt_dat["index"] = pt_dat.index info_to_update = new_df[["cohort", "record_id", "redacted"]] @@ -398,7 +398,7 @@ def custom_fix_for_cancer_panel_test_table( ].values[0] cpt_table_schema = syn.get(cpt_table_id) cpt_dat_query = syn.tableQuery("SELECT cpt_genie_sample_id FROM %s" % cpt_table_id) - cpt_dat = download_synapse_table(syn, cpt_table_id, select = "cpt_genie_sample_id") + cpt_dat = download_synapse_table(syn, table_id = cpt_table_id, select = "cpt_genie_sample_id") cpt_dat.index = cpt_dat.index.map(str) cpt_dat["index"] = cpt_dat.index genie_sample_dat = get_main_genie_clinical_sample_file( @@ -424,9 +424,9 @@ def custom_fix_for_cancer_panel_test_table( "SELECT cpt_sample_type FROM %s WHERE cpt_sample_type in (1,2,3,4,5,6,7)" % cpt_table_id ) - cpt_dat = download_synapse_table(syn, cpt_table_id, "cpt_sample_type", "cpt_sample_type in (1,2,3,4,5,6,7)") + cpt_dat = download_synapse_table(syn, table_id = cpt_table_id, select = "cpt_sample_type", condition = "cpt_sample_type in (1,2,3,4,5,6,7)") cpt_dat["cpt_sample_type"] = pandas.to_numeric(cpt_dat["cpt_sample_type"]) - sample_type_mapping = download_synapse_table(syn, config['main_genie_sample_mapping_table']) + sample_type_mapping = download_synapse_table(syn, table_id = config['main_genie_sample_mapping_table']) sample_type_mapping_dict = sample_type_mapping.set_index("CODE").to_dict()[ "DESCRIPTION" ] From 19bcc841ba8a8a013ecca5a77c81123e33e096d8 Mon Sep 17 00:00:00 2001 From: danlu1 Date: Thu, 24 Oct 2024 22:06:23 +0000 Subject: [PATCH 31/40] add instructions for nextflow testing --- CONTRIBUTING.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index d24a58a7..50baa208 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -53,3 +53,7 @@ Parameters should be initialized / defined with default values in the set parame ### Default processes resource requirements Defaults for process resource requirements (CPUs / memory / time) for a process should be defined in `nextflow.config`. + +### Testing +1. To test locally, you can’t use ghcr.io/sage-bionetworks/docker_image_name locally directly. You will need to pull the docker image to local and test with it. +2. To test on Sequra, go to the test_bpc_pipeline, edit the pipeline by pointing it to your feature branch then update. Doing this will allow you to select parameters from the dropdown menu directly. 
\ No newline at end of file From 8c20d900c06633caa141a88cf892d1037d069174 Mon Sep 17 00:00:00 2001 From: Dan Lu <90745557+danlu1@users.noreply.github.com> Date: Thu, 24 Oct 2024 16:38:03 -0700 Subject: [PATCH 32/40] Update CONTRIBUTING.md --- CONTRIBUTING.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 50baa208..3781bcc7 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -19,9 +19,9 @@ ``` git add your file git commit -m"your commit information" - git push --set-upstream origin SYNPY-1234-{feature-here} + git push --set-upstream origin gen-1234-{feature-here} ``` -6. Create a pull request from the feature branch to the develop branch. An Github action will be triggered to create a docker image for the branch, you can check it [here](https://github.com/Sage-Bionetworks/genie-bpc-pipeline/pkgs/container/genie-bpc-pipeline). +6. Create a pull request from the feature branch to the develop branch. When changes are made to `script/` (only applies to scripts/references and scripts/table_updates for now), a Github action will be triggered to create a docker image for the branch, you can check it [here](https://github.com/Sage-Bionetworks/genie-bpc-pipeline/pkgs/container/genie-bpc-pipeline). ## Nextflow Pipeline contribution @@ -56,4 +56,4 @@ Defaults for process resource requirements (CPUs / memory / time) for a process ### Testing 1. To test locally, you can’t use ghcr.io/sage-bionetworks/docker_image_name locally directly. You will need to pull the docker image to local and test with it. -2. To test on Sequra, go to the test_bpc_pipeline, edit the pipeline by pointing it to your feature branch then update. Doing this will allow you to select parameters from the dropdown menu directly. \ No newline at end of file +2. To test on Sequra, go to the test_bpc_pipeline, edit the pipeline by pointing it to your feature branch then update. Doing this will allow you to select parameters from the dropdown menu directly. From a7ff3ab9f2278a0b1afd26b86e4123aa149c50d3 Mon Sep 17 00:00:00 2001 From: Dan Lu <90745557+danlu1@users.noreply.github.com> Date: Thu, 24 Oct 2024 16:40:48 -0700 Subject: [PATCH 33/40] Update CONTRIBUTING.md --- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 3781bcc7..6c419728 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -13,7 +13,7 @@ git checkout develop git checkout -b gen-1234-{feature-here} ``` -4. Once you have made your additions or changes, make sure you write tests and run the [comparison scripts](https://github.com/Sage-Bionetworks/Genie_processing/blob/create_generalized_comparison_script/utility_scripts/compare_between_two_synapse_entities.py) to ensure changes are expected. +4. Once you have made your additions or changes, make sure you write tests and run [comparison scripts](https://github.com/Sage-Bionetworks/Genie_processing/blob/ed806d163fa4063a84920483e8ada21ea4b6cf47/README.md#comparisons-between-two-synapse-entities) to ensure changes are expected. 5. At this point, you have only created the branch locally, you need to push this to your fork on GitHub. 
``` From 3e114364e5dd910bf9af5a84dc658cb71f942c47 Mon Sep 17 00:00:00 2001 From: danlu1 Date: Thu, 24 Oct 2024 23:54:59 +0000 Subject: [PATCH 34/40] reformat code --- scripts/table_updates/tests/test_utilities.py | 84 +++++++++--- scripts/table_updates/update_data_table.py | 120 +++++++++++++----- scripts/table_updates/utilities.py | 16 ++- 3 files changed, 165 insertions(+), 55 deletions(-) diff --git a/scripts/table_updates/tests/test_utilities.py b/scripts/table_updates/tests/test_utilities.py index e3638877..fff06036 100644 --- a/scripts/table_updates/tests/test_utilities.py +++ b/scripts/table_updates/tests/test_utilities.py @@ -12,6 +12,7 @@ def syn(): return create_autospec(synapseclient.Synapse) + @pytest.fixture(scope="session") def table_schema(): schema = synapseclient.table.Schema( @@ -26,42 +27,87 @@ def table_schema(): @pytest.mark.parametrize( "query_return_df,select,query,expected_df", [ - (pd.DataFrame({'col1': ['value1', 'value2']}), "col1", "SELECT col1 from syn123456",pd.DataFrame({'col1': ['value1', 'value2']})), - (pd.DataFrame({'col1': ['value1', 'value2'],'col2': [1, 2]}), "col1,col2", "SELECT col1,col2 from syn123456",pd.DataFrame({'col1': ['value1', 'value2'],'col2': [1, 2]})), - (pd.DataFrame({'col1': ["NA", "value1", "None"],'col2': [1, 2, 3]}),"*","SELECT * from syn123456",pd.DataFrame({'col1': [np.nan, "value1", "None"],'col2': [1, 2, 3]})), - (pd.DataFrame(columns = ["col1", "col2"]),"*","SELECT * from syn123456",pd.DataFrame(columns = ["col1", "col2"])), + ( + pd.DataFrame({"col1": ["value1", "value2"]}), + "col1", + "SELECT col1 from syn123456", + pd.DataFrame({"col1": ["value1", "value2"]}), + ), + ( + pd.DataFrame({"col1": ["value1", "value2"], "col2": [1, 2]}), + "col1,col2", + "SELECT col1,col2 from syn123456", + pd.DataFrame({"col1": ["value1", "value2"], "col2": [1, 2]}), + ), + ( + pd.DataFrame({"col1": ["NA", "value1", "None"], "col2": [1, 2, 3]}), + "*", + "SELECT * from syn123456", + pd.DataFrame({"col1": [np.nan, "value1", "None"], "col2": [1, 2, 3]}), + ), + ( + pd.DataFrame(columns=["col1", "col2"]), + "*", + "SELECT * from syn123456", + pd.DataFrame(columns=["col1", "col2"]), + ), + ], + ids=[ + "selected_single_column", + "selected_multiple_column", + "pull_table_with_na_values_all_columns", + "pull_empty_table_all_columns", ], - ids = ["selected_single_column","selected_multiple_column","pull_table_with_na_values_all_columns","pull_empty_table_all_columns"], ) -def test_download_synapse_table_default_condition(syn, table_schema, query_return_df, select, query, expected_df): - syn.tableQuery = MagicMock(return_value = Table(table_schema, query_return_df)) +def test_download_synapse_table_default_condition( + syn, table_schema, query_return_df, select, query, expected_df +): + syn.tableQuery = MagicMock(return_value=Table(table_schema, query_return_df)) result = utilities.download_synapse_table(syn, "syn123456", select) # validate syn.tableQuery.assert_called_once_with(query) pd.testing.assert_frame_equal(result, expected_df) + @pytest.mark.parametrize( "query_return_df,condition,query,expected_df", [ - (pd.DataFrame({'col1': ['value1'],'col2': [1]}), "col1 = 'value1'", "SELECT * from syn123456 WHERE col1 = 'value1'",pd.DataFrame({'col1': ['value1'],'col2': [1]})), - (pd.DataFrame({'col1': ["NA", "value1", "None"],'col2': [1, 1, 1]}), "col2 = 1","SELECT * from syn123456 WHERE col2 = 1",pd.DataFrame({'col1': [np.nan, "value1", "None"],'col2': [1, 1, 1]})), + ( + pd.DataFrame({"col1": ["value1"], "col2": [1]}), + "col1 = 'value1'", + "SELECT * 
from syn123456 WHERE col1 = 'value1'", + pd.DataFrame({"col1": ["value1"], "col2": [1]}), + ), + ( + pd.DataFrame({"col1": ["NA", "value1", "None"], "col2": [1, 1, 1]}), + "col2 = 1", + "SELECT * from syn123456 WHERE col2 = 1", + pd.DataFrame({"col1": [np.nan, "value1", "None"], "col2": [1, 1, 1]}), + ), ], - ids = ["selected_row_all_columns","pull_table_with_na_values_all_columns"], + ids=["selected_row_all_columns", "pull_table_with_na_values_all_columns"], ) -def test_download_synapse_table_with_condition(syn, table_schema, query_return_df, condition, query,expected_df): - syn.tableQuery = MagicMock(return_value = Table(table_schema, query_return_df)) - result = utilities.download_synapse_table(syn, "syn123456", condition = condition) +def test_download_synapse_table_with_condition( + syn, table_schema, query_return_df, condition, query, expected_df +): + syn.tableQuery = MagicMock(return_value=Table(table_schema, query_return_df)) + result = utilities.download_synapse_table(syn, "syn123456", condition=condition) # validate syn.tableQuery.assert_called_once_with(query) pd.testing.assert_frame_equal(result, expected_df) -def test_download_empty_synapse_table_with_condition(syn, table_schema, ): - syn.tableQuery = MagicMock(return_value = Table(table_schema, pd.DataFrame(columns = ["col1", "col2"]))) - result = utilities.download_synapse_table(syn, "syn123456", condition = "col2 = 1") - + +def test_download_empty_synapse_table_with_condition( + syn, + table_schema, +): + syn.tableQuery = MagicMock( + return_value=Table(table_schema, pd.DataFrame(columns=["col1", "col2"])) + ) + result = utilities.download_synapse_table(syn, "syn123456", condition="col2 = 1") + # validate syn.tableQuery.assert_called_once_with("SELECT * from syn123456 WHERE col2 = 1") - pd.testing.assert_frame_equal(result, pd.DataFrame(columns = ["col1", "col2"])) - \ No newline at end of file + pd.testing.assert_frame_equal(result, pd.DataFrame(columns=["col1", "col2"])) diff --git a/scripts/table_updates/update_data_table.py b/scripts/table_updates/update_data_table.py index 85007c3c..20a29378 100644 --- a/scripts/table_updates/update_data_table.py +++ b/scripts/table_updates/update_data_table.py @@ -54,7 +54,7 @@ def get_main_genie_clinical_sample_file( Returns: pandas.DataFrame: the read in clinical file as dataframe """ - release_files = download_synapse_table(syn, table_id = release_files_table_synid) + release_files = download_synapse_table(syn, table_id=release_files_table_synid) clinical_link_synid = release_files[ (release_files["release"] == release) & (release_files["name"] == "data_clinical_sample.txt") @@ -74,17 +74,25 @@ def get_main_genie_clinical_sample_file( return clinical_df[["SAMPLE_ID", "SEQ_YEAR"]] -def _store_data(syn: synapseclient.Synapse, table_id: str, label_data: pandas.DataFrame, table_type: str, cohort: str, logger: logging.Logger, dry_run: bool): +def _store_data( + syn: synapseclient.Synapse, + table_id: str, + label_data: pandas.DataFrame, + table_type: str, + cohort: str, + logger: logging.Logger, + dry_run: bool, +): """Helper function to store data to each table in the master table. - Before uploading data to the Synapse table, the provided label data is filtered - based on matching columns between the label data and the table schema, as well - as the form_label. 
Data cleansing, including the removal of rows with no data + Before uploading data to the Synapse table, the provided label data is filtered + based on matching columns between the label data and the table schema, as well + as the form_label. Data cleansing, including the removal of rows with no data and the conversion of numeric values to integers, is applied to the label data. - When table_type is set to 'primary', existing data for the cohort is wiped, and - new data is inserted. When table_type is set to 'irr', only records that do not - already exist in the table are added. The dry_run flag can be used to toggle + When table_type is set to 'primary', existing data for the cohort is wiped, and + new data is inserted. When table_type is set to 'irr', only records that do not + already exist in the table are added. The dry_run flag can be used to toggle between uploading the table to Synapse or saving it locally. Args: @@ -136,7 +144,13 @@ def _store_data(syn: synapseclient.Synapse, table_id: str, label_data: pandas.Da ) if table_type == "irr": # check for exsiting id to update for new data only - existing_records = list(set(download_synapse_table(syn, table_id = table_schema.id, condition= f"cohort = '{cohort}'")["record_id"])) + existing_records = list( + set( + download_synapse_table( + syn, table_id=table_schema.id, condition=f"cohort = '{cohort}'" + )["record_id"] + ) + ) temp_data = temp_data[~temp_data["record_id"].isin(existing_records)] if not dry_run: if table_type == "primary": @@ -146,7 +160,15 @@ def _store_data(syn: synapseclient.Synapse, table_id: str, label_data: pandas.Da temp_data.to_csv(table_id + "_temp.csv") -def store_data(syn: synapseclient.Synapse, master_table: pandas.DataFrame, label_data: pandas.DataFrame, table_type: str, cohort: str, logger: logging.Logger, dry_run: bool): +def store_data( + syn: synapseclient.Synapse, + master_table: pandas.DataFrame, + label_data: pandas.DataFrame, + table_type: str, + cohort: str, + logger: logging.Logger, + dry_run: bool, +): """Store data to each table in the master table. Args: @@ -162,6 +184,7 @@ def store_data(syn: synapseclient.Synapse, master_table: pandas.DataFrame, label for table_id in master_table["id"]: _store_data(syn, table_id, label_data, table_type, cohort, logger, dry_run) + def get_phi_cutoff(unit): switcher = {"day": math.floor(89 * 365), "month": math.floor(89 * 12), "year": 89} return switcher.get(unit, "Invalid unit") @@ -257,17 +280,23 @@ def _redact_table(df, interval_cols_info): return df, record_to_redact -def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas.DataFrame, full_data_table_info: pandas.DataFrame, cohort: str, logger: logging.Logger): +def update_redact_table( + syn: synapseclient.Synapse, + redacted_table_info: pandas.DataFrame, + full_data_table_info: pandas.DataFrame, + cohort: str, + logger: logging.Logger, +): """Update redacted table - Before uploading data to the Synapse table, records are identified for redaction - based on criteria such as birth year, sequencing age, vital status, and interval + Before uploading data to the Synapse table, records are identified for redaction + based on criteria such as birth year, sequencing age, vital status, and interval fields. The redacted data is then stored in the BPC internal tables. - - A special case applies to the Patient Characteristics table: flagged records are - updated, with the birth_year field cleared. 
Additionally, the "redacted" column + + A special case applies to the Patient Characteristics table: flagged records are + updated, with the birth_year field cleared. Additionally, the "redacted" column in this table is updated within the Sage Internal project. - + Args: syn (synapseclient.Synapse): Synapse client connection redacted_table_info (pandas.DataFrame): Table of all of the redacted tables @@ -275,7 +304,7 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. cohort (string): Cohort name logger (logging.Logger): The custom logger. Optional. """ - interval_cols_info = download_synapse_table(syn, table_id = "syn23281483") + interval_cols_info = download_synapse_table(syn, table_id="syn23281483") # Create new master table master_table = redacted_table_info.merge( full_data_table_info, on="name", suffixes=("_redacted", "_full") @@ -292,9 +321,24 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. ].values[0] # download tables condition = f"cohort = '{cohort}'" - curation_info = download_synapse_table(syn, table_id = curation_table_id, select = "record_id, curation_dt", condition = condition) - patient_info = download_synapse_table(syn, table_id = patient_table_id, select = "record_id, birth_year, hybrid_death_ind", condition = condition) - sample_info = download_synapse_table(syn, table_id = sample_table_id, select = "record_id, cpt_genie_sample_id, age_at_seq_report", condition = condition) + curation_info = download_synapse_table( + syn, + table_id=curation_table_id, + select="record_id, curation_dt", + condition=condition, + ) + patient_info = download_synapse_table( + syn, + table_id=patient_table_id, + select="record_id, birth_year, hybrid_death_ind", + condition=condition, + ) + sample_info = download_synapse_table( + syn, + table_id=sample_table_id, + select="record_id, cpt_genie_sample_id, age_at_seq_report", + condition=condition, + ) patient_curation_info = patient_info.merge( curation_info, how="left", on="record_id" ) @@ -319,7 +363,7 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. for _, row in master_table.iterrows(): if row["name"] != "Patient Characteristics": table_id = row["id_full"] - df = download_synapse_table(syn, table_id, condition = condition) + df = download_synapse_table(syn, table_id, condition=condition) new_df, new_record_to_redact = _redact_table(df, interval_cols_info) new_df.reset_index(drop=True, inplace=True) record_to_redact = record_to_redact + new_record_to_redact @@ -332,7 +376,7 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. table = syn.store(Table(table_schema, new_df)) # Modify patient table - df = download_synapse_table(syn, table_id = patient_table_id, condition = condition) + df = download_synapse_table(syn, table_id=patient_table_id, condition=condition) new_df, new_record_to_redact = _redact_table(df, interval_cols_info) new_df.reset_index(drop=True, inplace=True) record_to_redact = record_to_redact + new_record_to_redact @@ -361,7 +405,9 @@ def update_redact_table(syn: synapseclient.Synapse, redacted_table_info: pandas. 
pt_dat_query = syn.tableQuery( f"SELECT cohort, record_id FROM {full_pt_id} where cohort = '{cohort}'" ) - pt_dat = download_synapse_table(syn, table_id = full_pt_id, select = "cohort, record_id", condition = condition) + pt_dat = download_synapse_table( + syn, table_id=full_pt_id, select="cohort, record_id", condition=condition + ) pt_dat.index = pt_dat.index.map(str) pt_dat["index"] = pt_dat.index info_to_update = new_df[["cohort", "record_id", "redacted"]] @@ -398,7 +444,9 @@ def custom_fix_for_cancer_panel_test_table( ].values[0] cpt_table_schema = syn.get(cpt_table_id) cpt_dat_query = syn.tableQuery("SELECT cpt_genie_sample_id FROM %s" % cpt_table_id) - cpt_dat = download_synapse_table(syn, table_id = cpt_table_id, select = "cpt_genie_sample_id") + cpt_dat = download_synapse_table( + syn, table_id=cpt_table_id, select="cpt_genie_sample_id" + ) cpt_dat.index = cpt_dat.index.map(str) cpt_dat["index"] = cpt_dat.index genie_sample_dat = get_main_genie_clinical_sample_file( @@ -424,9 +472,16 @@ def custom_fix_for_cancer_panel_test_table( "SELECT cpt_sample_type FROM %s WHERE cpt_sample_type in (1,2,3,4,5,6,7)" % cpt_table_id ) - cpt_dat = download_synapse_table(syn, table_id = cpt_table_id, select = "cpt_sample_type", condition = "cpt_sample_type in (1,2,3,4,5,6,7)") + cpt_dat = download_synapse_table( + syn, + table_id=cpt_table_id, + select="cpt_sample_type", + condition="cpt_sample_type in (1,2,3,4,5,6,7)", + ) cpt_dat["cpt_sample_type"] = pandas.to_numeric(cpt_dat["cpt_sample_type"]) - sample_type_mapping = download_synapse_table(syn, table_id = config['main_genie_sample_mapping_table']) + sample_type_mapping = download_synapse_table( + syn, table_id=config["main_genie_sample_mapping_table"] + ) sample_type_mapping_dict = sample_type_mapping.set_index("CODE").to_dict()[ "DESCRIPTION" ] @@ -444,7 +499,10 @@ def main(): description="Update data tables on Synapse for BPC databases" ) parser.add_argument( - "table", type=str, help="Specify table type to run", choices=TABLES["production"].keys() + "table", + type=str, + help="Specify table type to run", + choices=TABLES["production"].keys(), ) parser.add_argument( "-s", @@ -499,7 +557,7 @@ def main(): else: TABLE_INFO = TABLES["staging"] table_id, condition = list(TABLE_INFO[table_type]) - master_table = download_synapse_table(syn, table_id, condition = condition) + master_table = download_synapse_table(syn, table_id, condition=condition) # download data files # TODO: find the cohort that has new data # This is a mapping to all the intake data. 
e.g: ProstateBPCIntake_data @@ -514,7 +572,9 @@ def main(): custom_fix_for_cancer_panel_test_table(syn, master_table, logger, config) if table_type == "primary": table_id, condition = list(TABLE_INFO["redacted"]) - redacted_table_info = download_synapse_table(syn, table_id, condition = condition) + redacted_table_info = download_synapse_table( + syn, table_id, condition=condition + ) logger.info("Updating redacted tables...") update_redact_table(syn, redacted_table_info, master_table, cohort, logger) logger.info("Updating version for redacted tables") diff --git a/scripts/table_updates/utilities.py b/scripts/table_updates/utilities.py index ef4f598a..3fbca3e9 100644 --- a/scripts/table_updates/utilities.py +++ b/scripts/table_updates/utilities.py @@ -45,14 +45,16 @@ def check_empty_row(row, cols_to_skip): return row.drop(cols_to_skip).isnull().all() -def download_synapse_table(syn, table_id: str, select: str = "*", condition: str = "") -> pandas.DataFrame: +def download_synapse_table( + syn, table_id: str, select: str = "*", condition: str = "" +) -> pandas.DataFrame: """Download Synapse Table with the given table ID and condition Args: syn: Synapse credential table_id: Synapse ID of a table - select: Columns to be selected. Defaults to all columns. - condition: Additional condition for querying the table. Defaults to all rows. + select: Columns to be selected. Defaults to all columns. + condition: Additional condition for querying the table. Defaults to all rows. Returns: A Pandas dataframe of the Synapse table @@ -78,9 +80,11 @@ def download_synapse_table(syn, table_id: str, select: str = "*", condition: str "-NaN", "nan", "-nan", - "" + "", ] - synapse_table = synapse_table.asDataFrame(na_values=na_values, keep_default_na=False) + synapse_table = synapse_table.asDataFrame( + na_values=na_values, keep_default_na=False + ) return synapse_table @@ -113,7 +117,7 @@ def get_data(syn, label_data_id, cohort): "-NaN", "nan", "-nan", - "" + "", ] label_data = pandas.read_csv( syn.get(label_data_id).path, From c3aa72768724e26b216e437ed21def57e5591c17 Mon Sep 17 00:00:00 2001 From: danlu1 Date: Fri, 25 Oct 2024 00:22:28 +0000 Subject: [PATCH 35/40] add test cases --- scripts/table_updates/tests/test_utilities.py | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/scripts/table_updates/tests/test_utilities.py b/scripts/table_updates/tests/test_utilities.py index fff06036..bf19f78e 100644 --- a/scripts/table_updates/tests/test_utilities.py +++ b/scripts/table_updates/tests/test_utilities.py @@ -99,6 +99,42 @@ def test_download_synapse_table_with_condition( pd.testing.assert_frame_equal(result, expected_df) +@pytest.mark.parametrize( + "query_return_df,select,condition,query,expected_df", + [ + ( + pd.DataFrame({"col1": ["value1"], "col2": [1]}), + "col1", + "col1 = 'value1'", + "SELECT col1 from syn123456 WHERE col1 = 'value1'", + pd.DataFrame({"col1": ["value1"], "col2": [1]}), + ), + ( + pd.DataFrame({"col1": ["value1"], "col2": [1]}), + "col1,col2", + "col1 = 'value1'", + "SELECT col1,col2 from syn123456 WHERE col1 = 'value1'", + pd.DataFrame({"col1": ["value1"], "col2": [1]}), + ), + ], + ids=[ + "selected_one_columns_with_condition", + "select_multiple_columns_with_condition", + ], +) +def test_download_synapse_table_with_select_and_condition( + syn, table_schema, query_return_df, select, condition, query, expected_df +): + syn.tableQuery = MagicMock(return_value=Table(table_schema, query_return_df)) + result = utilities.download_synapse_table( + syn, "syn123456", 
select=select, condition=condition + ) + + # validate + syn.tableQuery.assert_called_once_with(query) + pd.testing.assert_frame_equal(result, expected_df) + + def test_download_empty_synapse_table_with_condition( syn, table_schema, From 961d47c6598d51d6a6a6c4d3cb8b1f3cbcf27a10 Mon Sep 17 00:00:00 2001 From: danlu1 Date: Mon, 28 Oct 2024 05:23:45 +0000 Subject: [PATCH 36/40] add function to remove unwanted backslash in Ca Directed Drugs table --- scripts/table_updates/tests/test_utilities.py | 42 +++++++++++++++++++ scripts/table_updates/update_data_table.py | 3 ++ scripts/table_updates/utilities.py | 22 ++++++++++ 3 files changed, 67 insertions(+) diff --git a/scripts/table_updates/tests/test_utilities.py b/scripts/table_updates/tests/test_utilities.py index bf19f78e..3433e8ec 100644 --- a/scripts/table_updates/tests/test_utilities.py +++ b/scripts/table_updates/tests/test_utilities.py @@ -147,3 +147,45 @@ def test_download_empty_synapse_table_with_condition( # validate syn.tableQuery.assert_called_once_with("SELECT * from syn123456 WHERE col2 = 1") pd.testing.assert_frame_equal(result, pd.DataFrame(columns=["col1", "col2"])) + + +@pytest.mark.parametrize( + "input_df,expected_df", + [ + ( + pd.DataFrame({"col1": ["\\abc", "def"], "col2": ["abc", "def\\"]}), + pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def"]}), + ), + ( + pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def\\"]}), + pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def"]}), + ), + ( + pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def"]}), + pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def"]}), + ), + ( + pd.DataFrame( + { + "col1": ["\\abc", "de\\f", "ghi\\"], + "col2": ["abc(\\hh)", "def,\\,hh", "ghi, ,\\hh"], + } + ), + pd.DataFrame( + { + "col1": ["abc", "def", "ghi"], + "col2": ["abc(hh)", "def,,hh", "ghi, ,hh"], + } + ), + ), + ], + ids=[ + "multiple_columns_with_backslash", + "one_column_with_backslash", + "none_column_with_backslash", + "backslashes_in_multiple_places", + ], +) +def test_remove_backslash(input_df, expected_df): + results = utilities.remove_backslash(input_df) + pd.testing.assert_frame_equal(results, expected_df) diff --git a/scripts/table_updates/update_data_table.py b/scripts/table_updates/update_data_table.py index 20a29378..1b005906 100644 --- a/scripts/table_updates/update_data_table.py +++ b/scripts/table_updates/update_data_table.py @@ -138,6 +138,9 @@ def _store_data( temp_data.drop(index=rows_to_drop, inplace=True) # remove .0 from all columns temp_data = temp_data.applymap(lambda x: float_to_int(x)) + # remove backslash from drugs_drug cols in ca_directed_drugs + if table_schema.name == "Ca Directed Drugs": + temp_data = remove_backslash(temp_data) # update table table_query = syn.tableQuery( f"SELECT * FROM {table_schema.id} where cohort = '{cohort}'" diff --git a/scripts/table_updates/utilities.py b/scripts/table_updates/utilities.py index 3fbca3e9..d18aad3e 100644 --- a/scripts/table_updates/utilities.py +++ b/scripts/table_updates/utilities.py @@ -199,3 +199,25 @@ def revert_table_version(syn, table_id): table_query = syn.tableQuery("SELECT * from %s" % table_id) syn.delete(table_query.asRowSet()) syn.store(Table(table_schema, temp_data)) + + +def remove_backslash(df: pandas.DataFrame) -> pandas.DataFrame: + """Function to detect and remove unwanted backslashes in columns from a dataframe + + Args: + df (pandas.DataFrame): A dataframe to check against + + Returns: + pandas.DataFrame: A dataframe with unwanted backslashes removed + """ + # Check 
columns with backslashes + columns_with_backslash = [ + col for col in df.columns if df[col].astype(str).str.contains(r"\\").any() + ] + + if columns_with_backslash: + df[columns_with_backslash] = df[columns_with_backslash].replace( + r"\\", "", regex=True + ) + + return df From edd22ea4777e718f0a8c7fc0754fa5a67a0b6a85 Mon Sep 17 00:00:00 2001 From: danlu1 Date: Mon, 28 Oct 2024 05:34:28 +0000 Subject: [PATCH 37/40] reformat tests/test_utilities.py --- scripts/table_updates/tests/test_utilities.py | 42 ++++++++++++++++--- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/scripts/table_updates/tests/test_utilities.py b/scripts/table_updates/tests/test_utilities.py index 3433e8ec..c2d54a10 100644 --- a/scripts/table_updates/tests/test_utilities.py +++ b/scripts/table_updates/tests/test_utilities.py @@ -153,16 +153,46 @@ def test_download_empty_synapse_table_with_condition( "input_df,expected_df", [ ( - pd.DataFrame({"col1": ["\\abc", "def"], "col2": ["abc", "def\\"]}), - pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def"]}), + pd.DataFrame( + { + "col1": ["\\abc", "def"], + "col2": ["abc", "def\\"] + } + ), + pd.DataFrame( + { + "col1": ["abc", "def"], + "col2": ["abc", "def"] + } + ), ), ( - pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def\\"]}), - pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def"]}), + pd.DataFrame( + { + "col1": ["abc", "def"], + "col2": ["abc", "def\\"] + } + ), + pd.DataFrame( + { + "col1": ["abc", "def"], + "col2": ["abc", "def"] + } + ), ), ( - pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def"]}), - pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def"]}), + pd.DataFrame( + { + "col1": ["abc", "def"], + "col2": ["abc", "def"] + } + ), + pd.DataFrame( + { + "col1": ["abc", "def"], + "col2": ["abc", "def"] + } + ), ), ( pd.DataFrame( From ffa7a84afeb39647e756c313b8b398e85db11d65 Mon Sep 17 00:00:00 2001 From: rxu17 <26471741+rxu17@users.noreply.github.com> Date: Mon, 28 Oct 2024 11:31:22 -0700 Subject: [PATCH 38/40] add missing previous input req --- modules/run_quac_upload_report_error.nf | 1 + 1 file changed, 1 insertion(+) diff --git a/modules/run_quac_upload_report_error.nf b/modules/run_quac_upload_report_error.nf index 816c4979..79afa85a 100644 --- a/modules/run_quac_upload_report_error.nf +++ b/modules/run_quac_upload_report_error.nf @@ -9,6 +9,7 @@ process run_quac_upload_report_error { debug true input: + val previous val cohort output: From 665b77a2f65bb2075c0dfff0cbdf6b01ff6357b7 Mon Sep 17 00:00:00 2001 From: Dan Lu <90745557+danlu1@users.noreply.github.com> Date: Mon, 28 Oct 2024 23:17:36 -0700 Subject: [PATCH 39/40] Update CONTRIBUTING.md Co-authored-by: Thomas Yu --- CONTRIBUTING.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 6c419728..8e1d0f88 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -18,7 +18,7 @@ ``` git add your file - git commit -m"your commit information" + git commit -m "your commit information" git push --set-upstream origin gen-1234-{feature-here} ``` 6. Create a pull request from the feature branch to the develop branch. When changes are made to `script/` (only applies to scripts/references and scripts/table_updates for now), a Github action will be triggered to create a docker image for the branch, you can check it [here](https://github.com/Sage-Bionetworks/genie-bpc-pipeline/pkgs/container/genie-bpc-pipeline). 
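For reference, the `remove_backslash` helper introduced in PATCH 36 scans every column and strips all literal backslashes it finds. A minimal sketch of that behavior, assuming `scripts/table_updates` is on the Python path and using made-up drug values (the column contents here are hypothetical, not from the real BPC data):

    import pandas

    from utilities import remove_backslash

    # Hypothetical REDCap-style export with stray backslashes.
    drugs = pandas.DataFrame(
        {
            "drugs_drug_1": ["Cisplatin\\", "\\Etoposide"],
            "record_id": ["GENIE-001", "GENIE-002"],
        }
    )
    cleaned = remove_backslash(drugs)
    print(cleaned["drugs_drug_1"].tolist())  # ['Cisplatin', 'Etoposide']

PATCH 40 below narrows this to an explicit column list so that only the intended columns are rewritten.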
From 7ed8bc197bc8f8aa9634f4fa5f6ddf4e7f001363 Mon Sep 17 00:00:00 2001
From: danlu1 
Date: Wed, 30 Oct 2024 16:35:52 +0000
Subject: [PATCH 40/40] make remove_backslash column specific

---
 scripts/table_updates/tests/test_utilities.py | 79 +++++++++++--------
 scripts/table_updates/update_data_table.py    |  5 +-
 scripts/table_updates/utilities.py            | 21 +++--
 3 files changed, 57 insertions(+), 48 deletions(-)

diff --git a/scripts/table_updates/tests/test_utilities.py b/scripts/table_updates/tests/test_utilities.py
index c2d54a10..642861a9 100644
--- a/scripts/table_updates/tests/test_utilities.py
+++ b/scripts/table_updates/tests/test_utilities.py
@@ -150,61 +150,55 @@ def test_download_empty_synapse_table_with_condition(


 @pytest.mark.parametrize(
-    "input_df,expected_df",
+    "input_df,cols,expected_df",
     [
         (
             pd.DataFrame({"col1": ["\\abc", "def"], "col2": ["abc", "def\\"]}),
+            ["col1", "col2"],
             pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def"]}),
         ),
         (
             pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def\\"]}),
+            ["col2"],
             pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def"]}),
+        ),
+        (
+            pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def\\"]}),
+            ["col1"],
+            pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def\\"]}),
         ),
         (
             pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def"]}),
+            ["col1", "col2"],
             pd.DataFrame({"col1": ["abc", "def"], "col2": ["abc", "def"]}),
         ),
         (
             pd.DataFrame(
                 {
                     "col1": ["\\abc", "de\\f", "ghi\\"],
                     "col2": ["abc(\\hh)", "def,\\,hh", "ghi, ,\\hh"],
                 }
             ),
+            ["col1", "col2"],
             pd.DataFrame(
                 {
                     "col1": ["abc", "def", "ghi"],
                     "col2": ["abc(hh)", "def,,hh", "ghi, ,hh"],
                 }
             ),
         ),
         (
             pd.DataFrame(
                 {
+                    "col1": [1, "de\\f", "ghi\\", np.nan],
+                    "col2": ["abc(\\hh)", "def,\\,hh", "ghi, ,\\hh", 2],
                 }
             ),
+            ["col1", "col2"],
             pd.DataFrame(
                 {
+                    "col1": [1, "def", "ghi", np.nan],
+                    "col2": ["abc(hh)", "def,,hh", "ghi, ,hh", 2],
                 }
             ),
         ),
     ],
     ids=[
         "multiple_columns_with_backslash",
         "one_column_with_backslash",
+        "one_column_with_backslash_but_not_selected",
         "none_column_with_backslash",
         "backslashes_in_multiple_places",
+        "various_column_types",
     ],
 )
-def test_remove_backslash(input_df, expected_df):
-    results = utilities.remove_backslash(input_df)
+def test_remove_backslash(input_df, cols, expected_df):
+    results = utilities.remove_backslash(input_df, cols)
     pd.testing.assert_frame_equal(results, expected_df)
+
+
+@pytest.mark.parametrize(
+    "input_df,cols",
+    [
+        (pd.DataFrame({"col1": ["\\abc", "def"], "col2": ["abc", "def\\"]}), ["col3"]),
+    ],
+)
+def test_remove_backslash_fail(input_df, cols):
+    with pytest.raises(
+        ValueError, match="Invalid column list. Not all columns are in the dataframe."
+    ):
+        utilities.remove_backslash(input_df, cols)
diff --git a/scripts/table_updates/update_data_table.py b/scripts/table_updates/update_data_table.py
index 1b005906..7753d8eb 100644
--- a/scripts/table_updates/update_data_table.py
+++ b/scripts/table_updates/update_data_table.py
@@ -18,6 +18,7 @@
 import datetime
 import json
 import math
+import re

 import numpy
 import pandas
@@ -140,7 +141,9 @@ def _store_data(
     temp_data = temp_data.applymap(lambda x: float_to_int(x))
     # remove backslash from drugs_drug cols in ca_directed_drugs
     if table_schema.name == "Ca Directed Drugs":
-        temp_data = remove_backslash(temp_data)
+        # extract drugs_drug_* columns
+        cols = [col for col in temp_data.columns if re.search(r"drugs_drug_\d$", col)]
+        temp_data = remove_backslash(temp_data, cols)
     # update table
     table_query = syn.tableQuery(
         f"SELECT * FROM {table_schema.id} where cohort = '{cohort}'"
diff --git a/scripts/table_updates/utilities.py b/scripts/table_updates/utilities.py
index d18aad3e..0feceac6 100644
--- a/scripts/table_updates/utilities.py
+++ b/scripts/table_updates/utilities.py
@@ -1,5 +1,6 @@
 import logging
 import sys
+from typing import List

 import pandas
 import synapseclient
@@ -201,23 +202,19 @@ def revert_table_version(syn, table_id):
     syn.store(Table(table_schema, temp_data))


-def remove_backslash(df: pandas.DataFrame) -> pandas.DataFrame:
+def remove_backslash(df: pandas.DataFrame, cols: List[str]) -> pandas.DataFrame:
     """Function to detect and remove unwanted backslashes in columns from a dataframe

     Args:
         df (pandas.DataFrame): A dataframe to check against
+        cols (List[str]): The list of columns to be updated

     Returns:
         pandas.DataFrame: A dataframe with unwanted backslashes removed
     """
-    # Check columns with backslashes
-    columns_with_backslash = [
-        col for col in df.columns if df[col].astype(str).str.contains(r"\\").any()
-    ]
-
-    if columns_with_backslash:
-        df[columns_with_backslash] = df[columns_with_backslash].replace(
-            r"\\", "", regex=True
-        )
-
-    return df
+    # check if the given columns are in the dataframe
+    if all(col in df.columns for col in cols):
+        df[cols] = df[cols].replace(r"\\", "", regex=True)
+        return df
+    else:
+        raise ValueError("Invalid column list. Not all columns are in the dataframe.")
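Taken together, PATCH 40 wires the column-specific helper into `_store_data`. A usage sketch of the final behavior, including the drugs_drug_* selection and the error path; the dataframe values are hypothetical and `utilities.py` is assumed importable from `scripts/table_updates`:

    import re

    import pandas

    from utilities import remove_backslash

    df = pandas.DataFrame(
        {
            "drugs_drug_1": ["Carboplatin\\", "Paclitaxel"],
            "cohort": ["NSCLC", "NSCLC"],
        }
    )

    # Mirror the selection in _store_data: names ending in drugs_drug_<digit>.
    cols = [col for col in df.columns if re.search(r"drugs_drug_\d$", col)]
    cleaned = remove_backslash(df, cols)
    print(cleaned["drugs_drug_1"].tolist())  # ['Carboplatin', 'Paclitaxel']

    # A column absent from the dataframe raises ValueError.
    try:
        remove_backslash(df, ["drugs_drug_9"])
    except ValueError as err:
        print(err)  # Invalid column list. Not all columns are in the dataframe.

Restricting the replacement to an explicit column list keeps untouched columns (e.g. free-text fields that may legitimately contain backslashes) from being rewritten, at the cost of raising when a caller passes a stale column name.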