diff --git a/src/cubic_loader/qlik/ods_qlik.py b/src/cubic_loader/qlik/ods_qlik.py
index d026b84..edf65c5 100644
--- a/src/cubic_loader/qlik/ods_qlik.py
+++ b/src/cubic_loader/qlik/ods_qlik.py
@@ -257,7 +257,7 @@ def rds_snapshot_load(self) -> None:
             f"UPDATE {ODS_SCHEMA}.{self.db_load_table} "
             f"SET header__timestamp=to_timestamp('{self.last_s3_snapshot_dfm.ts}','YYYYMMDDTHHMISSZ') "
             f", header__change_oper='L' "
-            f", header__change_seq='SNAPSHOT_LOAD'"
+            f", header__change_seq=rpad(regexp_replace('{self.last_s3_snapshot_dfm.ts}','\\D','','g'),35,'0')::numeric "
             f"WHERE header__timestamp IS NULL;"
         )
         self.db.execute(load_update)
@@ -334,7 +334,7 @@ def rds_fact_table_load(self) -> None:
         for column in ["header__change_oper"] + table_columns:
             if column in key_columns:
                 continue
-            q = f"(array_remove(array_agg({column} ORDER BY header__timestamp DESC), NULL))[1] as {column}"
+            q = f"(array_remove(array_agg({column} ORDER BY header__change_seq DESC), NULL))[1] as {column}"
             first_vals.append(q)
 
         fact_query = (
@@ -344,6 +344,7 @@ def rds_fact_table_load(self) -> None:
             f" SELECT {key_str}"
             f" , {','.join(first_vals)}"
             f" FROM {history_table}"
+            f" WHERE header__change_oper <> 'B'"
             f" GROUP BY {key_str}"
             f" ) t_load"
             f" WHERE t_load.header__change_oper <> 'D'"
diff --git a/src/cubic_loader/qlik/rds_utils.py b/src/cubic_loader/qlik/rds_utils.py
index 9c15fa7..fdcf7cb 100644
--- a/src/cubic_loader/qlik/rds_utils.py
+++ b/src/cubic_loader/qlik/rds_utils.py
@@ -22,6 +22,8 @@ def qlik_type_to_pg(qlik_type: str, scale: int) -> str:
 
     if qlik_type == "CHANGE_OPER":
         return_type = "CHAR(1)"
+    elif qlik_type == "CHANGE_SEQ":
+        return_type = "NUMERIC(35,0)"
     elif "INT1" in qlik_type:
         return_type = "SMALLINT"
     elif "INT2" in qlik_type:
@@ -78,7 +80,7 @@ def create_tables_from_schema(schema: List[DFMSchemaFields], table_name: str) ->
     header_fields = (
         ("header__timestamp", "DATETIME"),
         ("header__change_oper", "CHANGE_OPER"),
-        ("header__change_seq", "STRING"),
+        ("header__change_seq", "CHANGE_SEQ"),
     )
     header_cols: List[str] = [f"{col[0]} {qlik_type_to_pg(col[1], 0)}" for col in header_fields]
     history_keys = dfm_keys + [ix[0] for ix in header_fields]
@@ -95,7 +97,7 @@ def create_tables_from_schema(schema: List[DFMSchemaFields], table_name: str) ->
     ops.append(f"CREATE TABLE IF NOT EXISTS {ODS_SCHEMA}.{table_name}_load ({",".join(load_columns)});")
 
     # Create INDEX on HISTORY Table that will be used for creating FACT table
-    index_columns = dfm_keys + ["header__timestamp DESC", "header__change_oper"]
+    index_columns = ["header__change_oper"] + dfm_keys + ["header__change_seq DESC"]
     ops.append(
         f"CREATE INDEX IF NOT EXISTS {table_name}_to_fact_idx on {ODS_SCHEMA}.{table_name}_history "
         f"({','.join(index_columns)});"
diff --git a/src/cubic_loader/utils/postgres.py b/src/cubic_loader/utils/postgres.py
index c5ad1ef..950717e 100644
--- a/src/cubic_loader/utils/postgres.py
+++ b/src/cubic_loader/utils/postgres.py
@@ -440,14 +440,13 @@ def remote_csv_gz_copy(obj_path: str, destination_table: str, column_str: Option
         f"{copy_command}",
     ]
 
-    run_psql_subprocess(psql, copy_log)
+    run_psql_subprocess(psql, copy_log, max_retries=0)
 
 
-def run_psql_subprocess(psql_cmd: List[str], logger: ProcessLogger) -> None:
+def run_psql_subprocess(psql_cmd: List[str], logger: ProcessLogger, max_retries: int = 2) -> None:
     """
     run psql command with retry logic
     """
-    max_retries = 2
     logger.add_metadata(max_retries=max_retries)
 
     for retry_attempts in range(max_retries + 1):
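
Note (not part of the diff): a minimal sketch of what the new snapshot-load expression assigns to header__change_seq, mirroring the SQL rpad(regexp_replace(ts, '\D', '', 'g'), 35, '0')::numeric in Python. The timestamp value below is hypothetical.

    import re

    ts = "20240102T120000Z"  # hypothetical DFM snapshot timestamp

    # regexp_replace(ts, '\D', '', 'g'): strip every non-digit character
    digits = re.sub(r"\D", "", ts)  # -> "20240102120000"

    # rpad(..., 35, '0')::numeric: right-pad to 35 characters and treat the
    # result as a number, matching the NUMERIC(35,0) type added in rds_utils.py
    change_seq = int(digits.ljust(35, "0"))

    print(change_seq)  # 20240102120000000000000000000000000

Assuming Qlik change sequences are timestamp-prefixed digit strings of up to 35 characters (which the NUMERIC(35,0) typing suggests), this makes snapshot rows directly comparable with later CDC changes, unlike the old 'SNAPSHOT_LOAD' sentinel string.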
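
Note (not part of the diff): a sketch of why the fact-table rollup now orders by header__change_seq instead of header__timestamp and skips before-image rows. Several CDC changes can share one timestamp while carrying distinct sequence numbers; all row values below are hypothetical.

    # Each tuple: (header__timestamp, header__change_seq, header__change_oper, status)
    rows = [
        ("2024-01-02 12:00:00", 20240102120000000000000000000000001, "U", "OPEN"),
        ("2024-01-02 12:00:00", 20240102120000000000000000000000002, "U", "CLOSED"),
        ("2024-01-02 12:00:00", 20240102120000000000000000000000002, "B", None),
    ]

    # Mirror the new WHERE header__change_oper <> 'B' filter, then pick the row
    # with the highest change sequence; the timestamps alone cannot break the
    # tie between the two updates.
    latest = max((r for r in rows if r[2] != "B"), key=lambda r: r[1])
    print(latest[3])  # CLOSED

The rebuilt index in rds_utils.py (header__change_oper first, then the key columns, then header__change_seq DESC) lines up with this filter-group-order access pattern.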
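
Note (not part of the diff): the retry change in postgres.py in isolation. range(max_retries + 1) gives one initial attempt plus max_retries retries, so the old hard-coded value allowed up to three runs of any psql command; remote_csv_gz_copy now passes max_retries=0, so a failed COPY surfaces immediately (presumably because blindly re-running a partially applied COPY is unsafe; the diff itself does not state the motivation).

    def attempts(max_retries: int = 2) -> int:
        """Count how many times run_psql_subprocess would invoke psql."""
        runs = 0
        for _attempt in range(max_retries + 1):
            runs += 1  # one psql invocation per loop iteration
        return runs

    print(attempts())               # 3: initial attempt + 2 retries (default)
    print(attempts(max_retries=0))  # 1: fail-fast behavior used for COPY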