diff --git a/.env.sample b/.env.sample
index 5d76d57e..bee6a6ba 100644
--- a/.env.sample
+++ b/.env.sample
@@ -1,2 +1,3 @@
 ECOBALYSE_OUTPUT_DIR=/path/to/ecobalyse/public/data/
 ECOBALYSE_LOCAL_EXPORT=True
+PYTHONPATH=.
diff --git a/bin/export_icv.py b/bin/export_icv.py
new file mode 100755
index 00000000..0654cc59
--- /dev/null
+++ b/bin/export_icv.py
@@ -0,0 +1,174 @@
+#!/usr/bin/env python3
+
+import logging
+import multiprocessing
+from multiprocessing import Pool
+from typing import List, Optional
+
+import bw2calc
+import bw2data
+import orjson
+import typer
+from bw2data.project import projects
+from rich.logging import RichHandler
+from typing_extensions import Annotated
+
+from common import (
+    calculate_aggregate,
+    compute_normalization_factors,
+    correct_process_impacts,
+    fix_unit,
+    with_subimpacts,
+)
+from common.export import IMPACTS_JSON, compute_brightway_impacts
+from common.impacts import impacts as impacts_py
+from common.impacts import main_method
+from config import settings
+from models.process import BwProcess, UnitEnum
+
+normalization_factors = compute_normalization_factors(IMPACTS_JSON)
+
+# Use rich for logging
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+handler = RichHandler(markup=True)
+handler.setFormatter(logging.Formatter(fmt="%(message)s", datefmt="[%X]"))
+logger.addHandler(handler)
+
+# Init BW project
+projects.set_current(settings.bw.project)
+available_bw_databases = ", ".join(bw2data.databases)
+
+
+def get_process_with_impacts(
+    activity, main_method, impacts_py, impacts_json, database_name
+) -> dict:
+    impacts = None
+    try:
+        # Try to compute impacts using Brightway
+        impacts = compute_brightway_impacts(activity, main_method, impacts_py)
+        impacts = with_subimpacts(impacts)
+
+        corrections = {
+            k: v["correction"] for (k, v) in impacts_json.items() if "correction" in v
+        }
+        # This function directly mutates the impacts dict
+        correct_process_impacts(impacts, corrections)
+
+        impacts["pef"] = calculate_aggregate(impacts, normalization_factors["pef"])
+        impacts["ecs"] = calculate_aggregate(impacts, normalization_factors["ecs"])
+
+    except bw2calc.errors.BW2CalcError as e:
+        logger.error(f"-> Impossible to compute impacts for {activity}")
+        logger.exception(e)
+
+    unit = fix_unit(activity.get("unit"))
+
+    if unit not in UnitEnum.__members__.values():
+        unit = None
+
+    process = BwProcess(
+        categories=activity.get("categories", []),
+        comment=activity.get("comment", ""),
+        impacts=impacts,
+        name=activity.get("name"),
+        source=database_name,
+        sourceId=activity.get("Process identifier"),
+        unit=unit,
+    )
+
+    return process.model_dump()
+
+
+def bw_database_validation(values: Optional[List[str]]):
+    for value in values:
+        if value not in bw2data.databases:
+            raise typer.BadParameter(
+                f"Database not present in Brightway. Available databases are: {available_bw_databases}."
+            )
+
+    return values
+
+
+def main(
+    output_file: Annotated[
+        typer.FileBinaryWrite,
+        typer.Argument(help="The output json file."),
+    ],
+    # Take all the cores available minus one to avoid locking the system
+    # If only one core is available, use it (that’s what the `or 1` is for)
+    cpu_count: Annotated[
+        Optional[int],
+        typer.Option(
+            help="The number of CPUs/cores to use for computation. Default to MAX-1."
+        ),
+    ] = multiprocessing.cpu_count() - 1 or 1,
+    max: Annotated[
+        int,
+        typer.Option(
+            help="Number of max processes to compute per DB. Useful for testing purpose. Negative value means all processes."
+        ),
+    ] = -1,
+    db: Annotated[
+        Optional[List[str]],
+        typer.Option(
+            callback=bw_database_validation,
+            help=f"Brightway databases you want to compute impacts for. Default to all. You can specify multiple `--db`.\n\nAvailable databases are: {available_bw_databases}.",
+        ),
+    ] = [],
+):
+    """
+    Compute the detailed impacts for all the databases in the default Brightway project.
+
+    You can specify the number of CPUs to be used for computation by specifying CPU_COUNT argument.
+    """
+    all_impacts = {}
+
+    # Get specified dbs or default to all BW databases
+    databases = db if db else bw2data.databases
+
+    nb_processes = 0
+
+    for database_name in databases:
+        logger.info(f"-> Exploring DB '{database_name}'")
+
+        database = bw2data.Database(database_name)
+
+        logger.info(f"-> Total number of activities in db: {len(database)}")
+
+        with Pool(cpu_count) as pool:
+            activities_parameters = []
+            nb_activity = 0
+
+            for activity in database:
+                if "process" in activity.get("type", "") and (max < 0 or nb_activity < max):
+                    activities_parameters.append(
+                        # Parameters of the `get_process_with_impacts` function
+                        (activity, main_method, impacts_py, IMPACTS_JSON, database_name)
+                    )
+                    nb_activity += 1
+
+            processes_with_impacts = pool.starmap(
+                get_process_with_impacts, activities_parameters
+            )
+
+            logger.info(
+                f"-> Computed impacts for {len(processes_with_impacts)} processes in '{database_name}'"
+            )
+
+            all_impacts[database_name] = processes_with_impacts
+            nb_processes += len(processes_with_impacts)
+
+    db_names = ", ".join([f"'{name}'" for name in databases])
+
+    logger.info(
+        f"-> Finished computing impacts for {nb_processes} processes in {len(databases)} databases: {db_names}"
+    )
+
+    output_file.write(
+        orjson.dumps(all_impacts, option=orjson.OPT_INDENT_2 | orjson.OPT_SORT_KEYS)
+    )
+
+
+if __name__ == "__main__":
+    typer.run(main)
diff --git a/common/__init__.py b/common/__init__.py
index 0119882e..92557b24 100644
--- a/common/__init__.py
+++ b/common/__init__.py
@@ -133,6 +133,30 @@ def with_subimpacts(impacts):
     return impacts
 
 
+def correct_process_impacts(impacts, corrections):
+    """
+    Compute corrected impacts (`_c`) defined in the corrections map
+
+    Python objects are passed `by assignment` (it can be considered the same as `by reference`)
+    So this function directly mutates the impacts dict, don’t judge me for that, it is needed to
+    allow the use of frozendicts in the outer calls
+    """
+    # compute corrected impacts
+    for impact_to_correct, correction in corrections.items():
+        # only correct if the impact is not already computed
+        if impact_to_correct not in impacts:
+            corrected_impact = 0
+            for correction_item in correction:  # For each sub-impact and its weighting
+                sub_impact_name = correction_item["sub-impact"]
+                if sub_impact_name in impacts:
+                    sub_impact = impacts.get(sub_impact_name, 1)
+                    corrected_impact += sub_impact * correction_item["weighting"]
+                    del impacts[sub_impact_name]
+            impacts[impact_to_correct] = corrected_impact
+
+    return impacts
+
+
 def with_corrected_impacts(impact_defs, frozen_processes, impacts="impacts"):
     """Add corrected impacts to the processes"""
     corrections = {
@@ -141,22 +165,11 @@ def with_corrected_impacts(impact_defs, frozen_processes, impacts="impacts"):
     processes = dict(frozen_processes)
     processes_updated = {}
     for key, process in processes.items():
-        # compute corrected impacts
-        for impact_to_correct, correction in corrections.items():
-            # only correct if the impact is not already computed
-            dimpacts = process.get(impacts, {})
-            if impact_to_correct not in dimpacts:
-                corrected_impact = 0
-                for (
-                    correction_item
-                ) in correction:  # For each sub-impact and its weighting
-                    sub_impact_name = correction_item["sub-impact"]
-                    if sub_impact_name in dimpacts:
-                        sub_impact = dimpacts.get(sub_impact_name, 1)
-                        corrected_impact += sub_impact * correction_item["weighting"]
-                        del dimpacts[sub_impact_name]
-                dimpacts[impact_to_correct] = corrected_impact
+        # Python objects are passed `by assignment` (can be considered as `by reference`)
+        # So this function directly mutates the impacts dict
+        correct_process_impacts(process.get(impacts, {}), corrections)
         processes_updated[key] = process
+
     return frozendict(processes_updated)
 
 
diff --git a/models/process.py b/models/process.py
new file mode 100644
index 00000000..0c8b4cb0
--- /dev/null
+++ b/models/process.py
@@ -0,0 +1,54 @@
+from enum import Enum
+from typing import List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class UnitEnum(str, Enum):
+    KG = "kg"
+    TKM = "t⋅km"
+    KWH = "kWh"
+    MJ = "MJ"
+    L = "L"
+    ITEMS = "Item(s)"
+    M2 = "m2"
+    M3 = "m3"
+
+
+class Impacts(BaseModel):
+    acd: float = 0
+    cch: float = 0
+    etf: float = 0
+    etf_c: float = Field(default=0, alias="etf-c")
+    fru: float = 0
+    fwe: float = 0
+    htc: float = 0
+    htc_c: float = Field(default=0, alias="htc-c")
+    htn: float = 0
+    htn_c: float = Field(default=0, alias="htn-c")
+    ior: float = 0
+    ldu: float = 0
+    mru: float = 0
+    ozd: float = 0
+    pco: float = 0
+    pma: float = 0
+    swe: float = 0
+    tre: float = 0
+    wtu: float = 0
+    ecs: float = 0
+    pef: float = 0
+
+
+class BwProcess(BaseModel):
+    categories: List[str]
+    comment: str
+    impacts: Optional[Impacts] = None
+    name: str
+    source: str
+    # Process identifier in Simapro
+    sourceId: Optional[str] = None
+    unit: Optional[UnitEnum]
+
+    class Config:
+        populate_by_name = True
+        use_enum_values = True
diff --git a/pyproject.toml b/pyproject.toml
index b7b5460a..05ecb239 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -25,6 +25,8 @@ dependencies = [
     "uvicorn>=0.34.0",
     "fastapi[all]>=0.115.6",
     "dynaconf>=3.2.6",
+    "orjson>=3.10.15",
+    "typer>=0.15.1",
 ]
 
 [dependency-groups]
diff --git a/uv.lock b/uv.lock
index 99dd8a45..362ad1dd 100644
--- a/uv.lock
+++ b/uv.lock
@@ -664,8 +664,10 @@ dependencies = [
     { name = "jupyterlab" },
     { name = "loguru" },
     { name = "notebook" },
+    { name = "orjson" },
     { name = "pydantic-settings" },
     { name = "pypardiso", marker = "platform_machine != 'aarch64' and platform_machine != 'arm64'" },
+    { name = "typer" },
     { name = "typing-extensions" },
     { name = "uvicorn" },
     { name = "wrapt" },
@@ -694,8 +696,10 @@ requires-dist = [
     { name = "jupyterlab", specifier = ">=4.3.2" },
     { name = "loguru", specifier = ">=0.7.2" },
     { name = "notebook", specifier = ">=7.3.0" },
+    { name = "orjson", specifier = ">=3.10.15" },
     { name = "pydantic-settings", specifier = ">=2.6.1" },
     { name = "pypardiso", marker = "platform_machine != 'aarch64' and platform_machine != 'arm64'", specifier = ">=0.4.6" },
+    { name = "typer", specifier = ">=0.15.1" },
     { name = "typing-extensions", specifier = ">=4.12.2" },
     { name = "uvicorn", specifier = ">=0.34.0" },
     { name = "wrapt", specifier = ">=1.17.0" },