Commit 77abc0c

FIX: Drop Tables for migration / Dynamic JOIN operation (#37)

- Explicitly drop these tables when deleting the status files to force a full table reset.
- Dynamically select the join operator (= or IS NOT DISTINCT FROM) in UPDATE and DELETE operations, based on the presence of NULL values in the key columns (see the sketch below).

rymarczy authored Jan 9, 2025
1 parent 3c224db commit 77abc0c
Showing 4 changed files with 58 additions and 38 deletions.
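Background on the operator choice: in PostgreSQL, a = b evaluates to NULL rather than TRUE when either side is NULL, so a plain = join silently skips rows whose key columns contain NULLs, while IS NOT DISTINCT FROM treats two NULLs as equal but can prevent ordinary index lookups. A minimal sketch of the two predicates this commit switches between — illustrative only; the table and column names are made up, not taken from this commit:

# Illustrative sketch; "ods.example_fact" and "member_id" are hypothetical names.
fact = "ods.example_fact"
tmp = f"{fact}_load"
col = "member_id"

eq_pred = f"{fact}.{col} = {tmp}.{col}"                            # fast, but never matches NULL keys
null_safe_pred = f"{fact}.{col} IS NOT DISTINCT FROM {tmp}.{col}"  # NULL-safe, typically slower
print(eq_pred)
print(null_safe_pred)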
6 changes: 6 additions & 0 deletions (Alembic migration; filename not shown)
@@ -9,6 +9,8 @@
 import os
 from typing import Sequence, Union
 
+from alembic import op
+
 from cubic_loader.utils.aws import s3_delete_object
 from cubic_loader.utils.remote_locations import ODS_STATUS
 
@@ -24,6 +26,10 @@ def upgrade() -> None:
     # This will force a reset/re-load for these tables in dmap-import DB
     s3_delete_object(os.path.join(ODS_STATUS, "EDW.PAYMENT_SUMMARY.json"))
     s3_delete_object(os.path.join(ODS_STATUS, "EDW.MEMBER_DIMENSION.json"))
+    op.drop_table("edw_payment_summary", schema="ods")
+    op.drop_table("edw_payment_summary_history", schema="ods")
+    op.drop_table("edw_member_dimension", schema="ods")
+    op.drop_table("edw_member_dimension_history", schema="ods")
 
 
 def downgrade() -> None:
65 changes: 32 additions & 33 deletions src/cubic_loader/qlik/ods_qlik.py
@@ -35,6 +35,7 @@
 from cubic_loader.qlik.rds_utils import bulk_delete_from_temp
 from cubic_loader.qlik.rds_utils import bulk_update_from_temp
 from cubic_loader.qlik.rds_utils import bulk_insert_from_temp
+from cubic_loader.qlik.utils import key_column_join_type
 from cubic_loader.qlik.utils import DFMDetails
 from cubic_loader.qlik.utils import DFMSchemaFields
 from cubic_loader.qlik.utils import re_get_first
@@ -272,40 +273,41 @@ def cdc_verify_schema(self, dfm_object: str) -> None:
             current_schema.append(column)
         self.update_status(last_schema=current_schema)
 
-    def cdc_update(self, cdc_df: pl.DataFrame, update_col: str, key_columns: List[str]) -> None:
+    def cdc_update(self, cdc_df: pl.DataFrame, tmp_table: str, key_columns: List[str]) -> None:
         """
         Perform UPDATE from cdc dataframe
         """
-        tmp_table = f"{self.db_fact_table}_load"
-        update_q = bulk_update_from_temp(self.db_fact_table, update_col, key_columns)
-
-        update_df = (
-            cdc_df.filter(
-                pl.col("header__change_oper").eq("U"),
-                pl.col(update_col).is_not_null(),
-            )
-            .sort(by="header__change_seq", descending=True)
-            .unique(key_columns, keep="first")
-            .select(key_columns + [update_col])
-        )
-        if update_df.shape[0] == 0:
-            return
-
-        with tempfile.TemporaryDirectory() as tmp_dir:
-            update_csv_path = os.path.join(tmp_dir, "update.csv")
-            update_df.write_csv(update_csv_path, quote_style="necessary")
-            self.db.truncate_table(tmp_table)
-            remote_csv_gz_copy(update_csv_path, tmp_table)
-
-        self.db.execute(update_q)
+        # Perform UPDATE operations on fact table for each column individually
+        for update_col in cdc_df.columns:
+            if update_col in key_columns or update_col in CDC_COLUMNS:
+                continue
+
+            update_df = (
+                cdc_df.filter(
+                    pl.col("header__change_oper").eq("U"),
+                    pl.col(update_col).is_not_null(),
+                )
+                .sort(by="header__change_seq", descending=True)
+                .unique(key_columns, keep="first")
+                .select(key_columns + [update_col])
+            )
+            if update_df.shape[0] == 0:
+                continue
+
+            with tempfile.TemporaryDirectory() as tmp_dir:
+                update_csv_path = os.path.join(tmp_dir, "update.csv")
+                update_df.write_csv(update_csv_path, quote_style="necessary")
+                self.db.truncate_table(tmp_table)
+                remote_csv_gz_copy(update_csv_path, tmp_table)
+
+            op_and_key = key_column_join_type(update_df, key_columns)
+            update_q = bulk_update_from_temp(self.db_fact_table, update_col, op_and_key)
+            self.db.execute(update_q)
 
-    def cdc_delete(self, cdc_df: pl.DataFrame, key_columns: List[str]) -> None:
+    def cdc_delete(self, cdc_df: pl.DataFrame, tmp_table: str, key_columns: List[str]) -> None:
         """
         Perform DELETE from cdc dataframe
         """
-        tmp_table = f"{self.db_fact_table}_load"
-        delete_q = bulk_delete_from_temp(self.db_fact_table, key_columns)
-
         delete_df = (
             cdc_df.sort(by="header__change_seq", descending=True)
             .unique(key_columns, keep="first")
@@ -315,19 +317,20 @@ def cdc_delete(self, cdc_df: pl.DataFrame, key_columns: List[str]) -> None:
         if delete_df.shape[0] == 0:
             return
 
+        op_and_key = key_column_join_type(delete_df, key_columns)
+        delete_q = bulk_delete_from_temp(self.db_fact_table, op_and_key)
+
         with tempfile.TemporaryDirectory() as tmp_dir:
             delete_csv_path = os.path.join(tmp_dir, "delete.csv")
             delete_df.write_csv(delete_csv_path, quote_style="necessary")
             self.db.truncate_table(tmp_table)
             remote_csv_gz_copy(delete_csv_path, tmp_table)
         self.db.execute(delete_q)
 
-    def cdc_insert(self, cdc_df: pl.DataFrame) -> None:
+    def cdc_insert(self, cdc_df: pl.DataFrame, tmp_table: str) -> None:
         """
         Perform INSERT from cdc dataframe
         """
-        tmp_table = f"{self.db_fact_table}_load"
-
         insert_df = cdc_df.filter(pl.col("header__change_oper").eq("I")).drop(CDC_COLUMNS)
         if insert_df.shape[0] == 0:
             return
@@ -370,15 +373,11 @@ def cdc_load_folder(self, load_folder: str) -> None:
             remote_csv_gz_copy(merge_csv, load_table)
             self.db.execute(bulk_insert_from_temp(self.db_history_table, load_table, cdc_df.columns))
 
-        self.cdc_insert(cdc_df)
+        self.cdc_insert(cdc_df, load_table)
 
-        # Perform UPDATE operations on fact table for each column individually
-        for update_col in cdc_df.columns:
-            if update_col in key_columns or update_col in CDC_COLUMNS:
-                continue
-            self.cdc_update(cdc_df, update_col, key_columns)
+        self.cdc_update(cdc_df, load_table, key_columns)
 
-        self.cdc_delete(cdc_df, key_columns)
+        self.cdc_delete(cdc_df, load_table, key_columns)
 
         self.update_status(last_cdc_ts=max(cdc_ts, self.etl_status.last_cdc_ts))
         logger.log_complete()
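The refactored cdc_update keeps the existing polars pattern for picking the latest change per key: sort by header__change_seq descending, then unique(key_columns, keep="first"). A small self-contained sketch with made-up data (the header column names follow the Qlik CDC headers used above):

import polars as pl

# Made-up CDC rows: key id=1 has two updates; the higher change_seq should win.
cdc_df = pl.DataFrame(
    {
        "id": [1, 1, 2],
        "fare": [100, 250, 300],
        "header__change_oper": ["U", "U", "U"],
        "header__change_seq": ["0001", "0002", "0001"],
    }
)

latest = (
    cdc_df.filter(
        pl.col("header__change_oper").eq("U"),
        pl.col("fare").is_not_null(),
    )
    .sort(by="header__change_seq", descending=True)
    .unique(["id"], keep="first")
    .select(["id", "fare"])
)
print(latest)  # id 1 -> fare 250 (its latest change), id 2 -> fare 300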
11 changes: 6 additions & 5 deletions src/cubic_loader/qlik/rds_utils.py
@@ -1,5 +1,6 @@
 from typing import List
 from typing import Optional
+from typing import Tuple
 from datetime import date
 from datetime import datetime
 from dateutil.relativedelta import relativedelta
@@ -181,23 +182,23 @@ def add_columns_to_table(new_columns: List[DFMSchemaFields], schema_and_table: s
     return " ".join(alter_strings)
 
 
-def bulk_delete_from_temp(schema_and_table: str, key_columns: List[str]) -> str:
+def bulk_delete_from_temp(schema_and_table: str, op_and_keys: List[Tuple[str, str]]) -> str:
     """
     create query to DELETE records from table based on key columns
     """
     tmp_table = f"{schema_and_table}_load"
-    where_clause = " AND ".join([f"{schema_and_table}.{t} IS NOT DISTINCT FROM {tmp_table}.{t}" for t in key_columns])
-    delete_query = f"DELETE FROM {schema_and_table} " f"USING {tmp_table} " f"WHERE {where_clause};"
+    where_clause = " AND ".join([f"{schema_and_table}.{t} {op} {tmp_table}.{t}" for op, t in op_and_keys])
+    delete_query = f"DELETE FROM {schema_and_table} USING {tmp_table} WHERE {where_clause};"
 
     return delete_query
 
 
-def bulk_update_from_temp(schema_and_table: str, update_column: str, key_columns: List[str]) -> str:
+def bulk_update_from_temp(schema_and_table: str, update_column: str, op_and_keys: List[Tuple[str, str]]) -> str:
     """
     create query to UPDATE records from table based on key columns
     """
     tmp_table = f"{schema_and_table}_load"
-    where_clause = " AND ".join([f"{schema_and_table}.{t} IS NOT DISTINCT FROM {tmp_table}.{t}" for t in key_columns])
+    where_clause = " AND ".join([f"{schema_and_table}.{t} {op} {tmp_table}.{t}" for op, t in op_and_keys])
     update_query = (
         f"UPDATE {schema_and_table} SET {update_column}={tmp_table}.{update_column} "
         f"FROM {tmp_table} WHERE {where_clause};"
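For a concrete sense of the SQL these helpers now emit, here is a sketch that inlines the body of the reworked bulk_delete_from_temp with a hypothetical table name and a mixed op_and_keys list of the kind key_column_join_type returns (see utils.py below):

schema_and_table = "ods.example_fact"  # hypothetical name, not from this commit
op_and_keys = [("=", "rider_id"), ("IS NOT DISTINCT FROM", "device_id")]

tmp_table = f"{schema_and_table}_load"
where_clause = " AND ".join(f"{schema_and_table}.{t} {op} {tmp_table}.{t}" for op, t in op_and_keys)
print(f"DELETE FROM {schema_and_table} USING {tmp_table} WHERE {where_clause};")
# Prints (wrapped here for readability):
# DELETE FROM ods.example_fact USING ods.example_fact_load
#   WHERE ods.example_fact.rider_id = ods.example_fact_load.rider_id
#   AND ods.example_fact.device_id IS NOT DISTINCT FROM ods.example_fact_load.device_id;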
14 changes: 14 additions & 0 deletions src/cubic_loader/qlik/utils.py
@@ -5,6 +5,7 @@
 from typing import NamedTuple
 from typing import TypedDict
 from typing import List
+from typing import Tuple
 
 import polars as pl
 
@@ -267,3 +268,16 @@ def dataframe_from_merged_csv(csv_path: str, dfm_path: str) -> pl.DataFrame:
     schema = polars_schema_from_dfm(dfm_path)
     df = pl.read_csv(csv_path, schema=schema).filter(pl.col("header__change_oper").ne("B"))
     return df
+
+
+def key_column_join_type(df: pl.DataFrame, key_columns: List[str]) -> List[Tuple[str, str]]:
+    """
+    Check for NULL counts in key_columns to determine if `=` or `IS NOT DISTINCT FROM` can be used
+    """
+    return_list = []
+    for column in key_columns:
+        if df.get_column(column).null_count() > 0:
+            return_list.append(("IS NOT DISTINCT FROM", column))
+        else:
+            return_list.append(("=", column))
+    return return_list
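A quick usage sketch for the new helper with a toy DataFrame (made-up data): the key column containing a NULL gets the NULL-safe operator, while the fully populated one gets plain equality.

import polars as pl

df = pl.DataFrame({"rider_id": [1, 2, 3], "device_id": ["a", None, "c"]})
print(key_column_join_type(df, ["rider_id", "device_id"]))
# [('=', 'rider_id'), ('IS NOT DISTINCT FROM', 'device_id')]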
