Commit 693d4b6

use change_seq for fact load
rymarczy committed Aug 28, 2024
1 parent 37b86fd commit 693d4b6
Showing 3 changed files with 9 additions and 7 deletions.
5 changes: 3 additions & 2 deletions src/cubic_loader/qlik/ods_qlik.py
@@ -257,7 +257,7 @@ def rds_snapshot_load(self) -> None:
f"UPDATE {ODS_SCHEMA}.{self.db_load_table} "
f"SET header__timestamp=to_timestamp('{self.last_s3_snapshot_dfm.ts}','YYYYMMDDTHHMISSZ') "
f", header__change_oper='L' "
f", header__change_seq='SNAPSHOT_LOAD'"
f", header__change_seq=rpad(regexp_replace('{self.last_s3_snapshot_dfm.ts}','\\D','','g'),35,'0')::numeric "
f"WHERE header__timestamp IS NULL;"
)
self.db.execute(load_update)
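
The new expression strips every non-digit from the snapshot timestamp and right-pads the result to 35 digits, so snapshot rows sort numerically alongside real Qlik change sequences instead of carrying the string 'SNAPSHOT_LOAD'. A minimal Python sketch of the same transformation, using a hypothetical timestamp value:

    import re

    ts = "20240828T120000Z"                  # hypothetical DFM snapshot timestamp
    digits = re.sub(r"\D", "", ts)           # "20240828120000"
    change_seq = int(digits.ljust(35, "0"))  # right-pad to 35 digits, as rpad(...)::numeric does
    print(change_seq)                        # 20240828120000000000000000000000000
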
@@ -334,7 +334,7 @@ def rds_fact_table_load(self) -> None:
         for column in ["header__change_oper"] + table_columns:
             if column in key_columns:
                 continue
-            q = f"(array_remove(array_agg({column} ORDER BY header__timestamp DESC), NULL))[1] as {column}"
+            q = f"(array_remove(array_agg({column} ORDER BY header__change_seq DESC), NULL))[1] as {column}"
             first_vals.append(q)

         fact_query = (
@@ -344,6 +344,7 @@ def rds_fact_table_load(self) -> None:
f" SELECT {key_str}"
f" , {','.join(first_vals)}"
f" FROM {history_table}"
f" WHERE header__change_oper <> 'B'"
f" GROUP BY {key_str}"
f" ) t_load"
f" WHERE t_load.header__change_oper <> 'D'"
6 changes: 4 additions & 2 deletions src/cubic_loader/qlik/rds_utils.py
@@ -22,6 +22,8 @@ def qlik_type_to_pg(qlik_type: str, scale: int) -> str:

     if qlik_type == "CHANGE_OPER":
         return_type = "CHAR(1)"
+    elif qlik_type == "CHANGE_SEQ":
+        return_type = "NUMERIC(35,0)"
     elif "INT1" in qlik_type:
         return_type = "SMALLINT"
     elif "INT2" in qlik_type:
@@ -78,7 +80,7 @@ def create_tables_from_schema(schema: List[DFMSchemaFields], table_name: str) ->
     header_fields = (
         ("header__timestamp", "DATETIME"),
         ("header__change_oper", "CHANGE_OPER"),
-        ("header__change_seq", "STRING"),
+        ("header__change_seq", "CHANGE_SEQ"),
     )
     header_cols: List[str] = [f"{col[0]} {qlik_type_to_pg(col[1], 0)}" for col in header_fields]
     history_keys = dfm_keys + [ix[0] for ix in header_fields]
@@ -95,7 +97,7 @@ def create_tables_from_schema(schema: List[DFMSchemaFields], table_name: str) ->
ops.append(f"CREATE TABLE IF NOT EXISTS {ODS_SCHEMA}.{table_name}_load ({",".join(load_columns)});")

# Create INDEX on HISTORY Table that will be used for creating FACT table
index_columns = dfm_keys + ["header__timestamp DESC", "header__change_oper"]
index_columns = ["header__change_oper"] + dfm_keys + ["header__change_seq DESC"]
ops.append(
f"CREATE INDEX IF NOT EXISTS {table_name}_to_fact_idx on {ODS_SCHEMA}.{table_name}_history "
f"({','.join(index_columns)});"
5 changes: 2 additions & 3 deletions src/cubic_loader/utils/postgres.py
@@ -440,14 +440,13 @@ def remote_csv_gz_copy(obj_path: str, destination_table: str, column_str: Option
f"{copy_command}",
]

run_psql_subprocess(psql, copy_log)
run_psql_subprocess(psql, copy_log, max_retries=0)


def run_psql_subprocess(psql_cmd: List[str], logger: ProcessLogger) -> None:
def run_psql_subprocess(psql_cmd: List[str], logger: ProcessLogger, max_retries: int = 2) -> None:
"""
run psql command with retry logic
"""
max_retries = 2
logger.add_metadata(max_retries=max_retries)

for retry_attempts in range(max_retries + 1):
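
Since range(max_retries + 1) always yields the initial attempt, passing max_retries=0 from remote_csv_gz_copy means a single try with no retries, while the default of 2 keeps the old three-attempt behavior for other callers. A stripped-down sketch of that loop's semantics, with the psql call replaced by a stand-in:

    def run_with_retries(attempt, max_retries: int = 2) -> None:
        # range(max_retries + 1): one initial attempt plus up to max_retries retries
        for retry_attempts in range(max_retries + 1):
            try:
                attempt()
                return
            except Exception:
                if retry_attempts == max_retries:
                    raise

    run_with_retries(lambda: print("psql copy..."), max_retries=0)  # exactly one attempt
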
