Skip to content


Add multivariate fine-mapping workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
gaow committed Dec 26, 2023
1 parent 6771056 commit ebf9ea8
Show file tree
Hide file tree
Showing 10 changed files with 1,125 additions and 0 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
335 changes: 335 additions & 0 deletions workflow/multivariate-fine-mapping/mash_data_preprocessing.ipynb
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
"cells": [
"cell_type": "markdown",
"metadata": {
"kernel": "SoS"
"source": [
"# Data munggling for multi-variant summary stats"
"cell_type": "markdown",
"metadata": {
"kernel": "SoS"
"source": [
"## Minimal working example"
"cell_type": "markdown",
"metadata": {
"kernel": "SoS"
"source": [
"To see the input requirements and output data formats, please [download a minimal working example here](, and run the following:\n",
"### Merge univariate results\n",
"sos run mixture_prior.ipynb merge \\\n",
" --analysis-units <FIXME> \\\n",
" --plink-sumstats <FIXME> \\\n",
" --name gtex_mixture\n",
"### Select and merge univariate effects\n",
"cd $m && ls *.rds | sed 's/\\.rds//g' > analysis_units.txt && cd -\n",
"sos run mixture_prior.ipynb extract_effects \\\n",
" --analysis-units $m/analysis_units.txt \\\n",
" --datadir $m --name `basename $m`\n",
"Notice that for production use, each `sos run` command should be submitted to the cluster as a job."
"cell_type": "markdown",
"metadata": {
"kernel": "SoS"
"source": [
"## Global parameters"
"cell_type": "code",
"execution_count": null,
"metadata": {
"kernel": "SoS"
"outputs": [],
"source": [
"import os\n",
"# Work directory & output directory\n",
"parameter: cwd = path('./output')\n",
"# The filename prefix for output data\n",
"parameter: name = str\n",
"parameter: mixture_components = ['flash', 'flash_nonneg', 'pca', 'canonical']\n",
"parameter: job_size = 1# Residual correlatoin file\n",
"parameter: resid_cor = path(\".\")\n",
"fail_if(not (resid_cor.is_file() or resid_cor == path('.')), msg = f'Cannot find ``{resid_cor}``')"
"cell_type": "markdown",
"metadata": {
"kernel": "SoS"
"source": [
"## Merge PLINK univariate association summary statistic to RDS format"
"cell_type": "code",
"execution_count": null,
"metadata": {
"kernel": "SoS"
"outputs": [],
"source": [
"parameter: molecular_pheno = path\n",
"# Analysis units file. For RDS files it can be generated by `ls *.rds | sed 's/\\.rds//g' > analysis_units.txt`\n",
"parameter: analysis_units = path\n",
"regions = [x.strip().split() for x in open(analysis_units).readlines() if x.strip() and not x.strip().startswith('#')]\n",
"input: molecular_pheno, for_each = \"regions\"\n",
"output: f'{cwd:a}/RDS/{_regions[0]}'\n",
"task: trunk_workers = 1, trunk_size = job_size, walltime = '4h', mem = '6G', tags = f'{step_name}_{_output:bn}' \n",
"R: expand = \"$[ ]\", stderr = f'{_output}.stderr', stdout = f'{_output}.stdout'\n",
" library(\"dplyr\")\n",
" library(\"tibble\")\n",
" library(\"purrr\")\n",
" library(\"readr\")\n",
" molecular_pheno = read_delim($[molecular_pheno:r], delim = \"\\t\")\n",
" molecular_pheno = molecular_pheno%>%mutate(dir = map_chr(`#molc_pheno`,~paste(c(`.x`,\"$[_regions[0]]\"),collapse = \"\")))\n",
" n = nrow(molecular_pheno)\n",
" # For every condition read rds and extract the bhat and sbhat.\n",
" genos = tibble( i = 1:n)\n",
" genos = genos%>%mutate(bhat = map(i, ~readRDS(molecular_pheno[[.x,2]])$bhat%>>%rownames_to_column),\n",
" sbhat = map(i, ~readRDS(molecular_pheno[[.x,2]])$sbhat%>>%rownames_to_column))\n",
" \n",
" # Join first two conditions\n",
" genos_join_bhat = full_join((genos%>%pull(bhat))[[1]],(genos%>%pull(bhat))[[2]],by = \"rowname\")\n",
" genos_join_sbhat = full_join((genos%>%pull(sbhat))[[1]],(genos%>%pull(sbhat))[[2]],by = \"rowname\")\n",
" \n",
" # If there are more conditions, join the rest\n",
" if(n > 2){\n",
" for(j in 3:n){\n",
" genos_join_bhat = full_join(genos_join_bhat,(genos%>%pull(bhat))[[j]],by = \"rowname\")%>%select(-rowname)%>%as.matrix\n",
" genos_join_sbhat = full_join(genos_join_sbhat,(genos%>%pull(sbhat))[[j]],by = \"rowname\")%>%select(-rowname)%>%as.matrix\n",
" }\n",
" }\n",
" \n",
" name = molecular_pheno%>%mutate(name = map(`#molc_pheno`, ~read.table(text = .x,sep = \"/\")),\n",
" name = map_chr(name, ~.x[,ncol(.x)-2]%>%as.character) )%>%pull(name)\n",
" colnames(genos_join_bhat) = name\n",
" colnames(genos_join_sbhat) = name\n",
" \n",
" \n",
" # save the rds file\n",
" saveRDS(file = \"$[_output]\", list(bhat=genos_join_bhat, sbhat=genos_join_sbhat))"
"cell_type": "markdown",
"metadata": {
"kernel": "SoS"
"source": [
"## Get top, random and null effects per analysis unit"
"cell_type": "code",
"execution_count": null,
"metadata": {
"kernel": "SoS"
"outputs": [],
"source": [
"# extract data for MASH from summary stats\n",
"parameter: table_name = \"\"\n",
"parameter: bhat = \"bhat\"\n",
"parameter: sbhat = \"sbhat\"\n",
"parameter: expected_ncondition = 0\n",
"parameter: datadir = path\n",
"parameter: seed = 999\n",
"parameter: n_random = 4\n",
"parameter: n_null = 4\n",
"parameter: z_only = True\n",
"# Analysis units file. For RDS files it can be generated by `ls *.rds | sed 's/\\.rds//g' > analysis_units.txt`\n",
"parameter: analysis_units = path\n",
"# handle N = per_chunk data-set in one job\n",
"parameter: per_chunk = 1000\n",
"regions = [x.strip().split() for x in open(analysis_units).readlines() if x.strip() and not x.strip().startswith('#')]\n",
"input: [f'{datadir}/{x[0]}.rds' for x in regions], group_by = per_chunk\n",
"output: f\"{cwd}/{name}/cache/{name}_{_index+1}.rds\"\n",
"task: trunk_workers = 1, walltime = '1h', trunk_size = 1, mem = '4G', cores = 1, tags = f'{_output:bn}'\n",
"R: expand = \"${ }\"\n",
" set.seed(${seed})\n",
" matxMax <- function(mtx) {\n",
" return(arrayInd(which.max(mtx), dim(mtx)))\n",
" }\n",
" remove_rownames = function(x) {\n",
" for (name in names(x)) rownames(x[[name]]) = NULL\n",
" return(x)\n",
" }\n",
" handle_nan_etc = function(x) {\n",
" x$bhat[which(is.nan(x$bhat))] = 0\n",
" x$sbhat[which(is.nan(x$sbhat) | is.infinite(x$sbhat))] = 1E3\n",
" return(x)\n",
" }\n",
" extract_one_data = function(dat, n_random, n_null, infile) {\n",
" if (is.null(dat)) return(NULL)\n",
" z = abs(dat$${bhat}/dat$${sbhat})\n",
" max_idx = matxMax(z)\n",
" # strong effect samples\n",
" strong = list(bhat = dat$${bhat}[max_idx[1],,drop=F], sbhat = dat$${sbhat}[max_idx[1],,drop=F])\n",
" # random samples excluding the top one\n",
" if (max_idx[1] == 1) {\n",
" sample_idx = 2:nrow(z)\n",
" } else if (max_idx[1] == nrow(z)) {\n",
" sample_idx = 1:(max_idx[1]-1)\n",
" } else {\n",
" sample_idx = c(1:(max_idx[1]-1), (max_idx[1]+1):nrow(z))\n",
" }\n",
" random_idx = sample(sample_idx, min(n_random, length(sample_idx)), replace = F)\n",
" random = list(bhat = dat$${bhat}[random_idx,,drop=F], sbhat = dat$${sbhat}[random_idx,,drop=F])\n",
" # null samples defined as |z| < 2\n",
" = which(apply(abs(z), 1, max) < 2)\n",
" if (length( == 0) {\n",
" warning(paste(\"Null data is empty for input file\", infile))\n",
" null = list()\n",
" } else {\n",
" null_idx = sample(, min(n_null, length(, replace = F)\n",
" null = list(bhat = dat$${bhat}[null_idx,,drop=F], sbhat = dat$${sbhat}[null_idx,,drop=F])\n",
" }\n",
" dat = (list(random = remove_rownames(random), null = remove_rownames(null), strong = remove_rownames(strong)))\n",
" dat$random = handle_nan_etc(dat$random)\n",
" dat$null = handle_nan_etc(dat$null)\n",
" dat$strong = handle_nan_etc(dat$strong)\n",
" return(dat)\n",
" }\n",
" reformat_data = function(dat, z_only = TRUE) {\n",
" # make output consistent in format with \n",
" # \n",
" res = list(random.z = dat$random$bhat/dat$random$sbhat, \n",
" strong.z = dat$strong$bhat/dat$strong$sbhat, \n",
" null.z = dat$null$bhat/dat$null$sbhat)\n",
" if (!z_only) {\n",
" res = c(res, list(random.b = dat$random$bhat,\n",
" strong.b = dat$strong$bhat,\n",
" null.b = dat$null$bhat,\n",
" null.s = dat$null$sbhat,\n",
" random.s = dat$random$sbhat,\n",
" strong.s = dat$strong$sbhat))\n",
" }\n",
" return(res)\n",
" }\n",
" merge_data = function(res, one_data) {\n",
" if (length(res) == 0) {\n",
" return(one_data)\n",
" } else if (is.null(one_data)) {\n",
" return(res)\n",
" } else {\n",
" for (d in names(one_data)) {\n",
" if (is.null(one_data[[d]])) {\n",
" next\n",
" } else {\n",
" res[[d]] = rbind(res[[d]], one_data[[d]])\n",
" }\n",
" }\n",
" return(res)\n",
" }\n",
" }\n",
" res = list()\n",
" for (f in c(${_input:r,})) {\n",
" # If cannot read the input for some reason then we just skip it, assuming we have other enough data-sets to use.\n",
" dat = tryCatch(readRDS(f), error = function(e) return(NULL))${(\"$\"+table_name) if table_name != \"\" else \"\"}\n",
" if (is.null(dat)) {\n",
" message(paste(\"Skip loading file\", f, \"due to load failure.\"))\n",
" next\n",
" }\n",
" if (${expected_ncondition} > 0 && (ncol(dat$${bhat}) != ${expected_ncondition} || ncol(dat$${sbhat}) != ${expected_ncondition})) {\n",
" message(paste(\"Skip loading file\", f, \"because it has\", ncol(dat$${bhat}), \"columns different from required\", ${expected_ncondition}))\n",
" next\n",
" }\n",
" res = merge_data(res, reformat_data(extract_one_data(dat, ${n_random}, ${n_null}, f), ${\"TRUE\" if z_only else \"FALSE\"}))\n",
" }\n",
" saveRDS(res, ${_output:r})"
"cell_type": "code",
"execution_count": null,
"metadata": {
"kernel": "SoS"
"outputs": [],
"source": [
"input: group_by = \"all\"\n",
"output: f\"{cwd}/{name}.rds\"\n",
"task: trunk_workers = 1, walltime = '1h', trunk_size = 1, mem = '16G', cores = 1, tags = f'{_output:bn}'\n",
"R: expand = \"${ }\"\n",
" merge_data = function(res, one_data) {\n",
" if (length(res) == 0) {\n",
" return(one_data)\n",
" } else {\n",
" for (d in names(one_data)) {\n",
" res[[d]] = rbind(res[[d]], one_data[[d]])\n",
" }\n",
" return(res)\n",
" }\n",
" }\n",
" dat = list()\n",
" for (f in c(${_input:r,})) {\n",
" dat = merge_data(dat, readRDS(f))\n",
" }\n",
" # compute empirical covariance XtX\n",
" X = dat$strong.z\n",
" X[] = 0\n",
" dat$XtX = t(as.matrix(X)) %*% as.matrix(X) / nrow(X)\n",
" saveRDS(dat, ${_output:r})"
"metadata": {
"kernelspec": {
"display_name": "SoS",
"language": "sos",
"name": "sos"
"language_info": {
"codemirror_mode": "sos",
"file_extension": ".sos",
"mimetype": "text/x-sos",
"name": "sos",
"nbconvert_exporter": "sos_notebook.converter.SoS_Exporter",
"pygments_lexer": "sos"
"sos": {
"kernels": [
"version": "0.22.4"
"nbformat": 4,
"nbformat_minor": 4

0 comments on commit ebf9ea8

Please sign in to comment.