feat: add script to compute all impacts of LCA databases (#41)
* feat: compute all impacts

* feat: compute corrected impacts

* feat: use typer for better CLI management

* chore: remove unused code

* chore: add . to default PYTHONPATH
vjousse authored Feb 13, 2025
1 parent adea5ad commit d6a1f5c
Showing 6 changed files with 263 additions and 15 deletions.
1 change: 1 addition & 0 deletions .env.sample
@@ -1,2 +1,3 @@
ECOBALYSE_OUTPUT_DIR=/path/to/ecobalyse/public/data/
ECOBALYSE_LOCAL_EXPORT=True
PYTHONPATH=.
174 changes: 174 additions & 0 deletions bin/export_icv.py
@@ -0,0 +1,174 @@
#!/usr/bin/env python3

import logging
import multiprocessing
from multiprocessing import Pool
from typing import List, Optional

import bw2calc
import bw2data
import orjson
import typer
from bw2data.project import projects
from rich.logging import RichHandler
from typing_extensions import Annotated

from common import (
    calculate_aggregate,
    compute_normalization_factors,
    correct_process_impacts,
    fix_unit,
    with_subimpacts,
)
from common.export import IMPACTS_JSON, compute_brightway_impacts
from common.impacts import impacts as impacts_py
from common.impacts import main_method
from config import settings
from models.process import BwProcess, UnitEnum

normalization_factors = compute_normalization_factors(IMPACTS_JSON)

# Use rich for logging
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = RichHandler(markup=True)
handler.setFormatter(logging.Formatter(fmt="%(message)s", datefmt="[%X]"))
logger.addHandler(handler)

# Init BW project
projects.set_current(settings.bw.project)
available_bw_databases = ", ".join(bw2data.databases)


def get_process_with_impacts(
    activity, main_method, impacts_py, impacts_json, database_name
) -> dict:
    impacts = None
    try:
        # Try to compute impacts using Brightway
        impacts = compute_brightway_impacts(activity, main_method, impacts_py)
        impacts = with_subimpacts(impacts)

        corrections = {
            k: v["correction"] for (k, v) in impacts_json.items() if "correction" in v
        }
        # This function directly mutates the impacts dict
        correct_process_impacts(impacts, corrections)

        impacts["pef"] = calculate_aggregate(impacts, normalization_factors["pef"])
        impacts["ecs"] = calculate_aggregate(impacts, normalization_factors["ecs"])

    except bw2calc.errors.BW2CalcError as e:
        logger.error(f"-> Unable to compute impacts for {activity}")
        logger.exception(e)

    unit = fix_unit(activity.get("unit"))

    if unit not in UnitEnum.__members__.values():
        unit = None

    process = BwProcess(
        categories=activity.get("categories", []),
        comment=activity.get("comment", ""),
        impacts=impacts,
        name=activity.get("name"),
        source=database_name,
        sourceId=activity.get("Process identifier"),
        unit=unit,
    )

    return process.model_dump()


def bw_database_validation(values: Optional[List[str]]):
    for value in values:
        if value not in bw2data.databases:
            raise typer.BadParameter(
                f"Database not present in Brightway. Available databases are: {available_bw_databases}."
            )

    return values


def main(
    output_file: Annotated[
        typer.FileBinaryWrite,
        typer.Argument(help="The output json file."),
    ],
    # Take all the available cores minus one to avoid locking up the system
    # If only one core is available, use it (that’s what the `or 1` is for)
    cpu_count: Annotated[
        Optional[int],
        typer.Option(
            help="The number of CPUs/cores to use for computation. Defaults to MAX-1."
        ),
    ] = multiprocessing.cpu_count() - 1 or 1,
    max: Annotated[
        int,
        typer.Option(
            help="Maximum number of processes to compute per DB. Useful for testing purposes. A negative value means all processes."
        ),
    ] = -1,
    db: Annotated[
        Optional[List[str]],
        typer.Option(
            callback=bw_database_validation,
            help=f"Brightway databases you want to compute impacts for. Defaults to all. You can specify `--db` multiple times.\n\nAvailable databases are: {available_bw_databases}.",
        ),
    ] = [],
):
    """
    Compute the detailed impacts for all the databases in the default Brightway project.
    You can specify the number of CPUs to use for computation with the `--cpu-count` option.
    """
    all_impacts = {}

    # Get specified dbs or default to all BW databases
    databases = db if db else bw2data.databases

    nb_processes = 0

    for database_name in databases:
        logger.info(f"-> Exploring DB '{database_name}'")

        db = bw2data.Database(database_name)

        logger.info(f"-> Total number of activities in db: {len(db)}")

        with Pool(cpu_count) as pool:
            activities_parameters = []
            nb_activity = 0

            for activity in db:
                if "process" in activity.get("type") and (max < 0 or nb_activity < max):
                    activities_parameters.append(
                        # Parameters of the `get_process_with_impacts` function
                        (activity, main_method, impacts_py, IMPACTS_JSON, database_name)
                    )
                    nb_activity += 1

            processes_with_impacts = pool.starmap(
                get_process_with_impacts, activities_parameters
            )

        logger.info(
            f"-> Computed impacts for {len(processes_with_impacts)} processes in '{database_name}'"
        )

        all_impacts[database_name] = processes_with_impacts
        nb_processes += len(processes_with_impacts)

    db_names = ", ".join([f"'{db}'" for db in databases])

    logger.info(
        f"-> Finished computing impacts for {nb_processes} processes in {len(databases)} databases: {db_names}"
    )

    output_file.write(
        orjson.dumps(all_impacts, option=orjson.OPT_INDENT_2 | orjson.OPT_SORT_KEYS)
    )


if __name__ == "__main__":
    typer.run(main)
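
For reference, here is a minimal sketch of the JSON this script writes, using a hypothetical database name, process, values and output filename (not taken from a real run). Each database maps to a list of BwProcess.model_dump() dicts as defined in models/process.py below; only a few impact trigrams are shown.

import orjson

all_impacts = {
    "Hypothetical DB": [
        {
            "categories": ["transport"],
            "comment": "Hypothetical activity, for illustration only",
            "impacts": {"cch": 0.12, "pef": 28.1, "ecs": 34.5},
            "name": "hypothetical lorry transport",
            "source": "Hypothetical DB",
            "sourceId": "hypothetical-process-id",
            "unit": "t⋅km",
        }
    ]
}

with open("processes_impacts.json", "wb") as output_file:
    output_file.write(
        orjson.dumps(all_impacts, option=orjson.OPT_INDENT_2 | orjson.OPT_SORT_KEYS)
    )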
43 changes: 28 additions & 15 deletions common/__init__.py
@@ -133,6 +133,30 @@ def with_subimpacts(impacts):
    return impacts


def correct_process_impacts(impacts, corrections):
    """
    Compute corrected impacts (`_c`) defined in the corrections map.
    Python objects are passed `by assignment` (which can be considered the same as `by reference`),
    so this function directly mutates the impacts dict (don't judge me for that): it is needed
    to allow the use of frozendicts in the outer calls.
    """
    # compute corrected impacts
    for impact_to_correct, correction in corrections.items():
        # only correct if the impact is not already computed
        if impact_to_correct not in impacts:
            corrected_impact = 0
            for correction_item in correction:  # For each sub-impact and its weighting
                sub_impact_name = correction_item["sub-impact"]
                if sub_impact_name in impacts:
                    sub_impact = impacts.get(sub_impact_name, 1)
                    corrected_impact += sub_impact * correction_item["weighting"]
                    del impacts[sub_impact_name]
            impacts[impact_to_correct] = corrected_impact

    return impacts


def with_corrected_impacts(impact_defs, frozen_processes, impacts="impacts"):
    """Add corrected impacts to the processes"""
    corrections = {
@@ -141,22 +165,11 @@ def with_corrected_impacts(impact_defs, frozen_processes, impacts="impacts"):
    processes = dict(frozen_processes)
    processes_updated = {}
    for key, process in processes.items():
        # compute corrected impacts
        for impact_to_correct, correction in corrections.items():
            # only correct if the impact is not already computed
            dimpacts = process.get(impacts, {})
            if impact_to_correct not in dimpacts:
                corrected_impact = 0
                for (
                    correction_item
                ) in correction:  # For each sub-impact and its weighting
                    sub_impact_name = correction_item["sub-impact"]
                    if sub_impact_name in dimpacts:
                        sub_impact = dimpacts.get(sub_impact_name, 1)
                        corrected_impact += sub_impact * correction_item["weighting"]
                        del dimpacts[sub_impact_name]
                dimpacts[impact_to_correct] = corrected_impact
        # Python objects are passed `by assignment` (can be considered as `by reference`)
        # So this function directly mutates the impacts dict
        correct_process_impacts(process.get(impacts, {}), corrections)
        processes_updated[key] = process

    return frozendict(processes_updated)


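To make the extracted helper concrete, here is a small worked example of correct_process_impacts; the corrections map below uses hypothetical sub-impact names and weightings (the real ones come from IMPACTS_JSON), but its shape matches what the code above expects.

from common import correct_process_impacts

# Hypothetical corrections map: build "etf-c" from two sub-impacts and their weightings
corrections = {
    "etf-c": [
        {"sub-impact": "etf-o", "weighting": 0.5},
        {"sub-impact": "etf-i", "weighting": 2.0},
    ]
}

impacts = {"cch": 1.0, "etf-o": 10.0, "etf-i": 3.0}

# Mutates `impacts` in place: the sub-impacts are consumed and replaced by the corrected impact
correct_process_impacts(impacts, corrections)

assert impacts == {"cch": 1.0, "etf-c": 10.0 * 0.5 + 3.0 * 2.0}  # i.e. 11.0
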
54 changes: 54 additions & 0 deletions models/process.py
@@ -0,0 +1,54 @@
from enum import Enum
from typing import List, Optional

from pydantic import BaseModel, Field


class UnitEnum(str, Enum):
    KG = "kg"
    TKM = "t⋅km"
    KWH = "kWh"
    MJ = "MJ"
    L = "L"
    ITEMS = "Item(s)"
    M2 = "m2"
    M3 = "m3"


class Impacts(BaseModel):
    acd: float = 0
    cch: float = 0
    etf: float = 0
    etf_c: float = Field(default=0, alias="etf-c")
    fru: float = 0
    fwe: float = 0
    htc: float = 0
    htc_c: float = Field(default=0, alias="htc-c")
    htn: float = 0
    htn_c: float = Field(default=0, alias="htn-c")
    ior: float = 0
    ldu: float = 0
    mru: float = 0
    ozd: float = 0
    pco: float = 0
    pma: float = 0
    swe: float = 0
    tre: float = 0
    wtu: float = 0
    ecs: float = 0
    pef: float = 0


class BwProcess(BaseModel):
    categories: List[str]
    comment: str
    impacts: Optional[Impacts] = None
    name: str
    source: str
    # Process identifier in Simapro
    sourceId: Optional[str] = None
    unit: Optional[UnitEnum]

    class Config:
        populate_by_name = True
        use_enum_values = True
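
As a quick illustration of the model configuration (a sketch assuming pydantic v2, which fastapi[all] pulls in, and hypothetical values): the Field aliases let impacts be validated from the dashed keys used in the corrections, use_enum_values stores plain strings for units, and model_dump(by_alias=True) restores the dashed keys on output.

from models.process import BwProcess, Impacts, UnitEnum

process = BwProcess(
    categories=["example"],
    comment="Hypothetical activity, for illustration only",
    # Dashed keys are accepted through the Field aliases (e.g. "etf-c" -> etf_c)
    impacts=Impacts(**{"cch": 1.2, "etf-c": 0.4}),
    name="hypothetical process",
    source="Hypothetical DB",
    unit=UnitEnum.KG,
)

dumped = process.model_dump(by_alias=True)
assert dumped["unit"] == "kg"  # use_enum_values=True keeps the plain string
assert dumped["impacts"]["etf-c"] == 0.4  # by_alias=True restores the dashed key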
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -25,6 +25,8 @@ dependencies = [
"uvicorn>=0.34.0",
"fastapi[all]>=0.115.6",
"dynaconf>=3.2.6",
"orjson>=3.10.15",
"typer>=0.15.1",
]

[dependency-groups]
4 changes: 4 additions & 0 deletions uv.lock
