Merge pull request #1070 from MTES-MCT/dagster-ban-address
feat: add new jobs and assets for retrieving BAN addresses for owners and housings
loicguillois authored Jan 28, 2025
2 parents ad53917 + 62b71ab commit bc54d16
Showing 10 changed files with 400 additions and 2 deletions.
8 changes: 8 additions & 0 deletions .talismanrc
@@ -3,6 +3,14 @@ fileignoreconfig:
    checksum: f7719ba0d36160d97e80ee15cb5415b601354576929e36df0596c7d192465cfb
  - filename: README.md
    checksum: df312ccb4c75fc4c2441a1f7f2c7817ee98ffb3065c78d5d7d6addf6ab129176
  - filename: analytics/dagster/src/assets/populate_housings_ban_addresses.py
    checksum: 66b41821bccc209598ed3d082e5666102edf52ae854b41db3f0b3fe3640657b7
  - filename: analytics/dagster/src/assets/populate_owners_ban_addresses.py
    checksum: 6d33b062918f2957e659ddf9f63413fe273ab88b34ba31932d8b9cfda996a1f1
  - filename: analytics/dagster/src/resources/ban_config.py
    checksum: 034c6924978983da0ca5897bb06b64598a5a813dc93d1d9e8f8a62da952d4d22
  - filename: analytics/dagster/src/resources/database_resources.py
    checksum: 12fb6c30e1a0378c39cd1da759ec1ece28bda86ea6353c3ea0076c2d94da682e
  - filename: frontend/.env.example
    checksum: 7e2a5ff197c49ff9f715b3d189da7282bdb40de53ea49735e9f183ece19168fc
  - filename: frontend/src/components/Draft/DraftSender.tsx
4 changes: 3 additions & 1 deletion analytics/dagster/requirements.txt
@@ -9,9 +9,11 @@ matplotlib
pandas
requests
dlt[duckdb]
psycopg2
pydantic_settings
dlt[filesystem]
dlt[parquet]
s3fs==2024.12.0
boto3==1.35.91
thefuzz==0.22.1
psutil
117 changes: 117 additions & 0 deletions analytics/dagster/src/assets/populate_housings_ban_addresses.py
@@ -0,0 +1,117 @@
from dagster import asset, MetadataValue, AssetExecutionContext, Output
import requests
import pandas as pd
from io import StringIO
from sqlalchemy.types import Integer, Text, Float

@asset(
    description="Return housing records from `fast_housing` that have no matching entry in `ban_addresses`.",
    required_resource_keys={"psycopg2_connection"}
)
def housings_without_address(context: AssetExecutionContext):
    query = """
        SELECT fh.id AS housing_id,
               array_to_string(fh.address_dgfip, ' ') AS address_dgfip,
               fh.geo_code
        FROM fast_housing fh
        LEFT JOIN ban_addresses ba ON fh.id = ba.ref_id
        WHERE ba.ref_id IS NULL;
    """

    try:
        with context.resources.psycopg2_connection as conn:
            df = pd.read_sql(query, conn)
    except Exception as e:
        context.log.error(f"Error executing query: {e}")
        raise

    return df

@asset(
    description="Write housing records without addresses to a CSV file, log the file path, and return it with metadata.",
    required_resource_keys={"ban_config"}
)
def create_csv_from_housings(context: AssetExecutionContext, housings_without_address):
    config = context.resources.ban_config

    csv_file_path = f"{config.csv_file_path}/search.csv"
    housings_without_address.to_csv(csv_file_path, index=False, columns=["housing_id", "address_dgfip", "geo_code"])

    context.log.info(f"CSV file created at: {csv_file_path}")

    # Return an Output (rather than a plain dict) so Dagster records the path as metadata.
    return Output(value=csv_file_path, metadata={"file_path": MetadataValue.path(csv_file_path)})

@asset(
    deps=[create_csv_from_housings],
    description="Send the local CSV file to the BAN address API for address processing. Raises an exception if the request fails.",
    required_resource_keys={"ban_config"}
)
def send_csv_to_api(context: AssetExecutionContext):
    config = context.resources.ban_config

    # Open the file in a context manager so the handle is not leaked.
    with open(f"{config.csv_file_path}/search.csv", 'rb') as f:
        files = {'data': f}
        data = {'columns': 'address_dgfip', 'citycode': 'geo_code'}
        response = requests.post(config.api_url, files=files, data=data)

    if response.status_code == 200:
        return response.text
    raise Exception(f"API request failed with status code {response.status_code}")


@asset(
    description="Parse the CSV response from the BAN address API, insert valid addresses into the `ban_addresses` table, log any failed results, and return the number of inserted records as metadata.",
    required_resource_keys={"sqlalchemy_engine"}
)
def parse_api_response_and_insert_housing_addresses(context: AssetExecutionContext, send_csv_to_api):
    api_df = pd.read_csv(StringIO(send_csv_to_api))

    filtered_df = api_df[api_df['result_status'] == 'ok'].copy()
    failed_rows = api_df[api_df['result_status'] != 'ok']
    context.log.warning(f"Number of housings with failed API results: {len(failed_rows)}")
    if not failed_rows.empty:
        context.log.warning(f"Failed rows preview:\n{failed_rows.head(5)}")

    filtered_df = filtered_df.applymap(lambda x: None if pd.isna(x) else x)
    filtered_df['address_kind'] = "Housing"
    engine = context.resources.sqlalchemy_engine

    # DataFrame.to_sql has no `columns` argument, so select the columns up front,
    # and pass SQLAlchemy types (plain strings are only accepted for SQLite).
    insert_columns = [
        'housing_id', 'result_housenumber', 'result_label', 'result_street',
        'result_postcode', 'result_city', 'latitude', 'longitude',
        'result_score', 'result_id', 'address_kind'
    ]

    with engine.begin() as connection:
        filtered_df[insert_columns].to_sql(
            'ban_addresses',
            connection,
            if_exists='append',
            index=False,
            dtype={
                'housing_id': Integer(),
                'result_housenumber': Text(),
                'result_label': Text(),
                'result_street': Text(),
                'result_postcode': Text(),
                'result_city': Text(),
                'latitude': Float(),
                'longitude': Float(),
                'result_score': Float(),
                'result_id': Text(),
                'address_kind': Text()
            }
        )

    context.log.info(f"{len(filtered_df)} valid records inserted successfully.")

    return Output(value=len(filtered_df), metadata={"num_records": MetadataValue.text(f"{len(filtered_df)} records inserted")})
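
The `ban_config` resource consumed above comes from `analytics/dagster/src/resources/ban_config.py`, which is listed in `.talismanrc` but whose body is not shown in this diff. Given that `pydantic_settings` is added to `requirements.txt`, a minimal sketch might look like the following; the endpoint URL and defaults are assumptions, not the committed code:

```python
# Hypothetical sketch of analytics/dagster/src/resources/ban_config.py.
# Field names mirror what the assets read (api_url, csv_file_path,
# chunk_size, max_files, disable_max_files); defaults are assumptions.
from dagster import resource
from pydantic_settings import BaseSettings


class BanConfig(BaseSettings):
    api_url: str = "https://api-adresse.data.gouv.fr/search/csv/"  # assumed endpoint
    csv_file_path: str = "/tmp/ban"      # base path/prefix for generated CSV files
    chunk_size: int = 10_000             # rows per CSV chunk (owners pipeline)
    max_files: int = 5                   # cap on chunk files unless disabled
    disable_max_files: bool = False


@resource
def ban_config_resource(_init_context):
    # BaseSettings also picks these fields up from environment variables.
    return BanConfig()
```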
155 changes: 155 additions & 0 deletions analytics/dagster/src/assets/populate_owners_ban_addresses.py
@@ -0,0 +1,155 @@
from dagster import asset, MetadataValue, AssetExecutionContext, Output
import requests
import pandas as pd
from io import StringIO

@asset(
    description="Return owners with no BAN address or a non-validated BAN address (score < 1).",
    required_resource_keys={"psycopg2_connection"}
)
def owners_without_address(context: AssetExecutionContext):
    query = """
        SELECT
            o.id AS owner_id,
            array_to_string(o.address_dgfip, ' ') AS address_dgfip
        FROM owners o
        LEFT JOIN ban_addresses ba ON o.id = ba.ref_id
        WHERE (ba.ref_id IS NULL) -- Owners without any address
           OR (ba.ref_id IS NOT NULL AND ba.address_kind = 'Owner' AND ba.score < 1); -- Owners whose address has not been validated by Stéphanie
    """

    try:
        with context.resources.psycopg2_connection as conn:
            df = pd.read_sql(query, conn)
    except Exception as e:
        context.log.error(f"Error executing query: {e}")
        raise

    return df

@asset(
    description="Split the owners DataFrame into multiple CSV files (chunks), write them to disk, and return the file paths as metadata.",
    required_resource_keys={"ban_config"}
)
def create_csv_chunks_from_owners(context: AssetExecutionContext, owners_without_address):
    config = context.resources.ban_config

    chunk_size = config.chunk_size
    max_files = config.max_files
    disable_max_files = config.disable_max_files

    # Ceiling division avoids creating a trailing empty chunk when the row
    # count is an exact multiple of chunk_size.
    num_chunks = -(-len(owners_without_address) // chunk_size)

    if not disable_max_files:
        num_chunks = min(num_chunks, max_files)

    file_paths = []

    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = (i + 1) * chunk_size
        chunk = owners_without_address.iloc[start_idx:end_idx]

        chunk_file_path = f"{config.csv_file_path}_part_{i+1}.csv"
        file_paths.append(chunk_file_path)

        chunk.to_csv(chunk_file_path, index=False, columns=["owner_id", "address_dgfip"])

        context.log.info(f"CSV file created: {chunk_file_path}")
        context.log.info(f"Preview of the CSV file:\n{chunk.head()}")
        context.log.info(f"Generated {i + 1} out of {num_chunks} files")

    return Output(value=file_paths, metadata={"file_paths": MetadataValue.text(", ".join(file_paths))})


@asset(
    description="Send each CSV chunk to the BAN address API, aggregate valid responses into a single CSV, and return the path to the aggregated CSV file.",
    required_resource_keys={"ban_config"}
)
def send_csv_chunks_to_api(context: AssetExecutionContext, create_csv_chunks_from_owners):
    config = context.resources.ban_config

    aggregated_file_path = f"{config.csv_file_path}_aggregated.csv"

    with open(aggregated_file_path, 'w') as aggregated_file:
        first_file = True

        for file_path in create_csv_chunks_from_owners:
            # Open each chunk in a context manager so handles are not leaked.
            with open(file_path, 'rb') as f:
                files = {'data': f}
                data = {'columns': 'address_dgfip', 'citycode': 'geo_code'}
                response = requests.post(config.api_url, files=files, data=data)

            if response.status_code == 200:
                api_data = pd.read_csv(StringIO(response.text))
                api_data.to_csv(aggregated_file, mode='a', index=False, header=first_file)
                first_file = False

                context.log.info(f"Processed file: {file_path}")
            else:
                raise Exception(f"API request failed with status code {response.status_code}")

    return aggregated_file_path

@asset(
    description="Parse the aggregated CSV from the BAN address API, upsert valid owners' addresses into `ban_addresses`, and return the count of processed records.",
    required_resource_keys={"psycopg2_connection"}
)
def parse_api_response_and_insert_owners_addresses(context: AssetExecutionContext, send_csv_chunks_to_api):
    api_df = pd.read_csv(send_csv_chunks_to_api)

    filtered_df = api_df[api_df['result_status'] == 'ok'].copy()
    failed_rows = api_df[api_df['result_status'] != 'ok']
    context.log.warning(f"Number of owners with failed API results: {len(failed_rows)}")

    filtered_df = filtered_df.applymap(lambda x: None if pd.isna(x) else x)
    filtered_df['address_kind'] = "Owner"

    # Columns selected in the order expected by temp_ban_addresses below, so
    # COPY maps each CSV field onto the right column.
    export_columns = [
        'owner_id', 'result_housenumber', 'result_label', 'result_street',
        'result_postcode', 'result_city', 'latitude', 'longitude',
        'result_score', 'result_id', 'address_kind'
    ]

    with context.resources.psycopg2_connection as conn:
        with conn.cursor() as cursor:
            cursor.execute("""
                CREATE TEMP TABLE temp_ban_addresses (
                    ref_id TEXT,
                    house_number TEXT,
                    address TEXT,
                    street TEXT,
                    postal_code TEXT,
                    city TEXT,
                    latitude FLOAT,
                    longitude FLOAT,
                    score FLOAT,
                    ban_id TEXT,
                    address_kind TEXT
                );
            """)

            buffer = StringIO()
            filtered_df[export_columns].to_csv(buffer, sep='\t', header=False, index=False)
            buffer.seek(0)
            # Empty fields in the buffer are NULLs, not the default '\N' marker.
            cursor.copy_from(buffer, 'temp_ban_addresses', sep='\t', null='')

            cursor.execute("""
                INSERT INTO ban_addresses (ref_id, house_number, address, street, postal_code, city, latitude, longitude, score, ban_id, address_kind)
                SELECT ref_id, house_number, address, street, postal_code, city, latitude, longitude, score, ban_id, address_kind
                FROM temp_ban_addresses
                ON CONFLICT (ref_id, address_kind)
                DO UPDATE SET
                    house_number = EXCLUDED.house_number,
                    address = EXCLUDED.address,
                    street = EXCLUDED.street,
                    postal_code = EXCLUDED.postal_code,
                    city = EXCLUDED.city,
                    latitude = EXCLUDED.latitude,
                    longitude = EXCLUDED.longitude,
                    score = EXCLUDED.score,
                    ban_id = EXCLUDED.ban_id;
            """)
            cursor.execute("DROP TABLE temp_ban_addresses;")

        conn.commit()

    context.log.info(f"{len(filtered_df)} valid records upserted successfully.")

    return Output(value=len(filtered_df), metadata={"num_records": MetadataValue.text(f"{len(filtered_df)} records inserted")})
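
The `ON CONFLICT (ref_id, address_kind)` upsert above only works if `ban_addresses` carries a unique constraint on that column pair. The migration is not part of this commit; a hypothetical sketch of the required statement (constraint name assumed):

```python
# Hypothetical one-off migration; not part of this commit.
import psycopg2


def ensure_ban_addresses_unique_constraint(dsn: str) -> None:
    ddl = """
        ALTER TABLE ban_addresses
        ADD CONSTRAINT ban_addresses_ref_id_address_kind_key
        UNIQUE (ref_id, address_kind);
    """
    # The connection context manager commits the transaction on success.
    with psycopg2.connect(dsn) as conn, conn.cursor() as cur:
        cur.execute(ddl)
```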
10 changes: 9 additions & 1 deletion analytics/dagster/src/definitions.py
@@ -23,6 +23,11 @@

from .project import dbt_project
from .assets import production_dbt

from .assets import populate_owners_ban_addresses
from .assets import populate_housings_ban_addresses
from .resources.ban_config import ban_config_resource

from .assets import clever

from .assets.dwh.ingest.ingest_lovac_ff_s3_asset import (
@@ -51,7 +56,7 @@
name="datawarehouse_synchronize_and_build",
selection=AssetSelection.assets(*[*dwh_assets, *dbt_analytics_assets, *["setup_duckdb", "clevercloud_login_and_restart"]])
- AssetSelection.assets(
*[
*[
setup_s3_connection,
check_ff_lovac_on_duckdb,
import_cerema_ff_lovac_data_from_s3_to_duckdb,
@@ -89,6 +94,8 @@
        # dagster_production_assets,
        # dagster_notion_assets,
        # dagster_notion_assets,
        populate_owners_ban_addresses,
        populate_housings_ban_addresses,
        *dwh_assets,
        *dbt_analytics_assets,
        *clever_assets_assets
@@ -105,6 +112,7 @@
"duckdb_local_metabase": DuckDBResource(
database="db/metabase.duckdb",
),
"ban_config": ban_config_resource
},
schedules=[daily_refresh_schedule, yearly_ff_refresh_schedule],
)
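
Neither job module added below is imported in `definitions.py`, so the jobs are not exposed by the `Definitions` object this file builds. If they are meant to be launchable from the Dagster UI, the wiring would look roughly like this sketch (the `Definitions(...)` call is inferred from the resources/schedules arguments above, not shown in the commit):

```python
# Hypothetical additions to definitions.py; not part of this commit.
from .jobs.housings_ban_addresses_job import housings_ban_addresses_job
from .jobs.owners_ban_addresses_job import owners_ban_addresses_job

defs = Definitions(
    # ...assets and resources as above...
    jobs=[housings_ban_addresses_job, owners_ban_addresses_job],
    schedules=[daily_refresh_schedule, yearly_ff_refresh_schedule],
)
```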
14 changes: 14 additions & 0 deletions analytics/dagster/src/jobs/housings_ban_addresses_job.py
@@ -0,0 +1,14 @@
from dagster import job
# housings_without_address lives in populate_housings_ban_addresses, and the
# package-relative imports avoid shadowing the installed `dagster` library.
from ..assets.populate_housings_ban_addresses import housings_without_address
from ..resources.ban_config import ban_config_resource
from ..resources import sqlalchemy_engine_resource, psycopg2_connection_resource

@job(
    resource_defs={
        "ban_config": ban_config_resource,
        "sqlalchemy_engine": sqlalchemy_engine_resource,
        # Key must match the asset's required_resource_keys ("psycopg2_connection").
        "psycopg2_connection": psycopg2_connection_resource,
    }
)
def housings_ban_addresses_job():
    housings_without_address()
14 changes: 14 additions & 0 deletions analytics/dagster/src/jobs/owners_ban_addresses_job.py
@@ -0,0 +1,14 @@
from dagster import job
from ..assets.populate_owners_ban_addresses import owners_without_address
from ..resources.ban_config import ban_config_resource
from ..resources import sqlalchemy_engine_resource, psycopg2_connection_resource

@job(
    resource_defs={
        "ban_config": ban_config_resource,
        "sqlalchemy_engine": sqlalchemy_engine_resource,
        "psycopg2_connection": psycopg2_connection_resource,
    }
)
def owners_ban_addresses_job():
    owners_without_address()
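
Both jobs import `sqlalchemy_engine_resource` and `psycopg2_connection_resource` from the resources package; `database_resources.py` appears in `.talismanrc` but its body is not shown in this diff. A minimal sketch consistent with how the assets use these resources, with the `DATABASE_URL` environment variable as an assumption:

```python
# Hypothetical sketch of analytics/dagster/src/resources/database_resources.py.
import os

import psycopg2
from dagster import resource
from sqlalchemy import create_engine


@resource
def sqlalchemy_engine_resource(_init_context):
    # Engine used by DataFrame.to_sql in the housing pipeline.
    return create_engine(os.environ["DATABASE_URL"])


@resource
def psycopg2_connection_resource(_init_context):
    # Raw connection used by pd.read_sql and COPY in the owner pipeline.
    return psycopg2.connect(os.environ["DATABASE_URL"])
```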