Add Google Cloud Storage Adapter #84

Open · wants to merge 14 commits into base: main
Support file-like objects in record writer
MaxGroot committed Sep 14, 2023
commit 5b6360e0d18fc93692cbc895546f547529d27822
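
This first commit lays the groundwork for the GCS adapter: writers (and readers) can be handed an already-open binary stream instead of a filesystem path. A minimal sketch of the resulting usage, mirroring the new test at the bottom of this diff (the record descriptor here is illustrative, not part of the PR):

    from io import BytesIO

    from flow.record import RecordAdapter, RecordDescriptor
    from flow.record.adapter.stream import StreamWriter

    # Write records into an in-memory buffer rather than to a path.
    Example = RecordDescriptor("test/example", [("uint32", "count")])

    buf = BytesIO()
    writer = RecordAdapter(fileobj=buf, out=True)
    assert isinstance(writer, StreamWriter)  # file-like output maps to the stream adapter

    writer.write(Example(count=1))
    writer.flush()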
4 changes: 2 additions & 2 deletions flow/record/adapter/jsonfile.py
@@ -23,7 +23,7 @@ class JsonfileWriter(AbstractWriter):
 
     def __init__(self, path, indent=None, descriptors=True, **kwargs):
         self.descriptors = str(descriptors).lower() in ("true", "1")
-        self.fp = record.open_path(path, "w")
+        self.fp = record.open_file(path, "w")
         if isinstance(indent, str):
             indent = int(indent)
         self.packer = JsonRecordPacker(indent=indent, pack_descriptors=self.descriptors)
@@ -55,7 +55,7 @@ class JsonfileReader(AbstractReader):
 
     def __init__(self, path, selector=None, **kwargs):
         self.selector = make_selector(selector)
-        self.fp = record.open_path(path, "r")
+        self.fp = record.open_file(path, "r")
         self.packer = JsonRecordPacker()
 
     def close(self):
4 changes: 2 additions & 2 deletions flow/record/adapter/line.py
@@ -1,4 +1,4 @@
-from flow.record import open_path
+from flow.record import open_file
 from flow.record.adapter import AbstractWriter
 from flow.record.utils import is_stdout
 
@@ -16,7 +16,7 @@ class LineWriter(AbstractWriter):
     fp = None
 
     def __init__(self, path, fields=None, exclude=None, **kwargs):
-        self.fp = open_path(path, "wb")
+        self.fp = open_file(path, "wb")
         self.count = 0
         self.fields = fields
         self.exclude = exclude
4 changes: 2 additions & 2 deletions flow/record/adapter/stream.py
@@ -1,6 +1,6 @@
 from typing import Iterator, Union
 
-from flow.record import Record, RecordOutput, RecordStreamReader, open_file, open_path
+from flow.record import Record, RecordOutput, RecordStreamReader, open_file
 from flow.record.adapter import AbstractReader, AbstractWriter
 from flow.record.selector import Selector
 from flow.record.utils import is_stdout
@@ -19,7 +19,7 @@ class StreamWriter(AbstractWriter):
     stream = None
 
     def __init__(self, path: str, clobber=True, **kwargs):
-        self.fp = open_path(path, "wb", clobber=clobber)
+        self.fp = open_file(path, "wb", clobber=clobber)
         self.stream = RecordOutput(self.fp)
 
     def write(self, record: Record) -> None:
4 changes: 2 additions & 2 deletions flow/record/adapter/text.py
@@ -1,4 +1,4 @@
-from flow.record import open_path
+from flow.record import open_file
 from flow.record.adapter import AbstractWriter
 from flow.record.utils import is_stdout
 
@@ -27,7 +27,7 @@ class TextWriter(AbstractWriter):
     fp = None
 
     def __init__(self, path, flush=True, format_spec=None, **kwargs):
-        self.fp = open_path(path, "wb")
+        self.fp = open_file(path, "wb")
         self.auto_flush = flush
         self.format_spec = format_spec
 
4 changes: 2 additions & 2 deletions flow/record/adapter/xlsx.py
@@ -19,7 +19,7 @@ class XlsxWriter(AbstractWriter):
     wb = None
 
     def __init__(self, path, **kwargs):
-        self.fp = record.open_path(path, "wb")
+        self.fp = record.open_file(path, "wb")
         self.wb = openpyxl.Workbook()
         self.ws = self.wb.active
         self.desc = None
@@ -51,7 +51,7 @@ class XlsxReader(AbstractReader):
 
     def __init__(self, path, selector=None, **kwargs):
         self.selector = make_selector(selector)
-        self.fp = record.open_path(path, "rb")
+        self.fp = record.open_file(path, "rb")
         self.desc = None
         self.wb = openpyxl.load_workbook(self.fp)
         self.ws = self.wb.active
77 changes: 41 additions & 36 deletions flow/record/base.py
@@ -645,6 +645,8 @@ def DynamicDescriptor(name, fields):
 
 
 def open_stream(fp: BinaryIO, mode: str) -> BinaryIO:
+    if "w" in mode:
+        return fp
     if not hasattr(fp, "peek"):
         fp = io.BufferedReader(fp)
 
@@ -688,7 +690,7 @@ def open_file(path: Union[str, Path, BinaryIO], mode: str, clobber: bool = True)
     if isinstance(path, str):
         return open_path(path, mode, clobber)
     elif isinstance(path, io.IOBase):
-        return open_stream(path, "rb")
+        return open_stream(path, mode)
     else:
         raise ValueError(f"Unsupported path type {path}")
 
@@ -798,41 +800,43 @@ def RecordAdapter(
         cls_url = p.netloc + p.path
         if sub_adapter:
             cls_url = sub_adapter + "://" + cls_url
-    elif url in ("-", ""):
-        # For reading stdin, we cannot rely on an extension to know what sort of stream is incoming. Thus, we will treat
-        # it as a 'fileobj', where we can peek into the stream and try to select the appropriate adapter.
-        fileobj = getattr(sys.stdin, "buffer", sys.stdin)
-    if fileobj is not None:
-        # This record adapter has received a file-like object for record reading
-        # We just need to find the right adapter by peeking into the first few bytes.
-
-        # First, we open the stream. If the stream is compressed, open_stream will wrap it for us into a decompressor.
-        cls_stream = open_stream(fileobj, "rb")
-
-        # Now, we have a stream that will be transparently decompressed but we still do not know what adapter to use.
-        # This requires a new peek into the transparent stream. This peek will cause the stream pointer to be moved.
-        # Therefore, find_adapter_for_stream returns both a BinaryIO-supportive object that can correctly read the
-        # adjusted stream, and a string indicating the type of adapter to be used on said stream.
-        arg_dict = kwargs.copy()
-
-        # If a user did not provide a url, we have to peek into the stream to be able to determine the right adapter
-        # based on magic bytes encountered in the first few bytes of the stream.
-        if adapter is None:
-            cls_stream, adapter = find_adapter_for_stream(cls_stream)
-            if adapter is None:
-                peek_data = cls_stream.peek(RECORDSTREAM_MAGIC_DEPTH)
-                if peek_data and peek_data.startswith(b"<"):
-                    # As peek() can result in a larger buffer than requested, we make sure the peek_data variable isn't
-                    # unnecessarily long in the error message.
-                    peek_data = peek_data[:RECORDSTREAM_MAGIC_DEPTH]
-                    raise RecordAdapterNotFound(
-                        (
-                            f"Could not find a reader for input {peek_data!r}. Are you perhaps "
-                            "entering record text, rather than a record stream? This can be fixed by using "
-                            "'rdump -w -' to write a record stream to stdout."
-                        )
-                    )
-                raise RecordAdapterNotFound("Could not find adapter for file-like object")
+    if out is False:
+        if url in ("-", ""):
+            # For reading stdin, we cannot rely on an extension to know what sort of stream is incoming. Thus, we will
+            # treat it as a 'fileobj', where we can peek into the stream and try to select the appropriate adapter.
+            fileobj = getattr(sys.stdin, "buffer", sys.stdin)
+        if fileobj is not None:
+            # This record adapter has received a file-like object for record reading
+            # We just need to find the right adapter by peeking into the first few bytes.
+
+            # First, we open the stream. If the stream is compressed, open_stream will wrap it for us into a
+            # decompressor.
+            cls_stream = open_stream(fileobj, "rb")
+
+            # Now, we have a stream that will be transparently decompressed but we still do not know what adapter to
+            # use. This requires a new peek into the transparent stream. This peek will cause the stream pointer to be
+            # moved. Therefore, find_adapter_for_stream returns both a BinaryIO-supportive object that can correctly
+            # read the adjusted stream, and a string indicating the type of adapter to be used on said stream.
+            arg_dict = kwargs.copy()
+
+            # If a user did not provide a url, we have to peek into the stream to be able to determine the right adapter
+            # based on magic bytes encountered in the first few bytes of the stream.
+            if adapter is None:
+                cls_stream, adapter = find_adapter_for_stream(cls_stream)
+                if adapter is None:
+                    peek_data = cls_stream.peek(RECORDSTREAM_MAGIC_DEPTH)
+                    if peek_data and peek_data.startswith(b"<"):
+                        # As peek() can result in a larger buffer than requested, we make sure the peek_data variable
+                        # isn't unnecessarily long in the error message.
+                        peek_data = peek_data[:RECORDSTREAM_MAGIC_DEPTH]
+                        raise RecordAdapterNotFound(
+                            (
+                                f"Could not find a reader for input {peek_data!r}. Are you perhaps "
+                                "entering record text, rather than a record stream? This can be fixed by using "
+                                "'rdump -w -' to write a record stream to stdout."
+                            )
+                        )
+                    raise RecordAdapterNotFound("Could not find adapter for file-like object")
 
     # Now that we know which adapter is needed, we import it.
     mod = importlib.import_module("flow.record.adapter.{}".format(adapter))
@@ -845,9 +849,10 @@ def RecordAdapter(
     if out:
         arg_dict["clobber"] = clobber
     log.debug("Creating {!r} for {!r} with args {!r}".format(cls, url, arg_dict))
 
-    if fileobj is not None:
+    if cls_stream is not None:
+        return cls(cls_stream, **arg_dict)
+    if fileobj is not None:
         return cls(fileobj, **arg_dict)
     return cls(cls_url, **arg_dict)
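
The read path is symmetric: when RecordAdapter receives a file-like object (or stdin), it peeks at the first bytes via open_stream and find_adapter_for_stream to select an adapter, so no URL or file extension is needed. A rough round-trip sketch based on the behavior above (the record descriptor is again illustrative):

    from io import BytesIO

    from flow.record import RecordAdapter, RecordDescriptor

    # Round-trip: write a record stream into a buffer, then read it back;
    # the reader is selected by peeking at the stream's magic bytes.
    Example = RecordDescriptor("test/example", [("uint32", "count")])

    buf = BytesIO()
    writer = RecordAdapter(fileobj=buf, out=True)
    writer.write(Example(count=1))
    writer.flush()

    reader = RecordAdapter(fileobj=BytesIO(buf.getvalue()))
    for record in reader:
        print(record.count)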


41 changes: 33 additions & 8 deletions tests/test_record_adapter.py
@@ -1,24 +1,21 @@
 import datetime
 import platform
 import sys
+from io import BytesIO
 
 import pytest
 
-try:
-    from StringIO import StringIO
-except ImportError:
-    from io import BytesIO as StringIO
-
 from flow.record import (
     PathTemplateWriter,
     RecordAdapter,
     RecordArchiver,
     RecordDescriptor,
     RecordOutput,
     RecordReader,
     RecordStreamReader,
     RecordWriter,
 )
-from flow.record.adapter.stream import StreamReader
+from flow.record.adapter.stream import StreamReader, StreamWriter
 from flow.record.base import (
     BZ2_MAGIC,
     GZIP_MAGIC,
@@ -33,7 +30,7 @@
 
 
 def test_stream_writer_reader():
-    fp = StringIO()
+    fp = BytesIO()
     out = RecordOutput(fp)
     for rec in generate_records():
         out.write(rec)
@@ -48,7 +45,7 @@ def test_stream_writer_reader():
 
 
 def test_recordstream_filelike_object():
-    fp = StringIO()
+    fp = BytesIO()
     out = RecordOutput(fp)
     for rec in generate_records():
         out.write(rec)
@@ -477,3 +474,31 @@ def test_csvfilereader(tmp_path):
     with RecordReader(f"csvfile://{path}", selector="r.count == '2'") as reader:
         for i, rec in enumerate(reader):
             assert rec.count == "2"
+
+
+def test_gcs_writer() -> None:
+    test_buf = BytesIO()
+
+    adapter = RecordAdapter(fileobj=test_buf, out=True)
+
+    assert isinstance(adapter, StreamWriter)
+
+    # Add mock records
+    test_records = list(generate_records(10))
+    for record in test_records:
+        adapter.write(record)
+
+    adapter.flush()
+
+    # Grab the bytes before closing the BytesIO object.
+    read_buf = BytesIO(test_buf.getvalue())
+
+    # Close the writer and assure the object has been closed
+    adapter.close()
+
+    # Verify if the written record stream is something we can read
+    reader = RecordAdapter(fileobj=read_buf)
+    read_records = list(reader)
+    assert len(read_records) == 10
+    for idx, record in enumerate(read_records):
+        assert record == test_records[idx]
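
Since open_stream transparently wraps compressed input before an adapter is selected, the same fileobj read path should also cope with a compressed record stream; the GZIP_MAGIC and BZ2_MAGIC imports above point at that mechanism. A hedged sketch, not exercised by this commit's tests:

    import gzip
    from io import BytesIO

    from flow.record import RecordAdapter, RecordDescriptor

    # Write a record stream, gzip it, and read it back through a file-like
    # object; open_stream should detect the gzip magic bytes and insert a
    # decompressor before the adapter is chosen.
    Example = RecordDescriptor("test/example", [("uint32", "count")])

    buf = BytesIO()
    writer = RecordAdapter(fileobj=buf, out=True)
    writer.write(Example(count=1))
    writer.flush()

    compressed = BytesIO(gzip.compress(buf.getvalue()))
    reader = RecordAdapter(fileobj=compressed)
    assert len(list(reader)) == 1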