Support file-like objects in record writer #83

Merged
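This pull request renames `open_file` to `open_path_or_stream` and extends `RecordAdapter` so that a file-like object passed via `fileobj` also works on the writing side, not only for reading. A minimal sketch of the resulting round trip, based on the new test at the bottom of this diff (the record descriptor is purely illustrative):

    from io import BytesIO

    from flow.record import RecordAdapter, RecordDescriptor

    # Hypothetical descriptor, only for illustration.
    TestRecord = RecordDescriptor("test/example", [("string", "name")])

    buf = BytesIO()

    # out=True selects the writer path; with a file-like object this yields a StreamWriter.
    writer = RecordAdapter(fileobj=buf, out=True)
    writer.write(TestRecord(name="foo"))
    writer.flush()

    # The records can be read back from an in-memory copy of the written stream.
    reader = RecordAdapter(fileobj=BytesIO(buf.getvalue()))
    records = list(reader)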
4 changes: 2 additions & 2 deletions flow/record/__init__.py
@@ -17,8 +17,8 @@
     dynamic_fieldtype,
     extend_record,
     iter_timestamped_records,
-    open_file,
     open_path,
+    open_path_or_stream,
     open_stream,
     stream,
 )
@@ -51,7 +51,7 @@
     "JsonRecordPacker",
     "RecordStreamWriter",
     "RecordStreamReader",
-    "open_file",
+    "open_path_or_stream",
     "open_path",
     "open_stream",
     "stream",
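After this hunk, downstream code imports the renamed helper from the package root; a quick sketch of the new import surface:

    # open_file is gone from the public API; the renamed helper sits alongside the existing ones.
    from flow.record import open_path, open_path_or_stream, open_stream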
4 changes: 2 additions & 2 deletions flow/record/adapter/avro.py
@@ -53,7 +53,7 @@ class AvroWriter(AbstractWriter):
     writer = None
 
     def __init__(self, path, key=None, **kwargs):
-        self.fp = record.open_file(path, "wb")
+        self.fp = record.open_path_or_stream(path, "wb")
 
         self.desc = None
         self.schema = None
@@ -93,7 +93,7 @@ class AvroReader(AbstractReader):
     fp = None
 
     def __init__(self, path, selector=None, **kwargs):
-        self.fp = record.open_file(path, "rb")
+        self.fp = record.open_path_or_stream(path, "rb")
         self.selector = make_selector(selector)
 
         self.reader = fastavro.reader(self.fp)
4 changes: 2 additions & 2 deletions flow/record/adapter/jsonfile.py
@@ -23,7 +23,7 @@ class JsonfileWriter(AbstractWriter):
 
     def __init__(self, path, indent=None, descriptors=True, **kwargs):
         self.descriptors = str(descriptors).lower() in ("true", "1")
-        self.fp = record.open_path(path, "w")
+        self.fp = record.open_path_or_stream(path, "w")
         if isinstance(indent, str):
             indent = int(indent)
         self.packer = JsonRecordPacker(indent=indent, pack_descriptors=self.descriptors)
@@ -55,7 +55,7 @@ class JsonfileReader(AbstractReader):
 
     def __init__(self, path, selector=None, **kwargs):
         self.selector = make_selector(selector)
-        self.fp = record.open_path(path, "r")
+        self.fp = record.open_path_or_stream(path, "r")
         self.packer = JsonRecordPacker()
 
     def close(self):
4 changes: 2 additions & 2 deletions flow/record/adapter/line.py
@@ -1,4 +1,4 @@
-from flow.record import open_path
+from flow.record import open_path_or_stream
 from flow.record.adapter import AbstractWriter
 from flow.record.utils import is_stdout
 
@@ -16,7 +16,7 @@ class LineWriter(AbstractWriter):
     fp = None
 
     def __init__(self, path, fields=None, exclude=None, **kwargs):
-        self.fp = open_path(path, "wb")
+        self.fp = open_path_or_stream(path, "wb")
         self.count = 0
         self.fields = fields
         self.exclude = exclude
6 changes: 3 additions & 3 deletions flow/record/adapter/stream.py
@@ -1,6 +1,6 @@
 from typing import Iterator, Union
 
-from flow.record import Record, RecordOutput, RecordStreamReader, open_file, open_path
+from flow.record import Record, RecordOutput, RecordStreamReader, open_path_or_stream
 from flow.record.adapter import AbstractReader, AbstractWriter
 from flow.record.selector import Selector
 from flow.record.utils import is_stdout
@@ -19,7 +19,7 @@ class StreamWriter(AbstractWriter):
     stream = None
 
     def __init__(self, path: str, clobber=True, **kwargs):
-        self.fp = open_path(path, "wb", clobber=clobber)
+        self.fp = open_path_or_stream(path, "wb", clobber=clobber)
         self.stream = RecordOutput(self.fp)
 
     def write(self, record: Record) -> None:
@@ -46,7 +46,7 @@ class StreamReader(AbstractReader):
     stream = None
 
     def __init__(self, path: str, selector: Union[str, Selector] = None, **kwargs):
-        self.fp = open_file(path, "rb")
+        self.fp = open_path_or_stream(path, "rb")
         self.stream = RecordStreamReader(self.fp, selector=selector)
 
     def __iter__(self) -> Iterator[Record]:
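Because both adapter classes now go through `open_path_or_stream`, the stream adapter no longer cares whether it is handed a filesystem path or an already-open binary stream. A small sketch of what this enables (the in-memory buffer is an assumed stand-in for any binary stream):

    from io import BytesIO

    from flow.record.adapter.stream import StreamWriter

    # In "wb" mode open_path_or_stream hands an open stream back untouched,
    # so StreamWriter writes records straight into this buffer.
    writer = StreamWriter(BytesIO())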
4 changes: 2 additions & 2 deletions flow/record/adapter/text.py
@@ -1,4 +1,4 @@
-from flow.record import open_path
+from flow.record import open_path_or_stream
 from flow.record.adapter import AbstractWriter
 from flow.record.utils import is_stdout
 
@@ -27,7 +27,7 @@ class TextWriter(AbstractWriter):
     fp = None
 
     def __init__(self, path, flush=True, format_spec=None, **kwargs):
-        self.fp = open_path(path, "wb")
+        self.fp = open_path_or_stream(path, "wb")
         self.auto_flush = flush
         self.format_spec = format_spec
4 changes: 2 additions & 2 deletions flow/record/adapter/xlsx.py
@@ -19,7 +19,7 @@ class XlsxWriter(AbstractWriter):
     wb = None
 
     def __init__(self, path, **kwargs):
-        self.fp = record.open_path(path, "wb")
+        self.fp = record.open_path_or_stream(path, "wb")
        self.wb = openpyxl.Workbook()
         self.ws = self.wb.active
         self.desc = None
@@ -51,7 +51,7 @@ class XlsxReader(AbstractReader):
 
     def __init__(self, path, selector=None, **kwargs):
         self.selector = make_selector(selector)
-        self.fp = record.open_path(path, "rb")
+        self.fp = record.open_path_or_stream(path, "rb")
         self.desc = None
         self.wb = openpyxl.load_workbook(self.fp)
         self.ws = self.wb.active
81 changes: 44 additions & 37 deletions flow/record/base.py
@@ -645,6 +645,8 @@ def DynamicDescriptor(name, fields):
 
 
 def open_stream(fp: BinaryIO, mode: str) -> BinaryIO:
+    if "w" in mode:
+        return fp
     if not hasattr(fp, "peek"):
         fp = io.BufferedReader(fp)
 
@@ -682,13 +684,13 @@ def find_adapter_for_stream(fp: BinaryIO) -> tuple[BinaryIO, Optional[str]]:
     return fp, None
 
 
-def open_file(path: Union[str, Path, BinaryIO], mode: str, clobber: bool = True) -> IO:
+def open_path_or_stream(path: Union[str, Path, BinaryIO], mode: str, clobber: bool = True) -> IO:
     if isinstance(path, Path):
         path = str(path)
     if isinstance(path, str):
         return open_path(path, mode, clobber)
     elif isinstance(path, io.IOBase):
-        return open_stream(path, "rb")
+        return open_stream(path, mode)
     else:
         raise ValueError(f"Unsupported path type {path}")

@@ -798,41 +800,45 @@ def RecordAdapter(
             cls_url = p.netloc + p.path
         if sub_adapter:
             cls_url = sub_adapter + "://" + cls_url
-    elif url in ("-", ""):
-        # For reading stdin, we cannot rely on an extension to know what sort of stream is incoming. Thus, we will treat
-        # it as a 'fileobj', where we can peek into the stream and try to select the appropriate adapter.
-        fileobj = getattr(sys.stdin, "buffer", sys.stdin)
-    if fileobj is not None:
-        # This record adapter has received a file-like object for record reading.
-        # We just need to find the right adapter by peeking into the first few bytes.
-
-        # First, we open the stream. If the stream is compressed, open_stream will wrap it for us into a decompressor.
-        cls_stream = open_stream(fileobj, "rb")
-
-        # Now we have a stream that will be transparently decompressed, but we still do not know what adapter to use.
-        # This requires a new peek into the transparent stream. This peek will cause the stream pointer to be moved.
-        # Therefore, find_adapter_for_stream returns both a BinaryIO-supportive object that can correctly read the
-        # adjusted stream, and a string indicating the type of adapter to be used on said stream.
-        arg_dict = kwargs.copy()
-
-        # If a user did not provide a url, we have to peek into the stream to be able to determine the right adapter
-        # based on magic bytes encountered in the first few bytes of the stream.
-        if adapter is None:
-            cls_stream, adapter = find_adapter_for_stream(cls_stream)
-        if adapter is None:
-            peek_data = cls_stream.peek(RECORDSTREAM_MAGIC_DEPTH)
-            if peek_data and peek_data.startswith(b"<"):
-                # As peek() can result in a larger buffer than requested, we make sure the peek_data variable isn't
-                # unnecessarily long in the error message.
-                peek_data = peek_data[:RECORDSTREAM_MAGIC_DEPTH]
-                raise RecordAdapterNotFound(
-                    (
-                        f"Could not find a reader for input {peek_data!r}. Are you perhaps "
-                        "entering record text, rather than a record stream? This can be fixed by using "
-                        "'rdump -w -' to write a record stream to stdout."
-                    )
-                )
-            raise RecordAdapterNotFound("Could not find adapter for file-like object")
+    if out is False:
+        if url in ("-", ""):
+            # For reading stdin, we cannot rely on an extension to know what sort of stream is incoming. Thus, we will
+            # treat it as a 'fileobj', where we can peek into the stream and try to select the appropriate adapter.
+            fileobj = getattr(sys.stdin, "buffer", sys.stdin)
+        if fileobj is not None:
+            # This record adapter has received a file-like object for record reading.
+            # We just need to find the right adapter by peeking into the first few bytes.
+
+            # First, we open the stream. If the stream is compressed, open_stream will wrap it for us into a
+            # decompressor.
+            cls_stream = open_stream(fileobj, "rb")
+
+            # If a user did not provide a url, we have to peek into the stream to be able to determine the right
+            # adapter based on magic bytes encountered in the first few bytes of the stream.
+            if adapter is None:
+                # If we could not infer an adapter from the url, we have a stream that will be transparently
+                # decompressed but we still do not know what adapter to use. This requires a new peek into the
+                # transparent stream. This peek will cause the stream pointer to be moved. Therefore,
+                # find_adapter_for_stream returns both a BinaryIO-supportive object that can correctly read the
+                # adjusted stream, and a string indicating the type of adapter to be used on said stream.
+                cls_stream, adapter = find_adapter_for_stream(cls_stream)
+                if adapter is None:
+                    # As peek() can result in a larger buffer than requested, we truncate it just to be sure.
+                    peek_data = cls_stream.peek(RECORDSTREAM_MAGIC_DEPTH)[:RECORDSTREAM_MAGIC_DEPTH]
+                    if peek_data and peek_data.startswith(b"<"):
+                        raise RecordAdapterNotFound(
+                            (
+                                f"Could not find a reader for input {peek_data!r}. Are you perhaps "
+                                "entering record text, rather than a record stream? This can be fixed by using "
+                                "'rdump -w -' to write a record stream to stdout."
+                            )
+                        )
+                    raise RecordAdapterNotFound("Could not find adapter for file-like object")
+
+            # Now that we found an adapter, we will fall back into the same code path as when a URL is given. As the
+            # url parsing path copied kwargs into an arg_dict variable, we will do the same so we do not get a
+            # variable referenced before assignment error.
+            arg_dict = kwargs.copy()
 
     # Now that we know which adapter is needed, we import it.
     mod = importlib.import_module("flow.record.adapter.{}".format(adapter))
@@ -845,9 +851,10 @@ def RecordAdapter(
     if out:
         arg_dict["clobber"] = clobber
     log.debug("Creating {!r} for {!r} with args {!r}".format(cls, url, arg_dict))
+
-    if fileobj is not None:
+    if cls_stream is not None:
         return cls(cls_stream, **arg_dict)
+    if fileobj is not None:
+        return cls(fileobj, **arg_dict)
     return cls(cls_url, **arg_dict)


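The core of the change sits in the two hunks above: `open_path_or_stream` now forwards the caller's mode instead of hardcoding `"rb"`, and `open_stream` returns write-mode streams untouched rather than wrapping them for peeking. A minimal sketch of the dispatch behaviour (in-memory buffers only, since a `str` or `Path` argument would go through `open_path` and touch the filesystem):

    import io

    from flow.record import open_path_or_stream

    # Write mode: the new early return in open_stream hands the stream back as-is.
    buf = io.BytesIO()
    assert open_path_or_stream(buf, "wb") is buf

    # Read mode: a stream without peek() support is wrapped (e.g. in io.BufferedReader)
    # so the adapter detection logic can peek at magic bytes without consuming them.
    wrapped = open_path_or_stream(io.BytesIO(b"some data"), "rb")
    assert hasattr(wrapped, "peek")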
41 changes: 33 additions & 8 deletions tests/test_record_adapter.py
@@ -1,24 +1,21 @@
 import datetime
 import platform
 import sys
+from io import BytesIO
 
 import pytest
 
-try:
-    from StringIO import StringIO
-except ImportError:
-    from io import BytesIO as StringIO
-
 from flow.record import (
     PathTemplateWriter,
     RecordAdapter,
     RecordArchiver,
     RecordDescriptor,
     RecordOutput,
     RecordReader,
     RecordStreamReader,
     RecordWriter,
 )
-from flow.record.adapter.stream import StreamReader
+from flow.record.adapter.stream import StreamReader, StreamWriter
 from flow.record.base import (
     BZ2_MAGIC,
     GZIP_MAGIC,
@@ -33,7 +30,7 @@
 
 
 def test_stream_writer_reader():
-    fp = StringIO()
+    fp = BytesIO()
     out = RecordOutput(fp)
     for rec in generate_records():
         out.write(rec)
@@ -48,7 +45,7 @@ def test_stream_writer_reader():
 
 
 def test_recordstream_filelike_object():
-    fp = StringIO()
+    fp = BytesIO()
     out = RecordOutput(fp)
     for rec in generate_records():
         out.write(rec)
@@ -477,3 +474,31 @@ def test_csvfilereader(tmp_path):
     with RecordReader(f"csvfile://{path}", selector="r.count == '2'") as reader:
         for i, rec in enumerate(reader):
             assert rec.count == "2"
+
+
+def test_file_like_writer_reader() -> None:
+    test_buf = BytesIO()
+
+    adapter = RecordAdapter(fileobj=test_buf, out=True)
+
+    assert isinstance(adapter, StreamWriter)
+
+    # Add mock records
+    test_records = list(generate_records(10))
+    for record in test_records:
+        adapter.write(record)
+
+    adapter.flush()
+
+    # Grab the bytes before closing the BytesIO object.
+    read_buf = BytesIO(test_buf.getvalue())
+
+    # Close the writer and ensure the object has been closed
+    adapter.close()
+
+    # Verify that the written record stream is something we can read back
+    reader = RecordAdapter(fileobj=read_buf)
+    read_records = list(reader)
+    assert len(read_records) == 10
+    for idx, record in enumerate(read_records):
+        assert record == test_records[idx]