From 06f54d078da3bf65a81f2e68110eeb4896640e04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Edgar=20Ram=C3=ADrez-Mondrag=C3=B3n?= Date: Mon, 21 Oct 2024 12:02:03 -0600 Subject: [PATCH] feat: Added `SQLConnector.prepare_primary_key` --- singer_sdk/connectors/sql.py | 44 ++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/singer_sdk/connectors/sql.py b/singer_sdk/connectors/sql.py index 87eb1eb98..b980e4104 100644 --- a/singer_sdk/connectors/sql.py +++ b/singer_sdk/connectors/sql.py @@ -982,6 +982,50 @@ def prepare_table( self.to_sql_type(property_def), ) + self.prepare_primary_key( + full_table_name=full_table_name, + primary_keys=primary_keys, + ) + + def prepare_primary_key( + self, + *, + full_table_name: str | FullyQualifiedName, + primary_keys: t.Sequence[str], + ) -> None: + """Adapt target table primary key to provided schema if possible. + + Args: + full_table_name: the target table name. + primary_keys: list of key properties. + """ + _, schema_name, table_name = self.parse_full_table_name(full_table_name) + meta = sa.MetaData(schema=schema_name) + meta.reflect(bind=self._engine, only=[table_name]) + table = meta.tables[f"{schema_name}.{table_name}"] + current_pk_cols = [col.name for col in table.primary_key.columns] + + # Nothing to do + if current_pk_cols == primary_keys: + return + + new_pk = sa.PrimaryKeyConstraint(*primary_keys) + + # If table has no primary key, add the provided one + if not current_pk_cols: + with self._connect() as conn, conn.begin(): + conn.execute(sa.schema.AddConstraint(new_pk)) + return + + # Drop the existing primary key + with self._connect() as conn, conn.begin(): + conn.execute(sa.schema.DropConstraint(table.primary_key)) + + # Add the new primary key + if primary_keys: + with self._connect() as conn, conn.begin(): + conn.execute(sa.schema.AddConstraint(new_pk)) + def prepare_column( self, full_table_name: str | FullyQualifiedName,