-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
FIX: ODS Views and dmap-import sequences #45
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,14 +22,16 @@ | |
|
||
|
||
def upgrade() -> None: | ||
pass | ||
# Delete status files for all tables loaded since the NOT NULL Primary Key requirement was dropped. | ||
# This will force a reset/re-load for these tables in dmap-import DB | ||
s3_delete_object(os.path.join(ODS_STATUS, "EDW.PAYMENT_SUMMARY.json")) | ||
s3_delete_object(os.path.join(ODS_STATUS, "EDW.MEMBER_DIMENSION.json")) | ||
op.drop_table("edw_payment_summary", schema="ods") | ||
op.drop_table("edw_payment_summary_history", schema="ods") | ||
op.drop_table("edw_member_dimension", schema="ods") | ||
op.drop_table("edw_member_dimension_history", schema="ods") | ||
# This was one-time migration, does not need to remain active | ||
# s3_delete_object(os.path.join(ODS_STATUS, "EDW.PAYMENT_SUMMARY.json")) | ||
# s3_delete_object(os.path.join(ODS_STATUS, "EDW.MEMBER_DIMENSION.json")) | ||
# op.drop_table("edw_payment_summary", schema="ods") | ||
# op.drop_table("edw_payment_summary_history", schema="ods") | ||
# op.drop_table("edw_member_dimension", schema="ods") | ||
# op.drop_table("edw_member_dimension_history", schema="ods") | ||
Comment on lines
+25
to
+34
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This was causing issues during local testing, should never need to be run again. |
||
|
||
|
||
def downgrade() -> None: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,43 @@ | ||
"""abp_tap index | ||
|
||
Revision ID: be5284c3563e | ||
Revises: bf94c2890bc9 | ||
Create Date: 2025-02-03 10:14:44.209868 | ||
|
||
""" | ||
|
||
from typing import Sequence, Union | ||
|
||
from alembic import op | ||
import sqlalchemy as sa | ||
|
||
from cubic_loader.qlik.sql_strings.views import AD_HOC_VIEW | ||
from cubic_loader.qlik.sql_strings.views import COMP_A_VIEW | ||
from cubic_loader.qlik.sql_strings.views import COMP_B_VIEW | ||
from cubic_loader.qlik.sql_strings.views import COMP_D_VIEW | ||
|
||
# revision identifiers, used by Alembic. | ||
revision: str = "be5284c3563e" | ||
down_revision: Union[str, None] = "bf94c2890bc9" | ||
branch_labels: Union[str, Sequence[str], None] = None | ||
depends_on: Union[str, Sequence[str], None] = None | ||
|
||
|
||
def upgrade() -> None: | ||
op.execute("ALTER SEQUENCE device_event_pk_id_seq AS bigint;") | ||
op.execute("ALTER SEQUENCE sale_transaction_pk_id_seq AS bigint;") | ||
op.execute("ALTER SEQUENCE use_transaction_location_pk_id_seq AS bigint;") | ||
op.execute("ALTER SEQUENCE use_transaction_longitudinal_pk_id_seq AS bigint;") | ||
op.execute("CREATE INDEX idx_abp_tap_inserted ON ods.edw_abp_tap (source_inserted_dtm);") | ||
op.execute(AD_HOC_VIEW) | ||
op.execute(COMP_A_VIEW) | ||
op.execute(COMP_B_VIEW) | ||
op.execute(COMP_D_VIEW) | ||
|
||
|
||
def downgrade() -> None: | ||
op.execute("DROP INDEX idx_abp_tap_inserted;") | ||
op.execute("DROP VIEW IF EXISTS ods.ad_hoc_processed_taps;") | ||
op.execute("DROP VIEW IF EXISTS ods.wc700_comp_a;") | ||
op.execute("DROP VIEW IF EXISTS ods.wc700_comp_b;") | ||
op.execute("DROP VIEW IF EXISTS ods.wc700_comp_d;") |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -445,7 +445,7 @@ def snapshot_reset(self) -> None: | |
self.etl_status = self.load_etl_status() | ||
|
||
self.db.execute(drop_table(self.db_history_table)) | ||
self.db.execute(drop_table(self.db_fact_table)) | ||
self.db.truncate_table(self.db_fact_table, restart_identity=True, cascade=True) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Drop command was causing VIEWS to get dropped... |
||
|
||
def run_etl(self) -> None: | ||
""" | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,134 @@ | ||
AD_HOC_VIEW = """ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keep these VIEW statements in central location. |
||
DROP VIEW IF EXISTS ods.ad_hoc_processed_taps; | ||
CREATE OR REPLACE VIEW ods.ad_hoc_processed_taps | ||
AS | ||
SELECT | ||
tap_id | ||
,transaction_dtm | ||
,device_id | ||
,DEVICE_PREFIX | ||
,token_id | ||
,TRANSIT_ACCOUNT_ID | ||
,OPERATOR_ID | ||
,TRANSACTION_ID | ||
,tap_status_id | ||
,tap_status_desc | ||
,unmatched_flag | ||
,trip_id | ||
,sector_id | ||
,voidable_until_dtm | ||
,dw_transaction_id | ||
,source_inserted_dtm | ||
FROM ods.edw_abp_tap | ||
; | ||
""" | ||
|
||
COMP_A_VIEW = """ | ||
DROP VIEW IF EXISTS ods.wc700_comp_a; | ||
CREATE OR REPLACE VIEW ods.wc700_comp_a | ||
AS | ||
SELECT | ||
ps.settlement_day_key | ||
,ps.operating_day_key | ||
,ps.payment_type_key | ||
,tcm.txn_channel_display | ||
,tcm.sales_channel_display | ||
,SUM(COALESCE(transit_value,0) + COALESCE(benefit_value,0) + COALESCE(bankcard_payment_value,0) + COALESCE(one_account_value,0))/100 AS stored_value | ||
,SUM(COALESCE(pass_cost,0))/100 AS pass_cost | ||
,SUM(COALESCE(enablement_fee,0))/100 AS enablement_fee | ||
,SUM(COALESCE(transit_value,0) + COALESCE(benefit_value,0) + COALESCE(bankcard_payment_value,0) + COALESCE(one_account_value,0) + COALESCE(pass_cost,0) + COALESCE(enablement_fee,0) + COALESCE(replacement_fee, 0))/100 AS total_fare_revenue | ||
FROM | ||
ods.edw_payment_summary ps | ||
JOIN | ||
ods.edw_txn_channel_map tcm | ||
ON | ||
tcm.txn_source = ps.txn_source | ||
AND tcm.sales_channel_key = ps.sales_channel_key | ||
and tcm.payment_type_key = ps.payment_type_key | ||
WHERE | ||
tcm.txn_group = 'Product Sales' | ||
GROUP BY | ||
ps.settlement_day_key | ||
,ps.operating_day_key | ||
,ps.payment_type_key | ||
,tcm.txn_channel_display | ||
,tcm.sales_channel_display | ||
ORDER BY | ||
operating_day_key desc | ||
,settlement_day_key desc | ||
; | ||
""" | ||
|
||
|
||
COMP_B_VIEW = """ | ||
DROP VIEW IF EXISTS ods.wc700_comp_b; | ||
CREATE OR REPLACE VIEW ods.wc700_comp_b | ||
AS | ||
SELECT | ||
ps.settlement_day_key | ||
,ps.operating_day_key | ||
,ps.payment_type_key | ||
,tcm.txn_channel_display | ||
,tcm.sales_channel_display | ||
,SUM(COALESCE(payment_value,0))/100 AS total_fare_revenue | ||
FROM | ||
ods.edw_payment_summary ps | ||
JOIN | ||
ods.edw_txn_channel_map tcm | ||
ON | ||
tcm.txn_source = ps.txn_source | ||
AND tcm.sales_channel_key = ps.sales_channel_key | ||
and tcm.payment_type_key = ps.payment_type_key | ||
WHERE | ||
tcm.txn_group = 'Open Payment Trips' | ||
GROUP BY | ||
ps.settlement_day_key | ||
,ps.operating_day_key | ||
,ps.payment_type_key | ||
,tcm.txn_channel_display | ||
,tcm.sales_channel_display | ||
ORDER BY | ||
operating_day_key desc | ||
,settlement_day_key desc | ||
; | ||
""" | ||
|
||
|
||
COMP_D_VIEW = """ | ||
DROP VIEW IF EXISTS ods.wc700_comp_d; | ||
CREATE OR REPLACE VIEW ods.wc700_comp_d | ||
AS | ||
SELECT | ||
ps.settlement_day_key | ||
,ps.operating_day_key | ||
,ps.payment_type_key | ||
,tcm.txn_channel_display | ||
,tcm.sales_channel_display | ||
,rd.reason_name | ||
,SUM(ps.payment_value)/100 as refund_value | ||
FROM | ||
ods.edw_payment_summary ps | ||
JOIN | ||
ods.edw_txn_channel_map tcm | ||
ON | ||
tcm.txn_source = ps.txn_source | ||
AND tcm.sales_channel_key = ps.sales_channel_key | ||
AND tcm.payment_type_key = ps.payment_type_key | ||
LEFT JOIN | ||
ods.edw_reason_dimension rd | ||
ON | ||
rd.reason_key = ps.reason_key | ||
WHERE | ||
tcm.txn_group = 'Direct Refunds Applied' | ||
GROUP BY | ||
ps.settlement_day_key | ||
,ps.operating_day_key | ||
,ps.payment_type_key | ||
,tcm.txn_channel_display | ||
,tcm.sales_channel_display | ||
,rd.reason_name | ||
ORDER BY | ||
ps.operating_day_key desc | ||
,ps.settlement_day_key desc | ||
; | ||
""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I gotta tell you. This concept of using migrations to do other migration-related things blew my mind. 😂