Skip to content

Commit

Permalink
Bq to vcf sample ids (#557)
Browse files Browse the repository at this point in the history
  • Loading branch information
tneymanov authored May 8, 2020
1 parent ef31cc0 commit 7f9136b
Show file tree
Hide file tree
Showing 21 changed files with 342 additions and 52 deletions.
2 changes: 1 addition & 1 deletion cloudbuild_CI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ steps:
# - '--gs_dir bashir-variant_integration_test_runs'
images:
- 'gcr.io/${PROJECT_ID}/gcp-variant-transforms:${COMMIT_SHA}'
timeout: 240m
timeout: 270m
84 changes: 63 additions & 21 deletions gcp_variant_transforms/bq_to_vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,25 @@
from gcp_variant_transforms.transforms import bigquery_to_variant
from gcp_variant_transforms.transforms import combine_sample_ids
from gcp_variant_transforms.transforms import densify_variants
from gcp_variant_transforms.transforms import sample_mapping_table



_BASE_QUERY_TEMPLATE = 'SELECT {COLUMNS} FROM `{INPUT_TABLE}`'
_BQ_TO_VCF_SHARDS_JOB_NAME = 'bq-to-vcf-shards'
_COMMAND_LINE_OPTIONS = [variant_transform_options.BigQueryToVcfOptions]
TABLE_SUFFIX_SEPARATOR = bigquery_util.TABLE_SUFFIX_SEPARATOR
SAMPLE_INFO_TABLE_SUFFIX = bigquery_util.SAMPLE_INFO_TABLE_SUFFIX
_GENOMIC_REGION_TEMPLATE = ('({REFERENCE_NAME_ID}="{REFERENCE_NAME_VALUE}" AND '
'{START_POSITION_ID}>={START_POSITION_VALUE} AND '
'{END_POSITION_ID}<={END_POSITION_VALUE})')
_VCF_FIXED_COLUMNS = ['#CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER',
'INFO', 'FORMAT']
_VCF_VERSION_LINE = '##fileformat=VCFv4.3\n'
_VCF_VERSION_LINE = (
vcf_header_io.FILE_FORMAT_HEADER_TEMPLATE.format(VERSION='4.3') + '\n')
_SAMPLE_INFO_QUERY_TEMPLATE = (
'SELECT sample_id, sample_name, file_path FROM '
'`{PROJECT_ID}.{DATASET_ID}.{TABLE_NAME}`')


def run(argv=None):
Expand Down Expand Up @@ -164,30 +172,63 @@ def _bigquery_to_vcf_shards(
`vcf_header_file_path`.
"""
schema = _get_schema(known_args.input_table)
# TODO(allieychen): Modify the SQL query with the specified sample_ids.
query = _get_bigquery_query(known_args, schema)
logging.info('Processing BigQuery query %s:', query)
bq_source = bigquery.BigQuerySource(query=query,
validate=True,
use_standard_sql=True)
variant_query = _get_variant_query(known_args, schema)
logging.info('Processing BigQuery query %s:', variant_query)
project_id, dataset_id, table_id = bigquery_util.parse_table_reference(
known_args.input_table)
bq_variant_source = bigquery.BigQuerySource(query=variant_query,
validate=True,
use_standard_sql=True)
annotation_names = _extract_annotation_names(schema)

base_table_id = bigquery_util.get_table_base_name(table_id)
sample_query = _SAMPLE_INFO_QUERY_TEMPLATE.format(
PROJECT_ID=project_id,
DATASET_ID=dataset_id,
TABLE_NAME=bigquery_util.compose_table_name(base_table_id,
SAMPLE_INFO_TABLE_SUFFIX))
bq_sample_source = bigquery.BigQuerySource(query=sample_query,
validate=True,
use_standard_sql=True)
with beam.Pipeline(options=beam_pipeline_options) as p:
variants = (p
| 'ReadFromBigQuery ' >> beam.io.Read(bq_source)
| 'ReadFromBigQuery ' >> beam.io.Read(bq_variant_source)
| bigquery_to_variant.BigQueryToVariant(annotation_names))
sample_table_rows = (
p
| 'ReadFromSampleTable' >> beam.io.Read(bq_sample_source))
if known_args.sample_names:
sample_ids = (p
| transforms.Create(known_args.sample_names,
reshuffle=False)
| beam.combiners.ToList())
temp_sample_names = (p
| transforms.Create(known_args.sample_names,
reshuffle=False))
else:
sample_ids = (variants
| 'CombineSampleIds' >>
combine_sample_ids.SampleIdsCombiner(
known_args.preserve_sample_order))
# TODO(tneymanov): Add logic to extract sample names from sample IDs by
# joining with sample id-name mapping table, once that code is implemented.
sample_names = sample_ids
# Get sample names from sample IDs in the variants and sort.
id_to_name_hash_table = (
sample_table_rows
| 'SampleIdToNameDict' >> sample_mapping_table.SampleIdToNameDict())
temp_sample_ids = (variants
| 'CombineSampleIds' >>
combine_sample_ids.SampleIdsCombiner(
known_args.preserve_sample_order))
temp_sample_names = (
temp_sample_ids
| 'GetSampleNames' >>
sample_mapping_table.GetSampleNames(
beam.pvalue.AsSingleton(id_to_name_hash_table))
| 'CombineToList' >> beam.combiners.ToList()
| 'SortSampleNames' >> beam.ParDo(sorted))

name_to_id_hash_table = (
sample_table_rows
| 'SampleNameToIdDict' >> sample_mapping_table.SampleNameToIdDict())
sample_ids = (
temp_sample_names
| 'GetSampleIds' >>
sample_mapping_table.GetSampleIds(
beam.pvalue.AsSingleton(name_to_id_hash_table))
| 'CombineSortedSampleIds' >> beam.combiners.ToList())
sample_names = temp_sample_names | beam.combiners.ToList()

_ = (sample_names
| 'GenerateVcfDataHeader' >>
beam.ParDo(_write_vcf_header_with_sample_names,
Expand All @@ -196,7 +237,8 @@ def _bigquery_to_vcf_shards(
header_file_path))

_ = (variants
| densify_variants.DensifyVariants(beam.pvalue.AsSingleton(sample_ids))
| densify_variants.DensifyVariants(
beam.pvalue.AsSingleton(sample_ids))
| 'PairVariantWithKey' >>
beam.Map(_pair_variant_with_key, known_args.number_of_bases_per_shard)
| 'GroupVariantsByKey' >> beam.GroupByKey()
Expand All @@ -216,7 +258,7 @@ def _get_schema(input_table):
return table.schema


def _get_bigquery_query(known_args, schema):
def _get_variant_query(known_args, schema):
# type: (argparse.Namespace, bigquery_v2.TableSchema) -> str
"""Returns a BigQuery query for the interested regions."""
columns = _get_query_columns(schema)
Expand Down
8 changes: 4 additions & 4 deletions gcp_variant_transforms/bq_to_vcf_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_write_vcf_data_header(self):
content = f.readlines()
self.assertEqual(content, expected_content)

def test_get_bigquery_query_no_region(self):
def test_get_variant_query_no_region(self):
args = self._create_mock_args(
input_table='my_bucket:my_dataset.my_table',
genomic_regions=None)
Expand All @@ -71,11 +71,11 @@ def test_get_bigquery_query_no_region(self):
type=bigquery_util.TableFieldConstants.TYPE_STRING,
mode=bigquery_util.TableFieldConstants.MODE_NULLABLE,
description='Reference name.'))
self.assertEqual(bq_to_vcf._get_bigquery_query(args, schema),
self.assertEqual(bq_to_vcf._get_variant_query(args, schema),
'SELECT reference_name FROM '
'`my_bucket.my_dataset.my_table`')

def test_get_bigquery_query_with_regions(self):
def test_get_variant_query_with_regions(self):
args_1 = self._create_mock_args(
input_table='my_bucket:my_dataset.my_table',
genomic_regions=['c1:1,000-2,000', 'c2'])
Expand All @@ -98,7 +98,7 @@ def test_get_bigquery_query_with_regions(self):
'OR (reference_name="c2" AND start_position>=0 AND '
'end_position<=9223372036854775807)'
)
self.assertEqual(bq_to_vcf._get_bigquery_query(args_1, schema),
self.assertEqual(bq_to_vcf._get_variant_query(args_1, schema),
expected_query)

def test_get_query_columns(self):
Expand Down
2 changes: 2 additions & 0 deletions gcp_variant_transforms/libs/bigquery_util.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
_MAX_BQ_NUM_PARTITIONS = 4000
_RANGE_END_SIG_DIGITS = 4
_RANGE_INTERVAL_SIG_DIGITS = 1
_TOTAL_BASE_PAIRS_SIG_DIGITS = 4
_PARTITION_SIZE_SIG_DIGITS = 1

START_POSITION_COLUMN = 'start_position'
_BQ_CREATE_PARTITIONED_TABLE_COMMAND = (
Expand Down
31 changes: 29 additions & 2 deletions gcp_variant_transforms/options/variant_transform_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
from gcp_variant_transforms.libs import bigquery_util
from gcp_variant_transforms.libs import variant_sharding

TABLE_SUFFIX_SEPARATOR = bigquery_util.TABLE_SUFFIX_SEPARATOR
SAMPLE_INFO_TABLE_SUFFIX = bigquery_util.SAMPLE_INFO_TABLE_SUFFIX


class VariantTransformsOptions(object):
"""Base class for defining groups of options for Variant Transforms.
Expand Down Expand Up @@ -215,8 +218,7 @@ def validate(self, parsed_args, client=None):
dataset_id)
all_output_tables = []
all_output_tables.append(
bigquery_util.compose_table_name(
table_id, bigquery_util.SAMPLE_INFO_TABLE_SUFFIX))
bigquery_util.compose_table_name(table_id, SAMPLE_INFO_TABLE_SUFFIX))
sharding = variant_sharding.VariantSharding(
parsed_args.sharding_config_path)
num_shards = sharding.get_num_shards()
Expand Down Expand Up @@ -587,6 +589,31 @@ def add_arguments(self, parser):
'extracted variants to have the same sample ordering (usually '
'true for tables from single VCF file import).'))

def validate(self, parsed_args, client=None):
if not client:
credentials = GoogleCredentials.get_application_default().create_scoped(
['https://www.googleapis.com/auth/bigquery'])
client = bigquery.BigqueryV2(credentials=credentials)

project_id, dataset_id, table_id = bigquery_util.parse_table_reference(
parsed_args.input_table)
if not bigquery_util.table_exist(client, project_id, dataset_id, table_id):
raise ValueError('Table {}:{}.{} does not exist.'.format(
project_id, dataset_id, table_id))
if table_id.count(TABLE_SUFFIX_SEPARATOR) != 1:
raise ValueError(
'Input table {} is malformed - exactly one suffix separator "{}" is '
'required'.format(parsed_args.input_table,
TABLE_SUFFIX_SEPARATOR))
base_table_id = table_id[:table_id.find(TABLE_SUFFIX_SEPARATOR)]
sample_table_id = bigquery_util.compose_table_name(base_table_id,
SAMPLE_INFO_TABLE_SUFFIX)

if not bigquery_util.table_exist(client, project_id, dataset_id,
sample_table_id):
raise ValueError('Sample table {}:{}.{} does not exist.'.format(
project_id, dataset_id, sample_table_id))


def _validate_inputs(parsed_args):
if ((parsed_args.input_pattern and parsed_args.input_file) or
Expand Down
14 changes: 14 additions & 0 deletions gcp_variant_transforms/testing/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,20 @@ def _has_sample_ids(variants):
return _has_sample_ids


def dict_values_equal(expected_dict):
  """Returns a Beam matcher checking the single actual dict equals expected.

  The returned callable expects a one-element list whose only item is a
  dict; it raises `BeamAssertException` if that dict differs from
  `expected_dict` in any key, value, or size.
  """
  def _items_equal(actual_dict):
    actual = actual_dict[0]
    has_mismatch = any(
        key not in actual or actual[key] != expected_dict[key]
        for key in expected_dict)
    # A size difference catches extra keys present only in `actual`.
    if has_mismatch or len(actual) != len(expected_dict):
      raise BeamAssertException(
          'Failed assert: %s == %s' % (expected_dict, actual))
  return _items_equal


def header_vars_equal(expected):
def _vars_equal(actual):
expected_vars = [vars(header) for header in expected]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele">
##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129">
##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=HQ,Number=2,Type=Integer,Description="Haplotype Quality">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT NA00001 NA00002 NA00003
19 1234567 microsat1 GTCT G,GTACT 50.0 PASS AA=G;NS=3;DP=9 GT:DP:GQ 0/1:4:35 0/2:2:17 1/1:3:40
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129">
##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership">
##INFO=<ID=SVTYPE,Number=1,Type=String,Description="Type of structural variant (with unïcodé)">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=HQ,Number=.,Type=Integer,Description="Haplotype Quality">
##FORMAT=<ID=GL,Number=.,Type=Integer,Description="Genotype Likelihood">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT NA00001 NA00002 NA00003
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele">
##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129">
##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=HQ,Number=2,Type=Integer,Description="Haplotype Quality">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT NA00001 NA00003
19 1234567 microsat1 GTCT G,GTACT 50.0 PASS AA=G;NS=3;DP=9 GT:DP:GQ 0/1:4:35 1/1:3:40
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele">
##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129">
##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=HQ,Number=2,Type=Integer,Description="Haplotype Quality">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT NA00001 NA00002 NA00003 NA00004
19 1234567 microsat1;microsat2 GTCT G,GTACT 50.0 PASS AA=G;NS=2;DP=9 GT:DP:GQ 0/1:4:35 0/2:2:17 1/1:3:40 .:.:.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"test_name": "bq-to-vcf-no-options",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_0_new_schema",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_0__suffix",
"output_file_name": "bq_to_vcf_no_options.vcf",
"runner": "DirectRunner",
"expected_output_file": "gcp_variant_transforms/testing/data/vcf/bq_to_vcf/expected_output/no_options.vcf"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"test_name": "bq-to-vcf-option-allow-incompatible-schema",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_2_new_schema",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_2__suffix",
"output_file_name": "bq_to_vcf_option_allow_incompatible_schema.vcf",
"allow_incompatible_schema": true,
"runner": "DirectRunner",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"test_name": "bq-to-vcf-option-customized-export",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_0_new_schema",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_0__suffix",
"output_file_name": "bq_to_vcf_option_customized_export.vcf",
"genomic_regions": "19:1234566-1234570 20:14369-17330",
"sample_names": "NA00001 NA00003",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[
{
"test_name": "bq-to-vcf-option-number-of-bases-per-shard",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.platinum_NA12877_hg38_10K_lines_new_schema",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.platinum_NA12877_hg38_10K_lines__suffix",
"output_file_name": "bq_to_vcf_option_number_of_bases_per_shard.vcf",
"number_of_bases_per_shard": 100000,
"runner": "DataflowRunner",
"expected_output_file": "gs://gcp-variant-transforms-testfiles/bq_to_vcf_expected_output/platinum_NA12877_hg38_10K_lines.vcf"
"expected_output_file": "gs://gcp-variant-transforms-testfiles/bq_to_vcf_expected_output/platinum_NA12877_hg38_10K_lines_v2.vcf"
}
]
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"test_name": "bq-to-vcf-option-preserve-call-names-order",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.merge_option_move_to_calls_new_schema",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.merge_option_move_to_calls__suffix",
"output_file_name": "bq_to_vcf_option_preserve_sample_order.vcf",
"preserve_sample_order": false,
"runner": "DirectRunner",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[
{
"test_name": "bq-to-vcf-option-representative-header-file",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_0_new_schema",
"input_table": "gcp-variant-transforms-test:bq_to_vcf_integration_tests.4_0__suffix",
"representative_header_file": "gs://gcp-variant-transforms-testfiles/small_tests/valid-4.0.vcf",
"preserve_sample_order": true,
"output_file_name": "bq_to_vcf_option_representative_header_file.vcf",
Expand Down
6 changes: 2 additions & 4 deletions gcp_variant_transforms/transforms/combine_sample_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,10 @@ def expand(self, pcoll):
| 'RemoveDuplicates' >> beam.RemoveDuplicates()
| 'Combine' >> beam.combiners.ToList()
| 'ExtractUniqueSampleIds'
>> beam.ParDo(self._extract_unique_sample_ids)
| beam.combiners.ToList())
>> beam.ParDo(self._extract_unique_sample_ids))
else:
return (pcoll
| 'GetSampleIds' >> beam.FlatMap(self._get_sample_ids)
| 'RemoveDuplicates' >> beam.RemoveDuplicates()
| 'Combine' >> beam.combiners.ToList()
| 'SortSampleIds' >> beam.ParDo(sorted)
| beam.combiners.ToList())
| 'SortSampleIds' >> beam.ParDo(sorted))
Loading

0 comments on commit 7f9136b

Please sign in to comment.