Skip to content

Commit

Permalink
[GEN-1240] Add grs toggle (#162)
Browse files Browse the repository at this point in the history
* initial commit

* add use_grs to nextflow_schema

* add tests, use use_grs param instead

* newline
  • Loading branch information
rxu17 authored Jan 3, 2025
1 parent 0706c0e commit ef5af97
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 49 deletions.
82 changes: 71 additions & 11 deletions main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ params.production = false
params.schema_ignore_params = ""
params.help = false
params.step = "update_potential_phi_fields_table"
params.use_grs = false

/*
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Expand All @@ -46,6 +47,7 @@ if (params.cohort == null) { exit 1, 'cohort parameter not specified!' }
if (params.comment == null) { exit 1, 'comment parameter not specified!' }
if (params.production == null) { exit 1, 'production parameter not specified!' }
if (params.step == null) { exit 1, 'step parameter not specified!' }
if (params.use_grs == null) { exit 1, 'use_grs parameter not specified!' }


// Print parameter summary log to screen
Expand Down Expand Up @@ -96,21 +98,79 @@ workflow BPC_PIPELINE {
update_potential_phi_fields_table(ch_comment, params.production)
// validate_data.out.view()
} else if (params.step == "merge_and_uncode_rca_uploads"){
merge_and_uncode_rca_uploads("default", ch_cohort, ch_comment, params.production)
merge_and_uncode_rca_uploads(
"default",
ch_cohort,
ch_comment,
params.production,
params.use_grs
)
} else if (params.step == "update_data_table") {
update_data_table("default", ch_cohort, ch_comment, params.production)
update_data_table(
"default",
ch_cohort,
ch_comment,
params.production
)
} else if (params.step == "genie_bpc_pipeline"){
update_potential_phi_fields_table(ch_comment, params.production)
run_quac_upload_report_error(update_potential_phi_fields_table.out, ch_cohort)
run_quac_upload_report_warning(run_quac_upload_report_error.out, ch_cohort, params.production)
merge_and_uncode_rca_uploads(run_quac_upload_report_warning.out, ch_cohort, ch_comment, params.production)

run_quac_upload_report_error(
update_potential_phi_fields_table.out,
ch_cohort
)

run_quac_upload_report_warning(
run_quac_upload_report_error.out,
ch_cohort,
params.production
)

merge_and_uncode_rca_uploads(
run_quac_upload_report_warning.out,
ch_cohort,
ch_comment,
params.production,
params.use_grs
)
// remove_patients_from_merged(merge_and_uncode_rca_uploads.out, ch_cohort, params.production)
update_data_table(merge_and_uncode_rca_uploads.out, ch_cohort, ch_comment, params.production)
update_date_tracking_table(update_data_table.out, ch_cohort, ch_comment, params.production)
run_quac_table_report(update_date_tracking_table.out, ch_cohort, params.production)
run_quac_comparison_report(run_quac_table_report.out, ch_cohort, params.production)
create_masking_report(run_quac_comparison_report.out, ch_cohort, params.production)
update_case_count_table(create_masking_report.out, ch_comment, params.production)
update_data_table(
merge_and_uncode_rca_uploads.out,
ch_cohort,
ch_comment,
params.production
)

update_date_tracking_table(
update_data_table.out,
ch_cohort,
ch_comment,
params.production
)

run_quac_table_report(
update_date_tracking_table.out,
ch_cohort,
params.production
)

run_quac_comparison_report(
run_quac_table_report.out,
ch_cohort,
params.production
)

create_masking_report(
run_quac_comparison_report.out,
ch_cohort,
params.production
)

update_case_count_table(
create_masking_report.out,
ch_comment,
params.production
)
} else {
exit 1, 'step not supported'
}
Expand Down
16 changes: 14 additions & 2 deletions modules/merge_and_uncode_rca_uploads.nf
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ process merge_and_uncode_rca_uploads {
val cohort
val comment
val production
val use_grs

output:
stdout
Expand All @@ -20,13 +21,24 @@ process merge_and_uncode_rca_uploads {
if (production) {
"""
cd /usr/local/src/myscripts/
Rscript merge_and_uncode_rca_uploads.R -c $cohort -v --production --save_synapse --comment $comment
# Production run. Every continued line must end with a backslash; otherwise
# the shell ends the command early and the remaining flags never reach Rscript.
Rscript merge_and_uncode_rca_uploads.R \
    -c $cohort \
    -v \
    --production \
    --save_synapse \
    --comment $comment \
    --use_grs $use_grs
"""
}
else {
"""
cd /usr/local/src/myscripts/
Rscript merge_and_uncode_rca_uploads.R -c $cohort -v --save_synapse --comment $comment
Rscript merge_and_uncode_rca_uploads.R \
-c $cohort \
-v \
--save_synapse \
--comment $comment \
--use_grs $use_grs
"""
}
}
9 changes: 9 additions & 0 deletions nextflow_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@
"genie_bpc_pipeline"
]
},
"use_grs": {
"type": "boolean",
"description": "Whether to use grs as primary mapping (dd as secondary) or not (using dd only). ",
"default": false,
"enum": [
true,
false
]
},
"references_docker":{
"type": "string",
"description": "Name of docker to use in processes in scripts/references"
Expand Down
103 changes: 71 additions & 32 deletions scripts/uploads/merge_and_uncode_rca_uploads.R
Original file line number Diff line number Diff line change
Expand Up @@ -314,29 +314,36 @@ merge_mappings <- function(primarys, secondarys, debug = F) {
return(mappings)
}

#' Map any coded data to actual values as mapped in the
#' REDCap Data Dictionary (DD).
#' Map any coded data to actual values depending on if Global Response Set
#' (GRS) is provided. If provided, data will be mapped to the merged
#' set of REDCap Data Dictionary (DD) and GRS mapping. If not, data will be mapped
#' just to the DD.
#'
#' @param data Data frame of coded data
#' @param mappings Matrix with two columns, first containing a label and
#' @param df_coded Data frame of coded data
#' @param dd Matrix with two columns, first containing a label and
#' second columns a mapping string.
#' @param secondary_mappings Another mapping matrix that is used secondarily
#' @param grs Another mapping matrix that is used secondarily
#' if the label is not found in the primary mapping matrix.
#' @param use_grs Whether we are using grs or not
#' @return Data frame of uncoded data.
#' @examples
#' uncode_data(df_coded = my_data, dd = dd, grs = grs, use_grs = TRUE)
uncode_data <- function(df_coded, dd, grs) {
uncode_data <- function(df_coded, dd, grs, use_grs) {

df_uncoded <- df_coded

# merge reference mappings
mappings_primary <- parse_mappings(strs = grs[,config$column_name$variable_mapping],
labels = grs[,config$column_name$variable_name])
mappings_secondary <- parse_mappings(strs = dd[[config$column_name$variable_mapping]],
labels = dd[[config$column_name$variable_name]])
mappings <- merge_mappings(mappings_primary, mappings_secondary)

# custom mappings
if(use_grs){
mappings_primary <- parse_mappings(strs = grs[,config$column_name$variable_mapping],
labels = grs[,config$column_name$variable_name])
mappings_secondary <- parse_mappings(strs = dd[[config$column_name$variable_mapping]],
labels = dd[[config$column_name$variable_name]])
mappings <- merge_mappings(mappings_primary, mappings_secondary)
} else {
mappings <- parse_mappings(strs = dd[[config$column_name$variable_mapping]],
labels = dd[[config$column_name$variable_name]])
}

mapping_complete <- data.frame(codes = names(config$mapping$complete),
values = as.character(config$mapping$complete),
stringsAsFactors = F)
Expand Down Expand Up @@ -576,6 +583,7 @@ synLogin <- function(auth = NA, silent = T) {
return(syn)
}


get_data_dictionary <- function(cohort) {
synid_dd <- get_bpc_synid_prissmm(synid_table_prissmm = config$synapse$prissmm$id,
cohort = cohort,
Expand All @@ -590,6 +598,24 @@ get_data_dictionary <- function(cohort) {
return(dd)
}

#' Retrieve the Global Response Set (GRS) mapping table, if requested.
#'
#' When `use_grs` is true, the file referenced by `config$synapse$grs$id` is
#' fetched with `synGet()` and read as a comma-separated file. Column names are
#' kept exactly as they appear in the file, strings are not converted to
#' factors, and empty fields are read as `NA`. When `use_grs` is false, nothing
#' is downloaded.
#'
#' @param use_grs Whether to use grs or not
#' @return Data frame holding the GRS, or `NULL` when `use_grs` is false
get_global_response_set <- function(use_grs) {
  # Nothing to fetch when the GRS is not part of this run.
  if (!use_grs) {
    return(NULL)
  }

  grs_path <- synGet(config$synapse$grs$id)$path
  read.csv(grs_path,
           sep = ",",
           stringsAsFactors = FALSE,
           check.names = FALSE,
           na.strings = c(""))
}

get_data_uploads <- function(cohort) {
data_upload <- list()
for (i in 1:length(config$upload[[cohort]])) {
Expand Down Expand Up @@ -639,30 +665,46 @@ get_output_folder_id <- function(config, environment){
return(config$synapse$rca_files[[glue(environment, "_id")]])
}


#' Collect the Synapse ids recorded as "used" in provenance for a cohort.
#'
#' The result always holds the cohort's upload ids from `config$upload`
#' followed by the id of the cohort's "Data Dictionary non-PHI" file. The
#' Global Response Set id (`config$synapse$grs$id`) is appended only when
#' `use_grs` is true.
#'
#' @param cohort (string) name of the cohort
#' @param use_grs Whether we are using grs or not
#' @return the synapse ids used in provenance
get_prov_used <- function(cohort, use_grs) {
  dd_id <- get_bpc_synid_prissmm(
    synid_table_prissmm = config$synapse$prissmm$id,
    cohort = cohort,
    file_name = "Data Dictionary non-PHI"
  )

  # Uploads and the data dictionary are always part of provenance.
  used_ids <- c(as.character(unlist(config$upload[[cohort]])), dd_id)

  # The GRS only counts as an input when it was actually used for mapping.
  if (use_grs) {
    used_ids <- c(used_ids, config$synapse$grs$id)
  }

  return(used_ids)
}

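#' Write to Synapse and clean up
#'
#' Uploads the cohort's merged non-IRR file and, if it exists, the IRR file to
#' the output folder for the given environment.
#' @param cohort (string) name of cohort
#' @param environment (string) whether we are running in production env or staging env
#' @param comment (string) some sort of comment about the new version of the file
#' related to cohort run
#' @param use_grs (logical) whether the global response set (GRS) is in use;
#' forwarded to get_prov_used() when building the provenance list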
save_output_synapse <- function(cohort, environment, comment) {
save_output_synapse <- function(cohort, environment, comment, use_grs) {

parent_id <- get_output_folder_id(config, environment)
file_output_pri <- get_pri_file_name(cohort)
file_output_irr <- get_irr_file_name(cohort)
synid_dd <- get_bpc_synid_prissmm(synid_table_prissmm = config$synapse$prissmm$id,
cohort = cohort,
file_name = "Data Dictionary non-PHI")

prov_used <- get_prov_used(cohort, use_grs)
save_to_synapse(path = file_output_pri,
file_name = gsub(pattern = ".csv|.tsv", replacement = "", x = file_output_pri),
parent_id = parent_id,
comment = comment,
prov_name = "BPC non-IRR upload data",
prov_desc = "Merged and uncoded BPC upload data from sites academic REDCap instances with IRR cases removed",
prov_used = c(as.character(unlist(config$upload[[cohort]])),
synid_dd,
config$synapse$grs$id),
prov_used = prov_used,
prov_exec = "https://github.com/Sage-Bionetworks/genie-bpc-pipeline/tree/develop/scripts/uploads/merge_and_uncode_rca_uploads.R")

if (file.exists(file_output_irr)) {
Expand All @@ -672,9 +714,7 @@ save_output_synapse <- function(cohort, environment, comment) {
comment = comment,
prov_name = "BPC IRR upload data",
prov_desc = "Merged and uncoded BPC upload IRR case data from sites academic REDCap instances",
prov_used = c(as.character(unlist(config$upload[[cohort]])),
synid_dd,
config$synapse$grs$id),
prov_used = prov_used,
prov_exec = "https://github.com/Sage-Bionetworks/genie-bpc-pipeline/tree/develop/scripts/uploads/merge_and_uncode_rca_uploads.R")
}

Expand Down Expand Up @@ -704,7 +744,9 @@ main <- function(){
make_option(c("-v", "--verbose"), action="store_true", default = FALSE,
help="Print out verbose output on script progress"),
make_option(c("--comment"), type = "character",
help="Comment for new table snapshot version. This must be unique and is tied to the cohort run.")
help="Comment for new table snapshot version. This must be unique and is tied to the cohort run."),
make_option(c("--use_grs"), type="logical", default = FALSE,
help="Whether to use grs as primary mapping (dd as secondary) or not (using dd only).")
)
opt <- parse_args(OptionParser(option_list=option_list))

Expand Down Expand Up @@ -745,11 +787,7 @@ main <- function(){
print(glue("{now(timeOnly = T)}: Reading global response set..."))
}

grs <- read.csv(synGet(config$synapse$grs$id)$path,
sep = ",",
stringsAsFactors = F,
check.names = F,
na.strings = c(""))
grs <- get_global_response_set(use_grs = opt$use_grs)

# for each user-specified cohort
for (cohort in cohort_input) {
Expand Down Expand Up @@ -781,7 +819,8 @@ main <- function(){
# uncode
uncoded <- uncode_data(df_coded = coded,
dd = dd,
grs = grs)
grs = grs,
use_grs = opt$use_grs)

if (debug) {
print(glue("{now(timeOnly = T)}: Formatting uncoded data..."))
Expand All @@ -806,7 +845,7 @@ main <- function(){
print(glue("{now(timeOnly = T)}: Saving uncoded data to Synapse..."))
}

save_output_synapse(cohort, environment = env, comment = opt$comment)
save_output_synapse(cohort, environment = env, comment = opt$comment, use_grs = opt$use_grs)
}

# clean up for memory
Expand Down
Loading

0 comments on commit ef5af97

Please sign in to comment.