Skip to content

Commit

Permalink
FIX: ODS Views and dmap-import sequences (#45)
Browse files Browse the repository at this point in the history
This change includes:
- No longer dropping ODS fact tables during snapshot reset, truncate instead (this was causing views to drop)
- Recreate WC700 Views dropped during snapshot reset.
- Create ad_hoc_processed_taps View with associated index.
  • Loading branch information
rymarczy authored Feb 4, 2025
1 parent 5d572f3 commit 50c2d98
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,16 @@


def upgrade() -> None:
pass
# Delete status files for all tables loaded since the NOT NULL Primary Key requirement was dropped.
# This will force a reset/re-load for these tables in dmap-import DB
s3_delete_object(os.path.join(ODS_STATUS, "EDW.PAYMENT_SUMMARY.json"))
s3_delete_object(os.path.join(ODS_STATUS, "EDW.MEMBER_DIMENSION.json"))
op.drop_table("edw_payment_summary", schema="ods")
op.drop_table("edw_payment_summary_history", schema="ods")
op.drop_table("edw_member_dimension", schema="ods")
op.drop_table("edw_member_dimension_history", schema="ods")
# This was one-time migration, does not need to remain active
# s3_delete_object(os.path.join(ODS_STATUS, "EDW.PAYMENT_SUMMARY.json"))
# s3_delete_object(os.path.join(ODS_STATUS, "EDW.MEMBER_DIMENSION.json"))
# op.drop_table("edw_payment_summary", schema="ods")
# op.drop_table("edw_payment_summary_history", schema="ods")
# op.drop_table("edw_member_dimension", schema="ods")
# op.drop_table("edw_member_dimension_history", schema="ods")


def downgrade() -> None:
Expand Down
43 changes: 43 additions & 0 deletions src/alembic/versions/012_be5284c3563e.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
"""abp_tap index
Revision ID: be5284c3563e
Revises: bf94c2890bc9
Create Date: 2025-02-03 10:14:44.209868
"""

from typing import Sequence, Union

from alembic import op
import sqlalchemy as sa

from cubic_loader.qlik.sql_strings.views import AD_HOC_VIEW
from cubic_loader.qlik.sql_strings.views import COMP_A_VIEW
from cubic_loader.qlik.sql_strings.views import COMP_B_VIEW
from cubic_loader.qlik.sql_strings.views import COMP_D_VIEW

# revision identifiers, used by Alembic.
revision: str = "be5284c3563e"
down_revision: Union[str, None] = "bf94c2890bc9"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
op.execute("ALTER SEQUENCE device_event_pk_id_seq AS bigint;")
op.execute("ALTER SEQUENCE sale_transaction_pk_id_seq AS bigint;")
op.execute("ALTER SEQUENCE use_transaction_location_pk_id_seq AS bigint;")
op.execute("ALTER SEQUENCE use_transaction_longitudinal_pk_id_seq AS bigint;")
op.execute("CREATE INDEX idx_abp_tap_inserted ON ods.edw_abp_tap (source_inserted_dtm);")
op.execute(AD_HOC_VIEW)
op.execute(COMP_A_VIEW)
op.execute(COMP_B_VIEW)
op.execute(COMP_D_VIEW)


def downgrade() -> None:
op.execute("DROP INDEX idx_abp_tap_inserted;")
op.execute("DROP VIEW IF EXISTS ods.ad_hoc_processed_taps;")
op.execute("DROP VIEW IF EXISTS ods.wc700_comp_a;")
op.execute("DROP VIEW IF EXISTS ods.wc700_comp_b;")
op.execute("DROP VIEW IF EXISTS ods.wc700_comp_d;")
2 changes: 1 addition & 1 deletion src/cubic_loader/qlik/ods_qlik.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,7 +445,7 @@ def snapshot_reset(self) -> None:
self.etl_status = self.load_etl_status()

self.db.execute(drop_table(self.db_history_table))
self.db.execute(drop_table(self.db_fact_table))
self.db.truncate_table(self.db_fact_table, restart_identity=True, cascade=True)

def run_etl(self) -> None:
"""
Expand Down
134 changes: 134 additions & 0 deletions src/cubic_loader/qlik/sql_strings/views.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
AD_HOC_VIEW = """
DROP VIEW IF EXISTS ods.ad_hoc_processed_taps;
CREATE OR REPLACE VIEW ods.ad_hoc_processed_taps
AS
SELECT
tap_id
,transaction_dtm
,device_id
,DEVICE_PREFIX
,token_id
,TRANSIT_ACCOUNT_ID
,OPERATOR_ID
,TRANSACTION_ID
,tap_status_id
,tap_status_desc
,unmatched_flag
,trip_id
,sector_id
,voidable_until_dtm
,dw_transaction_id
,source_inserted_dtm
FROM ods.edw_abp_tap
;
"""

COMP_A_VIEW = """
DROP VIEW IF EXISTS ods.wc700_comp_a;
CREATE OR REPLACE VIEW ods.wc700_comp_a
AS
SELECT
ps.settlement_day_key
,ps.operating_day_key
,ps.payment_type_key
,tcm.txn_channel_display
,tcm.sales_channel_display
,SUM(COALESCE(transit_value,0) + COALESCE(benefit_value,0) + COALESCE(bankcard_payment_value,0) + COALESCE(one_account_value,0))/100 AS stored_value
,SUM(COALESCE(pass_cost,0))/100 AS pass_cost
,SUM(COALESCE(enablement_fee,0))/100 AS enablement_fee
,SUM(COALESCE(transit_value,0) + COALESCE(benefit_value,0) + COALESCE(bankcard_payment_value,0) + COALESCE(one_account_value,0) + COALESCE(pass_cost,0) + COALESCE(enablement_fee,0) + COALESCE(replacement_fee, 0))/100 AS total_fare_revenue
FROM
ods.edw_payment_summary ps
JOIN
ods.edw_txn_channel_map tcm
ON
tcm.txn_source = ps.txn_source
AND tcm.sales_channel_key = ps.sales_channel_key
and tcm.payment_type_key = ps.payment_type_key
WHERE
tcm.txn_group = 'Product Sales'
GROUP BY
ps.settlement_day_key
,ps.operating_day_key
,ps.payment_type_key
,tcm.txn_channel_display
,tcm.sales_channel_display
ORDER BY
operating_day_key desc
,settlement_day_key desc
;
"""


COMP_B_VIEW = """
DROP VIEW IF EXISTS ods.wc700_comp_b;
CREATE OR REPLACE VIEW ods.wc700_comp_b
AS
SELECT
ps.settlement_day_key
,ps.operating_day_key
,ps.payment_type_key
,tcm.txn_channel_display
,tcm.sales_channel_display
,SUM(COALESCE(payment_value,0))/100 AS total_fare_revenue
FROM
ods.edw_payment_summary ps
JOIN
ods.edw_txn_channel_map tcm
ON
tcm.txn_source = ps.txn_source
AND tcm.sales_channel_key = ps.sales_channel_key
and tcm.payment_type_key = ps.payment_type_key
WHERE
tcm.txn_group = 'Open Payment Trips'
GROUP BY
ps.settlement_day_key
,ps.operating_day_key
,ps.payment_type_key
,tcm.txn_channel_display
,tcm.sales_channel_display
ORDER BY
operating_day_key desc
,settlement_day_key desc
;
"""


COMP_D_VIEW = """
DROP VIEW IF EXISTS ods.wc700_comp_d;
CREATE OR REPLACE VIEW ods.wc700_comp_d
AS
SELECT
ps.settlement_day_key
,ps.operating_day_key
,ps.payment_type_key
,tcm.txn_channel_display
,tcm.sales_channel_display
,rd.reason_name
,SUM(ps.payment_value)/100 as refund_value
FROM
ods.edw_payment_summary ps
JOIN
ods.edw_txn_channel_map tcm
ON
tcm.txn_source = ps.txn_source
AND tcm.sales_channel_key = ps.sales_channel_key
AND tcm.payment_type_key = ps.payment_type_key
LEFT JOIN
ods.edw_reason_dimension rd
ON
rd.reason_key = ps.reason_key
WHERE
tcm.txn_group = 'Direct Refunds Applied'
GROUP BY
ps.settlement_day_key
,ps.operating_day_key
,ps.payment_type_key
,tcm.txn_channel_display
,tcm.sales_channel_display
,rd.reason_name
ORDER BY
ps.operating_day_key desc
,ps.settlement_day_key desc
;
"""

0 comments on commit 50c2d98

Please sign in to comment.