diff --git a/config/task_generator/manual_task_generator.properties b/config/task_generator/manual_task_generator.properties index 7690033..cb519a2 100644 --- a/config/task_generator/manual_task_generator.properties +++ b/config/task_generator/manual_task_generator.properties @@ -27,3 +27,4 @@ UPLOAD_TO_MINIO = True SEND_MERGE_REPORT = True PRE_TEMP_FIXES = True POST_TEMP_FIXES = True +FORCE_OUTAGE_FIX = False diff --git a/config/task_generator/task_generator.properties b/config/task_generator/task_generator.properties index 874ef01..70d5d95 100644 --- a/config/task_generator/task_generator.properties +++ b/config/task_generator/task_generator.properties @@ -21,12 +21,12 @@ RMM_EXCLUDED_TSO = RMM_LOCAL_IMPORT = LITGRID # MERGE CONFIGURATION FLAGS -RUN_REPLACEMENT_RMM = True +RUN_REPLACEMENT_RMM = False RUN_REPLACEMENT_CGM = False -RUN_REPLACEMENT_LOCAL = True +RUN_REPLACEMENT_LOCAL = False RUN_SCALING_RMM = False RUN_SCALING_CGM = False -UPLOAD_TO_OPDM_RMM = False +UPLOAD_TO_OPDM_RMM = True UPLOAD_TO_OPDM_CGM = True UPLOAD_TO_MINIO_RMM = True UPLOAD_TO_MINIO_CGM = True @@ -34,3 +34,4 @@ SEND_MERGE_REPORT_RMM = True SEND_MERGE_REPORT_CGM = True PRE_TEMP_FIXES = True POST_TEMP_FIXES = True +FORCE_OUTAGE_FIX = False \ No newline at end of file diff --git a/emf/common/integrations/object_storage/models.py b/emf/common/integrations/object_storage/models.py index d236b20..07f55ad 100644 --- a/emf/common/integrations/object_storage/models.py +++ b/emf/common/integrations/object_storage/models.py @@ -76,6 +76,9 @@ def query_data(metadata_query: dict, for num, item in enumerate(content_list): content_list[num] = get_content(item) + # Delete scroll after retrieving data + object_storage.elastic_service.client.clear_scroll(scroll_id=scroll_id) + return content_list @@ -187,7 +190,7 @@ def get_latest_models_and_download(time_horizon: str, test_query = {"pmd:TSO": "TERNA", "pmd:timeHorizon": "2D", - "pmd:scenarioDate": "2024-02-15T22:30:00Z", + "pmd:scenarioDate": "2025-02-15T22:30:00Z", } test_filter = "now-2w" test_response = query_data(test_query, query_filter=test_filter, return_payload=True) diff --git a/emf/loadflow_tool/model_merger/merge_functions.py b/emf/loadflow_tool/model_merger/merge_functions.py index 6be33c3..fc1f519 100644 --- a/emf/loadflow_tool/model_merger/merge_functions.py +++ b/emf/loadflow_tool/model_merger/merge_functions.py @@ -656,17 +656,21 @@ def check_and_fix_dependencies(cgm_sv_data, cgm_ssh_data, original_data): (cgm_ssh_data['VALUE'].str.contains('SteadyStateHypothesis'))] dependencies = pandas.concat([tp_file_ids, ssh_file_ids], ignore_index=True, sort=False) existing_dependencies = cgm_sv_data[cgm_sv_data['KEY'] == 'Model.DependentOn'] - if existing_dependencies.empty or len(existing_dependencies.index) < len(dependencies.index): - logger.info(f"Missing dependencies. Adding {len(dependencies.index)} dependencies to SV profile") + dependency_difference = existing_dependencies.merge(dependencies[['ID']].rename(columns={'ID': 'VALUE'}), + on='VALUE', how='outer', indicator=True) + if not dependency_difference.query('_merge == "right_only"').empty: + cgm_sv_data = triplets.rdf_parser.remove_triplet_from_triplet(cgm_sv_data, existing_dependencies) full_model_id = cgm_sv_data[(cgm_sv_data['KEY'] == 'Type') & (cgm_sv_data['VALUE'] == 'FullModel')] - new_dependencies = dependencies[['ID']].copy().rename(columns={'ID': 'VALUE'}).reset_index(drop=True) + dependencies_to_update = dependency_difference.query('_merge != "left_only"') + logger.info(f"Mismatch of dependencies. Inserting {len(dependencies_to_update.index)} " + f"dependencies to SV profile") + new_dependencies = dependencies_to_update[['VALUE']].copy().reset_index(drop=True) new_dependencies.loc[:, 'KEY'] = 'Model.DependentOn' new_dependencies.loc[:, 'ID'] = full_model_id['ID'].iloc[0] new_dependencies.loc[:, 'INSTANCE_ID'] = full_model_id['INSTANCE_ID'].iloc[0] cgm_sv_data = triplets.rdf_parser.update_triplet_from_triplet(cgm_sv_data, new_dependencies) return cgm_sv_data - def remove_small_islands(solved_data, island_size_limit): small_island = pandas.DataFrame(solved_data.query("KEY == 'TopologicalIsland.TopologicalNodes'").ID.value_counts()).reset_index().query("count <= @island_size_limit") solved_data = triplets.rdf_parser.remove_triplet_from_triplet(solved_data, small_island, columns=["ID"]) diff --git a/emf/loadflow_tool/model_merger/model_merger.py b/emf/loadflow_tool/model_merger/model_merger.py index 55fbc1f..7591e6b 100644 --- a/emf/loadflow_tool/model_merger/model_merger.py +++ b/emf/loadflow_tool/model_merger/model_merger.py @@ -114,6 +114,7 @@ def handle(self, task_object: dict, **kwargs): model_merge_report_send_to_elk = task_properties["send_merge_report"] pre_temp_fixes = task_properties['pre_temp_fixes'] post_temp_fixes = task_properties['post_temp_fixes'] + force_outage_fix = task_properties['force_outage_fix'] remove_non_generators_from_slack_participation = True @@ -201,6 +202,7 @@ def handle(self, task_object: dict, **kwargs): logger.error(f"Failed to run replacement: {error}") valid_models = valid_models + additional_models_data + valid_tso = [tso['pmd:TSO'] for tso in valid_models] # Return None if there are no models to be merged if not valid_models: @@ -228,7 +230,11 @@ def handle(self, task_object: dict, **kwargs): # Crosscheck replaced model outages with latest UAP if atleast one baltic model was replaced replaced_tso_list = [model['tso'] for model in merge_log['replaced_entity']] - if merging_area == 'BA' and any(tso in ['LITGRID', 'AST', 'ELERING'] for tso in replaced_tso_list): + # Fix model outages + if force_outage_fix: + merged_model, merge_log = fix_model_outages(merged_model, valid_tso, merge_log, scenario_datetime, time_horizon, + debug=False) + elif merging_area == 'BA' and any(tso in ['LITGRID', 'AST', 'ELERING'] for tso in replaced_tso_list): merged_model, merge_log = fix_model_outages(merged_model, replaced_tso_list, merge_log, scenario_datetime, time_horizon) # Various fixes from igmsshvscgmssh error @@ -477,13 +483,13 @@ def handle(self, task_object: dict, **kwargs): "job_period_start": "2024-05-24T22:00:00+00:00", "job_period_end": "2024-05-25T06:00:00+00:00", "task_properties": { - "timestamp_utc": "2025-01-06T08:30:00+00:00", + "timestamp_utc": "2025-02-15T08:30:00+00:00", "merge_type": "BA", "merging_entity": "BALTICRCC", - "included": ['PSE', 'AST', 'ELERING'], + "included": ['PSE', 'AST', 'LITGRID'], "excluded": [], - "local_import": ['LITGRID'], - "time_horizon": "2D", + "local_import": ['ELERING'], + "time_horizon": "ID", "version": "99", "mas": "http://www.baltic-rsc.eu/OperationalPlanning", "pre_temp_fixes": "True", @@ -493,7 +499,8 @@ def handle(self, task_object: dict, **kwargs): "scaling": "True", "upload_to_opdm": "False", "upload_to_minio": "False", - "send_merge_report": "True", + "send_merge_report": "False", + "force_outage_fix": "True", } } diff --git a/emf/loadflow_tool/model_merger/temporary_fixes.py b/emf/loadflow_tool/model_merger/temporary_fixes.py index d3ccbcf..15f80e8 100644 --- a/emf/loadflow_tool/model_merger/temporary_fixes.py +++ b/emf/loadflow_tool/model_merger/temporary_fixes.py @@ -58,10 +58,10 @@ def run_post_merge_processing(input_models, solved_model, task_properties, SMALL return sv_data, ssh_data -def fix_model_outages(merged_model, replaced_model_list: list, merge_log, scenario_datetime, time_horizon): +def fix_model_outages(merged_model, model_list: list, merge_log, scenario_datetime, time_horizon, debug=False): area_map = {"LITGRID": "Lithuania", "AST": "Latvia", "ELERING": "Estonia"} - outage_areas = [area_map.get(item, item) for item in replaced_model_list] + outage_areas = [area_map.get(item, item) for item in model_list] elk_service = elastic.Elastic() @@ -101,16 +101,18 @@ def fix_model_outages(merged_model, replaced_model_list: list, merge_log, scenar model_outages = pd.DataFrame(get_model_outages(merged_model['network'])) mapped_model_outages = pd.merge(model_outages, mrid_map, left_on='grid_id', right_on='mrid', how='inner') model_area_map = {"LITGRID": "LT", "AST": "LV", "ELERING": "EE"} - model_outage_areas = [model_area_map.get(item, item) for item in replaced_model_list] + model_outage_areas = [model_area_map.get(item, item) for item in model_list] filtered_model_outages = mapped_model_outages[mapped_model_outages['country'].isin(model_outage_areas)] - logger.info("Fixing outages inside merged model:") + logger.info("Fixing outages inside merged model") # Reconnecting outages from network-config list + logger.info("Reconnecting outages from network-config list") for index, outage in filtered_model_outages.iterrows(): try: if merged_model['network'].connect(outage['grid_id']): - logger.info(f" {outage['name']} {outage['grid_id']} successfully reconnected") + if debug: + logger.info(f" {outage['name']} {outage['grid_id']} successfully reconnected") merge_log.update({'outages_corrected': True}) merge_log.get('outage_fixes').extend([{'name': outage['name'], 'grid_id': outage['grid_id'], "eic": outage['eic'], "outage_status": "connected"}]) else: @@ -124,10 +126,12 @@ def fix_model_outages(merged_model, replaced_model_list: list, merge_log, scenar continue # Applying outages from UAP + logger.info("Applying outages from UAP") for index, outage in mapped_outages.iterrows(): try: if merged_model['network'].disconnect(outage['grid_id']): - logger.info(f"{outage['name']} {outage['grid_id']} successfully disconnected") + if debug: + logger.info(f"{outage['name']} {outage['grid_id']} successfully disconnected") merge_log.update({'outages_corrected': True}) merge_log.get('outage_fixes').extend([{'name': outage['name'], 'grid_id': outage['grid_id'], "eic": outage['eic'], "outage_status": "disconnected"}]) else: diff --git a/emf/task_generator/manual_worker.py b/emf/task_generator/manual_worker.py index 84c6083..338496e 100644 --- a/emf/task_generator/manual_worker.py +++ b/emf/task_generator/manual_worker.py @@ -61,6 +61,7 @@ process_config_json[0]['runs'][0]['properties']['send_merge_report'] = SEND_MERGE_REPORT process_config_json[0]['runs'][0]['properties']['pre_temp_fixes'] = PRE_TEMP_FIXES process_config_json[0]['runs'][0]['properties']['post_temp_fixes'] = POST_TEMP_FIXES +process_config_json[0]['runs'][0]['properties']['force_outage_fix'] = FORCE_OUTAGE_FIX if PROCESS_TIME_SHIFT: diff --git a/emf/task_generator/worker.py b/emf/task_generator/worker.py index 21787a3..cb90951 100644 --- a/emf/task_generator/worker.py +++ b/emf/task_generator/worker.py @@ -28,6 +28,8 @@ runs['properties']['send_merge_report'] = SEND_MERGE_REPORT_CGM runs['properties']['pre_temp_fixes'] = PRE_TEMP_FIXES runs['properties']['post_temp_fixes'] = POST_TEMP_FIXES + runs['properties']['force_outage_fix'] = FORCE_OUTAGE_FIX + for runs in process_config_json[1]['runs']: runs['properties']['included'] = [tso.strip() for tso in RMM_INCLUDED_TSO.split(',')] if RMM_INCLUDED_TSO else [] runs['properties']['excluded'] = [tso.strip() for tso in RMM_EXCLUDED_TSO.split(',')] if RMM_EXCLUDED_TSO else [] @@ -40,6 +42,7 @@ runs['properties']['send_merge_report'] = SEND_MERGE_REPORT_RMM runs['properties']['pre_temp_fixes'] = PRE_TEMP_FIXES runs['properties']['post_temp_fixes'] = POST_TEMP_FIXES + runs['properties']['force_outage_fix'] = FORCE_OUTAGE_FIX with open(process_conf, 'w') as file: json.dump(process_config_json, file, indent=1)