From 06f54d078da3bf65a81f2e68110eeb4896640e04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Mon, 21 Oct 2024 12:02:03 -0600 Subject: [PATCH 1/5] feat: Added `SQLConnector.prepare_primary_key` --- singer_sdk/connectors/sql.py | 44 ++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 87eb1eb98..b980e4104 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -982,6 +982,50 @@ def prepare_table( self.to_sql_type(property_def), ) + self.prepare_primary_key( + full_table_name=full_table_name, + primary_keys=primary_keys, + ) + + def prepare_primary_key( + self, + *, + full_table_name: str | FullyQualifiedName, + primary_keys: t.Sequence[str], + ) -> None: + """Adapt target table primary key to provided schema if possible. + + Args: + full_table_name: the target table name. + primary_keys: list of key properties. + """ + _, schema_name, table_name = self.parse_full_table_name(full_table_name) + meta = sa.MetaData(schema=schema_name) + meta.reflect(bind=self._engine, only=[table_name]) + table = meta.tables[f"{schema_name}.{table_name}"] + current_pk_cols = [col.name for col in table.primary_key.columns] + + # Nothing to do + if current_pk_cols == primary_keys: + return + + new_pk = sa.PrimaryKeyConstraint(*primary_keys) + + # If table has no primary key, add the provided one + if not current_pk_cols: + with self._connect() as conn, conn.begin(): + conn.execute(sa.schema.AddConstraint(new_pk)) + return + + # Drop the existing primary key + with self._connect() as conn, conn.begin(): + conn.execute(sa.schema.DropConstraint(table.primary_key)) + + # Add the new primary key + if primary_keys: + with self._connect() as conn, conn.begin(): + conn.execute(sa.schema.AddConstraint(new_pk)) + def prepare_column( self, full_table_name: str | FullyQualifiedName, From 900cd576e659deec9474ab32b7e36f278dbc6886 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Mon, 21 Oct 2024 12:06:33 -0600 Subject: [PATCH 2/5] Use `full_table_name` to get table object --- singer_sdk/connectors/sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index b980e4104..26ecde9ab 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -1002,7 +1002,7 @@ def prepare_primary_key( _, schema_name, table_name = self.parse_full_table_name(full_table_name) meta = sa.MetaData(schema=schema_name) meta.reflect(bind=self._engine, only=[table_name]) - table = meta.tables[f"{schema_name}.{table_name}"] + table = meta.tables[full_table_name] current_pk_cols = [col.name for col in table.primary_key.columns] # Nothing to do From 42d8fd003a1c8ceffcd184c1c2be4636afda1771 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Mon, 21 Oct 2024 12:08:46 -0600 Subject: [PATCH 3/5] mypy is slightly wrong --- singer_sdk/connectors/sql.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 26ecde9ab..a606584e3 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -1002,7 +1002,7 @@ def prepare_primary_key( _, schema_name, table_name = self.parse_full_table_name(full_table_name) meta = sa.MetaData(schema=schema_name) meta.reflect(bind=self._engine, only=[table_name]) - table = meta.tables[full_table_name] + table = meta.tables[full_table_name] # type: ignore[index] current_pk_cols = [col.name for col in table.primary_key.columns] # Nothing to do From 4dab2f3fd1d9542f472bf399bf5c20fd3f73f6de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Mon, 21 Oct 2024 12:25:29 -0600 Subject: [PATCH 4/5] Add built-in test --- singer_sdk/testing/suites.py | 2 ++ singer_sdk/testing/target_test_streams/pk_updates.singer | 4 ++++ singer_sdk/testing/target_tests.py | 6 ++++++ 3 files changed, 12 insertions(+) create mode 100644 singer_sdk/testing/target_test_streams/pk_updates.singer diff --git a/singer_sdk/testing/suites.py b/singer_sdk/testing/suites.py index fd53e6e19..a61268759 100644 --- a/singer_sdk/testing/suites.py +++ b/singer_sdk/testing/suites.py @@ -36,6 +36,7 @@ TargetInvalidSchemaTest, TargetNoPrimaryKeys, TargetOptionalAttributes, + TargetPrimaryKeyUpdates, TargetRecordBeforeSchemaTest, TargetRecordMissingKeyProperty, TargetRecordMissingOptionalFields, @@ -104,6 +105,7 @@ class TestSuite(t.Generic[T]): # TargetMultipleStateMessages, TargetNoPrimaryKeys, TargetOptionalAttributes, + TargetPrimaryKeyUpdates, TargetRecordBeforeSchemaTest, TargetRecordMissingKeyProperty, TargetRecordMissingOptionalFields, diff --git a/singer_sdk/testing/target_test_streams/pk_updates.singer b/singer_sdk/testing/target_test_streams/pk_updates.singer new file mode 100644 index 000000000..c9b920599 --- /dev/null +++ b/singer_sdk/testing/target_test_streams/pk_updates.singer @@ -0,0 +1,4 @@ +{"type": "SCHEMA", "stream": "example_stream", "schema": {"properties": {"id": {"type": "integer"}, "name": {"type": "string"}, "email": {"type": "string"}}, "key_properties": ["id"]}} +{"type": "RECORD", "stream": "example_stream", "record": {"id": 1, "name": "Alice", "email": "alice@example.com"}} +{"type": "SCHEMA", "stream": "example_stream", "schema": {"properties": {"id": {"type": "integer"}, "name": {"type": "string"}, "email": {"type": "string"}}, "key_properties": ["email"]}} +{"type": "RECORD", "stream": "example_stream", "record": {"id": 2, "name": "Bob", "email": "bob@example.com"}} diff --git a/singer_sdk/testing/target_tests.py b/singer_sdk/testing/target_tests.py index 96e0b0d59..c9d5b94de 100644 --- a/singer_sdk/testing/target_tests.py +++ b/singer_sdk/testing/target_tests.py @@ -135,6 +135,12 @@ class TargetSchemaUpdates(TargetFileTestTemplate): name = "schema_updates" +class TargetPrimaryKeyUpdates(TargetFileTestTemplate): + """Test Target handles Primary Key updates.""" + + name = "pk_updates" + + class TargetSpecialCharsInAttributes(TargetFileTestTemplate): """Test Target handles special chars in attributes.""" From ca5a44ab4d84c9315fd4f8d4ae4f1e37ec99550c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Tue, 22 Oct 2024 11:33:09 -0600 Subject: [PATCH 5/5] Remove default implementation --- singer_sdk/connectors/sql.py | 34 ++++++-------------------------- tests/core/test_connector_sql.py | 8 ++------ 2 files changed, 8 insertions(+), 34 deletions(-) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index a606584e3..98e6ea7d6 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -990,41 +990,19 @@ def prepare_table( def prepare_primary_key( self, *, - full_table_name: str | FullyQualifiedName, - primary_keys: t.Sequence[str], + full_table_name: str | FullyQualifiedName, # noqa: ARG002 + primary_keys: t.Sequence[str], # noqa: ARG002 ) -> None: """Adapt target table primary key to provided schema if possible. + Implement this method in a subclass to adapt the primary key of the target table + to the provided one if possible. + Args: full_table_name: the target table name. primary_keys: list of key properties. """ - _, schema_name, table_name = self.parse_full_table_name(full_table_name) - meta = sa.MetaData(schema=schema_name) - meta.reflect(bind=self._engine, only=[table_name]) - table = meta.tables[full_table_name] # type: ignore[index] - current_pk_cols = [col.name for col in table.primary_key.columns] - - # Nothing to do - if current_pk_cols == primary_keys: - return - - new_pk = sa.PrimaryKeyConstraint(*primary_keys) - - # If table has no primary key, add the provided one - if not current_pk_cols: - with self._connect() as conn, conn.begin(): - conn.execute(sa.schema.AddConstraint(new_pk)) - return - - # Drop the existing primary key - with self._connect() as conn, conn.begin(): - conn.execute(sa.schema.DropConstraint(table.primary_key)) - - # Add the new primary key - if primary_keys: - with self._connect() as conn, conn.begin(): - conn.execute(sa.schema.AddConstraint(new_pk)) + self.logger.debug("Primary key adaptation is not implemented") def prepare_column( self, diff --git a/tests/core/test_connector_sql.py b/tests/core/test_connector_sql.py index f76f30525..94bce926e 100644 --- a/tests/core/test_connector_sql.py +++ b/tests/core/test_connector_sql.py @@ -321,9 +321,7 @@ def test_column_rename(self, connector: DuckDBConnector): connector.rename_column("test_table", "old_name", "new_name") with engine.connect() as conn: - result = conn.execute( - sa.text("SELECT * FROM test_table"), - ) + result = conn.execute(sa.text("SELECT * FROM test_table")) assert result.keys() == ["id", "new_name"] def test_adapt_column_type(self, connector: DuckDBConnector): @@ -341,9 +339,7 @@ def test_adapt_column_type(self, connector: DuckDBConnector): connector._adapt_column_type("test_table", "name", sa.types.String()) with engine.connect() as conn: - result = conn.execute( - sa.text("SELECT * FROM test_table"), - ) + result = conn.execute(sa.text("SELECT * FROM test_table")) assert result.keys() == ["id", "name"] assert result.cursor.description[1][1] == "STRING"