Add a PoC disk_estimator feature to the vcf_to_bq preprocessor. #335
base: master
@@ -0,0 +1,235 @@
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A source for estimating the size of VCF files when processed by vcf_to_bq."""

from __future__ import absolute_import

from typing import Iterable, List, Tuple  # pylint: disable=unused-import
import logging
import itertools

import apache_beam as beam
from apache_beam import coders
from apache_beam import transforms
from apache_beam.io import filebasedsource
from apache_beam.io import filesystem
from apache_beam.io import filesystems
from apache_beam.io import iobase
from apache_beam.io import range_trackers  # pylint: disable=unused-import

from gcp_variant_transforms.beam_io import vcfio

def _get_file_size(file_name):
  # type: (str) -> int
  matched_files = filesystems.FileSystems.match([file_name])[0].metadata_list
  if len(matched_files) != 1:
    raise IOError("File name {} did not correspond to exactly 1 result. "
                  "Instead, got {} matches.".format(file_name,
                                                    len(matched_files)))
  file_metadata = matched_files[0]

  compression_type = filesystem.CompressionTypes.detect_compression_type(
      file_metadata.path)
  if compression_type != filesystem.CompressionTypes.UNCOMPRESSED:
    logging.error("VCF file %s is compressed; disk requirement estimator "
                  "will not be accurate.", file_metadata.path)
  return file_metadata.size_in_bytes


def _convert_variants_to_bytesize(variant):
  # type: (vcfio.Variant) -> int
  return coders.registry.get_coder(vcfio.Variant).estimate_size(variant)


class FileSizeInfo(object):
  def __init__(self, name, raw_file_size, encoded_file_size=None):
    # type: (str, int, int) -> None
    self.name = name
Review thread:
- Did you use the name anywhere?
- I use this in the [...]
- I'm sorry, I just cannot find where you are using it - in generate_report you call _append_disk_usage_estimate_to_report, which seems to only be getting the raw size and encoded size, no?
- Sorry, this was my fault; I either cited some earlier revision incorrectly or misspoke/misread entirely. The [...]
    self.raw_size = raw_file_size
    self.encoded_size = encoded_file_size  # Optional, useful for SumFn.
Review thread:
- I'd argue that the names for these variables should be the same - either both raw_size/encoded_size or both raw_file_size/encoded_file_size.
- done
  def estimate_encoded_file_size(self, raw_sample_size, encoded_sample_size):
Review thread:
- If we are populating self.encoded_size, let's use the same name here (i.e. by dropping 'file_'), or vice versa.
- done
- Also, we seem to be missing the types for the args.
- This comment might be from an earlier revision; there are type annotations at line 63 for this method, are you referring to something else?
    # type: (int, int) -> None
    """Estimates a VCF file's encoded (byte) size by analyzing sample Variants.

    Given the raw file size and measurements of several VCF lines from the
    file, estimate how much disk the file will take after expansion due to
    encoding lines as `vcfio.Variant` objects. The result is stored in
    `self.encoded_size`.

    This is a simple ratio problem, solving for encoded_file_size, which is
    the only unknown:
      encoded_file_size / raw_file_size = encoded_sample_size / raw_sample_size
    """

Review thread:
- Although I also like to be verbose, the general approach in the GVT codebase is to add less descriptive text. This seems a bit too much. Since this method doesn't really use Variants, how about something like: """Given the sizes for the raw file, sample raw lines and sample encoded lines, estimate encoded file size."""? WDYT?
- sg, thanks! done
    if raw_sample_size == 0:
      # Propagate in-band error state to avoid divide-by-zero.
      logging.warning("File %s appears to have no valid Variant lines. File "
                      "will be ignored for size estimation.", self.name)
      self.encoded_size = 0
      self.raw_size = 0
    else:
      self.encoded_size = (self.raw_size * encoded_sample_size /
                           raw_sample_size)
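A quick numeric sketch of the ratio above (not part of this diff; the file name and sizes are invented for illustration):

    # Hypothetical numbers: a 2 GiB raw file whose sampled lines grow 3x when encoded.
    info = FileSizeInfo('sample.vcf', raw_file_size=2 * 1024 ** 3)
    info.estimate_encoded_file_size(raw_sample_size=50 * 1024,
                                    encoded_sample_size=150 * 1024)
    info.encoded_size  # ~6 GiB: raw_size * encoded_sample_size / raw_sample_size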
class FileSizeInfoSumFn(beam.CombineFn):
  """Combiner Function to sum up the size fields of FileSizeInfo objects.

  Unlike VariantsSizeInfoSumFn, the input is a PTable mapping str to
  FileSizeInfo, so the input is a tuple with the FileSizeInfos as the second
  field. The output strips out the str key, which represents the file path.

  Example: [FileSizeInfo(a, b), FileSizeInfo(c, d)] -> FileSizeInfo(a+c, b+d)
  """

Review thread:
- Again, it seems that everything past the first line is unnecessary. If you really want to include info about how the summing is done, how about """Combiner function, used to vector sum FileSizeInfo objects.""" or something of that sort?
- sg, done
  def create_accumulator(self):
    # type: () -> Tuple[int, int]
    return (0, 0)  # (raw, encoded) sums

  def add_input(self, (raw, encoded), file_size_info):
    # type: (Tuple[int, int], FileSizeInfo) -> Tuple[int, int]
    return raw + file_size_info.raw_size, encoded + file_size_info.encoded_size

  def merge_accumulators(self, accumulators):
    # type: (Iterable[Tuple[int, int]]) -> Tuple[int, int]
    raw, encoded = zip(*accumulators)
    return sum(raw), sum(encoded)

  def extract_output(self, (raw, encoded)):
    # type: (Tuple[int, int]) -> FileSizeInfo
    return FileSizeInfo("cumulative", raw, encoded)
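A minimal sketch of how the (raw, encoded) accumulator flows through this CombineFn (not part of the diff; the file names and sizes are invented, and in a real pipeline the function would be applied via beam.CombineGlobally or beam.CombinePerKey rather than called by hand):

    fn = FileSizeInfoSumFn()
    acc = fn.create_accumulator()                    # (0, 0)
    acc = fn.add_input(acc, FileSizeInfo('a.vcf', 10, 30))
    acc = fn.add_input(acc, FileSizeInfo('b.vcf', 5, 12))
    fn.extract_output(acc)                           # FileSizeInfo("cumulative", 15, 42)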
class _EstimateVcfSizeSource(filebasedsource.FileBasedSource):
  """A source for estimating the encoded size of a VCF file in `vcf_to_bq`.

  This source first obtains the raw file sizes of a set of VCF files. Then,
  the source reads a limited number of variants from a set of VCF files,
  both as raw strings and encoded `Variant` objects. Finally, the reader
  returns a single `FileSizeInfo` object with an estimate of the input size
  if all lines had been encoded as `Variant` objects.

  Lines that are malformed are skipped.

  Parses VCF files (version 4) using the PyVCF library.
  """
  DEFAULT_VCF_READ_BUFFER_SIZE = 65536  # 64kB

  def __init__(self,
               file_pattern,
               sample_size,
               compression_type=filesystem.CompressionTypes.AUTO,
               validate=True,
               vcf_parser_type=vcfio.VcfParserType.PYVCF):
    # type: (str, int, str, bool, vcfio.VcfParserType) -> None
    super(_EstimateVcfSizeSource, self).__init__(
        file_pattern,
        compression_type=compression_type,
        validate=validate,
        splittable=False)
    self._compression_type = compression_type
    self._sample_size = sample_size
    self._vcf_parser_type = vcf_parser_type
  def read_records(
      self,
      file_name,  # type: str
      range_tracker  # type: range_trackers.UnsplittableRangeTracker
  ):
    # type: (...) -> Iterable[FileSizeInfo]
    """This "generator" only emits a single FileSizeInfo object per file."""
    vcf_parser_class = vcfio.get_vcf_parser(self._vcf_parser_type)
    record_iterator = vcf_parser_class(
        file_name,
        range_tracker,
        self._pattern,
        self._compression_type,
        allow_malformed_records=True,
        representative_header_lines=None,
        buffer_size=self.DEFAULT_VCF_READ_BUFFER_SIZE,
        skip_header_lines=0)
    raw_file_size = _get_file_size(file_name)

    # Open distinct channel to read lines as raw bytestrings.
    with filesystems.FileSystems.open(file_name,
                                      self._compression_type) as raw_iterator:
      count, raw_size, encoded_size = 0, 0, 0
      for encoded_record, raw_record in itertools.izip(record_iterator,
                                                       raw_iterator):
        while raw_record and raw_record.startswith('#'):
          # Skip headers. Assume that header size is negligible.
          raw_record = raw_iterator.next()
        logging.debug(
            "Reading record for disk usage estimation. Encoded variant: %s\n"
            "Raw variant: %s", encoded_record, raw_record)
        if count >= self._sample_size:
          break
        if not isinstance(encoded_record, vcfio.Variant):
          logging.error(
              "Skipping VCF line that could not be decoded as a "
              "`vcfio.Variant` in file %s: %s", file_name, raw_record)
          continue

Review thread:
- Is it for malformed variants?
- What I understand is that we have two iterators here, one for the encoded variant, one for the raw record. While we skip some encoded variants here, should we skip the raw record as well?
- That's correct; I was concerned about ill-formed lines but didn't want to crash the estimator, so I just decided to try skipping these (though if an enormous proportion of lines are malformed, then this will tend to make the tool overestimate size). I don't have a very specific idea of how often/why variants tend to be malformed though, and I'm open to ideas here, especially around the condition that I'm checking for.
- Oh, this is a bug, I was intending to skip both raw and encoded; sorry about that. I'm rewriting this to use itertools, which should be less error-prone.
        # Encoding in `utf-8` should represent the string as one byte per char,
        # even for non-ASCII chars. Python adds significant overhead to the
        # bytesize of the full str object.
        raw_size += len(raw_record.encode('utf-8'))
        encoded_size += _convert_variants_to_bytesize(encoded_record)
        count += 1

    file_size_info = FileSizeInfo(file_name, raw_file_size)
    file_size_info.estimate_encoded_file_size(raw_size, encoded_size)
    yield file_size_info
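A small illustration of why the raw byte count uses len(line.encode('utf-8')) rather than sys.getsizeof (not part of the diff; the sample line is invented):

    import sys
    line = u'20\t14370\trs6054257\tG\tA\t29\tPASS\tNS=3\n'
    len(line.encode('utf-8'))  # bytes the line occupies on disk
    sys.getsizeof(line)        # larger: includes Python str object overhead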
class EstimateVcfSize(transforms.PTransform):
  """A PTransform for reading a limited number of lines from a set of VCF files.

  Outputs a PCollection of `FileSizeInfo` objects, one per matched file,
  estimating each file's size once its lines are encoded as `Variant`
  objects. Only the first `sample_size` lines that are not malformed are
  sampled from each file, both as raw strings and encoded `Variant` objects.

  Parses VCF files (version 4) using the PyVCF library.
  """
  def __init__(
      self,
      file_pattern,  # type: str
      sample_size,  # type: int
      compression_type=filesystem.CompressionTypes.AUTO,  # type: str
      validate=True,  # type: bool
      **kwargs  # type: **str
  ):
    # type: (...) -> None
    """Initialize the :class:`EstimateVcfSize` transform.

    Args:
      file_pattern: The file path to read from, either as a single file or a
        glob pattern.
      sample_size: The number of lines that should be read from each file.
      compression_type: Used to handle compressed input files.
        Typical value is :attr:`CompressionTypes.AUTO
        <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
        underlying file_path's extension will be used to detect the
        compression.
      validate: Flag to verify that the files exist during pipeline creation
        time.
    """
    super(EstimateVcfSize, self).__init__(**kwargs)
    self._source = _EstimateVcfSizeSource(
        file_pattern, sample_size, compression_type, validate=validate)

  def expand(self, pvalue):
    return pvalue.pipeline | iobase.Read(self._source)
Review thread:
- Do we need a separate method for this? Seems to only be used in 1 place and is a one-liner.
- done
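For context, a rough sketch of how the transform and the combiner above might be wired together (not part of this PR; the pipeline, file pattern, and sample size are hypothetical):

    import apache_beam as beam

    with beam.Pipeline() as p:
      total_size_estimate = (
          p
          | 'EstimateVcfSize' >> EstimateVcfSize('gs://my-bucket/*.vcf',
                                                 sample_size=100)
          | 'SumFileSizes' >> beam.CombineGlobally(FileSizeInfoSumFn()))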