Skip to content

Commit

Permalink
Move tap logic upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Sep 18, 2024
1 parent 3d08c62 commit 2aabcc4
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 107 deletions.
104 changes: 3 additions & 101 deletions samples/sample_tap_csv/sample_tap_csv.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,109 +2,11 @@

from __future__ import annotations

import enum
import functools
import os
from singer_sdk.contrib.filesystem import FolderTap

import fsspec

import singer_sdk.typing as th
from samples.sample_tap_csv.client import CSVStream
from singer_sdk import Tap
from singer_sdk.contrib.filesystem.stream import SDC_META_FILEPATH

DEFAULT_MERGE_STREAM_NAME = "files"


def file_path_to_stream_name(file_path: str) -> str:
"""Convert a file path to a stream name."""
return os.path.basename(file_path).replace(".csv", "").replace(os.sep, "__") # noqa: PTH119


class ReadMode(str, enum.Enum):
"""Sync mode for the tap."""

one_stream_per_file = "one_stream_per_file"
merge = "merge"


class SampleTapCSV(Tap):
class SampleTapCSV(FolderTap):
"""Sample Tap for CSV files."""

name = "sample-tap-csv"

config_jsonschema = th.PropertiesList(
th.Property(
"path",
th.StringType,
required=True,
description="Path to CSV files.",
),
th.Property(
"read_mode",
th.StringType,
required=True,
description=(
"Use `one_stream_per_file` to read each file as a separate stream, or "
"`merge` to merge all files into a single stream."
),
allowed_values=[
ReadMode.one_stream_per_file,
ReadMode.merge,
],
),
th.Property(
"stream_name",
th.StringType,
required=True,
default=DEFAULT_MERGE_STREAM_NAME,
description="Name of the stream to use when `read_mode` is `merge`.",
),
# TODO(edgarmondragon): Other configuration options.
).to_dict()

@functools.cached_property
def read_mode(self) -> ReadMode:
return ReadMode(self.config["read_mode"])

def discover_streams(self) -> list:
# TODO(edgarmondragon): Implement stream discovery, based on the configured path
# and read mode.
# A directory for now, but could be a glob pattern.
path: str = self.config["path"]

fs: fsspec.AbstractFileSystem = fsspec.filesystem("local")

# One stream per file
if self.read_mode == ReadMode.one_stream_per_file:
if fs.isdir(path):
return [
CSVStream(
tap=self,
name=file_path_to_stream_name(member),
partitions=[{SDC_META_FILEPATH: os.path.join(path, member)}], # noqa: PTH118
)
for member in os.listdir(path)
if member.endswith(".csv")
]

msg = f"Path {path} is not a directory."
raise ValueError(msg)

# Merge
if fs.isdir(path):
contexts = [
{
SDC_META_FILEPATH: os.path.join(path, member), # noqa: PTH118
}
for member in os.listdir(path)
if member.endswith(".csv")
]
return [
CSVStream(
tap=self,
name=self.config["stream_name"],
partitions=contexts,
)
]
return []
valid_extensions: tuple[str, ...] = (".csv",)
3 changes: 2 additions & 1 deletion singer_sdk/contrib/filesystem/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@
from __future__ import annotations

from singer_sdk.contrib.filesystem.stream import FileStream
from singer_sdk.contrib.filesystem.tap import FolderTap

__all__ = ["FileStream"]
__all__ = ["FileStream", "FolderTap"]
4 changes: 2 additions & 2 deletions singer_sdk/contrib/filesystem/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(
tap: Tap,
name: str,
*,
partitions: list[Context] | None = None,
partitions: list[dict[str, t.Any]] | None = None,
) -> None:
"""Create a new FileStream instance.
Expand All @@ -59,7 +59,7 @@ def __init__(
self._partitions = partitions or []

@property
def partitions(self) -> list[Context]:
def partitions(self) -> list[dict[str, t.Any]]:
"""Return the list of partitions for this stream."""
return self._partitions

Expand Down
127 changes: 127 additions & 0 deletions singer_sdk/contrib/filesystem/tap.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
"""Singer tap for files in a directory."""

from __future__ import annotations

import enum
import functools
import os
from pathlib import Path

import fsspec

import singer_sdk.typing as th
from samples.sample_tap_csv.client import CSVStream
from singer_sdk import Tap
from singer_sdk.contrib.filesystem.stream import SDC_META_FILEPATH

DEFAULT_MERGE_STREAM_NAME = "files"


def file_path_to_stream_name(file_path: str) -> str:
    """Derive a stream name from a file path.

    The final file extension is removed and every path separator is
    replaced by a double underscore, so the directory structure stays
    visible in the resulting name.

    Args:
        file_path: The file path to convert.

    Returns:
        The stream name.
    """
    without_extension = Path(file_path).with_suffix("")
    # Normalise to forward slashes first so the split is platform independent.
    segments = without_extension.as_posix().split("/")
    return "__".join(segments)


class ReadMode(str, enum.Enum):
    """How the files found in the configured folder are mapped to streams."""

    # Each file is read as a separate stream.
    one_stream_per_file = "one_stream_per_file"
    # All files are merged into a single stream.
    merge = "merge"


class FolderTap(Tap):
    """Singer tap for files in a directory.

    Subclasses are expected to set ``valid_extensions`` to the file-name
    suffixes that should be picked up from the configured ``path``; no
    default is defined on this base class.
    """

    # File-name suffixes to include, e.g. ``(".csv",)``.
    valid_extensions: tuple[str, ...]

    config_jsonschema = th.PropertiesList(
        th.Property(
            "path",
            th.StringType,
            required=True,
            description="Path to CSV files.",
        ),
        th.Property(
            "read_mode",
            th.StringType,
            required=True,
            description=(
                "Use `one_stream_per_file` to read each file as a separate stream, or "
                "`merge` to merge all files into a single stream."
            ),
            allowed_values=[
                ReadMode.one_stream_per_file,
                ReadMode.merge,
            ],
        ),
        th.Property(
            "stream_name",
            th.StringType,
            required=True,
            default=DEFAULT_MERGE_STREAM_NAME,
            description="Name of the stream to use when `read_mode` is `merge`.",
        ),
        # TODO(edgarmondragon): Other configuration options.
    ).to_dict()

    @functools.cached_property
    def read_mode(self) -> ReadMode:
        """Folder read mode."""
        return ReadMode(self.config["read_mode"])

    def _list_data_files(
        self,
        fs: fsspec.AbstractFileSystem,
        path: str,
    ) -> list[str]:
        """Return the names of the data files directly inside ``path``.

        Args:
            fs: Filesystem used to tell regular files from directories.
            path: Directory to list.

        Returns:
            Sorted entry names that end with one of ``valid_extensions`` and
            refer to files.
        """
        # ``os.listdir`` returns entries in arbitrary order; sorting keeps the
        # discovered streams and partitions stable between runs.
        return sorted(
            member
            for member in os.listdir(path)
            if member.endswith(self.valid_extensions)
            # A sub-directory named e.g. ``archive.csv`` is not a data file.
            and fs.isfile(os.path.join(path, member))  # noqa: PTH118
        )

    def discover_streams(self) -> list:
        """Return a list of discovered streams.

        In ``one_stream_per_file`` mode every matching file becomes its own
        stream with a single partition. Otherwise a single stream named after
        the ``stream_name`` setting is returned, with one partition per file.

        Returns:
            The discovered streams.

        Raises:
            ValueError: If the path does not exist or is not a directory.
        """
        # TODO(edgarmondragon): Implement stream discovery, based on the configured path
        # and read mode.
        # A directory for now, but could be a glob pattern.
        path: str = self.config["path"]

        fs: fsspec.AbstractFileSystem = fsspec.filesystem("local")

        # ``isdir`` is already false for a missing path, so one check covers
        # both "does not exist" and "is not a directory".
        # TODO: add a test for this error path (flagged as uncovered by Codecov).
        if not fs.isdir(path):
            msg = f"Path {path} does not exist or is not a directory"
            raise ValueError(msg)

        members = self._list_data_files(fs, path)

        # One stream per file
        if self.read_mode == ReadMode.one_stream_per_file:
            return [
                CSVStream(
                    tap=self,
                    name=file_path_to_stream_name(member),
                    partitions=[{SDC_META_FILEPATH: os.path.join(path, member)}],  # noqa: PTH118
                )
                for member in members
            ]

        # Merge
        contexts = [
            {
                SDC_META_FILEPATH: os.path.join(path, member),  # noqa: PTH118
            }
            for member in members
        ]
        return [
            CSVStream(
                tap=self,
                name=self.config["stream_name"],
                partitions=contexts,
            )
        ]
6 changes: 3 additions & 3 deletions singer_sdk/streams/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -735,7 +735,7 @@ def stream_state(self) -> dict:
# Partitions

@property
def partitions(self) -> list[types.Context] | None:
def partitions(self) -> list[dict] | None:
"""Get stream partitions.
Developers may override this property to provide a default partitions list.
Expand All @@ -746,7 +746,7 @@ def partitions(self) -> list[types.Context] | None:
Returns:
A list of partition key dicts (if applicable), otherwise `None`.
"""
result: list[types.Mapping] = [
result: list[dict] = [
partition_state["context"]
for partition_state in (
get_state_partitions_list(self.tap_state, self.name) or []
Expand Down Expand Up @@ -1106,7 +1106,7 @@ def _sync_records( # noqa: C901

record_index = 0
context_element: types.Context | None
context_list: list[types.Context] | None
context_list: list[types.Context] | list[dict] | None
context_list = [context] if context is not None else self.partitions
selected = self.selected

Expand Down

0 comments on commit 2aabcc4

Please sign in to comment.