From 27fa9924c5d85f9a58d2df0184c6662167a4af45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Guillois?= Date: Thu, 30 Jan 2025 15:17:00 +0100 Subject: [PATCH] fix: configure ban address jobs --- analytics/dagster/src/definitions.py | 35 ++++++++++++++++++++++++---- analytics/dagster/workspace.yaml | 6 ----- 2 files changed, 30 insertions(+), 11 deletions(-) diff --git a/analytics/dagster/src/definitions.py b/analytics/dagster/src/definitions.py index 4e09de1fd..a3e46537b 100644 --- a/analytics/dagster/src/definitions.py +++ b/analytics/dagster/src/definitions.py @@ -24,9 +24,11 @@ from .project import dbt_project from .assets import production_dbt -from .assets import populate_owners_ban_addresses -from .assets import populate_housings_ban_addresses +from .assets.populate_owners_ban_addresses import owners_without_address, create_csv_chunks_from_owners, send_csv_chunks_to_api, parse_api_response_and_insert_owners_addresses +from .assets.populate_housings_ban_addresses import housings_without_address, create_csv_from_housings, send_csv_to_api, parse_api_response_and_insert_housing_addresses from .resources.ban_config import ban_config_resource +from .resources.database_resources import psycopg2_connection_resource +from .resources.database_resources import sqlalchemy_engine_resource from .assets import clever @@ -88,14 +90,34 @@ job=yearly_update_ff_dwh_job, cron_schedule="@yearly" ) +owners_asset_job = define_asset_job( + name="populate_owners_addresses", + selection=AssetSelection.assets( + "owners_without_address", + "create_csv_chunks_from_owners", + "send_csv_chunks_to_api", + "parse_api_response_and_insert_owners_addresses", + ), +) + +housings_asset_job = define_asset_job( + name="populate_housings_addresses", + selection=AssetSelection.assets( + "housings_without_address", + "create_csv_from_housings", + "send_csv_to_api", + "parse_api_response_and_insert_housing_addresses", + ), +) + # Load definitions with assets, resources, and schedule defs = Definitions( assets=[ # dagster_production_assets, # dagster_notion_assets, # dagster_notion_assets, - populate_owners_ban_addresses, - populate_housings_ban_addresses, + owners_without_address, create_csv_chunks_from_owners, send_csv_chunks_to_api, parse_api_response_and_insert_owners_addresses, + housings_without_address, create_csv_from_housings, send_csv_to_api, parse_api_response_and_insert_housing_addresses, *dwh_assets, *dbt_analytics_assets, *clever_assets_assets @@ -112,7 +134,10 @@ "duckdb_local_metabase": DuckDBResource( database="db/metabase.duckdb", ), - "ban_config": ban_config_resource + "ban_config": ban_config_resource, + "psycopg2_connection": psycopg2_connection_resource, + "sqlalchemy_engine": sqlalchemy_engine_resource, }, schedules=[daily_refresh_schedule, yearly_ff_refresh_schedule], + jobs=[owners_asset_job, housings_asset_job], ) diff --git a/analytics/dagster/workspace.yaml b/analytics/dagster/workspace.yaml index 8dc74f405..9c76d4d40 100644 --- a/analytics/dagster/workspace.yaml +++ b/analytics/dagster/workspace.yaml @@ -1,8 +1,2 @@ load_from: - python_module: src - - python_file: - relative_path: jobs/ban_address_jobs.py - -resources: - ban_config: - config: {}