feat: populate edited owners ban_id
loicguillois committed Feb 4, 2025
1 parent b488181 commit 398c230
Showing 9 changed files with 182 additions and 178 deletions.
2 changes: 2 additions & 0 deletions .talismanrc
@@ -5,6 +5,8 @@ fileignoreconfig:
    checksum: df312ccb4c75fc4c2441a1f7f2c7817ee98ffb3065c78d5d7d6addf6ab129176
  - filename: analytics/.env.example
    checksum: 8b09617118ef02245d361673d661fdee62b71a683a58d344dd09354f9c144c37
  - filename: analytics/dagster/src/assets/populate_edited_owners_ban_addresses.py
    checksum: c3a27f2457d2c1cd5f01ef1940c78bc40f8dedb0aff7a5edbc0d1b1e5000f5d3
  - filename: analytics/dagster/src/assets/populate_housings_ban_addresses.py
    checksum: 66b41821bccc209598ed3d082e5666102edf52ae854b41db3f0b3fe3640657b7
  - filename: analytics/dagster/src/assets/populate_owners_ban_addresses.py
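For context, each fileignoreconfig entry above pins a SHA-256 digest of the file's contents, so the checksum must be refreshed whenever the file changes; the new asset file therefore gets its own entry. A minimal sketch of recomputing such a digest, assuming Talisman's default SHA-256-over-file-bytes scheme:

# Sketch (not part of the commit): recompute a Talisman file checksum.
# Assumes the default SHA-256 digest over the raw file bytes.
import hashlib
from pathlib import Path

path = Path("analytics/dagster/src/assets/populate_edited_owners_ban_addresses.py")
print(hashlib.sha256(path.read_bytes()).hexdigest())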
3 changes: 2 additions & 1 deletion analytics/.gitignore
@@ -4,4 +4,5 @@
.ipynb_checkpoints/
*.csv
.tmp
.log
.log
dagster/logs/event.log
2 changes: 1 addition & 1 deletion analytics/dagster/Dockerfile
@@ -33,7 +33,7 @@ COPY src/ $DAGSTER_HOME
# Set the working directory
WORKDIR $DAGSTER_HOME

RUN dbt deps
# RUN dbt deps

# Configure basic authentication for Nginx
RUN htpasswd -cb /etc/nginx/.htpasswd $ZLV_HTTP_USERNAME $ZLV_HTTP_PASSWORD
170 changes: 0 additions & 170 deletions analytics/dagster/logs/event.log

This file was deleted.

145 changes: 145 additions & 0 deletions analytics/dagster/src/assets/populate_edited_owners_ban_addresses.py
@@ -0,0 +1,145 @@
from dagster import asset, MetadataValue, AssetExecutionContext, Output
import requests
import pandas as pd
from io import StringIO

@asset(
    description="Return edited owners (score = 1).",
    required_resource_keys={"psycopg2_connection"}
)
def owners_with_edited_address(context: AssetExecutionContext):
    query = """
        SELECT
            o.id as owner_id,
            array_to_string(o.address_dgfip, ' ') as address_dgfip
        FROM owners o
        LEFT JOIN ban_addresses ba ON o.id = ba.ref_id
        WHERE ba.ref_id IS NOT NULL AND ba.score = 1; -- Owners whose address was edited by Stéphanie
    """

    try:
        with context.resources.psycopg2_connection as conn:
            df = pd.read_sql(query, conn)
    except Exception as e:
        context.log.error(f"Error executing query: {e}")
        raise

    return df

@asset(
    description="Split the owners DataFrame into multiple CSV files (chunks), store them to disk, and return the file paths as metadata.",
    required_resource_keys={"ban_config"}
)
def create_csv_chunks_from_owners(context: AssetExecutionContext, owners_with_edited_address):
    config = context.resources.ban_config

    chunk_size = config.chunk_size
    max_files = config.max_files
    disable_max_files = config.disable_max_files

    num_chunks = len(owners_with_edited_address) // chunk_size + 1

    if not disable_max_files:
        num_chunks = min(num_chunks, max_files)

    file_paths = []

    for i in range(num_chunks):
        start_idx = i * chunk_size
        end_idx = (i + 1) * chunk_size
        chunk = owners_with_edited_address.iloc[start_idx:end_idx]

        chunk_file_path = f"{config.csv_file_path}_part_{i+1}.csv"
        file_paths.append(chunk_file_path)

        chunk.to_csv(chunk_file_path, index=False, columns=["owner_id", "address_dgfip"])

        context.log.info(f"CSV file created: {chunk_file_path}")
        context.log.info(f"Preview of the CSV file:\n{chunk.head()}")
        context.log.info(f"Generated {i + 1} out of {num_chunks} files")

    return Output(value=file_paths, metadata={"file_paths": MetadataValue.text(", ".join(file_paths))})


@asset(
    description="Send each CSV chunk to the BAN address API, aggregate valid responses into a single CSV, and return the path to the aggregated CSV file.",
    required_resource_keys={"ban_config"}
)
def send_csv_chunks_to_api(context: AssetExecutionContext, create_csv_chunks_from_owners):
    config = context.resources.ban_config

    aggregated_file_path = f"{config.csv_file_path}_aggregated.csv"

    with open(aggregated_file_path, 'w') as aggregated_file:
        first_file = True

        for file_path in create_csv_chunks_from_owners:
            # Open each chunk in a context manager so the handle is released.
            with open(file_path, 'rb') as chunk_file:
                files = {'data': chunk_file}
                data = {'columns': 'address_dgfip', 'citycode': 'geo_code'}
                response = requests.post(config.api_url, files=files, data=data)

            if response.status_code == 200:
                api_data = pd.read_csv(StringIO(response.text))
                api_data.to_csv(aggregated_file, mode='a', index=False, header=first_file)
                first_file = False

                context.log.info(f"Processed file: {file_path}")
            else:
                raise Exception(f"API request failed with status code {response.status_code}")

    return aggregated_file_path

@asset(
    description="Parse the aggregated CSV from the BAN address API, update matching `ban_addresses` rows with the returned BAN id, and return the count of processed records.",
    required_resource_keys={"psycopg2_connection"}
)
def parse_api_response_and_insert_owners_addresses(context: AssetExecutionContext, send_csv_chunks_to_api):
    api_df = pd.read_csv(send_csv_chunks_to_api)

    filtered_df = api_df[api_df['result_status'] == 'ok']
    failed_rows = api_df[api_df['result_status'] != 'ok']
    context.log.warning(f"Number of owners with failed API results: {len(failed_rows)}")

    filtered_df = filtered_df.applymap(lambda x: None if pd.isna(x) else x)
    filtered_df['address_kind'] = "Owner"

    with context.resources.psycopg2_connection as conn:
        with conn.cursor() as cursor:
            cursor.execute("""
                CREATE TEMP TABLE temp_ban_addresses (
                    ref_id TEXT,
                    house_number TEXT,
                    address TEXT,
                    street TEXT,
                    postal_code TEXT,
                    city TEXT,
                    latitude FLOAT,
                    longitude FLOAT,
                    score FLOAT,
                    ban_id TEXT,
                    address_kind TEXT
                );
            """)

            buffer = StringIO()
            filtered_df.to_csv(buffer, sep='\t', header=False, index=False)
            buffer.seek(0)
            cursor.copy_from(buffer, 'temp_ban_addresses', sep='\t')

            cursor.execute("""
                UPDATE ban_addresses AS ba
                SET ban_id = tba.ban_id
                FROM temp_ban_addresses AS tba
                WHERE ba.ref_id = tba.ref_id
                  AND ba.address_kind = tba.address_kind;
            """)
            cursor.execute("DROP TABLE temp_ban_addresses;")

        conn.commit()

    context.log.info(f"{len(filtered_df)} valid records updated successfully.")

    return {
        "metadata": {"num_records": MetadataValue.text(f"{len(filtered_df)} records updated")}
    }
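For local verification, the four assets above can be materialized end-to-end with Dagster's Python API. A minimal sketch, mirroring the import style of the job modules in this commit (those paths are assumptions) and assuming the ban_config and psycopg2_connection resources pick up their settings (database credentials, BAN API URL, chunk sizes) from the local environment:

# Sketch (not part of the commit): materialize the edited-owners chain in
# one in-process run; import paths mirror the repo's job modules.
from dagster import materialize

from dagster.src.assets.populate_edited_owners_ban_addresses import (
    owners_with_edited_address,
    create_csv_chunks_from_owners,
    send_csv_chunks_to_api,
    parse_api_response_and_insert_owners_addresses,
)
from dagster.src.resources.ban_config import ban_config_resource
from dagster.src.resources import psycopg2_connection_resource

result = materialize(
    [
        owners_with_edited_address,
        create_csv_chunks_from_owners,
        send_csv_chunks_to_api,
        parse_api_response_and_insert_owners_addresses,
    ],
    resources={
        "ban_config": ban_config_resource,
        "psycopg2_connection": psycopg2_connection_resource,
    },
)
assert result.success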
14 changes: 13 additions & 1 deletion analytics/dagster/src/definitions.py
@@ -25,6 +25,7 @@
from .assets import production_dbt

from .assets.populate_owners_ban_addresses import owners_without_address, create_csv_chunks_from_owners, send_csv_chunks_to_api, parse_api_response_and_insert_owners_addresses
from .assets.populate_edited_owners_ban_addresses import owners_with_edited_address, create_csv_chunks_from_owners, send_csv_chunks_to_api, parse_api_response_and_insert_owners_addresses
from .assets.populate_housings_ban_addresses import housings_without_address, create_csv_from_housings, send_csv_to_api, parse_api_response_and_insert_housing_addresses
from .resources.ban_config import ban_config_resource
from .resources.database_resources import psycopg2_connection_resource
@@ -100,6 +101,16 @@
    ),
)

edited_owners_asset_job = define_asset_job(
    name="populate_edited_owners_addresses",
    selection=AssetSelection.assets(
        "owners_with_edited_address",
        "create_csv_chunks_from_owners",
        "send_csv_chunks_to_api",
        "parse_api_response_and_insert_owners_addresses",
    ),
)

housings_asset_job = define_asset_job(
    name="populate_housings_addresses",
    selection=AssetSelection.assets(
@@ -117,6 +128,7 @@
        # dagster_notion_assets,
        # dagster_notion_assets,
        owners_without_address, create_csv_chunks_from_owners, send_csv_chunks_to_api, parse_api_response_and_insert_owners_addresses,
        owners_with_edited_address, create_csv_chunks_from_owners, send_csv_chunks_to_api, parse_api_response_and_insert_owners_addresses,
        housings_without_address, create_csv_from_housings, send_csv_to_api, parse_api_response_and_insert_housing_addresses,
        *dwh_assets,
        *dbt_analytics_assets,
@@ -139,5 +151,5 @@
"sqlalchemy_engine": sqlalchemy_engine_resource,
},
schedules=[daily_refresh_schedule, yearly_ff_refresh_schedule],
jobs=[owners_asset_job, housings_asset_job],
jobs=[owners_asset_job, edited_owners_asset_job, housings_asset_job],
)
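With the job registered, an in-process execution makes a quick smoke test. A sketch, assuming the Definitions object built in this file is named `defs`, that the module is importable as `src.definitions` (both assumed names), and that the bound resources resolve against a local environment:

# Sketch (not part of the commit): run the new job in-process.
# `src.definitions` and `defs` are assumed names for this module and its
# Definitions object.
from src.definitions import defs

result = defs.get_job_def("populate_edited_owners_addresses").execute_in_process()
assert result.success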
14 changes: 14 additions & 0 deletions analytics/dagster/src/jobs/edited_owners_ban_addresses_job.py
@@ -0,0 +1,14 @@
from dagster import job
from dagster.src.assets.populate_edited_owners_ban_addresses import owners_with_edited_address
from dagster.src.resources.ban_config import ban_config_resource
from dagster.src.resources import sqlalchemy_engine_resource, psycopg2_connection_resource

@job(
    resource_defs={
        "ban_config": ban_config_resource,
        "sqlalchemy_engine": sqlalchemy_engine_resource,
        "psycopg2_connection": psycopg2_connection_resource,
    }
)
def edited_owners_ban_addresses_job():
    owners_with_edited_address()
4 changes: 2 additions & 2 deletions analytics/dagster/src/jobs/housings_ban_addresses_job.py
@@ -1,5 +1,5 @@
from dagster import job
from dagster.src.assets.populate_owners_ban_addresses import housings_without_address
from dagster.src.assets.populate_owners_ban_addresses import owners_without_address
from dagster.src.resources.ban_config import ban_config_resource
from dagster.src.resources import sqlalchemy_engine_resource, psycopg2_connection_resource

@@ -11,4 +11,4 @@
    }
)
def housings_ban_addresses_job():
    housings_without_address()
    owners_without_address()
6 changes: 3 additions & 3 deletions analytics/docker-compose.yml
@@ -8,14 +8,14 @@ services:
      - ./dbt:/opt/dagster/dbt
    env_file:
      - .env
    expose:
    expose:
      - 3000
    ports:
      - 3000:3000

  postgres:
    image: postgres:latest
    expose:
    expose:
      - 54322
    ports:
      - 54322:5432
