Skip to content

Commit

Permalink
FIX: Handle NULL values in Key columns for Update operation (#36)
Browse files Browse the repository at this point in the history
This change joins Primary Key columns using IS NOT DISTINCT FROM instead of = to allow for the JOINING of NULL to NULL values in Key columns.

This change also contains a migration that deletes the ELT Status files for all tables loaded since PR #34 so that those tables will be re-loaded with the new loading process.
  • Loading branch information
rymarczy authored Jan 8, 2025
1 parent 0d31ce2 commit 3c224db
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""reset qlik tables with possible null primary key values
Revision ID: 96837b10c106
Revises: 8638e949eea1
Create Date: 2021-01-07 19:42:12.287594
"""

import os
from typing import Sequence, Union

from cubic_loader.utils.aws import s3_delete_object
from cubic_loader.utils.remote_locations import ODS_STATUS

# revision identifiers, used by Alembic.
revision: str = "96837b10c106"
down_revision: Union[str, None] = "8638e949eea1"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
# Delete status files for all tables loaded since the NOT NULL Primary Key requirement was dropped.
# This will force a reset/re-load for these tables in dmap-import DB
s3_delete_object(os.path.join(ODS_STATUS, "EDW.PAYMENT_SUMMARY.json"))
s3_delete_object(os.path.join(ODS_STATUS, "EDW.MEMBER_DIMENSION.json"))


def downgrade() -> None:
# Nothing to downgrade
pass
4 changes: 2 additions & 2 deletions src/cubic_loader/qlik/rds_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def bulk_delete_from_temp(schema_and_table: str, key_columns: List[str]) -> str:
create query to DELETE records from table based on key columns
"""
tmp_table = f"{schema_and_table}_load"
where_clause = " AND ".join([f"{schema_and_table}.{t}={tmp_table}.{t}" for t in key_columns])
where_clause = " AND ".join([f"{schema_and_table}.{t} IS NOT DISTINCT FROM {tmp_table}.{t}" for t in key_columns])
delete_query = f"DELETE FROM {schema_and_table} " f"USING {tmp_table} " f"WHERE {where_clause};"

return delete_query
Expand All @@ -197,7 +197,7 @@ def bulk_update_from_temp(schema_and_table: str, update_column: str, key_columns
create query to UPDATE records from table based on key columns
"""
tmp_table = f"{schema_and_table}_load"
where_clause = " AND ".join([f"{schema_and_table}.{t}={tmp_table}.{t}" for t in key_columns])
where_clause = " AND ".join([f"{schema_and_table}.{t} IS NOT DISTINCT FROM {tmp_table}.{t}" for t in key_columns])
update_query = (
f"UPDATE {schema_and_table} SET {update_column}={tmp_table}.{update_column} "
f"FROM {tmp_table} WHERE {where_clause};"
Expand Down

0 comments on commit 3c224db

Please sign in to comment.