Skip to content

Commit

Permalink
feat: Allow SQL tap developers to auto-skip certain stream names from…
Browse files Browse the repository at this point in the history
… discovery
  • Loading branch information
edgarrmondragon committed Jun 17, 2024
1 parent b233d3a commit 38f207e
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 4 deletions.
15 changes: 12 additions & 3 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -501,16 +501,26 @@ def discover_catalog_entry(
replication_key=None, # Must be defined by user
)

def discover_catalog_entries(self) -> list[dict]:
def discover_catalog_entries(
self,
*,
skip_schemas: t.Sequence[str] = (),
) -> list[dict]:
"""Return a list of catalog entries from discovery.
Args:
skip_schemas: A list of schema names to skip.
Returns:
The discovered catalog entries as a list.
"""
result: list[dict] = []
engine = self._engine
inspected = sa.inspect(engine)
for schema_name in self.get_schema_names(engine, inspected):
if schema_name in skip_schemas:
continue

Check warning on line 522 in singer_sdk/connectors/sql.py

View check run for this annotation

Codecov / codecov/patch

singer_sdk/connectors/sql.py#L522

Added line #L522 was not covered by tests

# Iterate through each table and view
for table_name, is_view in self.get_object_names(
engine,
Expand Down Expand Up @@ -749,8 +759,7 @@ def prepare_schema(self, schema_name: str) -> None:
Args:
schema_name: The target schema name.
"""
schema_exists = self.schema_exists(schema_name)
if not schema_exists:
if not self.schema_exists(schema_name):
self.create_schema(schema_name)

def prepare_table(
Expand Down
7 changes: 6 additions & 1 deletion singer_sdk/tap_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,9 @@ class SQLTap(Tap):
querying a database's system tables).
"""

skip_schemas: t.Sequence[str] = []
"""Hard-coded list of stream names to skip when discovering the catalog."""

_tap_connector: SQLConnector | None = None

def __init__(self, *args: t.Any, **kwargs: t.Any) -> None:
Expand Down Expand Up @@ -678,7 +681,9 @@ def catalog_dict(self) -> dict:
connector = self.tap_connector

result: dict[str, list[dict]] = {"streams": []}
result["streams"].extend(connector.discover_catalog_entries())
result["streams"].extend(
connector.discover_catalog_entries(skip_schemas=self.skip_schemas),
)

self._catalog_dict = result
return self._catalog_dict
Expand Down

0 comments on commit 38f207e

Please sign in to comment.