Skip to content

Commit

Permalink
Address third iteration of comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
tneymanov committed Apr 30, 2020
1 parent 1ee5163 commit 4b732c7
Show file tree
Hide file tree
Showing 12 changed files with 51 additions and 137 deletions.
28 changes: 15 additions & 13 deletions gcp_variant_transforms/bq_to_vcf.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,35 +196,37 @@ def _bigquery_to_vcf_shards(
sample_table_rows = (
p
| 'ReadFromSampleTable' >> beam.io.Read(bq_sample_source))
name_to_id_hash_table = (
sample_table_rows
| 'SampleNameToIdDict' >> sample_mapping_table.SampleNameToIdDict())
if known_args.sample_names:
name_to_id_hash_table = (
sample_table_rows
| 'SampleNameToIdDict' >> sample_mapping_table.SampleNameToIdDict())
sample_names = (p
| transforms.Create(known_args.sample_names,
reshuffle=False))
combined_sample_ids = (
sample_names
| 'GetSampleIds' >>
sample_mapping_table.GetSampleIds(
beam.pvalue.AsSingleton(name_to_id_hash_table))
| 'CombineSampleIds' >> beam.combiners.ToList())
combined_sample_names = sample_names | beam.combiners.ToList()
else:
# Get sample names from sample IDs in the variants and sort.
id_to_name_hash_table = (
sample_table_rows
| 'SampleIdToNameDict' >> sample_mapping_table.SampleIdToNameDict())
sample_ids = (variants
| 'CombineSampleIds' >>
combine_sample_ids.SampleIdsCombiner(
known_args.preserve_sample_order))
combined_sample_names = (
sample_names = (
sample_ids
| 'GetSampleNames' >>
sample_mapping_table.GetSampleNames(
beam.pvalue.AsSingleton(id_to_name_hash_table))
| 'CombineSampleNames' >> beam.combiners.ToList())
combined_sample_ids = sample_ids | beam.combiners.ToList()
| 'CombineToList' >> beam.combiners.ToList()
| 'SortSampleNames' >> beam.ParDo(sorted))

combined_sample_ids = (
sample_names
| 'GetSampleIds' >>
sample_mapping_table.GetSampleIds(
beam.pvalue.AsSingleton(name_to_id_hash_table))
| 'CombineSortedSampleIds' >> beam.combiners.ToList())
combined_sample_names = sample_names | beam.combiners.ToList()

_ = (combined_sample_names
| 'GenerateVcfDataHeader' >>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129">
##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership">
##INFO=<ID=SVTYPE,Number=1,Type=String,Description="Type of structural variant (with unïcodé)">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=HQ,Number=.,Type=Integer,Description="Haplotype Quality">
##FORMAT=<ID=GL,Number=.,Type=Integer,Description="Genotype Likelihood">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT NA00001 NA00002 NA00003
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele">
##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129">
##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=HQ,Number=2,Type=Integer,Description="Haplotype Quality">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT NA00001 NA00003
19 1234567 microsat1 GTCT G,GTACT 50.0 PASS AA=G;NS=3;DP=9 GT:DP:GQ 0/1:4:35 1/1:3:40
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
##INFO=<ID=AA,Number=1,Type=String,Description="Ancestral Allele">
##INFO=<ID=DB,Number=0,Type=Flag,Description="dbSNP membership, build 129">
##INFO=<ID=H2,Number=0,Type=Flag,Description="HapMap2 membership">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=DP,Number=1,Type=Integer,Description="Read Depth">
##FORMAT=<ID=GQ,Number=1,Type=Integer,Description="Genotype Quality">
##FORMAT=<ID=HQ,Number=2,Type=Integer,Description="Haplotype Quality">
#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT NA00001 NA00002 NA00003 NA00004
19 1234567 microsat1;microsat2 GTCT G,GTACT 50.0 PASS AA=G;NS=2;DP=9 GT:DP:GQ 0/1:4:35 0/2:2:17 1/1:3:40 .:.:.
Expand Down

This file was deleted.

This file was deleted.

53 changes: 27 additions & 26 deletions gcp_variant_transforms/transforms/sample_mapping_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,66 +18,67 @@

import apache_beam as beam

SAMPLE_ID_COLUMN = 'sample_id'
SAMPLE_NAME_COLUMN = 'sample_name'
FILE_PATH_COLUMN = 'file_path'
WITH_FILE_SAMPLE_TEMPLATE = "{FILE_PATH}/{SAMPLE_NAME}"
from gcp_variant_transforms.libs import sample_info_table_schema_generator

SAMPLE_ID_COLUMN = sample_info_table_schema_generator.SAMPLE_ID
SAMPLE_NAME_COLUMN = sample_info_table_schema_generator.SAMPLE_NAME


class SampleIdToNameDict(beam.PTransform):
"""Transforms BigQuery table rows to PCollection of `Variant`."""
"""Generate Id-to-Name hashing table from sample info table."""

def _convert_bq_row(self, row):
def _extract_id_name(self, row):
sample_id = row[SAMPLE_ID_COLUMN]
sample_name = row[SAMPLE_NAME_COLUMN]
return (sample_id, sample_name)

def expand(self, pcoll):
return (pcoll
| 'BigQueryToMapping' >> beam.Map(self._convert_bq_row)
| 'ExtractIdNameTuples' >> beam.Map(self._extract_id_name)
| 'CombineToDict' >> beam.combiners.ToDict())


class SampleNameToIdDict(beam.PTransform):
"""Transforms BigQuery table rows to PCollection of `Variant`."""
"""Generate Name-to-ID hashing table from sample info table."""

def _convert_bq_row(self, row):
def _extract_id_name(self, row):
sample_id = row[SAMPLE_ID_COLUMN]
sample_name = row[SAMPLE_NAME_COLUMN]
return (sample_name, sample_id)

def expand(self, pcoll):
return (pcoll
| 'BigQueryToMapping' >> beam.Map(self._convert_bq_row)
| 'ExtractNameIdTuples' >> beam.Map(self._extract_id_name)
| 'CombineToDict' >> beam.combiners.ToDict())

class GetSampleNames(beam.PTransform):
"""Transforms sample_ids to sample_names"""
"""Looks up sample_names corresponding to the given sample_ids"""

def __init__(self, hash_table):
# type: (Dict[int, Tuple(str, str)]) -> None
self._hash_table = hash_table
def __init__(self, id_to_name_dict):
# type: (Dict[int, str]) -> None
self._id_to_name_dict = id_to_name_dict

def _get_sample_id(self, sample_id, hash_table):
# type: (int, Dict[int, Tuple(str, str)]) -> str
sample = hash_table[sample_id]
return sample
def _get_sample_name(self, sample_id, id_to_name_dict):
# type: (int, Dict[int, str]) -> str
if sample_id in id_to_name_dict:
return id_to_name_dict[sample_id]
raise ValueError('Sample ID `{}` was not found.'.format(sample_id))

def expand(self, pcoll):
return pcoll | beam.Map(self._get_sample_id, self._hash_table)
return pcoll | beam.Map(self._get_sample_name, self._id_to_name_dict)

class GetSampleIds(beam.PTransform):
"""Transform sample_names to sample_ids"""
"""Looks up sample_ids corresponding to the given sample_names"""

def __init__(self, hash_table):
def __init__(self, name_to_id_dict):
# type: (Dict[str, int)]) -> None
self._hash_table = hash_table
self._name_to_id_dict = name_to_id_dict

def _get_sample_name(self, sample_name, hash_table):
def _get_sample_id(self, sample_name, name_to_id_dict):
# type: (str, Dict[str, int]) -> int
if sample_name in hash_table:
return hash_table[sample_name]
if sample_name in name_to_id_dict:
return name_to_id_dict[sample_name]
raise ValueError('Sample `{}` was not found.'.format(sample_name))

def expand(self, pcoll):
return pcoll | beam.Map(self._get_sample_name, self._hash_table)
return pcoll | beam.Map(self._get_sample_id, self._name_to_id_dict)
12 changes: 5 additions & 7 deletions gcp_variant_transforms/transforms/sample_mapping_table_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,16 @@
from gcp_variant_transforms.transforms.sample_mapping_table import SAMPLE_NAME_COLUMN
from gcp_variant_transforms.transforms.sample_mapping_table import SampleIdToNameDict
from gcp_variant_transforms.transforms.sample_mapping_table import SampleNameToIdDict
from gcp_variant_transforms.transforms.sample_mapping_table import FILE_PATH_COLUMN



def _generate_bq_row(sample_id, sample_name, file_path):
def _generate_bq_row(sample_id, sample_name):
return {SAMPLE_ID_COLUMN: sample_id,
SAMPLE_NAME_COLUMN: sample_name,
FILE_PATH_COLUMN: file_path}
SAMPLE_NAME_COLUMN: sample_name}

BQ_ROWS = [_generate_bq_row(1, 'N01', 'file1'),
_generate_bq_row(2, 'N02', 'file2'),
_generate_bq_row(3, 'N03', 'file3')]
BQ_ROWS = [_generate_bq_row(1, 'N01'),
_generate_bq_row(2, 'N02'),
_generate_bq_row(3, 'N03')]

class SampleIdToNameDictTest(unittest.TestCase):
"""Test cases for the ``SampleTableToDict`` transform."""
Expand Down
2 changes: 1 addition & 1 deletion gcp_variant_transforms/vcf_to_bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ def _shard_variants(known_args, pipeline_args, pipeline_mode):
sample_ids = (variants
| 'CombineSampleIds' >>
combine_sample_ids.SampleIdsCombiner()
| beam.combiners.ToList())
| 'CombineToList' >> beam.combiners.ToList())
# TODO(tneymanov): Annotation pipeline currently stores sample IDs instead
# of sample names in the the sharded VCF files, which would lead to double
# hashing of samples. Needs to be fixed ASAP.
Expand Down

0 comments on commit 4b732c7

Please sign in to comment.