diff --git a/data_processing/irve/DAG.py b/data_processing/irve/DAG.py index d75a4140..d8de2abd 100644 --- a/data_processing/irve/DAG.py +++ b/data_processing/irve/DAG.py @@ -34,6 +34,8 @@ TMP_CONFIG_FILE = TMP_FOLDER / "schema.data.gouv.fr/config_consolidation.yml" SCHEMA_CATALOG = "https://schema.data.gouv.fr/schemas/schemas.json" GIT_REPO = "git@github.com:etalab/schema.data.gouv.fr.git" +# DEV : for local dev without SSH enabled +# GIT_REPO = "https://github.com/etalab/schema.data.gouv.fr.git" output_data_folder = f"{TMP_FOLDER}/output/" default_args = { @@ -59,8 +61,6 @@ clone_dag_schema_repo = BashOperator( task_id="clone_dag_schema_repo", bash_command=f"cd {TMP_FOLDER} && git clone {GIT_REPO} --depth 1 ", - # DEV : for local dev without SSH enabled - # bash_command=f"cd {TMP_FOLDER} && git clone https://github.com/etalab/schema.data.gouv.fr.git --depth 1 ", ) get_all_irve_resources = PythonOperator( diff --git a/data_processing/irve/geo_utils/france_bbox.geojson b/data_processing/irve/geo_utils/france_bbox.geojson deleted file mode 100644 index 46223cdc..00000000 --- a/data_processing/irve/geo_utils/france_bbox.geojson +++ /dev/null @@ -1,12 +0,0 @@ -{ -"type": "FeatureCollection", -"name": "out-bbox-modified", -"features": [ -{ "type": "Feature", "properties": { "code": "01", "nom": "Guadeloupe", "depts": "971", "x_min": -61.809839, "y_min": 15.832041, " x_max": -61.001959, "y_max": 16.514488 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -61.809839, 15.832041 ], [ -61.001959, 15.832041 ], [ -61.001959, 16.514488 ], [ -61.809839, 16.514488 ], [ -61.809839, 15.832041 ] ] ] } }, -{ "type": "Feature", "properties": { "code": "02", "nom": "Martinique", "depts": "972", "x_min": -61.229033, "y_min": 14.388646, " x_max": -60.809655, "y_max": 14.878723 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -61.229033, 14.388646 ], [ -60.809655, 14.388646 ], [ -60.809655, 14.878723 ], [ -61.229033, 14.878723 ], [ -61.229033, 14.388646 ] ] ] } }, -{ "type": "Feature", "properties": { "code": "03", "nom": "Guyane", "depts": "973", "x_min": -54.60239, "y_min": 2.111055, " x_max": -51.619041, "y_max": 5.748138 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -54.60239, 2.111055 ], [ -51.619041, 2.111055 ], [ -51.619041, 5.748138 ], [ -54.60239, 5.748138 ], [ -54.60239, 2.111055 ] ] ] } }, -{ "type": "Feature", "properties": { "code": "04", "nom": "La Réunion", "depts": "974", "x_min": 55.216526, "y_min": -21.389631, " x_max": 55.836654, "y_max": -20.8718 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ 55.216526, -21.389631 ], [ 55.836654, -21.389631 ], [ 55.836654, -20.8718 ], [ 55.216526, -20.8718 ], [ 55.216526, -21.389631 ] ] ] } }, -{ "type": "Feature", "properties": { "code": "06", "nom": "Mayotte", "depts": "976", "x_min": 45.01833, "y_min": -13.005254, " x_max": 45.299985, "y_max": -12.63659 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ 45.01833, -13.005254 ], [ 45.299985, -13.005254 ], [ 45.299985, -12.63659 ], [ 45.01833, -12.63659 ], [ 45.01833, -13.005254 ] ] ] } }, -{ "type": "Feature", "properties": { "code": null, "nom": "France", "depts": "01,02,03,04,05,06,07,08,09,10,11,12,13,14,15,16,17,18,19,2A,2B,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95", "x_min": -5.141277, "y_min": 41.333571, " x_max": 9.560091, "y_max": 51.088989 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -5.141277, 41.333571 ], [ 9.560091, 41.333571 ], [ 9.560091, 51.088989 ], [ -5.141277, 51.088989 ], [ -5.141277, 41.333571 ] ] ] } } -] -} diff --git a/data_processing/irve/geo_utils/geo.py b/data_processing/irve/geo_utils/geo.py index 661dd70f..4fc808c5 100644 --- a/data_processing/irve/geo_utils/geo.py +++ b/data_processing/irve/geo_utils/geo.py @@ -1,47 +1,24 @@ -from typing import Dict, List -import geojson +from typing import Dict import json import os import pandas as pd import requests -from shapely.geometry import Point, shape -from shapely.geometry.polygon import Polygon -from datagouvfr_data_pipelines.config import AIRFLOW_DAG_HOME - -with open( - f"{AIRFLOW_DAG_HOME}/datagouvfr_data_pipelines/schema/utils/france_bbox.geojson" -) as f: - FRANCE_BBOXES = geojson.load(f) - -# Create a Polygon -geoms = [region["geometry"] for region in FRANCE_BBOXES.get("features")] -polys = [shape(geom) for geom in geoms] - - -def is_point_in_polygon(x: float, y: float, polygon: List[List[float]]) -> bool: - point = Point(x, y) - polygon_shape = Polygon(polygon) - return polygon_shape.contains(point) - - -def is_point_in_france(coordonnees_xy: List[float]) -> bool: - p = Point(*coordonnees_xy) - return any(p.within(poly) for poly in polys) +from frformat.geo.coordonnees_gps_francaises import CoordonneesGPSFrancaises def fix_coordinates_order( df: pd.DataFrame, coordinates_column: str = "coordonneesXY" ) -> pd.DataFrame: """ - Cette fonction modifie une dataframe pour placer la longitude avant la latitude - dans la colonne qui contient les deux au format "[lon, lat]". + Cette fonction modifie un dataframe pour placer la longitude avant la latitude + dans la colonne qui contient les deux au format "[lat, lon]". """ def fix_coordinates(row: pd.Series) -> pd.Series: coordonnees_xy = json.loads(row[coordinates_column]) reversed_coordonnees = list(reversed(coordonnees_xy)) row["consolidated_coordinates_reordered"] = False - if is_point_in_france(reversed_coordonnees): + if CoordonneesGPSFrancaises.is_valid(*reversed_coordonnees): # Coordinates are inverted with lat before lon row[coordinates_column] = json.dumps(reversed_coordonnees) row["consolidated_coordinates_reordered"] = True diff --git a/schema/utils/france_bbox.geojson b/schema/utils/france_bbox.geojson deleted file mode 100644 index 46223cdc..00000000 --- a/schema/utils/france_bbox.geojson +++ /dev/null @@ -1,12 +0,0 @@ -{ -"type": "FeatureCollection", -"name": "out-bbox-modified", -"features": [ -{ "type": "Feature", "properties": { "code": "01", "nom": "Guadeloupe", "depts": "971", "x_min": -61.809839, "y_min": 15.832041, " x_max": -61.001959, "y_max": 16.514488 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -61.809839, 15.832041 ], [ -61.001959, 15.832041 ], [ -61.001959, 16.514488 ], [ -61.809839, 16.514488 ], [ -61.809839, 15.832041 ] ] ] } }, -{ "type": "Feature", "properties": { "code": "02", "nom": "Martinique", "depts": "972", "x_min": -61.229033, "y_min": 14.388646, " x_max": -60.809655, "y_max": 14.878723 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -61.229033, 14.388646 ], [ -60.809655, 14.388646 ], [ -60.809655, 14.878723 ], [ -61.229033, 14.878723 ], [ -61.229033, 14.388646 ] ] ] } }, -{ "type": "Feature", "properties": { "code": "03", "nom": "Guyane", "depts": "973", "x_min": -54.60239, "y_min": 2.111055, " x_max": -51.619041, "y_max": 5.748138 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -54.60239, 2.111055 ], [ -51.619041, 2.111055 ], [ -51.619041, 5.748138 ], [ -54.60239, 5.748138 ], [ -54.60239, 2.111055 ] ] ] } }, -{ "type": "Feature", "properties": { "code": "04", "nom": "La Réunion", "depts": "974", "x_min": 55.216526, "y_min": -21.389631, " x_max": 55.836654, "y_max": -20.8718 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ 55.216526, -21.389631 ], [ 55.836654, -21.389631 ], [ 55.836654, -20.8718 ], [ 55.216526, -20.8718 ], [ 55.216526, -21.389631 ] ] ] } }, -{ "type": "Feature", "properties": { "code": "06", "nom": "Mayotte", "depts": "976", "x_min": 45.01833, "y_min": -13.005254, " x_max": 45.299985, "y_max": -12.63659 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ 45.01833, -13.005254 ], [ 45.299985, -13.005254 ], [ 45.299985, -12.63659 ], [ 45.01833, -12.63659 ], [ 45.01833, -13.005254 ] ] ] } }, -{ "type": "Feature", "properties": { "code": null, "nom": "France", "depts": "01,02,03,04,05,06,07,08,09,10,11,12,13,14,15,16,17,18,19,2A,2B,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95", "x_min": -5.141277, "y_min": 41.333571, " x_max": 9.560091, "y_max": 51.088989 }, "geometry": { "type": "Polygon", "coordinates": [ [ [ -5.141277, 41.333571 ], [ 9.560091, 41.333571 ], [ 9.560091, 51.088989 ], [ -5.141277, 51.088989 ], [ -5.141277, 41.333571 ] ] ] } } -] -} diff --git a/schema/utils/geo.py b/schema/utils/geo.py deleted file mode 100644 index 12a06824..00000000 --- a/schema/utils/geo.py +++ /dev/null @@ -1,268 +0,0 @@ -from typing import Dict, List -import geojson -import json -import os -import pandas as pd -import requests -from unidecode import unidecode -from shapely.geometry import Point, shape -from shapely.geometry.polygon import Polygon -from datagouvfr_data_pipelines.config import AIRFLOW_DAG_HOME - -with open( - f"{AIRFLOW_DAG_HOME}/datagouvfr_data_pipelines/schema/utils/france_bbox.geojson" -) as f: - FRANCE_BBOXES = geojson.load(f) - - -def is_point_in_polygon(x: float, y: float, polygon: List[List[float]]) -> bool: - point = Point(x, y) - polygon_shape = Polygon(polygon) - return polygon_shape.contains(point) - - -def is_point_in_france(coordonnees_xy: List[float]) -> bool: - p = Point(*coordonnees_xy) - - # Create a Polygon - geoms = [region["geometry"] for region in FRANCE_BBOXES.get("features")] - polys = [shape(geom) for geom in geoms] - return any([p.within(poly) for poly in polys]) - - -def fix_coordinates_order( - df: pd.DataFrame, coordinates_column: str = "coordonneesXY" -) -> pd.DataFrame: - """ - Cette fonction modifie une dataframe pour placer la longitude avant la latitude - dans la colonne qui contient les deux au format "[lon, lat]". - """ - - def fix_coordinates(row: pd.Series) -> pd.Series: - coordonnees_xy = json.loads(row[coordinates_column]) - reversed_coordonnees = list(reversed(coordonnees_xy)) - row["consolidated_coordinates_reordered"] = False - if is_point_in_france(reversed_coordonnees): - # Coordinates are inverted with lat before lon - row[coordinates_column] = json.dumps(reversed_coordonnees) - row["consolidated_coordinates_reordered"] = True - fix_coordinates.rows_modified = fix_coordinates.rows_modified + 1 - return row - - fix_coordinates.rows_modified = 0 - df = df.apply(fix_coordinates, axis=1) - print(f"Coordinates reordered: {fix_coordinates.rows_modified}/{len(df)}") - return df - - -def create_lon_lat_cols( - df: pd.DataFrame, coordinates_column: str = "coordonneesXY" -) -> pd.DataFrame: - """Add longitude and latitude columns to dataframe using coordinates_column""" - coordinates = df[coordinates_column].apply(json.loads) - df["consolidated_longitude"] = coordinates.str[0] - df["consolidated_latitude"] = coordinates.str[1] - return df - - -def export_to_geojson( - df: pd.DataFrame, target_filepath: str, coordinates_column: str = "coordonneesXY" -) -> None: - """Export dataframe into Geojson format""" - json_result_string = df.to_json( - orient="records", double_precision=12, date_format="iso" - ) - json_result = json.loads(json_result_string) - - geojson = {"type": "FeatureCollection", "features": []} - for record in json_result: - coordinates = json.loads(record[coordinates_column]) - longitude, latitude = coordinates - geojson["features"].append( - { - "type": "Feature", - "geometry": { - "type": "Point", - "coordinates": [longitude, latitude], - }, - "properties": record, - } - ) - with open(target_filepath, "w") as f: - f.write(json.dumps(geojson, indent=2)) - - -def fix_code_insee( # noqa - df: pd.DataFrame, - code_insee_col: str = "code_insee_commune", - address_col: str = "adresse_station", - lon_col: str = "consolidated_longitude", - lat_col: str = "consolidated_latitude", -) -> pd.DataFrame: - """Check code INSEE in CSV file and enrich with postcode and city - Requires address and coordinates columns - """ - - def enrich_row_address(row: pd.Series) -> pd.Series: - row["consolidated_is_lon_lat_correct"] = False - row["consolidated_is_code_insee_verified"] = False - row["consolidated_code_insee_modified"] = False - # Try getting commune with code INSEE from latitude and longitude alone - url = ( - f"https://geo.api.gouv.fr/communes?lat={row[lat_col]}" - f"&lon={row[lon_col]}&fields=code,nom,codesPostaux" - ) - response = requests.get(url) - try: - commune_results = json.loads(response.content) - except json.decoder.JSONDecodeError: - commune_results = [] - if (response.status_code == requests.codes.ok) and (len(commune_results) > 0): - commune = commune_results[0] - if row[code_insee_col] == commune["code"]: - if len(commune["codesPostaux"]) == 1: - row["consolidated_code_postal"] = commune["codesPostaux"][0] - row["consolidated_commune"] = commune["nom"] - row["consolidated_is_lon_lat_correct"] = True - row["consolidated_is_code_insee_verified"] = True - enrich_row_address.already_good += 1 - return row - elif row[code_insee_col] in commune["codesPostaux"]: - row["consolidated_code_postal"] = row[code_insee_col] - row["consolidated_code_insee_modified"] = True - row[code_insee_col] = commune["code"] - row["consolidated_commune"] = commune["nom"] - row["consolidated_is_lon_lat_correct"] = True - row["consolidated_is_code_insee_verified"] = True - enrich_row_address.code_fixed += 1 - return row - else: - # Lat lon match a commune which does not match code INSEE - enrich_row_address.code_coords_mismatch += 1 - else: - # Lat lon do not match any commune - enrich_row_address.no_match_coords += 1 - print("⚠️ Issue when using coordinates for this row: ", row) - print('requested URL was: ', url) - - if str(row[code_insee_col]) in row[address_col]: - # Code INSEE field actually contains a postcode - url = f"https://geo.api.gouv.fr/communes?codePostal={row[code_insee_col]}&fields=code,nom" - response = requests.get(url) - try: - commune_results = json.loads(response.content) - except json.decoder.JSONDecodeError: - commune_results = [] - if (response.status_code == requests.codes.ok) and ( - len(commune_results) > 0 - ): - commune = commune_results[0] - row["consolidated_code_postal"] = row[code_insee_col] - row["consolidated_commune"] = commune["nom"] - row[code_insee_col] = commune["code"] - row["consolidated_code_insee_modified"] = True - row["consolidated_is_code_insee_verified"] = True - enrich_row_address.code_insee_is_postcode_in_address += 1 - return row - else: - print("⚠️ Issue when using postcode for this row: ", row) - print('requested URL was: ', url) - - # Check if postcode is in address - url = f"https://geo.api.gouv.fr/communes?code={row[code_insee_col]}&fields=codesPostaux,nom" - response = requests.get(url) - try: - commune_results = json.loads(response.content) - except json.decoder.JSONDecodeError: - commune_results = [] - if (response.status_code == requests.codes.ok) and (len(commune_results) > 0): - commune = commune_results[0] - for postcode in commune["codesPostaux"]: - if postcode in row[address_col]: - row["consolidated_code_postal"] = postcode - row["consolidated_commune"] = commune["nom"] - row["consolidated_is_code_insee_verified"] = True - enrich_row_address.code_insee_has_postcode_in_address += 1 - return row - - # None of the above checks succeeded. Code INSEE validity cannot be checked. - # Geo data is not enriched using code INSEE due to risk of introducing fake data - row["consolidated_code_postal"] = "" - row["consolidated_commune"] = "" - enrich_row_address.nothing_matches += 1 - print("⚠️ Issue when using INSEE code for this row: ", row) - print('requested URL was: ', url) - return row - - enrich_row_address.already_good = 0 - enrich_row_address.code_fixed = 0 - enrich_row_address.code_coords_mismatch = 0 - enrich_row_address.no_match_coords = 0 - enrich_row_address.code_insee_is_postcode_in_address = 0 - enrich_row_address.code_insee_has_postcode_in_address = 0 - enrich_row_address.nothing_matches = 0 - - df = df.apply(enrich_row_address, axis=1) - - total_rows = len(df) - print( - "Coords OK. INSEE codes already correct, simply enriched: " - f"{enrich_row_address.already_good}/{total_rows}" - ) - print( - "Coords OK. INSEE code field contained postcode. Fixed and enriched: " - f"{enrich_row_address.code_fixed}/{total_rows}" - ) - print( - "Coords not matching code INSEE field as code INSEE or postcode: " - f"{enrich_row_address.code_coords_mismatch}/{total_rows}" - ) - print( - f"Coords not matching any commune: {enrich_row_address.no_match_coords}/{total_rows}" - ) - print( - "Code INSEE is postcode in address. Fixed and enriched: " - f"{enrich_row_address.code_insee_is_postcode_in_address}/{total_rows}" - ) - print( - "Code INSEE has postcode in address. " - f"Enriched: {enrich_row_address.code_insee_has_postcode_in_address}/{total_rows}" - ) - print( - "No indication of postcode/code INSEE in address or coordinates matching code INSEE field. " - f"No enriching performed: {enrich_row_address.nothing_matches}/{total_rows}" - ) - return df - - -def improve_geo_data_quality(file_cols_mapping: Dict[str, Dict[str, str]]) -> None: - for filepath, cols_dict in file_cols_mapping.items(): - df = pd.read_csv(filepath, dtype="str", na_filter=False, keep_default_na=False) - schema_cols = list(df.columns) - df = fix_coordinates_order(df, coordinates_column=cols_dict["xy_coords"]) - print("Done fixing coordinates") - df = create_lon_lat_cols(df, coordinates_column=cols_dict["xy_coords"]) - print("Done creating long lat") - df = fix_code_insee( - df, - code_insee_col=cols_dict["code_insee"], - address_col=cols_dict["adress"], - lon_col=cols_dict["longitude"], - lat_col=cols_dict["latitude"], - ) - print("Done fixing code INSEE") - new_cols = [ - "consolidated_longitude", - "consolidated_latitude", - "consolidated_code_postal", - "consolidated_commune", - "consolidated_is_lon_lat_correct", - "consolidated_is_code_insee_verified", - ] - df = df[schema_cols + new_cols] - df.to_csv(filepath, index=False) - export_to_geojson( - df, - os.path.splitext(filepath)[0] + ".json", - coordinates_column=cols_dict["xy_coords"], - ) diff --git a/schema/utils/geo_data_quality.md b/schema/utils/geo_data_quality.md deleted file mode 100644 index 03dd3a70..00000000 --- a/schema/utils/geo_data_quality.md +++ /dev/null @@ -1,14 +0,0 @@ -# Geo data quality checks in consolidation - -A number of data quality improvement steps are now carried out on geographical data while consolidating datasets. - -So far, these checks are only applied to consolidated files that comply with an IRVE schema. - -Here are the steps we run: - -- Fix [x,y] coordinates order to ensure longitude comes before latitude. This is done by checking if [y,x] is located in France. If so, the coordinates column is modified accordingly and the `consolidated_coordinates_reordered` column entry is `True`. -- `consolidated_longitude` and `consolidated_latitude` columns are created by parsing the coordinates column. -- The function `fix_code_insee` implements a number of steps to check if the code INSEE is correct or can be fixed and enrich the geographical data where possible with `consolidated_code_postal` and `consolidated_commune`. This function also creates other fields, namely: - - `consolidated_is_code_insee_verified` which is `True` if the final code INSEE after fix matches the coordinates or has a postcode which is present in the address field. - - `consolidated_is_lon_lat_correct` which is `True` if the code INSEE field matches either the code INSEE of the commune where the coordinates point or one of the postcodes of that commune. -- Export consolidated CSV file to GeoJSON format. \ No newline at end of file