diff --git a/gch4i/emis_processing/task_industrial_landfills_emis.py b/gch4i/emis_processing/task_industrial_landfills_emis.py
index df0bd3f..c6649bc 100644
--- a/gch4i/emis_processing/task_industrial_landfills_emis.py
+++ b/gch4i/emis_processing/task_industrial_landfills_emis.py
@@ -404,6 +404,7 @@ def task_get_industrial_landfills_food_beverage_inv_data(
         .drop(columns=["tot_ch4_kt", "nr_ch4_kt"])
         .rename(columns={"r_ch4_kt": "ghgi_ch4_kt"})
         .dropna()
+        .query("ghgi_ch4_kt > 0")
         .reset_index(drop=True)
     )
 
@@ -411,6 +412,7 @@ def task_get_industrial_landfills_food_beverage_inv_data(
         .drop(columns=["tot_ch4_kt", "r_ch4_kt"])
         .rename(columns={"nr_ch4_kt": "ghgi_ch4_kt"})
         .dropna()
+        .query("ghgi_ch4_kt > 0")
         .reset_index(drop=True)
     )
 
diff --git a/gch4i/emis_processing/task_msw_landfills_emis.py b/gch4i/emis_processing/task_msw_landfills_emis.py
index c827021..dac710f 100644
--- a/gch4i/emis_processing/task_msw_landfills_emis.py
+++ b/gch4i/emis_processing/task_msw_landfills_emis.py
@@ -216,6 +216,7 @@ def task_msw_landfills_emi(
         pd.concat(emi_df_list)
         .groupby(["state_code", "year"])["ghgi_ch4_kt"]
         .sum()
         .reset_index()
+        .query("ghgi_ch4_kt > 0")
     )
     # Save the emissions data to the output path
diff --git a/gch4i/proxy_processing/task_industrial_landfills_proxy.py b/gch4i/proxy_processing/task_industrial_landfills_proxy.py
index 0eed7e7..62a43f7 100644
--- a/gch4i/proxy_processing/task_industrial_landfills_proxy.py
+++ b/gch4i/proxy_processing/task_industrial_landfills_proxy.py
@@ -38,8 +38,7 @@ def task_get_reporting_industrial_landfills_pulp_paper_proxy_data(
     subpart_tt_path = "https://data.epa.gov/efservice/tt_subpart_ghg_info/pub_dim_facility/ghg_name/=/Methane/CSV",
     state_path: Path = global_data_dir_path / "tl_2020_us_state.zip",
-    reporting_pulp_paper_proxy_output_path: Annotated[Path, Product] = proxy_data_dir_path
-    / "ind_landfills_pp_r_proxy.parquet",
+    reporting_pulp_paper_proxy_output_path: Annotated[Path, Product] = proxy_data_dir_path / "ind_landfills_pp_r_proxy.parquet",
 ):
     """
     Relative emissions and location information for reporting facilities are taken from
@@ -99,7 +98,7 @@ def task_get_reporting_industrial_landfills_pulp_paper_proxy_data(
             ),
         )
         .drop(columns=["facility_id", "latitude", "longitude", "city", "zip"])
-        .loc[:, ["facility_name", "state_code", "geometry", "year", "rel_emi"]]
+        .loc[:, ["year", "state_code", "geometry", "rel_emi"]]
     )
 
     reporting_pulp_paper_gdf.to_parquet(reporting_pulp_paper_proxy_output_path)
 
@@ -112,8 +111,7 @@ def task_get_nonreporting_industrial_landfills_pulp_paper_proxy_data(
     frs_naics_path = global_data_dir_path / "NATIONAL_NAICS_FILE.CSV",
     frs_facility_path = global_data_dir_path / "NATIONAL_FACILITY_FILE.CSV",
     mills_online_path: Path = V3_DATA_PATH / "sector/landfills/Mills_OnLine.xlsx",
-    nonreporting_pulp_paper_proxy_output_path: Annotated[Path, Product] = proxy_data_dir_path
-    / "ind_landfills_pp_nr_proxy.parquet",
+    nonreporting_pulp_paper_proxy_output_path: Annotated[Path, Product] = proxy_data_dir_path / "ind_landfills_pp_nr_proxy.parquet",
 ):
     """
     The Mills OnLine database of facilities is compared against the Subpart HH
@@ -243,7 +241,7 @@ def task_get_nonreporting_industrial_landfills_pulp_paper_proxy_data(
     FRS_facility_locs = frs_naics.merge(frs_main, how='inner', on='registry_id')
 
-    #try to match mills database with FRS database, based on state and city -
+    # try to match mills database with FRS database, based on state and city -
     # only need to find locations for where 'ghgrp_match' = 0
     mills_locs['FRS_match'] = 0
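+    # A vectorized alternative to the row-by-row match below (a sketch only;
+    # 'state_name' and 'city' stand in for whatever key columns the two tables
+    # actually share): keep each key's highest-accuracy FRS row, then merge once.
+    #   best_frs = (FRS_facility_locs
+    #               .sort_values('accuracy_value', ascending=False)
+    #               .drop_duplicates(['state_name', 'city'])
+    #               .loc[:, ['state_name', 'city', 'latitude', 'longitude']])
+    #   mills_locs = mills_locs.merge(best_frs, on=['state_name', 'city'], how='left')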
     for ifacility in np.arange(0, len(mills_locs)):
@@ -256,11 +254,11 @@ def task_get_nonreporting_industrial_landfills_pulp_paper_proxy_data(
             mills_locs.loc[ifacility,'FRS_match'] = 1
         elif len(imatch)>1:
             FRS_temp = FRS_facility_locs.loc[imatch,:]
-            new_match = np.where(np.max(FRS_temp['accuracy_value']))[0]
-            if len(new_match) >0:
-                mills_locs.loc[ifacility,'lat'] = FRS_facility_locs.loc[imatch[new_match[0]],'latitude']
-                mills_locs.loc[ifacility,'lon'] = FRS_facility_locs.loc[imatch[new_match[0]],'longitude']
-                mills_locs.loc[ifacility,'FRS_match'] = 1
+            # use the location of the first occurrence of the facility with the highest accuracy value
+            new_match = np.argmax(FRS_temp['accuracy_value'])
+            mills_locs.loc[ifacility,'lat'] = FRS_facility_locs.loc[imatch[new_match],'latitude']
+            mills_locs.loc[ifacility,'lon'] = FRS_facility_locs.loc[imatch[new_match],'longitude']
+            mills_locs.loc[ifacility,'FRS_match'] = 1
         else:
             continue
 
@@ -268,17 +266,12 @@ def task_get_nonreporting_industrial_landfills_pulp_paper_proxy_data(
 
     # keep mills that only have FRS locations (mills that are not already covered by
     # subpart tt and mills that are not missing locations)
-    mills_locs = mills_locs.query(
-        "ghgrp_match == 0 & FRS_match == 1").drop(
-        columns=["state_name", "county", "city", "grades", "ghgrp_match", "FRS_match"]).rename(
-        columns={"lat": "latitude", "lon": "longitude"}).dropna()
-
-    # add a column to equally allocate unaccounted for GHGI emissions to all non-reporting mills
-    mills_locs["ch4_kt"] = 1.0
-    mills_locs = mills_locs.reset_index(drop=True)
-
-    nonreporting_pulp_paper_df['rel_emi'] = nonreporting_pulp_paper_df.groupby(["state_code"])['ch4_kt'].transform(lambda x: x / x.sum() if x.sum() > 0 else 0)
-    nonreporting_pulp_paper_df = nonreporting_pulp_paper_df.drop(columns='ch4_kt')
+    mills_locs = (mills_locs
+                  .query("ghgrp_match == 0 & FRS_match == 1")
+                  .drop(columns=["state_name", "county", "city", "grades", "ghgrp_match", "FRS_match"])
+                  .rename(columns={"lat": "latitude", "lon": "longitude"})
+                  .dropna()
+                  )
 
     nonreporting_pulp_paper_gdf = (
         gpd.GeoDataFrame(
@@ -290,7 +283,7 @@ def task_get_nonreporting_industrial_landfills_pulp_paper_proxy_data(
             ),
         )
         .drop(columns=["facility_id", "latitude", "longitude"])
-        .loc[:, ["state_code", "geometry", "rel_emi"]]
+        .loc[:, ["state_code", "geometry"]]
     )
 
     nonreporting_pulp_paper_gdf.to_parquet(nonreporting_pulp_paper_proxy_output_path)
 
@@ -300,8 +293,7 @@ def task_get_nonreporting_industrial_landfills_pulp_paper_proxy_data(
 def task_get_reporting_industrial_landfills_food_beverage_proxy_data(
     subpart_tt_path = "https://data.epa.gov/efservice/tt_subpart_ghg_info/pub_dim_facility/ghg_name/=/Methane/CSV",
     state_path: Path = global_data_dir_path / "tl_2020_us_state.zip",
-    reporting_food_beverage_proxy_output_path: Annotated[Path, Product] = proxy_data_dir_path
-    / "ind_landfills_fb_r_proxy.parquet",
+    reporting_food_beverage_proxy_output_path: Annotated[Path, Product] = proxy_data_dir_path / "ind_landfills_fb_r_proxy.parquet",
 ):
     """
     Relative emissions and location information for reporting facilities are taken from
@@ -362,7 +354,7 @@ def task_get_reporting_industrial_landfills_food_beverage_proxy_data(
             ),
         )
         .drop(columns=["latitude", "longitude", "city", "zip"])
-        .loc[:, ["facility_id", "facility_name", "state_code", "geometry", "year", "rel_emi"]]
+        .loc[:, ["year", "state_code", "geometry", "rel_emi"]]
     )
 
     reporting_food_beverage_gdf.to_parquet(reporting_food_beverage_proxy_output_path)
 
@@ -370,13 +362,12 @@ def task_get_reporting_industrial_landfills_food_beverage_proxy_data(
 
 def task_get_nonreporting_industrial_landfills_food_beverage_proxy_data(
-        state_path: Path = global_data_dir_path / "tl_2020_us_state.zip",
-        subpart_tt_path = "https://data.epa.gov/efservice/tt_subpart_ghg_info/pub_dim_facility/ghg_name/=/Methane/CSV",
-        frs_naics_path = global_data_dir_path / "NATIONAL_NAICS_FILE.CSV",
-        frs_facility_path = global_data_dir_path / "NATIONAL_FACILITY_FILE.CSV",
-        food_manufacturers_processors_path = V3_DATA_PATH / "sector/landfills/Food Manufacturers and Processors.xlsx",
-        nonreporting_food_beverage_proxy_output_path: Annotated[Path, Product] = proxy_data_dir_path
-        / "ind_landfills_fb_nr_proxy.parquet",
+    state_path: Path = global_data_dir_path / "tl_2020_us_state.zip",
+    subpart_tt_path = "https://data.epa.gov/efservice/tt_subpart_ghg_info/pub_dim_facility/ghg_name/=/Methane/CSV",
+    frs_naics_path = global_data_dir_path / "NATIONAL_NAICS_FILE.CSV",
+    frs_facility_path = global_data_dir_path / "NATIONAL_FACILITY_FILE.CSV",
+    food_manufacturers_processors_path = V3_DATA_PATH / "sector/landfills/Food Manufacturers and Processors.xlsx",
+    nonreporting_food_beverage_proxy_output_path: Annotated[Path, Product] = proxy_data_dir_path / "ind_landfills_fb_nr_proxy.parquet",
 ):
     """
@@ -547,24 +538,26 @@ def task_get_nonreporting_industrial_landfills_food_beverage_proxy_data(
             food_beverage_facilities_locs.loc[ifacility,'FRS_match'] = 1
         elif len(imatch)>1:
             FRS_temp = FRS_facility_locs.loc[imatch,:]
-            new_match = np.where(np.max(FRS_temp['accuracy_value']))[0]
-            if len(new_match) >0:
-                food_beverage_facilities_locs.loc[ifacility,'lat'] = FRS_facility_locs.loc[imatch[new_match[0]],'latitude']
-                food_beverage_facilities_locs.loc[ifacility,'lon'] = FRS_facility_locs.loc[imatch[new_match[0]],'longitude']
-                food_beverage_facilities_locs.loc[ifacility,'FRS_match'] = 1
+            # use the location of the first occurrence of the facility with the highest accuracy value
+            new_match = np.argmax(FRS_temp['accuracy_value'])
+            food_beverage_facilities_locs.loc[ifacility,'lat'] = FRS_facility_locs.loc[imatch[new_match],'latitude']
+            food_beverage_facilities_locs.loc[ifacility,'lon'] = FRS_facility_locs.loc[imatch[new_match],'longitude']
+            food_beverage_facilities_locs.loc[ifacility,'FRS_match'] = 1
         else:
             continue
 
     print('Not Found:',len(food_beverage_facilities_locs)-(np.sum(food_beverage_facilities_locs.loc[:,'FRS_match'])+np.sum(food_beverage_facilities_locs.loc[:,'ghgrp_match'])), 'of',len(food_beverage_facilities_locs))
 
     # Find missing locations by Geocoding addresses
+    # If this section fails with a timeout error, re-run it; the geocoder
+    # usually succeeds on a retry. Expect it to take roughly 152 minutes.
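+    # A more defensive pattern (a sketch only, not wired in here) would wrap the
+    # geocoder in geopy's RateLimiter so transient timeouts are retried
+    # automatically instead of requiring a manual re-run:
+    #   from geopy.extra.rate_limiter import RateLimiter
+    #   geocode = RateLimiter(geolocator.geocode, min_delay_seconds=1,
+    #                         max_retries=3, swallow_exceptions=False)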
     geolocator = Nominatim(user_agent="myGeocode")
     geopy.geocoders.options.default_timeout = None
     print(geolocator.timeout)
     food_beverage_facilities_locs['geo_match'] = 0
     for ifacility in np.arange(0,len(food_beverage_facilities_locs)):
-        if food_beverage_facilities_locs.loc[ifacility,'FRS_match'] ==0 and food_beverage_facilities_locs.loc[ifacility,'ghgrp_match'] == 0:
+        if food_beverage_facilities_locs.loc[ifacility,'FRS_match'] == 0 and food_beverage_facilities_locs.loc[ifacility,'ghgrp_match'] == 0:
             location = geolocator.geocode(food_beverage_facilities_locs['full_address'][ifacility])
             if location is None:
                 continue
@@ -602,7 +595,7 @@ def task_get_nonreporting_industrial_landfills_food_beverage_proxy_data(
                 #count -= 1
                 food_beverage_facilities_locs.loc[ifacility,'lat'] = location2.latitude
                 food_beverage_facilities_locs.loc[ifacility,'lon'] = location2.longitude
-                food_beverage_facilities_locs.loc[ifacility,'geo_match']=1
+                food_beverage_facilities_locs.loc[ifacility,'geo_match'] = 1
             else:
                 #count -= 1
                 food_beverage_facilities_locs.loc[ifacility,'lat'] = location.latitude
@@ -631,7 +624,7 @@ def task_get_nonreporting_industrial_landfills_food_beverage_proxy_data(
                        "excessfood_tonyear_lowest", "excessfood_tonyear_highest",
                        "full_address", "partial_address", "lat", "lon",
                        "ghgrp_match", "FRS_match", "geo_match"])
-        .loc[:, ["facility_id", "state_code", "geometry", "rel_emi"]]
+        .loc[:, ["state_code", "geometry", "rel_emi"]]
     )
 
     nonreporting_food_beverage_gdf.to_parquet(nonreporting_food_beverage_proxy_output_path)
diff --git a/gch4i/proxy_processing/task_msw_landfills_proxy.py b/gch4i/proxy_processing/task_msw_landfills_proxy.py
index 08b40f5..ddcd918 100644
--- a/gch4i/proxy_processing/task_msw_landfills_proxy.py
+++ b/gch4i/proxy_processing/task_msw_landfills_proxy.py
@@ -1,15 +1,24 @@
+"""
+Name: task_msw_landfills_proxy.py
+Date Last Modified: 2025-02-04
+Authors Name: H. Lohman (RTI International)
+Purpose: Map MSW landfill emissions to facility-level proxy locations by state and year.
+gch4i_name: 5A_msw_landfills
+Input Files: - Subpart HH: https://data.epa.gov/efservice/hh_subpart_level_information/pub_dim_facility/ghg_name/=/Methane/CSV
+             - {global_data_dir_path}/tl_2020_us_state.zip
+             - FRS and LMOP/WBJ non-reporting facility files
+Output Files: - {proxy_data_dir_path}/
+                msw_landfills_r_proxy.parquet
+                msw_landfills_nr_proxy.parquet
+Notes: -
+"""
+# %% STEP 0. Load packages, configuration files, and local parameters ------------------
 from pathlib import Path
 from typing import Annotated
-from zipfile import ZipFile
-import calendar
-import datetime
 
-from pyarrow import parquet
 import pandas as pd
-import osgeo
 import geopandas as gpd
 import numpy as np
-import seaborn as sns
 from pytask import Product, task, mark
 import geopy
 from geopy.geocoders import Nominatim
@@ -17,9 +26,9 @@
 
 from gch4i.config import (
     V3_DATA_PATH,
+    emi_data_dir_path,
     proxy_data_dir_path,
     global_data_dir_path,
-    ghgi_data_dir_path,
     max_year,
     min_year,
 )
@@ -27,18 +36,17 @@
 from gch4i.utils import name_formatter
 
 tg_to_kt = 0.001
 
-year_range = [*range(min_year, max_year+1,1)] #List of emission years
-year_range_str=[str(i) for i in year_range]
+year_range = [*range(min_year, max_year + 1, 1)]  # List of emission years
+year_range_str = [str(i) for i in year_range]
 num_years = len(year_range)
 
-
+# %%
 @mark.persist
 @task(id="msw_landfills_proxy")
 def task_get_reporting_msw_landfills_proxy_data(
     subpart_hh_path = "https://data.epa.gov/efservice/hh_subpart_level_information/pub_dim_facility/ghg_name/=/Methane/CSV",
     state_path: Path = global_data_dir_path / "tl_2020_us_state.zip",
-    reporting_proxy_output_path: Annotated[Path, Product] = proxy_data_dir_path
-    / "msw_landfills_r_proxy.parquet",
+    reporting_proxy_output_path: Annotated[Path, Product] = proxy_data_dir_path / "msw_landfills_r_proxy.parquet",
 ):
     """
     Relative emissions and location information for reporting facilities are taken from
@@ -58,26 +66,27 @@ def task_get_reporting_msw_landfills_proxy_data(
 
     # Reporting facilities from Subpart HH
     reporting_facility_df = (
-        pd.read_csv(
-            subpart_hh_path,
-            usecols=("facility_name",
-                     "facility_id",
-                     "reporting_year",
-                     "ghg_quantity",
-                     "latitude",
-                     "longitude",
-                     "state",
-                     "zip"))
-        .rename(columns=lambda x: str(x).lower())
-        .rename(columns={"reporting_year": "year", "ghg_quantity": "ch4_t", "state": "state_code"})
-        .assign(ch4_kt=lambda df: df["ch4_t"] * tg_to_kt)
-        .drop(columns=["ch4_t"])
-        .drop_duplicates(subset=['facility_id', 'year'], keep='last')
-        .astype({"year": int})
-        .query("year.between(@min_year, @max_year)")
-        .query("state_code.isin(@state_gdf['state_code'])")
-        .reset_index(drop=True)
-    )
+        pd.read_csv(
+            subpart_hh_path,
+            usecols=("facility_name",
+                     "facility_id",
+                     "reporting_year",
+                     "ghg_quantity",
+                     "latitude",
+                     "longitude",
+                     "state",
+                     "zip")
+        )
+        .rename(columns=lambda x: str(x).lower())
+        .rename(columns={"reporting_year": "year", "ghg_quantity": "ch4_t", "state": "state_code"})
+        .assign(ch4_kt=lambda df: df["ch4_t"] * tg_to_kt)
+        .drop(columns=["ch4_t"])
+        .drop_duplicates(subset=['facility_id', 'year'], keep='last')
+        .astype({"year": int})
+        .query("year.between(@min_year, @max_year)")
+        .query("state_code.isin(@state_gdf['state_code'])")
+        .reset_index(drop=True)
+    )
 
     reporting_facility_df['rel_emi'] = reporting_facility_df.groupby(["state_code", "year"])['ch4_kt'].transform(lambda x: x / x.sum() if x.sum() > 0 else 0)
     reporting_facility_df = reporting_facility_df.drop(columns='ch4_kt')
@@ -92,7 +101,8 @@ def task_get_reporting_msw_landfills_proxy_data(
             ),
         )
         .drop(columns=["latitude", "longitude", "zip"])
-        .loc[:, ["facility_id", "facility_name", "state_code", "geometry", "year", "rel_emi"]]
+        .loc[:, ["year", "state_code", "geometry", "rel_emi"]]
+        .dropna(subset=['geometry'])
     )
 
     reporting_facility_gdf.to_parquet(reporting_proxy_output_path)
@@ -106,26 +116,28 @@ def get_nonreporting_msw_landfills_proxy_data(
     # the NAICS code pulled from the v2 notebook for facilities in the FRS data
     MSW_FRS_NAICS_CODE = 562219,
     state_path: Path = global_data_dir_path / "tl_2020_us_state.zip",
-    nonreporting_proxy_output_path: Annotated[Path, Product] = proxy_data_dir_path
-    / "msw_landfills_nr_proxy.parquet",
+    nonreporting_proxy_output_path: Annotated[Path, Product] = proxy_data_dir_path / "msw_landfills_nr_proxy.parquet",
 ):
 
     # Non-reporting facilities from LMOP and WBJ
     nonreporting_facility_df = (
-        pd.read_excel(
+        pd.read_excel(
             nonreporting_facility_path,
             sheet_name="LandfillComp",
             usecols="A:B,D,F,AP,BL:BM,BR:BS",
             skiprows=5,
-            nrows=1672)
-        .rename(columns={
-            'Landfill Name (as listed in Original Instance source)': 'Name',
-            'HH Off-Ramped, Last Year Reported': 'Last GHGRP Year',
-            'Avg. Est. Total WIP (MT)': 'WIP_MT',
-            'LMOP Lat': 'LAT',
-            'LMOP Long': 'LON',
-            }).astype({"WBJ City": str, "WBJ Location": str})
-    )
+            nrows=1672
+        )
+        .rename(columns={
+            'Landfill Name (as listed in Original Instance source)': 'Name',
+            'HH Off-Ramped, Last Year Reported': 'Last GHGRP Year',
+            'Avg. Est. Total WIP (MT)': 'WIP_MT',
+            'LMOP Lat': 'LAT',
+            'LMOP Long': 'LON'
+            }
+        )
+        .astype({"WBJ City": str, "WBJ Location": str})
+    )
 
     nonreporting_facility_df['Full_Address'] = nonreporting_facility_df["WBJ Location"]+' '+nonreporting_facility_df["WBJ City"]+' '+nonreporting_facility_df["State"]
     nonreporting_facility_df['Partial_Address'] = nonreporting_facility_df["WBJ City"]+' '+nonreporting_facility_df["State"]
     nonreporting_facility_df.fillna('NaN', inplace=True)
 
@@ -178,15 +190,11 @@ def get_nonreporting_msw_landfills_proxy_data(
             # if name and state match more than one entry, use the one with the higher accuracy
             # or the first entry if the accuracy values are the same
             FRS_temp = FRS_facility_locs.loc[imatch, :].copy()
-            new_match = np.where(np.max(FRS_temp['accuracy_value']))[0]
-            if len(new_match) ==1:
-                nonreporting_facility_noloc.loc[ifacility, 'LAT'] = FRS_facility_locs.loc[imatch[new_match[0]], 'latitude']
-                nonreporting_facility_noloc.loc[ifacility, 'LON'] = FRS_facility_locs.loc[imatch[new_match[0]], 'longitude']
-                nonreporting_facility_noloc.loc[ifacility, 'found'] = 1
-            elif len(new_match)<1:
-                nonreporting_facility_noloc.loc[ifacility, 'LAT'] = FRS_facility_locs.loc[imatch[0], 'latitude']
-                nonreporting_facility_noloc.loc[ifacility, 'LON'] = FRS_facility_locs.loc[imatch[0], 'longitude']
-                nonreporting_facility_noloc.loc[ifacility, 'found'] = 1
+            # use the location of the first occurrence of the facility with the highest accuracy value
+            new_match = np.argmax(FRS_temp['accuracy_value'])
+            nonreporting_facility_noloc.loc[ifacility, 'LAT'] = FRS_facility_locs.loc[imatch[new_match], 'latitude']
+            nonreporting_facility_noloc.loc[ifacility, 'LON'] = FRS_facility_locs.loc[imatch[new_match], 'longitude']
+            nonreporting_facility_noloc.loc[ifacility, 'found'] = 1
 
         # next try matching based on any of the words in name and state and city
         if nonreporting_facility_noloc.loc[ifacility, 'found'] == 0:
@@ -210,15 +218,11 @@ def get_nonreporting_msw_landfills_proxy_data(
             # if name and state match more than one entry, use the one with the higher accuracy
             # or the first entry if the accuracy values are the same
             FRS_temp = FRS_facility_locs.loc[imatch,:].copy()
-            new_match = np.where(np.max(FRS_temp['accuracy_value']))[0]
-            if len(new_match) ==1:
-                nonreporting_facility_noloc.loc[ifacility,'LAT'] = FRS_facility_locs.loc[imatch[new_match[0]],'latitude']
-                nonreporting_facility_noloc.loc[ifacility,'LON'] = FRS_facility_locs.loc[imatch[new_match[0]],'longitude']
-                nonreporting_facility_noloc.loc[ifacility,'found'] = 1
-            elif len(new_match)<1:
-                nonreporting_facility_noloc.loc[ifacility,'LAT'] = FRS_facility_locs.loc[imatch[0],'latitude']
-                nonreporting_facility_noloc.loc[ifacility,'LON'] = FRS_facility_locs.loc[imatch[0],'longitude']
-                nonreporting_facility_noloc.loc[ifacility,'found'] = 1
+            # use the location of the first occurrence of the facility with the highest accuracy value
+            new_match = np.argmax(FRS_temp['accuracy_value'])
+            nonreporting_facility_noloc.loc[ifacility,'LAT'] = FRS_facility_locs.loc[imatch[new_match],'latitude']
+            nonreporting_facility_noloc.loc[ifacility,'LON'] = FRS_facility_locs.loc[imatch[new_match],'longitude']
+            nonreporting_facility_noloc.loc[ifacility,'found'] = 1
 
         # next try matching based on state and city
         if nonreporting_facility_noloc.loc[ifacility,'found'] == 0:
@@ -286,6 +290,8 @@ def get_nonreporting_msw_landfills_proxy_data(
     # This uses the free openstreetmaps api (not as good as google maps, but free)
     # only need to get locations for facilities where found = 0
     # if this doesn't work with run using 'run all', try running individually
+    # If this section fails with a timeout error, re-run it; the geocoder
+    # usually succeeds on a retry. Expect it to take roughly 15 minutes.
     print("Geolocating non-reporting facilities")
     geolocator = Nominatim(user_agent="myGeocode")
     geopy.geocoders.options.default_timeout = None
@@ -313,7 +319,7 @@ def get_nonreporting_msw_landfills_proxy_data(
             else:
                 nonreporting_facility_noloc.loc[ifacility,'LAT'] = location.latitude
                 nonreporting_facility_noloc.loc[ifacility,'LON'] = location.longitude
-                nonreporting_facility_noloc.loc[ifacility,'found']=1
+                nonreporting_facility_noloc.loc[ifacility,'found'] = 1
 
     print('Second Try - Percentage found:',(np.sum(nonreporting_facility_noloc['found']))/len(nonreporting_facility_noloc))
     print('Missing Count',len(nonreporting_facility_noloc[nonreporting_facility_noloc['found']==0]))
@@ -360,8 +366,90 @@ def get_nonreporting_msw_landfills_proxy_data(
             ),
         )
         .drop(columns=["facility_id", "LAT", "LON", "WBJ Location", "WBJ City", "Full_Address", "Partial_Address", "found"])
-        .loc[:, ["facility_name", "state_code", "year", "geometry", "rel_emi"]]
+        .loc[:, ["year", "state_code", "geometry", "rel_emi"]]
     )
 
     nonreporting_facility_gdf.to_parquet(nonreporting_proxy_output_path)
 
     return None
+
+
+# Correct proxy data if states are missing relative to the emi files:
+# If proxy data exists for a state in the other landfill proxy, assign emissions to
+# those locations; otherwise assign the emissions uniformly across the entire state.
+# For example, if proxy data is missing for reporting landfills in RI, check whether
+# data exists for nonreporting landfills in RI. If it does, assign the reporting
+# emissions to those locations; otherwise assign the emissions uniformly
+# across the state of RI.
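+# A minimal illustration (hypothetical values): if msw_landfills_r_emi.csv has a
+# row for ("RI", 2020) but msw_landfills_r_proxy.parquet has no RI/2020 rows, the
+# fixup below first looks for RI/2020 rows in msw_landfills_nr_proxy.parquet;
+# failing that, it emits a single row carrying the RI polygon from
+# tl_2020_us_state.zip with rel_emi = 1.0.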
+def add_missing_landfills_proxy_data(
+    state_path: Path = global_data_dir_path / "tl_2020_us_state.zip",
+    r_emi_path: Path = emi_data_dir_path / "msw_landfills_r_emi.csv",
+    r_proxy_path: Annotated[Path, Product] = proxy_data_dir_path / "msw_landfills_r_proxy.parquet",
+    nr_emi_path: Path = emi_data_dir_path / "msw_landfills_nr_emi.csv",
+    nr_proxy_path: Annotated[Path, Product] = proxy_data_dir_path / "msw_landfills_nr_proxy.parquet",
+):
+
+    # Read in State Geo Data
+    state_gdf = (
+        gpd.read_file(state_path)
+        .loc[:, ["NAME", "STATEFP", "STUSPS", "geometry"]]
+        .rename(columns=str.lower)
+        .rename(columns={"stusps": "state_code", "name": "state_name"})
+        .astype({"statefp": int})
+        # get only lower 48 + DC
+        .query("(statefp < 60) & (statefp != 2) & (statefp != 15)")
+        .to_crs(4326)
+    )
+
+    # Read in reporting and non-reporting emission and proxy files
+    r_emi_df = pd.read_csv(r_emi_path).query("ghgi_ch4_kt > 0")
+    r_proxy_gdf = gpd.read_parquet(r_proxy_path)
+    nr_emi_df = pd.read_csv(nr_emi_path).query("ghgi_ch4_kt > 0")
+    nr_proxy_gdf = gpd.read_parquet(nr_proxy_path)
+
+    def get_alt_proxy_data(emi_to_update, proxy_to_update, proxy_to_compare):
+        # Get sets of state code-year pairs for the emi and proxy files
+        emi_to_update_states = set(emi_to_update[['state_code', 'year']].itertuples(index=False, name=None))
+        proxy_to_update_states = set(proxy_to_update[['state_code', 'year']].itertuples(index=False, name=None))
+        proxy_to_compare_states = set(proxy_to_compare[['state_code', 'year']].itertuples(index=False, name=None))
+        # Get unique state code-year pairs without proxy data
+        missing_states = emi_to_update_states.difference(proxy_to_update_states)
+        if missing_states:
+            alt_proxy = gpd.GeoDataFrame()
+            for istate, iyear in missing_states:
+                # If the missing state code-year pair is in the comparison proxy,
+                # assign emissions to those facility locations.
+                if (istate, iyear) in proxy_to_compare_states:
+                    iproxy_data = (proxy_to_compare
+                                   .query("state_code == @istate")
+                                   .query("year == @iyear")
+                                   )
+                # Otherwise, assign emissions evenly across the state polygon.
+                else:
+                    iproxy_data = gpd.GeoDataFrame(
+                        # build a single row explicitly; .assign() on an empty
+                        # frame would yield zero rows and silently drop the state
+                        pd.DataFrame({'year': [iyear],
+                                      'state_code': [istate],
+                                      'rel_emi': [1.0]})
+                        .merge(
+                            state_gdf[['state_code', 'geometry']],
+                            on='state_code',
+                            how='left')
+                    )
+                alt_proxy = gpd.GeoDataFrame(pd.concat([alt_proxy, iproxy_data], ignore_index=True))
+            # Add missing proxy to original proxy
+            proxy_gdf_updated = gpd.GeoDataFrame(pd.concat([proxy_to_update, alt_proxy], ignore_index=True).reset_index(drop=True))
+        else:
+            proxy_gdf_updated = proxy_to_update
+
+        return proxy_gdf_updated
+
+    r_proxy_gdf_updated = get_alt_proxy_data(r_emi_df, r_proxy_gdf, nr_proxy_gdf)
+    nr_proxy_gdf_updated = get_alt_proxy_data(nr_emi_df, nr_proxy_gdf, r_proxy_gdf)
+
+    # Save new proxy data
+    r_proxy_gdf_updated.to_parquet(r_proxy_path)
+    nr_proxy_gdf_updated.to_parquet(nr_proxy_path)
+
+    return None
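+
+
+# NOTE: add_missing_landfills_proxy_data() is not yet registered with pytask.
+# A hypothetical wiring (a sketch only; the task id and decorator pattern mirror
+# the other tasks in this module and are assumptions, not part of this change):
+#
+# @mark.persist
+# @task(id="msw_landfills_missing_proxy")
+# def task_add_missing_landfills_proxy_data():
+#     add_missing_landfills_proxy_data()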