diff --git a/src/cubic_loader/qlik/ods_qlik.py b/src/cubic_loader/qlik/ods_qlik.py index dca5479..5bd0c76 100644 --- a/src/cubic_loader/qlik/ods_qlik.py +++ b/src/cubic_loader/qlik/ods_qlik.py @@ -1,9 +1,12 @@ import os import json +import hashlib +import shutil +import tempfile +from itertools import batched from typing import List from typing import Tuple from typing import Optional -from typing import Set from tempfile import NamedTemporaryFile from operator import attrgetter from concurrent.futures import ThreadPoolExecutor @@ -16,6 +19,7 @@ from cubic_loader.utils.aws import s3_split_object_path from cubic_loader.utils.aws import s3_upload_file from cubic_loader.utils.aws import s3_delete_object +from cubic_loader.utils.aws import s3_download_object from cubic_loader.utils.remote_locations import S3_ARCHIVE from cubic_loader.utils.remote_locations import S3_ERROR from cubic_loader.utils.remote_locations import QLIK @@ -23,66 +27,32 @@ from cubic_loader.utils.remote_locations import ODS_SCHEMA from cubic_loader.utils.postgres import DatabaseManager from cubic_loader.utils.postgres import remote_csv_gz_copy +from cubic_loader.utils.postgres import header_from_csv_gz from cubic_loader.qlik.rds_utils import create_tables_from_schema from cubic_loader.qlik.rds_utils import create_history_table_partitions from cubic_loader.qlik.rds_utils import add_columns_to_table from cubic_loader.qlik.rds_utils import drop_table +from cubic_loader.qlik.rds_utils import bulk_delete_from_temp +from cubic_loader.qlik.rds_utils import bulk_update_from_temp +from cubic_loader.qlik.rds_utils import bulk_insert_from_temp from cubic_loader.qlik.utils import DFMDetails from cubic_loader.qlik.utils import DFMSchemaFields from cubic_loader.qlik.utils import re_get_first from cubic_loader.qlik.utils import RE_SNAPSHOT_TS +from cubic_loader.qlik.utils import CDC_COLUMNS +from cubic_loader.qlik.utils import MERGED_FNAME from cubic_loader.qlik.utils import RE_CDC_TS from cubic_loader.qlik.utils import TableStatus from cubic_loader.qlik.utils import threading_cpu_count +from cubic_loader.qlik.utils import merge_cdc_csv_gz_files +from cubic_loader.qlik.utils import dfm_schema_to_json +from cubic_loader.qlik.utils import status_schema_to_df +from cubic_loader.qlik.utils import dfm_schema_to_df +from cubic_loader.qlik.utils import dataframe_from_merged_csv +from cubic_loader.qlik.utils import s3_list_cdc_gz_objects from cubic_loader.utils.logger import ProcessLogger -DFM_COLUMN_SCHEMA = pl.Schema( - { - "name": pl.String(), - "type": pl.String(), - "length": pl.Int64(), - "precision": pl.Int64(), - "scale": pl.Int64(), - "primaryKeyPos": pl.Int64(), - } -) -CDC_COLUMNS = ( - "header__change_seq", - "header__change_oper", - "header__timestamp", -) - - -def dfm_schema_to_json(dfm_file: DFMDetails) -> List[DFMSchemaFields]: - """ - extract table schema from .dfm as json - """ - dfm_json = json.load(s3_get_object(dfm_file.path)) - return dfm_json["dataInfo"]["columns"] - - -def dfm_schema_to_df(dfm_file: DFMDetails) -> pl.DataFrame: - """ - extract table schema from .dfm and convert to Polars Dataframe - """ - json_schema = dfm_schema_to_json(dfm_file) - return pl.DataFrame( - json_schema, - schema=DFM_COLUMN_SCHEMA, - ) - - -def status_schema_to_df(status: TableStatus) -> pl.DataFrame: - """ - extract table schema from TableStatus and convert to Polars Dataframe - """ - return pl.DataFrame( - status.last_schema, - schema=DFM_COLUMN_SCHEMA, - ) - - def get_snapshot_dfms(table: str) -> List[DFMDetails]: """find all available 
snapshot dfm files for a qlik table from Archive bucket""" prefix = os.path.join(QLIK, f"{table}/") @@ -97,77 +67,53 @@ def get_snapshot_dfms(table: str) -> List[DFMDetails]: return sorted(found_snapshots, key=attrgetter("ts")) -def get_cdc_dfms(etl_status: TableStatus, table: str) -> List[DFMDetails]: +def get_cdc_gz_csvs(etl_status: TableStatus, table: str) -> List[str]: """ - find all available CDC dfm files for a Snapshot from Archive and Error buckets + find all available CDC csv.gz files for a Snapshot from Archive and Error buckets + :param etl_status: status of ETL operation + :param table: CUBIC Table Name :return: List of ChangeFile objects sorted by 'ts' (Ascending) """ - prefix = os.path.join(QLIK, f"{table}__ct/") + table_prefix = os.path.join(QLIK, f"{table}__ct/") + snapshot_prefix = f"{table_prefix}snapshot={etl_status.current_snapshot_ts}/" - # add archive files from snapshot folder - cdc_dfms: List[DFMDetails] = [] - for dfm_file in s3_list_objects( - S3_ARCHIVE, f"{prefix}snapshot={etl_status.current_snapshot_ts}/", in_filter=".dfm" - ): - cdc_dfms.append(DFMDetails(path=dfm_file, ts=re_get_first(dfm_file, RE_CDC_TS))) + cdc_csvs = s3_list_cdc_gz_objects(S3_ARCHIVE, snapshot_prefix, min_ts=etl_status.last_cdc_ts) # filter error files from table folder - for dfm_file in s3_list_objects(S3_ERROR, prefix, in_filter=".dfm"): - if re_get_first(dfm_file, RE_SNAPSHOT_TS) > etl_status.current_snapshot_ts: - cdc_dfms.append(DFMDetails(path=dfm_file, ts=re_get_first(dfm_file, RE_CDC_TS))) + for csv_file in s3_list_cdc_gz_objects(S3_ERROR, table_prefix, min_ts=etl_status.last_cdc_ts): + if re_get_first(csv_file, RE_SNAPSHOT_TS) > etl_status.current_snapshot_ts: + cdc_csvs.append(csv_file) - cdc_dfms = [dfm for dfm in cdc_dfms if dfm.ts > etl_status.last_cdc_ts] + return sorted(cdc_csvs, key=lambda l: re_get_first(l, RE_CDC_TS)) - return sorted(cdc_dfms, key=attrgetter("ts")) - -def thread_load_cdc_file(args: Tuple[DFMDetails, TableStatus]) -> Tuple[Optional[str], Optional[List[DFMSchemaFields]]]: - """ - work to load cdc file from S3 into RDS history table +def thread_save_csv_file(args: Tuple[str, str]) -> None: """ - dfm, status = args - logger = ProcessLogger("load_cdc_file", dfm=dfm.path, table=status.db_fact_table) - try: - dfm_schema = dfm_schema_to_df(dfm) - dfm_names = ",".join(dfm_schema.get_column("name")) - dfm_name_set = set(dfm_schema.get_column("name")) + work to download and partition cdc files - # check dfm schema contains CDC_COLUMNS - assert set(CDC_COLUMNS).issubset(dfm_name_set) - dfm_schema = dfm_schema.filter(pl.col("name").is_in(CDC_COLUMNS).not_()) - - truth_schema = status_schema_to_df(status) - truth_name_set = set(truth_schema.get_column("name")) - new_columns = dfm_schema.join( - truth_schema, - on=truth_schema.columns, - how="anti", - join_nulls=True, - ) + - download csv.gz file to tmp_folder + - encode header row as sha1 hash for foldername + - move file into hash foldername + """ + csv_object, tmp_dir = args + logger = ProcessLogger("download_cdc_file", csv_object=csv_object) - # new_columns found, can not load csv - if new_columns.shape[0] > 0: - # check if new_columns contains columns in truth schema, would - # indicate that a different dimension of the table changed (type, primary key) - new_truth_common = truth_name_set.intersection(set(new_columns.get_column("name"))) - assert len(new_truth_common) == 0, f"column dimension changed for {new_truth_common}" + try: + csv_local_file = csv_object.replace("s3://", "").replace("/", "|") + csv_local_path = 
os.path.join(tmp_dir, csv_local_file) + s3_download_object(csv_object, csv_local_path) - new_col_list: List[DFMSchemaFields] = new_columns.to_dicts() # type: ignore - logger.log_complete(new_columns_found=str(new_col_list)) - return (dfm.path, new_col_list) + csv_headers = header_from_csv_gz(csv_local_path) + hash_folder = os.path.join(tmp_dir, hashlib.sha1(csv_headers.encode("utf8")).hexdigest()) - csv_file = dfm.path.replace(".dfm", ".csv.gz") - table = f"{ODS_SCHEMA}.{status.db_fact_table}_history" - remote_csv_gz_copy(csv_file, table, dfm_names) + os.makedirs(hash_folder, exist_ok=True) + os.rename(csv_local_path, os.path.join(hash_folder, csv_local_file)) logger.log_complete() except Exception as exception: logger.log_failure(exception) - return (None, None) - - return (dfm.ts, None) # pylint: disable=too-many-instance-attributes @@ -178,16 +124,15 @@ class CubicODSQlik: used to load ODS data from S3 bucket to RDS tables """ - def __init__(self, table: str, db: DatabaseManager): + def __init__(self, table: str, db: DatabaseManager, schema: str = ODS_SCHEMA): """ :param table: Cubic ODS Table Name eg ("EDW.CARD_DIMENSION") """ - self.table = table self.db = db - self.status_path = os.path.join(ODS_STATUS, f"{self.table}.json") - self.db_fact_table = table.replace(".", "_").lower() + self.table = table + self.status_path = os.path.join(ODS_STATUS, f"{table}.json") + self.db_fact_table = f"{schema}.{table.replace(".", "_").lower()}" self.db_history_table = f"{self.db_fact_table}_history" - self.db_load_table = f"{self.db_fact_table}_load" self.s3_snapshot_dfms = get_snapshot_dfms(table) self.last_s3_snapshot_dfm = self.s3_snapshot_dfms[-1] self.etl_status = self.load_etl_status() @@ -231,7 +176,7 @@ def load_etl_status(self) -> TableStatus: db_fact_table=self.db_fact_table, current_snapshot_ts=self.last_s3_snapshot_dfm.ts, last_cdc_ts="", - last_schema=dfm_schema_to_json(self.last_s3_snapshot_dfm), + last_schema=dfm_schema_to_json(self.last_s3_snapshot_dfm.path), ) self.save_status(return_satus) @@ -246,26 +191,38 @@ def save_status(self, status: TableStatus) -> None: def rds_snapshot_load(self) -> None: """Perform load of initial load files to history table""" + # Create _history partitions to cover header__timestamp values of initial load self.db.execute(create_history_table_partitions(self.db_history_table, self.last_s3_snapshot_dfm.ts)) + # Load all csv.gz files from snapshot folder into _load table bucket, prefix = s3_split_object_path(self.last_s3_snapshot_dfm.path.rsplit("/", maxsplit=1)[0]) for s3_path in s3_list_objects(bucket, prefix, in_filter=".csv.gz"): - remote_csv_gz_copy(s3_path, f"{ODS_SCHEMA}.{self.db_load_table}") + remote_csv_gz_copy(s3_path, f"{self.db_fact_table}_load") + # update header__ columns in _load table load_update = ( - f"UPDATE {ODS_SCHEMA}.{self.db_load_table} " + f"UPDATE {self.db_fact_table}_load " f"SET header__timestamp=to_timestamp('{self.last_s3_snapshot_dfm.ts}','YYYYMMDDTHHMISSZ') " f", header__change_oper='L' " f", header__change_seq=rpad(regexp_replace('{self.last_s3_snapshot_dfm.ts}','\\D','','g'),35,'0')::numeric " f"WHERE header__timestamp IS NULL;" ) self.db.execute(load_update) - table_copy = ( - f"INSERT INTO {ODS_SCHEMA}.{self.db_history_table} " f"SELECT * FROM {ODS_SCHEMA}.{self.db_load_table};" + + # Load records from _load table into _history table + history_table_copy = f"INSERT INTO {self.db_history_table} SELECT * FROM {self.db_fact_table}_load;" + self.db.execute(history_table_copy) + 
self.db.vaccuum_analyze(f"{self.db_history_table}") + + # Load records from _load table into fact table + table_columns = [col["name"] for col in self.etl_status.last_schema] + table_column_str = ",".join(table_columns) + fact_table_copy = ( + f"INSERT INTO {self.db_fact_table} ({table_column_str}) " + f"SELECT {table_column_str} FROM {self.db_fact_table}_load;" ) - self.db.execute(table_copy) - self.db.truncate_table(f"{ODS_SCHEMA}.{self.db_load_table}") - self.db.vaccuum_analyze(f"{ODS_SCHEMA}.{self.db_history_table}") + self.db.execute(fact_table_copy) + self.db.vaccuum_analyze(f"{self.db_fact_table}") self.update_status( self.etl_status.current_snapshot_ts, @@ -273,112 +230,209 @@ def rds_snapshot_load(self) -> None: self.etl_status.last_schema, ) - def rds_cdc_load(self) -> None: - """Perform load of CDC files to history table""" - current_cdc_ts = self.etl_status.last_cdc_ts - current_schema = self.etl_status.last_schema - error_files: List[str] = [] - new_column_set: Set[str] = set() - work_files = [(wf, self.etl_status) for wf in get_cdc_dfms(self.etl_status, self.table)] - with ThreadPoolExecutor(max_workers=threading_cpu_count()) as pool: - for result in pool.map(thread_load_cdc_file, work_files): - dfm_result, new_columns = result - if new_columns is None and dfm_result is not None: - # load was success, save cdc_ts if needed - current_cdc_ts = max(current_cdc_ts, dfm_result) - elif new_columns is not None and dfm_result: - # handle file with new columns added - error_files.append(dfm_result) - for column in new_columns: - new_column_set.add(json.dumps(column)) - - if len(error_files) > 0: - # handle new column additions - new_columns_to_add: List[DFMSchemaFields] = [json.loads(column) for column in new_column_set] - self.db.execute(add_columns_to_table(new_columns_to_add, self.db_fact_table)) - for column in new_columns_to_add: - current_schema.append(column) - self.update_status(last_schema=current_schema) - # re-process failed files with added columns - work_files = [ - (DFMDetails(path=path, ts=re_get_first(path, RE_CDC_TS)), self.etl_status) for path in error_files - ] - with ThreadPoolExecutor(max_workers=threading_cpu_count()) as pool: - for result in pool.map(thread_load_cdc_file, work_files): - dfm_result, new_columns = result - if new_columns is None and dfm_result is not None: - current_cdc_ts = max(current_cdc_ts, dfm_result) - - self.db.vaccuum_analyze(f"{ODS_SCHEMA}.{self.db_history_table}") + def cdc_verify_schema(self, dfm_object: str) -> None: + """ + Verify SCHEMA of merged csv file by inspecting the dfm_object file of one csv.gz file - self.update_status( - last_cdc_ts=current_cdc_ts, - last_schema=current_schema, + + If new columns are found, add them to RDS tables. 
+ + If a column dimension changed (such as Type or Primary Key designation), raise an Error + + :param dfm_object: S3 path of .dfm file that will be used for verification + """ + cdc_schema = dfm_schema_to_df(dfm_object) + cdc_name_set = set(cdc_schema.get_column("name")) + + # check dfm schema contains CDC_COLUMNS + assert set(CDC_COLUMNS).issubset(cdc_name_set) + cdc_schema = cdc_schema.filter(pl.col("name").is_in(CDC_COLUMNS).not_()) + + truth_schema = status_schema_to_df(self.etl_status) + truth_name_set = set(truth_schema.get_column("name")) + new_columns = cdc_schema.join( + truth_schema, + on=truth_schema.columns, + how="anti", + join_nulls=True, ) - def rds_fact_table_load(self) -> None: - """Load FACT Table records from History Table""" - logger = ProcessLogger("load_fact_table", table=self.db_fact_table) + # cdc_schema and truth_schema overlap, no action needed + if new_columns.shape[0] == 0: + return - fact_table = f"{ODS_SCHEMA}.{self.db_fact_table}" - history_table = f"{ODS_SCHEMA}.{self.db_history_table}" + # check if new_columns contains columns in truth schema, would + # indicate that a different dimension of the table changed (type, primary key) + new_truth_common = truth_name_set.intersection(set(new_columns.get_column("name"))) + assert len(new_truth_common) == 0, f"column dimension changed for {new_truth_common} from {dfm_object}" - table_columns = [col["name"] for col in self.etl_status.last_schema] - table_column_str = ",".join(table_columns) - key_columns = [col["name"] for col in self.etl_status.last_schema if col["primaryKeyPos"] > 0] - key_str = ",".join(key_columns) + add_columns: List[DFMSchemaFields] = new_columns.to_dicts() # type: ignore + self.db.execute(add_columns_to_table(add_columns, self.db_fact_table)) + current_schema = self.etl_status.last_schema + for column in add_columns: + current_schema.append(column) + self.update_status(last_schema=current_schema) - first_vals = [] - for column in ["header__change_oper"] + table_columns: - if column in key_columns: - continue - q = f"(array_remove(array_agg({column} ORDER BY header__change_seq DESC), NULL))[1] as {column}" - first_vals.append(q) + def cdc_update(self, cdc_df: pl.DataFrame, update_col: str, key_columns: List[str]) -> None: + """ + Perform UPDATE from cdc dataframe + """ + tmp_table = f"{self.db_fact_table}_load" + update_q = bulk_update_from_temp(self.db_fact_table, update_col, key_columns) - delete_str = " AND ".join( - [f"{fact_table}.{col}=to_delete.{col} AND NOT to_delete.{col} IS NULL" for col in key_columns] - ) - fact_delete_query = ( - f"WITH to_delete AS" - f" (" - f" SELECT {key_str} FROM" - f" (" - f" SELECT {key_str}" - f" , (array_remove(array_agg(header__change_oper ORDER BY header__change_seq DESC), NULL))[1] as header__change_oper" - f" FROM {history_table}" - f" WHERE header__change_oper <> 'B'" - f" GROUP BY {key_str}" - f" ) t_load" - f" WHERE t_load.header__change_oper = 'D'" - f" )" - f" DELETE FROM {fact_table}" - f" USING to_delete" - f" WHERE {delete_str}" + update_df = ( + cdc_df.filter( + pl.col("header__change_oper").eq("U"), + pl.col(update_col).is_not_null(), + ) + .sort(by="header__change_seq", descending=True) + .unique(key_columns, keep="first") + .select(key_columns + [update_col]) ) + if update_df.shape[0] == 0: + return + + with tempfile.TemporaryDirectory() as tmp_dir: + update_csv_path = os.path.join(tmp_dir, "update.csv") + update_df.write_csv(update_csv_path, quote_style="necessary") + self.db.truncate_table(tmp_table) + remote_csv_gz_copy(update_csv_path, 
tmp_table) - on_conflict_str = ",".join([f"{col}=EXCLUDED.{col}" for col in table_columns]) - fact_insert_query = ( - f"INSERT INTO {fact_table} ({table_column_str})" - f" SELECT {table_column_str} FROM" - f" (" - f" SELECT {key_str}" - f" , {','.join(first_vals)}" - f" FROM {history_table}" - f" WHERE header__change_oper <> 'B'" - f" GROUP BY {key_str}" - f" ) t_load" - f" WHERE t_load.header__change_oper <> 'D'" - f" ON CONFLICT ({key_str}) DO UPDATE SET " - f" {on_conflict_str}" - ";" + self.db.execute(update_q) + + def cdc_delete(self, cdc_df: pl.DataFrame, key_columns: List[str]) -> None: + """ + Perform DELETE from cdc dataframe + """ + tmp_table = f"{self.db_fact_table}_load" + delete_q = bulk_delete_from_temp(self.db_fact_table, key_columns) + + delete_df = ( + cdc_df.sort(by="header__change_seq", descending=True) + .unique(key_columns, keep="first") + .filter(pl.col("header__change_oper").eq("D")) + .select(key_columns) ) + if delete_df.shape[0] == 0: + return - self.db.execute(fact_delete_query) - self.db.execute(fact_insert_query) + with tempfile.TemporaryDirectory() as tmp_dir: + delete_csv_path = os.path.join(tmp_dir, "delete.csv") + delete_df.write_csv(delete_csv_path, quote_style="necessary") + self.db.truncate_table(tmp_table) + remote_csv_gz_copy(delete_csv_path, tmp_table) + self.db.execute(delete_q) - self.db.vaccuum_analyze(fact_table) + def cdc_insert(self, cdc_df: pl.DataFrame) -> None: + """ + Perform INSERT from cdc dataframe + """ + tmp_table = f"{self.db_fact_table}_load" - logger.log_complete() + insert_df = cdc_df.filter(pl.col("header__change_oper").eq("I")).drop(CDC_COLUMNS) + if insert_df.shape[0] == 0: + return + + insert_q = bulk_insert_from_temp(self.db_fact_table, insert_df.columns) + with tempfile.TemporaryDirectory() as tmp_dir: + insert_path = os.path.join(tmp_dir, "insert.csv") + insert_df.write_csv(insert_path, quote_style="necessary") + self.db.truncate_table(tmp_table) + remote_csv_gz_copy(insert_path, tmp_table) + self.db.execute(insert_q) + + def cdc_load_folder(self, load_folder: str) -> None: + """ + load all cdc csv.gz files from load_folder into RDS + + 1. Merge all csv.gz files into one MERGED_FNAME csv file + 2. Verify SCHEMA of MERGED_FNAME matches RDS tables + 3. Load MERGED_FNAME csv file into self.db_history_table table + 4. Load INSERT records from MERGED_FNAME into self.db_fact_table + 5. For each non-key column of MERGED_FNAME, load UPDATE records into self.db_fact_table + 6. Perform DELETE operations from MERGED_FNAME on self.db_fact_table + 7. 
Delete the load_folder folder + + :param load_folder: folder containing all csv.gz files to be loaded + """ + logger = ProcessLogger("cdc_load_folder", load_folder=load_folder, table=self.db_fact_table) + try: + dfm_object = os.listdir(load_folder)[0].replace(".csv.gz", ".dfm").replace("|", "/") + merge_csv = os.path.join(load_folder, MERGED_FNAME) + + cdc_ts = merge_cdc_csv_gz_files(load_folder) + self.cdc_verify_schema(dfm_object) + + # Load records into _history table + remote_csv_gz_copy(merge_csv, self.db_history_table) + + cdc_df = dataframe_from_merged_csv(merge_csv, dfm_object) + + key_columns = [col["name"].lower() for col in self.etl_status.last_schema if col["primaryKeyPos"] > 0] + + self.cdc_insert(cdc_df) + + # Perform UPDATE Operations on fact table for each column individually + for update_col in cdc_df.columns: + if update_col in key_columns or update_col in CDC_COLUMNS: + continue + self.cdc_update(cdc_df, update_col, key_columns) + + self.cdc_delete(cdc_df, key_columns) + + self.update_status(last_cdc_ts=max(cdc_ts, self.etl_status.last_cdc_ts)) + logger.log_complete() + + except Exception as exception: + logger.log_failure(exception) + + shutil.rmtree(load_folder, ignore_errors=True) + self.db.vaccuum_analyze(self.db_history_table) + self.db.vaccuum_analyze(self.db_fact_table) + + def cdc_check_load_folders(self, tmp_dir: str, max_folder_bytes: int = 0) -> None: + """ + Check all cdc hash folders in tmp_dir + if + size of hash folder is larger than max_folder_bytes + or more than 5000 files in folder + then load folder files into RDS + + :param tmp_dir: folder containing cdc hash folder partitions + :param max_folder_bytes: folder size threshold to trigger load operation + """ + for folder in os.listdir(tmp_dir): + load_folder = os.path.join(tmp_dir, folder) + if not os.path.isdir(load_folder): + continue + file_list = os.listdir(load_folder) + folder_count = len(file_list) + folder_bytes = sum(os.path.getsize(os.path.join(load_folder, f)) for f in file_list) + if folder_bytes > max_folder_bytes or folder_count > 5_000: + self.cdc_load_folder(load_folder) + + def process_cdc_files(self) -> None: + """ + 1. download cdc files in batches + 2. extract header row from each cdc file, convert it to a sha1 hash to be used as a folder name + 3. move cdc file to hash folder for later merging + 4. 
when hash folder size reaches threshold limit, load folder into RDS + """ + pool = ThreadPoolExecutor(max_workers=threading_cpu_count()) + + with tempfile.TemporaryDirectory(ignore_cleanup_errors=True) as tmp_dir: + work_files = [(wf, tmp_dir) for wf in get_cdc_gz_csvs(self.etl_status, self.table)] + for batch in batched(work_files, 10): + # Download batch of cdc csv.gz files + for _ in pool.map(thread_save_csv_file, batch): + pass + + # load any cdc hash folder greater than max_folder_bytes + self.cdc_check_load_folders(tmp_dir, max_folder_bytes=60_000_000) + + # load all remaining cdc hash folders + self.cdc_check_load_folders(tmp_dir) + + pool.shutdown() def snapshot_reset(self) -> None: """ @@ -397,16 +451,13 @@ def run_etl(self) -> None: """ Run table ETL Process - Currently this business logic will only process the latest QLIK "Snapshot" that has been issued - a feature will need to be added that will handle the issuance of new snapshots that should include: - - making sure all outstandind cdc files from last snapshot are loaded - - truncating the fact table - - resetting the table status file to the new snapshot - - re-doing initial load operations to history and fact tables + Currently this business logic will only process the latest QLIK Snapshot that has been issued + If a new QLIK Snapshot is detected, all existing tables will be dropped and the whole process will be + reset to load the NEW Snapshot """ logger = ProcessLogger( "ods_qlik_run_etl", - table=self.table, + table=self.db_fact_table, load_snapshot_ts=self.etl_status.current_snapshot_ts, last_cdc_ts=self.etl_status.last_cdc_ts, ) @@ -414,7 +465,7 @@ def run_etl(self) -> None: if self.etl_status.current_snapshot_ts != self.last_s3_snapshot_dfm.ts: new_snapshot_logger = ProcessLogger( "ods_snapshot_change", - table=self.table, + table=self.db_fact_table, old_shapshot=self.etl_status.current_snapshot_ts, new_shapshot=self.last_s3_snapshot_dfm.ts, ) @@ -429,10 +480,9 @@ def run_etl(self) -> None: if self.etl_status.last_cdc_ts == "": self.rds_snapshot_load() - self.rds_cdc_load() - self.rds_fact_table_load() + self.process_cdc_files() - self.db.execute(drop_table(self.db_load_table)) + self.db.execute(drop_table(f"{self.db_fact_table}_load")) self.save_status(self.etl_status) logger.log_complete() diff --git a/src/cubic_loader/qlik/rds_utils.py b/src/cubic_loader/qlik/rds_utils.py index 43cc2ec..d170bac 100644 --- a/src/cubic_loader/qlik/rds_utils.py +++ b/src/cubic_loader/qlik/rds_utils.py @@ -4,11 +4,9 @@ from datetime import datetime from dateutil.relativedelta import relativedelta -from cubic_loader.utils.remote_locations import ODS_SCHEMA from cubic_loader.qlik.utils import DFMSchemaFields -# pylint: disable=too-many-branches def qlik_type_to_pg(qlik_type: str, scale: int) -> str: """ convert qlik datatype from DFM file to postgres type @@ -16,15 +14,25 @@ def qlik_type_to_pg(qlik_type: str, scale: int) -> str: :param qlik_type: QLIK data type from DFM file :param scale: max number of digits to right of decimal - :return: postgres type + :return: postgres type as str """ - return_type = "VARCHAR" - - if qlik_type == "CHANGE_OPER": - return_type = "CHAR(1)" - elif qlik_type == "CHANGE_SEQ": - return_type = "NUMERIC(35,0)" - elif "INT1" in qlik_type: + exact_type_matches = { + "CHANGE_OPER": "CHAR(1)", + "CHANGE_SEQ": "NUMERIC(35,0)", + "REAL4": "REAL", + "REAL8": "DOUBLE PRECISION", + "BOOLEAN": "BOOLEAN", + "DATE": "DATE", + "TIME": "TIME WITHOUT TIME ZONE", + "DATETIME": "TIMESTAMP WITHOUT TIME ZONE", + } + # check for exact 
type matching + return_type = exact_type_matches.get(qlik_type, None) + if return_type is not None: + return return_type + + # continue with alternate type matching + if "INT1" in qlik_type: return_type = "SMALLINT" elif "INT2" in qlik_type: return_type = "SMALLINT" @@ -32,34 +40,26 @@ def qlik_type_to_pg(qlik_type: str, scale: int) -> str: return_type = "INTEGER" elif "INT4" in qlik_type: return_type = "BIGINT" - elif qlik_type == "REAL4": - return_type = "REAL" - elif qlik_type == "REAL8": - return_type = "DOUBLE PRECISION" elif "NUMERIC" in qlik_type and scale == 0: return_type = "BIGINT" elif "NUMERIC" in qlik_type: return_type = "DOUBLE PRECISION" - elif qlik_type == "BOOLEAN": - return_type = qlik_type - elif qlik_type == "DATE": - return_type = qlik_type - elif qlik_type == "TIME": - return_type = "TIME WITHOUT TIME ZONE" - elif qlik_type == "DATETIME": - return_type = "TIMESTAMP WITHOUT TIME ZONE" + else: + return_type = "VARCHAR" return return_type -# pylint: enable=too-many-branches - - -def create_tables_from_schema(schema: List[DFMSchemaFields], table_name: str) -> str: +def create_tables_from_schema(schema: List[DFMSchemaFields], schema_and_table: str) -> str: """ produce CREATE table string for FACT and HISTORY tables from dfm snapshot path also CREATE INDEX for history table that will be used for inserting into FACT table. + + :param schema: Schema List from DFM file + :schema_and_table: Schema and Table as 'schema.table' + + :return: SQL Statements to CREATE FACT and HISTORY tables and any associated indexes """ ops: List[str] = [] dfm_columns: List[str] = [] @@ -73,7 +73,7 @@ def create_tables_from_schema(schema: List[DFMSchemaFields], table_name: str) -> # Create FACT Table fact_columns = dfm_columns + [f"PRIMARY KEY ({','.join(dfm_keys)})"] - ops.append(f"CREATE TABLE IF NOT EXISTS {ODS_SCHEMA}.{table_name} ({",".join(fact_columns)});") + ops.append(f"CREATE TABLE IF NOT EXISTS {schema_and_table} ({",".join(fact_columns)});") # Create HISTORY Table # partitioned by header__timestamp @@ -87,33 +87,33 @@ def create_tables_from_schema(schema: List[DFMSchemaFields], table_name: str) -> history_columns = header_cols + dfm_columns + [f"PRIMARY KEY ({','.join(history_keys)})"] ops.append( ( - f"CREATE TABLE IF NOT EXISTS {ODS_SCHEMA}.{table_name}_history ({",".join(history_columns)}) " + f"CREATE TABLE IF NOT EXISTS {schema_and_table}_history ({",".join(history_columns)}) " " PARTITION BY RANGE (header__timestamp);" ) ) # Create load Table for loading snapshot data load_columns = header_cols + dfm_columns - ops.append(f"CREATE TABLE IF NOT EXISTS {ODS_SCHEMA}.{table_name}_load ({",".join(load_columns)});") + ops.append(f"CREATE TABLE IF NOT EXISTS {schema_and_table}_load ({",".join(load_columns)});") # Create INDEX on HISTORY Table that will be used for creating FACT table index_columns = dfm_keys + ["header__change_oper", "header__change_seq DESC"] ops.append( - f"CREATE INDEX IF NOT EXISTS {table_name}_to_fact_idx on {ODS_SCHEMA}.{table_name}_history " + f"CREATE INDEX IF NOT EXISTS {schema_and_table.replace('.','_')}_to_fact_idx on {schema_and_table}_history " f"({','.join(index_columns)});" ) return " ".join(ops) -def create_history_table_partitions(table: str, start_ts: Optional[str] = None) -> str: +def create_history_table_partitions(schema_and_table: str, start_ts: Optional[str] = None) -> str: """ produce CREATE partition table strings for history table if `start_ts` IS NOT provided, produce CREATE statements for next 3 months from today if `start_ts` IS provided, 
produce CREATE statements for month of `start_ts` to 3 months from today - :param table: name of HISTORY table to create statements for + :param schema_and_table: name and schema of HISTORY table as 'schema.table' :param start_ts: date timestamp as string starting with YYYYMMDD :return: all partition CREATE statments as single string @@ -128,9 +128,9 @@ def create_history_table_partitions(table: str, start_ts: Optional[str] = None) part_tables: List[str] = [] while part_date < part_end: - part_table = f"{table}_y{part_date.year}m{part_date.month}" + part_table = f"{schema_and_table}_y{part_date.year}m{part_date.month}" create_part = ( - f"CREATE TABLE IF NOT EXISTS {ODS_SCHEMA}.{part_table} PARTITION OF {ODS_SCHEMA}.{table} " + f"CREATE TABLE IF NOT EXISTS {part_table} PARTITION OF {schema_and_table} " f"FOR VALUES FROM ('{part_date}') TO ('{part_date + relativedelta(months=1)}');" ) part_tables.append(create_part) @@ -139,34 +139,75 @@ def create_history_table_partitions(table: str, start_ts: Optional[str] = None) return " ".join(part_tables) -def drop_table(table_name: str) -> str: +def drop_table(schema_and_table: str) -> str: """ DROP table from RDS + + :param schema_and_table: name and schema of table to DROP as 'schema.table' + + :return: DROP TABLE command """ - return f"DROP TABLE IF EXISTS {ODS_SCHEMA}.{table_name} CASCADE;" + return f"DROP TABLE IF EXISTS {schema_and_table} CASCADE;" -def add_columns_to_table(new_columns: List[DFMSchemaFields], fact_table: str) -> str: +def add_columns_to_table(new_columns: List[DFMSchemaFields], schema_and_table: str) -> str: """ - produce ALTER table string to add columns to FACT, HISTORY and LOAD tables + produce ALTER table string to add columns to FACT and HISTORY tables :param new_columns: List of dictionaries containing new column name and QLIK type - :param fact_table: fact table for new columns + :param schema_and_table: name and schema of table as 'schema.table' :return: string to create all new columns """ tables = ( - fact_table, - f"{fact_table}_history", + schema_and_table, + f"{schema_and_table}_history", + f"{schema_and_table}_load", ) alter_strings: List[str] = [] for column in new_columns: for table in tables: alter_strings.append( - ( - f"ALTER TABLE {ODS_SCHEMA}.{table} ADD " - f"{column['name']} {qlik_type_to_pg(column['type'], column['scale'])};" - ) + f"ALTER TABLE {table} ADD {column['name']} {qlik_type_to_pg(column['type'], column['scale'])};" ) return " ".join(alter_strings) + + +def bulk_delete_from_temp(schema_and_table: str, key_columns: List[str]) -> str: + """ + create query to DELETE records from table based on key columns + """ + tmp_table = f"{schema_and_table}_load" + where_clause = " AND ".join([f"{schema_and_table}.{t}={tmp_table}.{t}" for t in key_columns]) + delete_query = f"DELETE FROM {schema_and_table} " f"USING {tmp_table} " f"WHERE {where_clause};" + + return delete_query + + +def bulk_update_from_temp(schema_and_table: str, update_column: str, key_columns: List[str]) -> str: + """ + create query to UPDATE records from table based on key columns + """ + tmp_table = f"{schema_and_table}_load" + where_clause = " AND ".join([f"{schema_and_table}.{t}={tmp_table}.{t}" for t in key_columns]) + update_query = ( + f"UPDATE {schema_and_table} SET {update_column}={tmp_table}.{update_column} " + f"FROM {tmp_table} WHERE {where_clause};" + ) + + return update_query + + +def bulk_insert_from_temp(schema_and_table: str, columns: List[str]) -> str: + """ + create query to INSERT records from temp table to fact table + 
""" + tmp_table = f"{schema_and_table}_load" + columns_str = ",".join(columns) + insert_query = ( + f"INSERT INTO {schema_and_table} ({columns_str}) " + f"SELECT {columns_str} FROM {tmp_table} " + f"ON CONFLICT DO NOTHING;" + ) + return insert_query diff --git a/src/cubic_loader/qlik/utils.py b/src/cubic_loader/qlik/utils.py index 40b0829..b565185 100644 --- a/src/cubic_loader/qlik/utils.py +++ b/src/cubic_loader/qlik/utils.py @@ -1,10 +1,17 @@ import os import re +import gzip +import json from typing import NamedTuple from typing import TypedDict from typing import List +import polars as pl + from cubic_loader.utils.aws import running_in_aws +from cubic_loader.utils.aws import s3_get_object +from cubic_loader.utils.aws import s3_get_client +from cubic_loader.utils.logger import ProcessLogger class DFMDetails(NamedTuple): @@ -39,6 +46,25 @@ class TableStatus(NamedTuple): RE_CDC_TS = re.compile(r"(\d{8}-\d{9})") +DFM_COLUMN_SCHEMA = pl.Schema( + { + "name": pl.String(), + "type": pl.String(), + "length": pl.Int64(), + "precision": pl.Int64(), + "scale": pl.Int64(), + "primaryKeyPos": pl.Int64(), + } +) + +CDC_COLUMNS = ( + "header__change_seq", + "header__change_oper", + "header__timestamp", +) + +MERGED_FNAME = "cdc_merged.csv" + def re_get_first(string: str, pattern: re.Pattern) -> str: """ @@ -58,6 +84,53 @@ def re_get_first(string: str, pattern: re.Pattern) -> str: return match.group(0) +def s3_list_cdc_gz_objects( + bucket: str, + prefix: str, + min_ts: str = "", +) -> List[str]: + """ + provide list of s3 objects based on bucket and prefix + + :param bucket: the name of the bucket with objects + :param prefix: prefix for objs to return + :param min_ts: filter for cdc.csv.gz objects + + :return: List[s3://bucket/key, ...] + """ + logger = ProcessLogger( + "s3_list_cdc_gz_objects", + bucket=bucket, + prefix=prefix, + min_ts=min_ts, + ) + try: + s3_client = s3_get_client() + paginator = s3_client.get_paginator("list_objects_v2") + pages = paginator.paginate(Bucket=bucket, Prefix=prefix) + + filepaths = [] + for page in pages: + if page["KeyCount"] == 0: + continue + for obj in page["Contents"]: + if obj["Size"] == 0 or not str(obj["Key"]).lower().endswith(".csv.gz"): + continue + try: + obj_ts = re_get_first(obj["Key"], RE_CDC_TS) + if obj_ts > min_ts: + filepaths.append(os.path.join("s3://", bucket, obj["Key"])) + except Exception as _: + continue + + logger.log_complete(objects_found=len(filepaths)) + return filepaths + + except Exception as exception: + logger.log_failure(exception) + return [] + + def threading_cpu_count() -> int: """ return an integer for the number of work threads to utilize @@ -70,3 +143,127 @@ def threading_cpu_count() -> int: return os_cpu_count * 2 return os_cpu_count + + +def dfm_schema_to_json(dfm_path: str) -> List[DFMSchemaFields]: + """ + extract table schema from S3 .dfm path as json + + :param dfm_path: S3 path to .dfm file as s3://bucket/object_path + """ + dfm_json = json.load(s3_get_object(dfm_path)) + return dfm_json["dataInfo"]["columns"] + + +def dfm_schema_to_df(dfm_path: str) -> pl.DataFrame: + """ + extract table schema from .dfm and convert to Polars Dataframe + + :param dfm_path: S3 path to .dfm file as s3://bucket/object_path + """ + json_schema = dfm_schema_to_json(dfm_path) + return pl.DataFrame( + json_schema, + schema=DFM_COLUMN_SCHEMA, + ) + + +def status_schema_to_df(status: TableStatus) -> pl.DataFrame: + """ + extract table schema from TableStatus and convert to Polars Dataframe + """ + return pl.DataFrame( + status.last_schema, + 
schema=DFM_COLUMN_SCHEMA, + ) + + +def merge_cdc_csv_gz_files(tmp_dir: str) -> str: + """ + Merge all cdc csv.gz files in tmp_dir, will create MERGED_FNAME file in tmp_dir + + :param tmp_dir: folder containing all csv.gz files to be merged + + :return: greatest cdc ts from files in tmp_dir + """ + merge_file = os.path.join(tmp_dir, MERGED_FNAME) + cdc_paths = [os.path.join(tmp_dir, f) for f in os.listdir(tmp_dir)] + + max_ts = "" + with open(merge_file, "wb") as fout: + for cdc_path in cdc_paths: + max_ts = max(max_ts, re_get_first(cdc_path, RE_CDC_TS)) + with gzip.open(cdc_path, "rb") as f: + if fout.tell() > 0: + next(f) + fout.write(f.read()) + + return max_ts + + +def qlik_type_to_polars(field: DFMSchemaFields) -> pl.DataType: + """ + convert QLIK datatypes to polars types + + :param field: QLIK .dfm Schema Field to be converted to Polars type + + :return: qlik type converted to polars + """ + qlik_type = field["type"] + + if field["name"] == "header__change_seq": + qlik_type = "CHANGE_SEQ" + + exact_type_matches = { + "CHANGE_SEQ": pl.Decimal(35, 0), + "REAL4": pl.Float32(), + "REAL8": pl.Float64(), + "BOOLEAN": pl.Boolean(), + "DATE": pl.Date(), + "TIME": pl.Time(), + "DATETIME": pl.Datetime(), + } + # check for exact type matching + return_type = exact_type_matches.get(qlik_type, None) + if return_type is not None: + return return_type + + # continue with alternate type matching + if "INT" in qlik_type: + return_type = pl.Int64() + elif "NUMERIC" in qlik_type and field["scale"] == 0: + return_type = pl.Int64() + elif "NUMERIC" in qlik_type: + return_type = pl.Float64() + else: + return_type = pl.String() + + return return_type + + +def polars_schema_from_dfm(dfm_path: str) -> pl.Schema: + """ + create polars schema based on column names and types from dfm_path + + :param dfm_path: S3 path to .dfm file as s3://bucket/object_path + + :return: polars schema of dfm_path + """ + return pl.Schema({col["name"].lower(): qlik_type_to_polars(col) for col in dfm_schema_to_json(dfm_path)}) + + +def dataframe_from_merged_csv(csv_path: str, dfm_path: str) -> pl.DataFrame: + """ + load csv_path into dataframe with correct types + types will be inferred from dfm_path (the .dfm of one csv.gz file merged into csv_path) + + dataframe drops header__change_oper="B" records because they are redundant + + :param csv_path: local path for merged csv file + :param dfm_path: S3 path to .dfm file as s3://bucket/object_path + + :return: polars dataframe of csv_path file + """ + schema = polars_schema_from_dfm(dfm_path) + df = pl.read_csv(csv_path, schema=schema).filter(pl.col("header__change_oper").ne("B")) + return df diff --git a/src/cubic_loader/utils/postgres.py b/src/cubic_loader/utils/postgres.py index 950717e..ab5a79d 100644 --- a/src/cubic_loader/utils/postgres.py +++ b/src/cubic_loader/utils/postgres.py @@ -241,7 +241,7 @@ def get_session(self) -> sessionmaker: def execute(self, statement: PreAnyQuery) -> CursorResult: """ - execute SQL Statement with no return data + execute SQL Statement with no return data :param statement: SQL Statement to execute """ @@ -397,6 +397,9 @@ def header_from_csv_gz(obj_path: str) -> str: check=True, ) header_str = ps.stdout + elif obj_path.lower().endswith(".csv"): + with open(obj_path, "rt", encoding="utf8") as csv_file: + header_str = csv_file.readline() else: with gzip.open(obj_path, "rt") as gzip_file: header_str = gzip_file.readline()
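
Reviewer note (not part of the patch): a minimal, standalone sketch of the header-hash partitioning that thread_save_csv_file and cdc_check_load_folders implement above, assuming local .csv.gz files instead of S3 objects (so no s3_download_object or header_from_csv_gz); the function names below are illustrative only. Files whose header rows match land in the same folder, so each folder holds one schema and can later be merged into a single csv and bulk-loaded in one pass.

import gzip
import hashlib
import os
import shutil
from typing import List


def header_hash(csv_gz_path: str) -> str:
    # sha1 of the header row; files that share a schema map to the same folder name
    with gzip.open(csv_gz_path, "rt", encoding="utf8") as gz_file:
        header = gz_file.readline()
    return hashlib.sha1(header.encode("utf8")).hexdigest()


def partition_by_header(csv_gz_paths: List[str], tmp_dir: str) -> None:
    # move each csv.gz into tmp_dir/<header-hash>/ (one folder per distinct schema)
    for path in csv_gz_paths:
        folder = os.path.join(tmp_dir, header_hash(path))
        os.makedirs(folder, exist_ok=True)
        shutil.move(path, os.path.join(folder, os.path.basename(path)))


def ready_folders(tmp_dir: str, max_bytes: int = 0, max_files: int = 5_000) -> List[str]:
    # folders large enough (by total bytes or file count) to be merged and loaded
    ready: List[str] = []
    for name in os.listdir(tmp_dir):
        folder = os.path.join(tmp_dir, name)
        if not os.path.isdir(folder):
            continue
        files = os.listdir(folder)
        size = sum(os.path.getsize(os.path.join(folder, f)) for f in files)
        if size > max_bytes or len(files) > max_files:
            ready.append(folder)
    return ready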