Add a PoC disk_estimator feature to the vcf_to_bq preprocessor. #335

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 235 additions & 0 deletions gcp_variant_transforms/beam_io/vcf_file_size_io.py
@@ -0,0 +1,235 @@
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A source for estimating the size of VCF files when processed by vcf_to_bq."""

from __future__ import absolute_import

from typing import Iterable, List, Tuple # pylint: disable=unused-import
import logging
import itertools

import apache_beam as beam
from apache_beam import coders
from apache_beam import transforms
from apache_beam.io import filebasedsource
from apache_beam.io import filesystem
from apache_beam.io import filesystems
from apache_beam.io import iobase
from apache_beam.io import range_trackers # pylint: disable=unused-import

from gcp_variant_transforms.beam_io import vcfio


def _get_file_size(file_name):
  # type: (str) -> int
  matched_files = filesystems.FileSystems.match([file_name])[0].metadata_list
  if len(matched_files) != 1:
    raise IOError("File name {} did not correspond to exactly 1 result. "
                  "Instead, got {} matches.".format(file_name,
                                                    len(matched_files)))
  file_metadata = matched_files[0]

  compression_type = filesystem.CompressionTypes.detect_compression_type(
      file_metadata.path)
  if compression_type != filesystem.CompressionTypes.UNCOMPRESSED:
    logging.error("VCF file %s is compressed; disk requirement estimator "
                  "will not be accurate.", file_metadata.path)
  return file_metadata.size_in_bytes


def _convert_variants_to_bytesize(variant):
Collaborator: Do we need a separate method for this? Seems to only be used in
one place and is a one-liner.

Author: done

  # type: (vcfio.Variant) -> int
  return coders.registry.get_coder(vcfio.Variant).estimate_size(variant)


class FileSizeInfo(object):
  def __init__(self, name, raw_file_size, encoded_file_size=None):
    # type: (str, int, int) -> None
    self.name = name
Contributor: Did you use the name anywhere?

Author: I use this in the generate_report module as well; WDYT?

Collaborator: I'm sorry, I just cannot find where you are using it - in
generate_report you call _append_disk_usage_estimate_to_report, which seems
to only be getting the raw size and encoded size, no?

Author: Sorry, this was my fault; I either cited an earlier revision
incorrectly or misread entirely.

The self.name is only consumed when there is an issue reading the file in
estimate_encoded_file_size, so that a more descriptive message can be logged:

      logging.warning("File %s appears to have no valid Variant lines. File "
                      "will be ignored for size estimation.", self.name)

    self.raw_size = raw_file_size
    self.encoded_size = encoded_file_size  # Optional, useful for SumFn.
Collaborator: I'd argue that the names for these variables should be the same
- either both raw_size/encoded_size or both raw_file_size/encoded_file_size.

Author: done


  def estimate_encoded_file_size(self, raw_sample_size, encoded_sample_size):
Collaborator: If we are populating self.encoded_size, let's use the same name
here (i.e. by dropping 'file_'), or vice versa.

Author: done

Collaborator: Also, we seem to be missing the types for the args.

Author: This comment might be from an earlier revision; there are type
annotations at line 63 for this method - are you referring to something else?

    # type: (int, int) -> None
    """Estimates a VCF file's encoded (byte) size by analyzing sample Variants.

    Given the raw file size and measurements of several VCF lines from the
    file, estimate how much disk the file will take after expansion due to
    encoding lines as `vcfio.Variant` objects. The estimate is stored as
    `self.encoded_size`.

    This is a simple ratio problem, solving for encoded_file_size, which is
    the only unknown:
      encoded_file_size / raw_file_size = encoded_sample_size / raw_sample_size
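
    Example (hypothetical numbers): if 1,000 sampled lines total 50,000 bytes
    raw and 150,000 bytes encoded, a 2 GB raw file is estimated to encode to
    roughly 6 GB (2 GB * 150,000 / 50,000).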
Collaborator: Although I also like to be verbose, the general approach in the
GVT codebase is to add less descriptive text.

This seems a bit too much. Since this method doesn't really use Variants, how
about something like:

"""Given the sizes for the raw file, sample raw lines and sample encoded lines, estimate encoded file size."""

WDYT?

Author: sg, thanks! done

"""
if raw_sample_size == 0:
# Propagate in-band error state to avoid divide-by-zero.
logging.warning("File %s appears to have no valid Variant lines. File "
"will be ignored for size estimation.", self.name)
self.encoded_size = 0
self.raw_size = 0
else:
self.encoded_size = (self.raw_size * encoded_sample_size /
raw_sample_size)


class FileSizeInfoSumFn(beam.CombineFn):
  """Combiner Function to sum up the size fields of FileSizeInfo objects.

  Unlike VariantsSizeInfoSumFn, the input is a PTable mapping str to
  FileSizeInfo, so the input is a tuple with the FileSizeInfo as the second
  field. The output strips out the str key, which represents the file path.

  Example: [FileSizeInfo(a, b), FileSizeInfo(c, d)] -> FileSizeInfo(a+c, b+d)
Collaborator: Again, it seems that everything past the first line is
unnecessary. If you really want to include info about how the summing is
done, how about

"""Combiner function, used to vector sum FileSizeInfo objects."""

or something of that sort?

Author: sg, done

"""
def create_accumulator(self):
# type: (None) -> Tuple[int, int]
return (0, 0) # (raw, encoded) sums

def add_input(self, (raw, encoded), file_size_info):
# type: (Tuple[int, int], FileSizeInfo) -> Tuple[int, int]
return raw + file_size_info.raw_size, encoded + file_size_info.encoded_size

def merge_accumulators(self, accumulators):
# type: (Iterable[Tuple[int, int]]) -> Tuple[int, int]
raw, encoded = zip(*accumulators)
return sum(raw), sum(encoded)

def extract_output(self, (raw, encoded)):
# type: (Tuple[int, int]) -> FileSizeInfo
return FileSizeInfo("cumulative", raw, encoded)


class _EstimateVcfSizeSource(filebasedsource.FileBasedSource):
  """A source for estimating the encoded size of a VCF file in `vcf_to_bq`.

  This source first obtains the raw file sizes of a set of VCF files. Then,
  the source reads a limited number of variants from the set of VCF files,
  both as raw strings and as encoded `Variant` objects. Finally, the reader
  returns a single `FileSizeInfo` object per file with an estimate of the
  input size as if all lines had been encoded as `Variant` objects.

  Lines that are malformed are skipped.

  Parses VCF files (version 4) using the PyVCF library.
  """

  DEFAULT_VCF_READ_BUFFER_SIZE = 65536  # 64kB

  def __init__(self,
               file_pattern,
               sample_size,
               compression_type=filesystem.CompressionTypes.AUTO,
               validate=True,
               vcf_parser_type=vcfio.VcfParserType.PYVCF):
    # type: (str, int, str, bool, vcfio.VcfParserType) -> None
    super(_EstimateVcfSizeSource, self).__init__(
        file_pattern,
        compression_type=compression_type,
        validate=validate,
        splittable=False)
    self._compression_type = compression_type
    self._sample_size = sample_size
    self._vcf_parser_type = vcf_parser_type

  def read_records(
      self,
      file_name,  # type: str
      range_tracker  # type: range_trackers.UnsplittableRangeTracker
  ):
    # type: (...) -> Iterable[FileSizeInfo]
    """This "generator" only emits a single FileSizeInfo object per file."""
    vcf_parser_class = vcfio.get_vcf_parser(self._vcf_parser_type)
    record_iterator = vcf_parser_class(
        file_name,
        range_tracker,
        self._pattern,
        self._compression_type,
        allow_malformed_records=True,
        representative_header_lines=None,
        buffer_size=self.DEFAULT_VCF_READ_BUFFER_SIZE,
        skip_header_lines=0)

    raw_file_size = _get_file_size(file_name)

    # Open a distinct channel to read lines as raw bytestrings.
    with filesystems.FileSystems.open(file_name,
                                      self._compression_type) as raw_iterator:
      count, raw_size, encoded_size = 0, 0, 0
      for encoded_record, raw_record in itertools.izip(record_iterator,
                                                       raw_iterator):
        while raw_record and raw_record.startswith('#'):
          # Skip headers. Assume that header size is negligible.
          raw_record = raw_iterator.next()
        logging.debug(
            "Reading record for disk usage estimation. Encoded variant: %s\n"
            "Raw variant: %s", encoded_record, raw_record)
        if count >= self._sample_size:
          break
        if not isinstance(encoded_record, vcfio.Variant):
          logging.error(
Contributor: Is it for malformed variants?

Contributor: What I understand is that we have two iterators here, one for
the encoded variant and one for the raw record. While we skip some encoded
variants here, should we skip the raw record as well?

Author: That's correct; I was concerned about ill-formed lines but didn't
want to crash the estimator, so I decided to try skipping these (though if an
enormous proportion of lines is malformed, this will tend to make the tool
overestimate size). I don't have a very specific idea of how often or why
variants tend to be malformed, and I'm open to ideas here, especially around
the condition that I'm checking for.

Oh, this is a bug - I was intending to skip both raw and encoded; sorry about
that. I'm rewriting this to use itertools, which should be less error-prone.
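
A minimal sketch of the shared skipping described above (hypothetical; the
actual follow-up commit may differ). Filtering header lines out of the raw
stream before pairing keeps the two iterators aligned, so a single `continue`
then advances both streams together:

      def _data_lines(raw_iter):
        # Drop header lines so the raw stream stays aligned with the data
        # records the parser yields.
        return (line for line in raw_iter if not line.startswith('#'))

      paired_records = itertools.izip(record_iterator,
                                      _data_lines(raw_iterator))
      for encoded_record, raw_record in paired_records:
        if not isinstance(encoded_record, vcfio.Variant):
          continue  # izip advances both iterators on the next pass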

"Skipping VCF line that could not be decoded as a "
"`vcfio.Variant` in file %s: %s", file_name, raw_record)
continue
# Encoding in `utf-8` should represent the string as one byte per char,
# even for non-ASCII chars. Python adds significant overhead to the
# bytesize of the full str object.
raw_size += len(raw_record.encode('utf-8'))
encoded_size += _convert_variants_to_bytesize(encoded_record)
count += 1

file_size_info = FileSizeInfo(file_name, raw_file_size)
file_size_info.estimate_encoded_file_size(raw_size, encoded_size)
yield file_size_info


class EstimateVcfSize(transforms.PTransform):
  """A PTransform that samples a limited number of lines from a set of VCF
  files to estimate their encoded size.

  The output is a PCollection of `FileSizeInfo` objects, one per input file,
  with an encoded size estimated from the first `sample_size` lines that are
  not malformed, measured both as raw strings and as encoded `Variant`
  objects.

  Parses VCF files (version 4) using the PyVCF library.
  """

  def __init__(
      self,
      file_pattern,  # type: str
      sample_size,  # type: int
      compression_type=filesystem.CompressionTypes.AUTO,  # type: str
      validate=True,  # type: bool
      **kwargs  # type: **str
  ):
    # type: (...) -> None
    """Initialize the :class:`EstimateVcfSize` transform.

    Args:
      file_pattern: The file path to read from, either a single file or a
        glob pattern.
      sample_size: The number of lines that should be read from each file.
      compression_type: Used to handle compressed input files.
        Typical value is :attr:`CompressionTypes.AUTO
        <apache_beam.io.filesystem.CompressionTypes.AUTO>`, in which case the
        underlying file_path's extension will be used to detect the
        compression.
      validate: Flag to verify that the files exist during pipeline creation
        time.
    """
    super(EstimateVcfSize, self).__init__(**kwargs)
    self._source = _EstimateVcfSizeSource(
        file_pattern, sample_size, compression_type, validate=validate)

  def expand(self, pvalue):
    return pvalue.pipeline | iobase.Read(self._source)
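
# A hypothetical end-to-end sketch of wiring this transform into a pipeline
# (stage names and the file pattern are illustrative, not from this PR):
#
#   with beam.Pipeline(options=pipeline_options) as p:
#     size_estimate = (
#         p
#         | 'EstimateVcfSize' >> EstimateVcfSize('gs://my-bucket/*.vcf',
#                                                sample_size=100)
#         | 'SumFileSizes' >> beam.CombineGlobally(FileSizeInfoSumFn()))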
Empty file.
48 changes: 26 additions & 22 deletions gcp_variant_transforms/beam_io/vcfio.py
@@ -19,19 +19,19 @@

from __future__ import absolute_import

from typing import Any, Iterable, List, Tuple # pylint: disable=unused-import
from typing import Any, Iterable, List, Tuple, Type # pylint: disable=unused-import
from functools import partial
import enum

import apache_beam as beam
from apache_beam import transforms
from apache_beam.coders import coders
from apache_beam.io import filebasedsource
from apache_beam.io import filesystem
from apache_beam.io import filesystems
from apache_beam.io import iobase
from apache_beam.io import range_trackers # pylint: disable=unused-import
from apache_beam.io import textio
from apache_beam.io.filesystem import CompressionTypes
from apache_beam.io.iobase import Read
from apache_beam.transforms import PTransform

from gcp_variant_transforms.beam_io import vcf_parser

@@ -55,6 +55,16 @@ class VcfParserType(enum.Enum):
  PYVCF = 0
  NUCLEUS = 1

def get_vcf_parser(vcf_parser_type):
  # type: (VcfParserType) -> Type[vcf_parser.VcfParser]
  if vcf_parser_type == VcfParserType.PYVCF:
    return vcf_parser.PyVcfParser
  elif vcf_parser_type == VcfParserType.NUCLEUS:
    return vcf_parser.NucleusParser
  else:
    raise ValueError(
        'Unrecognized _vcf_parser_type: %s.' % str(vcf_parser_type))


class _ToVcfRecordCoder(coders.Coder):
"""Coder for encoding :class:`Variant` objects as VCF text lines."""
@@ -192,7 +202,7 @@ class _VcfSource(filebasedsource.FileBasedSource):
  def __init__(self,
               file_pattern,  # type: str
               representative_header_lines=None,  # type: List[str]
               compression_type=CompressionTypes.AUTO,  # type: str
               compression_type=filesystem.CompressionTypes.AUTO,  # type: str
               buffer_size=DEFAULT_VCF_READ_BUFFER_SIZE,  # type: int
               validate=True,  # type: bool
               allow_malformed_records=False,  # type: bool
@@ -213,14 +223,7 @@ def read_records(self,
                   range_tracker  # type: range_trackers.OffsetRangeTracker
                  ):
    # type: (...) -> Iterable[MalformedVcfRecord]
    vcf_parser_class = None
    if self._vcf_parser_type == VcfParserType.PYVCF:
      vcf_parser_class = vcf_parser.PyVcfParser
    elif self._vcf_parser_type == VcfParserType.NUCLEUS:
      vcf_parser_class = vcf_parser.NucleusParser
    else:
      raise ValueError(
          'Unrecognized _vcf_parser_type: %s.' % str(self._vcf_parser_type))
    vcf_parser_class = get_vcf_parser(self._vcf_parser_type)
    record_iterator = vcf_parser_class(
        file_name,
        range_tracker,
@@ -235,7 +238,8 @@ def read_records(self,
    for record in record_iterator:
      yield record


class ReadFromVcf(PTransform):
class ReadFromVcf(transforms.PTransform):
  """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading VCF
  files.

@@ -249,7 +253,7 @@ def __init__(
      self,
      file_pattern=None,  # type: str
      representative_header_lines=None,  # type: List[str]
      compression_type=CompressionTypes.AUTO,  # type: str
      compression_type=filesystem.CompressionTypes.AUTO,  # type: str
      validate=True,  # type: bool
      allow_malformed_records=False,  # type: bool
      vcf_parser_type=VcfParserType.PYVCF,  # type: int
@@ -280,7 +284,7 @@ def __init__(
        vcf_parser_type=vcf_parser_type)

  def expand(self, pvalue):
    return pvalue.pipeline | Read(self._source)
    return pvalue.pipeline | iobase.Read(self._source)


def _create_vcf_source(
@@ -292,7 +296,7 @@ def _create_vcf_source(
      allow_malformed_records=allow_malformed_records)


class ReadAllFromVcf(PTransform):
class ReadAllFromVcf(transforms.PTransform):
  """A :class:`~apache_beam.transforms.ptransform.PTransform` for reading a
  :class:`~apache_beam.pvalue.PCollection` of VCF files.

@@ -310,7 +314,7 @@ def __init__(
      self,
      representative_header_lines=None,  # type: List[str]
      desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE,  # type: int
      compression_type=CompressionTypes.AUTO,  # type: str
      compression_type=filesystem.CompressionTypes.AUTO,  # type: str
      allow_malformed_records=False,  # type: bool
      **kwargs  # type: **str
  ):
@@ -339,21 +343,21 @@ def __init__(
        allow_malformed_records=allow_malformed_records)
    self._read_all_files = filebasedsource.ReadAllFiles(
        True,  # splittable
        CompressionTypes.AUTO, desired_bundle_size,
        filesystem.CompressionTypes.AUTO, desired_bundle_size,
        0,  # min_bundle_size
        source_from_file)

  def expand(self, pvalue):
    return pvalue | 'ReadAllFiles' >> self._read_all_files


class WriteToVcf(PTransform):
class WriteToVcf(transforms.PTransform):
  """A PTransform for writing to VCF files."""

  def __init__(self,
               file_path,
               num_shards=1,
               compression_type=CompressionTypes.AUTO,
               compression_type=filesystem.CompressionTypes.AUTO,
               headers=None):
    # type: (str, int, str, List[str]) -> None
    """Initialize a WriteToVcf PTransform.
@@ -404,7 +408,7 @@ def process(self, (file_path, variants), *args, **kwargs):
        file_to_write.write(self._coder.encode(variant))


class WriteVcfDataLines(PTransform):
class WriteVcfDataLines(transforms.PTransform):
  """A PTransform for writing VCF data lines.

  This PTransform takes PCollection<`file_path`, `variants`> as input, and