Skip to content

Commit

Permalink
Address second iteration of comments.
Browse files (browse the repository at this point in the history)
  • Loading branch information
tneymanov committed Apr 20, 2020
1 parent d33ae65 commit 8066aa6
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 25 deletions.
2 changes: 1 addition & 1 deletion cloudbuild_CI.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,4 @@ steps:
# - '--gs_dir bashir-variant_integration_test_runs'
images:
- 'gcr.io/${PROJECT_ID}/gcp-variant-transforms:${COMMIT_SHA}'
timeout: 240m
timeout: 270m
51 changes: 27 additions & 24 deletions gcp_variant_transforms/bq_to_vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,13 @@
_COMMAND_LINE_OPTIONS = [variant_transform_options.BigQueryToVcfOptions]
TABLE_SUFFIX_SEPARATOR = bigquery_util.TABLE_SUFFIX_SEPARATOR
SAMPLE_INFO_TABLE_SUFFIX = bigquery_util.SAMPLE_INFO_TABLE_SUFFIX
_FULL_INPUT_TABLE = '{TABLE}' + TABLE_SUFFIX_SEPARATOR + '{SUFFIX}'
_GENOMIC_REGION_TEMPLATE = ('({REFERENCE_NAME_ID}="{REFERENCE_NAME_VALUE}" AND '
'{START_POSITION_ID}>={START_POSITION_VALUE} AND '
'{END_POSITION_ID}<={END_POSITION_VALUE})')
_VCF_FIXED_COLUMNS = ['#CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER',
'INFO', 'FORMAT']
_VCF_VERSION_LINE = '##fileformat=VCFv4.3\n'
_VCF_VERSION_LINE = (
vcf_header_io.FILE_FORMAT_HEADER_TEMPLATE.format(VERSION='4.3') + '\n')
_SAMPLE_INFO_QUERY_TEMPLATE = (
'SELECT sample_id, sample_name, file_path '
'FROM `{PROJECT_ID}.{DATASET_ID}.{BASE_TABLE_ID}' +
Expand Down Expand Up @@ -146,7 +146,7 @@ def run(argv=None):
def _write_vcf_meta_info(input_table,
representative_header_file,
allow_incompatible_schema):
# type: (str, str, bool, str) -> None
# type: (str, str, bool) -> None
"""Writes the meta information generated from BigQuery schema."""
header_fields = (
schema_converter.generate_header_fields_from_schema(
Expand All @@ -173,16 +173,16 @@ def _bigquery_to_vcf_shards(
`vcf_header_file_path`.
"""
schema = _get_schema(known_args.input_table)
query = _get_variant_query(known_args, schema)
logging.info('Processing BigQuery query %s:', query)
variant_query = _get_variant_query(known_args, schema)
logging.info('Processing BigQuery query %s:', variant_query)
project_id, dataset_id, table_id = bigquery_util.parse_table_reference(
known_args.input_table)
bq_variant_source = bigquery.BigQuerySource(query=query,
bq_variant_source = bigquery.BigQuerySource(query=variant_query,
validate=True,
use_standard_sql=True)
annotation_names = _extract_annotation_names(schema)

base_table_id = table_id[:table_id.find(TABLE_SUFFIX_SEPARATOR)]
base_table_id = bigquery_util.get_table_base_name(table_id)
sample_query = _SAMPLE_INFO_QUERY_TEMPLATE.format(PROJECT_ID=project_id,
DATASET_ID=dataset_id,
BASE_TABLE_ID=base_table_id)
Expand All @@ -197,42 +197,45 @@ def _bigquery_to_vcf_shards(
p
| 'ReadFromSampleTable' >> beam.io.Read(bq_sample_source))
if known_args.sample_names:
hash_table = (
name_to_id_hash_table = (
sample_table_rows
| 'SampleNameToIdDict' >> sample_mapping_table.SampleNameToIdDict())
sample_names = (p
| transforms.Create(known_args.sample_names,
reshuffle=False))
sample_ids = (sample_names
| 'GetSampleIds' >>
sample_mapping_table.GetSampleIds(
beam.pvalue.AsSingleton(hash_table))
| 'CombineSampleIds' >> beam.combiners.ToList())
sample_names = sample_names | beam.combiners.ToList()
combined_sample_ids = (
sample_names
| 'GetSampleIds' >>
sample_mapping_table.GetSampleIds(
beam.pvalue.AsSingleton(name_to_id_hash_table))
| 'CombineSampleIds' >> beam.combiners.ToList())
combined_sample_names = sample_names | beam.combiners.ToList()
else:
hash_table = (
id_to_name_hash_table = (
sample_table_rows
| 'SampleIdToNameDict' >> sample_mapping_table.SampleIdToNameDict())
sample_ids = (variants
| 'CombineSampleIds' >>
combine_sample_ids.SampleIdsCombiner(
known_args.preserve_sample_order))
sample_names = (sample_ids
| 'GetSampleNames' >>
sample_mapping_table.GetSampleNames(
beam.pvalue.AsSingleton(hash_table))
| 'CombineSampleNames' >> beam.combiners.ToList())
sample_ids = sample_ids | beam.combiners.ToList()

_ = (sample_names
combined_sample_names = (
sample_ids
| 'GetSampleNames' >>
sample_mapping_table.GetSampleNames(
beam.pvalue.AsSingleton(id_to_name_hash_table))
| 'CombineSampleNames' >> beam.combiners.ToList())
combined_sample_ids = sample_ids | beam.combiners.ToList()

_ = (combined_sample_names
| 'GenerateVcfDataHeader' >>
beam.ParDo(_write_vcf_header_with_sample_names,
_VCF_FIXED_COLUMNS,
known_args.representative_header_file,
header_file_path))

_ = (variants
| densify_variants.DensifyVariants(beam.pvalue.AsSingleton(sample_ids))
| densify_variants.DensifyVariants(
beam.pvalue.AsSingleton(combined_sample_ids))
| 'PairVariantWithKey' >>
beam.Map(_pair_variant_with_key, known_args.number_of_bases_per_shard)
| 'GroupVariantsByKey' >> beam.GroupByKey()
Expand Down

0 comments on commit 8066aa6

Please sign in to comment.