From fc8725e900b01f148dc4d8f45a8a91bf7a14c622 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Guillois?= Date: Thu, 2 Jan 2025 16:38:25 +0100 Subject: [PATCH 01/10] feat: add new jobs and assets for retrieving BAN addresses for owners and housings --- .talismanrc | 6 + analytics/dagster/requirements.txt | 4 +- .../assets/populate_housings_ban_addresses.py | 125 +++++++++++++ .../assets/populate_owners_ban_addresses.py | 170 ++++++++++++++++++ analytics/dagster/src/definitions.py | 14 +- analytics/dagster/src/hello.py | 17 ++ .../src/jobs/owners_ban_addresses_job.py | 11 ++ analytics/dagster/src/resources/ban_config.py | 46 +++++ analytics/dagster/workspace.yaml | 6 + 9 files changed, 392 insertions(+), 7 deletions(-) create mode 100644 analytics/dagster/src/assets/populate_housings_ban_addresses.py create mode 100644 analytics/dagster/src/assets/populate_owners_ban_addresses.py create mode 100644 analytics/dagster/src/hello.py create mode 100644 analytics/dagster/src/jobs/owners_ban_addresses_job.py create mode 100644 analytics/dagster/src/resources/ban_config.py diff --git a/.talismanrc b/.talismanrc index 496404a86..a5b3bba29 100644 --- a/.talismanrc +++ b/.talismanrc @@ -3,6 +3,12 @@ fileignoreconfig: checksum: f7719ba0d36160d97e80ee15cb5415b601354576929e36df0596c7d192465cfb - filename: README.md checksum: df312ccb4c75fc4c2441a1f7f2c7817ee98ffb3065c78d5d7d6addf6ab129176 +- filename: analytics/dagster/src/assets/populate_housings_ban_addresses.py + checksum: 4c16939cca930517d45f30128d79332784cf09c933a1510f0f02e949b1eaac87 +- filename: analytics/dagster/src/assets/populate_owners_ban_addresses.py + checksum: b068d130431a26be58c78e4f14346be53d318110c4020c7f7212069cbd190bf2 +- filename: analytics/dagster/src/resources/ban_config.py + checksum: 034c6924978983da0ca5897bb06b64598a5a813dc93d1d9e8f8a62da952d4d22 - filename: frontend/src/components/Draft/DraftSender.tsx checksum: cfcc1023edac4d29c305f00a4bd86f27d491e4e9535bec2bd905a917b3f396b7 - filename: 
frontend/src/components/Draft/DraftSignature.tsx diff --git a/analytics/dagster/requirements.txt b/analytics/dagster/requirements.txt index 6c65284bd..fd5e33ccf 100644 --- a/analytics/dagster/requirements.txt +++ b/analytics/dagster/requirements.txt @@ -7,5 +7,5 @@ matplotlib pandas requests dlt[duckdb] - - +psycopg2 +pydantic_settings diff --git a/analytics/dagster/src/assets/populate_housings_ban_addresses.py b/analytics/dagster/src/assets/populate_housings_ban_addresses.py new file mode 100644 index 000000000..67a439c72 --- /dev/null +++ b/analytics/dagster/src/assets/populate_housings_ban_addresses.py @@ -0,0 +1,125 @@ +from dagster import asset, MetadataValue, AssetExecutionContext +import requests +import pandas as pd +import psycopg2 +from io import StringIO + + +@asset(description="Return housing records from `fast_housing` that have no matching entry in `ban_addresses`.", required_resource_keys={"ban_config"}) +def housings_without_address(context: AssetExecutionContext): + config = context.resources.ban_config + + query = """ + SELECT fh.id as housing_id, array_to_string(fh.address_dgfip, ' ') as address_dgfip, fh.geo_code + FROM fast_housing fh + LEFT JOIN ban_addresses ba ON fh.id = ba.ref_id + WHERE ba.ref_id IS NULL; + """ + + conn = psycopg2.connect( + dbname=config.db_name, + user=config.db_user, + password=config.db_password, + host=config.db_host, + port=config.db_port, + ) + df = pd.read_sql(query, conn) + conn.close() + + return df + + +@asset(description="Write housing records without addresses to a CSV file, log the file path and a preview, then return the file path as metadata.", required_resource_keys={"ban_config"}) +def create_csv_from_housings(context: AssetExecutionContext, housings_without_address): + config = context.resources.ban_config + + csv_file_path = f"{config.csv_file_path}/search.csv" + housings_without_address.to_csv(csv_file_path, index=False, columns=["housing_id","address_dgfip", "geo_code"]) + + context.log.info(f"CSV 
file created at: {csv_file_path}") + + df = pd.read_csv(csv_file_path) + pd.set_option('display.max_columns', None) + pd.set_option('display.max_rows', None) + pd.set_option('display.width', None) + context.log.info(f"Preview of the CSV file:\n{df.head()}") + + return { + "metadata": {"file_path": MetadataValue.text(csv_file_path)} + } + + +@asset(deps=[create_csv_from_housings], description="Send the local CSV file to the BAN address API for address processing. Raises an exception if the request fails.", required_resource_keys={"ban_config"}) +def send_csv_to_api(context: AssetExecutionContext): + config = context.resources.ban_config + + files = {'data': open(f"{config.csv_file_path}/search.csv", 'rb')} + + data = {'columns': 'address_dgfip', 'citycode': 'geo_code'} + + response = requests.post(config.api_url, files=files, data=data) + + if response.status_code == 200: + return response.text + else: + raise Exception(f"API request failed with status code {response.status_code}") + + +@asset(description="Parse the CSV response from the BAN address API, insert valid addresses into the `ban_addresses` table, log a preview and any failed results, then return the total number of inserted records as metadata.", required_resource_keys={"ban_config"}) +def parse_api_response_and_insert_housing_addresses(context: AssetExecutionContext, send_csv_to_api): + config = context.resources.ban_config + + api_df = pd.read_csv(StringIO(send_csv_to_api)) + + pd.set_option('display.max_columns', None) + pd.set_option('display.max_rows', None) + pd.set_option('display.width', None) + context.log.info(f"Preview of the CSV file:\n{api_df.head()}") + + conn = psycopg2.connect( + dbname=config.db_name, + user=config.db_user, + password=config.db_password, + host=config.db_host, + port=config.db_port, + ) + cursor = conn.cursor() + + filtered_df = api_df[api_df['result_status'] == 'ok'] + failed_rows = api_df[api_df['result_status'] != 'ok'] + context.log.warning(f"Number of housings with 
failed API results: {len(failed_rows)}") + + for _, row in filtered_df.iterrows(): + + # L'API BAN renvoie des valeurs NaN pour les champs vides. Par exemple pour les lieux-dits il n'y a pas de numéro de rue ni de rue + row = row.apply(lambda x: None if pd.isna(x) else x) + + cursor.execute( + """ + INSERT INTO ban_addresses (ref_id, house_number, address, street, postal_code, city, latitude, longitude, score, ban_id, address_kind) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + """, + ( + row['housing_id'], + row['result_housenumber'], + row['result_label'], + row['result_street'], + row['result_postcode'], + row['result_city'], + row['latitude'], + row['longitude'], + row['result_score'], + row['result_id'], + "Housing" + ), + ) + + conn.commit() + cursor.close() + conn.close() + + context.log.info(f"{len(api_df)} records inserted successfully.") + + return { + "metadata": {"num_records": MetadataValue.text(f"{len(api_df)} records inserted")} + } diff --git a/analytics/dagster/src/assets/populate_owners_ban_addresses.py b/analytics/dagster/src/assets/populate_owners_ban_addresses.py new file mode 100644 index 000000000..ead1caaae --- /dev/null +++ b/analytics/dagster/src/assets/populate_owners_ban_addresses.py @@ -0,0 +1,170 @@ +from dagster import asset, Config, MetadataValue, AssetExecutionContext, Output +import requests +import pandas as pd +import psycopg2 +from io import StringIO + +class BANConfig(Config): + db_name: str = "isoprod" + db_user: str = "postgres" + db_password: str = "postgres" + db_host: str = "localhost" + db_port: str = "5432" + api_url: str = "https://api-adresse.data.gouv.fr/search/csv/" + csv_file_path: str = "temp_csv" + chunk_size: int = 10000 + max_files: int = 5 + disable_max_files: bool = False + +@asset(description="Return owners with no BAN address or a non-validated BAN address (score < 1).", required_resource_keys={"ban_config"}) +def owners_without_address(context: AssetExecutionContext): + config = 
context.resources.ban_config + + query = """ + SELECT + o.id as owner_id, + array_to_string(o.address_dgfip, ' ') as address_dgfip + FROM owners o + LEFT JOIN ban_addresses ba ON o.id = ba.ref_id + WHERE (ba.ref_id IS NULL) -- Propriétaires sans adresse + OR (ba.ref_id IS NOT NULL AND ba.address_kind = 'Owner' AND ba.score < 1); -- Propriétaires avec adresse non validée par Stéphanie + """ + + conn = psycopg2.connect( + dbname=config.db_name, + user=config.db_user, + password=config.db_password, + host=config.db_host, + port=config.db_port, + ) + df = pd.read_sql(query, conn) + conn.close() + + return df + +@asset(description="Split the owners DataFrame into multiple CSV files (chunks), store them to disk, and return the file paths as metadata.", required_resource_keys={"ban_config"}) +def create_csv_chunks_from_owners(context: AssetExecutionContext, owners_without_address): + config = context.resources.ban_config + + chunk_size = config.chunk_size + max_files = config.max_files + disable_max_files = config.disable_max_files + + num_chunks = len(owners_without_address) // chunk_size + 1 + + if not disable_max_files: + num_chunks = min(num_chunks, max_files) + + file_paths = [] + + for i in range(num_chunks): + start_idx = i * chunk_size + end_idx = (i + 1) * chunk_size + chunk = owners_without_address.iloc[start_idx:end_idx] + + chunk_file_path = f"{config.csv_file_path}_part_{i+1}.csv" + file_paths.append(chunk_file_path) + + chunk.to_csv(chunk_file_path, index=False, columns=["owner_id", "address_dgfip"]) + + context.log.info(f"CSV file created: {chunk_file_path}") + context.log.info(f"Preview of the CSV file:\n{chunk.head()}") + context.log.info(f"Generated {i + 1} out of {num_chunks} files") + + return Output(value=file_paths, metadata={"file_paths": MetadataValue.text(", ".join(file_paths))}) + + +@asset(description="Send each CSV chunk to the BAN address API, aggregate valid responses into a single CSV, and return the path to the aggregated CSV file.", 
required_resource_keys={"ban_config"}) +def send_csv_chunks_to_api(context: AssetExecutionContext, create_csv_chunks_from_owners): + config = context.resources.ban_config + + aggregated_file_path = f"{config.csv_file_path}_aggregated.csv" + + with open(aggregated_file_path, 'w') as aggregated_file: + first_file = True + + for file_path in create_csv_chunks_from_owners: + files = {'data': open(file_path, 'rb')} + data = {'columns': 'address_dgfip', 'citycode': 'geo_code'} + + response = requests.post(config.api_url, files=files, data=data) + + if response.status_code == 200: + api_data = pd.read_csv(StringIO(response.text)) + api_data.to_csv(aggregated_file, mode='a', index=False, header=first_file) + first_file = False + + context.log.info(f"Processed file: {file_path}") + else: + raise Exception(f"API request failed with status code {response.status_code}") + + return aggregated_file_path + +@asset(description="Parse the aggregated CSV from the BAN address API, insert valid owners' addresses into `ban_addresses`, and return the count of processed records.", required_resource_keys={"ban_config"}) +def parse_api_response_and_insert_owners_addresses(context: AssetExecutionContext, send_csv_chunks_to_api): + config = context.resources.ban_config + + api_df = pd.read_csv(send_csv_chunks_to_api) + + pd.set_option('display.max_columns', None) + pd.set_option('display.max_rows', None) + pd.set_option('display.width', None) + context.log.info(f"Preview of the aggregated CSV file:\n{api_df.head()}") + + conn = psycopg2.connect( + dbname=config.db_name, + user=config.db_user, + password=config.db_password, + host=config.db_host, + port=config.db_port, + ) + cursor = conn.cursor() + + filtered_df = api_df[api_df['result_status'] == 'ok'] + failed_rows = api_df[api_df['result_status'] != 'ok'] + context.log.warning(f"Number of owners with failed API results: {len(failed_rows)}") + + for _, row in filtered_df.iterrows(): + # L'API BAN renvoie des valeurs NaN pour les champs 
vides. Par exemple pour les lieux-dits il n'y a pas de numéro de rue ni de rue + row = row.apply(lambda x: None if pd.isna(x) else x) + + cursor.execute( + """ + INSERT INTO ban_addresses (ref_id, house_number, address, street, postal_code, city, latitude, longitude, score, ban_id, address_kind) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (ref_id, address_kind) + DO UPDATE SET + house_number = EXCLUDED.house_number, + address = EXCLUDED.address, + street = EXCLUDED.street, + postal_code = EXCLUDED.postal_code, + city = EXCLUDED.city, + latitude = EXCLUDED.latitude, + longitude = EXCLUDED.longitude, + score = EXCLUDED.score, + ban_id = EXCLUDED.ban_id; + """, + ( + row['owner_id'], + row['result_housenumber'], + row['result_label'], + row['result_street'], + row['result_postcode'], + row['result_city'], + row['latitude'], + row['longitude'], + row['result_score'], + row['result_id'], + "Owner" + ), + ) + + conn.commit() + cursor.close() + conn.close() + + context.log.info(f"{len(api_df)} records inserted successfully.") + + return { + "metadata": {"num_records": MetadataValue.text(f"{len(api_df)} records inserted")} + } diff --git a/analytics/dagster/src/definitions.py b/analytics/dagster/src/definitions.py index 5292c4067..6e56621e3 100644 --- a/analytics/dagster/src/definitions.py +++ b/analytics/dagster/src/definitions.py @@ -17,6 +17,9 @@ from .project import dbt_project from .assets import production_dbt from .assets.notion import dagster_notion_assets +from .assets import populate_owners_ban_addresses + +from .resources.ban_config import ban_config_resource warnings.filterwarnings("ignore", category=dagster.ExperimentalWarning) @@ -35,7 +38,7 @@ # Define job for running all assets daily_refresh_job = define_asset_job( name="production_job", - #selection=[""] + #selection=[""] ) # Schedule the job to run daily at midnight @@ -50,14 +53,15 @@ dagster_production_assets, dagster_notion_assets, # dagster_notion_assets, - 
*dbt_analytics_assets + *dbt_analytics_assets, + populate_owners_ban_addresses ], resources={ "dlt": dlt_resource, - "dbt": dbt_resource - + "dbt": dbt_resource, + "ban_config": ban_config_resource }, schedules=[ daily_refresh_schedule, ], -) \ No newline at end of file +) diff --git a/analytics/dagster/src/hello.py b/analytics/dagster/src/hello.py new file mode 100644 index 000000000..e4315af65 --- /dev/null +++ b/analytics/dagster/src/hello.py @@ -0,0 +1,17 @@ +from dagster import job, op +from dagster import execute_job, DagsterInstance, reconstructable + +# Définition d'un "op" qui représente une étape de traitement +@op +def hello_world(context): + context.log.info('Hello, World!') + +# Définition d'un job utilisant l'op +@job +def hello_world_job(): + hello_world() + +# Exécution du job localement +if __name__ == "__main__": + instance = DagsterInstance.ephemeral() # Utilisation de l'instance éphémère + result = execute_job(reconstructable(hello_world_job), instance=instance) diff --git a/analytics/dagster/src/jobs/owners_ban_addresses_job.py b/analytics/dagster/src/jobs/owners_ban_addresses_job.py new file mode 100644 index 000000000..b5834aa63 --- /dev/null +++ b/analytics/dagster/src/jobs/owners_ban_addresses_job.py @@ -0,0 +1,11 @@ +from dagster import job +from assets.populate_owners_ban_addresses import housings_without_address +from resources.ban_config import ban_config_resource + +@job( + resource_defs={ + "ban_config": ban_config_resource, + } +) +def ban_address_job(): + housings_without_address() diff --git a/analytics/dagster/src/resources/ban_config.py b/analytics/dagster/src/resources/ban_config.py new file mode 100644 index 000000000..68647afbe --- /dev/null +++ b/analytics/dagster/src/resources/ban_config.py @@ -0,0 +1,46 @@ +from pydantic_settings import BaseSettings +from pydantic import Field, field_validator +from dagster import resource + +class BANConfig(BaseSettings): + """Configuration for BAN integration.""" + + db_name: str = 
Field("isoprod", env="DB_NAME") + db_user: str = Field("postgres", env="DB_USER") + db_password: str = Field("postgres", env="DB_PASSWORD") + db_host: str = Field("localhost", env="DB_HOST") + db_port: str = Field("5432", env="DB_PORT") + + api_url: str = Field("https://api-adresse.data.gouv.fr/search/csv/", env="BAN_API_URL") + csv_file_path: str = Field("temp_csv", env="CSV_FILE_PATH") + + chunk_size: int = Field(10000, env="CHUNK_SIZE") + max_files: int = Field(5, env="MAX_FILES") + disable_max_files: bool = Field(False, env="DISABLE_MAX_FILES") + + @field_validator("chunk_size") + def chunk_size_positive(cls, v): + if v <= 0: + raise ValueError("chunk_size must be > 0") + return v + + class Config: + env_file = ".env" + env_file_encoding = "utf-8" + +@resource( + config_schema={ + "db_name": str, + "db_user": str, + "db_password": str, + "db_host": str, + "db_port": str, + "api_url": str, + "csv_file_path": str, + "chunk_size": int, + "max_files": int, + "disable_max_files": bool, + } +) +def ban_config_resource(init_context): + return BANConfig(**init_context.resource_config) diff --git a/analytics/dagster/workspace.yaml b/analytics/dagster/workspace.yaml index 9c76d4d40..8dc74f405 100644 --- a/analytics/dagster/workspace.yaml +++ b/analytics/dagster/workspace.yaml @@ -1,2 +1,8 @@ load_from: - python_module: src + - python_file: + relative_path: jobs/ban_address_jobs.py + +resources: + ban_config: + config: {} From 078f129480bd4e63dbc39268f1b93dc8028837ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Guillois?= Date: Thu, 2 Jan 2025 16:53:24 +0100 Subject: [PATCH 02/10] fix: add owners_ban_addresses_job definition --- analytics/dagster/src/definitions.py | 2 ++ analytics/dagster/src/jobs/owners_ban_addresses_job.py | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/analytics/dagster/src/definitions.py b/analytics/dagster/src/definitions.py index 6e56621e3..52ac85ca6 100644 --- a/analytics/dagster/src/definitions.py +++ 
b/analytics/dagster/src/definitions.py @@ -20,6 +20,7 @@ from .assets import populate_owners_ban_addresses from .resources.ban_config import ban_config_resource +from .jobs.owners_ban_addresses_job import owners_ban_addresses_job warnings.filterwarnings("ignore", category=dagster.ExperimentalWarning) @@ -64,4 +65,5 @@ schedules=[ daily_refresh_schedule, ], + jobs=[owners_ban_addresses_job] ) diff --git a/analytics/dagster/src/jobs/owners_ban_addresses_job.py b/analytics/dagster/src/jobs/owners_ban_addresses_job.py index b5834aa63..eaa49ffaf 100644 --- a/analytics/dagster/src/jobs/owners_ban_addresses_job.py +++ b/analytics/dagster/src/jobs/owners_ban_addresses_job.py @@ -7,5 +7,5 @@ "ban_config": ban_config_resource, } ) -def ban_address_job(): +def owners_ban_addresses_job(): housings_without_address() From 05814dd6ea9ea2251ec16ac3b565e2da18c8c008 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Guillois?= Date: Thu, 2 Jan 2025 17:05:04 +0100 Subject: [PATCH 03/10] fix: job import --- analytics/dagster/src/jobs/owners_ban_addresses_job.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/analytics/dagster/src/jobs/owners_ban_addresses_job.py b/analytics/dagster/src/jobs/owners_ban_addresses_job.py index eaa49ffaf..f9feae84d 100644 --- a/analytics/dagster/src/jobs/owners_ban_addresses_job.py +++ b/analytics/dagster/src/jobs/owners_ban_addresses_job.py @@ -1,6 +1,6 @@ from dagster import job -from assets.populate_owners_ban_addresses import housings_without_address -from resources.ban_config import ban_config_resource +from dagster.src.assets.populate_owners_ban_addresses import housings_without_address +from dagster.src.resources.ban_config import ban_config_resource @job( resource_defs={ From 0cf11d01db2b54d79c8d95d9eeb53b000c98ee4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Guillois?= Date: Mon, 6 Jan 2025 11:24:15 +0100 Subject: [PATCH 04/10] refactoring: remove useless configuration class --- 
.../src/assets/populate_owners_ban_addresses.py | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/analytics/dagster/src/assets/populate_owners_ban_addresses.py b/analytics/dagster/src/assets/populate_owners_ban_addresses.py index ead1caaae..ab4f0fec8 100644 --- a/analytics/dagster/src/assets/populate_owners_ban_addresses.py +++ b/analytics/dagster/src/assets/populate_owners_ban_addresses.py @@ -4,18 +4,6 @@ import psycopg2 from io import StringIO -class BANConfig(Config): - db_name: str = "isoprod" - db_user: str = "postgres" - db_password: str = "postgres" - db_host: str = "localhost" - db_port: str = "5432" - api_url: str = "https://api-adresse.data.gouv.fr/search/csv/" - csv_file_path: str = "temp_csv" - chunk_size: int = 10000 - max_files: int = 5 - disable_max_files: bool = False - @asset(description="Return owners with no BAN address or a non-validated BAN address (score < 1).", required_resource_keys={"ban_config"}) def owners_without_address(context: AssetExecutionContext): config = context.resources.ban_config From 539776515021455e921ffbe5eaf3dbce1f4db4d9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Guillois?= Date: Tue, 14 Jan 2025 10:27:50 +0100 Subject: [PATCH 05/10] feat: add housings ban job --- analytics/dagster/src/definitions.py | 5 +++-- .../dagster/src/jobs/housings_ban_addresses_job.py | 11 +++++++++++ .../dagster/src/jobs/owners_ban_addresses_job.py | 4 ++-- analytics/dagster/src/resources/ban_config.py | 2 -- 4 files changed, 16 insertions(+), 6 deletions(-) create mode 100644 analytics/dagster/src/jobs/housings_ban_addresses_job.py diff --git a/analytics/dagster/src/definitions.py b/analytics/dagster/src/definitions.py index b2a3b4a54..a5bd5ee7c 100644 --- a/analytics/dagster/src/definitions.py +++ b/analytics/dagster/src/definitions.py @@ -24,8 +24,8 @@ from .assets import production_dbt from .assets import populate_owners_ban_addresses +from .assets import populate_housings_ban_addresses from .resources.ban_config 
import ban_config_resource -from .jobs.owners_ban_addresses_job import owners_ban_addresses_job from .assets import clever @@ -55,7 +55,7 @@ name="datawarehouse_synchronize_and_build", selection=AssetSelection.assets(*[*dwh_assets, *dbt_analytics_assets, *["setup_duckdb", "clevercloud_login_and_restart"]]) - AssetSelection.assets( - *[ + *[ setup_s3_connection, check_ff_lovac_on_duckdb, import_cerema_ff_lovac_data_from_s3_to_duckdb, @@ -94,6 +94,7 @@ # dagster_notion_assets, # dagster_notion_assets, populate_owners_ban_addresses, + populate_housings_ban_addresses, *dwh_assets, *dbt_analytics_assets, *clever_assets_assets diff --git a/analytics/dagster/src/jobs/housings_ban_addresses_job.py b/analytics/dagster/src/jobs/housings_ban_addresses_job.py new file mode 100644 index 000000000..f64a942f7 --- /dev/null +++ b/analytics/dagster/src/jobs/housings_ban_addresses_job.py @@ -0,0 +1,11 @@ +from dagster import job +from dagster.src.assets.populate_owners_ban_addresses import housings_without_address +from dagster.src.resources.ban_config import ban_config_resource + +@job( + resource_defs={ + "ban_config": ban_config_resource, + } +) +def housings_ban_addresses_job(): + housings_without_address() diff --git a/analytics/dagster/src/jobs/owners_ban_addresses_job.py b/analytics/dagster/src/jobs/owners_ban_addresses_job.py index f9feae84d..12558323e 100644 --- a/analytics/dagster/src/jobs/owners_ban_addresses_job.py +++ b/analytics/dagster/src/jobs/owners_ban_addresses_job.py @@ -1,5 +1,5 @@ from dagster import job -from dagster.src.assets.populate_owners_ban_addresses import housings_without_address +from dagster.src.assets.populate_owners_ban_addresses import owners_without_address from dagster.src.resources.ban_config import ban_config_resource @job( @@ -8,4 +8,4 @@ } ) def owners_ban_addresses_job(): - housings_without_address() + owners_without_address() diff --git a/analytics/dagster/src/resources/ban_config.py b/analytics/dagster/src/resources/ban_config.py index 
68647afbe..c6565b368 100644 --- a/analytics/dagster/src/resources/ban_config.py +++ b/analytics/dagster/src/resources/ban_config.py @@ -3,8 +3,6 @@ from dagster import resource class BANConfig(BaseSettings): - """Configuration for BAN integration.""" - db_name: str = Field("isoprod", env="DB_NAME") db_user: str = Field("postgres", env="DB_USER") db_password: str = Field("postgres", env="DB_PASSWORD") From cea47c0e1d3d0d36844e8d15ff86fa9bdce613b8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Guillois?= Date: Mon, 27 Jan 2025 16:36:24 +0100 Subject: [PATCH 06/10] refactor: remove unnecessary debug log --- .../dagster/src/assets/populate_housings_ban_addresses.py | 5 ----- .../dagster/src/assets/populate_owners_ban_addresses.py | 5 ----- 2 files changed, 10 deletions(-) diff --git a/analytics/dagster/src/assets/populate_housings_ban_addresses.py b/analytics/dagster/src/assets/populate_housings_ban_addresses.py index 67a439c72..3415b4470 100644 --- a/analytics/dagster/src/assets/populate_housings_ban_addresses.py +++ b/analytics/dagster/src/assets/populate_housings_ban_addresses.py @@ -71,11 +71,6 @@ def parse_api_response_and_insert_housing_addresses(context: AssetExecutionConte api_df = pd.read_csv(StringIO(send_csv_to_api)) - pd.set_option('display.max_columns', None) - pd.set_option('display.max_rows', None) - pd.set_option('display.width', None) - context.log.info(f"Preview of the CSV file:\n{api_df.head()}") - conn = psycopg2.connect( dbname=config.db_name, user=config.db_user, diff --git a/analytics/dagster/src/assets/populate_owners_ban_addresses.py b/analytics/dagster/src/assets/populate_owners_ban_addresses.py index ab4f0fec8..b80fa2b0c 100644 --- a/analytics/dagster/src/assets/populate_owners_ban_addresses.py +++ b/analytics/dagster/src/assets/populate_owners_ban_addresses.py @@ -94,11 +94,6 @@ def parse_api_response_and_insert_owners_addresses(context: AssetExecutionContex api_df = pd.read_csv(send_csv_chunks_to_api) - 
pd.set_option('display.max_columns', None) - pd.set_option('display.max_rows', None) - pd.set_option('display.width', None) - context.log.info(f"Preview of the aggregated CSV file:\n{api_df.head()}") - conn = psycopg2.connect( dbname=config.db_name, user=config.db_user, From 1dbbe72b5a06257a7ceb2935a5fbb5a7cc8910e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Guillois?= Date: Mon, 27 Jan 2025 16:47:52 +0100 Subject: [PATCH 07/10] feat: optimize data insertion with batch processing --- .talismanrc | 2 +- .../assets/populate_housings_ban_addresses.py | 75 +++++++++---------- 2 files changed, 38 insertions(+), 39 deletions(-) diff --git a/.talismanrc b/.talismanrc index 65c9228a7..d9e2b3c12 100644 --- a/.talismanrc +++ b/.talismanrc @@ -4,7 +4,7 @@ fileignoreconfig: - filename: README.md checksum: df312ccb4c75fc4c2441a1f7f2c7817ee98ffb3065c78d5d7d6addf6ab129176 - filename: analytics/dagster/src/assets/populate_housings_ban_addresses.py - checksum: 4c16939cca930517d45f30128d79332784cf09c933a1510f0f02e949b1eaac87 + checksum: 08c9deed34bb3cce9e77601a1db4d9e8ca8acc5862c8ee7f73f6164c30a45946 - filename: analytics/dagster/src/assets/populate_owners_ban_addresses.py checksum: b068d130431a26be58c78e4f14346be53d318110c4020c7f7212069cbd190bf2 - filename: analytics/dagster/src/resources/ban_config.py diff --git a/analytics/dagster/src/assets/populate_housings_ban_addresses.py b/analytics/dagster/src/assets/populate_housings_ban_addresses.py index 3415b4470..3efbff625 100644 --- a/analytics/dagster/src/assets/populate_housings_ban_addresses.py +++ b/analytics/dagster/src/assets/populate_housings_ban_addresses.py @@ -3,7 +3,7 @@ import pandas as pd import psycopg2 from io import StringIO - +from sqlalchemy import create_engine @asset(description="Return housing records from `fast_housing` that have no matching entry in `ban_addresses`.", required_resource_keys={"ban_config"}) def housings_without_address(context: AssetExecutionContext): @@ -71,47 +71,46 @@ def 
parse_api_response_and_insert_housing_addresses(context: AssetExecutionConte api_df = pd.read_csv(StringIO(send_csv_to_api)) - conn = psycopg2.connect( - dbname=config.db_name, - user=config.db_user, - password=config.db_password, - host=config.db_host, - port=config.db_port, - ) - cursor = conn.cursor() - filtered_df = api_df[api_df['result_status'] == 'ok'] failed_rows = api_df[api_df['result_status'] != 'ok'] context.log.warning(f"Number of housings with failed API results: {len(failed_rows)}") - for _, row in filtered_df.iterrows(): - - # L'API BAN renvoie des valeurs NaN pour les champs vides. Par exemple pour les lieux-dits il n'y a pas de numéro de rue ni de rue - row = row.apply(lambda x: None if pd.isna(x) else x) - - cursor.execute( - """ - INSERT INTO ban_addresses (ref_id, house_number, address, street, postal_code, city, latitude, longitude, score, ban_id, address_kind) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) - """, - ( - row['housing_id'], - row['result_housenumber'], - row['result_label'], - row['result_street'], - row['result_postcode'], - row['result_city'], - row['latitude'], - row['longitude'], - row['result_score'], - row['result_id'], - "Housing" - ), - ) - - conn.commit() - cursor.close() - conn.close() + filtered_df = filtered_df.applymap(lambda x: None if pd.isna(x) else x) + + engine = create_engine(f'postgresql://{config.db_user}:{config.db_password}@{config.db_host}:{config.db_port}/{config.db_name}') + + filtered_df.to_sql( + 'ban_addresses', + engine, + if_exists='append', + index=False, + columns=[ + 'housing_id', + 'result_housenumber', + 'result_label', + 'result_street', + 'result_postcode', + 'result_city', + 'latitude', + 'longitude', + 'result_score', + 'result_id', + 'address_kind' + ], + dtype={ + 'housing_id': 'INTEGER', + 'result_housenumber': 'TEXT', + 'result_label': 'TEXT', + 'result_street': 'TEXT', + 'result_postcode': 'TEXT', + 'result_city': 'TEXT', + 'latitude': 'FLOAT', + 'longitude': 'FLOAT', + 
'result_score': 'FLOAT', + 'result_id': 'TEXT', + 'address_kind': 'TEXT' + } + ) context.log.info(f"{len(api_df)} records inserted successfully.") From 0a7a5fe5aac795ca84ab6f3cd5a5e2fe15ee87b4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Guillois?= Date: Mon, 27 Jan 2025 17:20:03 +0100 Subject: [PATCH 08/10] feat: optimize and refactor BAN API response parsing and insertion workflows --- .talismanrc | 6 +- .../assets/populate_housings_ban_addresses.py | 124 +++++++++--------- .../assets/populate_owners_ban_addresses.py | 116 ++++++++-------- .../src/jobs/housings_ban_addresses_job.py | 3 + .../src/jobs/owners_ban_addresses_job.py | 3 + analytics/dagster/src/resources/ban_config.py | 11 -- .../src/resources/database_resources.py | 41 ++++++ 7 files changed, 174 insertions(+), 130 deletions(-) create mode 100644 analytics/dagster/src/resources/database_resources.py diff --git a/.talismanrc b/.talismanrc index d9e2b3c12..d491e668b 100644 --- a/.talismanrc +++ b/.talismanrc @@ -4,11 +4,13 @@ fileignoreconfig: - filename: README.md checksum: df312ccb4c75fc4c2441a1f7f2c7817ee98ffb3065c78d5d7d6addf6ab129176 - filename: analytics/dagster/src/assets/populate_housings_ban_addresses.py - checksum: 08c9deed34bb3cce9e77601a1db4d9e8ca8acc5862c8ee7f73f6164c30a45946 + checksum: 66b41821bccc209598ed3d082e5666102edf52ae854b41db3f0b3fe3640657b7 - filename: analytics/dagster/src/assets/populate_owners_ban_addresses.py - checksum: b068d130431a26be58c78e4f14346be53d318110c4020c7f7212069cbd190bf2 + checksum: 6d33b062918f2957e659ddf9f63413fe273ab88b34ba31932d8b9cfda996a1f1 - filename: analytics/dagster/src/resources/ban_config.py checksum: 034c6924978983da0ca5897bb06b64598a5a813dc93d1d9e8f8a62da952d4d22 +- filename: analytics/dagster/src/resources/database_resources.py + checksum: 12fb6c30e1a0378c39cd1da759ec1ece28bda86ea6353c3ea0076c2d94da682e - filename: frontend/.env.example checksum: 7e2a5ff197c49ff9f715b3d189da7282bdb40de53ea49735e9f183ece19168fc - filename: 
frontend/src/components/Draft/DraftSender.tsx diff --git a/analytics/dagster/src/assets/populate_housings_ban_addresses.py b/analytics/dagster/src/assets/populate_housings_ban_addresses.py index 3efbff625..79ab414a4 100644 --- a/analytics/dagster/src/assets/populate_housings_ban_addresses.py +++ b/analytics/dagster/src/assets/populate_housings_ban_addresses.py @@ -1,14 +1,13 @@ -from dagster import asset, MetadataValue, AssetExecutionContext +from dagster import asset, MetadataValue, AssetExecutionContext, resource, op import requests import pandas as pd -import psycopg2 from io import StringIO -from sqlalchemy import create_engine -@asset(description="Return housing records from `fast_housing` that have no matching entry in `ban_addresses`.", required_resource_keys={"ban_config"}) +@asset( + description="Return housing records from `fast_housing` that have no matching entry in `ban_addresses`.", + required_resource_keys={"psycopg2_connection"} +) def housings_without_address(context: AssetExecutionContext): - config = context.resources.ban_config - query = """ SELECT fh.id as housing_id, array_to_string(fh.address_dgfip, ' ') as address_dgfip, fh.geo_code FROM fast_housing fh @@ -16,20 +15,19 @@ def housings_without_address(context: AssetExecutionContext): WHERE ba.ref_id IS NULL; """ - conn = psycopg2.connect( - dbname=config.db_name, - user=config.db_user, - password=config.db_password, - host=config.db_host, - port=config.db_port, - ) - df = pd.read_sql(query, conn) - conn.close() + try: + with context.resources.psycopg2_connection as conn: + df = pd.read_sql(query, conn) + except Exception as e: + context.log.error(f"Error executing query: {e}") + raise return df - -@asset(description="Write housing records without addresses to a CSV file, log the file path and a preview, then return the file path as metadata.", required_resource_keys={"ban_config"}) +@asset( + description="Write housing records without addresses to a CSV file, log the file path and a preview, 
then return the file path as metadata.", + required_resource_keys={"ban_config"} +) def create_csv_from_housings(context: AssetExecutionContext, housings_without_address): config = context.resources.ban_config @@ -48,8 +46,10 @@ def create_csv_from_housings(context: AssetExecutionContext, housings_without_ad "metadata": {"file_path": MetadataValue.text(csv_file_path)} } - -@asset(deps=[create_csv_from_housings], description="Send the local CSV file to the BAN address API for address processing. Raises an exception if the request fails.", required_resource_keys={"ban_config"}) +@asset( + deps=[create_csv_from_housings], description="Send the local CSV file to the BAN address API for address processing. Raises an exception if the request fails.", + required_resource_keys={"ban_config"} +) def send_csv_to_api(context: AssetExecutionContext): config = context.resources.ban_config @@ -65,55 +65,59 @@ def send_csv_to_api(context: AssetExecutionContext): raise Exception(f"API request failed with status code {response.status_code}") -@asset(description="Parse the CSV response from the BAN address API, insert valid addresses into the `ban_addresses` table, log a preview and any failed results, then return the total number of inserted records as metadata.", required_resource_keys={"ban_config"}) +@asset( + description="Parse the CSV response from the BAN address API, insert valid addresses into the `ban_addresses` table, log a preview and any failed results, then return the total number of inserted records as metadata.", + required_resource_keys={"sqlalchemy_engine"} +) def parse_api_response_and_insert_housing_addresses(context: AssetExecutionContext, send_csv_to_api): - config = context.resources.ban_config - api_df = pd.read_csv(StringIO(send_csv_to_api)) filtered_df = api_df[api_df['result_status'] == 'ok'] failed_rows = api_df[api_df['result_status'] != 'ok'] context.log.warning(f"Number of housings with failed API results: {len(failed_rows)}") + if not 
failed_rows.empty: + context.log.warning(f"Failed rows preview:\n{failed_rows.head(5)}") filtered_df = filtered_df.applymap(lambda x: None if pd.isna(x) else x) - - engine = create_engine(f'postgresql://{config.db_user}:{config.db_password}@{config.db_host}:{config.db_port}/{config.db_name}') - - filtered_df.to_sql( - 'ban_addresses', - engine, - if_exists='append', - index=False, - columns=[ - 'housing_id', - 'result_housenumber', - 'result_label', - 'result_street', - 'result_postcode', - 'result_city', - 'latitude', - 'longitude', - 'result_score', - 'result_id', - 'address_kind' - ], - dtype={ - 'housing_id': 'INTEGER', - 'result_housenumber': 'TEXT', - 'result_label': 'TEXT', - 'result_street': 'TEXT', - 'result_postcode': 'TEXT', - 'result_city': 'TEXT', - 'latitude': 'FLOAT', - 'longitude': 'FLOAT', - 'result_score': 'FLOAT', - 'result_id': 'TEXT', - 'address_kind': 'TEXT' - } - ) - - context.log.info(f"{len(api_df)} records inserted successfully.") + filtered_df['address_kind'] = "Housing" + engine = context.resources.sqlalchemy_engine + + with engine.begin() as connection: + filtered_df.to_sql( + 'ban_addresses', + connection, + if_exists='append', + index=False, + columns=[ + 'housing_id', + 'result_housenumber', + 'result_label', + 'result_street', + 'result_postcode', + 'result_city', + 'latitude', + 'longitude', + 'result_score', + 'result_id', + 'address_kind' + ], + dtype={ + 'housing_id': 'INTEGER', + 'result_housenumber': 'TEXT', + 'result_label': 'TEXT', + 'result_street': 'TEXT', + 'result_postcode': 'TEXT', + 'result_city': 'TEXT', + 'latitude': 'FLOAT', + 'longitude': 'FLOAT', + 'result_score': 'FLOAT', + 'result_id': 'TEXT', + 'address_kind': 'TEXT' + } + ) + + context.log.info(f"{len(filtered_df)} valid records inserted successfully.") return { - "metadata": {"num_records": MetadataValue.text(f"{len(api_df)} records inserted")} + "metadata": {"num_records": MetadataValue.text(f"{len(filtered_df)} records inserted")} } diff --git 
a/analytics/dagster/src/assets/populate_owners_ban_addresses.py b/analytics/dagster/src/assets/populate_owners_ban_addresses.py index b80fa2b0c..c44bafb7b 100644 --- a/analytics/dagster/src/assets/populate_owners_ban_addresses.py +++ b/analytics/dagster/src/assets/populate_owners_ban_addresses.py @@ -1,13 +1,13 @@ -from dagster import asset, Config, MetadataValue, AssetExecutionContext, Output +from dagster import asset, MetadataValue, AssetExecutionContext, Output, op import requests import pandas as pd -import psycopg2 from io import StringIO -@asset(description="Return owners with no BAN address or a non-validated BAN address (score < 1).", required_resource_keys={"ban_config"}) +@asset( + description="Return owners with no BAN address or a non-validated BAN address (score < 1).", + required_resource_keys={"psycopg2_connection"} +) def owners_without_address(context: AssetExecutionContext): - config = context.resources.ban_config - query = """ SELECT o.id as owner_id, @@ -18,19 +18,19 @@ def owners_without_address(context: AssetExecutionContext): OR (ba.ref_id IS NOT NULL AND ba.address_kind = 'Owner' AND ba.score < 1); -- Propriétaires avec adresse non validée par Stéphanie """ - conn = psycopg2.connect( - dbname=config.db_name, - user=config.db_user, - password=config.db_password, - host=config.db_host, - port=config.db_port, - ) - df = pd.read_sql(query, conn) - conn.close() + try: + with context.resources.psycopg2_connection as conn: + df = pd.read_sql(query, conn) + except Exception as e: + context.log.error(f"Error executing query: {e}") + raise return df -@asset(description="Split the owners DataFrame into multiple CSV files (chunks), store them to disk, and return the file paths as metadata.", required_resource_keys={"ban_config"}) +@asset( + description="Split the owners DataFrame into multiple CSV files (chunks), store them to disk, and return the file paths as metadata.", + required_resource_keys={"ban_config"} +) def 
create_csv_chunks_from_owners(context: AssetExecutionContext, owners_without_address): config = context.resources.ban_config @@ -62,7 +62,10 @@ def create_csv_chunks_from_owners(context: AssetExecutionContext, owners_without return Output(value=file_paths, metadata={"file_paths": MetadataValue.text(", ".join(file_paths))}) -@asset(description="Send each CSV chunk to the BAN address API, aggregate valid responses into a single CSV, and return the path to the aggregated CSV file.", required_resource_keys={"ban_config"}) +@asset( + description="Send each CSV chunk to the BAN address API, aggregate valid responses into a single CSV, and return the path to the aggregated CSV file.", + required_resource_keys={"ban_config"} +) def send_csv_chunks_to_api(context: AssetExecutionContext, create_csv_chunks_from_owners): config = context.resources.ban_config @@ -88,33 +91,47 @@ def send_csv_chunks_to_api(context: AssetExecutionContext, create_csv_chunks_fro return aggregated_file_path -@asset(description="Parse the aggregated CSV from the BAN address API, insert valid owners' addresses into `ban_addresses`, and return the count of processed records.", required_resource_keys={"ban_config"}) +@asset( + description="Parse the aggregated CSV from the BAN address API, insert valid owners' addresses into `ban_addresses`, and return the count of processed records.", + required_resource_keys={"psycopg2_connection"} +) def parse_api_response_and_insert_owners_addresses(context: AssetExecutionContext, send_csv_chunks_to_api): - config = context.resources.ban_config - api_df = pd.read_csv(send_csv_chunks_to_api) - conn = psycopg2.connect( - dbname=config.db_name, - user=config.db_user, - password=config.db_password, - host=config.db_host, - port=config.db_port, - ) - cursor = conn.cursor() - filtered_df = api_df[api_df['result_status'] == 'ok'] failed_rows = api_df[api_df['result_status'] != 'ok'] context.log.warning(f"Number of owners with failed API results: {len(failed_rows)}") - for 
_, row in filtered_df.iterrows(): - # L'API BAN renvoie des valeurs NaN pour les champs vides. Par exemple pour les lieux-dits il n'y a pas de numéro de rue ni de rue - row = row.apply(lambda x: None if pd.isna(x) else x) - - cursor.execute( - """ + filtered_df = filtered_df.applymap(lambda x: None if pd.isna(x) else x) + filtered_df['address_kind'] = "Owner" + + with context.resources.psycopg2_connection as conn: + with conn.cursor() as cursor: + cursor.execute(""" + CREATE TEMP TABLE temp_ban_addresses ( + ref_id TEXT, + house_number TEXT, + address TEXT, + street TEXT, + postal_code TEXT, + city TEXT, + latitude FLOAT, + longitude FLOAT, + score FLOAT, + ban_id TEXT, + address_kind TEXT + ); + """) + + buffer = StringIO() + filtered_df.to_csv(buffer, sep='\t', header=False, index=False) + buffer.seek(0) + cursor.copy_from(buffer, 'temp_ban_addresses', sep='\t') + + cursor.execute(""" INSERT INTO ban_addresses (ref_id, house_number, address, street, postal_code, city, latitude, longitude, score, ban_id, address_kind) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + SELECT ref_id, house_number, address, street, postal_code, city, latitude, longitude, score, ban_id, address_kind + FROM temp_ban_addresses ON CONFLICT (ref_id, address_kind) DO UPDATE SET house_number = EXCLUDED.house_number, @@ -126,28 +143,13 @@ def parse_api_response_and_insert_owners_addresses(context: AssetExecutionContex longitude = EXCLUDED.longitude, score = EXCLUDED.score, ban_id = EXCLUDED.ban_id; - """, - ( - row['owner_id'], - row['result_housenumber'], - row['result_label'], - row['result_street'], - row['result_postcode'], - row['result_city'], - row['latitude'], - row['longitude'], - row['result_score'], - row['result_id'], - "Owner" - ), - ) - - conn.commit() - cursor.close() - conn.close() - - context.log.info(f"{len(api_df)} records inserted successfully.") + """) + cursor.execute("DROP TABLE temp_ban_addresses;") + + conn.commit() + + context.log.info(f"{len(filtered_df)} 
valid records inserted successfully.") return { - "metadata": {"num_records": MetadataValue.text(f"{len(api_df)} records inserted")} + "metadata": {"num_records": MetadataValue.text(f"{len(filtered_df)} records inserted")} } diff --git a/analytics/dagster/src/jobs/housings_ban_addresses_job.py b/analytics/dagster/src/jobs/housings_ban_addresses_job.py index f64a942f7..131aeb9a4 100644 --- a/analytics/dagster/src/jobs/housings_ban_addresses_job.py +++ b/analytics/dagster/src/jobs/housings_ban_addresses_job.py @@ -1,10 +1,13 @@ from dagster import job from dagster.src.assets.populate_owners_ban_addresses import housings_without_address from dagster.src.resources.ban_config import ban_config_resource +from dagster.src.resources import sqlalchemy_engine_resource, postgres_resource @job( resource_defs={ "ban_config": ban_config_resource, + "sqlalchemy_engine": sqlalchemy_engine_resource, + "postgres": postgres_resource, } ) def housings_ban_addresses_job(): diff --git a/analytics/dagster/src/jobs/owners_ban_addresses_job.py b/analytics/dagster/src/jobs/owners_ban_addresses_job.py index 12558323e..a412613e1 100644 --- a/analytics/dagster/src/jobs/owners_ban_addresses_job.py +++ b/analytics/dagster/src/jobs/owners_ban_addresses_job.py @@ -1,10 +1,13 @@ from dagster import job from dagster.src.assets.populate_owners_ban_addresses import owners_without_address from dagster.src.resources.ban_config import ban_config_resource +from dagster.src.resources import sqlalchemy_engine_resource, postgres_resource @job( resource_defs={ "ban_config": ban_config_resource, + "sqlalchemy_engine": sqlalchemy_engine_resource, + "postgres": postgres_resource, } ) def owners_ban_addresses_job(): diff --git a/analytics/dagster/src/resources/ban_config.py b/analytics/dagster/src/resources/ban_config.py index c6565b368..07f35ac40 100644 --- a/analytics/dagster/src/resources/ban_config.py +++ b/analytics/dagster/src/resources/ban_config.py @@ -3,12 +3,6 @@ from dagster import resource class 
BANConfig(BaseSettings): - db_name: str = Field("isoprod", env="DB_NAME") - db_user: str = Field("postgres", env="DB_USER") - db_password: str = Field("postgres", env="DB_PASSWORD") - db_host: str = Field("localhost", env="DB_HOST") - db_port: str = Field("5432", env="DB_PORT") - api_url: str = Field("https://api-adresse.data.gouv.fr/search/csv/", env="BAN_API_URL") csv_file_path: str = Field("temp_csv", env="CSV_FILE_PATH") @@ -28,11 +22,6 @@ class Config: @resource( config_schema={ - "db_name": str, - "db_user": str, - "db_password": str, - "db_host": str, - "db_port": str, "api_url": str, "csv_file_path": str, "chunk_size": int, diff --git a/analytics/dagster/src/resources/database_resources.py b/analytics/dagster/src/resources/database_resources.py new file mode 100644 index 000000000..dcbe9e37e --- /dev/null +++ b/analytics/dagster/src/resources/database_resources.py @@ -0,0 +1,41 @@ +from dagster import resource +from sqlalchemy import create_engine +import psycopg2 + +@resource(config_schema={ + "db_name": str, + "db_user": str, + "db_password": str, + "db_host": str, + "db_port": int, +}) +def psycopg2_connection_resource(init_context): + config = init_context.resource_config + conn = psycopg2.connect( + dbname=config["db_name"], + user=config["db_user"], + password=config["db_password"], + host=config["db_host"], + port=config["db_port"], + ) + try: + yield conn + finally: + conn.close() + +@resource(config_schema={ + "db_name": str, + "db_user": str, + "db_password": str, + "db_host": str, + "db_port": int, +}) +def sqlalchemy_engine_resource(init_context): + config = init_context.resource_config + engine = create_engine( + f'postgresql://{config["db_user"]}:{config["db_password"]}@{config["db_host"]}:{config["db_port"]}/{config["db_name"]}' + ) + try: + yield engine + finally: + engine.dispose() From e83fad1e8c1d0ca857e96f0275d18a9339652c5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Guillois?= Date: Mon, 27 Jan 2025 17:21:09 +0100 Subject: 
[PATCH 09/10] refactor: remove unnecessary debug log --- .../dagster/src/assets/populate_housings_ban_addresses.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/analytics/dagster/src/assets/populate_housings_ban_addresses.py b/analytics/dagster/src/assets/populate_housings_ban_addresses.py index 79ab414a4..be6d0833b 100644 --- a/analytics/dagster/src/assets/populate_housings_ban_addresses.py +++ b/analytics/dagster/src/assets/populate_housings_ban_addresses.py @@ -36,12 +36,6 @@ def create_csv_from_housings(context: AssetExecutionContext, housings_without_ad context.log.info(f"CSV file created at: {csv_file_path}") - df = pd.read_csv(csv_file_path) - pd.set_option('display.max_columns', None) - pd.set_option('display.max_rows', None) - pd.set_option('display.width', None) - context.log.info(f"Preview of the CSV file:\n{df.head()}") - return { "metadata": {"file_path": MetadataValue.text(csv_file_path)} } From 62b71ab7da8c37e8a471d3b1d631120dbadf7f07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Guillois?= Date: Tue, 28 Jan 2025 10:14:10 +0100 Subject: [PATCH 10/10] refactoring: remove hello world + fix psycopg2_connection resource name --- analytics/dagster/src/hello.py | 17 ----------------- .../src/jobs/housings_ban_addresses_job.py | 4 ++-- .../src/jobs/owners_ban_addresses_job.py | 4 ++-- 3 files changed, 4 insertions(+), 21 deletions(-) delete mode 100644 analytics/dagster/src/hello.py diff --git a/analytics/dagster/src/hello.py b/analytics/dagster/src/hello.py deleted file mode 100644 index e4315af65..000000000 --- a/analytics/dagster/src/hello.py +++ /dev/null @@ -1,17 +0,0 @@ -from dagster import job, op -from dagster import execute_job, DagsterInstance, reconstructable - -# Définition d'un "op" qui représente une étape de traitement -@op -def hello_world(context): - context.log.info('Hello, World!') - -# Définition d'un job utilisant l'op -@job -def hello_world_job(): - hello_world() - -# Exécution du job localement -if __name__ == 
"__main__": - instance = DagsterInstance.ephemeral() # Utilisation de l'instance éphémère - result = execute_job(reconstructable(hello_world_job), instance=instance) diff --git a/analytics/dagster/src/jobs/housings_ban_addresses_job.py b/analytics/dagster/src/jobs/housings_ban_addresses_job.py index 131aeb9a4..027b4a0d1 100644 --- a/analytics/dagster/src/jobs/housings_ban_addresses_job.py +++ b/analytics/dagster/src/jobs/housings_ban_addresses_job.py @@ -1,13 +1,13 @@ from dagster import job from dagster.src.assets.populate_owners_ban_addresses import housings_without_address from dagster.src.resources.ban_config import ban_config_resource -from dagster.src.resources import sqlalchemy_engine_resource, postgres_resource +from dagster.src.resources import sqlalchemy_engine_resource, psycopg2_connection_resource @job( resource_defs={ "ban_config": ban_config_resource, "sqlalchemy_engine": sqlalchemy_engine_resource, - "postgres": postgres_resource, + "psycopg2_connection": psycopg2_connection_resource, } ) def housings_ban_addresses_job(): diff --git a/analytics/dagster/src/jobs/owners_ban_addresses_job.py b/analytics/dagster/src/jobs/owners_ban_addresses_job.py index a412613e1..f23aca8bf 100644 --- a/analytics/dagster/src/jobs/owners_ban_addresses_job.py +++ b/analytics/dagster/src/jobs/owners_ban_addresses_job.py @@ -1,13 +1,13 @@ from dagster import job from dagster.src.assets.populate_owners_ban_addresses import owners_without_address from dagster.src.resources.ban_config import ban_config_resource -from dagster.src.resources import sqlalchemy_engine_resource, postgres_resource +from dagster.src.resources import sqlalchemy_engine_resource, psycopg2_connection_resource @job( resource_defs={ "ban_config": ban_config_resource, "sqlalchemy_engine": sqlalchemy_engine_resource, - "postgres": postgres_resource, + "psycopg2_connection": psycopg2_connection_resource, } ) def owners_ban_addresses_job():