From 35849c0c1de53949c060d69e916fc6fa43acdc51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Mon, 7 Nov 2022 13:53:39 +0100 Subject: [PATCH 01/18] Be more detailed in `assign_gas_bus_id`'s docstring The first line of the docstring was too long, because it was longer than the absolute limit of 79 characters and because it was longer than the limit of 72 characters for free flowing text. So I shortened it. And because shortening was done easiest by moving the remark in parenthesis to the long description, that's what I did. --- src/egon/data/db.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/egon/data/db.py b/src/egon/data/db.py index 962e4ff61..5f4fc8c4a 100755 --- a/src/egon/data/db.py +++ b/src/egon/data/db.py @@ -354,7 +354,11 @@ def commit(*args, **kwargs): def assign_gas_bus_id(dataframe, scn_name, carrier): - """Assigns bus_ids to points (contained in a dataframe) according to location + """Assigns bus_ids to points according to location. + + The points are taken from the given `dataframe` and the geometries by + which the `bus_id`s are assigned to them are taken from the + `grid.egon_gas_voronoi` table. Parameters ---------- From 9c0abd087082b16a08ef8594244a9b86385441ef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Mon, 7 Nov 2022 13:59:15 +0100 Subject: [PATCH 02/18] Tell linters that this bare except is OK The main complainer was `flake8` when run via pre-commit hooks, but all linters respecting `# noqa` comments should be quieted by this. --- src/egon/data/db.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/egon/data/db.py b/src/egon/data/db.py index 5f4fc8c4a..ee0b2a7b6 100755 --- a/src/egon/data/db.py +++ b/src/egon/data/db.py @@ -128,7 +128,7 @@ def session_scope(): try: yield session session.commit() - except: + except: # noqa: E722 (This is ok because we immediatey reraise.) 
session.rollback() raise finally: From c270deec04d53db1fb50d62e01273d0356505956 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Mon, 7 Nov 2022 16:37:25 +0100 Subject: [PATCH 03/18] Use a single `sqlalchemy.engine.Engine` per process According to the [SQLAlchemy documentation][0], the `Engine` should be "held globally", but also "initialized per process". Since parallelizing the workflow was the whole point of "egon-data" holding the `Engine` globally wasn't really an option, but I went overboard with an API that creates a new `Engine` for every `Session`. Fortunately creating the `Engine` through a factory function allows us to cache the `Engine` on a per process basis. This should hit the sweet spot demanded by the [SQLAlchemy documentation][0]. [0]: https://docs.sqlalchemy.org/en/13/core/connections.html#basic-usage --- src/egon/data/db.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/egon/data/db.py b/src/egon/data/db.py index ee0b2a7b6..8c19a2591 100755 --- a/src/egon/data/db.py +++ b/src/egon/data/db.py @@ -1,6 +1,7 @@ from contextlib import contextmanager import codecs import functools +import os import time from psycopg2.errors import DeadlockDetected, UniqueViolation @@ -40,13 +41,19 @@ def credentials(): def engine(): """Engine for local database.""" + if not hasattr(engine, "cache"): + engine.cache = {} + pid = os.getpid() + if pid in engine.cache: + return engine.cache[pid] db_config = credentials() - return create_engine( + engine.cache[pid] = create_engine( f"postgresql+psycopg2://{db_config['POSTGRES_USER']}:" f"{db_config['POSTGRES_PASSWORD']}@{db_config['HOST']}:" f"{db_config['PORT']}/{db_config['POSTGRES_DB']}", echo=False, ) + return engine.cache[pid] def execute_sql(sql_string): From 17fc025756913ef69cea484cd10016dd0fc19f61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Tue, 8 Nov 2022 15:37:12 +0100 Subject: [PATCH 04/18] Add a helper converting result rows to 
dictionaries The "problem" that this solves is that SQLAlchemy has a weird quirk in that a `Query` returns data which is structured differently depending on what is queried. If a single mapped class is queried, the query returns a list of instances of the mapped class where each instance corresponds to a row of the query result. This case is a bit harder to convert to dictionaries, because one has to make use of the `__table__` attribute. All other cases, i.e. querying multiple mapped classes, explicitly listing the columns to query or a combination of both, results in a list of keyed tuples, which are much easier to convert to dictionaries. The helper implemented here combines both cases. --- src/egon/data/db.py | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/egon/data/db.py b/src/egon/data/db.py index 8c19a2591..f8fa52cac 100755 --- a/src/egon/data/db.py +++ b/src/egon/data/db.py @@ -14,6 +14,42 @@ from egon.data import config +def asdict(row): + """Convert a result row of an SQLAlchemy query to a dictionary. + + This helper unifies the conversion of two types of query result rows, + namely instances of mapped classes and keyed tuples, to dictionaries. + That way it's suitable for massaging query results into a format which + can easily be converted to a `pandas` `DataFrame` like this: + + ```python + df = pandas.DataFrame.from_records( + [asdict(row) for row in session.query(*columns).all()] + ) + ``` + + Parameters + ---------- + row : SQLAlchemy query result row + + Returns + ------- + dict + The argument converted to a dictionary with column names as keys. + """ + if hasattr(row, "_asdict"): + return row._asdict() + if hasattr(row, "__table__"): + return { + column.name: getattr(row, column.name) + for column in row.__table__.columns + } + raise TypeError( + "Don't know how to convert `row` argument to dict because it has" + " neither an `_asdict`, nor a `__table__` attribute." 
+ ) + + def credentials(): """Return local database connection parameters. From c3b408e4e2582a72394191cdb8598377527a6a7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Tue, 8 Nov 2022 22:33:04 +0100 Subject: [PATCH 05/18] Enable `asdict` to modify values --- src/egon/data/db.py | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/src/egon/data/db.py b/src/egon/data/db.py index f8fa52cac..7354a4e1b 100755 --- a/src/egon/data/db.py +++ b/src/egon/data/db.py @@ -14,7 +14,7 @@ from egon.data import config -def asdict(row): +def asdict(row, conversions=None): """Convert a result row of an SQLAlchemy query to a dictionary. This helper unifies the conversion of two types of query result rows, @@ -31,19 +31,33 @@ def asdict(row): Parameters ---------- row : SQLAlchemy query result row + conversions : dict + Dictionary mapping column names to functions applied to the values of + that column. The default ist `None` which means no conversion is + applied. Returns ------- dict - The argument converted to a dictionary with column names as keys. + The argument converted to a dictionary with column names as keys and + column values potentially converted by calling + `conversions[column_name](column_value)`. """ + result = None if hasattr(row, "_asdict"): - return row._asdict() + result = row._asdict() if hasattr(row, "__table__"): - return { + result = { column.name: getattr(row, column.name) for column in row.__table__.columns } + if (result is not None) and (conversions is None): + return result + if (result is not None) and (conversions is not None): + return { + k: conversions[k](v) if k in conversions else v + for k, v in result.items() + } raise TypeError( "Don't know how to convert `row` argument to dict because it has" " neither an `_asdict`, nor a `__table__` attribute." 
From 578146b9cc0e3359c88bdf735a3f10a07c7baeb0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Tue, 8 Nov 2022 23:44:49 +0100 Subject: [PATCH 06/18] Implement yielding `session` and a `connection` This context manager can be used everywhere, where a `session` is needed to interact with the ORM at the same time as a `connection` to get more direct access to the database. The `session` and the `connection` share the same transaction and everything will be properly committed and closed when exiting the context manager. --- src/egon/data/db.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/egon/data/db.py b/src/egon/data/db.py index 7354a4e1b..1e7db729c 100755 --- a/src/egon/data/db.py +++ b/src/egon/data/db.py @@ -1,4 +1,5 @@ from contextlib import contextmanager +from types import SimpleNamespace import codecs import functools import os @@ -64,6 +65,13 @@ def asdict(row, conversions=None): ) +@contextmanager +def access(): + """Provide a context with a session and an associated connection.""" + with session_scope() as session, session.connection() as c, c.begin(): + yield SimpleNamespace(session=session, connection=c) + + def credentials(): """Return local database connection parameters. From fa8b2bd46d5b89508efc268855f7b8571ac02d4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Tue, 8 Nov 2022 23:50:44 +0100 Subject: [PATCH 07/18] Use `with access()` to execute textual SQL Theoretically one could also use `session.execute` instead of using the `connection` obtained from `db.access()`, but this is illustrative and safe, since I'm not sure whether `session.execute` behaves exactly the same as `connection.execute`. 
--- src/egon/data/db.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/egon/data/db.py b/src/egon/data/db.py index 1e7db729c..0de72c445 100755 --- a/src/egon/data/db.py +++ b/src/egon/data/db.py @@ -126,10 +126,8 @@ def execute_sql(sql_string): SQL expression """ - engine_local = engine() - - with engine_local.connect().execution_options(autocommit=True) as con: - con.execute(text(sql_string)) + with access() as database: + database.connection.execute(text(sql_string)) def submit_comment(json, schema, table): From 2cc73c91bd02df79fe6c400fa079605dc11c9b8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Tue, 8 Nov 2022 23:54:16 +0100 Subject: [PATCH 08/18] Wrap `read_postgis` and `read_sql` in a context manager Again, one could also have used `with engine.begin()` here, so in case this fails, that's what we can try instead. --- src/egon/data/db.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/egon/data/db.py b/src/egon/data/db.py index 0de72c445..9e66f1374 100755 --- a/src/egon/data/db.py +++ b/src/egon/data/db.py @@ -242,7 +242,8 @@ def select_dataframe(sql, index_col=None, warning=True): """ - df = pd.read_sql(sql, engine(), index_col=index_col) + with access() as database: + df = pd.read_sql(sql, database.connection, index_col=index_col) if df.size == 0 and warning is True: print(f"WARNING: No data returned by statement: \n {sql}") @@ -271,9 +272,10 @@ def select_geodataframe(sql, index_col=None, geom_col="geom", epsg=3035): """ - gdf = gpd.read_postgis( - sql, engine(), index_col=index_col, geom_col=geom_col - ) + with access() as database: + gdf = gpd.read_postgis( + sql, database.connection, index_col=index_col, geom_col=geom_col + ) if gdf.size == 0: print(f"WARNING: No data returned by statement: \n {sql}") From 4950314605f755f66cbab493682affde80fb2a95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Wed, 9 Nov 2022 00:29:44 +0100 Subject: [PATCH 
09/18] Replace some uses of `read_sql` and `read_postgis` I tried to replace all instances I found where these functions where used `session.bind`s outside of the `session`'s context manager. Using objects outside their context manager is not a good pattern. These instances worked, because `session.bind` effectively uses the underlying engine, so it should be the same as `db.engine()`, but you never know. Also, these uses where unnecessary because the `DataFrame`s could simply be obtained by using the actual query results. The `GeoDataFrame`s where a little bit harder because they expect Shapely geometries and Geoalchemy2 defaults to a different datatype, but thankfully it also supplies a conversion function. --- .../datasets/electricity_demand/__init__.py | 5 +- .../cts_buildings.py | 150 ++++++++++-------- .../hh_buildings.py | 26 +-- .../hh_profiles.py | 19 ++- .../heavy_duty_transport/create_h2_buses.py | 5 +- .../ev_allocation.py | 30 ++-- .../model_timeseries.py | 21 +-- src/egon/data/datasets/renewable_feedin.py | 29 ++-- src/egon/data/datasets/sanity_checks.py | 64 ++++---- 9 files changed, 200 insertions(+), 149 deletions(-) diff --git a/src/egon/data/datasets/electricity_demand/__init__.py b/src/egon/data/datasets/electricity_demand/__init__.py index 067ca90f6..4446a4c57 100644 --- a/src/egon/data/datasets/electricity_demand/__init__.py +++ b/src/egon/data/datasets/electricity_demand/__init__.py @@ -100,10 +100,11 @@ def get_annual_household_el_demand_cells(): == HouseholdElectricityProfilesInCensusCells.cell_id ) .order_by(HouseholdElectricityProfilesOfBuildings.id) + .all() ) - df_buildings_and_profiles = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col="id" + df_buildings_and_profiles = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], index="id" ) # Read demand profiles from egon-data-bundle diff --git a/src/egon/data/datasets/electricity_demand_timeseries/cts_buildings.py 
b/src/egon/data/datasets/electricity_demand_timeseries/cts_buildings.py index 9d75d0d45..bc0b801ee 100644 --- a/src/egon/data/datasets/electricity_demand_timeseries/cts_buildings.py +++ b/src/egon/data/datasets/electricity_demand_timeseries/cts_buildings.py @@ -290,12 +290,17 @@ def amenities_without_buildings(): EgonDemandRegioZensusElectricity.sector == "service", EgonDemandRegioZensusElectricity.scenario == "eGon2035", ) + .all() ) - df_amenities_without_buildings = gpd.read_postgis( - cells_query.statement, - cells_query.session.bind, - geom_col="geom_amenity", + df_amenities_without_buildings = gpd.GeoDataFrame( + pd.DataFrame.from_records( + [ + db.asdict(row, conversions={"geom_amenity": to_shape}) + for row in cells_query + ] + ), + geometry="geom_amenity", ) return df_amenities_without_buildings @@ -451,9 +456,9 @@ def buildings_with_amenities(): EgonDemandRegioZensusElectricity.sector == "service", EgonDemandRegioZensusElectricity.scenario == "eGon2035", ) - ) - df_amenities_in_buildings = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + ).all() + df_amenities_in_buildings = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) df_amenities_in_buildings["geom_building"] = df_amenities_in_buildings[ @@ -530,11 +535,16 @@ def buildings_with_amenities(): df_lost_cells["zensus_population_id"] ) ) - - df_lost_cells = gpd.read_postgis( - cells_query.statement, - cells_query.session.bind, - geom_col="geom", + cells_query = cells_query.all() + + df_lost_cells = gpd.GeoDataFrame( + pd.DataFrame.from_records( + [ + db.asdict(row, conversions={"geom": to_shape}) + for row in cells_query + ] + ), + geometry="geom", ) # place random amenity in cell @@ -678,13 +688,18 @@ def buildings_without_amenities(): q_cts_without_amenities ) ) + cells_query = cells_query.all() # df_buildings_without_amenities = pd.read_sql( # cells_query.statement, cells_query.session.bind, index_col=None) - df_buildings_without_amenities = 
gpd.read_postgis( - cells_query.statement, - cells_query.session.bind, - geom_col="geom_building", + df_buildings_without_amenities = gpd.GeoDataFrame( + pd.DataFrame.from_records( + [ + db.asdict(row, conversions={"geom_building": to_shape}) + for row in cells_query + ] + ), + geometry="geom_building", ) df_buildings_without_amenities = df_buildings_without_amenities.rename( @@ -772,13 +787,17 @@ def cells_with_cts_demand_only(df_buildings_without_amenities): EgonDemandRegioZensusElectricity.zensus_population_id == DestatisZensusPopulationPerHa.id ) + .all() ) - df_cts_cell_without_amenities = gpd.read_postgis( - cells_query.statement, - cells_query.session.bind, - geom_col="geom", - index_col=None, + df_cts_cell_without_amenities = gpd.GeoDataFrame( + pd.DataFrame.from_records( + [ + db.asdict(row, conversions={"geom": to_shape}) + for row in cells_query + ] + ), + geometry="geom", ) # TODO remove after #722 @@ -829,6 +848,7 @@ def calc_census_cell_share(scenario, sector): EgonDemandRegioZensusElectricity.zensus_population_id == MapZensusGridDistricts.zensus_population_id ) + .all() ) elif sector == "heat": @@ -841,12 +861,12 @@ def calc_census_cell_share(scenario, sector): EgonPetaHeat.zensus_population_id == MapZensusGridDistricts.zensus_population_id ) + .all() ) - df_demand = pd.read_sql( - cells_query.statement, - cells_query.session.bind, - index_col="zensus_population_id", + df_demand = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], + index="zensus_population_id", ) # get demand share of cell per bus @@ -992,23 +1012,24 @@ def calc_cts_building_profiles( egon_building_ids ) ) + .all() ) - df_demand_share = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_demand_share = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) # Get substation cts electricity load profiles of selected bus_ids with db.session_scope() as session: cells_query = ( - 
session.query(EgonEtragoElectricityCts).filter( - EgonEtragoElectricityCts.scn_name == scenario - ) - ).filter(EgonEtragoElectricityCts.bus_id.in_(bus_ids)) + session.query(EgonEtragoElectricityCts) + .filter(EgonEtragoElectricityCts.scn_name == scenario) + .filter(EgonEtragoElectricityCts.bus_id.in_(bus_ids)) + .all() + ) - df_cts_profiles = pd.read_sql( - cells_query.statement, - cells_query.session.bind, + df_cts_profiles = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) df_cts_profiles = pd.DataFrame.from_dict( df_cts_profiles.set_index("bus_id")["p_set"].to_dict(), @@ -1029,23 +1050,24 @@ def calc_cts_building_profiles( egon_building_ids ) ) + .all() ) - df_demand_share = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_demand_share = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) # Get substation cts heat load profiles of selected bus_ids with db.session_scope() as session: cells_query = ( - session.query(EgonEtragoHeatCts).filter( - EgonEtragoHeatCts.scn_name == scenario - ) - ).filter(EgonEtragoHeatCts.bus_id.in_(bus_ids)) + session.query(EgonEtragoHeatCts) + .filter(EgonEtragoHeatCts.scn_name == scenario) + .filter(EgonEtragoHeatCts.bus_id.in_(bus_ids)) + .all() + ) - df_cts_profiles = pd.read_sql( - cells_query.statement, - cells_query.session.bind, + df_cts_profiles = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) df_cts_profiles = pd.DataFrame.from_dict( df_cts_profiles.set_index("bus_id")["p_set"].to_dict(), @@ -1097,12 +1119,10 @@ def remove_double_bus_id(df_cts_buildings): cells_query = session.query( MapZensusGridDistricts.zensus_population_id, MapZensusGridDistricts.bus_id, - ) + ).all() - df_egon_map_zensus_buildings_buses = pd.read_sql( - cells_query.statement, - cells_query.session.bind, - index_col=None, + df_egon_map_zensus_buildings_buses = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) df_cts_buildings = pd.merge( 
left=df_cts_buildings, @@ -1330,10 +1350,10 @@ def cts_electricity(): """ log.info("Start logging!") with db.session_scope() as session: - cells_query = session.query(CtsBuildings) + cells_query = session.query(CtsBuildings).all() - df_cts_buildings = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_cts_buildings = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) log.info("CTS buildings from DB imported!") df_demand_share_2035 = calc_building_demand_profile_share( @@ -1369,10 +1389,10 @@ def cts_heat(): """ log.info("Start logging!") with db.session_scope() as session: - cells_query = session.query(CtsBuildings) + cells_query = session.query(CtsBuildings).all() - df_cts_buildings = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_cts_buildings = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) log.info("CTS buildings from DB imported!") @@ -1425,19 +1445,20 @@ def get_cts_electricity_peak_load(): ).filter( EgonCtsElectricityDemandBuildingShare.scenario == scenario ) + cells_query = cells_query.all() - df_demand_share = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_demand_share = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) with db.session_scope() as session: cells_query = session.query(EgonEtragoElectricityCts).filter( EgonEtragoElectricityCts.scn_name == scenario ) + cells_query = cells_query.all() - df_cts_profiles = pd.read_sql( - cells_query.statement, - cells_query.session.bind, + df_cts_profiles = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) df_cts_profiles = pd.DataFrame.from_dict( df_cts_profiles.set_index("bus_id")["p_set"].to_dict(), @@ -1498,9 +1519,10 @@ def get_cts_heat_peak_load(): ).filter( EgonCtsElectricityDemandBuildingShare.scenario == scenario ) + cells_query = cells_query.all() - df_demand_share = pd.read_sql( - cells_query.statement, 
cells_query.session.bind, index_col=None + df_demand_share = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) log.info(f"Retrieved demand share for scenario: {scenario}") @@ -1508,10 +1530,10 @@ def get_cts_heat_peak_load(): cells_query = session.query(EgonEtragoHeatCts).filter( EgonEtragoHeatCts.scn_name == scenario ) + cells_query = cells_query.all() - df_cts_profiles = pd.read_sql( - cells_query.statement, - cells_query.session.bind, + df_cts_profiles = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) log.info(f"Retrieved substation profiles for scenario: {scenario}") diff --git a/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py b/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py index 268d8ad97..a33b31ee8 100755 --- a/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py +++ b/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py @@ -249,12 +249,11 @@ def match_osm_and_zensus_data( cells_query = session.query( egon_destatis_building_count.c.zensus_population_id, egon_destatis_building_count.c.building_count, - ) + ).all() - egon_destatis_building_count = pd.read_sql( - cells_query.statement, - cells_query.session.bind, - index_col="zensus_population_id", + egon_destatis_building_count = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], + index="zensus_population_id", ) egon_destatis_building_count = egon_destatis_building_count.dropna() @@ -649,10 +648,11 @@ def get_building_peak_loads(): == HouseholdElectricityProfilesInCensusCells.cell_id ) .order_by(HouseholdElectricityProfilesOfBuildings.id) + .all() ) - df_buildings_and_profiles = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col="id" + df_buildings_and_profiles = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], index="id" ) # Read demand profiles from egon-data-bundle @@ -756,15 +756,17 @@ def map_houseprofiles_to_buildings(): 
with db.session_scope() as session: cells_query = session.query(egon_map_zensus_buildings_residential) - egon_map_zensus_buildings_residential = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + cells_query = cells_query.all() + egon_map_zensus_buildings_residential = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) with db.session_scope() as session: cells_query = session.query(HouseholdElectricityProfilesInCensusCells) - egon_hh_profile_in_zensus_cell = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None - ) # index_col="cell_id") + cells_query = cells_query.all() + egon_hh_profile_in_zensus_cell = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] # , index="cell_id") + ) # Match OSM and zensus data to define missing buildings missing_buildings = match_osm_and_zensus_data( diff --git a/src/egon/data/datasets/electricity_demand_timeseries/hh_profiles.py b/src/egon/data/datasets/electricity_demand_timeseries/hh_profiles.py index fc12de792..1629c1447 100644 --- a/src/egon/data/datasets/electricity_demand_timeseries/hh_profiles.py +++ b/src/egon/data/datasets/electricity_demand_timeseries/hh_profiles.py @@ -1587,8 +1587,8 @@ def get_houseprofiles_in_census_cells(): with db.session_scope() as session: q = session.query(HouseholdElectricityProfilesInCensusCells) - census_profile_mapping = pd.read_sql( - q.statement, q.session.bind, index_col="cell_id" + census_profile_mapping = pd.DataFrame.from_records( + [db.asdict(row) for row in q.all()], index="cell_id" ) return census_profile_mapping @@ -1668,9 +1668,10 @@ def get_cell_demand_metadata_from_db(attribute, list_of_identifiers): list_of_identifiers ) ) + cells_query = cells_query.all() - cell_demand_metadata = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col="cell_id" + cell_demand_metadata = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], index="cell_id" ) return 
cell_demand_metadata @@ -1699,9 +1700,10 @@ def get_hh_profiles_from_db(profile_ids): cells_query = session.query( IeeHouseholdLoadProfiles.load_in_wh, IeeHouseholdLoadProfiles.type ).filter(IeeHouseholdLoadProfiles.type.in_(profile_ids)) + cells_query = cells_query.all() - df_profile_loads = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col="type" + df_profile_loads = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], index="type" ) # convert array to Dataframe @@ -1754,9 +1756,10 @@ def tuple_format(x): HouseholdElectricityProfilesInCensusCells.cell_id == MapZensusGridDistricts.zensus_population_id, ) + cells_query = cells_query.all() - cells = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col="cell_id" + cells = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query], index="cell_id" ) # convert profile ids to tuple (type, id) format diff --git a/src/egon/data/datasets/emobility/heavy_duty_transport/create_h2_buses.py b/src/egon/data/datasets/emobility/heavy_duty_transport/create_h2_buses.py index 7611db043..bd2fe76b3 100644 --- a/src/egon/data/datasets/emobility/heavy_duty_transport/create_h2_buses.py +++ b/src/egon/data/datasets/emobility/heavy_duty_transport/create_h2_buses.py @@ -163,8 +163,11 @@ def read_hgv_h2_demand(scenario: str = "eGon2035"): EgonHeavyDutyTransportVoronoi.scenario, EgonHeavyDutyTransportVoronoi.hydrogen_consumption, ).filter(EgonHeavyDutyTransportVoronoi.scenario == scenario) + query = query.all() - df = pd.read_sql(query.statement, query.session.bind, index_col="nuts3") + df = pd.DataFrame.from_records( + [db.asdict(row) for row in query], index="nuts3" + ) sql_vg250 = """ SELECT nuts as nuts3, geometry as geom diff --git a/src/egon/data/datasets/emobility/motorized_individual_travel/ev_allocation.py b/src/egon/data/datasets/emobility/motorized_individual_travel/ev_allocation.py index a64ed1c50..7ac19be63 100644 --- 
a/src/egon/data/datasets/emobility/motorized_individual_travel/ev_allocation.py +++ b/src/egon/data/datasets/emobility/motorized_individual_travel/ev_allocation.py @@ -205,10 +205,10 @@ def calc_evs_per_municipality(ev_data, rs7_data): Vg250GemPopulation.ags_0.label("ags"), Vg250GemPopulation.gen, Vg250GemPopulation.population_total.label("pop"), - ) + ).all() - muns = pd.read_sql( - query.statement, query.session.bind, index_col=None + muns = pd.DataFrame.from_records( + [db.asdict(row) for row in query], index=None ).astype({"ags": "int64"}) muns["ags_district"] = ( @@ -312,12 +312,11 @@ def calc_evs_per_grid_district(ev_data_muns): ) .group_by(MvGridDistricts.bus_id, Vg250Gem.ags) .order_by(Vg250Gem.ags) + .all() ) - mvgd_pop_per_mun = pd.read_sql( - query_pop_per_mvgd.statement, - query_pop_per_mvgd.session.bind, - index_col=None, + mvgd_pop_per_mun = pd.DataFrame.from_records( + [db.asdict(row) for row in query_pop_per_mvgd] ).astype({"bus_id": "int64", "pop": "int64", "ags": "int64"}) # Calc population share of each municipality in MVGD @@ -547,11 +546,13 @@ def get_random_evs(row): # Load EVs per grid district print("Loading EV counts for grid districts...") with db.session_scope() as session: - query = session.query(EgonEvCountMvGridDistrict).filter( - EgonEvCountMvGridDistrict.scenario == scenario_name + query = ( + session.query(EgonEvCountMvGridDistrict) + .filter(EgonEvCountMvGridDistrict.scenario == scenario_name) + .all() ) - ev_per_mvgd = pd.read_sql( - query.statement, query.session.bind, index_col=None + ev_per_mvgd = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) # Convert EV types' wide to long format @@ -569,11 +570,8 @@ def get_random_evs(row): query = session.query(EgonEvPool).filter( EgonEvPool.scenario == scenario_name ) - ev_pool = pd.read_sql( - query.statement, - query.session.bind, - index_col=None, - ) + query = query.all() + ev_pool = pd.DataFrame.from_records([db.asdict(row) for row in query]) # Draw EVs randomly for 
each grid district from pool print(" Draw EVs from pool for grid districts...") diff --git a/src/egon/data/datasets/emobility/motorized_individual_travel/model_timeseries.py b/src/egon/data/datasets/emobility/motorized_individual_travel/model_timeseries.py index 8ef1ca408..e99b797c0 100644 --- a/src/egon/data/datasets/emobility/motorized_individual_travel/model_timeseries.py +++ b/src/egon/data/datasets/emobility/motorized_individual_travel/model_timeseries.py @@ -473,10 +473,11 @@ def load_evs_trips( .order_by( EgonEvTrip.egon_ev_pool_ev_id, EgonEvTrip.simbev_event_id ) + .all() ) - trip_data = pd.read_sql( - query.statement, query.session.bind, index_col=None + trip_data = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ).astype( { "ev_id": "int", @@ -562,10 +563,11 @@ def calc_initial_ev_soc(bus_id: int, scenario_name: str) -> pd.DataFrame: EgonEvTrip.simbev_event_id == 0, ) .group_by(EgonEvPool.type) + .all() ) - initial_soc_per_ev_type = pd.read_sql( - query_ev_soc.statement, query_ev_soc.session.bind, index_col="type" + initial_soc_per_ev_type = pd.DataFrame.from_records( + [db.asdict(row) for row in query_ev_soc], index="type" ) initial_soc_per_ev_type[ @@ -905,9 +907,9 @@ def delete_model_data_from_db(): def load_grid_district_ids() -> pd.Series: """Load bus IDs of all grid districts""" with db.session_scope() as session: - query_mvgd = session.query(MvGridDistricts.bus_id) - return pd.read_sql( - query_mvgd.statement, query_mvgd.session.bind, index_col=None + query_mvgd = session.query(MvGridDistricts.bus_id).all() + return pd.DataFrame.from_records( + [db.asdict(row) for row in query_mvgd] ).bus_id.sort_values() @@ -1014,9 +1016,10 @@ def generate_model_data_bunch(scenario_name: str, bunch: range) -> None: ) .filter(EgonEvMvGridDistrict.bus_id.in_(mvgd_bus_ids)) .filter(EgonEvMvGridDistrict.egon_ev_pool_ev_id.isnot(None)) + .all() ) - evs_grid_district = pd.read_sql( - query.statement, query.session.bind, index_col=None + evs_grid_district 
= pd.DataFrame.from_records( + [db.asdict(row) for row in query] ).astype({"ev_id": "int"}) mvgd_bus_ids = evs_grid_district.bus_id.unique() diff --git a/src/egon/data/datasets/renewable_feedin.py b/src/egon/data/datasets/renewable_feedin.py index c9122d6ef..9f5d379d7 100644 --- a/src/egon/data/datasets/renewable_feedin.py +++ b/src/egon/data/datasets/renewable_feedin.py @@ -2,6 +2,7 @@ Central module containing all code dealing with processing era5 weather data. """ +from geoalchemy2.shape import to_shape from sqlalchemy import Column, ForeignKey, Integer from sqlalchemy.ext.declarative import declarative_base import geopandas as gpd @@ -582,22 +583,30 @@ def mapping_zensus_weather(): ), DestatisZensusPopulationPerHaInsideGermany.geom_point, ) + cells_query = cells_query.all() - gdf_zensus_population = gpd.read_postgis( - cells_query.statement, - cells_query.session.bind, - index_col=None, - geom_col="geom_point", + gdf_zensus_population = gpd.GeoDataFrame( + pd.DataFrame.from_records( + [ + db.asdict(row, conversions={"geom_point": to_shape}) + for row in cells_query + ] + ), + geometry="geom_point", ) with db.session_scope() as session: cells_query = session.query(EgonEra5Cells.w_id, EgonEra5Cells.geom) + cells_query = cells_query.all() - gdf_weather_cell = gpd.read_postgis( - cells_query.statement, - cells_query.session.bind, - index_col=None, - geom_col="geom", + gdf_weather_cell = gpd.GeoDataFrame( + pd.DataFrame.from_records( + [ + db.asdict(row, conversions={"geom": to_shape}) + for row in cells_query + ] + ), + geometry="geom", ) # CRS is 4326 gdf_weather_cell = gdf_weather_cell.to_crs(epsg=3035) diff --git a/src/egon/data/datasets/sanity_checks.py b/src/egon/data/datasets/sanity_checks.py index b74101f58..8bfbb285a 100644 --- a/src/egon/data/datasets/sanity_checks.py +++ b/src/egon/data/datasets/sanity_checks.py @@ -620,9 +620,10 @@ def cts_electricity_demand_share(rtol=1e-5): with db.session_scope() as session: cells_query = 
session.query(EgonCtsElectricityDemandBuildingShare) + cells_query = cells_query.all() - df_demand_share = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_demand_share = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) np.testing.assert_allclose( @@ -646,10 +647,10 @@ def cts_heat_demand_share(rtol=1e-5): to all buildings.""" with db.session_scope() as session: - cells_query = session.query(EgonCtsHeatDemandBuildingShare) + cells_query = session.query(EgonCtsHeatDemandBuildingShare).all() - df_demand_share = pd.read_sql( - cells_query.statement, cells_query.session.bind, index_col=None + df_demand_share = pd.DataFrame.from_records( + [db.asdict(row) for row in cells_query] ) np.testing.assert_allclose( @@ -711,9 +712,10 @@ def check_ev_allocation(): table.scenario == scenario_name, table.scenario_variation == scenario_var_name, ) + query = query.all() - ev_counts = pd.read_sql( - query.statement, query.session.bind, index_col=None + ev_counts = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) ev_counts_dict[level] = ev_counts.iloc[0].ev_count print( @@ -744,8 +746,9 @@ def check_ev_allocation(): EgonEvMvGridDistrict.scenario == scenario_name, EgonEvMvGridDistrict.scenario_variation == scenario_var_name, ) + query = query.all() ev_count_alloc = ( - pd.read_sql(query.statement, query.session.bind, index_col=None) + pd.DataFrame.from_records([db.asdict(row) for row in query]) .iloc[0] .ev_count ) @@ -789,8 +792,9 @@ def check_trip_data(): ), EgonEvTrip.scenario == scenario_name, ) - invalid_trips = pd.read_sql( - query.statement, query.session.bind, index_col=None + query = query.all() + invalid_trips = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) np.testing.assert_equal( invalid_trips.iloc[0].cnt, @@ -820,8 +824,9 @@ def check_trip_data(): < cast(EgonEvTrip.charging_demand, Numeric), EgonEvTrip.scenario == scenario_name, ) - invalid_trips = pd.read_sql( - query.statement, 
query.session.bind, index_col=None + query = query.all() + invalid_trips = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) np.testing.assert_equal( invalid_trips.iloc[0].cnt, @@ -848,9 +853,10 @@ def check_model_data(): == scenario_var_name, ) .group_by(EgonEvMvGridDistrict.bus_id) + .all() ) mvgds_with_ev = ( - pd.read_sql(query.statement, query.session.bind, index_col=None) + pd.DataFrame.from_records([db.asdict(row) for row in query]) .bus_id.sort_values() .to_list() ) @@ -884,9 +890,10 @@ def check_model_data(): EgonPfHvLink.bus1 == EgonPfHvLoad.bus, EgonPfHvLink.bus1 == EgonPfHvStore.bus, ) + .all() ) - model_components = pd.read_sql( - query.statement, query.session.bind, index_col=None + model_components = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) # Check number of buses with model components connected @@ -966,10 +973,9 @@ def check_model_data(): ), attrs["table_ts"].scn_name == scenario_name, ) - attrs["ts"] = pd.read_sql( - query.statement, - query.session.bind, - index_col=attrs["column_id"], + attrs["ts"] = pd.DataFrame.from_records( + [db.asdict(row) for row in query.all()], + index=attrs["column_id"], ) # Check if all timeseries have 8760 steps @@ -1024,9 +1030,10 @@ def check_model_data(): EgonPfHvStore.scn_name == scenario_name, EgonPfHvStore.carrier == "battery storage", ) + query = query.all() storage_capacity_model = ( - pd.read_sql( - query.statement, query.session.bind, index_col=None + pd.DataFrame.from_records( + [db.asdict(row) for row in query] ).e_nom.sum() / 1e3 ) @@ -1057,9 +1064,10 @@ def check_model_data(): EgonEvPool.scenario == scenario_name, ) .group_by(EgonEvMvGridDistrict.bus_id, EgonEvPool.type) + .all() ) - count_per_ev_all = pd.read_sql( - query.statement, query.session.bind, index_col="bus_id" + count_per_ev_all = pd.DataFrame.from_records( + [db.asdict(row) for row in query], index="bus_id" ) count_per_ev_all["bat_cap"] = count_per_ev_all.type.map( meta_tech_data.battery_capacity @@ 
-1126,9 +1134,10 @@ def check_model_data_lowflex_eGon2035(): EgonPfHvLoad.scn_name == "eGon2035", EgonPfHvLoadTimeseries.scn_name == "eGon2035", ) + .all() ) - model_driving_load = pd.read_sql( - query.statement, query.session.bind, index_col=None + model_driving_load = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) driving_load = np.array(model_driving_load.p_set.to_list()).sum(axis=0) @@ -1151,9 +1160,10 @@ def check_model_data_lowflex_eGon2035(): EgonPfHvLoad.scn_name == "eGon2035_lowflex", EgonPfHvLoadTimeseries.scn_name == "eGon2035_lowflex", ) + .all() ) - model_charging_load_lowflex = pd.read_sql( - query.statement, query.session.bind, index_col=None + model_charging_load_lowflex = pd.DataFrame.from_records( + [db.asdict(row) for row in query] ) charging_load = np.array( model_charging_load_lowflex.p_set.to_list() From 2bed38d01dbb4f59a8b585803ad2ed9055670ba7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Thu, 10 Nov 2022 01:11:46 +0100 Subject: [PATCH 10/18] Don't use session outside of its context manager In previous commits, this use of `read_postgis` was replaced with a combination of `GeoDataFrame` and `DataFrame.from_records`. I couldn't use the same technique here, because there's no `geom_column` argument to `read_postgis` which means that I don't know which column to convert using `to_shape`. While this can probably be figured out, I don't have the time for now so it's a TODO for later. So in order to not use the session after it is closed (which is not strictly wrong, because we only use the `bind` attribute, but it still leaves the door open to unknown behaviour), I'm replacing the session with a call to `db.engine()`. Due to the per-process caching of engines, this doesn't incur additional connections, while it also should be identical in behaviour to using `session.bind`. 
--- .../data/datasets/electricity_demand_timeseries/hh_buildings.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py b/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py index a33b31ee8..8c0340554 100755 --- a/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py +++ b/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py @@ -345,7 +345,7 @@ def generate_synthetic_buildings(missing_buildings, edge_length): ) destatis_zensus_population_per_ha_inside_germany = gpd.read_postgis( - cells_query.statement, cells_query.session.bind, index_col="id" + cells_query.statement, db.engine(), index_col="id" ) # add geom data of zensus cell From 269e3eed3ca62b1aa368d1cbf8dfbb866d1c212f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Thu, 10 Nov 2022 02:31:13 +0100 Subject: [PATCH 11/18] Remove `db.engine()` as default parameter values Since default parameter values are evaluated at function definition time and stay with the function for its entire lifetime, they are essentially the same as module level variables (at least for top level functions, that is). So `db.engine()` as a default parameter value has the same problems as `db.engine()` at module level and should be removed accordingly. Fortunately removing it is as simple as setting the default parameter value to `None` and then checking for `None` at the start of the function body, which is what this commit does. 
--- src/egon/data/datasets/DSM_cts_ind.py | 5 ++++- .../datasets/electricity_demand_timeseries/tools.py | 10 ++++++++-- .../data/datasets/power_plants/pv_ground_mounted.py | 4 +++- 3 files changed, 15 insertions(+), 4 deletions(-) diff --git a/src/egon/data/datasets/DSM_cts_ind.py b/src/egon/data/datasets/DSM_cts_ind.py index a52279497..e1916a28d 100644 --- a/src/egon/data/datasets/DSM_cts_ind.py +++ b/src/egon/data/datasets/DSM_cts_ind.py @@ -778,7 +778,7 @@ def delete_dsm_entries(carrier): db.execute_sql(sql) def dsm_cts_ind( - con=db.engine(), + con=None, cts_cool_vent_ac_share=0.22, ind_cool_vent_share=0.039, ind_vent_share=0.017, @@ -805,6 +805,9 @@ def dsm_cts_ind( """ + if con is None: + con = db.engine() + # CTS per osm-area: cooling, ventilation and air conditioning print(" ") diff --git a/src/egon/data/datasets/electricity_demand_timeseries/tools.py b/src/egon/data/datasets/electricity_demand_timeseries/tools.py index 4ccabe603..24f94ae90 100644 --- a/src/egon/data/datasets/electricity_demand_timeseries/tools.py +++ b/src/egon/data/datasets/electricity_demand_timeseries/tools.py @@ -66,7 +66,7 @@ def random_ints_until_sum(s_sum, m_max): return list_r -def write_table_to_postgis(gdf, table, engine=db.engine(), drop=True): +def write_table_to_postgis(gdf, table, engine=None, drop=True): """ Helper function to append df data to table in db. Only predefined columns are passed. Error will raise if column is missing. 
Dtype of columns are @@ -85,6 +85,9 @@ def write_table_to_postgis(gdf, table, engine=db.engine(), drop=True): """ + if engine is None: + engine = db.engine() + # Only take in db table defined columns columns = [column.key for column in table.__table__.columns] gdf = gdf.loc[:, columns] @@ -139,7 +142,7 @@ def psql_insert_copy(table, conn, keys, data_iter): def write_table_to_postgres( - df, db_table, engine=db.engine(), drop=False, index=False, if_exists="append" + df, db_table, engine=None, drop=False, index=False, if_exists="append" ): """ Helper function to append df data to table in db. Fast string-copy is used. @@ -165,6 +168,9 @@ def write_table_to_postgres( """ + if engine is None: + engine = db.engine() + # Only take in db table defined columns and dtypes columns = { column.key: column.type for column in db_table.__table__.columns diff --git a/src/egon/data/datasets/power_plants/pv_ground_mounted.py b/src/egon/data/datasets/power_plants/pv_ground_mounted.py index e9b92b5b6..042b685c2 100644 --- a/src/egon/data/datasets/power_plants/pv_ground_mounted.py +++ b/src/egon/data/datasets/power_plants/pv_ground_mounted.py @@ -738,7 +738,7 @@ def keep_existing_pv(mastr, con): return pv_exist def run_methodology( - con=db.engine(), + con=None, path="", pow_per_area=0.04, join_buffer=10, @@ -765,6 +765,8 @@ def run_methodology( """ + if con is None: + con = db.engine() ### print(" ") print("MaStR-Data") From 1ad9e9fcd768ba10c078db5127d1761e2ab84b56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Thu, 10 Nov 2022 02:50:27 +0100 Subject: [PATCH 12/18] Remove module level `engine` variables Engines are not supposed to be shared across process boundaries. This is ensured via returning distinct `engine` instances for distinct processes from `db.engine()`. Storing `engine`s on a module level might subvert this mechanism, so these variables get removed and replaced by individual calls to `db.engine()`. 
Some of these variable's weren't even used in their module. --- .../datasets/electricity_demand/__init__.py | 1 - .../hh_buildings.py | 27 ++++++++++--------- .../hh_profiles.py | 21 +++++++-------- .../electricity_demand_timeseries/tools.py | 2 -- src/egon/data/datasets/renewable_feedin.py | 5 ++-- 5 files changed, 26 insertions(+), 30 deletions(-) diff --git a/src/egon/data/datasets/electricity_demand/__init__.py b/src/egon/data/datasets/electricity_demand/__init__.py index 4446a4c57..15cb12d86 100644 --- a/src/egon/data/datasets/electricity_demand/__init__.py +++ b/src/egon/data/datasets/electricity_demand/__init__.py @@ -21,7 +21,6 @@ # will be later imported from another file ### Base = declarative_base() -engine = db.engine() class HouseholdElectricityDemand(Dataset): diff --git a/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py b/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py index 8c0340554..9edb61cf5 100755 --- a/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py +++ b/src/egon/data/datasets/electricity_demand_timeseries/hh_buildings.py @@ -131,7 +131,6 @@ ) import egon.data.config -engine = db.engine() Base = declarative_base() data_config = egon.data.config.datasets() @@ -243,7 +242,7 @@ def match_osm_and_zensus_data( schema="society", ) # get table metadata from db by name and schema - inspect(engine).reflecttable(egon_destatis_building_count, None) + inspect(db.engine()).reflecttable(egon_destatis_building_count, None) with db.session_scope() as session: cells_query = session.query( @@ -331,7 +330,7 @@ def generate_synthetic_buildings(missing_buildings, edge_length): schema="society", ) # get table metadata from db by name and schema - inspect(engine).reflecttable( + inspect(db.engine()).reflecttable( destatis_zensus_population_per_ha_inside_germany, None ) @@ -384,7 +383,7 @@ def generate_synthetic_buildings(missing_buildings, edge_length): # get table metadata from db by name and schema 
buildings = Table("osm_buildings", Base.metadata, schema="openstreetmap") - inspect(engine).reflecttable(buildings, None) + inspect(db.engine()).reflecttable(buildings, None) # get max number of building ids from non-filtered building table with db.session_scope() as session: @@ -588,7 +587,7 @@ def reduce_synthetic_buildings( buildings = Table("osm_buildings", Base.metadata, schema="openstreetmap") # get table metadata from db by name and schema - inspect(engine).reflecttable(buildings, None) + inspect(db.engine()).reflecttable(buildings, None) # total number of buildings with db.session_scope() as session: @@ -705,10 +704,10 @@ def ve(s): df_building_peak_loads["sector"] = "residential" BuildingElectricityPeakLoads.__table__.drop( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) BuildingElectricityPeakLoads.__table__.create( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) df_building_peak_loads = df_building_peak_loads.melt( @@ -752,7 +751,9 @@ def map_houseprofiles_to_buildings(): schema="boundaries", ) # get table metadata from db by name and schema - inspect(engine).reflecttable(egon_map_zensus_buildings_residential, None) + inspect(db.engine()).reflecttable( + egon_map_zensus_buildings_residential, None + ) with db.session_scope() as session: cells_query = session.query(egon_map_zensus_buildings_residential) @@ -802,13 +803,13 @@ def map_houseprofiles_to_buildings(): # synthetic_buildings = synthetic_buildings.drop(columns=["grid_id"]) synthetic_buildings["n_amenities_inside"] = 0 - OsmBuildingsSynthetic.__table__.drop(bind=engine, checkfirst=True) - OsmBuildingsSynthetic.__table__.create(bind=engine, checkfirst=True) + OsmBuildingsSynthetic.__table__.drop(bind=db.engine(), checkfirst=True) + OsmBuildingsSynthetic.__table__.create(bind=db.engine(), checkfirst=True) # Write new buildings incl coord into db synthetic_buildings.to_postgis( "osm_buildings_synthetic", - con=engine, + con=db.engine(), if_exists="append", 
schema="openstreetmap", dtype={ @@ -823,10 +824,10 @@ def map_houseprofiles_to_buildings(): ) HouseholdElectricityProfilesOfBuildings.__table__.drop( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) HouseholdElectricityProfilesOfBuildings.__table__.create( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) # Write building mapping into db diff --git a/src/egon/data/datasets/electricity_demand_timeseries/hh_profiles.py b/src/egon/data/datasets/electricity_demand_timeseries/hh_profiles.py index 1629c1447..466fa2a4f 100644 --- a/src/egon/data/datasets/electricity_demand_timeseries/hh_profiles.py +++ b/src/egon/data/datasets/electricity_demand_timeseries/hh_profiles.py @@ -139,7 +139,6 @@ import egon.data.config Base = declarative_base() -engine = db.engine() # Get random seed from config @@ -270,13 +269,13 @@ def write_hh_profiles_to_db(hh_profiles): hh_profiles = hh_profiles.groupby("type").load_in_wh.apply(tuple) hh_profiles = hh_profiles.reset_index() - IeeHouseholdLoadProfiles.__table__.drop(bind=engine, checkfirst=True) - IeeHouseholdLoadProfiles.__table__.create(bind=engine) + IeeHouseholdLoadProfiles.__table__.drop(bind=db.engine(), checkfirst=True) + IeeHouseholdLoadProfiles.__table__.create(bind=db.engine()) hh_profiles.to_sql( name=IeeHouseholdLoadProfiles.__table__.name, schema=IeeHouseholdLoadProfiles.__table__.schema, - con=engine, + con=db.engine(), if_exists="append", method="multi", chunksize=100, @@ -1443,10 +1442,10 @@ def get_load_timeseries( def write_refinded_households_to_db(df_census_households_grid_refined): # Write allocation table into database EgonDestatisZensusHouseholdPerHaRefined.__table__.drop( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) EgonDestatisZensusHouseholdPerHaRefined.__table__.create( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) with db.session_scope() as session: @@ -1558,10 +1557,10 @@ def gen_profile_names(n): # Write allocation table into 
database HouseholdElectricityProfilesInCensusCells.__table__.drop( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) HouseholdElectricityProfilesInCensusCells.__table__.create( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) with db.session_scope() as session: @@ -1795,16 +1794,16 @@ def tuple_format(x): if drop_table: EgonEtragoElectricityHouseholds.__table__.drop( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) EgonEtragoElectricityHouseholds.__table__.create( - bind=engine, checkfirst=True + bind=db.engine(), checkfirst=True ) # Insert data into respective database table mvgd_profiles.to_sql( name=EgonEtragoElectricityHouseholds.__table__.name, schema=EgonEtragoElectricityHouseholds.__table__.schema, - con=engine, + con=db.engine(), if_exists="append", method="multi", chunksize=10000, diff --git a/src/egon/data/datasets/electricity_demand_timeseries/tools.py b/src/egon/data/datasets/electricity_demand_timeseries/tools.py index 24f94ae90..aa6b90d03 100644 --- a/src/egon/data/datasets/electricity_demand_timeseries/tools.py +++ b/src/egon/data/datasets/electricity_demand_timeseries/tools.py @@ -8,8 +8,6 @@ from egon.data import db -engine = db.engine() - def random_point_in_square(geom, tol): """ diff --git a/src/egon/data/datasets/renewable_feedin.py b/src/egon/data/datasets/renewable_feedin.py index 9f5d379d7..55e422d80 100644 --- a/src/egon/data/datasets/renewable_feedin.py +++ b/src/egon/data/datasets/renewable_feedin.py @@ -37,7 +37,6 @@ def __init__(self, dependencies): Base = declarative_base() -engine = db.engine() class MapZensusWeatherCell(Base): @@ -615,8 +614,8 @@ def mapping_zensus_weather(): gdf_weather_cell, how="left", predicate="within" ) - MapZensusWeatherCell.__table__.drop(bind=engine, checkfirst=True) - MapZensusWeatherCell.__table__.create(bind=engine, checkfirst=True) + MapZensusWeatherCell.__table__.drop(bind=db.engine(), checkfirst=True) + 
MapZensusWeatherCell.__table__.create(bind=db.engine(), checkfirst=True) # Write mapping into db with db.session_scope() as session: From 94827fb5f4a017ffb23c836fa4d8ec8e1481779e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Thu, 10 Nov 2022 03:09:54 +0100 Subject: [PATCH 13/18] Remove unclosed `session`s These `session` are used at or near the top of functions and are never closed, potentially leaking connections. Using the `session_scoped` decorator on the functions allows us to get a `session` for the whole function which is automatically committed and closed at the end of the function. Note that one `sessionmaker` import gets removed because it's just unused. --- src/egon/data/datasets/chp/__init__.py | 19 ++++++++++++------- src/egon/data/datasets/chp/match_nep.py | 6 ++---- src/egon/data/datasets/chp/small_chp.py | 16 ++++++++++------ .../data/datasets/electrical_neighbours.py | 11 ++++------- src/egon/data/datasets/scenario_capacities.py | 10 ++-------- .../datasets/scenario_parameters/__init__.py | 6 ++---- .../data/datasets/storages/pumped_hydro.py | 1 - 7 files changed, 32 insertions(+), 37 deletions(-) diff --git a/src/egon/data/datasets/chp/__init__.py b/src/egon/data/datasets/chp/__init__.py index 9749d7286..13ec477eb 100644 --- a/src/egon/data/datasets/chp/__init__.py +++ b/src/egon/data/datasets/chp/__init__.py @@ -8,7 +8,6 @@ from sqlalchemy import Boolean, Column, Float, Integer, Sequence, String from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker import geopandas as gpd import pandas as pd @@ -130,7 +129,8 @@ def nearest( return value -def assign_heat_bus(scenario="eGon2035"): +@db.session_scoped +def assign_heat_bus(scenario="eGon2035", session=None): """Selects heat_bus for chps used in district heating. 
Parameters @@ -138,6 +138,10 @@ def assign_heat_bus(scenario="eGon2035"): scenario : str, optional Name of the corresponding scenario. The default is 'eGon2035'. + session : sqlalchemy.orm.Session + The session used in this function. Can be ignored because it will be + supplied automatically. + Returns ------- None. @@ -192,7 +196,6 @@ def assign_heat_bus(scenario="eGon2035"): ) # Insert district heating CHP with heat_bus_id - session = sessionmaker(bind=db.engine())() for i, row in chp.iterrows(): if row.carrier != "biomass": entry = EgonChp( @@ -226,10 +229,10 @@ def assign_heat_bus(scenario="eGon2035"): geom=f"SRID=4326;POINT({row.geom.x} {row.geom.y})", ) session.add(entry) - session.commit() -def insert_biomass_chp(scenario): +@db.session_scoped +def insert_biomass_chp(scenario, session=None): """Insert biomass chp plants of future scenario Parameters @@ -237,6 +240,10 @@ def insert_biomass_chp(scenario): scenario : str Name of scenario. + session : sqlalchemy.orm.Session + The session used in this function. Can be ignored because it will be + supplied automatically. + Returns ------- None. @@ -283,7 +290,6 @@ def insert_biomass_chp(scenario): mastr_loc = assign_use_case(mastr_loc, cfg["sources"]) # Insert entries with location - session = sessionmaker(bind=db.engine())() for i, row in mastr_loc.iterrows(): if row.ThermischeNutzleistung > 0: entry = EgonChp( @@ -303,7 +309,6 @@ def insert_biomass_chp(scenario): geom=f"SRID=4326;POINT({row.Laengengrad} {row.Breitengrad})", ) session.add(entry) - session.commit() def insert_chp_egon2035(): diff --git a/src/egon/data/datasets/chp/match_nep.py b/src/egon/data/datasets/chp/match_nep.py index 341ed24d2..b5258e584 100755 --- a/src/egon/data/datasets/chp/match_nep.py +++ b/src/egon/data/datasets/chp/match_nep.py @@ -2,7 +2,6 @@ The module containing all code dealing with large chp from NEP list. 
""" -from sqlalchemy.orm import sessionmaker import geopandas import pandas as pd @@ -312,7 +311,8 @@ def match_nep_chp( ################################################### Final table ################################################### -def insert_large_chp(sources, target, EgonChp): +@db.session_scoped +def insert_large_chp(sources, target, EgonChp, session=None): # Select CHP from NEP list chp_NEP = select_chp_from_nep(sources) @@ -516,7 +516,6 @@ def insert_large_chp(sources, target, EgonChp): ) # Insert into target table - session = sessionmaker(bind=db.engine())() for i, row in insert_chp.iterrows(): entry = EgonChp( sources={ @@ -536,6 +535,5 @@ def insert_large_chp(sources, target, EgonChp): geom=f"SRID=4326;POINT({row.geometry.x} {row.geometry.y})", ) session.add(entry) - session.commit() return MaStR_konv diff --git a/src/egon/data/datasets/chp/small_chp.py b/src/egon/data/datasets/chp/small_chp.py index 486f55e2a..eea5175c2 100755 --- a/src/egon/data/datasets/chp/small_chp.py +++ b/src/egon/data/datasets/chp/small_chp.py @@ -1,7 +1,6 @@ """ The module containing all code dealing with chp < 10MW. """ -from sqlalchemy.orm import sessionmaker import geopandas as gpd import numpy as np @@ -13,7 +12,8 @@ ) -def insert_mastr_chp(mastr_chp, EgonChp): +@db.session_scoped +def insert_mastr_chp(mastr_chp, EgonChp, session=None): """Insert MaStR data from exising CHPs into database table Parameters @@ -22,6 +22,9 @@ def insert_mastr_chp(mastr_chp, EgonChp): List of existing CHPs in MaStR. EgonChp : class Class definition of daabase table for CHPs + session : sqlalchemy.orm.Session + The session inside which this function operates. Ignore this, because + it will be supplied automatically. 
Returns ------- @@ -29,7 +32,6 @@ def insert_mastr_chp(mastr_chp, EgonChp): """ - session = sessionmaker(bind=db.engine())() for i, row in mastr_chp.iterrows(): entry = EgonChp( sources={ @@ -49,7 +51,6 @@ def insert_mastr_chp(mastr_chp, EgonChp): geom=f"SRID=4326;POINT({row.geometry.x} {row.geometry.y})", ) session.add(entry) - session.commit() def existing_chp_smaller_10mw(sources, MaStR_konv, EgonChp): @@ -100,6 +101,7 @@ def existing_chp_smaller_10mw(sources, MaStR_konv, EgonChp): insert_mastr_chp(mastr_chp, EgonChp) +@db.session_scoped def extension_to_areas( areas, additional_capacity, @@ -108,6 +110,7 @@ def extension_to_areas( EgonChp, district_heating=True, scenario="eGon2035", + session=None, ): """Builds new CHPs on potential industry or district heating areas. @@ -151,14 +154,15 @@ def extension_to_areas( ORM-class definition of CHP database-table. district_heating : boolean, optional State if the areas are district heating areas. The default is True. + session : sqlalchemy.orm.Session + The session inside which this function operates. Ignore this, because + it will be supplied automatically. Returns ------- None. 
""" - session = sessionmaker(bind=db.engine())() - np.random.seed(seed=config.settings()["egon-data"]["--random-seed"]) # Add new CHP as long as the additional capacity is not reached diff --git a/src/egon/data/datasets/electrical_neighbours.py b/src/egon/data/datasets/electrical_neighbours.py index b1fa1ccb3..39d4c838a 100755 --- a/src/egon/data/datasets/electrical_neighbours.py +++ b/src/egon/data/datasets/electrical_neighbours.py @@ -1022,7 +1022,8 @@ def insert_generators(capacities): session.commit() -def insert_storage(capacities): +@db.session_scoped +def insert_storage(capacities, session=None): """Insert storage units for foreign countries based on TYNDP-data Parameters @@ -1093,7 +1094,6 @@ def insert_storage(capacities): ] = parameters_pumped_hydro[x] # insert data - session = sessionmaker(bind=db.engine())() for i, row in store.iterrows(): entry = etrago.EgonPfHvStorage( scn_name="eGon2035", @@ -1153,7 +1153,8 @@ def tyndp_generation(): insert_storage(capacities) -def tyndp_demand(): +@db.session_scoped +def tyndp_demand(session=None): """Copy load timeseries data from TYNDP 2020. According to NEP 2021, the data for 2030 and 2040 is interpolated linearly. 
@@ -1182,10 +1183,6 @@ def tyndp_demand(): """ ) - # Connect to database - engine = db.engine() - session = sessionmaker(bind=engine)() - nodes = [ "AT00", "BE00", diff --git a/src/egon/data/datasets/scenario_capacities.py b/src/egon/data/datasets/scenario_capacities.py index 9067c36e5..15ddb00de 100755 --- a/src/egon/data/datasets/scenario_capacities.py +++ b/src/egon/data/datasets/scenario_capacities.py @@ -6,7 +6,6 @@ from sqlalchemy import Column, Float, Integer, String from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker import numpy as np import pandas as pd import yaml @@ -494,7 +493,8 @@ def insert_nep_list_powerplants(export=True): return kw_liste_nep -def district_heating_input(): +@db.session_scoped +def district_heating_input(session=None): """Imports data for district heating networks in Germany Returns @@ -523,10 +523,6 @@ def district_heating_input(): pd.IndexSlice[:, "Fernwaermeerzeugung"], "Wert" ] *= population_share() - # Connect to database - engine = db.engine() - session = sessionmaker(bind=engine)() - # insert heatpumps and resistive heater as link for c in ["Grosswaermepumpe", "Elektrodenheizkessel"]: entry = EgonScenarioCapacities( @@ -562,8 +558,6 @@ def district_heating_input(): session.add(entry) - session.commit() - def insert_data_nep(): """Overall function for importing scenario input data for eGon2035 scenario diff --git a/src/egon/data/datasets/scenario_parameters/__init__.py b/src/egon/data/datasets/scenario_parameters/__init__.py index 19ff71316..a56c0f458 100755 --- a/src/egon/data/datasets/scenario_parameters/__init__.py +++ b/src/egon/data/datasets/scenario_parameters/__init__.py @@ -8,7 +8,6 @@ from sqlalchemy import VARCHAR, Column, String from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker import pandas as pd from egon.data import db @@ -45,7 +44,8 @@ def create_table(): 
EgonScenario.__table__.create(bind=engine, checkfirst=True) -def insert_scenarios(): +@db.session_scoped +def insert_scenarios(session=None): """Insert scenarios and their parameters to scenario table Returns @@ -56,8 +56,6 @@ def insert_scenarios(): db.execute_sql("DELETE FROM scenario.egon_scenario_parameters CASCADE;") - session = sessionmaker(bind=db.engine())() - # Scenario eGon2035 egon2035 = EgonScenario(name="eGon2035") diff --git a/src/egon/data/datasets/storages/pumped_hydro.py b/src/egon/data/datasets/storages/pumped_hydro.py index cae9eea88..75a451f92 100755 --- a/src/egon/data/datasets/storages/pumped_hydro.py +++ b/src/egon/data/datasets/storages/pumped_hydro.py @@ -4,7 +4,6 @@ """ from geopy.geocoders import Nominatim -from sqlalchemy.orm import sessionmaker import geopandas as gpd import pandas as pd From ba41e727983de4a9d70a6a997ce1e0cbcf71e1a9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Thu, 10 Nov 2022 03:50:56 +0100 Subject: [PATCH 14/18] Use `session` instead of `db.execute_sql` That way the `DELETE` statement is guaranteed to interact correctly with the rest of the database interactions in the function, which is important, now that the whole function uses a single session. 
--- src/egon/data/datasets/scenario_parameters/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/egon/data/datasets/scenario_parameters/__init__.py b/src/egon/data/datasets/scenario_parameters/__init__.py index a56c0f458..5ab06b931 100755 --- a/src/egon/data/datasets/scenario_parameters/__init__.py +++ b/src/egon/data/datasets/scenario_parameters/__init__.py @@ -54,7 +54,8 @@ def insert_scenarios(session=None): """ - db.execute_sql("DELETE FROM scenario.egon_scenario_parameters CASCADE;") + session.execute("DELETE FROM scenario.egon_scenario_parameters CASCADE;") + session.commit() # Scenario eGon2035 egon2035 = EgonScenario(name="eGon2035") From d4faeabcabbb0b7eeda0bc74f17fa058f677d304 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Thu, 10 Nov 2022 03:56:31 +0100 Subject: [PATCH 15/18] Manually close `session`s These sessions are not opened via a context manager and thus have to be closed manually in order to not potentially leak connections. It might not be necessary but it's best to be on the safe side. Also, these are `session` usages which I couldn't somehow refactor to working with a context manager, so this is the minimal effort to stay on the safe side w.r.t. connection leakage. 
--- src/egon/data/datasets/electrical_neighbours.py | 2 ++ src/egon/data/datasets/power_plants/__init__.py | 4 ++++ src/egon/data/datasets/storages/__init__.py | 3 +++ 3 files changed, 9 insertions(+) diff --git a/src/egon/data/datasets/electrical_neighbours.py b/src/egon/data/datasets/electrical_neighbours.py index 39d4c838a..1a6376e63 100755 --- a/src/egon/data/datasets/electrical_neighbours.py +++ b/src/egon/data/datasets/electrical_neighbours.py @@ -961,6 +961,7 @@ def insert_generators(capacities): session.add(entry) session.commit() + session.close() # assign generators time-series data renew_carriers_2035 = ["wind_onshore", "wind_offshore", "solar"] @@ -1020,6 +1021,7 @@ def insert_generators(capacities): session.add(entry) session.commit() + session.close() @db.session_scoped diff --git a/src/egon/data/datasets/power_plants/__init__.py b/src/egon/data/datasets/power_plants/__init__.py index ab7cd4d83..a45afb5e1 100755 --- a/src/egon/data/datasets/power_plants/__init__.py +++ b/src/egon/data/datasets/power_plants/__init__.py @@ -289,6 +289,7 @@ def insert_biomass_plants(scenario): session.add(entry) session.commit() + session.close() def insert_hydro_plants(scenario): @@ -376,6 +377,7 @@ def insert_hydro_plants(scenario): session.add(entry) session.commit() + session.close() def assign_voltage_level(mastr_loc, cfg): @@ -723,6 +725,7 @@ def allocate_conventional_non_chp_power_plants(): ) session.add(entry) session.commit() + session.close() def allocate_other_power_plants(): @@ -881,3 +884,4 @@ def allocate_other_power_plants(): ) session.add(entry) session.commit() + session.close() diff --git a/src/egon/data/datasets/storages/__init__.py b/src/egon/data/datasets/storages/__init__.py index d6cb666c2..c95a6e589 100755 --- a/src/egon/data/datasets/storages/__init__.py +++ b/src/egon/data/datasets/storages/__init__.py @@ -247,6 +247,7 @@ def allocate_pumped_hydro_eGon2035(export=True): ) session.add(entry) session.commit() + session.close() else: return 
power_plants @@ -317,6 +318,7 @@ def allocate_pumped_hydro_eGon100RE(): ) session.add(entry) session.commit() + session.close() def home_batteries_per_scenario(scenario): @@ -410,6 +412,7 @@ def home_batteries_per_scenario(scenario): ) session.add(entry) session.commit() + session.close() def allocate_pv_home_batteries(): From 1f4dbc36a78ab25bb4d2cf630ba000a9d18a2c07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3%BCnther?= Date: Thu, 10 Nov 2022 04:11:48 +0100 Subject: [PATCH 16/18] Merge too finely divided transactions These `with` blocks created two transactions inside functions which were wrapped in retrying error handlers. This could potentially lead to always failing retries because committed transactions cannot be rolled back, so errors in the second transactions trigger retries on unchangeable state. In order to prevent this, it's best to have the whole function be one transaction. --- .../emobility/motorized_individual_travel/model_timeseries.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/egon/data/datasets/emobility/motorized_individual_travel/model_timeseries.py b/src/egon/data/datasets/emobility/motorized_individual_travel/model_timeseries.py index e99b797c0..90938674c 100644 --- a/src/egon/data/datasets/emobility/motorized_individual_travel/model_timeseries.py +++ b/src/egon/data/datasets/emobility/motorized_individual_travel/model_timeseries.py @@ -633,7 +633,6 @@ def write_link(scenario_name: str) -> None: terrain_factor=1, ) ) - with db.session_scope() as session: session.add( EgonPfHvLinkTimeseries( scn_name=scenario_name, @@ -672,7 +671,6 @@ def write_store(scenario_name: str) -> None: standing_loss=0, ) ) - with db.session_scope() as session: session.add( EgonPfHvStoreTimeseries( scn_name=scenario_name, @@ -699,7 +697,6 @@ def write_load( sign=-1, ) ) - with db.session_scope() as session: session.add( EgonPfHvLoadTimeseries( scn_name=scenario_name, From 482466e255fdd5c2f84f4488aae3d43c1a444c51 Mon Sep 17 00:00:00 2001 From: 
=?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Thu, 10 Nov 2022 02:45:05 +0100 Subject: [PATCH 17/18] Squelch `flake8` complaints --- .../data/datasets/electrical_neighbours.py | 3 +- .../data/datasets/power_plants/__init__.py | 9 +++-- .../power_plants/pv_ground_mounted.py | 38 +++++++++---------- src/egon/data/datasets/scenario_capacities.py | 14 ++++--- .../datasets/scenario_parameters/__init__.py | 5 ++- src/egon/data/datasets/storages/__init__.py | 4 +- 6 files changed, 40 insertions(+), 33 deletions(-) diff --git a/src/egon/data/datasets/electrical_neighbours.py b/src/egon/data/datasets/electrical_neighbours.py index 1a6376e63..6d4b3ccc4 100755 --- a/src/egon/data/datasets/electrical_neighbours.py +++ b/src/egon/data/datasets/electrical_neighbours.py @@ -1055,7 +1055,8 @@ def insert_storage(capacities, session=None): """ ) - # Add missing information suitable for eTraGo selected from scenario_parameter table + # Add missing information suitable for eTraGo selected from + # scenario_parameter table parameters_pumped_hydro = scenario_parameters.electricity("eGon2035")[ "efficiency" ]["pumped_hydro"] diff --git a/src/egon/data/datasets/power_plants/__init__.py b/src/egon/data/datasets/power_plants/__init__.py index a45afb5e1..f66dc3e01 100755 --- a/src/egon/data/datasets/power_plants/__init__.py +++ b/src/egon/data/datasets/power_plants/__init__.py @@ -3,7 +3,6 @@ from geoalchemy2 import Geometry from sqlalchemy import ( BigInteger, - Boolean, Column, Float, Integer, @@ -19,6 +18,7 @@ from egon.data import db from egon.data.datasets import Dataset +from egon.data.datasets.power_plants import assign_weather_data from egon.data.datasets.power_plants.conventional import ( match_nep_no_chp, select_nep_power_plants, @@ -26,7 +26,6 @@ ) from egon.data.datasets.power_plants.pv_rooftop import pv_rooftop_per_mv_grid import egon.data.config -import egon.data.datasets.power_plants.assign_weather_data as assign_weather_data import egon.data.datasets.power_plants.pv_ground_mounted 
as pv_ground_mounted import egon.data.datasets.power_plants.wind_farms as wind_onshore import egon.data.datasets.power_plants.wind_offshore as wind_offshore @@ -799,10 +798,12 @@ def allocate_other_power_plants(): # Select power plants representing carrier 'others' from MaStR files mastr_sludge = pd.read_csv(cfg["sources"]["mastr_gsgk"]).query( - """EinheitBetriebsstatus=='InBetrieb'and Energietraeger=='Klaerschlamm'""" + "EinheitBetriebsstatus=='InBetrieb' and Energietraeger=='Klaerschlamm'" ) mastr_geothermal = pd.read_csv(cfg["sources"]["mastr_gsgk"]).query( - """EinheitBetriebsstatus=='InBetrieb' and Energietraeger=='Geothermie' and Technologie == 'ORCOrganicRankineCycleAnlage'""" + "EinheitBetriebsstatus=='InBetrieb'" + " and Energietraeger=='Geothermie'" + " and Technologie == 'ORCOrganicRankineCycleAnlage'" ) mastr_sg = mastr_sludge.append(mastr_geothermal) diff --git a/src/egon/data/datasets/power_plants/pv_ground_mounted.py b/src/egon/data/datasets/power_plants/pv_ground_mounted.py index 042b685c2..fa46d09c1 100644 --- a/src/egon/data/datasets/power_plants/pv_ground_mounted.py +++ b/src/egon/data/datasets/power_plants/pv_ground_mounted.py @@ -1,8 +1,6 @@ -from shapely import wkb import geopandas as gpd import numpy as np import pandas as pd -import psycopg2 from egon.data import db @@ -36,7 +34,7 @@ def mastr_existing_pv(path, pow_per_area): ) df = df[df["Lage"] == "Freiflaeche"] - ### examine data concerning geographical locations and drop NaNs + # examine data concerning geographical locations and drop NaNs x1 = df["Laengengrad"].isnull().sum() x2 = df["Breitengrad"].isnull().sum() print(" ") @@ -104,7 +102,7 @@ def mastr_existing_pv(path, pow_per_area): v_l.loc[index] = np.NaN mastr["voltage_level"] = v_l - ### examine data concerning voltage level + # examine data concerning voltage level x1 = mastr["voltage_level"].isnull().sum() print(" ") print("Examination of voltage levels in MaStR data set:") @@ -127,7 +125,7 @@ def mastr_existing_pv(path, 
pow_per_area): x3 = len(index_names) mastr.drop(index_names, inplace=True) - ### further examination + # further examination print("Number of PVs in low voltage level: " + str(x2)) print("Number of PVs in LVMV level: " + str(x3)) print( @@ -173,7 +171,7 @@ def potential_areas(con, join_buffer): # roads and railways - ### counting variable for examination + # counting variable for examination before = len(potentials_rora) # get small areas and create buffer for joining around them @@ -199,7 +197,7 @@ def potential_areas(con, join_buffer): join = gpd.GeoSeries(data=[x, y]) potentials_rora["geom"].loc[index_potentials] = join.unary_union - ### examination of joining of areas + # examination of joining of areas count_small = len(small_buffers) count_join = len(o) count_delete = count_small - count_join @@ -216,7 +214,7 @@ def potential_areas(con, join_buffer): # agriculture - ### counting variable for examination + # counting variable for examination before = len(potentials_agri) # get small areas and create buffer for joining around them @@ -242,7 +240,7 @@ def potential_areas(con, join_buffer): join = gpd.GeoSeries(data=[x, y]) potentials_agri["geom"].loc[index_potentials] = join.unary_union - ### examination of joining of areas + # examination of joining of areas count_small = len(small_buffers) count_join = len(o) count_delete = count_small - count_join @@ -261,7 +259,7 @@ def potential_areas(con, join_buffer): # check intersection of potential areas - ### counting variable + # counting variable agri_vorher = len(potentials_agri) # if areas intersect, keep road & railway potential areas and drop agricultural ones @@ -272,7 +270,7 @@ def potential_areas(con, join_buffer): index = o.iloc[i] potentials_agri.drop([index], inplace=True) - ### examination of intersection of areas + # examination of intersection of areas print(" ") print("Review function to avoid intersection of potential areas:") print("Initial length potentials_agri: " + str(agri_vorher)) @@ -323,7 
+321,7 @@ def select_pot_areas(mastr, potentials_pot): # get voltage level of existing PVs index_pv = o.index[i] pot_sel["voltage_level"] = mastr["voltage_level"].loc[index_pv] - pot_sel = pot_sel[pot_sel["selected"] == True] + pot_sel = pot_sel[pot_sel["selected"] is True] pot_sel.drop("selected", axis=1, inplace=True) # drop selected existing pv parks from mastr @@ -471,7 +469,7 @@ def build_additional_pv(potentials, pv, pow_per_area, con): overlay = gpd.sjoin(centroids, distr) - ### examine potential area per grid district + # examine potential area per grid district anz = len(overlay) anz_distr = len(overlay["index_right"].unique()) size = 137500 # m2 Fläche für > 5,5 MW: (5500 kW / (0,04 kW/m2)) @@ -902,7 +900,7 @@ def run_methodology( if len(distr_i) > 0: distr_i["nuts"] = target[target["nuts"] == i]["nuts"].iloc[0] - ### examination of built PV parks per state + # examination of built PV parks per state rora_i_mv = rora_i[rora_i["voltage_level"] == 5] rora_i_hv = rora_i[rora_i["voltage_level"] == 4] agri_i_mv = agri_i[agri_i["voltage_level"] == 5] @@ -983,8 +981,8 @@ def run_methodology( con, ) - ### create map to show distribution of installed capacity - if show_map == True: + # create map to show distribution of installed capacity + if show_map: # 1) eGon2035 @@ -1029,7 +1027,7 @@ def run_methodology( cmap="magma_r", legend=True, legend_kwds={ - "label": f"Installed capacity in MW", + "label": "Installed capacity in MW", "orientation": "vertical", }, ) @@ -1080,7 +1078,7 @@ def run_methodology( cmap="magma_r", legend=True, legend_kwds={ - "label": f"Installed capacity in MW", + "label": "Installed capacity in MW", "orientation": "vertical", }, ) @@ -1151,7 +1149,7 @@ def insert_pv_parks( sql = "SELECT MAX(id) FROM supply.egon_power_plants" max_id = pd.read_sql(sql, con) max_id = max_id["max"].iat[0] - if max_id == None: + if max_id is None: max_id = 1 pv_park_id = max_id + 1 @@ -1214,7 +1212,7 @@ def insert_pv_parks( show_map=False, ) - ### examination of 
results + # examination of results if len(pv_per_distr) > 0: pv_per_distr_mv = pv_per_distr[pv_per_distr["voltage_level"] == 5] pv_per_distr_hv = pv_per_distr[pv_per_distr["voltage_level"] == 4] diff --git a/src/egon/data/datasets/scenario_capacities.py b/src/egon/data/datasets/scenario_capacities.py index 15ddb00de..131ce2b80 100755 --- a/src/egon/data/datasets/scenario_capacities.py +++ b/src/egon/data/datasets/scenario_capacities.py @@ -15,7 +15,7 @@ from egon.data.datasets import Dataset import egon.data.config -### will be later imported from another file ### +# will be later imported from another file # Base = declarative_base() @@ -168,7 +168,8 @@ def insert_capacities_per_federal_state_nep(): # List federal state with an assigned wind offshore capacity index_list = list(df_windoff_fs.index.values) - # Overwrite capacities in df_windoff with more accurate values from df_windoff_fs + # Overwrite capacities in df_windoff with more accurate values from + # df_windoff_fs for state in index_list: @@ -193,7 +194,7 @@ def insert_capacities_per_federal_state_nep(): "Haushaltswaermepumpen": "residential_rural_heat_pump", "KWK < 10 MW": "small_chp", } - #'Elektromobilitaet gesamt': 'transport', + # 'Elektromobilitaet gesamt': 'transport', # 'Elektromobilitaet privat': 'transport'} # nuts1 to federal state in Germany @@ -265,7 +266,8 @@ def insert_capacities_per_federal_state_nep(): # Filter by carrier updated = insert_data[insert_data["carrier"].isin(carriers)] - # Merge to replace capacities for carriers "oil", "other_non_renewable" and "pumped_hydro" + # Merge to replace capacities for carriers "oil", + # "other_non_renewable" and "pumped_hydro" updated = ( updated.merge(capacities_list, on=["carrier", "nuts"], how="left") .fillna(0) @@ -702,7 +704,9 @@ def eGon100_capacities(): "OCGT": "gas", "rural_ground_heat_pump": "residential_rural_heat_pump", "urban_central_air_heat_pump": "urban_central_heat_pump", - "urban_central_solar_thermal": 
"urban_central_solar_thermal_collector", + "urban_central_solar_thermal": ( + "urban_central_solar_thermal_collector" + ), }, inplace=True, ) diff --git a/src/egon/data/datasets/scenario_parameters/__init__.py b/src/egon/data/datasets/scenario_parameters/__init__.py index 5ab06b931..e99d62946 100755 --- a/src/egon/data/datasets/scenario_parameters/__init__.py +++ b/src/egon/data/datasets/scenario_parameters/__init__.py @@ -174,7 +174,10 @@ def download_pypsa_technology_data(): sources = egon.data.config.datasets()["pypsa-technology-data"]["sources"][ "zenodo" ] - url = f"""https://zenodo.org/record/{sources['deposit_id']}/files/{sources['file']}""" + url = ( + f"https://zenodo.org/record/{sources['deposit_id']}/files/" + f"{sources['file']}" + ) target_file = egon.data.config.datasets()["pypsa-technology-data"][ "targets" ]["file"] diff --git a/src/egon/data/datasets/storages/__init__.py b/src/egon/data/datasets/storages/__init__.py index c95a6e589..7cfd1115f 100755 --- a/src/egon/data/datasets/storages/__init__.py +++ b/src/egon/data/datasets/storages/__init__.py @@ -293,8 +293,8 @@ def allocate_pumped_hydro_eGon100RE(): else: raise ValueError(f"'{boundary}' is not a valid dataset boundary.") - # Get allocation of pumped_hydro plants in eGon2035 scenario as the reference - # for the distribution in eGon100RE scenario + # Get allocation of pumped_hydro plants in eGon2035 scenario as the + # reference for the distribution in eGon100RE scenario allocation = allocate_pumped_hydro_eGon2035(export=False) scaling_factor = capacity_phes / allocation.el_capacity.sum() From b075f0f1e3a580702129a2d5aee3dc0275d738c1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Stephan=20G=C3=BCnther?= Date: Tue, 8 Nov 2022 23:59:23 +0100 Subject: [PATCH 18/18] Run `black` and `isort` --- src/egon/data/datasets/DSM_cts_ind.py | 1 - src/egon/data/datasets/chp/__init__.py | 7 +++-- .../data/datasets/electrical_neighbours.py | 17 ++++++----- .../data/datasets/power_plants/__init__.py | 9 +----- 
src/egon/data/datasets/renewable_feedin.py | 2 +- src/egon/data/datasets/scenario_capacities.py | 20 +++++++------ src/egon/data/datasets/storages/__init__.py | 28 +++++++++---------- 7 files changed, 41 insertions(+), 43 deletions(-) diff --git a/src/egon/data/datasets/DSM_cts_ind.py b/src/egon/data/datasets/DSM_cts_ind.py index e1916a28d..88af8459b 100644 --- a/src/egon/data/datasets/DSM_cts_ind.py +++ b/src/egon/data/datasets/DSM_cts_ind.py @@ -783,7 +783,6 @@ def dsm_cts_ind( ind_cool_vent_share=0.039, ind_vent_share=0.017, ): - """ Execute methodology to create and implement components for DSM considering a) CTS per osm-area: combined potentials of cooling, ventilation and air conditioning diff --git a/src/egon/data/datasets/chp/__init__.py b/src/egon/data/datasets/chp/__init__.py index 13ec477eb..3f35bbbe4 100644 --- a/src/egon/data/datasets/chp/__init__.py +++ b/src/egon/data/datasets/chp/__init__.py @@ -3,6 +3,8 @@ (CHP) plants. """ +from pathlib import Path + from geoalchemy2 import Geometry from shapely.ops import nearest_points from sqlalchemy import Boolean, Column, Float, Integer, Sequence, String @@ -10,6 +12,7 @@ from sqlalchemy.ext.declarative import declarative_base import geopandas as gpd import pandas as pd +import pypsa from egon.data import config, db from egon.data.datasets import Dataset @@ -18,6 +21,7 @@ assign_use_case, existing_chp_smaller_10mw, extension_per_federal_state, + extension_to_areas, select_target, ) from egon.data.datasets.power_plants import ( @@ -26,9 +30,6 @@ filter_mastr_geometry, scale_prox2now, ) -import pypsa -from egon.data.datasets.chp.small_chp import extension_to_areas -from pathlib import Path Base = declarative_base() diff --git a/src/egon/data/datasets/electrical_neighbours.py b/src/egon/data/datasets/electrical_neighbours.py index 6d4b3ccc4..735782f50 100755 --- a/src/egon/data/datasets/electrical_neighbours.py +++ b/src/egon/data/datasets/electrical_neighbours.py @@ -3,17 +3,17 @@ import zipfile -import 
geopandas as gpd -import pandas as pd from shapely.geometry import LineString from sqlalchemy.orm import sessionmaker +import geopandas as gpd +import pandas as pd -import egon.data.datasets.etrago_setup as etrago -import egon.data.datasets.scenario_parameters.parameters as scenario_parameters from egon.data import config, db from egon.data.datasets import Dataset from egon.data.datasets.fill_etrago_gen import add_marginal_costs from egon.data.datasets.scenario_parameters import get_sector_parameters +import egon.data.datasets.etrago_setup as etrago +import egon.data.datasets.scenario_parameters.parameters as scenario_parameters class ElectricalNeighbours(Dataset): @@ -1082,9 +1082,12 @@ def insert_storage(capacities, session=None): ) # Add columns for additional parameters to df - store["dispatch"], store["store"], store["standing_loss"], store[ - "max_hours" - ] = (None, None, None, None) + ( + store["dispatch"], + store["store"], + store["standing_loss"], + store["max_hours"], + ) = (None, None, None, None) # Insert carrier specific parameters diff --git a/src/egon/data/datasets/power_plants/__init__.py b/src/egon/data/datasets/power_plants/__init__.py index f66dc3e01..e2f640c48 100755 --- a/src/egon/data/datasets/power_plants/__init__.py +++ b/src/egon/data/datasets/power_plants/__init__.py @@ -1,14 +1,7 @@ """The central module containing all code dealing with power plant data. 
""" from geoalchemy2 import Geometry -from sqlalchemy import ( - BigInteger, - Column, - Float, - Integer, - Sequence, - String, -) +from sqlalchemy import BigInteger, Column, Float, Integer, Sequence, String from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker diff --git a/src/egon/data/datasets/renewable_feedin.py b/src/egon/data/datasets/renewable_feedin.py index 55e422d80..0bc5f0114 100644 --- a/src/egon/data/datasets/renewable_feedin.py +++ b/src/egon/data/datasets/renewable_feedin.py @@ -491,7 +491,7 @@ def heat_pump_cop(): # Calculate coefficient of performance for air sourced heat pumps # according to Brown et. al - cop = 6.81 - 0.121 * delta_t + 0.00063 * delta_t ** 2 + cop = 6.81 - 0.121 * delta_t + 0.00063 * delta_t**2 df = pd.DataFrame( index=temperature.to_pandas().index, diff --git a/src/egon/data/datasets/scenario_capacities.py b/src/egon/data/datasets/scenario_capacities.py index 131ce2b80..240af9ecc 100755 --- a/src/egon/data/datasets/scenario_capacities.py +++ b/src/egon/data/datasets/scenario_capacities.py @@ -84,7 +84,6 @@ def create_table(): def nuts_mapping(): - nuts_mapping = { "BW": "DE1", "NW": "DEA", @@ -157,13 +156,15 @@ def insert_capacities_per_federal_state_nep(): df_windoff = pd.read_excel( target_file, sheet_name="WInd_Offshore_NEP", - ).dropna(subset=['Bundesland', 'Netzverknuepfungspunkt']) + ).dropna(subset=["Bundesland", "Netzverknuepfungspunkt"]) # Remove trailing whitespace from column Bundesland - df_windoff['Bundesland']= df_windoff['Bundesland'].str.strip() + df_windoff["Bundesland"] = df_windoff["Bundesland"].str.strip() # Group and sum capacities per federal state - df_windoff_fs = df_windoff[['Bundesland', 'C 2035']].groupby(['Bundesland']).sum() + df_windoff_fs = ( + df_windoff[["Bundesland", "C 2035"]].groupby(["Bundesland"]).sum() + ) # List federal state with an assigned wind offshore capacity index_list = 
list(df_windoff_fs.index.values) @@ -172,9 +173,9 @@ def insert_capacities_per_federal_state_nep(): # df_windoff_fs for state in index_list: - - df.at['Wind offshore', state] = df_windoff_fs.at[state, 'C 2035']/1000 - + df.at["Wind offshore", state] = ( + df_windoff_fs.at[state, "C 2035"] / 1000 + ) # sort NEP-carriers: rename_carrier = { @@ -216,7 +217,6 @@ def insert_capacities_per_federal_state_nep(): ] for bl in map_nuts.index: - data = pd.DataFrame(df[bl]) # if distribution to federal states is not provided, @@ -687,7 +687,9 @@ def eGon100_capacities(): df.p_nom[f"residential_{merge_carrier}"] + df.p_nom[f"services_{merge_carrier}"] ), - "component": df.component[f"residential_{merge_carrier}"], + "component": df.component[ + f"residential_{merge_carrier}" + ], }, ) ) diff --git a/src/egon/data/datasets/storages/__init__.py b/src/egon/data/datasets/storages/__init__.py index 7cfd1115f..269069a37 100755 --- a/src/egon/data/datasets/storages/__init__.py +++ b/src/egon/data/datasets/storages/__init__.py @@ -1,25 +1,25 @@ """The central module containing all code dealing with power plant data. 
""" -from geoalchemy2 import Geometry from pathlib import Path + +from geoalchemy2 import Geometry from sqlalchemy import BigInteger, Column, Float, Integer, Sequence, String from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker -from egon.data.datasets.storages.pumped_hydro import ( - select_mastr_pumped_hydro, - select_nep_pumped_hydro, - match_storage_units, - get_location, - apply_voltage_level_thresholds, -) -from egon.data.datasets.power_plants import assign_voltage_level import geopandas as gpd import pandas as pd -from egon.data import db, config +from egon.data import config, db from egon.data.datasets import Dataset - +from egon.data.datasets.power_plants import assign_voltage_level +from egon.data.datasets.storages.pumped_hydro import ( + apply_voltage_level_thresholds, + get_location, + match_storage_units, + select_mastr_pumped_hydro, + select_nep_pumped_hydro, +) Base = declarative_base() @@ -353,9 +353,9 @@ def home_batteries_per_scenario(scenario): sheet_name="1.Entwurf_NEP2035_V2021", index_col="Unnamed: 0", ) - - # Select target value in MW - target = capacities_nep.Summe["PV-Batteriespeicher"]*1000 + + # Select target value in MW + target = capacities_nep.Summe["PV-Batteriespeicher"] * 1000 else: target = db.select_dataframe(