Add Google Cloud Storage Adapter #84

Open · wants to merge 14 commits into main
97 changes: 97 additions & 0 deletions flow/record/adapter/gcs.py
@@ -0,0 +1,97 @@
import logging
from fnmatch import fnmatch
from typing import Iterator, Union

from google.cloud.storage.client import Client
from google.cloud.storage.fileio import BlobReader, BlobWriter

from flow.record.adapter import AbstractReader, AbstractWriter
from flow.record.base import Record, RecordAdapter
from flow.record.selector import CompiledSelector, Selector

__usage__ = """
Google Cloud Storage adapter
---
Read usage: rdump gcs://[PROJECT-ID]:[BUCKET-ID]?path=[PATH]
[PROJECT-ID]: Google Cloud Project ID
[BUCKET-ID]: Bucket ID
[PATH]: Path to look for files, with support for glob-pattern matching

Write usage: rdump gcs://[PROJECT-ID]:[BUCKET-ID]?path=[PATH]
[PROJECT-ID]: Google Cloud Project ID
[BUCKET-ID]: Bucket ID
[PATH]: Path to write records to
"""

log = logging.getLogger(__name__)

GLOB_CHARACTERS = "*?[]"


class GcsReader(AbstractReader):
    def __init__(self, uri: str, path: str, selector: Union[None, Selector, CompiledSelector] = None, **kwargs) -> None:
        self.selector = selector
        project_name = uri[: uri.find(":")]
        bucket_name = uri[uri.find(":") + 1 :]

        self.gcs = Client(project=project_name)
        self.bucket = self.gcs.bucket(bucket_name)

        # GCS doesn't support iterating blobs using a glob pattern, so we have to do that ourselves.
        # To split the path prefix from the glob-specific part, we find the first position where the glob
        # starts. We then go through all blobs that match the path prefix before the glob, and run fnmatch
        # ourselves to check whether any given blob matches our glob.
        lowest_pos = min([path.find(char) if path.find(char) >= 0 else float("inf") for char in GLOB_CHARACTERS])
        if lowest_pos == float("inf"):
            # No glob character was found
            self.glob = None
            self.prefix = path
        else:
            # Split the glob and the prefix
            self.prefix = path[:lowest_pos]
            self.glob = path

    def __iter__(self) -> Iterator[Record]:
        blobs = self.gcs.list_blobs(bucket_or_name=self.bucket, prefix=self.prefix)
        for blob in blobs:
            if blob.size == 0:  # Skip empty files
                continue
            if self.glob and not fnmatch(blob.name, self.glob):
                continue
            blobreader = BlobReader(blob)

            # Give the file-like object to RecordAdapter so it will select the right adapter by peeking
            # into the stream
            reader = RecordAdapter(fileobj=blobreader, out=False, selector=self.selector)
            for record in reader:
                yield record

    def close(self) -> None:
        self.gcs.close()


class GcsWriter(AbstractWriter):
    def __init__(self, uri: str, path: str, **kwargs):
        project_name = uri[: uri.find(":")]
        bucket_name = uri[uri.find(":") + 1 :]
        self.writer = None

        self.gcs = Client(project=project_name)
        self.bucket = self.gcs.bucket(bucket_name)

        blob = self.bucket.blob(path)
        self.writer = BlobWriter(blob, ignore_flush=True)
        self.adapter = RecordAdapter(url=path, fileobj=self.writer, out=True, **kwargs)

    def write(self, record: Record) -> None:
        self.adapter.write(record)

    def flush(self) -> None:
        # https://cloud.google.com/python/docs/reference/storage/latest/google.cloud.storage.fileio.BlobWriter
        # Flushing without closing is not supported by the remote service, so calling flush() on a BlobWriter
        # normally raises io.UnsupportedOperation. That behaviour is incompatible with some consumers and
        # wrappers of file objects in Python, so this adapter implements flush() as a no-op.
        pass

    def close(self) -> None:
        if self.writer:
            self.writer.close()
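For illustration only, not part of this diff: a self-contained sketch of the prefix/glob split that GcsReader performs. The blob path below is hypothetical; only the pure string logic from __init__ and the client-side fnmatch check from __iter__ are exercised.

from fnmatch import fnmatch
from typing import Optional, Tuple

GLOB_CHARACTERS = "*?[]"


def split_prefix_and_glob(path: str) -> Tuple[str, Optional[str]]:
    # Mirrors GcsReader.__init__: the prefix is everything before the first glob
    # character, and the full path doubles as the fnmatch pattern.
    lowest_pos = min(path.find(char) if path.find(char) >= 0 else float("inf") for char in GLOB_CHARACTERS)
    if lowest_pos == float("inf"):
        return path, None
    return path[:lowest_pos], path


# Hypothetical blob path inside a bucket.
prefix, pattern = split_prefix_and_glob("incidents/2023-*/findings.records.gz")
assert prefix == "incidents/2023-"  # used as the prefix for list_blobs
assert fnmatch("incidents/2023-08/findings.records.gz", pattern)  # matched client-side per blob name
assert not fnmatch("incidents/2022-12/findings.records.gz", pattern)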
4 changes: 2 additions & 2 deletions flow/record/adapter/jsonfile.py
@@ -23,7 +23,7 @@ class JsonfileWriter(AbstractWriter):

    def __init__(self, path, indent=None, descriptors=True, **kwargs):
        self.descriptors = str(descriptors).lower() in ("true", "1")
        self.fp = record.open_path(path, "w")
        self.fp = record.open_file(path, "w")
        if isinstance(indent, str):
            indent = int(indent)
        self.packer = JsonRecordPacker(indent=indent, pack_descriptors=self.descriptors)
@@ -55,7 +55,7 @@ class JsonfileReader(AbstractReader):

    def __init__(self, path, selector=None, **kwargs):
        self.selector = make_selector(selector)
        self.fp = record.open_path(path, "r")
        self.fp = record.open_file(path, "r")
        self.packer = JsonRecordPacker()

    def close(self):
4 changes: 2 additions & 2 deletions flow/record/adapter/line.py
@@ -1,4 +1,4 @@
from flow.record import open_path
from flow.record import open_file
from flow.record.adapter import AbstractWriter
from flow.record.utils import is_stdout

@@ -16,7 +16,7 @@ class LineWriter(AbstractWriter):
    fp = None

    def __init__(self, path, fields=None, exclude=None, **kwargs):
        self.fp = open_path(path, "wb")
        self.fp = open_file(path, "wb")
        self.count = 0
        self.fields = fields
        self.exclude = exclude
4 changes: 2 additions & 2 deletions flow/record/adapter/stream.py
@@ -1,6 +1,6 @@
from typing import Iterator, Union

from flow.record import Record, RecordOutput, RecordStreamReader, open_file, open_path
from flow.record import Record, RecordOutput, RecordStreamReader, open_file
from flow.record.adapter import AbstractReader, AbstractWriter
from flow.record.selector import Selector
from flow.record.utils import is_stdout
@@ -19,7 +19,7 @@ class StreamWriter(AbstractWriter):
    stream = None

    def __init__(self, path: str, clobber=True, **kwargs):
        self.fp = open_path(path, "wb", clobber=clobber)
        self.fp = open_file(path, "wb", clobber=clobber)
        self.stream = RecordOutput(self.fp)

    def write(self, record: Record) -> None:
4 changes: 2 additions & 2 deletions flow/record/adapter/text.py
@@ -1,4 +1,4 @@
from flow.record import open_path
from flow.record import open_file
from flow.record.adapter import AbstractWriter
from flow.record.utils import is_stdout

@@ -27,7 +27,7 @@ class TextWriter(AbstractWriter):
    fp = None

    def __init__(self, path, flush=True, format_spec=None, **kwargs):
        self.fp = open_path(path, "wb")
        self.fp = open_file(path, "wb")
        self.auto_flush = flush
        self.format_spec = format_spec

4 changes: 2 additions & 2 deletions flow/record/adapter/xlsx.py
@@ -19,7 +19,7 @@ class XlsxWriter(AbstractWriter):
    wb = None

    def __init__(self, path, **kwargs):
        self.fp = record.open_path(path, "wb")
        self.fp = record.open_file(path, "wb")
        self.wb = openpyxl.Workbook()
        self.ws = self.wb.active
        self.desc = None
@@ -51,7 +51,7 @@ class XlsxReader(AbstractReader):

    def __init__(self, path, selector=None, **kwargs):
        self.selector = make_selector(selector)
        self.fp = record.open_path(path, "rb")
        self.fp = record.open_file(path, "rb")
        self.desc = None
        self.wb = openpyxl.load_workbook(self.fp)
        self.ws = self.wb.active
77 changes: 41 additions & 36 deletions flow/record/base.py
@@ -645,6 +645,8 @@ def DynamicDescriptor(name, fields):


def open_stream(fp: BinaryIO, mode: str) -> BinaryIO:
    if "w" in mode:
        return fp
    if not hasattr(fp, "peek"):
        fp = io.BufferedReader(fp)

@@ -688,7 +690,7 @@ def open_file(path: Union[str, Path, BinaryIO], mode: str, clobber: bool = True)
    if isinstance(path, str):
        return open_path(path, mode, clobber)
    elif isinstance(path, io.IOBase):
        return open_stream(path, "rb")
        return open_stream(path, mode)
    else:
        raise ValueError(f"Unsupported path type {path}")
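A quick sketch, not part of the patch, of what the two base.py changes above do together: open_stream now returns writable file-like objects untouched instead of wrapping them for read-side peeking, and open_file forwards the caller's mode instead of hardcoding "rb".

import io

from flow.record.base import open_file

buf = io.BytesIO()
# Write mode: the object is handed back as-is, so writer adapters can write straight into it.
assert open_file(buf, "wb") is buf
# Read mode: objects without peek() support are still wrapped in an io.BufferedReader.
assert hasattr(open_file(io.BytesIO(b"some data"), "rb"), "peek")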

@@ -798,41 +800,43 @@ def RecordAdapter(
cls_url = p.netloc + p.path
if sub_adapter:
cls_url = sub_adapter + "://" + cls_url
elif url in ("-", ""):
# For reading stdin, we cannot rely on an extension to know what sort of stream is incoming. Thus, we will treat
# it as a 'fileobj', where we can peek into the stream and try to select the appropriate adapter.
fileobj = getattr(sys.stdin, "buffer", sys.stdin)
if fileobj is not None:
# This record adapter has received a file-like object for record reading
# We just need to find the right adapter by peeking into the first few bytes.

# First, we open the stream. If the stream is compressed, open_stream will wrap it for us into a decompressor.
cls_stream = open_stream(fileobj, "rb")

# Now, we have a stream that will be transparently decompressed but we still do not know what adapter to use.
# This requires a new peek into the transparent stream. This peek will cause the stream pointer to be moved.
# Therefore, find_adapter_for_stream returns both a BinaryIO-supportive object that can correctly read the
# adjusted stream, and a string indicating the type of adapter to be used on said stream.
arg_dict = kwargs.copy()

# If a user did not provide a url, we have to peek into the stream to be able to determine the right adapter
# based on magic bytes encountered in the first few bytes of the stream.
if adapter is None:
cls_stream, adapter = find_adapter_for_stream(cls_stream)
if out is False:
if url in ("-", ""):
# For reading stdin, we cannot rely on an extension to know what sort of stream is incoming. Thus, we will
# treat it as a 'fileobj', where we can peek into the stream and try to select the appropriate adapter.
fileobj = getattr(sys.stdin, "buffer", sys.stdin)
if fileobj is not None:
# This record adapter has received a file-like object for record reading
# We just need to find the right adapter by peeking into the first few bytes.

# First, we open the stream. If the stream is compressed, open_stream will wrap it for us into a
# decompressor.
cls_stream = open_stream(fileobj, "rb")

# Now, we have a stream that will be transparently decompressed but we still do not know what adapter to
# use. This requires a new peek into the transparent stream. This peek will cause the stream pointer to be
# moved. Therefore, find_adapter_for_stream returns both a BinaryIO-supportive object that can correctly
# read the adjusted stream, and a string indicating the type of adapter to be used on said stream.
arg_dict = kwargs.copy()

# If a user did not provide a url, we have to peek into the stream to be able to determine the right adapter
# based on magic bytes encountered in the first few bytes of the stream.
if adapter is None:
peek_data = cls_stream.peek(RECORDSTREAM_MAGIC_DEPTH)
if peek_data and peek_data.startswith(b"<"):
# As peek() can result in a larger buffer than requested, we make sure the peek_data variable isn't
# unnecessarily long in the error message.
peek_data = peek_data[:RECORDSTREAM_MAGIC_DEPTH]
raise RecordAdapterNotFound(
(
f"Could not find a reader for input {peek_data!r}. Are you perhaps "
"entering record text, rather than a record stream? This can be fixed by using "
"'rdump -w -' to write a record stream to stdout."
cls_stream, adapter = find_adapter_for_stream(cls_stream)
if adapter is None:
peek_data = cls_stream.peek(RECORDSTREAM_MAGIC_DEPTH)
if peek_data and peek_data.startswith(b"<"):
# As peek() can result in a larger buffer than requested, we make sure the peek_data variable
# isn't unnecessarily long in the error message.
peek_data = peek_data[:RECORDSTREAM_MAGIC_DEPTH]
raise RecordAdapterNotFound(
(
f"Could not find a reader for input {peek_data!r}. Are you perhaps "
"entering record text, rather than a record stream? This can be fixed by using "
"'rdump -w -' to write a record stream to stdout."
)
)
)
raise RecordAdapterNotFound("Could not find adapter for file-like object")
raise RecordAdapterNotFound("Could not find adapter for file-like object")

# Now that we know which adapter is needed, we import it.
mod = importlib.import_module("flow.record.adapter.{}".format(adapter))
@@ -845,9 +849,10 @@
if out:
arg_dict["clobber"] = clobber
log.debug("Creating {!r} for {!r} with args {!r}".format(cls, url, arg_dict))

if fileobj is not None:
if cls_stream is not None:
return cls(cls_stream, **arg_dict)
if fileobj is not None:
return cls(fileobj, **arg_dict)
return cls(cls_url, **arg_dict)


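To round things off, a rough end-to-end sketch of the behaviour the RecordAdapter and open_stream changes enable, and which the GCS adapter relies on: writing records into an arbitrary file-like object and reading them back with no URL at all, purely by peeking at the stream. The descriptor name and field are made up for the example, and the adapter selection is assumed to follow the logic shown in the diff above.

import io

from flow.record import RecordDescriptor
from flow.record.base import RecordAdapter

# Hypothetical record type, purely for illustration.
Greeting = RecordDescriptor("example/greeting", [("string", "message")])

buf = io.BytesIO()

# out=True with a fileobj mirrors what GcsWriter does with its BlobWriter: the writer
# adapter is picked from the url and records are written into the file object.
writer = RecordAdapter(url="greetings.records", fileobj=buf, out=True)
writer.write(Greeting(message="hello gcs"))
writer.flush()

# out=False with only a fileobj mirrors GcsReader: the stream is peeked to detect the
# record stream format and the matching reader adapter is constructed around it.
buf.seek(0)
reader = RecordAdapter(fileobj=buf, out=False)
for record in reader:
    print(record.message)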