Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Parquet as a batch encoding option #2044

Merged
merged 51 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
67d100b
Add parquet encoding enum and dataclass
jamielxcarter Nov 7, 2022
e9973b7
WIP
jamielxcarter Nov 8, 2022
0e0c993
Add parquet support and tests
jamielxcarter Nov 16, 2022
85eb205
Merge branch 'main' into parquet-encoding
jamielxcarter Nov 17, 2022
89fa161
Write fastparquet file with fs
jamielxcarter Nov 29, 2022
a8dde0f
Change open_with argument
jamielxcarter Nov 29, 2022
481a3dd
Update fastparquet.write
jamielxcarter Nov 29, 2022
ae6a6fb
WIP
jamielxcarter Nov 29, 2022
6615259
WIP
jamielxcarter Nov 29, 2022
7992a3c
Adding s3fs as dependency
jamielxcarter Nov 29, 2022
91787bb
Remove s3fs
jamielxcarter Nov 29, 2022
4f594b3
Remove fastparquet add pyarrow as dependency
jamielxcarter Dec 1, 2022
de34efc
Add parquet dependency
jamielxcarter Dec 1, 2022
b93a830
Add support for gzip and snappy compression types for parquet
jamielxcarter Dec 2, 2022
3a390a9
Merge branch 'main' into parquet-encoding
jamielxcarter Dec 2, 2022
5d27f8f
Add pyarrow as a core dependency
jamielxcarter Dec 5, 2022
1c021c2
Add numpy for python 3.7-3.11
jamielxcarter Dec 6, 2022
53c530f
Merge branch 'main' into parquet-encoding
jamielxcarter Dec 6, 2022
6f7e28f
Merge branch 'main' into parquet-encoding
edgarrmondragon Dec 6, 2022
c63e661
Add schema parsing
jamielxcarter Dec 12, 2022
e5c0206
Merge branch 'parquet-encoding' of https://github.com/jamielxcarter/s…
jamielxcarter Dec 12, 2022
70fc775
Merge branch 'main' into parquet-encoding
jamielxcarter Dec 12, 2022
6e44e85
Change dict to Dict for parsing types
jamielxcarter Dec 12, 2022
eee06f9
Merge branch 'main' into parquet-encoding
jamielxcarter Oct 26, 2023
3ef8bba
Added Batch Factory
jamielxcarter Oct 26, 2023
e29e5a3
Remove pyarrow as core dependency and wrap logic in dependency checks
jamielxcarter Oct 26, 2023
65d87b9
Merge branch 'meltano:main' into parquet-encoding
jamielxcarter Oct 27, 2023
ad11db0
Added missing quotes
jamielxcarter Oct 27, 2023
8390904
Merge branch 'parquet-encoding' of github.com:jamielxcarter/sdk into …
jamielxcarter Oct 27, 2023
433aa56
Removed json schema to pyarrow schema support
jamielxcarter Nov 1, 2023
a2f1052
Updated poetry.lock to add pyarrow as extra
jamielxcarter Nov 1, 2023
14132b2
Updated formatting
jamielxcarter Nov 1, 2023
09e0379
Updated for readability
jamielxcarter Nov 1, 2023
8449ccb
Merge branch 'meltano:main' into parquet-encoding
jamielxcarter Nov 1, 2023
d5dbe37
Added tests to account for missing pyarrow install
jamielxcarter Nov 14, 2023
7b2af3e
Merge branch 'main' into parquet-encoding
jamielxcarter Nov 14, 2023
f6937cf
Addressed ambiguous type issue
jamielxcarter Nov 14, 2023
4820a35
Adding type ignore
jamielxcarter Nov 14, 2023
a407e00
Added type ignore to correct location
jamielxcarter Nov 14, 2023
d62a03b
Update singer_sdk/batch.py
edgarrmondragon Nov 14, 2023
e69bb5f
Merge branch 'main' into parquet-encoding
edgarrmondragon Nov 14, 2023
621110e
Adding back normal imports
jamielxcarter Nov 16, 2023
ce1b2ef
Merge branch 'main' into parquet-encoding
edgarrmondragon Nov 17, 2023
ce8d2e6
Merge branch 'main' into parquet-encoding
edgarrmondragon Nov 21, 2023
90bbcc9
Merge branch 'main' into parquet-encoding
edgarrmondragon Nov 28, 2023
9cff703
mypy: install extras
edgarrmondragon Nov 28, 2023
2ea58b8
Ignore missing pyarrow types
edgarrmondragon Nov 28, 2023
e610bd3
Move batchers to contrib modules
edgarrmondragon Nov 28, 2023
823cbb6
Increase test coverage
edgarrmondragon Nov 28, 2023
65832b1
Fix types
edgarrmondragon Nov 28, 2023
3003777
Test batcher and target
edgarrmondragon Nov 28, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions noxfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
def mypy(session: Session) -> None:
"""Check types with mypy."""
args = session.posargs or ["singer_sdk"]
session.install(".")
session.install(".[s3,testing,parquet]")
session.install(
"mypy",
"pytest",
Expand All @@ -77,7 +77,7 @@ def mypy(session: Session) -> None:
@session(python=python_versions)
def tests(session: Session) -> None:
"""Execute pytest tests and compute coverage."""
session.install(".[s3]")
session.install(".[s3,parquet]")
session.install(*test_dependencies)

sqlalchemy_version = os.environ.get("SQLALCHEMY_VERSION")
Expand Down
27 changes: 14 additions & 13 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 17 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ backports-datetime-fromisoformat = { version = ">=2.0.1", python = "<3.11" }
click = "~=8.0"
cryptography = ">=3.4.6,<42.0.0"
fs = ">=2.4.16"
importlib-metadata = {version = "<7.0.0", markers = "python_version < \"3.8\""}
importlib-metadata = {version = "<7.0.0", python = "<3.12"}
importlib-resources = {version = ">=5.12.0", markers = "python_version < \"3.9\""}
inflection = ">=0.5.1"
joblib = ">=1.0.1"
Expand Down Expand Up @@ -82,6 +82,16 @@ sphinx-reredirects = {version = ">=0.1.1", optional = true}
# File storage dependencies installed as optional 'filesystem' extras
fs-s3fs = {version = ">=1.1.1", optional = true}

# Parquet file dependencies installed as optional 'parquet' extras
numpy = [
{ version = "<1.22", python = "<3.8", optional = true },
{ version = ">=1.22", python = ">=3.8", optional = true },
]
pyarrow = [
{ version = ">=11", python = "<3.8", optional = true },
{ version = ">=13", python = ">=3.8", optional = true }
]

# Testing dependencies installed as optional 'testing' extras
pytest = {version=">=7.2.1", optional = true}
pytest-durations = {version = ">=1.2.0", optional = true}
Expand All @@ -102,6 +112,7 @@ testing = [
"pytest",
"pytest-durations"
]
parquet = ["numpy", "pyarrow"]

[tool.poetry.group.dev.dependencies]
coverage = [
Expand All @@ -114,10 +125,6 @@ mypy = [
{ version = ">=1.0,<1.5", python = "<3.8" },
{ version = ">=1.0", python = ">=3.8" },
]
numpy = [
{ version = "<1.22", python = "<3.8" },
{ version = ">=1.22", python = ">=3.8" },
]
pyarrow = [
{ version = ">=11,<13", python = "<3.8" },
{ version = ">=11", python = ">=3.8" }
Expand Down Expand Up @@ -217,6 +224,7 @@ module = [
"backports.datetime_fromisoformat.*",
"joblib.*", # TODO: Remove when https://github.com/joblib/joblib/issues/1516 is shipped
"jsonpath_ng.*",
"pyarrow.*", # TODO: Remove when https://github.com/apache/arrow/issues/32609 is implemented and released
]

[build-system]
Expand All @@ -226,6 +234,10 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
pytest11 = { reference = "singer_sdk:testing.pytest_plugin", extras = ["testing"], type = "console" }

[tool.poetry.plugins."singer_sdk.batch_encoders"]
jsonl = "singer_sdk.contrib.batch_encoder_jsonl:JSONLinesBatcher"
parquet = "singer_sdk.contrib.batch_encoder_parquet:ParquetBatcher"

[tool.ruff]
line-length = 88
src = ["samples", "singer_sdk", "tests"]
Expand Down
4 changes: 2 additions & 2 deletions samples/sample_tap_countries/schemas/continents.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"type": "object",
"properties": {
"code": { "type": ["null", "string"] },
"name": { "type": ["null", "string"] }
"code": { "type": ["string", "null"] },
"name": { "type": ["string", "null"] }
}
}
89 changes: 55 additions & 34 deletions singer_sdk/batch.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,35 @@
"""Batching utilities for Singer SDK."""
from __future__ import annotations

import gzip
import itertools
import json
import typing as t
import warnings
from abc import ABC, abstractmethod
from uuid import uuid4

from singer_sdk.helpers._compat import entry_points

if t.TYPE_CHECKING:
from singer_sdk.helpers._batch import BatchConfig

_T = t.TypeVar("_T")


def __getattr__(name: str) -> t.Any:  # noqa: ANN401 # pragma: no cover
    """Resolve deprecated module-level attributes lazily.

    Only ``JSONLinesBatcher`` is handled; it was relocated to
    ``singer_sdk.contrib.batch_encoder_jsonl``, so accessing it here emits a
    ``DeprecationWarning`` and returns the relocated class.

    Args:
        name: The attribute being looked up on this module.

    Returns:
        The relocated ``JSONLinesBatcher`` class.

    Raises:
        AttributeError: If ``name`` is not a supported deprecated attribute.
    """
    # Guard clause: anything other than the moved class is a genuine miss.
    if name != "JSONLinesBatcher":
        msg = f"module {__name__} has no attribute {name}"
        raise AttributeError(msg)

    warnings.warn(
        "The class JSONLinesBatcher was moved to singer_sdk.contrib.batch_encoder_jsonl.",  # noqa: E501
        DeprecationWarning,
        stacklevel=2,
    )

    # Imported lazily to avoid a circular import at module load time.
    from singer_sdk.contrib.batch_encoder_jsonl import JSONLinesBatcher

    return JSONLinesBatcher


def lazy_chunked_generator(
iterable: t.Iterable[_T],
chunk_size: int,
Expand Down Expand Up @@ -71,41 +87,46 @@ def get_batches(
raise NotImplementedError


class JSONLinesBatcher(BaseBatcher):
"""JSON Lines Record Batcher."""
class Batcher(BaseBatcher):
"""Determines batch type and then serializes batches to that format."""

def get_batches(
self,
records: t.Iterator[dict],
) -> t.Iterator[list[str]]:
"""Yield manifest of batches.
def get_batches(self, records: t.Iterator[dict]) -> t.Iterator[list[str]]:
"""Manifest of batches.

Args:
records: The records to batch.

Yields:
Returns:
A list of file paths (called a manifest).
"""
sync_id = f"{self.tap_name}--{self.stream_name}-{uuid4()}"
prefix = self.batch_config.storage.prefix or ""

for i, chunk in enumerate(
lazy_chunked_generator(
records,
self.batch_config.batch_size,
),
start=1,
):
filename = f"{prefix}{sync_id}-{i}.json.gz"
with self.batch_config.storage.fs(create=True) as fs:
# TODO: Determine compression from config.
with fs.open(filename, "wb") as f, gzip.GzipFile(
fileobj=f,
mode="wb",
) as gz:
gz.writelines(
(json.dumps(record, default=str) + "\n").encode()
for record in chunk
)
file_url = fs.geturl(filename)
yield [file_url]
encoding_format = self.batch_config.encoding.format
batcher_type = self.get_batcher(encoding_format)
batcher = batcher_type(
self.tap_name,
self.stream_name,
self.batch_config,
)
return batcher.get_batches(records)

@classmethod
def get_batcher(cls, name: str) -> type[BaseBatcher]:
    """Look up a registered batcher class by its entry-point name.

    Batchers are discovered through the ``singer_sdk.batch_encoders``
    entry-point group, so third-party packages can register their own.

    Args:
        name: The name of the batcher.

    Returns:
        The batcher class.

    Raises:
        ValueError: If the batcher is not found.
    """
    for plugin in entry_points(group="singer_sdk.batch_encoders"):
        if plugin.name == name:
            return plugin.load()  # type: ignore[no-any-return]

    message = f"Unsupported batcher: {name}"
    raise ValueError(message) from None
1 change: 1 addition & 0 deletions singer_sdk/contrib/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Singer SDK contrib modules."""
52 changes: 52 additions & 0 deletions singer_sdk/contrib/batch_encoder_jsonl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""JSON Lines Record Batcher."""

from __future__ import annotations

import gzip
import json
import typing as t
from uuid import uuid4

from singer_sdk.batch import BaseBatcher, lazy_chunked_generator

__all__ = ["JSONLinesBatcher"]


class JSONLinesBatcher(BaseBatcher):
    """JSON Lines Record Batcher."""

    def get_batches(
        self,
        records: t.Iterator[dict],
    ) -> t.Iterator[list[str]]:
        """Batch records into gzipped JSON-Lines files and yield a manifest.

        Args:
            records: The records to batch.

        Yields:
            A list of file paths (called a manifest).
        """
        sync_id = f"{self.tap_name}--{self.stream_name}-{uuid4()}"
        prefix = self.batch_config.storage.prefix or ""

        for batch_number, record_chunk in enumerate(
            lazy_chunked_generator(records, self.batch_config.batch_size),
            start=1,
        ):
            filename = f"{prefix}{sync_id}-{batch_number}.json.gz"
            with self.batch_config.storage.fs(create=True) as filesystem:
                # TODO: Determine compression from config.
                with filesystem.open(filename, "wb") as raw_file:
                    with gzip.GzipFile(fileobj=raw_file, mode="wb") as gz_file:
                        # One JSON document per line; non-serializable values
                        # fall back to their str() representation.
                        encoded_lines = (
                            (json.dumps(record, default=str) + "\n").encode()
                            for record in record_chunk
                        )
                        gz_file.writelines(encoded_lines)
                file_url = filesystem.geturl(filename)
            yield [file_url]
54 changes: 54 additions & 0 deletions singer_sdk/contrib/batch_encoder_parquet.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
"""Parquet Record Batcher."""

from __future__ import annotations

import typing as t
from uuid import uuid4

from singer_sdk.batch import BaseBatcher, lazy_chunked_generator

__all__ = ["ParquetBatcher"]


class ParquetBatcher(BaseBatcher):
    """Parquet Record Batcher."""

    def get_batches(
        self,
        records: t.Iterator[dict],
    ) -> t.Iterator[list[str]]:
        """Batch records into Parquet files and yield a manifest.

        Args:
            records: The records to batch.

        Yields:
            A list of file paths (called a manifest).
        """
        # Imported lazily so the SDK stays importable without the optional
        # 'parquet' extra (pyarrow) installed.
        import pyarrow as pa
        import pyarrow.parquet as pq

        sync_id = f"{self.tap_name}--{self.stream_name}-{uuid4()}"
        prefix = self.batch_config.storage.prefix or ""

        for i, chunk in enumerate(
            lazy_chunked_generator(
                records,
                self.batch_config.batch_size,
            ),
            start=1,
        ):
            # Use '-' before the batch index, matching the JSON Lines
            # batcher's "{sync_id}-{i}" naming (the '=' separator was a typo).
            filename = f"{prefix}{sync_id}-{i}.parquet"
            if self.batch_config.encoding.compression == "gzip":
                # Append the suffix to the parquet filename so the manifest
                # reflects the compression, e.g. "...-1.parquet.gz".
                filename = f"{filename}.gz"
            # NOTE(review): the JSONL batcher opens storage with
            # fs(create=True); confirm whether create=True is needed here too.
            with self.batch_config.storage.fs() as fs:
                with fs.open(filename, "wb") as f:
                    table = pa.Table.from_pylist(list(chunk))
                    if self.batch_config.encoding.compression == "gzip":
                        pq.write_table(table, f, compression="GZIP")
                    else:
                        pq.write_table(table, f)

                file_url = fs.geturl(filename)
            yield [file_url]
Loading