feat: Generic incremental replication support for BATCH mode in SQL taps #1894

Closed · wants to merge 9 commits
13 changes: 13 additions & 0 deletions samples/sample_tap_duckdb/batch.config.json
@@ -0,0 +1,13 @@
{
  "database": "needful_things.duckdb",
  "batch_config": {
    "batch_size": 10000,
    "encoding": {
      "format": "csv",
      "compression": "none"
    },
    "storage": {
      "root": "/Users/edgarramirez/Code/meltano/sdk/samples/sample_tap_duckdb/out"
    }
  }
}
857 changes: 857 additions & 0 deletions samples/sample_tap_duckdb/catalog.json

Large diffs are not rendered by default.

Binary file added samples/sample_tap_duckdb/data.duckdb
Binary file not shown.
Binary file added samples/sample_tap_duckdb/needful_things.duckdb
Binary file not shown.
1,504 changes: 1,504 additions & 0 deletions samples/sample_tap_duckdb/poetry.lock

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions samples/sample_tap_duckdb/pyproject.toml
@@ -0,0 +1,23 @@
[tool.poetry]
name = "sample-tap-duckdb"
version = "0.0.0"
description = ""
authors = ["Edgar Ramírez Mondragón <edgar@meltano.com>"]
license = "Apache-2.0"
packages = [{include = "tap_duckdb"}]

[tool.poetry.dependencies]
python = ">=3.9,<3.12"
duckdb-engine = "^0.9.2"
singer-sdk = {path = "../..", develop = true}

[tool.poetry.group.dev.dependencies]
singer-sdk = {path = "../..", develop = true, extras = ["testing"]}
ipython = "^8.14.0"

[tool.poetry.scripts]
sample-tap-duckdb = "tap_duckdb.tap:TapDuckDB.cli"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
Empty file.
111 changes: 111 additions & 0 deletions samples/sample_tap_duckdb/tap_duckdb/tap.py
@@ -0,0 +1,111 @@
from __future__ import annotations

import typing as t
import uuid
from textwrap import dedent

import sqlalchemy as sa

from singer_sdk import SQLConnector, SQLTap
from singer_sdk import typing as th
from singer_sdk.exceptions import (
    UnsupportedBatchCompressionError,
    UnsupportedBatchFormatError,
)
from singer_sdk.helpers._batch import BatchFileFormat, CSVEncoding, JSONLinesEncoding
from singer_sdk.streams import SQLStream

if t.TYPE_CHECKING:
    from singer_sdk.helpers._batch import BatchConfig


class DuckDBConnector(SQLConnector):
    def get_sqlalchemy_url(self, config) -> str:
        """Return a SQLAlchemy URL."""
        return f"duckdb:///{config['database']}"


class DuckDBStream(SQLStream):
    connector_class = DuckDBConnector

    def get_jsonl_copy_options(self, encoding: JSONLinesEncoding, filename: str):
        options = ["FORMAT JSON"]
        if encoding.compression is None or encoding.compression == "none":
            filename = f"{filename}.jsonl"
        elif encoding.compression == "gzip":
            options.append("COMPRESSION GZIP")
            filename = f"{filename}.jsonl.gz"
        else:
            raise UnsupportedBatchCompressionError(encoding.compression)

        return options, filename

    def get_csv_copy_options(self, encoding: CSVEncoding, filename: str):
        options = ["FORMAT CSV", f"DELIMITER '{encoding.delimiter}'"]
        if encoding.header:
            options.append("HEADER")

        if encoding.compression is None or encoding.compression == "none":
            filename = f"{filename}.csv"
        elif encoding.compression == "gzip":
            options.append("COMPRESSION GZIP")
            filename = f"{filename}.csv.gz"
        else:
            raise UnsupportedBatchCompressionError(encoding.compression)

        return options, filename

    def get_batches(self, batch_config: BatchConfig, context=None):  # noqa: ARG002
        prefix = batch_config.storage.prefix or ""
        sync_id = f"{self.tap_name}--{self.name}-{uuid.uuid4()}"
        filename = f"{prefix}{sync_id}"
        file_format = batch_config.encoding.format

        if file_format == BatchFileFormat.JSONL:
            options, filename = self.get_jsonl_copy_options(
                batch_config.encoding,
                filename,
            )
        elif file_format == BatchFileFormat.CSV:
            options, filename = self.get_csv_copy_options(
                batch_config.encoding,
                filename,
            )
        else:
            raise UnsupportedBatchFormatError(file_format)

        table_name = self.fully_qualified_name
        filepath = f"{batch_config.storage.root}/{filename}"
        copy_options = ",".join(options)
        query = sa.text(
            dedent(
                f"COPY (SELECT * FROM {table_name}) TO '{filepath}' "  # noqa: S608
                f"({copy_options})",
            ),
        )
        query = query.execution_options(autocommit=True)

        with self.connector._connect() as conn:
            conn.execute(query)

        files = [filepath]
        yield (batch_config.encoding, files)


class TapDuckDB(SQLTap):
    name = "tap-duckdb"
    default_stream_class = DuckDBStream

    config_jsonschema = th.PropertiesList(
        th.Property(
            "database",
            th.StringType,
            required=True,
            description="Path to the DuckDB database file.",
            default="data.duckdb",
        ),
    ).to_dict()


if __name__ == "__main__":
    TapDuckDB.cli()
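
For a quick end-to-end check of the BATCH code path, the sample tap can be driven directly from Python. A minimal sketch, assuming it runs from the sample directory with the batch.config.json shown above (the config file name and the message contents are illustrative):

import json

from tap_duckdb.tap import TapDuckDB

with open("batch.config.json") as f:
    config = json.load(f)

# sync_all() writes Singer messages to stdout; with a batch_config present,
# each stream should emit BATCH messages along the lines of:
# {"type": "BATCH", "stream": "...", "encoding": {"format": "csv",
#  "compression": "none"}, "manifest": ["<storage root>/<sync id>.csv"]}
TapDuckDB(config=config).sync_all()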
108 changes: 107 additions & 1 deletion singer_sdk/batch.py
@@ -1,15 +1,22 @@
"""Batching utilities for Singer SDK."""
from __future__ import annotations

import csv
import gzip
import io
import itertools
import json
import typing as t
from abc import ABC, abstractmethod
from uuid import uuid4

from singer_sdk.exceptions import UnsupportedBatchCompressionError
from singer_sdk.helpers._batch import BatchFileCompression

if t.TYPE_CHECKING:
-    from singer_sdk.helpers._batch import BatchConfig
+    from fs.base import FS
+
+    from singer_sdk.helpers._batch import BatchConfig, CSVEncoding

_T = t.TypeVar("_T")

@@ -109,3 +116,102 @@ def get_batches(
                )
                file_url = fs.geturl(filename)
            yield [file_url]


class CSVBatcher(BaseBatcher):
    """CSV record batcher."""

    def __init__(
        self,
        tap_name: str,
        stream_name: str,
        batch_config: BatchConfig[CSVEncoding],
    ) -> None:
        """Initialize the batcher.

        Args:
            tap_name: The name of the tap.
            stream_name: The name of the stream.
            batch_config: The batch configuration.
        """
        super().__init__(tap_name, stream_name, batch_config)
        self.encoding = self.batch_config.encoding

    def get_batches(
        self,
        records: t.Iterator[dict],
    ) -> t.Iterator[list[str]]:
        """Yield manifest of batches.

        Args:
            records: The records to batch.

        Yields:
            A list of file paths (called a manifest).

        Raises:
            UnsupportedBatchCompressionError: If the compression is not supported.
        """
        sync_id = f"{self.tap_name}--{self.stream_name}-{uuid4()}"
        prefix = self.batch_config.storage.prefix or ""

        for i, chunk in enumerate(
            lazy_chunked_generator(
                records,
                self.batch_config.batch_size,
            ),
            start=1,
        ):
            with self.batch_config.storage.fs(create=True) as fs:
                if self.encoding.compression == BatchFileCompression.GZIP:
                    filename = f"{prefix}{sync_id}-{i}.csv.gz"
                    self._write_gzip(fs, filename, chunk)
                elif self.encoding.compression == BatchFileCompression.NONE:
                    filename = f"{prefix}{sync_id}-{i}.csv"
                    self._write_plain(fs, filename, chunk)
                else:
                    raise UnsupportedBatchCompressionError(
                        self.encoding.compression or "none",
                    )
                file_url = fs.geturl(filename)
            yield [file_url]

    def _write_gzip(self, fs: FS, filename: str, records: t.Iterator[dict]) -> None:
        first_record = next(records, None)

        if first_record is None:
            return

        with fs.open(filename, "wb") as f, gzip.GzipFile(
            fileobj=f,
            mode="wb",
        ) as gz:
            string_buffer = io.StringIO()
            writer = csv.DictWriter(
                string_buffer,
                fieldnames=first_record.keys(),
                delimiter=self.encoding.delimiter,
            )
            if self.encoding.header:
                writer.writeheader()
            writer.writerow(first_record)
            writer.writerows(records)
            gz.write(string_buffer.getvalue().encode())

    def _write_plain(self, fs: FS, filename: str, records: t.Iterator[dict]) -> None:
        first_record = next(records, None)

        if first_record is None:
            return

        with fs.open(filename, "w") as f:
            writer = csv.DictWriter(
                f,
                fieldnames=first_record.keys(),
                delimiter=self.encoding.delimiter,
            )
            if self.encoding.header:
                writer.writeheader()
            writer.writerow(first_record)
            writer.writerows(records)
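
For stream authors wiring this up themselves, here is a minimal sketch of driving CSVBatcher directly, assuming a local file:// storage root and made-up tap/stream names:

from singer_sdk.batch import CSVBatcher
from singer_sdk.helpers._batch import BatchConfig, CSVEncoding, StorageTarget

batch_config = BatchConfig(
    encoding=CSVEncoding(compression="gzip", delimiter=",", header=True),
    storage=StorageTarget("file:///tmp/sdk-batches"),
    batch_size=10000,
)
batcher = CSVBatcher("tap-example", "example_stream", batch_config)

# Each yielded manifest is a list of file URLs for one chunk of records.
for manifest in batcher.get_batches(records=iter([{"id": 1}, {"id": 2}])):
    print(manifest)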
7 changes: 7 additions & 0 deletions singer_sdk/connectors/sql.py
@@ -509,7 +509,12 @@ def discover_catalog_entries(self) -> list[dict]:
        result: list[dict] = []
        engine = self._engine
        inspected = sqlalchemy.inspect(engine)
        processed_schemas: set[str] = set()
        for schema_name in self.get_schema_names(engine, inspected):
            # Skip schemas that have already been seen
            if schema_name in processed_schemas:
                continue

            # Iterate through each table and view
            for table_name, is_view in self.get_object_names(
                engine,
@@ -525,6 +530,8 @@ def discover_catalog_entries(self) -> list[dict]:
                )
                result.append(catalog_entry.to_dict())

            processed_schemas.add(schema_name)

        return result

    def parse_full_table_name(
24 changes: 24 additions & 0 deletions singer_sdk/exceptions.py
@@ -123,3 +123,27 @@ class ConformedNameClashException(Exception):

class MissingKeyPropertiesError(Exception):
    """Raised when a received (and/or transformed) record is missing key properties."""


class UnsupportedBatchFormatError(Exception):
    """Raised when an unsupported batch format is requested."""

    def __init__(self, batch_format: str) -> None:
        """Initialize the exception.

        Args:
            batch_format: The unsupported batch format.
        """
        super().__init__(f"Unsupported batch format: {batch_format}")


class UnsupportedBatchCompressionError(Exception):
    """Raised when an unsupported batch compression is requested."""

    def __init__(self, compression: str) -> None:
        """Initialize the exception.

        Args:
            compression: The unsupported batch compression.
        """
        super().__init__(f"Unsupported batch compression: {compression}")
41 changes: 37 additions & 4 deletions singer_sdk/helpers/_batch.py
@@ -6,7 +6,7 @@
import platform
import typing as t
from contextlib import contextmanager
-from dataclasses import asdict, dataclass, field
+from dataclasses import asdict, dataclass, field, fields
from urllib.parse import ParseResult, urlencode, urlparse

import fs
@@ -25,6 +25,19 @@ class BatchFileFormat(str, enum.Enum):
    JSONL = "jsonl"
    """JSON Lines format."""

    CSV = "csv"
    """CSV format."""


class BatchFileCompression(str, enum.Enum):
    """Batch file compression."""

    NONE = "none"
    """No compression."""

    GZIP = "gzip"
    """GZIP compression."""


@dataclass
class BaseBatchFileEncoding:
@@ -59,7 +72,11 @@ def from_dict(cls, data: dict[str, t.Any]) -> BaseBatchFileEncoding:
        data = data.copy()
        encoding_format = data.pop("format")
        encoding_cls = cls.registered_encodings[encoding_format]
-        return encoding_cls(**data)
+        return encoding_cls(
+            **{
+                f.name: data[f.name]
+                for f in fields(encoding_cls)
+                if f.name != "format" and f.name in data
+            },
+        )


@dataclass
@@ -69,6 +86,19 @@ class JSONLinesEncoding(BaseBatchFileEncoding):
    __encoding_format__ = "jsonl"


@dataclass
class CSVEncoding(BaseBatchFileEncoding):
    """CSV encoding for batch files."""

    __encoding_format__ = "csv"

    delimiter: str = ","
    """The delimiter of the CSV file."""

    header: bool = True
    """Whether the CSV file has a header row."""


@dataclass
class SDKBatchMessage(Message):
"""Singer batch message in the Meltano Singer SDK flavor."""
@@ -201,11 +231,14 @@ def open( # noqa: A003
            filesystem.close()


+T = t.TypeVar("T", bound=BaseBatchFileEncoding)
+
+
@dataclass
-class BatchConfig:
+class BatchConfig(t.Generic[T]):
    """Batch configuration."""

-    encoding: BaseBatchFileEncoding
+    encoding: T
    """The encoding of the batch file."""

    storage: StorageTarget
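
With the fields() filter in place, from_dict dispatches on the "format" discriminator and lets dataclass defaults fill any omitted keys. A small sketch of the intended behavior:

from singer_sdk.helpers._batch import BaseBatchFileEncoding, CSVEncoding

# "format" selects the registered encoding class; the remaining keys map
# onto that class's dataclass fields.
encoding = BaseBatchFileEncoding.from_dict(
    {"format": "csv", "compression": "gzip", "delimiter": "|"},
)
assert isinstance(encoding, CSVEncoding)
assert encoding.header is True  # default applied for the omitted key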
7 changes: 7 additions & 0 deletions singer_sdk/helpers/_typing.py
@@ -59,6 +59,13 @@ def append_type(type_dict: dict, new_type: str) -> dict:
            result["anyOf"] = [result["anyOf"], new_type]
        return result

    if "oneOf" in result:
        if isinstance(result["oneOf"], list) and new_type not in result["oneOf"]:
            result["oneOf"].append({"type": new_type})
        elif new_type != result["oneOf"]:
            result["oneOf"] = [result["oneOf"], {"type": new_type}]
        return result

    if "type" in result:
        type_array = (
            result["type"] if isinstance(result["type"], list) else [result["type"]]
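
To make the new branch concrete, here is a small sketch of how append_type now extends an existing oneOf union (input values are illustrative):

from singer_sdk.helpers._typing import append_type

schema = {"oneOf": [{"type": "string"}, {"type": "null"}]}
result = append_type(schema, "integer")
# The new type is appended as a schema object:
# {"oneOf": [{"type": "string"}, {"type": "null"}, {"type": "integer"}]}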
42 changes: 31 additions & 11 deletions singer_sdk/helpers/capabilities.py
@@ -8,6 +8,7 @@

from singer_sdk.typing import (
    BooleanType,
+    DiscriminatedUnion,
    IntegerType,
    ObjectType,
    PropertiesList,
@@ -58,18 +59,37 @@
        Property(
            "encoding",
            description="Specifies the format and compression of the batch files.",
-            wrapped=ObjectType(
-                Property(
-                    "format",
-                    StringType,
-                    allowed_values=["jsonl"],
-                    description="Format to use for batch files.",
-                ),
-                Property(
-                    "compression",
-                    StringType,
-                    allowed_values=["gzip", "none"],
-                    description="Compression format to use for batch files.",
-                ),
-            ),
+            wrapped=DiscriminatedUnion(
+                "format",
+                jsonl=ObjectType(
+                    Property(
+                        "compression",
+                        StringType,
+                        allowed_values=["gzip", "none"],
+                        description="Compression format to use for batch files.",
+                    ),
+                ),
+                csv=ObjectType(
+                    Property(
+                        "compression",
+                        StringType,
+                        allowed_values=["gzip", "none"],
+                        description="Compression format to use for batch files.",
+                    ),
+                    Property(
+                        "delimiter",
+                        StringType,
+                        description="Delimiter to use for batch files.",
+                        default=",",
+                    ),
+                    Property(
+                        "header",
+                        BooleanType,
+                        description=(
+                            "Whether to include a header row in batch files."
+                        ),
+                        default=True,
+                    ),
+                ),
+            ),
        ),
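
With the discriminated union in place, a tap's batch_config accepts either encoding shape, keyed on format. A hedged sketch of a config exercising the new csv branch (the storage root is illustrative):

config = {
    "batch_config": {
        "encoding": {
            "format": "csv",
            "compression": "gzip",
            "delimiter": "|",
            "header": True,
        },
        "storage": {"root": "file:///tmp/sdk-batches"},
    },
}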
16 changes: 16 additions & 0 deletions singer_sdk/streams/core.py
@@ -22,10 +22,14 @@
    InvalidReplicationKeyException,
    InvalidStreamSortException,
    MaxRecordsLimitException,
    UnsupportedBatchCompressionError,
    UnsupportedBatchFormatError,
)
from singer_sdk.helpers._batch import (
    BaseBatchFileEncoding,
    BatchConfig,
    BatchFileCompression,
    BatchFileFormat,
    SDKBatchMessage,
)
from singer_sdk.helpers._catalog import pop_deselected_record_properties
@@ -1348,7 +1352,19 @@ def get_batches(
        Yields:
            A tuple of (encoding, manifest) for each batch.

        Raises:
            UnsupportedBatchCompressionError: Raised if the batch compression is not
                supported by the stream.
            UnsupportedBatchFormatError: Raised if the batch format is not supported
                by the stream.
        """
        if batch_config.encoding.format != BatchFileFormat.JSONL:
            raise UnsupportedBatchFormatError(batch_config.encoding.format)

        if batch_config.encoding.compression != BatchFileCompression.GZIP:
            raise UnsupportedBatchCompressionError(batch_config.encoding.compression)

        batcher = JSONLinesBatcher(
            tap_name=self.tap_name,
            stream_name=self.name,
2 changes: 1 addition & 1 deletion singer_sdk/target_base.py
@@ -370,7 +370,7 @@ def _process_schema_message(self, message_dict: dict) -> None:

        stream_name = message_dict["stream"]
        schema = message_dict["schema"]
-        key_properties = message_dict.get("key_properties", None)
+        key_properties = message_dict.get("key_properties")
        do_registration = False
        if stream_name not in self.mapper.stream_maps:
            do_registration = True
90 changes: 89 additions & 1 deletion tests/core/test_batch.py
@@ -6,13 +6,58 @@

import pytest

-from singer_sdk.batch import JSONLinesBatcher
+from singer_sdk.batch import CSVBatcher, JSONLinesBatcher
from singer_sdk.exceptions import (
    UnsupportedBatchCompressionError,
    UnsupportedBatchFormatError,
)
from singer_sdk.helpers._batch import (
    BaseBatchFileEncoding,
    BatchConfig,
    BatchFileFormat,
    CSVEncoding,
    JSONLinesEncoding,
    StorageTarget,
)
from singer_sdk.streams.core import Stream
from singer_sdk.tap_base import Tap


class DefaultBatchesStream(Stream):
    name = "default_batches"
    schema = {  # noqa: RUF012
        "properties": {
            "id": {"type": "integer"},
            "name": {"type": "string"},
            "amount": {"type": "number"},
        },
    }

    def get_records(self, _):
        yield {"id": 1, "name": "foo", "amount": decimal.Decimal("1.23")}
        yield {"id": 2, "name": "bar", "amount": decimal.Decimal("4.56")}
        yield {"id": 3, "name": "baz", "amount": decimal.Decimal("7.89")}


class CustomBatchesStream(DefaultBatchesStream):
    name = "custom_batches"

    def get_batches(self, batch_config, context=None):
        if batch_config.encoding.format == BatchFileFormat.CSV:
            csv_batcher = CSVBatcher(self.tap_name, self.name, batch_config)
            records = self._sync_records(context, write_messages=False)
            for manifest in csv_batcher.get_batches(records=records):
                yield batch_config.encoding, manifest
        else:
            yield from super().get_batches(batch_config, context)


class BatchedTap(Tap):
    name = "tap-batched"
    config_jsonschema = {"properties": {}}  # noqa: RUF012

    def discover_streams(self):
        return [DefaultBatchesStream(self), CustomBatchesStream(self)]


@pytest.mark.parametrize(
@@ -125,3 +170,46 @@ def test_json_lines_batcher():
        for batch in batches
        for filepath in batch
    )


@pytest.mark.parametrize(
    "compression",
    [
        pytest.param("gzip", id="gzip"),
        pytest.param("none", id="none"),
    ],
)
def test_csv_batches(compression: str):
    batch_config = BatchConfig(
        encoding=CSVEncoding(compression=compression),
        storage=StorageTarget("file:///tmp/sdk-batches"),
        batch_size=2,
    )
    tap = BatchedTap()
    streams = tap.streams
    batches = list(streams["custom_batches"].get_batches(batch_config=batch_config))
    assert len(batches) == 2


def test_csv_batches_unsupported_compression():
    batch_config = BatchConfig(
        encoding=CSVEncoding(compression="unknown"),
        storage=StorageTarget("file:///tmp/sdk-batches"),
        batch_size=2,
    )
    tap = BatchedTap()
    streams = tap.streams
    with pytest.raises(UnsupportedBatchCompressionError):
        list(streams["custom_batches"].get_batches(batch_config=batch_config))


def test_jsonl_batches_unsupported_format():
    batch_config = BatchConfig(
        encoding=CSVEncoding(compression="none"),
        storage=StorageTarget("file:///tmp/sdk-batches"),
        batch_size=2,
    )
    tap = BatchedTap()
    streams = tap.streams
    with pytest.raises(UnsupportedBatchFormatError):
        list(streams["default_batches"].get_batches(batch_config=batch_config))