Skip to content

Commit

Permalink
Address fourth iteration of comments.
Browse files (browse the repository at this point in the history)
  • Loading branch information
tneymanov committed May 4, 2020
1 parent 5d7a264 commit 050e187
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 28 deletions.
46 changes: 22 additions & 24 deletions gcp_variant_transforms/bq_to_vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@
_VCF_VERSION_LINE = (
vcf_header_io.FILE_FORMAT_HEADER_TEMPLATE.format(VERSION='4.3') + '\n')
_SAMPLE_INFO_QUERY_TEMPLATE = (
'SELECT sample_id, sample_name, file_path '
'FROM `{PROJECT_ID}.{DATASET_ID}.{BASE_TABLE_ID}' +
TABLE_SUFFIX_SEPARATOR + SAMPLE_INFO_TABLE_SUFFIX + '`')
'SELECT sample_id, sample_name, file_path FROM `{TABLE_NAME}`')


def run(argv=None):
Expand Down Expand Up @@ -175,17 +173,17 @@ def _bigquery_to_vcf_shards(
schema = _get_schema(known_args.input_table)
variant_query = _get_variant_query(known_args, schema)
logging.info('Processing BigQuery query %s:', variant_query)
project_id, dataset_id, table_id = bigquery_util.parse_table_reference(
_, _, table_id = bigquery_util.parse_table_reference(
known_args.input_table)
bq_variant_source = bigquery.BigQuerySource(query=variant_query,
validate=True,
use_standard_sql=True)
annotation_names = _extract_annotation_names(schema)

base_table_id = bigquery_util.get_table_base_name(table_id)
sample_query = _SAMPLE_INFO_QUERY_TEMPLATE.format(PROJECT_ID=project_id,
DATASET_ID=dataset_id,
BASE_TABLE_ID=base_table_id)
sample_query = _SAMPLE_INFO_QUERY_TEMPLATE.format(
TABLE_NAME=bigquery_util.compose_table_name(base_table_id,
TABLE_SUFFIX_SEPARATOR))
bq_sample_source = bigquery.BigQuerySource(query=sample_query,
validate=True,
use_standard_sql=True)
Expand All @@ -196,39 +194,39 @@ def _bigquery_to_vcf_shards(
sample_table_rows = (
p
| 'ReadFromSampleTable' >> beam.io.Read(bq_sample_source))
name_to_id_hash_table = (
sample_table_rows
| 'SampleNameToIdDict' >> sample_mapping_table.SampleNameToIdDict())
if known_args.sample_names:
sample_names = (p
| transforms.Create(known_args.sample_names,
reshuffle=False))
temp_sample_names = (p
| transforms.Create(known_args.sample_names,
reshuffle=False))
else:
# Get sample names from sample IDs in the variants and sort.
id_to_name_hash_table = (
sample_table_rows
| 'SampleIdToNameDict' >> sample_mapping_table.SampleIdToNameDict())
sample_ids = (variants
| 'CombineSampleIds' >>
combine_sample_ids.SampleIdsCombiner(
known_args.preserve_sample_order))
sample_names = (
sample_ids
temp_sample_ids = (variants
| 'CombineSampleIds' >>
combine_sample_ids.SampleIdsCombiner(
known_args.preserve_sample_order))
temp_sample_names = (
temp_sample_ids
| 'GetSampleNames' >>
sample_mapping_table.GetSampleNames(
beam.pvalue.AsSingleton(id_to_name_hash_table))
| 'CombineToList' >> beam.combiners.ToList()
| 'SortSampleNames' >> beam.ParDo(sorted))

combined_sample_ids = (
sample_names
name_to_id_hash_table = (
sample_table_rows
| 'SampleNameToIdDict' >> sample_mapping_table.SampleNameToIdDict())
sample_ids = (
temp_sample_names
| 'GetSampleIds' >>
sample_mapping_table.GetSampleIds(
beam.pvalue.AsSingleton(name_to_id_hash_table))
| 'CombineSortedSampleIds' >> beam.combiners.ToList())
combined_sample_names = sample_names | beam.combiners.ToList()
sample_names = temp_sample_names | beam.combiners.ToList()

_ = (combined_sample_names
_ = (sample_names
| 'GenerateVcfDataHeader' >>
beam.ParDo(_write_vcf_header_with_sample_names,
_VCF_FIXED_COLUMNS,
Expand All @@ -237,7 +235,7 @@ def _bigquery_to_vcf_shards(

_ = (variants
| densify_variants.DensifyVariants(
beam.pvalue.AsSingleton(combined_sample_ids))
beam.pvalue.AsSingleton(sample_ids))
| 'PairVariantWithKey' >>
beam.Map(_pair_variant_with_key, known_args.number_of_bases_per_shard)
| 'GroupVariantsByKey' >> beam.GroupByKey()
Expand Down
4 changes: 2 additions & 2 deletions gcp_variant_transforms/options/variant_transform_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -606,8 +606,8 @@ def validate(self, parsed_args, client=None):
'required'.format(parsed_args.input_table,
TABLE_SUFFIX_SEPARATOR))
base_table_id = table_id[:table_id.find(TABLE_SUFFIX_SEPARATOR)]
sample_table_id = (
base_table_id + TABLE_SUFFIX_SEPARATOR + SAMPLE_INFO_TABLE_SUFFIX)
sample_table_id = bigquery_util.compose_table_name(base_table_id,
SAMPLE_INFO_TABLE_SUFFIX)

if not bigquery_util.table_exist(client, project_id, dataset_id,
sample_table_id):
Expand Down
8 changes: 6 additions & 2 deletions gcp_variant_transforms/transforms/sample_mapping_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ def _get_sample_name(self, sample_id, id_to_name_dict):
raise ValueError('Sample ID `{}` was not found.'.format(sample_id))

def expand(self, pcoll):
return pcoll | beam.Map(self._get_sample_name, self._id_to_name_dict)
return (pcoll
| 'Generate Name to ID Mapping'
>> beam.Map(self._get_sample_name, self._id_to_name_dict))

class GetSampleIds(beam.PTransform):
"""Looks up sample_ids corresponding to the given sample_names"""
Expand All @@ -81,4 +83,6 @@ def _get_sample_id(self, sample_name, name_to_id_dict):
raise ValueError('Sample `{}` was not found.'.format(sample_name))

def expand(self, pcoll):
return pcoll | beam.Map(self._get_sample_id, self._name_to_id_dict)
return (pcoll
| 'Generate Name to ID Mapping'
>> beam.Map(self._get_sample_id, self._name_to_id_dict))

0 comments on commit 050e187

Please sign in to comment.