Skip to content

Commit

Permalink
Remove default implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Oct 22, 2024
1 parent 4dab2f3 commit ca5a44a
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 34 deletions.
34 changes: 6 additions & 28 deletions singer_sdk/connectors/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -990,41 +990,19 @@ def prepare_table(
def prepare_primary_key(
self,
*,
full_table_name: str | FullyQualifiedName,
primary_keys: t.Sequence[str],
full_table_name: str | FullyQualifiedName, # noqa: ARG002
primary_keys: t.Sequence[str], # noqa: ARG002
) -> None:
"""Adapt target table primary key to provided schema if possible.
Implement this method in a subclass to adapt the primary key of the target table
to the provided one if possible.
Args:
full_table_name: the target table name.
primary_keys: list of key properties.
"""
_, schema_name, table_name = self.parse_full_table_name(full_table_name)
meta = sa.MetaData(schema=schema_name)
meta.reflect(bind=self._engine, only=[table_name])
table = meta.tables[full_table_name] # type: ignore[index]
current_pk_cols = [col.name for col in table.primary_key.columns]

# Nothing to do
if current_pk_cols == primary_keys:
return

new_pk = sa.PrimaryKeyConstraint(*primary_keys)

# If table has no primary key, add the provided one
if not current_pk_cols:
with self._connect() as conn, conn.begin():
conn.execute(sa.schema.AddConstraint(new_pk))
return

# Drop the existing primary key
with self._connect() as conn, conn.begin():
conn.execute(sa.schema.DropConstraint(table.primary_key))

# Add the new primary key
if primary_keys:
with self._connect() as conn, conn.begin():
conn.execute(sa.schema.AddConstraint(new_pk))
self.logger.debug("Primary key adaptation is not implemented")

def prepare_column(
self,
Expand Down
8 changes: 2 additions & 6 deletions tests/core/test_connector_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,7 @@ def test_column_rename(self, connector: DuckDBConnector):
connector.rename_column("test_table", "old_name", "new_name")

with engine.connect() as conn:
result = conn.execute(
sa.text("SELECT * FROM test_table"),
)
result = conn.execute(sa.text("SELECT * FROM test_table"))
assert result.keys() == ["id", "new_name"]

def test_adapt_column_type(self, connector: DuckDBConnector):
Expand All @@ -341,9 +339,7 @@ def test_adapt_column_type(self, connector: DuckDBConnector):
connector._adapt_column_type("test_table", "name", sa.types.String())

with engine.connect() as conn:
result = conn.execute(
sa.text("SELECT * FROM test_table"),
)
result = conn.execute(sa.text("SELECT * FROM test_table"))
assert result.keys() == ["id", "name"]
assert result.cursor.description[1][1] == "STRING"

Expand Down

0 comments on commit ca5a44a

Please sign in to comment.