From ef5af9774ece3fae4587035ee42f9bef42a27a5c Mon Sep 17 00:00:00 2001
From: rxu17 <26471741+rxu17@users.noreply.github.com>
Date: Fri, 3 Jan 2025 15:09:59 -0800
Subject: [PATCH] [GEN-1240] Add grs toggle (#162)

* initial commit

* add use_grs to nextflow_schema

* add tests, use use_grs param instead

* newline
---
 main.nf                                       |  82 ++++++++++++--
 modules/merge_and_uncode_rca_uploads.nf       |  16 ++-
 nextflow_schema.json                          |   9 ++
 .../uploads/merge_and_uncode_rca_uploads.R    | 103 ++++++++++++------
 .../tests/test_merge_and_uncode_rca_uploads.R |  69 +++++++++++-
 5 files changed, 230 insertions(+), 49 deletions(-)

diff --git a/main.nf b/main.nf
index 2e7e10a8..856585a6 100644
--- a/main.nf
+++ b/main.nf
@@ -25,6 +25,7 @@ params.production = false
 params.schema_ignore_params = ""
 params.help = false
 params.step = "update_potential_phi_fields_table"
+params.use_grs = false
 
 /*
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -46,6 +47,7 @@ if (params.cohort == null) { exit 1, 'cohort parameter not specified!' }
 if (params.comment == null) { exit 1, 'comment parameter not specified!' }
 if (params.production == null) { exit 1, 'production parameter not specified!' }
 if (params.step == null) { exit 1, 'step parameter not specified!' }
+if (params.use_grs == null) { exit 1, 'use_grs parameter not specified!' }
 
 
 // Print parameter summary log to screen
@@ -96,21 +98,79 @@ workflow BPC_PIPELINE {
         update_potential_phi_fields_table(ch_comment, params.production)
         // validate_data.out.view()
     } else if (params.step == "merge_and_uncode_rca_uploads"){
-        merge_and_uncode_rca_uploads("default", ch_cohort, ch_comment, params.production)
+        merge_and_uncode_rca_uploads(
+            "default",
+            ch_cohort,
+            ch_comment,
+            params.production,
+            params.use_grs
+        )
     } else if (params.step == "update_data_table") {
-        update_data_table("default", ch_cohort, ch_comment, params.production)
+        update_data_table(
+            "default",
+            ch_cohort,
+            ch_comment,
+            params.production
+        )
    } else if (params.step == "genie_bpc_pipeline"){
         update_potential_phi_fields_table(ch_comment, params.production)
-        run_quac_upload_report_error(update_potential_phi_fields_table.out, ch_cohort)
-        run_quac_upload_report_warning(run_quac_upload_report_error.out, ch_cohort, params.production)
-        merge_and_uncode_rca_uploads(run_quac_upload_report_warning.out, ch_cohort, ch_comment, params.production)
+
+        run_quac_upload_report_error(
+            update_potential_phi_fields_table.out,
+            ch_cohort
+        )
+
+        run_quac_upload_report_warning(
+            run_quac_upload_report_error.out,
+            ch_cohort,
+            params.production
+        )
+
+        merge_and_uncode_rca_uploads(
+            run_quac_upload_report_warning.out,
+            ch_cohort,
+            ch_comment,
+            params.production,
+            params.use_grs
+        )
         // remove_patients_from_merged(merge_and_uncode_rca_uploads.out, ch_cohort, params.production)
-        update_data_table(merge_and_uncode_rca_uploads.out, ch_cohort, ch_comment, params.production)
-        update_date_tracking_table(update_data_table.out, ch_cohort, ch_comment, params.production)
-        run_quac_table_report(update_date_tracking_table.out, ch_cohort, params.production)
-        run_quac_comparison_report(run_quac_table_report.out, ch_cohort, params.production)
-        create_masking_report(run_quac_comparison_report.out, ch_cohort, params.production)
-        update_case_count_table(create_masking_report.out, ch_comment, params.production)
+        update_data_table(
+            merge_and_uncode_rca_uploads.out,
+            ch_cohort,
+            ch_comment,
+            params.production
+        )
+
+        update_date_tracking_table(
+            update_data_table.out,
+            ch_cohort,
+            ch_comment,
+            params.production
+        )
+
+        run_quac_table_report(
+            update_date_tracking_table.out,
+            ch_cohort,
+            params.production
+        )
+
+        run_quac_comparison_report(
+            run_quac_table_report.out,
+            ch_cohort,
+            params.production
+        )
+
+        create_masking_report(
+            run_quac_comparison_report.out,
+            ch_cohort,
+            params.production
+        )
+
+        update_case_count_table(
+            create_masking_report.out,
+            ch_comment,
+            params.production
+        )
    } else {
         exit 1, 'step not supported'
    }
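Note: the workflow above only threads params.use_grs through to the merge step. The module below forwards it to the R script as the literal string "true" or "false" via --use_grs. A minimal sketch, assuming optparse's standard coercion for type = "logical" (this is illustrative code, not part of the patch), of how that string round-trips to an R logical:

    library(optparse)

    # The Nextflow process interpolates $use_grs as "true" or "false";
    # type = "logical" lets optparse coerce the string back to a logical.
    option_list <- list(
      make_option(c("--use_grs"), type = "logical", default = FALSE,
                  help = "Use GRS as the primary mapping")
    )
    opt <- parse_args(OptionParser(option_list = option_list),
                      args = c("--use_grs", "false"))
    stopifnot(identical(opt$use_grs, FALSE))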
diff --git a/modules/merge_and_uncode_rca_uploads.nf b/modules/merge_and_uncode_rca_uploads.nf
index 37a95e1c..a72d29f0 100644
--- a/modules/merge_and_uncode_rca_uploads.nf
+++ b/modules/merge_and_uncode_rca_uploads.nf
@@ -12,6 +12,7 @@ process merge_and_uncode_rca_uploads {
     val cohort
     val comment
     val production
+    val use_grs
 
     output:
     stdout
@@ -20,13 +21,24 @@ process merge_and_uncode_rca_uploads {
     if (production) {
     """
     cd /usr/local/src/myscripts/
-    Rscript merge_and_uncode_rca_uploads.R -c $cohort -v --production --save_synapse --comment $comment
+    Rscript merge_and_uncode_rca_uploads.R \
+        -c $cohort \
+        -v \
+        --production \
+        --save_synapse \
+        --comment $comment \
+        --use_grs $use_grs
    """
    } else {
    """
    cd /usr/local/src/myscripts/
-    Rscript merge_and_uncode_rca_uploads.R -c $cohort -v --save_synapse --comment $comment
+    Rscript merge_and_uncode_rca_uploads.R \
+        -c $cohort \
+        -v \
+        --save_synapse \
+        --comment $comment \
+        --use_grs $use_grs
    """
    }
 }
diff --git a/nextflow_schema.json b/nextflow_schema.json
index 9ad90651..e1f694a5 100644
--- a/nextflow_schema.json
+++ b/nextflow_schema.json
@@ -55,6 +55,15 @@
                 "genie_bpc_pipeline"
             ]
         },
+        "use_grs": {
+            "type": "boolean",
+            "description": "Whether to use the GRS as the primary mapping (with the DD as a secondary fallback) or to use the DD only.",
+            "default": false,
+            "enum": [
+                true,
+                false
+            ]
+        },
         "references_docker":{
             "type": "string",
             "description": "Name of docker to use in processes in scripts/references"
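Note: in mapping terms, the toggle described by the schema entry above is a primary/secondary lookup: when use_grs is true the Global Response Set entries win and the data dictionary fills the gaps; when false the data dictionary is used alone. An illustrative, self-contained sketch of that precedence (variable names below are hypothetical; the pipeline's real merge logic lives in merge_mappings() in the R diff that follows):

    # Hypothetical mapping tables: the GRS overrides the DD per variable,
    # and the DD covers anything the GRS omits.
    grs_map <- list(sex_code = c("1" = "Male", "2" = "Female"))
    dd_map  <- list(sex_code = c("1" = "M", "2" = "F"),
                    vital_status = c("0" = "Alive", "1" = "Dead"))

    lookup_mapping <- function(variable, primary, secondary) {
      if (!is.null(primary) && variable %in% names(primary)) {
        return(primary[[variable]])   # GRS entry takes precedence
      }
      secondary[[variable]]           # fall back to the data dictionary
    }

    lookup_mapping("sex_code", grs_map, dd_map)      # GRS labels win
    lookup_mapping("vital_status", grs_map, dd_map)  # DD-only variable
    lookup_mapping("sex_code", NULL, dd_map)         # use_grs false: DD only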
diff --git a/scripts/uploads/merge_and_uncode_rca_uploads.R b/scripts/uploads/merge_and_uncode_rca_uploads.R
index 8c72f181..3298adfa 100644
--- a/scripts/uploads/merge_and_uncode_rca_uploads.R
+++ b/scripts/uploads/merge_and_uncode_rca_uploads.R
@@ -314,29 +314,36 @@ merge_mappings <- function(primarys, secondarys, debug = F) {
   return(mappings)
 }
 
-#' Map any coded data to actual values as mapped in the
-#' REDCap Data Dictionary (DD).
+#' Map any coded data to actual values depending on whether the Global
+#' Response Set (GRS) is used. If so, data are mapped against the merged
+#' set of REDCap Data Dictionary (DD) and GRS mappings; otherwise, data
+#' are mapped against the DD alone.
 #'
-#' @param data Data frame of coded data
-#' @param mappings Matrix with two columns, first containing a label and
+#' @param df_coded Data frame of coded data
+#' @param dd Matrix with two columns, first containing a label and
 #'   second columns a mapping string.
-#' @param secondary_mappings Another mapping matrix that is used secondarily
+#' @param grs Another mapping matrix that is used secondarily
 #'   if the label is not found in the primary mapping matrix.
+#' @param use_grs Whether to use the GRS as the primary mapping
 #' @return Data frame of uncoded data.
 #' @example
 #' map_code_to_value(data = my_data, dd = dd, grs = grs)
-uncode_data <- function(df_coded, dd, grs) {
+uncode_data <- function(df_coded, dd, grs, use_grs) {
   
   df_uncoded <- df_coded
 
   # merge reference mappings
-  mappings_primary <- parse_mappings(strs = grs[,config$column_name$variable_mapping],
-                                     labels = grs[,config$column_name$variable_name])
-  mappings_secondary <- parse_mappings(strs = dd[[config$column_name$variable_mapping]],
-                                       labels = dd[[config$column_name$variable_name]])
-  mappings <- merge_mappings(mappings_primary, mappings_secondary)
-  
-  # custom mappings
+  if (use_grs) {
+    mappings_primary <- parse_mappings(strs = grs[,config$column_name$variable_mapping],
+                                       labels = grs[,config$column_name$variable_name])
+    mappings_secondary <- parse_mappings(strs = dd[[config$column_name$variable_mapping]],
+                                         labels = dd[[config$column_name$variable_name]])
+    mappings <- merge_mappings(mappings_primary, mappings_secondary)
+  } else {
+    mappings <- parse_mappings(strs = dd[[config$column_name$variable_mapping]],
+                               labels = dd[[config$column_name$variable_name]])
+  }
+
   mapping_complete <- data.frame(codes = names(config$mapping$complete),
                                  values = as.character(config$mapping$complete),
                                  stringsAsFactors = F)
@@ -576,6 +583,7 @@ synLogin <- function(auth = NA, silent = T) {
   return(syn)
 }
 
+
 get_data_dictionary <- function(cohort) {
   synid_dd <- get_bpc_synid_prissmm(synid_table_prissmm = config$synapse$prissmm$id,
                                     cohort = cohort,
@@ -590,6 +598,24 @@
   return(dd)
 }
 
+#' Retrieves the Global Response Set (GRS) depending on the
+#' value of use_grs. If the GRS is not used, returns NULL.
+#'
+#' @param use_grs Whether to use the GRS or not
+#' @return Data frame of the GRS, or NULL when use_grs is FALSE
+get_global_response_set <- function(use_grs) {
+  if (use_grs) {
+    grs <- read.csv(synGet(config$synapse$grs$id)$path,
+                    sep = ",",
+                    stringsAsFactors = F,
+                    check.names = F,
+                    na.strings = c(""))
+  } else {
+    grs <- NULL
+  }
+  return(grs)
+}
+
 get_data_uploads <- function(cohort) {
   data_upload <- list()
   for (i in 1:length(config$upload[[cohort]])) {
@@ -639,30 +665,46 @@ get_output_folder_id <- function(config, environment){
   return(config$synapse$rca_files[[glue(environment, "_id")]])
 }
 
+
+#' Retrieves the Synapse ids to record as provenance.
+#'
+#' @param cohort (string) Name of the cohort
+#' @param use_grs Whether the GRS is used
+#' @return List of the Synapse ids used in provenance
+get_prov_used <- function(cohort, use_grs) {
+  synid_dd <- get_bpc_synid_prissmm(synid_table_prissmm = config$synapse$prissmm$id,
+                                    cohort = cohort,
+                                    file_name = "Data Dictionary non-PHI")
+  if (use_grs) {
+    prov_used <- c(as.character(unlist(config$upload[[cohort]])),
+                   synid_dd,
+                   config$synapse$grs$id)
+  } else {
+    prov_used <- c(as.character(unlist(config$upload[[cohort]])),
+                   synid_dd)
+  }
+  return(prov_used)
+}
+
 #' Write to Synapse and clean up
 #' Remove leading and trailing whitespace from a string.
 #' @param cohort (string) name of cohort
 #' @param environment (string) whether we are running in production env or staging env
 #' @param comment (string) some sort of comment about the new version of the file
 #'   related to cohort run
-save_output_synapse <- function(cohort, environment, comment) {
-  
+save_output_synapse <- function(cohort, environment, comment, use_grs) {
+
   parent_id <- get_output_folder_id(config, environment)
   file_output_pri <- get_pri_file_name(cohort)
   file_output_irr <- get_irr_file_name(cohort)
-  synid_dd <- get_bpc_synid_prissmm(synid_table_prissmm = config$synapse$prissmm$id,
-                                    cohort = cohort,
-                                    file_name = "Data Dictionary non-PHI")
+  prov_used <- get_prov_used(cohort, use_grs)
   
   save_to_synapse(path = file_output_pri,
                   file_name = gsub(pattern = ".csv|.tsv", replacement = "", x = file_output_pri),
                   parent_id = parent_id,
                   comment = comment,
                   prov_name = "BPC non-IRR upload data",
                   prov_desc = "Merged and uncoded BPC upload data from sites academic REDCap instances with IRR cases removed",
-                  prov_used = c(as.character(unlist(config$upload[[cohort]])),
-                                synid_dd,
-                                config$synapse$grs$id),
+                  prov_used = prov_used,
                   prov_exec = "https://github.com/Sage-Bionetworks/genie-bpc-pipeline/tree/develop/scripts/uploads/merge_and_uncode_rca_uploads.R")
   
   if (file.exists(file_output_irr)) {
@@ -672,9 +714,7 @@
                     comment = comment,
                     prov_name = "BPC IRR upload data",
                     prov_desc = "Merged and uncoded BPC upload IRR case data from sites academic REDCap instances",
-                    prov_used = c(as.character(unlist(config$upload[[cohort]])),
-                                  synid_dd,
-                                  config$synapse$grs$id),
+                    prov_used = prov_used,
                     prov_exec = "https://github.com/Sage-Bionetworks/genie-bpc-pipeline/tree/develop/scripts/uploads/merge_and_uncode_rca_uploads.R")
   }
 
@@ -704,7 +744,9 @@ main <- function(){
     make_option(c("-v", "--verbose"), action="store_true", default = FALSE,
                 help="Print out verbose output on script progress"),
     make_option(c("--comment"), type = "character",
-                help="Comment for new table snapshot version. This must be unique and is tied to the cohort run.")
+                help="Comment for new table snapshot version. This must be unique and is tied to the cohort run."),
+    make_option(c("--use_grs"), type="logical", default = FALSE,
+                help="Whether to use the GRS as the primary mapping (with the DD as a secondary fallback) or to use the DD only.")
   )
   opt <- parse_args(OptionParser(option_list=option_list))
 
@@ -745,11 +787,7 @@ main <- function(){
     print(glue("{now(timeOnly = T)}: Reading global response set..."))
   }
   
-  grs <- read.csv(synGet(config$synapse$grs$id)$path,
-                  sep = ",",
-                  stringsAsFactors = F,
-                  check.names = F,
-                  na.strings = c(""))
+  grs <- get_global_response_set(use_grs = opt$use_grs)
   
   # for each user-specified cohort
   for (cohort in cohort_input) {
@@ -781,7 +819,8 @@ main <- function(){
     # uncode
     uncoded <- uncode_data(df_coded = coded,
                            dd = dd,
-                           grs = grs)
+                           grs = grs,
+                           use_grs = opt$use_grs)
     
     if (debug) {
       print(glue("{now(timeOnly = T)}: Formatting uncoded data..."))
@@ -806,7 +845,7 @@ main <- function(){
       print(glue("{now(timeOnly = T)}: Saving uncoded data to Synapse..."))
     }
     
-    save_output_synapse(cohort, environment = env, comment = opt$comment)
+    save_output_synapse(cohort, environment = env, comment = opt$comment, use_grs = opt$use_grs)
   }
   
   # clean up for memory
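Note: the new tests below lean on the mockery package: stub() swaps a collaborator out inside the function under test, and mock() builds a canned function that records its calls. A self-contained sketch of the pattern with a hypothetical collaborator (illustrative only, not this repo's code):

    library(mockery)
    library(testthat)

    fetch_label <- function(id) stop("would hit Synapse in real code")
    describe_id <- function(id) paste0(id, ": ", fetch_label(id))

    test_that("describe_id uses the stubbed label", {
      # Replace fetch_label only within describe_id for this test
      stub(describe_id, "fetch_label", mock("Data Dictionary non-PHI"))
      expect_equal(describe_id("syn123"), "syn123: Data Dictionary non-PHI")
    })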
"get_bpc_synid_prissmm", mock_get_bpc_synid_prissmm) + + result <- get_prov_used(cohort = "cohort1", use_grs = FALSE) + expect_equal(result, c("mock_data1_id", "mock_data2_id", "mock_data3_id", "mock_data4_id", "mock_synid_dd")) +}) + +test_that("get_prov_used returns correct prov when use_grs is TRUE", { + # Mock the `get_bpc_synid_prissmm` function + mock_get_bpc_synid_prissmm <- mock("mock_synid_dd") + stub(get_prov_used, "get_bpc_synid_prissmm", mock_get_bpc_synid_prissmm) + + result <- get_prov_used(cohort = "cohort1", use_grs = TRUE) + print(result) + expect_equal(result, c("mock_data1_id", "mock_data2_id", "mock_data3_id", "mock_data4_id", "mock_synid_dd", "mock_grs_id")) +})