diff --git a/.gitignore b/.gitignore
index ec43f5a..4a503a5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
.idea/
-containers.txt
\ No newline at end of file
+containers.txt
+.DS_Store
\ No newline at end of file
diff --git a/config/cgm_worker/model_merge.properties b/config/cgm_worker/model_merge.properties
new file mode 100644
index 0000000..c3aa7d3
--- /dev/null
+++ b/config/cgm_worker/model_merge.properties
@@ -0,0 +1,9 @@
+[MAIN]
+RMQ_CGM_QUEUE = rabbit-task-test
+EMF_OS_MINIO_BUCKET = opde-confidential-models
+EMF_OS_MINIO_FOLDER = EMF_test_merge_find_better_place
+EMF_OS_MINIO_BOUNDARY_BUCKET = opdm-data
+EMF_OS_MINIO_BOUNDARY_FOLDER = CGMES/ENTSOE
+ELK_VERSION_INDEX = emfos-logs*
+MERGE_TYPES = {'CGM': {'AREA': 'EU', 'INCLUDED': [], 'EXCLUDED': ['APG'], 'LOCAL_IMPORT': []}, 'RMM': {'AREA': 'BA', 'INCLUDED': ['ELERING', 'AST', 'LITGRID', 'PSE'], 'EXCLUDED': [], 'LOCAL_IMPORT': []}}
+MERGING_ENTITY = BALTICRSC
\ No newline at end of file
diff --git a/config/cgm_worker/validator.properties b/config/cgm_worker/validator.properties
index 78a4236..0dbf3a1 100644
--- a/config/cgm_worker/validator.properties
+++ b/config/cgm_worker/validator.properties
@@ -1,2 +1,4 @@
[MAIN]
-ELK_INDEX = emfos-igm-validation
\ No newline at end of file
+ELK_INDEX = emfos-igm-validation
+ENTSOE_EXAMPLES_EXTERNAL=https://www.entsoe.eu/Documents/CIM_documents/Grid_Model_CIM/QoCDC_v3.2.1_test_models.zip
+ENTSOE_EXAMPLES_LOCAL=./example_models/
\ No newline at end of file
diff --git a/config/logging/custom_logger.properties b/config/logging/custom_logger.properties
index 2fffffa..d5d6f21 100644
--- a/config/logging/custom_logger.properties
+++ b/config/logging/custom_logger.properties
@@ -2,4 +2,13 @@
LOGGING_INDEX = emfos-logs
LOGGING_FORMAT = %(levelname) -10s %(asctime) -10s %(name) -50s %(funcName) -50s %(lineno) -5d: %(message)s
LOGGING_DATEFMT = %Y-%m-%d %H:%M:%S
-LOGGING_LEVEL = INFO
\ No newline at end of file
+LOGGING_LEVEL = INFO
+LOCAL_FOLDER_FOR_PYPOWSYBL_LOGS = ./pypowsybl-logs
+ELASTIC_INDEX_FOR_PYPOWSYBL_LOGS = emfos-pypowsybl-logs
+MINIO_BUCKET_FOR_PYPOWSYBL_LOGS = external
+MINIO_FOLDER_FOR_PYPOWSYBL_LOGS = EMF_OS_pypowsybl_logs
+ELASTIC_FIELD_FOR_LOG_DATA = log_data
+ELASTIC_FIELD_FOR_TSO = tso
+ELASTIC_FIELD_FOR_TOPIC = topic
+ELASTIC_FIELD_FOR_FILENAME = log_file_name
+ELASTIC_FIELD_FOR_MINIO_BUCKET = minio_bucket
\ No newline at end of file
diff --git a/emf/common/integrations/elastic.py b/emf/common/integrations/elastic.py
index 7853676..8dbb0cf 100644
--- a/emf/common/integrations/elastic.py
+++ b/emf/common/integrations/elastic.py
@@ -11,12 +11,21 @@
import warnings
from elasticsearch.exceptions import ElasticsearchWarning
+
warnings.simplefilter('ignore', ElasticsearchWarning)
logger = logging.getLogger(__name__)
parse_app_properties(caller_globals=globals(), path=config.paths.integrations.elastic)
+SCROLL_ID_FIELD = '_scroll_id'
+RESULT_FIELD = 'hits'
+DOCUMENT_COUNT = 10000
+DEFAULT_COLUMNS = ["value"]
+INITIAL_SCROLL_TIME = "15m"
+CONSECUTIVE_SCROLL_TIME = "12m"
+MAGIC_KEYWORD = '_source'
+
class Elastic:
@@ -60,7 +69,10 @@ def send_to_elastic(index,
# Executing POST to push message into ELK
if debug:
logger.debug(f"Sending data to {url}")
- response = requests.post(url=url, json=json_message)
+ if json_message.get('args', None): # TODO revise if this is best solution
+ json_message.pop('args')
+ json_data = json.dumps(json_message, default=str)
+ response = requests.post(url=url, data=json_data.encode(), headers={"Content-Type": "application/json"})
if debug:
logger.debug(f"ELK response: {response.content}")
@@ -92,9 +104,16 @@ def send_to_elastic_bulk(index,
if id_from_metadata:
id_separator = "_"
- json_message_list = [value for element in json_message_list for value in ({"index": {"_index": index, "_id": id_separator.join([str(element.get(key, '')) for key in id_metadata_list])}}, element)]
+            json_message_list = [
+                value for element in json_message_list
+                for value in (
+                    {"index": {"_index": index,
+                               "_id": id_separator.join([str(element.get(key, '')) for key in id_metadata_list])}},
+                    element,
+                )
+            ]
else:
- json_message_list = [value for element in json_message_list for value in ({"index": {"_index": index}}, element)]
+ json_message_list = [value for element in json_message_list for value in
+ ({"index": {"_index": index}}, element)]
response_list = []
for batch in range(0, len(json_message_list), batch_size):
@@ -102,7 +121,7 @@ def send_to_elastic_bulk(index,
if debug:
logger.debug(f"Sending batch ({batch}-{batch + batch_size})/{len(json_message_list)} to {url}")
response = requests.post(url=url,
- data=(ndjson.dumps(json_message_list[batch:batch + batch_size])+"\n").encode(),
+ data=(ndjson.dumps(json_message_list[batch:batch + batch_size]) + "\n").encode(),
timeout=None,
headers={"Content-Type": "application/x-ndjson"})
if debug:
@@ -128,6 +147,55 @@ def get_docs_by_query(self, index, query, size=None, return_df=True):
return response
+    def get_data_by_scrolling(self,
+                              query: dict,
+                              index: str,
+                              fields: list):
+ """
+ Gets a large bulk of data from elastic
+ :param query: dictionary with parameters by which to select data from elastic
+ :param index: table name in elastic
+ :param fields: fields or columns to return from query
+        :return: generator yielding batches (lists) of hits
+ """
+ result = self.client.search(index=index,
+ query=query,
+ source=fields,
+ size=DOCUMENT_COUNT,
+ scroll=INITIAL_SCROLL_TIME)
+
+ scroll_id = result[SCROLL_ID_FIELD]
+ # Extract and return the relevant data from the initial response
+ hits = result[RESULT_FIELD][RESULT_FIELD]
+ yield hits
+ # Continue scrolling through the results until there are no more
+ while hits:
+ result = self.client.scroll(scroll_id=scroll_id, scroll=CONSECUTIVE_SCROLL_TIME)
+ hits = result[RESULT_FIELD][RESULT_FIELD]
+ yield hits
+ # Clear the scroll context after processing all results
+ self.client.clear_scroll(scroll_id=scroll_id)
+
+    def get_data(self,
+                 query: dict,
+                 index: str,
+                 fields: list | None = None):
+ """
+ Gets data from elastic
+ :param query: dictionary with parameters by which to select data from elastic
+ :param index: table name in elastic
+ :param fields: fields or columns to return from query
+ :return: dataframe with results
+ """
+ # Gather all the results to list (of dictionaries)
+ list_of_lines = []
+ for hits in self.get_data_by_scrolling(query=query, index=index, fields=fields):
+ for hit in hits:
+ list_of_lines.append({field: hit[MAGIC_KEYWORD][field] for field in fields})
+ # convert list (of dictionaries) to pandas dataframe
+ data_frame = pd.DataFrame(list_of_lines)
+ return data_frame
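+
+    # Illustrative usage of get_data (a sketch: the server, index, query and field below are assumptions):
+    #   client = Elastic(server="http://localhost:9200")
+    #   df = client.get_data(query={"match_all": {}}, index="emfos-logs*", fields=["value"])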
+
def query_schedules_from_elk(self,
index: str,
utc_start: str,
@@ -187,7 +255,6 @@ def __init__(self,
auth=None,
verify=False,
debug=False):
-
self.index = index
self.server = server
self.id_from_metadata = id_from_metadata
@@ -203,7 +270,6 @@ def __init__(self,
self.session.auth = auth
def handle(self, byte_string, properties):
-
Elastic.send_to_elastic_bulk(index=self.index,
json_message_list=json.loads(byte_string),
id_from_metadata=self.id_from_metadata,
@@ -215,7 +281,6 @@ def handle(self, byte_string, properties):
if __name__ == '__main__':
-
# Create client
server = "http://test-rcc-logs-master.elering.sise:9200"
service = Elastic(server=server)
diff --git a/emf/common/integrations/object_storage/object_storage.py b/emf/common/integrations/object_storage/object_storage.py
index 8e0c3ae..ce3c676 100644
--- a/emf/common/integrations/object_storage/object_storage.py
+++ b/emf/common/integrations/object_storage/object_storage.py
@@ -68,9 +68,10 @@ def get_content(metadata: dict, bucket_name=MINIO_BUCKET_NAME):
logger.info(f"Getting data from MinIO")
for component in metadata["opde:Component"]:
content_reference = component.get("opdm:Profile").get("pmd:content-reference")
+ # Minio considers // as /
+ content_reference = content_reference.replace('//', '/')
logger.info(f"Downloading {content_reference}")
component["opdm:Profile"]["DATA"] = minio_service.download_object(bucket_name, content_reference)
-
return metadata
diff --git a/emf/common/logging/custom_logger.py b/emf/common/logging/custom_logger.py
index ca00cff..e16427c 100644
--- a/emf/common/logging/custom_logger.py
+++ b/emf/common/logging/custom_logger.py
@@ -1,13 +1,61 @@
+import os.path
import sys
import logging
+from datetime import datetime, timedelta
+from enum import Enum
+from io import BytesIO
+from zipfile import ZipFile
+
import requests
+
from emf.common.integrations import elastic
import config
from emf.common.config_parser import parse_app_properties
+from emf.common.integrations.minio import ObjectStorage
logger = logging.getLogger(__name__)
parse_app_properties(caller_globals=globals(), path=config.paths.logging.custom_logger)
+PYPOWSYBL_LOGGER = 'powsybl'
+PYPOWSYBL_LOGGER_DEFAULT_LEVEL = 1
+CUSTOM_LOG_BUFFER_LINE_BREAK = '\r\n'
+# Max allowed lifespan of link to file in minio bucket
+DAYS_TO_STORE_DATA_IN_MINIO = 7 # Max allowed by Minio
+# Separator symbols used when normalizing folder paths
+SEPARATOR_SYMBOL = '/'
+WINDOWS_SEPARATOR = '\\'
+
+
+def check_the_folder_path(folder_path: str):
+ """
+ Checks folder path for special characters
+ :param folder_path: input given
+ :return checked folder path
+ """
+ if not folder_path.endswith(SEPARATOR_SYMBOL):
+ folder_path = folder_path + SEPARATOR_SYMBOL
+ double_separator = SEPARATOR_SYMBOL + SEPARATOR_SYMBOL
+ # Escape '//'
+ folder_path = folder_path.replace(double_separator, SEPARATOR_SYMBOL)
+ # Escape '\'
+ folder_path = folder_path.replace(WINDOWS_SEPARATOR, SEPARATOR_SYMBOL)
+ return folder_path
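+
+# Illustrative example (assuming the separator constants above):
+#   check_the_folder_path('logs\\pypowsybl') -> 'logs/pypowsybl/'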
+
+
+def save_content_to_zip_file(content: dict):
+ """
+ Saves content to zip file (in memory)
+ :param content: the content of zip file (key: file name, value: file content)
+ :return: byte array
+ """
+ output_object = BytesIO()
+ with ZipFile(output_object, "w") as output_zip:
+ if content:
+ for file_name in content:
+ logger.info(f"Converting {file_name} to zip container")
+ output_zip.writestr(file_name, content[file_name])
+ output_object.seek(0)
+ return output_object.getvalue()
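+
+
+# Illustrative example (the file name and content below are made up):
+#   zip_bytes = save_content_to_zip_file({'report.log': 'line 1\nline 2'})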
def initialize_custom_logger(
@@ -18,8 +66,7 @@ def initialize_custom_logger(
index: str = LOGGING_INDEX,
extra: None | dict = None,
fields_filter: None | list = None,
- ):
-
+):
root_logger = logging.getLogger()
root_logger.setLevel(level)
root_logger.propagate = True
@@ -65,9 +112,11 @@ def elk_connection(self):
logger.info(f"Connection to {self.server} successful")
return True
else:
- logger.warning(f"ELK server response: [{response.status_code}] {response.reason}. Disabling ELK logging.")
+ logger.warning(
+ f"ELK server response: [{response.status_code}] {response.reason}. Disabling ELK logging.")
except requests.exceptions.ConnectTimeout:
- logger.warning(f"ELK server {self.server} does not responding with ConnectTimeout error. Disabling ELK logging.")
+            logger.warning(
+                f"ELK server {self.server} is not responding (ConnectTimeout error). Disabling ELK logging.")
except Exception as e:
logger.warning(f"ELK server {self.server} returned unknown error: {e}")
@@ -89,6 +138,515 @@ def emit(self, record):
elastic.Elastic.send_to_elastic(index=self.index, json_message=elk_record, server=self.server)
+class LogStream(object):
+ """
+ Some custom container for storing log related data
+ """
+
+ def __init__(self, formatter):
+ # self.logs = ''
+ self.logs = []
+ self.formatter = formatter
+ self.single_entry = None
+
+ def write(self, message):
+ """
+ Writes the log message to the buffer
+ :param message: log message
+ """
+ # formatted_message = self.formatter.format(message)
+ # self.logs += formatted_message
+ # self.logs += CUSTOM_LOG_BUFFER_LINE_BREAK
+ self.logs.append(message)
+
+ def format_for_writing(self):
+ """
+        Placeholder for post-processing of the gathered logs (currently a no-op)
+ """
+ # double_of_line_break = CUSTOM_LOG_BUFFER_LINE_BREAK + CUSTOM_LOG_BUFFER_LINE_BREAK
+ # self.logs.replace(double_of_line_break, CUSTOM_LOG_BUFFER_LINE_BREAK)
+ # self.logs = '\n'.join(self.logs.splitlines())
+ pass
+
+ def flush(self):
+ pass
+
+ def reset(self):
+ """
+ Resets the internal buffers
+ """
+ self.logs = []
+ self.single_entry = None
+
+ def get_logs(self):
+ """
+ Gets the content
+ :return: tuple of logs and entry that triggered reporting process
+ """
+ return self.logs, self.single_entry
+
+
+class PyPowsyblLogReportingPolicy(Enum):
+    """
+    Reporting policies for the gathered pypowsybl logs
+    """
+    # Gathers all the pypowsybl output and reports everything when stop_working is called
+    ALL_ENTRIES = "all_entries"
+    # Gathers all the pypowsybl output and reports it when at least one entry reached the logging level
+    ENTRIES_IF_LEVEL_REACHED = "entries_if_level_was_reached"
+    # Reports a single logging record that was over the logging level
+    ENTRY_ON_LEVEL = "entry_on_level"
+    # Gathers only entries that are on the level or a higher level
+    ENTRIES_ON_LEVEL = "entries_on_level_and_higher"
+    # Reports all collected logging records from the last point when the level was reached
+    ENTRIES_COLLECTED_TO_LEVEL = "entries_collected_to_level"
+
+
+def get_buffer_size(buffer):
+ """
+ Returns the length of the buffer
+ :param buffer: input buffer
+ :return: length of the buffer
+ """
+ return len(buffer.encode('utf-8'))
+
+
+class PyPowsyblLogGatherer:
+ """
+    Governing class for the PyPowsyblLogGatheringHandler
+    Note that for posting the data to elastic/minio the default configuration (elastic.properties, minio.properties)
+    is used
+ """
+
+ def __init__(self,
+ topic_name: str = None,
+ reporting_level=None,
+ tso: str = None,
+ print_to_console: bool = False,
+ send_to_elastic: bool = True,
+ upload_to_minio: bool = False,
+ report_on_command: bool = True,
+ path_to_local_folder: str = LOCAL_FOLDER_FOR_PYPOWSYBL_LOGS,
+ minio_bucket: str = MINIO_BUCKET_FOR_PYPOWSYBL_LOGS,
+ logging_policy: PyPowsyblLogReportingPolicy = PyPowsyblLogReportingPolicy.ENTRIES_IF_LEVEL_REACHED,
+ elk_server=elastic.ELK_SERVER,
+ index=ELASTIC_INDEX_FOR_PYPOWSYBL_LOGS):
+ """
+ Initializes the pypowsybl log gatherer.
+ :param topic_name: name (string) that can be used to distinguish log files
+ :param reporting_level: logging.level which triggers reporting
+ :param tso: the name of the tso (for naming the files)
+ :param print_to_console: If True then prints the pypowsybl log to console, false: consume it internally
+ :param send_to_elastic: If True then posts a log entry that triggered the gathering to elastic
+ :param upload_to_minio: If True then posts a log buffer to minio as .log file
+ :param report_on_command: If true then log entries/buffer are reported when entry has triggered gathering
+ and dedicated function is called manually (e.g. report when all validation failed)
+        :param path_to_local_folder: path to a local folder where log files are saved if posting them fails
+        :param minio_bucket: the name of the bucket in minio
+ :param logging_policy: determines which and how to collect (entire log or only entries on the level etc)
+ :param elk_server: name of the elk server instance
+        :param index: name of the index in elasticsearch where to post the log entries
+ """
+ self.topic_name = topic_name
+ self.formatter = logging.Formatter(LOGGING_FORMAT)
+ self.package_logger = logging.getLogger(PYPOWSYBL_LOGGER)
+ self.package_logger.setLevel(PYPOWSYBL_LOGGER_DEFAULT_LEVEL)
+ self.reporting_level = reporting_level
+ self.tso = tso
+ self.report_on_command = report_on_command
+ self.reporting_triggered_externally = True
+ self.reset_triggers_for_reporting()
+ if self.reporting_level is None:
+ self.reporting_level = logging.ERROR
+ self.gathering_handler = PyPowsyblLogGatheringHandler(formatter=self.formatter,
+ parent=self,
+ report_level=self.reporting_level)
+ self.package_logger.addHandler(self.gathering_handler)
+ # Switch reporting to console on or off
+ self.package_logger.propagate = print_to_console
+ self.path_to_local_folder = check_the_folder_path(path_to_local_folder)
+ # Initialize the elk instance
+ self.elastic_server = elk_server
+ self.index = index
+ self.send_to_elastic = send_to_elastic
+ self.report_to = True
+ self.logging_policy = None
+ self.set_reporting_policy(logging_policy)
+
+ self.minio_instance = None
+ self.minio_bucket = minio_bucket
+ if upload_to_minio:
+ try:
+ self.minio_instance = ObjectStorage()
+ except Exception as ex:
+                logger.warning(f"Cannot connect to Minio, staying offline: {ex}")
+ self.identifier = datetime.now().strftime("%d-%m-%Y_%H-%M-%S")
+ # if needed use identifier
+ # self.identifier = uuid.uuid4()
+
+ def set_report_on_command(self, report_on_command: bool = False):
+ """
+ Sets manual reporting status
+ if report_on_command is true then reporting happens when self.report_to is true and trigger_to_report_externally
+ is called with value true
+ if report_on_command is false then reporting (posting the logs) happens when self.report_to is true
+ :param report_on_command: new status for manual reporting
+ """
+ self.report_on_command = report_on_command
+        self.reporting_triggered_externally = not self.report_on_command
+
+ def reset_triggers_for_reporting(self):
+ """
+ Resets the triggers used:
+        report_to, which is triggered by a log entry
+        reporting_triggered_externally, which is triggered manually from outside
+ """
+        self.reporting_triggered_externally = not self.report_on_command
+ self.report_to = False
+
+ @property
+ def elastic_is_connected(self):
+ """
+        Checks the connection to elastic and handles errors
+        (borrowed from the elastic handler above)
+ """
+ try:
+ response = requests.get(self.elastic_server, timeout=5)
+ if response.status_code == 200:
+ return True
+ else:
+ logger.warning(
+ f"ELK server response: [{response.status_code}] {response.reason}. Disabling ELK logging.")
+ except requests.exceptions.ConnectTimeout:
+ logger.warning(f"{self.elastic_server}: Timeout. Disabling ELK logging.")
+ except Exception as e:
+ logger.warning(f"{self.elastic_server}: unknown error: {e}")
+ return False
+
+ def post_log_report(self, buffer='', single_entry=None):
+ """
+ Handles the created report by sending it to elastic or saving it to local storage
+ Checks if send_to_elastic is enabled and instance of elastic is available. Composes a message where fields
+ are standard field of a logging.Record. Adds the entire log as a string to log_data field
+ :param buffer: buffer containing log entries
+        :param single_entry: first entry that reached the required level
+ """
+ try:
+ if self.send_to_elastic and self.elastic_is_connected:
+ elastic_content = self.compose_elastic_message(buffer, single_entry)
+ if elastic_content is not None:
+ response = elastic.Elastic.send_to_elastic(index=self.index,
+ json_message=elastic_content,
+ server=self.elastic_server)
+ if response.ok:
+ # TODO: Is message pending needed?
+ # For example if sending message failed, keep it somewhere and send it when connection is
+ # available
+ self.reset_triggers_for_reporting()
+ return
+ raise ConnectionError
+ except ConnectionError:
+ if not self.send_to_elastic:
+ logger.info("Saving log to local storage")
+ else:
+ logger.error(f"Sending log to elastic failed, saving to local storage...")
+ self.compose_log_file(buffer, single_entry)
+ self.reset_triggers_for_reporting()
+ # except Exception:
+ # logger.error(f"Unable to post log report: {Exception}")
+
+ def set_reporting_policy(self, new_policy: PyPowsyblLogReportingPolicy):
+ """
+ Updates logging policy to new value
+ :param new_policy: new policy value
+ """
+ self.logging_policy = new_policy
+ self.gathering_handler.set_reporting_policy(self.logging_policy)
+ if self.logging_policy != PyPowsyblLogReportingPolicy.ALL_ENTRIES:
+ self.reset_triggers_for_reporting()
+
+ def set_tso(self, tso_name: str):
+ """
+ Sets the tso to new tso, handles the log of previous tso
+ :param tso_name: name of tso
+ """
+ self.stop_working()
+ self.gathering_handler.start_gathering()
+ self.tso = tso_name
+
+ def trigger_to_report_externally(self, trigger_reporting: bool = True):
+ """
+ Calls reporting when self.report_on_command is set to true
+ NOTE: That this works on policies which report at the end
+        :param trigger_reporting: if True and self.report_to is True (set by a log entry), the log entry/buffer
+        is reported; otherwise nothing is posted
+ """
+ if self.report_on_command:
+ self.reporting_triggered_externally = trigger_reporting
+
+ def set_to_reporting(self):
+ """
+ Handles the logging event, decides whether and what to post:
+        Posts:
+        1) the log entry that reached the level when the policy is PyPowsyblLogReportingPolicy.ENTRY_ON_LEVEL
+        2) the log entry and the log buffer when the policy is PyPowsyblLogReportingPolicy.ENTRIES_COLLECTED_TO_LEVEL
+ """
+ self.report_to = True
+ # logger.info(f"{self.topic_name}: {self.get_reporting_level()} from Pypowsybl, setting to report")
+ if self.logging_policy == PyPowsyblLogReportingPolicy.ENTRY_ON_LEVEL:
+ logger.info(f"Passing at once")
+ buffer, single_entry = self.get_logs()
+ self.post_log_report(single_entry=single_entry)
+ elif self.logging_policy == PyPowsyblLogReportingPolicy.ENTRIES_COLLECTED_TO_LEVEL:
+ logger.info(f"Sending content gathered")
+ buffer, single_entry = self.get_logs()
+ self.post_log_report(buffer=buffer, single_entry=single_entry)
+
+ def start_working(self):
+ """
+ Starts gathering the logs
+ """
+ self.gathering_handler.start_gathering()
+
+ def stop_working(self):
+ """
+ Stops gathering the logs, retrieves them from buffer and decides whether to post them:
+ posts if
+ 1) self.logging_policy is set to PyPowsyblLogReportingPolicy.ALL_ENTRIES or
+ 2) self.logging_policy is set to PyPowsyblLogReportingPolicy.ENTRIES_IF_LEVEL_REACHED and level was reached
+ (self.report_to is True)
+ : return: None
+ """
+ self.gathering_handler.stop_gathering()
+ buffer, single_entry = self.get_logs()
+ if (buffer is None or buffer == '') and single_entry is None:
+ return
+ # Check if post is needed
+ # 1. If reporting was set to be triggered externally and no triggering case occurred
+ if self.report_on_command is False or self.reporting_triggered_externally is True:
+ # 2. If other conditions are met
+ if (self.logging_policy == PyPowsyblLogReportingPolicy.ALL_ENTRIES or
+ (self.report_to and
+ (self.logging_policy == PyPowsyblLogReportingPolicy.ENTRIES_IF_LEVEL_REACHED or
+ self.logging_policy == PyPowsyblLogReportingPolicy.ENTRIES_ON_LEVEL))):
+ self.post_log_report(buffer, single_entry)
+ self.reset_triggers_for_reporting()
+
+ def get_logs(self):
+ """
+ Gets and formats logs
+ """
+ log_list, single_log = self.gathering_handler.get_buffer()
+ buffer = self.format_buffer_to_string(log_list)
+ single_entry = single_log
+ return buffer, single_entry
+
+ def format_buffer_to_string(self, buffer):
+ """
+ Returns log buffer combined to a string
+ Note! Be aware of the line break, it is currently set to Windows style!
+ """
+ return CUSTOM_LOG_BUFFER_LINE_BREAK.join([self.formatter.format(message) for message in buffer])
+
+ def get_reporting_level(self):
+ """
+ Gets required logging.Loglevel as a string
+ : return log level as a string
+ """
+ return logging.getLevelName(self.reporting_level)
+
+ def compose_log_file(self, buffer: str = '', single_entry: logging.LogRecord = None, file_name: str = None):
+ """
+ Saves buffer to local log file: buffer if exists, last entry otherwise
+ :param buffer: buffer containing log entries
+        :param single_entry: first entry that reached the required level
+        :param file_name: name of the file where the content should be saved. Note that if not specified,
+        a default file name will be used (combination of topic, tso and the date and time of the analysis)
+        :return: name of the file the content was saved to
+ """
+ file_name = self.check_and_get_file_name(file_name)
+ if buffer != '' and buffer is not None:
+ payload = '\n'.join(buffer.splitlines())
+ elif single_entry is not None:
+ payload = self.formatter.format(single_entry)
+ else:
+ return None
+ # And create directories
+ directory_name = os.path.dirname(file_name)
+ if not os.path.exists(directory_name):
+ os.makedirs(directory_name)
+ with open(file_name, mode='w', encoding="utf-8") as log_file:
+ log_file.write(payload)
+ return file_name
+
+ def check_and_get_file_name(self, file_name: str = None, use_folders: bool = True, use_local: bool = True):
+ """
+ Gets some predefined file name to be used when saving the logs
+        :param file_name: file name to use; if None or empty, a default name is generated
+ :param use_folders: create sub folders for storing file
+ :param use_local: store as a relative path when saving to local computer
+        :return: file name
+ """
+ if file_name is None or file_name == '':
+ time_moment_now = datetime.now().strftime("%d-%m-%Y_%H-%M-%S")
+ file_name = f"{self.topic_name}_pypowsybl_error_log_for_{self.tso}_from_{time_moment_now}.log"
+ if use_folders:
+ file_name = (MINIO_FOLDER_FOR_PYPOWSYBL_LOGS +
+ SEPARATOR_SYMBOL +
+ str(self.identifier) +
+ SEPARATOR_SYMBOL + file_name)
+ if use_local:
+ file_name = self.path_to_local_folder + file_name
+ return file_name
+
+ def post_log_to_minio(self, buffer='', file_name: str = None):
+ """
+ Posts log as a file to minio
+ :param buffer: logs as a string
+        :param file_name: name for the log file, if given
+        :return: file name and a presigned link to the file in minio
+ """
+ link_to_file = None
+ if self.minio_instance is not None and buffer != '' and buffer is not None:
+ # check if the given bucket exists
+ if not self.minio_instance.client.bucket_exists(bucket_name=self.minio_bucket):
+ logger.warning(f"{self.minio_bucket} does not exist")
+                return file_name, link_to_file
+            # Adjust the file name to the default one if not given
+ file_name = self.check_and_get_file_name(file_name, use_local=False)
+ file_object = BytesIO(str.encode(buffer))
+ file_object.name = file_name
+ self.minio_instance.upload_object(file_path_or_file_object=file_object,
+ bucket_name=self.minio_bucket,
+ metadata=None)
+ time_to_expire = timedelta(days=DAYS_TO_STORE_DATA_IN_MINIO)
+ link_to_file = self.minio_instance.client.get_presigned_url(method="GET",
+ bucket_name=self.minio_bucket,
+ object_name=file_object.name,
+ expires=time_to_expire)
+ return file_name, link_to_file
+
+ def compose_elastic_message(self, buffer: str = '', single_entry: logging.LogRecord = None):
+ """
+        Puts together a dictionary consisting of the first pypowsybl log entry that reached the reporting level
+        and a reference (minio link) to the log of the entire process
+        :param buffer: buffer containing log entries
+        :param single_entry: first entry that reached the required level
+        :return: log message dictionary
+ """
+ message_dict = {}
+ # Add first log entry that reached to level as a content of the payload
+ if single_entry is not None and isinstance(single_entry, logging.LogRecord):
+ message_dict = single_entry.__dict__
+ file_name, link_to_log_file = self.post_log_to_minio(buffer=buffer)
+ message_dict[ELASTIC_FIELD_FOR_FILENAME] = file_name
+ if link_to_log_file != '' and link_to_log_file is not None:
+ message_dict[ELASTIC_FIELD_FOR_MINIO_BUCKET] = MINIO_BUCKET_FOR_PYPOWSYBL_LOGS
+ message_dict[ELASTIC_FIELD_FOR_LOG_DATA] = link_to_log_file
+ message_dict[ELASTIC_FIELD_FOR_TSO] = self.tso
+ message_dict[ELASTIC_FIELD_FOR_TOPIC] = self.topic_name
+ return message_dict
+
+
+class PyPowsyblLogGatheringHandler(logging.StreamHandler):
+ """
+    Custom log handler for starting and gathering pypowsybl logs.
+    Depending on the policy it either gathers logs into a buffer, watches for a log entry that is on the report
+    level, or does both
+ """
+
+ def __init__(self,
+ formatter: logging.Formatter,
+ parent: PyPowsyblLogGatherer = None,
+ logging_policy: PyPowsyblLogReportingPolicy = PyPowsyblLogReportingPolicy.ALL_ENTRIES,
+ report_level=logging.ERROR):
+ """
+ Constructor:
+ :param formatter: the formatter for converting the log entries
+ :param parent: the parent to whom to report to
+        :param logging_policy: determines whether the whole buffer is gathered or only entries on the level
+        :param report_level: log level which, when caught, propagates to the parent to trigger reporting
+ """
+ super().__init__()
+ self.parent = parent
+ self.gathering = False
+ self.originator_type = 'IGM_validation'
+ self.formatter = formatter
+ if self.formatter is None:
+ self.formatter = logging.Formatter(LOGGING_FORMAT)
+ self.gathering_buffer = LogStream(self.formatter)
+ self.report_level = report_level
+ self.logging_policy = None
+ self.write_all = False
+ self.write_only_levels = False
+ self.set_reporting_policy(logging_policy)
+
+ def set_reporting_policy(self, new_policy: PyPowsyblLogReportingPolicy):
+ """
+ Sets reporting policy to new value
+ :param new_policy: new policy value
+ """
+ self.logging_policy = new_policy
+ self.write_all = (self.logging_policy != PyPowsyblLogReportingPolicy.ENTRY_ON_LEVEL and
+ self.logging_policy != PyPowsyblLogReportingPolicy.ENTRIES_ON_LEVEL)
+ self.write_only_levels = self.logging_policy == PyPowsyblLogReportingPolicy.ENTRIES_ON_LEVEL
+
+ def emit(self, record: logging.LogRecord) -> None:
+ """
+ Stores the log output from pypowsybl to internal buffer. Looks for log level as event to trigger reporting
+ in parent
+ :param record: log record
+ """
+ if self.gathering:
+ # Bypass the buffer if the entire log is not required
+ if self.write_all:
+ self.gathering_buffer.write(message=record)
+ if record.levelno >= self.report_level:
+ if self.write_only_levels:
+ self.gathering_buffer.write(message=record)
+ self.gathering_buffer.single_entry = record
+ self.parent.set_to_reporting()
+
+ def start_gathering(self):
+ """
+ Resets the buffer to empty and turns gathering on
+ :return: None
+ """
+ self.reset_gathering()
+ self.gathering = True
+
+ def stop_gathering(self):
+ """
+ Stops the gathering, leaves the content to buffer
+ :return: None
+ """
+ self.gathering = False
+
+ def reset_gathering(self):
+ """
+ Resets the gathering status to default
+ :return: None
+ """
+ self.gathering_buffer.reset()
+
+ def get_buffer(self):
+ """
+        Returns the gathered log entries and the entry that triggered reporting
+        :return: tuple of the log list and the triggering entry
+ """
+ return self.gathering_buffer.get_logs()
+
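+
+# Illustrative wiring of the gatherer around pypowsybl calls (a sketch: the topic and tso values
+# below are assumptions, and elastic/minio reporting is switched off):
+#   gatherer = PyPowsyblLogGatherer(topic_name='IGM_validation',
+#                                   tso='TSO_NAME',
+#                                   reporting_level=logging.ERROR,
+#                                   send_to_elastic=False,
+#                                   upload_to_minio=False)
+#   gatherer.start_working()
+#   ...  # run pypowsybl operations whose 'powsybl' logger output should be captured
+#   gatherer.stop_working()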
+
if __name__ == '__main__':
# Start root logger
STREAM_LOG_FORMAT = "%(levelname) -10s %(asctime) -10s %(name) -35s %(funcName) -30s %(lineno) -5d: %(message)s"
@@ -105,3 +663,4 @@ def emit(self, record):
if elk_handler.connected:
logger.addHandler(elk_handler)
logger.info(f"Info message", extra={'extra': 'logger testing'})
+
diff --git a/emf/loadflow_tool/helper.py b/emf/loadflow_tool/helper.py
index 36c65e7..1cfc1cb 100644
--- a/emf/loadflow_tool/helper.py
+++ b/emf/loadflow_tool/helper.py
@@ -1,3 +1,4 @@
+import zipfile
from zipfile import ZipFile, ZIP_DEFLATED
from uuid import uuid4
from io import BytesIO
@@ -28,6 +29,10 @@
"iidm.export.cgmes.modeling-authority-set": "powsybl.org"
}
+NETWORK_KEYWORD = 'network'
+NETWORK_META_KEYWORD = 'network_meta'
+NETWORK_VALID_KEYWORD = 'network_valid'
+
# TODO - Add comments and docstring
def package_for_pypowsybl(opdm_objects, return_zip: bool = False):
@@ -153,15 +158,24 @@ def load_model(opdm_objects: List[dict]):
model_data["network_valid"] = network.validate().name
# Network model import reporter data
- # model_data["import_report"] = json.loads(import_report.to_json())
+ model_data["import_report"] = json.loads(import_report.to_json())
# model_data["import_report_str"] = str(import_report)
return model_data
def opdmprofile_to_bytes(opdm_profile):
+ # Temporary fix: input data (['opdm:Profile']['DATA']) can be a zip file, figure it out and extract
+ # before proceeding further
data = BytesIO(opdm_profile['opdm:Profile']['DATA'])
- data.name = opdm_profile['opdm:Profile']['pmd:fileName']
+ file_name = opdm_profile['opdm:Profile']['pmd:fileName']
+ if zipfile.is_zipfile(data) and not file_name.endswith('.zip'):
+ xml_tree_file = get_xml_from_zip(data)
+ bytes_object = BytesIO()
+ xml_tree_file.write(bytes_object, encoding='utf-8')
+ bytes_object.seek(0)
+ data = bytes_object
+ data.name = file_name
return data
@@ -339,16 +353,17 @@ def export_model(network: pypowsybl.network, opdm_object_meta, profiles=None):
profiles = "SV,SSH,TP,EQ"
file_base_name = filename_from_metadata(opdm_object_meta).split(".xml")[0]
-
- bytes_object = network.save_to_binary_buffer(
- format="CGMES",
- parameters={
- "iidm.export.cgmes.modeling-authority-set": opdm_object_meta['pmd:modelingAuthoritySet'],
- "iidm.export.cgmes.base-name": file_base_name,
- "iidm.export.cgmes.profiles": profiles,
- "iidm.export.cgmes.naming-strategy": "cgmes-fix-all-invalid-ids", # identity, cgmes, cgmes-fix-all-invalid-ids
- })
-
- bytes_object.name = f"{file_base_name}_{uuid.uuid4()}.zip"
-
- return bytes_object
+ try:
+ bytes_object = network.save_to_binary_buffer(
+ format="CGMES",
+ parameters={
+ "iidm.export.cgmes.modeling-authority-set": opdm_object_meta['pmd:modelingAuthoritySet'],
+ "iidm.export.cgmes.base-name": file_base_name,
+ "iidm.export.cgmes.profiles": profiles,
+ "iidm.export.cgmes.naming-strategy": "cgmes-fix-all-invalid-ids", # identity, cgmes, cgmes-fix-all-invalid-ids
+ })
+ bytes_object.name = f"{file_base_name}_{uuid.uuid4()}.zip"
+ return bytes_object
+ except pypowsybl._pypowsybl.PyPowsyblError as p_error:
+ logger.error(f"Pypowsybl error on export: {p_error}")
+ raise Exception(p_error)
diff --git a/emf/loadflow_tool/model_merge_handlers.py b/emf/loadflow_tool/model_merge_handlers.py
new file mode 100644
index 0000000..1450455
--- /dev/null
+++ b/emf/loadflow_tool/model_merge_handlers.py
@@ -0,0 +1,412 @@
+import datetime
+import logging
+import time
+import os
+
+import config
+import json
+import sys
+from json import JSONDecodeError
+
+from emf.common.config_parser import parse_app_properties
+from emf.common.integrations import elastic
+from aniso8601 import parse_datetime
+from emf.common.logging.custom_logger import ElkLoggingHandler
+from emf.loadflow_tool.model_merger import (CgmModelComposer, get_models, get_local_models, PROCESS_ID_KEYWORD,
+ RUN_ID_KEYWORD, JOB_ID_KEYWORD, save_merged_model_to_local_storage,
+ publish_merged_model_to_opdm, save_merged_model_to_minio,
+ publish_metadata_to_elastic, DEFAULT_MERGE_TYPES, AREA_KEYWORD,
+ DEFAULT_AREA, INCLUDED_TSO_KEYWORD, DownloadModels, EXCLUDED_TSO_KEYWORD,
+ get_version_number)
+from emf.task_generator.time_helper import parse_duration
+
+logger = logging.getLogger(__name__)
+
+parse_app_properties(caller_globals=globals(), path=config.paths.cgm_worker.model_merge)
+
+# TODO handle these constants
+NUMBER_OF_CGM_TRIES = 3
+NUMBER_OF_CGM_TRIES_KEYWORD = 'task_retry_count'
+TASK_TIMEOUT = 'PT5M'
+TASK_TIMEOUT_KEYWORD = 'task_timeout'
+SLEEP_BETWEEN_TRIES = 'PT1M'
+
+TASK_PROPERTIES_KEYWORD = 'task_properties'
+TIMESTAMP_KEYWORD = 'timestamp_utc'
+MERGE_TYPE_KEYWORD = 'merge_type'
+TIME_HORIZON_KEYWORD = 'time_horizon'
+
+SAVE_MERGED_MODEL_TO_LOCAL_STORAGE = False
+PUBLISH_MERGED_MODEL_TO_MINIO = True
+PUBLISH_MERGED_MODEL_TO_OPDM = True
+PUBLISH_METADATA_TO_ELASTIC = False
+
+failed_cases_collector = []
+succeeded_cases_collector = []
+
+
+def running_in_local_machine():
+ """
+ For debugging purposes only
+ """
+ return "PYCHARM_HOSTED" in os.environ
+
+
+def flatten_tuple(data):
+ """
+    Flattens a nested tuple into a single-level tuple.
+    Use this when passing args as-is from one handler to another
+    :param data: tuple of arguments
+    :return: flattened tuple
+ """
+ if isinstance(data, tuple):
+ if len(data) == 0:
+ return ()
+ else:
+ return flatten_tuple(data[0]) + flatten_tuple(data[1:])
+ else:
+ return (data,)
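+
+# Illustrative example: flatten_tuple((1, (2, (3,)))) -> (1, 2, 3)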
+
+
+def find_key(input_dictionary: dict, key):
+ """
+ Searches in depth for a key in dictionary
+ :param input_dictionary: from where to search
+ :param key: key to be searched
+    :return: value of the key if found, None otherwise
+ """
+ if key in input_dictionary:
+ return input_dictionary[key]
+ for value in input_dictionary.values():
+ if isinstance(value, dict):
+ result = find_key(input_dictionary=value, key=key)
+ if result is not None:
+ return result
+ return None
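+
+# Illustrative example: find_key({'task_properties': {'merge_type': 'CGM'}}, 'merge_type') -> 'CGM'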
+
+
+def get_payload(args, keyword: str = MERGE_TYPE_KEYWORD):
+ """
+    Searches for the keyword in args. Tries to parse each arg into a dict and checks whether the keyword is
+    present; if it is, returns that arg
+    :param args: tuple of args
+    :param keyword: keyword to be searched
+    :return: argument which is a dictionary containing the keyword, or None
+ """
+ args = flatten_tuple(args)
+ if args and len(args) > 0:
+ for argument in args:
+ try:
+ if isinstance(argument, dict):
+ dict_value = argument
+ else:
+ dict_value = json.loads(argument.decode('utf-8'))
+ if not find_key(dict_value, keyword):
+                    raise UnknownArgumentException('Keyword not found', str(argument), 0)
+ return dict_value
+ except JSONDecodeError:
+ continue
+ return None
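+
+# Illustrative example (the payload below is a made-up rabbit message body):
+#   get_payload((b'{"task_properties": {"merge_type": "CGM"}}',), keyword='merge_type')
+#   -> {'task_properties': {'merge_type': 'CGM'}}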
+
+
+def run_sleep_timer(time_value: any = None):
+ """
+ Waits for some given time
+ :param time_value: seconds to wait
+ """
+ if time_value is not None:
+ if isinstance(time_value, float) and time_value > 0:
+ time.sleep(time_value)
+
+
+class UnknownArgumentException(JSONDecodeError):
+ pass
+
+
+def handle_not_received_case(message):
+ """
+ Do something if models were not found
+ TODO: report rabbit context if available
+ :param message: log error message
+ """
+ logger.error(message)
+ # currently in the debugging mode do not consume more messages
+ if running_in_local_machine():
+ failed_cases_collector.append(message)
+ # raise SystemExit
+
+
+class HandlerGetModels:
+ """
+    Gathers the data (igm models, boundary and version number) needed for merging
+ """
+
+ def __init__(self,
+ logger_handler: ElkLoggingHandler = None,
+ number_of_igm_tries: int = NUMBER_OF_CGM_TRIES,
+ default_area: str = DEFAULT_AREA,
+ cgm_minio_bucket: str = EMF_OS_MINIO_BUCKET,
+ cgm_minio_prefix: str = EMF_OS_MINIO_FOLDER,
+                 merge_types: str | dict = MERGE_TYPES,
+ merging_entity: str = MERGING_ENTITY,
+ sleep_between_tries: str = SLEEP_BETWEEN_TRIES,
+ elk_index_version_number: str = ELK_VERSION_INDEX):
+ """
+ :param logger_handler: attach rabbit context to it
+ :param number_of_igm_tries: max allowed tries before quitting
+ :param default_area: default merging area
+ :param cgm_minio_bucket: bucket where combined models are stored
+ :param cgm_minio_prefix: prefix of models
+        :param merge_types: the default dict consisting of areas, included tsos and excluded tsos
+ :param merging_entity: the name of the merging entity
+ :param sleep_between_tries: sleep between igm requests if failed
+ :param elk_index_version_number: elastic index from where look version number
+ """
+ self.number_of_igm_tries = number_of_igm_tries
+ self.logger_handler = logger_handler
+ self.opdm_service = None
+ merge_types = merge_types or DEFAULT_MERGE_TYPES
+ if isinstance(merge_types, str):
+ merge_types = merge_types.replace("'", "\"")
+ merge_types = json.loads(merge_types)
+ self.merge_types = merge_types
+ self.merging_entity = merging_entity
+ self.cgm_minio_bucket = cgm_minio_bucket
+ self.cgm_minio_prefix = cgm_minio_prefix
+ self.sleep_between_tries = parse_duration(sleep_between_tries)
+ self.elk_index_for_version_number = elk_index_version_number
+ self.default_area = default_area
+
+ def handle(self, *args, **kwargs):
+ """
+        Checks and parses the json, gathers the necessary data, stores it in a CgmModelComposer and passes it on
+ """
+ # Check the args: if there is a dict, json that can be converted to dict and consists a keyword
+ unnamed_args = args
+ input_data = get_payload(unnamed_args, keyword=MERGE_TYPE_KEYWORD)
+ # For debugging
+ manual_testing = False
+ if input_data is not None:
+ if self.logger_handler is not None:
+ # Pack rabbit context to elastic log handler
+ self.logger_handler.extra.update({PROCESS_ID_KEYWORD: input_data.get(PROCESS_ID_KEYWORD),
+ RUN_ID_KEYWORD: input_data.get(RUN_ID_KEYWORD),
+ JOB_ID_KEYWORD: input_data.get(JOB_ID_KEYWORD)})
+ logger.info(f"Logger was updated with process_id, run_id and job_id (under extra fields)")
+ number_of_tries = self.number_of_igm_tries
+
+ if TASK_PROPERTIES_KEYWORD in input_data and isinstance(input_data[TASK_PROPERTIES_KEYWORD], dict):
+ # Unpack the properties section
+ scenario_date = input_data[TASK_PROPERTIES_KEYWORD].get(TIMESTAMP_KEYWORD)
+ time_horizon = input_data[TASK_PROPERTIES_KEYWORD].get(TIME_HORIZON_KEYWORD)
+ merge_type = input_data[TASK_PROPERTIES_KEYWORD].get(MERGE_TYPE_KEYWORD)
+ # Get some models, allow only max tries
+ get_igms_try = 1
+ available_models, latest_boundary = None, None
+ # Extract tso and area data
+ area = self.merge_types.get(merge_type, {}).get(AREA_KEYWORD, self.default_area)
+ included_tsos = self.merge_types.get(merge_type, {}).get(INCLUDED_TSO_KEYWORD, [])
+ excluded_tsos = self.merge_types.get(merge_type, {}).get(EXCLUDED_TSO_KEYWORD, [])
+ while get_igms_try <= number_of_tries:
+ if running_in_local_machine() and manual_testing:
+ available_models, latest_boundary = get_local_models(time_horizon=time_horizon,
+ scenario_date=scenario_date,
+ download_policy=DownloadModels.MINIO,
+ use_local_files=True)
+ else:
+ available_models, latest_boundary = get_models(time_horizon=time_horizon,
+ scenario_date=scenario_date,
+ download_policy=DownloadModels.MINIO,
+ included_tsos=included_tsos,
+ excluded_tsos=excluded_tsos)
+ available_models = [model for model in available_models
+ if model.get('pmd:TSO') not in ['APG', 'SEPS', '50Hertz']]
+ if available_models and latest_boundary:
+ break
+ message = []
+ if not available_models:
+ message.append('models')
+ if not latest_boundary:
+ message.append('latest_boundary')
+ sleepy_message = f"Going to sleep {self.sleep_between_tries}"
+                    logger.warning(f"Failed to get {' and '.join(message)}. {sleepy_message}")
+ time.sleep(self.sleep_between_tries)
+ get_igms_try += 1
+ # If no luck report to elastic and call it a day
+ if not available_models and not latest_boundary:
+                handle_not_received_case(f"Get Models: nothing was received")
+ # Get the version number
+ version_number = get_version_number(scenario_date=scenario_date,
+ time_horizon=time_horizon,
+ modeling_entity=f"{self.merging_entity}-{area}")
+ # Pack everything and pass it on
+ cgm_input = CgmModelComposer(igm_models=available_models,
+ boundary_data=latest_boundary,
+ time_horizon=time_horizon,
+ scenario_date=scenario_date,
+ area=area,
+ merging_entity=self.merging_entity,
+ rabbit_data=input_data,
+ version=version_number)
+ return cgm_input, args, kwargs
+ return args, kwargs
+
+
+class HandlerMergeModels:
+ def __init__(self,
+ publish_to_opdm: bool = PUBLISH_MERGED_MODEL_TO_OPDM,
+ publish_to_minio: bool = PUBLISH_MERGED_MODEL_TO_MINIO,
+ minio_bucket: str = EMF_OS_MINIO_BUCKET,
+ folder_in_bucket: str = EMF_OS_MINIO_FOLDER,
+ save_to_local_storage: bool = SAVE_MERGED_MODEL_TO_LOCAL_STORAGE,
+ publish_to_elastic: bool = PUBLISH_METADATA_TO_ELASTIC,
+ elk_server: str = elastic.ELK_SERVER,
+ cgm_index: str = ELK_VERSION_INDEX):
+ """
+ Initializes the handler which starts to send out merged models
+ :param publish_to_opdm: publish cgm to opdm
+ :param publish_to_minio: save cgm to minio
+ :param minio_bucket: bucket where to store models
+ :param folder_in_bucket: prefix for models
+ :param save_to_local_storage: whether to save to local storage
+ :param publish_to_elastic: save metadata to elastic
+ :param elk_server: name of the elastic server
+ :param cgm_index: index in the elastic where to send the metadata
+ """
+ self.minio_bucket = minio_bucket
+ self.folder_in_bucket = folder_in_bucket
+ self.use_folders = True
+ self.elastic_server = elk_server
+ self.cgm_index = cgm_index
+ self.save_to_local_storage = save_to_local_storage
+ self.send_to_minio = publish_to_minio
+ self.send_to_opdm = publish_to_opdm
+ self.send_to_elastic = publish_to_elastic
+
+ def handle(self, *args, **kwargs):
+ """
+ Calls the merge and posts the results
+ """
+        # Flatten args before searching for CgmModelComposer
+ args = flatten_tuple(args)
+
+ cgm_compose = None
+ # Check if CgmModelComposer is in args
+ for item in args:
+ if isinstance(item, CgmModelComposer):
+ cgm_compose = item
+ break
+ # check if CgmModelComposer is in kwargs
+ if cgm_compose is None:
+ for key in kwargs:
+ if isinstance(kwargs[key], CgmModelComposer):
+ cgm_compose = kwargs[key]
+ break
+ # If there was nothing, report and return
+ if cgm_compose is None:
+ handle_not_received_case("Merger: no inputs received")
+ logger.error(f"Pipeline failed, no dataclass present with igms")
+ return args, kwargs
+ # else merge the model and start sending it out
+ try:
+ cgm_compose.compose_cgm()
+ # Get the files
+ cgm_files = cgm_compose.cgm
+ folder_name = cgm_compose.get_folder_name()
+ # And send them out
+ if self.send_to_opdm:
+ publish_merged_model_to_opdm(cgm_files=cgm_files)
+ if self.send_to_minio:
+ save_merged_model_to_minio(cgm_files=cgm_files,
+ minio_bucket=self.minio_bucket,
+ folder_in_bucket=self.folder_in_bucket)
+ # For the future reference, store merge data to elastic
+ if self.send_to_elastic:
+ consolidated_metadata = cgm_compose.get_consolidated_metadata()
+ publish_metadata_to_elastic(metadata=consolidated_metadata,
+ cgm_index=self.cgm_index,
+ elastic_server=self.elastic_server)
+ if self.save_to_local_storage:
+ save_merged_model_to_local_storage(cgm_files=cgm_files,
+ cgm_folder_name=folder_name)
+ if running_in_local_machine():
+ succeeded_cases_collector.append(cgm_compose.get_log_message())
+ except Exception as ex_msg:
+ handle_not_received_case(f"Merger: {cgm_compose.get_log_message()} exception: {ex_msg}")
+ return args, kwargs
+
+
+if __name__ == "__main__":
+
+ logging.basicConfig(
+ format='%(levelname) -10s %(asctime) -20s %(name) -45s %(funcName) -35s %(lineno) -5d: %(message)s',
+ datefmt='%Y-%m-%d %H:%M:%S',
+ level=logging.INFO,
+ handlers=[logging.StreamHandler(sys.stdout)]
+ )
+
+ testing_time_horizon = 'ID'
+ testing_merging_type = 'CGM'
+ start_date = parse_datetime("2024-04-11T00:30:00+00:00")
+ end_date = parse_datetime("2024-04-12T00:00:00+00:00")
+
+ delta = end_date - start_date
+ delta_sec = delta.days * 24 * 3600 + delta.seconds
+ # Generate time array with 1h interval
+ time_step = 3600
+ testing_scenario_dates = [(start_date + datetime.timedelta(0, t)).isoformat()
+ for t in range(0, delta_sec, time_step)]
+
+ for testing_scenario_date in testing_scenario_dates:
+ example_input_from_rabbit = {
+ "@context": "https://example.com/task_context.jsonld",
+ "@type": "Task",
+ "@id": "urn:uuid:f9d476ec-2507-4ad2-8a37-72afdcd68bbf",
+ "process_id": "https://example.com/processes/CGM_CREATION",
+ "run_id": "https://example.com/runs/IntraDayCGM/1",
+ "job_id": "urn:uuid:00815bce-5cb5-4f45-8541-c5642680d474",
+ "task_type": "automatic",
+ "task_initiator": "some.body",
+ "task_priority": "normal",
+ "task_creation_time": "2024-04-04T06:57:51.018050",
+ "task_status": "created",
+ "task_status_trace":
+ [
+ {
+ "status": "created",
+ "timestamp": "2024-04-04T06:57:51.018050"
+ }
+ ],
+ "task_dependencies": [],
+ "task_tags": [],
+ "task_retry_count": 0,
+ "task_timeout": "PT1H",
+ "task_gate_open": "2024-04-04T04:00:00+00:00",
+ "task_gate_close": "2024-04-04T04:15:00+00:00",
+ "job_period_start": "2024-04-04T05:00:00+00:00",
+ "job_period_end": "2024-04-04T13:00:00+00:00",
+ "task_properties":
+ {
+ "timestamp_utc": testing_scenario_date,
+ "merge_type": testing_merging_type,
+ "time_horizon": testing_time_horizon
+ }
+ }
+
+ message_handlers = [HandlerGetModels(),
+ HandlerMergeModels(publish_to_opdm=False,
+ publish_to_minio=False,
+ save_to_local_storage=True)]
+ body = (example_input_from_rabbit,)
+ properties = {}
+ for message_handler in message_handlers:
+ try:
+ logger.info(f"---Handling message with {message_handler.__class__.__name__}")
+ body = message_handler.handle(body, properties=properties)
+ except Exception as ex:
+ logger.error(f"Message handling failed: {ex}")
+ if running_in_local_machine():
+ print("FAILED:")
+ print('\r\n'.join(failed_cases_collector))
+ print("SUCCESS:")
+ print('\r\n'.join(succeeded_cases_collector))
diff --git a/emf/loadflow_tool/model_merge_worker.py b/emf/loadflow_tool/model_merge_worker.py
new file mode 100644
index 0000000..78f0023
--- /dev/null
+++ b/emf/loadflow_tool/model_merge_worker.py
@@ -0,0 +1,28 @@
+import logging
+import config
+import uuid
+from emf.common.integrations import rabbit
+from emf.common.logging import custom_logger
+from emf.common.config_parser import parse_app_properties
+from emf.common.converters import opdm_metadata_to_json
+from emf.loadflow_tool.model_merge_handlers import HandlerGetModels, HandlerMergeModels
+
+# Initialize custom logger
+elk_handler = custom_logger.initialize_custom_logger(extra={'worker': 'model-merger', 'worker_uuid': str(uuid.uuid4())})
+logger = logging.getLogger(__name__)
+
+parse_app_properties(caller_globals=globals(), path=config.paths.cgm_worker.model_merge)
+
+testing = True
+
+# RabbitMQ's consumer for CGM
+
+consumer = rabbit.RMQConsumer(
+ que=RMQ_CGM_QUEUE,
+ message_converter=opdm_metadata_to_json,
+ message_handlers=[HandlerGetModels(logger_handler=elk_handler), HandlerMergeModels()])
+
+try:
+ consumer.run()
+except KeyboardInterrupt:
+ consumer.stop()
diff --git a/emf/loadflow_tool/model_merger.py b/emf/loadflow_tool/model_merger.py
new file mode 100644
index 0000000..a9a5df3
--- /dev/null
+++ b/emf/loadflow_tool/model_merger.py
@@ -0,0 +1,1217 @@
+import math
+from datetime import timedelta
+from enum import Enum
+
+import pypowsybl
+import zeep.exceptions
+
+import config
+from emf.common.config_parser import parse_app_properties
+from emf.common.integrations import minio, opdm, elastic
+from emf.common.integrations.elastic import Elastic
+from emf.common.integrations.object_storage.object_storage import query_data, get_content
+from emf.common.logging.custom_logger import SEPARATOR_SYMBOL, check_the_folder_path
+from emf.loadflow_tool.helper import (load_model, load_opdm_data, filename_from_metadata, export_model,
+ NETWORK_KEYWORD, NETWORK_META_KEYWORD, get_metadata_from_filename, attr_to_dict)
+from emf.loadflow_tool.validator import (get_local_entsoe_files, LocalFileLoaderError,
+ parse_boundary_message_type_profile, OPDE_COMPONENT_KEYWORD,
+ MODEL_MESSAGE_TYPE,
+ OPDM_PROFILE_KEYWORD, DATA_KEYWORD, validate_models)
+import logging
+import json
+from emf.loadflow_tool import loadflow_settings
+import sys
+from emf.common.integrations.opdm import OPDM
+from aniso8601 import parse_datetime
+import os
+import triplets
+import pandas
+import datetime
+from uuid import uuid4
+
+from emf.model_retriever.model_retriever import HandlerModelsToMinio, HandlerMetadataToElastic, HandlerModelsValidator
+
+# Update SSH
+
+logger = logging.getLogger(__name__)
+
+parse_app_properties(caller_globals=globals(), path=config.paths.cgm_worker.model_merge)
+
+logging.basicConfig(
+ format='%(levelname)-10s %(asctime)s.%(msecs)03d %(name)-30s %(funcName)-35s %(lineno)-5d: %(message)s',
+ datefmt='%Y-%m-%dT%H:%M:%S',
+ level=logging.INFO,
+ handlers=[logging.StreamHandler(sys.stdout)]
+)
+
+UPDATE_MAP = [
+ {
+ "from_class": "SvPowerFlow",
+ "from_ID": "Terminal.ConductingEquipment",
+ "from_attribute": "SvPowerFlow.p",
+ "to_attribute": "EnergyConsumer.p",
+ },
+ {
+ "from_class": "SvPowerFlow",
+ "from_ID": "Terminal.ConductingEquipment",
+ "from_attribute": "SvPowerFlow.q",
+ "to_attribute": "EnergyConsumer.q",
+ },
+ {
+ "from_class": "SvPowerFlow",
+ "from_ID": "Terminal.ConductingEquipment",
+ "from_attribute": "SvPowerFlow.p",
+ "to_attribute": "RotatingMachine.p",
+ },
+ {
+ "from_class": "SvPowerFlow",
+ "from_ID": "Terminal.ConductingEquipment",
+ "from_attribute": "SvPowerFlow.q",
+ "to_attribute": "RotatingMachine.q",
+ },
+ {
+ "from_class": "SvTapStep",
+ "from_ID": "SvTapStep.TapChanger",
+ "from_attribute": "SvTapStep.position",
+ "to_attribute": "TapChanger.step",
+ },
+ {
+ "from_class": "SvShuntCompensatorSections",
+ "from_ID": "SvShuntCompensatorSections.ShuntCompensator",
+ "from_attribute": "SvShuntCompensatorSections.sections",
+ "to_attribute": "ShuntCompensator.sections",
+ }
+]
+
+FILENAME_MASK = ("{scenarioTime:%Y%m%dT%H%MZ}_{processType}_"
+ "{mergingEntity}-{domain}-{forEntity}_{messageType}_{version:03d}")
+
+NAMESPACE_MAP = {
+ "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#",
+ "cim": "http://iec.ch/TC57/2013/CIM-schema-cim16#",
+ "md": "http://iec.ch/TC57/61970-552/ModelDescription/1#",
+ "entsoe": "http://entsoe.eu/CIM/SchemaExtension/3/1#",
+ # "cgmbp": "http://entsoe.eu/CIM/Extensions/CGM-BP/2020#"
+}
+RDF_MAP_JSON = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'entsoe_v2.4.15_2014-08-07.json')
+PATTERN_WITHOUT_TIMEZONE = '%Y-%m-%dT%H:%M:%S'
+
+CGM_CREATION_DATE_KEYWORD = "pmd:creationDate"
+CGM_MERGING_ENTITY_KEYWORD = "pmd:mergingEntity"
+CGM_MERGING_ENTITY = "BALTICRSC"
+CGM_VERSION_NUMBER_KEYWORD = "pmd:versionNumber"
+CGM_TIME_HORIZON_KEYWORD = 'pmd:timeHorizon'
+CGM_MERGING_AREA_KEYWORD = 'pmd:mergingArea'
+CGM_VALID_FROM_KEYWORD = 'pmd:validFrom'
+
+DEFAULT_INDEX_NAME = "emfos-logs*"
+
+# Variables used for local testing
+TIME_HORIZON = '1D'
+SCENARIO_DATE = '2024-03-14T09:30'
+DEFAULT_AREA = 'EU'
+VERSION = "104"
+PUBLISH_TO_OPDM = False
+USE_LOCAL_FILES = True
+LOCAL_FOLDER = 'TC3_T1_Conform'
+
+PROCESS_ID_KEYWORD = "process_id"
+RUN_ID_KEYWORD = 'run_id'
+JOB_ID_KEYWORD = 'job_id'
+
+FULL_PATH_KEYWORD = 'full_path'
+AREA_KEYWORD = 'AREA'
+INCLUDED_TSO_KEYWORD = 'INCLUDED'
+EXCLUDED_TSO_KEYWORD = 'EXCLUDED'
+DEFAULT_MERGE_TYPES = {'CGM': {AREA_KEYWORD: 'EU',
+ INCLUDED_TSO_KEYWORD: [],
+ EXCLUDED_TSO_KEYWORD: ['APG']},
+ 'RMM': {AREA_KEYWORD: 'BA',
+ INCLUDED_TSO_KEYWORD: ['ELERING', 'AST', 'LITGRID', 'PSE'],
+ EXCLUDED_TSO_KEYWORD: []}}
+
+LOCAL_STORAGE_LOCATION = './merged_examples/'
+
+DEFAULT_TSO = []
+
+
+class DownloadModels(Enum):
+ """
+ For determining from where to download files
+ """
+ OPDM = 1
+ MINIO = 2
+ OPDM_AND_MINIO = 3
+
+
+def load_rdf_map(file_name: str = RDF_MAP_JSON):
+ """
+ loads rdf map file
+ :param file_name: file from where to load
+ :return: rdf map
+ """
+ with open(file_name, 'r') as file_object:
+ rdf_map = json.load(file_object)
+ return rdf_map
+
+
+def check_dataframe(first_input=None, second_input=None):
+ """
+    Returns first_input if it is a dataframe, otherwise falls back to second_input
+    :param first_input: first element to be checked
+    :param second_input: second element to be checked
+    :return: first_input or second_input (as dataframes)
+ """
+ if first_input is not None and isinstance(first_input, pandas.DataFrame):
+ return first_input
+ return second_input
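+
+# Illustrative example: check_dataframe(None, existing_df) -> existing_df,
+# while check_dataframe(new_df, existing_df) -> new_df (when new_df is a pandas DataFrame)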
+
+
+def get_local_models(time_horizon: str = TIME_HORIZON,
+ scenario_date: str = SCENARIO_DATE,
+ use_local_files: bool = USE_LOCAL_FILES,
+ local_file_folder: str = LOCAL_FOLDER,
+ download_policy: DownloadModels = DownloadModels.OPDM_AND_MINIO,
+ allow_merging_entities: bool = False,
+ igm_files_needed=None,
+ opdm_client: OPDM = None):
+ """
+ For local testing only. Takes the files from the folder specified that can be passed to the next steps. Fallback
+ is set to getting the files from opdm
+ :param time_horizon: time horizon of the igms
+ :param scenario_date: the date of the scenario for which the igm was created
+    :param use_local_files: if True, uses local files
+ :param local_file_folder: unique folder where to find files
+ :param download_policy: from where to download models
+    :param allow_merging_entities: if True, already merged files are escaped (skipped)
+ :param igm_files_needed: specify specific igm files needed
+ :param opdm_client: client for the opdm
+ """
+ if igm_files_needed is None:
+ igm_files_needed = ['EQ']
+ try:
+ if use_local_files:
+ available_models, latest_boundary = get_local_entsoe_files(path_to_directory=local_file_folder,
+ allow_merging_entities=allow_merging_entities,
+ igm_files_needed=igm_files_needed)
+ else:
+ raise LocalFileLoaderError
+ except FileNotFoundError:
+ logger.info(f"Getting data from OPDM")
+ available_models, latest_boundary = get_models(time_horizon=time_horizon,
+ scenario_date=scenario_date,
+ download_policy=download_policy,
+ opdm_client=opdm_client)
+ return available_models, latest_boundary
+
+
+class CgmModelType(Enum):
+ BOUNDARY = 1
+ IGM = 2
+
+
+def run_model_retriever_pipeline(opdm_models: dict | list,
+ latest_boundary: dict = None,
+ model_type: CgmModelType = CgmModelType.IGM):
+ """
+ Initializes model_retriever pipeline to download, validate and push the models to minio/elastic
+ THIS IS A HACK!!! DO NOT USE IT ANYWHERE ELSE THAN IN TESTING MODE
+ :param opdm_models: dictionary of opdm_models
+    :param latest_boundary: boundary data passed to the validator
+ :param model_type: specify whether the files are boundary data or igm data
+    :return: updated opdm models
+ """
+ minio_handler = HandlerModelsToMinio()
+ validator_handler = HandlerModelsValidator()
+ metadata_handler = HandlerMetadataToElastic()
+ if isinstance(opdm_models, dict):
+ opdm_models = [opdm_models]
+ opdm_models = minio_handler.handle_reduced(opdm_objects=opdm_models)
+ if model_type == CgmModelType.IGM:
+ opdm_models = validator_handler.handle(opdm_objects=opdm_models, latest_boundary=latest_boundary)
+ opdm_models = metadata_handler.handle(opdm_objects=opdm_models)
+ return opdm_models
+
+
+def get_latest_boundary(opdm_client: OPDM = None, download_policy: DownloadModels = DownloadModels.OPDM_AND_MINIO):
+ """
+ Tries to get the boundary data from OPDM, if not successful, fallback to Minio and take the latest
+ Alternative would be to check depends on
+ :param opdm_client: OPDM client
+ :param download_policy: where to first download the boundary data
+ :return boundary data
+ """
+ boundary_data = None
+ if download_policy == DownloadModels.MINIO:
+ # Not the quickest way to get it
+ # boundary_data = get_boundary_data_from_minio()
+ return boundary_data
+ try:
+ opdm_client = opdm_client or OPDM()
+ boundary_data = opdm_client.get_latest_boundary()
+ # if model_retriever_pipeline:
+ # boundary_data = run_model_retriever_pipeline(opdm_models=boundary_data, model_type=CgmModelType.BOUNDARY)
+ # raise zeep.exceptions.Fault
+ except zeep.exceptions.Fault as fault:
+ logger.error(f"Could not get boundary data from OPDM: {fault}")
+ # boundary_data = get_boundary_data_from_minio()
+ # should be query_data, but for now ask it minio
+ except Exception as ex:
+ logger.error(f"Undefined exception when getting boundary data: {ex}")
+ # boundary_data = get_boundary_data_from_minio()
+ finally:
+ return boundary_data
+
+
+def get_models(time_horizon: str = TIME_HORIZON,
+ scenario_date: str = SCENARIO_DATE,
+ included_tsos: list | str = None,
+ excluded_tsos: list | str = None,
+ download_policy: DownloadModels = DownloadModels.OPDM_AND_MINIO,
+ model_retriever_pipeline: bool = False,
+ opdm_client: OPDM = None):
+ """
+ Gets models from opdm and/or minio
+ NB! Priority is given to Minio!
+ Workflow:
+ 1) Get models from opdm if selected
+ 2) Get models from minio if selected or opdm failed
+ 3) If requested from both, take data from minio and extend it from opdm
+ 4) By default get boundary from opdm
+ 5) Fallback: get boundary from minio
+ :param time_horizon: time horizon of the igms
+ :param scenario_date: the date of the scenario for which the igm was created
+ :param included_tsos: list or string of tso names, that should be included
+ :param excluded_tsos: list or string of tso names, that should be excluded
+ :param download_policy: from where to download models
+ :param model_retriever_pipeline
+ :param opdm_client: client for the opdm
+ """
+ opdm_models = None
+ minio_models = None
+    # Get boundary data
+ boundary_data = get_latest_boundary(opdm_client=opdm_client, download_policy=download_policy)
+ # 1 if opdm is selected, try to download from there
+ if download_policy == DownloadModels.OPDM or download_policy == DownloadModels.OPDM_AND_MINIO:
+ opdm_models = get_models_from_opdm(time_horizon=time_horizon,
+ scenario_date=scenario_date,
+ included_tsos=included_tsos,
+ excluded_tsos=excluded_tsos,
+ opdm_client=opdm_client)
+ # Validate raw input models
+ if not model_retriever_pipeline:
+ opdm_models = validate_models(available_models=opdm_models, latest_boundary=boundary_data)
+ # 2 if minio is selected or opdm failed, download data from there
+ if download_policy == DownloadModels.MINIO or download_policy == DownloadModels.OPDM_AND_MINIO or not opdm_models:
+ minio_models = get_models_from_elastic_minio(time_horizon=time_horizon,
+ scenario_date=scenario_date,
+ included_tsos=included_tsos,
+ excluded_tsos=excluded_tsos)
+ # If getting boundary failed try to get it from the dependencies
+ if not boundary_data:
+ boundary_data = get_boundary_from_dependencies(igm_models=minio_models)
+    # If something was received from opdm, run it through the model_retriever pipeline
+ if download_policy == DownloadModels.OPDM:
+ if model_retriever_pipeline and opdm_models:
+ opdm_models = run_model_retriever_pipeline(opdm_models=opdm_models)
+ igm_models = opdm_models or minio_models
+ elif download_policy == DownloadModels.MINIO:
+ igm_models = minio_models
+ else:
+ # 3. When merge is requested, give priority to minio, update it from opdm
+ igm_models = minio_models
+ existing_tso_names = [model.get('pmd:TSO') for model in minio_models]
+ if opdm_models:
+ additional_tso_models = [model for model in opdm_models if model.get('pmd:TSO') not in existing_tso_names]
+ if model_retriever_pipeline and additional_tso_models:
+ additional_tso_models = run_model_retriever_pipeline(opdm_models=additional_tso_models)
+ igm_models.extend(additional_tso_models)
+ return igm_models, boundary_data
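+
+# Illustrative call only (an assumption mirroring the testing block at the end of this module): fetch the models
+# for one scenario with both sources enabled; minio results take priority and are extended with extra TSOs from opdm.
+#   igm_models, boundary = get_models(time_horizon='1D',
+#                                     scenario_date='2024-04-11T11:30:00+00:00',
+#                                     excluded_tsos=['APG'],
+#                                     download_policy=DownloadModels.OPDM_AND_MINIO)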
+
+
+def filter_models_by_tsos(igm_models: list, included_tsos: list | str = None, excluded_tsos: list | str = None):
+ """
+ Filters the list of models to include or to exclude specific tsos if they are given
+ :param igm_models: list of igm models
+ :param included_tsos: list or string of tso names, if given, only matching models are returned
+ :param excluded_tsos: list or string of tso names, if given, matching models will be discarded
+    :return: updated list of igms
+ """
+ if included_tsos:
+ included_tsos = [included_tsos] if isinstance(included_tsos, str) else included_tsos
+ igm_models = [model for model in igm_models if model.get('pmd:TSO') in included_tsos]
+ if excluded_tsos:
+ excluded_tsos = [excluded_tsos] if isinstance(excluded_tsos, str) else excluded_tsos
+        igm_models = [model for model in igm_models if model.get('pmd:TSO') not in excluded_tsos]
+ return igm_models
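+
+# Minimal sketch of the filtering behaviour (hypothetical model dictionaries, only 'pmd:TSO' matters here):
+#   models = [{'pmd:TSO': 'ELERING'}, {'pmd:TSO': 'AST'}, {'pmd:TSO': 'APG'}]
+#   filter_models_by_tsos(models, excluded_tsos='APG')    -> keeps ELERING and AST
+#   filter_models_by_tsos(models, included_tsos=['AST'])  -> keeps AST only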
+
+
+def get_models_from_opdm(time_horizon: str,
+ scenario_date: str,
+ included_tsos: list | str = None,
+ excluded_tsos: list | str = None,
+ opdm_client: OPDM = None):
+ """
+ Gets models from opdm
+ :param time_horizon: time horizon of the igms
+ :param scenario_date: the date of the scenario for which the igm was created
+ :param included_tsos: list or string of tso names, if given, only matching models are returned
+ :param excluded_tsos: list or string of tso names, if given, matching models will be discarded
+ :param opdm_client: client for the opdm
+    :return: list of models if found, None otherwise
+ """
+ available_models = None
+ try:
+ opdm_client = opdm_client or OPDM()
+ scenario_date_iso = datetime.datetime.fromisoformat(scenario_date)
+ converted_scenario_date = scenario_date_iso.strftime(PATTERN_WITHOUT_TIMEZONE)
+ received_models = opdm_client.get_latest_models_and_download(time_horizon=time_horizon,
+ scenario_date=converted_scenario_date)
+ available_models = filter_models_by_tsos(igm_models=received_models,
+ included_tsos=included_tsos,
+ excluded_tsos=excluded_tsos)
+ except zeep.exceptions.Fault as fault:
+ logger.error(f"Could not connect to OPDM: {fault}")
+ except Exception as ex:
+ logger.error(f"Unknown exception when getting data from opdm: {ex}")
+ finally:
+ return available_models
+
+
+def get_boundary_from_dependencies(igm_models: list):
+ """
+ Gets boundary data from dependencies
+ Lists all dependencies from models, filters those which are BDS, takes the latest, unpacks it, downloads files to it
+ and if everything went well then returns the result
+ :param igm_models: list of igm models
+ :return: boundary data if everything went successfully, None otherwise
+ """
+ # Get all dependencies
+ try:
+ dependencies = [model.get('opde:Dependencies', {}).get('opde:DependsOn') for model in igm_models]
+ boundaries = [dependency for dependency in dependencies
+ if dependency.get('opdm:OPDMObject', {}).get('opde:Object-Type') == 'BDS']
+ latest_date = max([parse_datetime(entry.get('opdm:OPDMObject', {}).get('pmd:scenarioDate'))
+ for entry in boundaries])
+ latest_boundaries = [boundary for boundary in boundaries
+ if
+ parse_datetime(boundary.get('opdm:OPDMObject', {}).get('pmd:scenarioDate')) == latest_date]
+ if len(latest_boundaries) > 0 and (latest_boundary_value := (latest_boundaries[0]).get('opdm:OPDMObject')):
+ latest_boundary_value = get_content(metadata=latest_boundary_value)
+ if all(profile.get('opdm:Profile', {}).get('DATA')
+ for profile in dict(latest_boundary_value).get('opde:Component', [])):
+ return latest_boundary_value
+ except ValueError:
+ logger.warning(f"Dependencies do not contain any boundary data")
+ return None
+
+
+def get_models_from_elastic_minio(time_horizon: str,
+ scenario_date: str,
+ included_tsos: list | str = None,
+ excluded_tsos: list | str = None):
+ """
+ Asks metadata from elastic, attaches files from minio
+ NB! currently only those models are returned which have files in minio
+ :param included_tsos: list or string of tso names, if given, only matching models are returned
+ :param excluded_tsos: list or string of tso names, if given, matching models will be discarded
+ :param time_horizon: the time horizon
+ :param scenario_date: the date requested
+ :return: list of models
+ """
+ query = {'pmd:scenarioDate': scenario_date, 'valid': True}
+
+ # If time horizon is not ID, query by time horizon
+ if time_horizon != 'ID':
+ query['pmd:timeHorizon'] = time_horizon
+
+ query_response = query_data(metadata_query=query, return_payload=True)
+
+ # filter out duds: igms that are missing file(s)
+ files_present = [model for model in query_response
+ if all(field.get('opdm:Profile', {}).get('DATA') for field in model.get('opde:Component', {}))]
+ query_response = files_present
+
+ # If time horizon is ID query everything and filter the smallest run ids per tso
+ # TODO check if this is valid
+ if time_horizon == 'ID':
+ logger.warning(f"Selected time horizon {time_horizon}, smallest number of the runs")
+ time_horizon = [f"{time_h:02}" for time_h in range(1, 31)]
+ query_response = [response for response in query_response if response.get("pmd:timeHorizon") in time_horizon]
+ tsos = set([model.get('pmd:TSO') for model in query_response])
+ latest_ids = []
+ for tso in tsos:
+ smallest_id = sorted([model.get('pmd:timeHorizon')
+ for model in query_response if model.get('pmd:TSO') == tso], key=lambda x: int(x))[0]
+ igms_by_id = [model for model in query_response
+ if model.get('pmd:TSO') == tso and model.get('pmd:timeHorizon') == smallest_id]
+ latest_ids.extend(igms_by_id)
+ query_response = latest_ids
+
+ # Drop duplicates: take the latest igm if there are multiple for the same scenario date and time horizon
+ latest_versions = [sorted([model for model in query_response if model.get('pmd:TSO') == tso],
+ key=lambda x: int(x.get('pmd:versionNumber')), reverse=True)[0]
+ for tso in set([model.get('pmd:TSO') for model in query_response])]
+ query_response = latest_versions
+
+ return filter_models_by_tsos(igm_models=query_response, included_tsos=included_tsos, excluded_tsos=excluded_tsos)
+
+
+def get_version_number_from_minio(minio_bucket: str = EMF_OS_MINIO_BUCKET,
+ sub_folder: str = EMF_OS_MINIO_FOLDER,
+ minio_client: minio.ObjectStorage = None,
+                                  scenario_date: str = None,
+                                  modeling_entity: str = f"{CGM_MERGING_ENTITY}-EU",
+ time_horizon: str = None):
+ """
+ Gets file list from minio, explodes it and retrieves the biggest matched version number
+ :param minio_client: if given
+ :param minio_bucket: the name of the bucket
+ :param sub_folder: prefix
+ :param scenario_date: date of the merge
+ :param modeling_entity: name of the merging entity
+ :param time_horizon: the time horizon
+ """
+ new_version_number = 1
+ try:
+ exploded_results = get_filename_dataframe_from_minio(minio_bucket=minio_bucket,
+ minio_client=minio_client,
+ sub_folder=sub_folder)
+ new_version_number = get_largest_version_from_filename_dataframe(exploded_results=exploded_results,
+ scenario_date=scenario_date,
+ time_horizon=time_horizon,
+ modeling_entity=modeling_entity)
+ except (ValueError, KeyError):
+ logger.info(f"No previous entries found, starting with version number {new_version_number:03}")
+ except Exception as ex:
+ logger.warning(f"Got minio error: {ex}, starting with version number {new_version_number:03}")
+ return f"{new_version_number:03}"
+
+
+def get_filename_dataframe_from_minio(minio_bucket: str,
+ minio_client: minio.ObjectStorage = None,
+ sub_folder: str = None):
+ """
+ Gets file list from minio bucket (prefix can be specified with sub folder) and converts to dataframe following
+ the standard naming convention (see get_metadata_from_filename for more details)
+    :param minio_client: instance of minio ObjectStorage if given
+ :param minio_bucket: the name of the bucket
+ :param sub_folder: prefix
+ """
+ minio_client = minio_client or minio.ObjectStorage()
+ if sub_folder:
+ list_of_files = minio_client.list_objects(bucket_name=minio_bucket,
+ prefix=sub_folder,
+ recursive=True)
+ else:
+ list_of_files = minio_client.list_objects(bucket_name=minio_bucket, recursive=True)
+ file_name_list = []
+ for file_name in list_of_files:
+ try:
+            # Skip folder entries (object names that end with the separator)
+ if not file_name.object_name.endswith(SEPARATOR_SYMBOL):
+ path_list = file_name.object_name.split(SEPARATOR_SYMBOL)
+ file_metadata = get_metadata_from_filename(path_list[-1])
+ file_metadata[FULL_PATH_KEYWORD] = file_name.object_name
+ file_name_list.append(file_metadata)
+ except ValueError:
+ continue
+ except Exception as ex:
+ logger.warning(f"Exception when parsing the filename: {ex}")
+ continue
+ exploded_results = pandas.DataFrame(file_name_list)
+ return exploded_results
+
+
+def get_boundary_data_from_minio(minio_bucket: str = 'opdm-data',
+ sub_folder: str = 'CGMES/ENTSOE/',
+ minio_client: minio.ObjectStorage = None):
+ """
+ Searches given bucket for boundary data (ENTSOE files) takes the last entries by message types
+ :param minio_bucket: bucket where to search from
+ :param sub_folder: ease the search by giving prefix
+ :param minio_client: instance on minio ObjectStorage if given
+ :return boundary data
+ """
+ minio_client = minio_client or minio.ObjectStorage()
+ boundary_value = {OPDE_COMPONENT_KEYWORD: []}
+ file_list = get_filename_dataframe_from_minio(minio_bucket=minio_bucket,
+ sub_folder=sub_folder,
+ minio_client=minio_client)
+ boundary_list = file_list[file_list['Model.modelingEntity'] == 'ENTSOE']
+ filtered = boundary_list.loc[boundary_list.groupby('Model.messageType')['Model.scenarioTime'].idxmax()]
+ # Check if input is valid
+ if len(filtered.index) != 2 or sorted(filtered['Model.messageType']) != ['EQBD', 'TPBD']:
+ return None
+ filtered_elements = filtered.to_dict('records')
+ for opdm_profile_content in filtered_elements:
+ object_name = opdm_profile_content[FULL_PATH_KEYWORD]
+ downloaded_file = minio_client.download_object(bucket_name=minio_bucket, object_name=object_name)
+ opdm_profile_content[MODEL_MESSAGE_TYPE] = parse_boundary_message_type_profile(
+ opdm_profile_content[MODEL_MESSAGE_TYPE])
+ opdm_profile_content[DATA_KEYWORD] = downloaded_file
+ opdm_profile_content.pop(FULL_PATH_KEYWORD)
+ boundary_value[OPDE_COMPONENT_KEYWORD].append({OPDM_PROFILE_KEYWORD: opdm_profile_content})
+ return boundary_value
+
+
+def get_version_number_from_elastic(index_name: str = DEFAULT_INDEX_NAME,
+ start_looking: datetime.datetime | str = datetime.datetime.today(),
+ scenario_date: str = None,
+ time_horizon: str = None,
+ modeling_entity: str = None):
+ """
+ Checks and gets the version number from elastic
+ Note that it works only if logger.info(f"Publishing {instance_file.name} to OPDM")
+ is used when publishing files to OPDM
+ :param index_name: index from where to search
+ :param start_looking: datetime instance from where to look, if not set then takes current day
+ :param scenario_date: filter the file names by scenario date
+ :param time_horizon: filter file names by time horizon
+ :param modeling_entity: filter file names by modeling entity
+ :return version number as a string
+ """
+ must_elements = []
+ query_part = {"query_string": {"default_field": "message", "query": "*Publishing* AND *to OPDM"}}
+ must_elements.append(query_part)
+ new_version_number = 1
+ if start_looking:
+ if isinstance(start_looking, datetime.datetime):
+ start_looking = start_looking.strftime("%Y-%m-%dT%H:%M:%S")
+ range_part = {"range": {"log_timestamp": {"gte": start_looking}}}
+ must_elements.append(range_part)
+ previous_cgm_query = {"bool": {"must": must_elements}}
+ try:
+ elastic_client = Elastic()
+ results = elastic_client.get_data(index=index_name,
+ query=previous_cgm_query,
+ fields=['message'])
+ if results.empty:
+ raise NoContentFromElasticException
+ # Get the filenames and explode them
+ exploded_results = (results["message"].
+ str.removesuffix(' to OPDM').
+ str.removeprefix('Publishing ').
+ map(get_metadata_from_filename).
+ apply(pandas.Series))
+ # Filter the results if needed
+ new_version_number = get_largest_version_from_filename_dataframe(exploded_results=exploded_results,
+ scenario_date=scenario_date,
+ time_horizon=time_horizon,
+ modeling_entity=modeling_entity)
+ except (NoContentFromElasticException, KeyError, ValueError):
+ logger.info(f"No previous entries found, starting with version number {new_version_number:03}")
+ except Exception as ex:
+ logger.warning(f"Got elastic error: {ex}, starting with version number {new_version_number:03}")
+ finally:
+ return f"{new_version_number:03}"
+
+
+def get_largest_version_from_filename_dataframe(exploded_results: pandas.DataFrame,
+ scenario_date: str = None,
+ time_horizon: str = None,
+ modeling_entity: str = None):
+ """
+ Searches largest version number from a dict. Optionally the dict can be filtered beforehand
+ :param exploded_results: the dictionary containing exploded filenames (used get_metadata_from_filename)
+ :param scenario_date: optionally filter filenames by scenario date
+ :param time_horizon: optionally filter filenames by time horizon
+ :param modeling_entity: optionally filter filenames by checking if modelling entity is in the field
+ :return: largest found file number or 1 if key error or not found
+ """
+ try:
+ if modeling_entity is not None:
+ exploded_results = exploded_results[exploded_results['Model.modelingEntity'].str.contains(modeling_entity)]
+ if scenario_date is not None:
+ scenario_date = f"{parse_datetime(scenario_date):%Y%m%dT%H%MZ}"
+ exploded_results = exploded_results[exploded_results['Model.scenarioTime'].str.contains(scenario_date)]
+ if time_horizon is not None:
+ exploded_results = exploded_results[exploded_results['Model.processType'].str.contains(time_horizon)]
+ # Get the largest version number and increment it by 1
+ new_version_number = max(pandas.to_numeric(exploded_results["Model.version"])) + 1
+ logger.info(f"Continuing with version number {new_version_number:03}")
+ except KeyError as key_error:
+ logger.info(f"{key_error}")
+ new_version_number = 1
+ return new_version_number
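+
+# Sketch of the version increment, assuming a dataframe of exploded filenames (hypothetical values):
+#   exploded = pandas.DataFrame([{'Model.modelingEntity': 'BALTICRSC-EU',
+#                                 'Model.scenarioTime': '20240411T1130Z',
+#                                 'Model.processType': '1D',
+#                                 'Model.version': '002'}])
+#   get_largest_version_from_filename_dataframe(exploded, modeling_entity='BALTICRSC')  # -> 3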
+
+
+class NoContentFromElasticException(Exception):
+ pass
+
+
+def get_version_number(scenario_date: str,
+ time_horizon: str,
+ modeling_entity: str,
+ start_looking: str | datetime.date = None,
+ use_elastic: bool = True,
+ use_minio: bool = True,
+ default_version_number='104'):
+ """
+ Gets a version number from elastic and or minio.
+ :param scenario_date: the date by which to look the version number
+ :param time_horizon: the time horizon
+ :param modeling_entity: the author of the previous versions
+ :param start_looking: can be used to cut down the elastic logs
+ :param use_elastic: search version number from elastic
+ :param use_minio: search version number from minio
+ :param default_version_number: return value if not found
+ :return largest version number from minio, elastic or default one
+ """
+ version_number = default_version_number
+ version_number_minio = None
+ version_number_elastic = None
+ if use_minio:
+ version_number_minio = get_version_number_from_minio(time_horizon=time_horizon, scenario_date=scenario_date)
+ if use_elastic:
+ if start_looking:
+ version_number_elastic = get_version_number_from_elastic(start_looking=start_looking,
+ modeling_entity=modeling_entity,
+ time_horizon=time_horizon,
+ scenario_date=scenario_date)
+ else:
+ version_number_elastic = get_version_number_from_elastic(modeling_entity=modeling_entity,
+ time_horizon=time_horizon,
+ scenario_date=scenario_date)
+ if version_number_minio and version_number_elastic:
+ version_number = version_number_minio if int(version_number_minio) > int(version_number_elastic) \
+ else version_number_elastic
+ else:
+ version_number = version_number_minio or version_number_elastic or version_number
+ return version_number
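+
+# Hedged usage sketch (assumes reachable minio/elastic; returns the next version as a zero-padded string):
+#   version = get_version_number(scenario_date='2024-04-11T11:30:00+00:00',
+#                                time_horizon='1D',
+#                                modeling_entity='BALTICRSC-EU')  # e.g. '003'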
+
+
+def get_time_horizon_for_intra_day(time_horizon: str, scenario_date: str, skip_past_scenario_dates: bool = False):
+ """
+ Taken as is from previous code
+ :param time_horizon: time_horizon of the merged model
+ :param scenario_date: scenario date of the merged model
+ :param skip_past_scenario_dates: either to skip past intra day scenarios
+ :return updated time horizon value
+ """
+ if time_horizon == "ID":
+ utc_now = datetime.datetime.now(datetime.timezone.utc)
+ parsed_date = parse_datetime(scenario_date)
+ time_delta = parsed_date.replace(tzinfo=None) - utc_now.replace(tzinfo=None)
+ if skip_past_scenario_dates and utc_now > parsed_date.replace(tzinfo=datetime.timezone.utc):
+ raise IntraDayPastScenarioDateException
+ time_horizon = f"{int(time_delta.seconds / 3600) + 1 :02d}"
+ return time_horizon
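+
+# Worked example of the intra-day offset (values are illustrative):
+# if the scenario date lies 2 h 30 min in the future, time_delta.seconds is 9000,
+# so int(9000 / 3600) + 1 == 3 and the returned time horizon is '03'.
+# Non-ID time horizons are returned unchanged.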
+
+
+class IntraDayPastScenarioDateException(Exception):
+ pass
+
+
+class CgmModelComposer:
+ """
+ Class for gathering the data and running the merge function (copy from merge.py)
+ """
+
+ def __init__(self,
+ igm_models=None,
+ boundary_data=None,
+ version: str = VERSION,
+ time_horizon: str = TIME_HORIZON,
+ area: str = DEFAULT_AREA,
+ scenario_date: str = SCENARIO_DATE,
+ merging_entity: str = CGM_MERGING_ENTITY,
+ namespace_map=None,
+ rdf_map_loc: str = RDF_MAP_JSON,
+ rabbit_data: dict = None):
+ """
+ Constructor, note that data gathering and filtering must be done beforehand
+ This just stores and merges
+ A refactored version of merge.py
+ :param igm_models: the individual grid models of the tso's
+ :param boundary_data: the boundary data of the region
+ :param version: the version number to use for the merged model
+ :param time_horizon: the time horizon for the merge
+ :param area: the area of the merge
+ :param scenario_date: the date of the scenario
+ :param merging_entity: the author of the merged model
+ :param namespace_map:
+ :param rdf_map_loc:
+ :param rabbit_data:
+ """
+ if namespace_map is None:
+ namespace_map = NAMESPACE_MAP
+ self.igm_models = igm_models
+ if self.igm_models is None:
+ self.igm_models = []
+ self.boundary_data = boundary_data
+ self.sv_data = None
+ self.ssh_data = None
+
+ self.time_horizon = time_horizon
+ self.area = area
+ self.scenario_date = scenario_date
+
+ self._version = version
+ self.merging_entity = merging_entity
+ self._merged_model = None
+ self.merge_report = {}
+ self._opdm_data = None
+ self._opdm_object_meta = None
+ self.namespace_map = namespace_map
+ self.cgm = None
+ self.rdf_map = load_rdf_map(rdf_map_loc)
+ self.rabbit_data = rabbit_data
+
+ def get_tso_list(self):
+ return ', '.join([model.get('pmd:TSO', '') for model in self.igm_models])
+
+ def get_log_message(self):
+ return f"Merge at {self.scenario_date}, time horizon {self.time_horizon}, tsos: {self.get_tso_list()}"
+
+ @property
+ def merged_model(self):
+ """
+ Gets merged model
+ """
+ if self._merged_model is None and self.igm_models and self.boundary_data:
+ self._merged_model = load_model(self.igm_models + [self.boundary_data])
+ # Run LF
+ self.merge_report = {}
+ loadflow_report = pypowsybl.report.Reporter()
+ try:
+ loadflow_result = pypowsybl.loadflow.run_ac(network=self._merged_model[NETWORK_KEYWORD],
+ parameters=loadflow_settings.CGM_DEFAULT,
+ reporter=loadflow_report)
+ loadflow_result_dict = [attr_to_dict(island) for island in loadflow_result]
+ self.merge_report["LOADFLOW_REPORT"] = json.loads(loadflow_report.to_json())
+ self.merge_report["LOADFLOW_RESULTS"] = loadflow_result_dict
+ except pypowsybl._pypowsybl.PyPowsyblError as p_error:
+ logger.error(f"Error at calculating loadflow: {p_error}")
+ raise Exception(p_error)
+ return self._merged_model
+
+ @property
+ def opdm_data(self):
+ """
+ Gets opdm data (igm models and boundary data combined)
+ """
+ if isinstance(self._opdm_data, pandas.DataFrame):
+ return self._opdm_data
+ if self.igm_models and self.boundary_data:
+ self._opdm_data = load_opdm_data(self.igm_models + [self.boundary_data])
+ return self._opdm_data
+
+ @property
+ def opdm_object_meta(self):
+ """
+ Gets base for opdm object meta
+ """
+ if self._opdm_object_meta is None:
+ sv_id = self.merged_model[NETWORK_META_KEYWORD]['id'].split("uuid:")[-1]
+ self.time_horizon = get_time_horizon_for_intra_day(self.time_horizon, self.scenario_date)
+ self._opdm_object_meta = {'pmd:fullModel_ID': sv_id,
+ 'pmd:creationDate': f"{datetime.datetime.utcnow():%Y-%m-%dT%H:%M:%S.%fZ}",
+ 'pmd:timeHorizon': self.time_horizon,
+ 'pmd:cgmesProfile': 'SV',
+ 'pmd:contentType': 'CGMES',
+ 'pmd:modelPartReference': '',
+ 'pmd:mergingEntity': f"{self.merging_entity}",
+ 'pmd:mergingArea': self.area,
+ 'pmd:validFrom': f"{parse_datetime(self.scenario_date):%Y%m%dT%H%MZ}",
+ 'pmd:modelingAuthoritySet': 'http://www.baltic-rsc.eu/OperationalPlanning',
+ 'pmd:scenarioDate': f"{parse_datetime(self.scenario_date):%Y-%m-%dT%H:%M:00Z}",
+ 'pmd:modelid': sv_id,
+ 'pmd:description': f"""
+ {self.time_horizon}
+ pypowsybl_{pypowsybl.__version__}
+ {self.merging_entity}
+ """,
+ 'pmd:versionNumber': self.version,
+ 'file_type': "xml"}
+ return self._opdm_object_meta
+
+ @property
+ def version(self):
+ """
+ Gets version
+ """
+ return self._version
+
+ def set_sv_file(self,
+ merged_model=None,
+ opdm_object_meta=None):
+ merged_model = merged_model or self.merged_model
+ opdm_object_meta = opdm_object_meta or self.opdm_object_meta
+ export_report = pypowsybl.report.Reporter()
+ exported_model = export_model(network=merged_model[NETWORK_KEYWORD],
+ opdm_object_meta=opdm_object_meta,
+ profiles=["SV"])
+ logger.info(f"Exporting merged model to {exported_model.name}")
+ # Load SV data
+ sv_data = pandas.read_RDF([exported_model])
+ # Update SV filename
+ sv_data.set_VALUE_at_KEY(key='label', value=filename_from_metadata(opdm_object_meta))
+ # Update SV description
+ sv_data.set_VALUE_at_KEY(key='Model.description', value=opdm_object_meta['pmd:description'])
+ # Update SV created time
+ sv_data.set_VALUE_at_KEY(key='Model.created', value=opdm_object_meta['pmd:creationDate'])
+ # Update SSH Model.scenarioTime
+ sv_data.set_VALUE_at_KEY('Model.scenarioTime', opdm_object_meta['pmd:scenarioDate'])
+ # Update SV metadata
+ sv_data = triplets.cgmes_tools.update_FullModel_from_filename(sv_data)
+ self.sv_data = sv_data
+ return sv_data, opdm_object_meta
+
+ def set_ssh_files(self,
+ valid_models=None,
+ latest_boundary=None,
+ sv_data=None,
+ opdm_object_meta=None,
+ update_map=None):
+
+ valid_models = valid_models or self.igm_models
+ latest_boundary = latest_boundary or self.boundary_data
+ sv_data = check_dataframe(sv_data, self.sv_data)
+ opdm_object_meta = opdm_object_meta or self.opdm_object_meta
+ update_map = update_map or UPDATE_MAP
+
+ ssh_data = load_opdm_data(valid_models, "SSH")
+ ssh_data = triplets.cgmes_tools.update_FullModel_from_filename(ssh_data)
+
+ # Update SSH Model.scenarioTime
+ ssh_data.set_VALUE_at_KEY('Model.scenarioTime', opdm_object_meta['pmd:scenarioDate'])
+
+ # Load full original data to fix issues
+ data = load_opdm_data(valid_models + [latest_boundary])
+ terminals = data.type_tableview("Terminal")
+
+ # Update SSH data from SV
+ updated_ssh_data = ssh_data.copy()
+ for update in update_map:
+ source_data = sv_data.type_tableview(update['from_class']).reset_index(drop=True)
+
+ # Merge with terminal, if needed
+ if terminal_reference := \
+ [column_name if ".Terminal" in column_name else None for column_name in source_data.columns][0]:
+ source_data = source_data.merge(terminals, left_on=terminal_reference, right_on='ID')
+ logger.debug(f"Added Terminals to {update['from_class']}")
+
+ updated_ssh_data = updated_ssh_data.update_triplet_from_triplet(
+ source_data.rename(columns={update['from_ID']: 'ID', update['from_attribute']: update['to_attribute']})[
+ ['ID', update['to_attribute']]].set_index('ID').tableview_to_triplet(), add=False)
+
+ # Generate new UUID for updated SSH
+ updated_ssh_id_map = {}
+ for old_id in updated_ssh_data.query("KEY == 'Type' and VALUE == 'FullModel'").ID.unique():
+ new_id = str(uuid4())
+ updated_ssh_id_map[old_id] = new_id
+ logger.info(f"Assigned new UUID for updated SSH: {old_id} -> {new_id}")
+
+ # Update SSH ID-s
+ updated_ssh_data = updated_ssh_data.replace(updated_ssh_id_map)
+
+ # Update in SV SSH references
+ sv_data = sv_data.replace(updated_ssh_id_map)
+
+ # Add SSH supersedes reference to old SSH
+ ssh_supersedes_data = pandas.DataFrame(
+ [{"ID": item[1], "KEY": "Model.Supersedes", "VALUE": item[0]} for item in updated_ssh_id_map.items()])
+ ssh_supersedes_data['INSTANCE_ID'] = updated_ssh_data.query("KEY == 'Type'").merge(ssh_supersedes_data.ID)[
+ 'INSTANCE_ID']
+ updated_ssh_data = updated_ssh_data.update_triplet_from_triplet(ssh_supersedes_data)
+
+ # Update SSH metadata
+ updated_ssh_data = triplets.cgmes_tools.update_FullModel_from_dict(updated_ssh_data, {
+ "Model.version": opdm_object_meta['pmd:versionNumber'],
+ "Model.created": opdm_object_meta['pmd:creationDate'],
+ "Model.mergingEntity": opdm_object_meta['pmd:mergingEntity'],
+ "Model.domain": opdm_object_meta['pmd:mergingArea']
+ })
+ self.ssh_data = updated_ssh_data
+ self.sv_data = sv_data
+ return updated_ssh_data, sv_data
+
+ def set_cgm(self, updated_ssh_data=None,
+ sv_data=None,
+ valid_models=None,
+ latest_boundary=None,
+ opdm_object_meta=None,
+ filename_mask: str = FILENAME_MASK,
+ namespace_map=None):
+ # Update SSH filenames
+ updated_ssh_data = check_dataframe(updated_ssh_data, self.ssh_data)
+ sv_data = check_dataframe(sv_data, self.sv_data)
+ valid_models = valid_models or self.igm_models
+ latest_boundary = latest_boundary or self.boundary_data
+ opdm_object_meta = opdm_object_meta or self.opdm_object_meta
+ namespace_map = namespace_map or NAMESPACE_MAP
+ data = load_opdm_data(valid_models + [latest_boundary])
+ updated_ssh_data = triplets.cgmes_tools.update_filename_from_FullModel(updated_ssh_data,
+ filename_mask=filename_mask)
+
+ # Update SV metadata
+ sv_metadata = {"Model.version": opdm_object_meta['pmd:versionNumber'],
+ "Model.created": opdm_object_meta['pmd:creationDate']}
+ sv_data = triplets.cgmes_tools.update_FullModel_from_dict(sv_data, sv_metadata)
+
+ # Fix SV - Remove Shunt Sections for EQV Shunts
+ equiv_shunt = data.query("KEY == 'Type' and VALUE == 'EquivalentShunt'")
+ if len(equiv_shunt) > 0:
+ shunts_to_remove = sv_data.merge(sv_data.query("KEY == 'SvShuntCompensatorSections.ShuntCompensator'").
+ merge(equiv_shunt.ID,
+ left_on='VALUE',
+ right_on="ID", how='inner',
+ suffixes=('', '_EQVShunt')).ID)
+ if len(shunts_to_remove) > 0:
+ logger.warning(f'Removing invalid SvShuntCompensatorSections for EquivalentShunt')
+ sv_data = triplets.rdf_parser.remove_triplet_from_triplet(sv_data, shunts_to_remove)
+
+ # Fix SV - add missing SV Tap Steps
+ ssh_tap_steps = updated_ssh_data.query("KEY == 'TapChanger.step'")
+ sv_tap_steps = sv_data.query("KEY == 'SvTapStep.TapChanger'")
+ missing_sv_tap_steps = ssh_tap_steps.merge(sv_tap_steps[['VALUE']], left_on='ID', right_on="VALUE", how='left',
+ indicator=True, suffixes=('', '_SV')).query("_merge == 'left_only'")
+
+ tap_steps_to_be_added = []
+ sv_instance_id = sv_data.INSTANCE_ID.iloc[0]
+ for tap_changer in missing_sv_tap_steps.itertuples():
+ id_value = str(uuid4())
+ logger.warning(f'Missing SvTapStep for {tap_changer.ID}, adding SvTapStep {id_value} '
+ f'and taking tap value {tap_changer.VALUE} from SSH')
+ tap_steps_to_be_added.extend([
+ (id_value, 'Type', 'SvTapStep', sv_instance_id),
+ (id_value, 'SvTapStep.TapChanger', tap_changer.ID, sv_instance_id),
+ (id_value, 'SvTapStep.position', tap_changer.VALUE, sv_instance_id),
+ ])
+
+ sv_data = pandas.concat(
+ [sv_data, pandas.DataFrame(tap_steps_to_be_added, columns=['ID', 'KEY', 'VALUE', 'INSTANCE_ID'])],
+ ignore_index=True)
+
+ export = (pandas.concat([updated_ssh_data, sv_data], ignore_index=True).
+ export_to_cimxml(rdf_map=self.rdf_map,
+ namespace_map=namespace_map,
+ export_undefined=False,
+ export_type="xml_per_instance_zip_per_xml",
+ debug=False,
+ export_to_memory=True))
+ self.cgm = export
+ return export
+
+ def compose_cgm(self):
+ """
+ Composes the cgm
+ """
+ logger.info(f"Merging at {self.scenario_date}, "
+ f"time horizon: {self.time_horizon}, "
+ f"version: {self.version}, "
+ f"area: {self.area}, "
+ f"tsos: {', '.join([model.get('pmd:TSO') for model in self.igm_models])}")
+ self.set_sv_file()
+ self.set_ssh_files()
+ self.set_cgm()
+ return self.cgm
+
+ def get_consolidated_metadata(self, rabbit_data: dict = None, additional_fields: dict = None):
+ """
+ Combines existing metadata with rabbit data for reporting
+ NOTE! Change this
+ """
+ if not rabbit_data:
+ rabbit_data = self.rabbit_data
+ consolidated_data = self.opdm_object_meta
+ consolidated_data[PROCESS_ID_KEYWORD] = rabbit_data.get(PROCESS_ID_KEYWORD)
+ consolidated_data[RUN_ID_KEYWORD] = rabbit_data.get(RUN_ID_KEYWORD)
+ consolidated_data[JOB_ID_KEYWORD] = rabbit_data.get(JOB_ID_KEYWORD)
+ if additional_fields:
+ consolidated_data.update(additional_fields)
+ return consolidated_data
+
+ def get_folder_name(self):
+ model_date = f"{parse_datetime(self.scenario_date):%Y%m%dT%H%MZ}"
+ operator_name = '-'.join([self.merging_entity, self.area])
+ folder_name = '_'.join([model_date, self.time_horizon, operator_name, self._version])
+ return folder_name
+
+
+def save_merged_model_to_local_storage(cgm_files,
+ cgm_folder_name: str = None,
+ local_storage_location: str = LOCAL_STORAGE_LOCATION):
+ """
+ Saves merged cgm to local storage. This is meant for testing purposes only
+ :param cgm_files: list of cgm_files
+ :param cgm_folder_name: sub folder name where to gather files
+ :param local_storage_location: path to store
+ :return: None
+ """
+ if not local_storage_location:
+ return
+ if cgm_folder_name is not None:
+ local_storage_location = local_storage_location + '/' + cgm_folder_name
+ local_storage_location = check_the_folder_path(local_storage_location)
+ if not os.path.exists(local_storage_location):
+ os.makedirs(local_storage_location)
+ for cgm_file in cgm_files:
+ full_file_name = local_storage_location + cgm_file.name
+ with open(full_file_name, 'wb') as f:
+ f.write(cgm_file.getbuffer())
+
+
+def publish_merged_model_to_opdm(opdm_client: opdm.OPDM = None,
+ cgm_files: list = None):
+ """
+ Sends files to opdm
+ :param opdm_client: opdm client
+ :param cgm_files: list of files to be sent
+    :return: tuple of results
+ """
+    result = ()
+    # Send files out if given
+ if cgm_files and len(cgm_files) > 0:
+ opdm_publication_responses = []
+ for instance_file in cgm_files:
+ try:
+ opdm_client = opdm_client or opdm.OPDM()
+ logger.info(f"Publishing {instance_file.name} to OPDM")
+ file_response = opdm_client.publication_request(instance_file, "CGMES")
+ opdm_publication_responses.append({"name": instance_file.name, "response": file_response})
+ if "OperationFailure" in file_response.tag:
+ logger.error(f"Failed to publish {instance_file.name} to OPDM, OPDM OperationFailure")
+ except zeep.exceptions.Fault as fault:
+ logger.error(f"Failed to publish {instance_file.name} to OPDM, connection failure: {fault}")
+ except Exception as ex_message:
+ logger.error(f"Failed to publish {instance_file.name} to OPDM, unknown error: {ex_message}")
+ logger.info(f"Publishing results: {opdm_publication_responses}")
+ result = result + (opdm_publication_responses,)
+ return result
+
+
+def save_merged_model_to_minio(minio_bucket: str = EMF_OS_MINIO_BUCKET,
+ folder_in_bucket: str = EMF_OS_MINIO_FOLDER,
+ minio_client: minio.ObjectStorage = None,
+ time_horizon: str = None,
+ scenario_datetime: str = None,
+ merging_entity: str = None,
+ area: str = None,
+ version: str = None,
+                               cgm_files: list = None):
+    """
+    Posts cgm files to minio
+    Implements the path structure: folder_in_bucket/time_horizon/merging_entity/area/scenario_date/scenario_time/
+    version/file_type/file_name
+    :param minio_bucket: bucket in minio
+    :param minio_client: the instance of the object storage client
+    :param time_horizon: time_horizon for the path tree
+    :param scenario_datetime: scenario_date for the path tree
+    :param merging_entity: the entity responsible for the merge
+    :param version: the version number
+    :param area: the area where the merge was done
+    :param cgm_files: list of individual cgm files
+    :param folder_in_bucket: general folder (prefix) in the bucket where the files are stored
+    :return: dictionary mapping file names to presigned links
+    """
+ links_to_file = {}
+ minio_client = minio_client or minio.ObjectStorage()
+ if cgm_files is not None:
+ # check if the given bucket exists
+ if not minio_client.client.bucket_exists(bucket_name=minio_bucket):
+ logger.warning(f"{minio_bucket} does not exist")
+ return links_to_file
+ for cgm_file in cgm_files:
+ file_name = cgm_file.name
+ file_name_exploded = get_metadata_from_filename(file_name)
+ time_horizon = time_horizon or file_name_exploded.get("Model.processType", '')
+ # TODO Keep intra day merged model in one folder?
+ file_scenario_datetime = scenario_datetime or file_name_exploded.get("Model.scenarioTime", None)
+ if file_scenario_datetime:
+ file_scenario_datetime = parse_datetime(file_scenario_datetime)
+ merging_entity = merging_entity or file_name_exploded.get("Model.mergingEntity", '')
+ area = area or file_name_exploded.get("Model.domain", '')
+ version = version or file_name_exploded.get("Model.version")
+ scenario_date = ''
+ scenario_time = ''
+ if file_scenario_datetime:
+ scenario_date = f"{file_scenario_datetime:%Y%m%d}"
+ scenario_time = f"{file_scenario_datetime:%H%M00}"
+ file_type = file_name_exploded.get("Model.messageType")
+ file_path_elements = [folder_in_bucket, time_horizon, merging_entity, area,
+ scenario_date, scenario_time, version, file_type, cgm_file.name]
+ full_file_name = SEPARATOR_SYMBOL.join(file_path_elements)
+ full_file_name = full_file_name.replace('//', '/')
+ cgm_file.name = full_file_name
+ minio_client.upload_object(file_path_or_file_object=cgm_file, bucket_name=minio_bucket)
+ time_to_expire = timedelta(days=7)
+ link_to_file = minio_client.client.get_presigned_url(method="GET",
+ bucket_name=minio_bucket,
+ object_name=cgm_file.name,
+ expires=time_to_expire)
+ cgm_file.name = file_name
+ links_to_file[file_name] = link_to_file
+ return links_to_file
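+
+# Illustrative object path produced above (hypothetical values):
+#   <folder_in_bucket>/1D/BALTICRSC/EU/20240411/113000/003/SV/<file name>
+# i.e. folder_in_bucket/time_horizon/merging_entity/area/scenario_date/scenario_time/version/file_type/file_name.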
+
+
+def publish_metadata_to_elastic(cgm_index: str, metadata: dict, elastic_server: str = elastic.ELK_SERVER):
+ """
+ Publishes metadata to elastic
+    :param cgm_index: name of the elastic index
+    :param metadata: metadata information
+    :param elastic_server: address of the elastic server
+    :return: response
+ """
+ if metadata:
+ response = elastic.Elastic.send_to_elastic(index=cgm_index,
+ json_message=metadata,
+ server=elastic_server)
+ return response
+
+
+if __name__ == '__main__':
+ # Run the entire pipeline in functions
+ logging.basicConfig(
+ format='%(levelname) -10s %(asctime) -20s %(name) -45s %(funcName) -35s %(lineno) -5d: %(message)s',
+ datefmt='%Y-%m-%d %H:%M:%S',
+ level=logging.INFO,
+ handlers=[logging.StreamHandler(sys.stdout)]
+ )
+ # testing_time_horizon = 'ID'
+ testing_time_horizon = '1D'
+ # testing_scenario_date = "2024-04-05T08:30:00+00:00"
+ # testing_scenario_date = "2024-04-12T22:30:00+00:00"
+ # testing_scenario_date = "2024-04-12T21:30:00+00:00"
+ # testing_scenario_date = "2024-04-11T21:30:00+00:00"
+ # testing_scenario_date = "2024-04-12T03:30:00+00:00"
+ testing_scenario_date = "2024-04-11T11:30:00+00:00"
+ testing_area = 'EU'
+ take_data_from_local = False
+ testing_merging_entity = MERGING_ENTITY
+
+ wanted_tsos = []
+ unwanted_tsos = ['APG', '50Hertz', 'SEPS']
+
+ if take_data_from_local:
+ folder_to_study = 'test_case'
+ igm_model_data, latest_boundary_data = get_local_entsoe_files(path_to_directory=folder_to_study,
+ allow_merging_entities=False,
+ igm_files_needed=['EQ'])
+ else:
+
+ igm_model_data, latest_boundary_data = get_models(time_horizon=testing_time_horizon,
+ scenario_date=testing_scenario_date,
+ included_tsos=wanted_tsos,
+ excluded_tsos=unwanted_tsos,
+ download_policy=DownloadModels.OPDM_AND_MINIO)
+ test_version_number = get_version_number(scenario_date=testing_scenario_date,
+ time_horizon=testing_time_horizon,
+ modeling_entity=f"{testing_merging_entity}-{testing_area}")
+
+ if not igm_model_data or not latest_boundary_data:
+ logger.error(f"Terminating")
+ sys.exit()
+ cgm_input = CgmModelComposer(igm_models=igm_model_data,
+ boundary_data=latest_boundary_data,
+ time_horizon=testing_time_horizon,
+ scenario_date=testing_scenario_date,
+ area=testing_area,
+ merging_entity=testing_merging_entity,
+ version=test_version_number)
+ cgm = cgm_input.compose_cgm()
+ test_folder_name = cgm_input.get_folder_name()
+ save_merged_model_to_local_storage(cgm_files=cgm, cgm_folder_name=test_folder_name)
+ # save_merged_model_to_minio(cgm_files=cgm)
+ # publish_merged_model_to_opdm(cgm_files=cgm)
diff --git a/emf/loadflow_tool/validator.py b/emf/loadflow_tool/validator.py
index 3b2c255..f7a1fbd 100644
--- a/emf/loadflow_tool/validator.py
+++ b/emf/loadflow_tool/validator.py
@@ -1,12 +1,23 @@
-import pypowsybl
+import os.path
+import shutil
+import zipfile
+from enum import Enum
+from io import BytesIO
+from os import listdir
+from os.path import join
+from zipfile import ZipFile
+import ntpath
+
import logging
-import json
import time
import math
+
+import requests
+
import config
+from emf.common.logging.custom_logger import PyPowsyblLogGatherer, PyPowsyblLogReportingPolicy, check_the_folder_path
from emf.loadflow_tool.loadflow_settings import *
-from emf.loadflow_tool.helper import attr_to_dict, load_model
-from emf.common.logging import custom_logger
+from emf.loadflow_tool.helper import attr_to_dict, load_model, metadata_from_filename
from emf.common.config_parser import parse_app_properties
from emf.common.integrations import elastic
@@ -19,6 +30,73 @@
# TODO - record AC NP and DC Flows to metadata storage (and more), this is useful for replacement logic and scaling
# note - multiple islands wo load or generation can be an issue
+ENTSOE_FOLDER = './path_to_ENTSOE_zip/TestConfigurations_packageCASv2.0'
+
+TSO_KEYWORD = 'pmd:TSO'
+DATA_KEYWORD = 'DATA'
+FILENAME_KEYWORD = 'pmd:fileName'
+CGMES_PROFILE = 'pmd:cgmesProfile'
+MODEL_PART_REFERENCE = 'pmd:modelPartReference'
+MODEL_MESSAGE_TYPE = 'Model.messageType'
+XML_KEYWORD = '.xml'
+ZIP_KEYWORD = '.zip'
+MODEL_MODELING_ENTITY = 'Model.modelingEntity'
+MODEL_MERGING_ENTITY = 'Model.mergingEntity'
+PMD_MERGING_ENTITY = 'pmd:mergingEntity'
+MODEL_DOMAIN = 'Model.domain'
+PMD_MERGING_AREA = 'pmd:mergingArea'
+MODEL_FOR_ENTITY = 'Model.forEntity'
+OPDE_COMPONENT_KEYWORD = 'opde:Component'
+OPDM_PROFILE_KEYWORD = 'opdm:Profile'
+MISSING_TSO_NAME = 'UnknownTSO'
+LONG_FILENAME_SUFFIX = u"\\\\?\\"
+
+VALIDATION_STATUS_KEYWORD = 'VALIDATION_STATUS'
+VALID_KEYWORD = 'valid'
+VALIDATION_DURATION_KEYWORD = 'validation_duration_s'
+LOADFLOW_RESULTS_KEYWORD = 'loadflow_results'
+
+PREFERRED_FILE_TYPES = [XML_KEYWORD, ZIP_KEYWORD]
+IGM_FILE_TYPES = ['_EQ_', '_TP_', '_SV_', '_SSH_']
+BOUNDARY_FILE_TYPES = ['_EQBD_', '_TPBD_', '_EQ_BD_', '_TP_BD_']
+BOUNDARY_FILE_TYPE_FIX = {'_EQ_BD_': '_EQBD_', '_TP_BD_': '_TPBD_'}
+SPECIAL_TSO_NAME = ['ENTSO-E']
+
+"""Mapper for elements of the file name to igm profile"""
+IGM_FILENAME_MAPPING_TO_OPDM = {FILENAME_KEYWORD: FILENAME_KEYWORD,
+ 'Model.scenarioTime': 'pmd:scenarioDate',
+ 'Model.processType': 'pmd:timeHorizon',
+ MODEL_MODELING_ENTITY: MODEL_PART_REFERENCE,
+ MODEL_MESSAGE_TYPE: CGMES_PROFILE,
+ 'Model.version': 'pmd:versionNumber'}
+
+"""Mapper for the elements of the file name to boundary profile"""
+BOUNDARY_FILENAME_MAPPING_TO_OPDM = {FILENAME_KEYWORD: FILENAME_KEYWORD,
+ 'Model.scenarioTime': 'pmd:scenarioDate',
+ MODEL_MODELING_ENTITY: MODEL_PART_REFERENCE,
+ MODEL_MESSAGE_TYPE: CGMES_PROFILE,
+ 'Model.version': 'pmd:versionNumber'}
+SYSTEM_SPECIFIC_FOLDERS = ['__MACOSX']
+UNWANTED_FILE_TYPES = ['.xlsx', '.docx', '.pptx']
+RECURSION_LIMIT = 2
+USE_ROOT = False # extracts to root, not to folder specified to zip. Note that some zip examples may not work!
+
+
+class LocalInputType(Enum):
+ """
+ Enum for different data loads (igm and boundary)
+ """
+    BOUNDARY = 'boundary'
+ IGM = 'igm'
+ UNDEFINED = 'undefined'
+
+
+class LocalFileLoaderError(FileNotFoundError):
+ """
+ For throwing when errors occur during the process of loading local files
+ """
+ pass
+
def validate_model(opdm_objects, loadflow_parameters=CGM_RELAXED_2, run_element_validations=True):
# Load data
@@ -38,7 +116,9 @@ def validate_model(opdm_objects, loadflow_parameters=CGM_RELAXED_2, run_element_
logger.info(f"Running validation: {validation_type}")
try:
# TODO figure out how to store full validation results if needed. Currently only status is taken
- model_data["validations"][validation] = pypowsybl.loadflow.run_validation(network=network, validation_types=[validation_type])._valid.__bool__()
+ model_data["validations"][validation] = pypowsybl.loadflow.run_validation(network=network,
+ validation_types=[
+ validation_type])._valid.__bool__()
except Exception as error:
logger.error(f"Failed {validation_type} validation with error: {error}")
continue
@@ -50,14 +130,14 @@ def validate_model(opdm_objects, loadflow_parameters=CGM_RELAXED_2, run_element_
parameters=loadflow_parameters,
reporter=loadflow_report)
-
# Parsing loadflow results
# TODO move sanitization to Elastic integration
loadflow_result_dict = {}
for island in loadflow_result:
island_results = attr_to_dict(island)
island_results['status'] = island_results.get('status').name
- island_results['distributed_active_power'] = 0.0 if math.isnan(island_results['distributed_active_power']) else island_results['distributed_active_power']
+ island_results['distributed_active_power'] = 0.0 if math.isnan(island_results['distributed_active_power']) else \
+ island_results['distributed_active_power']
loadflow_result_dict[f"component_{island.connected_component_num}"] = island_results
model_data["loadflow_results"] = loadflow_result_dict
# model_data["loadflow_report"] = json.loads(loadflow_report.to_json())
@@ -67,10 +147,11 @@ def validate_model(opdm_objects, loadflow_parameters=CGM_RELAXED_2, run_element_
# TODO check only main island component 0?
model_valid = any([True if val["status"] == "CONVERGED" else False for key, val in loadflow_result_dict.items()])
model_data["valid"] = model_valid
- model_data["validation_duration_s"] = time.time() - start_time
+ model_data["validation_duration_s"] = round(time.time() - start_time, 3)
+ logger.info(f"Load flow validation status: {model_valid} [duration {model_data['validation_duration_s']}s]")
# Pop out pypowsybl network object
- model_data.pop('network')
+ # model_data.pop('network')
# Send validation data to Elastic
try:
@@ -81,41 +162,820 @@ def validate_model(opdm_objects, loadflow_parameters=CGM_RELAXED_2, run_element_
return model_data
+def validate_models(available_models: list = None, latest_boundary: dict = None):
+ """
+ Validates the raw output from the opdm
+ :param available_models: list of igm models
+ :param latest_boundary: dictionary containing the boundary data
+    :return: list of validated models
+ """
+ valid_models = []
+ invalid_models = []
+ # Validate models
+ if not available_models or not latest_boundary:
+ logger.error(f"Missing input data")
+ return valid_models
+ for model in available_models:
+
+ try:
+ response = validate_model([model, latest_boundary])
+ model[VALIDATION_STATUS_KEYWORD] = response
+ if response[VALID_KEYWORD]:
+ valid_models.append(model)
+ else:
+ invalid_models.append(model)
+        except Exception as error:
+            invalid_models.append(model)
+            logger.error(f"Validation failed: {error}")
+ return valid_models
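+
+# Hedged usage sketch (models and boundary as returned by the opdm/minio/local loaders):
+#   valid = validate_models(available_models=igm_model_data, latest_boundary=latest_boundary_data)
+# Each returned model carries the full validation response under model['VALIDATION_STATUS'].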
+
+
+"""-----------------CONTENT RELATED TO LOADING DATA FROM LOCAL STORAGE-----------------------------------------------"""
+
+
+def read_in_zip_file(zip_file_path: str, file_types: list = None) -> dict:
+ """
+ Reads in files from the given zip file
+ :param zip_file_path: path to the zip file (relative or absolute)
+ :param file_types: list of file types
+ :return: dictionary with file names as keys and file contents as values
+ """
+ content = {}
+ with ZipFile(zip_file_path, 'r') as zip_file:
+ for file_name in zip_file.namelist():
+ if file_types is None or any([file_keyword in file_name for file_keyword in file_types]):
+ logger.info(f"Reading {file_name} from {zip_file_path}")
+ content[file_name] = zip_file.read(file_name)
+ return content
+
+
+def read_in_xml_file(xml_file_path: str, file_types: list = None) -> dict:
+ """
+ Reads in data from the given xml file
+ :param xml_file_path: path to the xml file (relative or absolute)
+ :param file_types: list of file types
+ :return: dictionary with file names as keys and file contents as values
+ """
+ content = {}
+ file_name = os.path.basename(xml_file_path)
+ if file_types is None or any([file_keyword in file_name for file_keyword in file_types]):
+ logger.info(f"Reading {file_name}")
+ with open(xml_file_path, 'r', encoding='utf8') as file_content:
+ content[file_name] = file_content.read()
+ return content
+
+
+def save_content_to_zip_file(content: dict):
+ """
+ Saves content to zip file (in memory)
+ :param content: the content of zip file (key: file name, value: file content)
+ :return: byte array
+ """
+ output_object = BytesIO()
+ with ZipFile(output_object, "w") as output_zip:
+ if content:
+ for file_name in content:
+ logger.info(f"Converting {file_name} to zip container")
+ output_zip.writestr(file_name, content[file_name])
+ output_object.seek(0)
+ return output_object.getvalue()
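+
+# Minimal sketch: save_content_to_zip_file({'model.xml': b'<rdf/>'}) returns the bytes of an in-memory
+# zip archive containing a single entry named 'model.xml' (hypothetical file name and content).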
+
+
+def parse_boundary_message_type_profile(message_type_value: str) -> str:
+ """
+ Slices the 4-letter string to add _ in the middle: 'EQBD' to 'EQ_BD'
+ :param message_type_value: input string
+ :return: updated string if it was 4 chars long
+ """
+ if len(message_type_value) == 4:
+ return message_type_value[:2] + '_' + message_type_value[2:]
+ return message_type_value
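+
+# Examples: parse_boundary_message_type_profile('EQBD') -> 'EQ_BD',
+#           parse_boundary_message_type_profile('TPBD') -> 'TP_BD';
+# values that are not exactly 4 characters long are returned unchanged.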
+
+
+def map_meta_dict_to_dict(input_dict: dict, meta_dict: dict, key_dict: dict) -> dict:
+ """
+ Maps values from meta_dict to input dict based on key value pairs from key_dict
+ input_dict[key_dict[key]] = meta_dict[key]
+ :param input_dict: input and output dictionary (OPDM profile)
+ :param meta_dict: metadata (parameters from file name)
+ :param key_dict: mapper, values are keys for input dict, keys are keys for meta dict
+ :return: updated input_dict
+ """
+ if meta_dict != {} and key_dict != {}:
+ for key in meta_dict.keys():
+ if key in key_dict:
+ input_dict[key_dict[key]] = meta_dict[key]
+ return input_dict
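+
+# Minimal sketch of the mapping (hypothetical values):
+#   map_meta_dict_to_dict(input_dict={},
+#                         meta_dict={'Model.version': '001'},
+#                         key_dict={'Model.version': 'pmd:versionNumber'})
+#   -> {'pmd:versionNumber': '001'}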
+
+
+def get_meta_from_filename(file_name: str):
+ """
+ Extends the 'get_metadata_from_filename(file_name)' from helper by adding file name to metadata dictionary
+ :param file_name: file name to be parsed
+ :return: dictionary with metadata
+ """
+ try:
+ fixed_file_name = file_name
+ for key in BOUNDARY_FILE_TYPE_FIX:
+ if key in fixed_file_name:
+ fixed_file_name = fixed_file_name.replace(key, BOUNDARY_FILE_TYPE_FIX[key])
+ # meta_data = get_metadata_from_filename(fixed_file_name)
+ meta_data = metadata_from_filename(fixed_file_name)
+ # Revert back cases where there is a '-' in TSO's name like ENTSO-E
+ for case in SPECIAL_TSO_NAME:
+ if case in fixed_file_name:
+ meta_data[MODEL_PART_REFERENCE] = case
+ if "-".join([meta_data.get(PMD_MERGING_ENTITY,''), meta_data.get(PMD_MERGING_AREA, '')]) == case:
+ meta_data[PMD_MERGING_ENTITY] = None
+ meta_data[PMD_MERGING_AREA] = None
+ break
+ except ValueError as err:
+ logger.warning(f"Unable to parse file name: {err}, trying to salvage")
+ meta_data = salvage_data_from_file_name(file_name=file_name)
+ meta_data[FILENAME_KEYWORD] = file_name
+ return meta_data
+
+
+def salvage_data_from_file_name(file_name: str):
+ """
+    Tries to extract at least the message type from the file name
+    :param file_name: name of the file as string
+    :return: dictionary with metadata
+ """
+ meta_data = {}
+ for element in IGM_FILE_TYPES:
+ if element in file_name:
+ meta_data[MODEL_MESSAGE_TYPE] = element.replace("_", "")
+ return meta_data
+
+
+def load_data(file_name: str, file_types: list = None):
+ """
+ Loads data from given file.
+ :param file_name: file from where to load (with relative or absolute path)
+ :param file_types: list of file types
+ :return: dictionary with filenames as keys, contents as values, if something was found, none otherwise
+ """
+ data = None
+ if zipfile.is_zipfile(file_name):
+ data = read_in_zip_file(file_name, file_types)
+ elif file_name.endswith(XML_KEYWORD):
+ data = read_in_xml_file(file_name, file_types)
+ return data
+
+
+def get_one_set_of_igms_from_local_storage(file_names: list, tso_name: str = None, file_types: list = None):
+ """
+ Loads igm data from local storage.
+ :param file_names: list of file names
+ :param tso_name: the name of the tso if given
+ :param file_types: list of file types
+ :return: dictionary that wants to be similar to OPDM profile
+ """
+ igm_value = {OPDE_COMPONENT_KEYWORD: []}
+ if tso_name is not None:
+ igm_value[TSO_KEYWORD] = tso_name
+ for file_name in file_names:
+ if (data := load_data(file_name, file_types)) is None:
+ continue
+ meta_for_data = {key: get_meta_from_filename(key) for key in data.keys()}
+ for datum in data:
+ if MODEL_MODELING_ENTITY in meta_for_data[datum] and TSO_KEYWORD not in igm_value:
+ igm_value[TSO_KEYWORD] = meta_for_data[datum][MODEL_MODELING_ENTITY]
+ opdm_profile_content = meta_for_data[datum]
+ # opdm_profile_content = map_meta_dict_to_dict(input_dict={},
+ # meta_dict=meta_for_data[datum],
+ # key_dict=IGM_FILENAME_MAPPING_TO_OPDM)
+ opdm_profile_content[DATA_KEYWORD] = save_content_to_zip_file({datum: data[datum]})
+ igm_value[OPDE_COMPONENT_KEYWORD].append({OPDM_PROFILE_KEYWORD: opdm_profile_content})
+ return igm_value
+
+
+def get_one_set_of_boundaries_from_local_storage(file_names: list, file_types: list = None):
+ """
+ Loads boundary data from local storage.
+ :param file_names: list of file names
+ :param file_types: list of file types
+ :return: dictionary that wants to be similar to OPDM profile
+ """
+ boundary_value = {OPDE_COMPONENT_KEYWORD: []}
+ for file_name in file_names:
+ if (data := load_data(file_name, file_types)) is None:
+ continue
+ meta_for_data = {key: get_meta_from_filename(key) for key in data.keys()}
+ for datum in data:
+            if MODEL_MESSAGE_TYPE in meta_for_data[datum]:
+                meta_for_data[datum][MODEL_MESSAGE_TYPE] = parse_boundary_message_type_profile(
+                    meta_for_data[datum][MODEL_MESSAGE_TYPE])
+            elif CGMES_PROFILE in meta_for_data[datum]:
+                meta_for_data[datum][CGMES_PROFILE] = parse_boundary_message_type_profile(
+                    meta_for_data[datum][CGMES_PROFILE])
+ opdm_profile_content = meta_for_data[datum]
+ # opdm_profile_content = map_meta_dict_to_dict(input_dict={},
+ # meta_dict=meta_for_data[datum],
+ # key_dict=BOUNDARY_FILENAME_MAPPING_TO_OPDM)
+ opdm_profile_content[DATA_KEYWORD] = save_content_to_zip_file({datum: data[datum]})
+ boundary_value[OPDE_COMPONENT_KEYWORD].append({OPDM_PROFILE_KEYWORD: opdm_profile_content})
+ return boundary_value
+
+
+def get_zip_file_list_from_dir(path_to_dir: str):
+ """
+ Lists names of zip files from the given directory
+ :param path_to_dir: search directory
+ :return: list of file names
+ """
+ file_list = [join(path_to_dir, file_name)
+ for file_name in listdir(path_to_dir)
+ if zipfile.is_zipfile(join(path_to_dir, file_name))]
+ return file_list
+
+
+def get_xml_file_list_from_dir(path_to_dir: str):
+ """
+ Lists names of .xml files from the given directory
+ :param path_to_dir: search directory
+ :return: list of file names
+ """
+ file_list = [join(path_to_dir, file_name)
+ for file_name in listdir(path_to_dir)
+ if file_name.endswith(XML_KEYWORD)]
+ return file_list
+
+
+def get_list_of_content_files(paths: str | list) -> list:
+ """
+ Gets list of file names of interest (.zip, .xml) from the given path or paths (list or string)
+ :param paths: either directory (get multiple files) or single file name
+ :return: list of file names
+ """
+ path_list = paths
+ if isinstance(paths, str):
+ path_list = [paths]
+ list_of_files = []
+ for element in path_list:
+ if os.path.isdir(element):
+ zip_files = get_zip_file_list_from_dir(element)
+ xml_files = get_xml_file_list_from_dir(element)
+ list_of_files.extend(zip_files)
+ list_of_files.extend(xml_files)
+ elif os.path.isfile(element) and (zipfile.is_zipfile(element) or element.endswith(XML_KEYWORD)):
+ list_of_files.append(element)
+ else:
+ logger.error(f"{element} is not a path nor a .xml or a .zip file")
+ raise LocalFileLoaderError
+ return list_of_files
+
+
+def get_data_from_files(file_locations: list | str | dict,
+ get_type: LocalInputType = LocalInputType.IGM,
+ file_keywords: list = None):
+ """
+ Extracts and parses data to necessary profile
+ :param file_locations: list of files or their locations, one element per tso
+ :param get_type: type of data to be extracted
+ :param file_keywords: list of identifiers that are in file names that should be loaded
+ :return: dictionary wanting to be similar to opdm profile
+ """
+ all_models = []
+ tso_counter = 1
+ if isinstance(file_locations, str):
+ file_locations = [file_locations]
+ if isinstance(file_locations, dict):
+ for element in file_locations:
+ file_set = get_list_of_content_files(file_locations[element])
+ if get_type is LocalInputType.BOUNDARY:
+ all_models.append(get_one_set_of_boundaries_from_local_storage(file_names=file_set,
+ file_types=file_keywords))
+ else:
+ all_models.append(get_one_set_of_igms_from_local_storage(file_names=file_set,
+ tso_name=element,
+ file_types=file_keywords))
+ elif isinstance(file_locations, list):
+ for element in file_locations:
+ file_set = get_list_of_content_files(element)
+ if get_type is LocalInputType.BOUNDARY:
+ all_models.append(get_one_set_of_boundaries_from_local_storage(file_names=file_set,
+ file_types=file_keywords))
+ else:
+ igm_value = get_one_set_of_igms_from_local_storage(file_names=file_set,
+ file_types=file_keywords)
+ if TSO_KEYWORD not in igm_value:
+ tso_name = f"{MISSING_TSO_NAME}-{tso_counter}"
+ tso_counter += 1
+                    logger.warning(f"TSO name not found, assigning default name {tso_name}")
+ igm_value[TSO_KEYWORD] = tso_name
+ all_models.append(igm_value)
+ else:
+        logger.error("Unsupported input type")
+ raise LocalFileLoaderError
+
+ return all_models
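+
+# Illustrative (commented-out) usage of get_data_from_files; the paths are hypothetical. A dictionary input maps
+# TSO names to their files, while a plain list lets the TSO name be derived from the file names:
+# igms = get_data_from_files(file_locations={'TSO1': './path_to_data/TSO1/', 'TSO2': './path_to_data/TSO2/'},
+#                            get_type=LocalInputType.IGM,
+#                            file_keywords=IGM_FILE_TYPES)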
+
+
+def filter_file_list_by_file_keywords(file_list: list | str | dict, file_keywords: list = None):
+ """
+    Filters the file list by file-identifying keywords ('TP', 'SSH', 'EQ', 'SV')
+ :param file_list: list of file names
+ :param file_keywords: list of file identifiers
+ :return updated file list if file_keywords was provided, file_list otherwise
+ """
+ if file_keywords is None:
+ return file_list
+ new_file_list = []
+ for file_name in file_list:
+ if any([file_keyword in file_name for file_keyword in file_keywords]):
+ new_file_list.append(file_name)
+ return new_file_list
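+
+# Illustrative (commented-out) usage of filter_file_list_by_file_keywords; file names are hypothetical:
+# ssh_and_tp_only = filter_file_list_by_file_keywords(file_list=['TSO1_EQ.xml', 'TSO1_SSH.xml', 'TSO1_TP.xml'],
+#                                                     file_keywords=['SSH', 'TP'])
+# # result: ['TSO1_SSH.xml', 'TSO1_TP.xml']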
+
+
+def get_local_igm_data(file_locations: list | str | dict, file_keywords: list = None):
+ """
+ Call this with a list of files/directories to load igm data
+ :param file_locations: list of files or their locations, one element per tso
+ :param file_keywords: list of identifiers that are in file names that should be loaded
+    :return: list of dictionaries resembling OPDM profiles if something useful was found
+ """
+ output = get_data_from_files(file_locations=file_locations,
+ get_type=LocalInputType.IGM,
+ file_keywords=file_keywords)
+ if len(output) == 0:
+        logger.error("Data for IGMs was not valid, no IGMs were extracted")
+ raise LocalFileLoaderError
+ return output
+
+
+def get_local_boundary_data(file_locations: list | str, file_keywords: list = None):
+ """
+ Call this with a list of files/directories to load boundary data
+ :param file_locations: list of files or their locations, one element per tso
+ :param file_keywords: list of identifiers that are in file names that should be loaded
+    :return: dictionary resembling an OPDM profile if something useful was found
+ """
+ boundaries = get_data_from_files(file_locations=file_locations,
+ get_type=LocalInputType.BOUNDARY,
+ file_keywords=file_keywords)
+ try:
+ return boundaries[0]
+ except IndexError:
+        logger.error("Data for boundaries was not valid, no boundaries were extracted")
+ raise LocalFileLoaderError
+
+
+def get_local_files():
+ """
+ This is just an example
+    Input is a list or a dictionary (TSO name: path(s) to that TSO's IGM files) when there is more than one
+    TSO or boundary; otherwise it can be a single string entry.
+    For each element in the list of inputs, the value can be a single path to a directory, zip file, xml file
+    or a list of them.
+    Note that inputs are not deduplicated during loading. For example, if the entry for one TSO contains a zip file
+    and also the directory holding it (something like ['c:/Path_to_zip/', 'c:/Path_to_zip/zip_file.zip']),
+    then the zip file (zip_file.zip) is read in twice and sent to the validator (probably ending with a pypowsybl error).
+    NB! If the TSO name is not given (input type is not a dictionary), then it is extracted from the name of the first
+    file which is processed and which follows the standard described in helper.get_metadata_from_filename()
+    NB! Directories and file names used here (./path_to_data/, etc.) are for illustration purposes only.
+    To use local files, specify the paths to the data accordingly (absolute or relative).
+ """
+ # Addresses can be relative or absolute.
+ # 1. load in by directory per tso which contains zip files
+ # igm_files = ['./path_to_data/case_1_TSO1_zip_files/',
+ # './path_to_data/case_1_TSO2_zip_files/',
+ # './path_to_data/case_1_TSO3_zip_files/']
+ # boundary_file = './path_to_data/case_1_BOUNDARY_zip_files/'
+ # 2. load in by zip files per tso
+ # igm_files = ['./path_to_data/case_2_combined/TSO1_ZIP_OF_XMLS.zip',
+ # './path_to_data/case_2_combined/TSO2_ZIP_OF_XMLS.zip',
+ # './path_to_data/case_2_combined/TSO3_ZIP_OF_XMLS.zip']
+ # boundary_file = './path_to_data/case_2_combined/BOUNDARY_ZIP_OF_XMLS.zip'
+ # 3. load in by directory per tso which stores xml files
+ # igm_files = ['./path_to_data/case_3_TSO1_xml_files/',
+ # './path_to_data/case_3_TSO2_xml_files/',
+ # './path_to_data/case_3_TSO3_xml_files/']
+ # boundary_file = './path_to_data/case_3_BOUNDARY_xml_files/'
+ # 4. Load data in as dictionary in form of TSO name: paths
+ igm_files = {'TSO1': './path_to_data/case_3_TSO1_xml_files/',
+ 'TSO2': './path_to_data/case_3_TSO2_xml_files/',
+ 'TSO3': './path_to_data/case_3_TSO3_xml_files/'}
+ boundary_file = './path_to_data/case_3_BOUNDARY_xml_files/'
+ # Get data and carry on
+ models = get_local_igm_data(igm_files, IGM_FILE_TYPES)
+ try:
+ boundary = get_local_boundary_data(boundary_file, BOUNDARY_FILE_TYPES)
+ except NameError:
+ boundary = None
+ return models, boundary
+
+
+def check_and_create_the_folder_path(folder_path: str):
+ """
+    Normalizes the folder path and checks whether it exists; creates it if it does not
+    :param folder_path: folder path to check
+    :return: checked folder path
+ """
+ folder_path = check_the_folder_path(folder_path)
+ if not os.path.exists(folder_path):
+ os.makedirs(folder_path)
+ return folder_path
+
+
+def download_zip_file(url_to_zip: str, path_to_download: str = None):
+ """
+    Downloads a zip file from a url.
+    Note that the file may be rather large, so it is downloaded as a stream
+    :param url_to_zip: url of the zip file
+    :param path_to_download: location where to download the file
+    :return: path to the downloaded zip file
+ """
+ loaded_file_name = url_to_zip.split('/')[-1]
+ if path_to_download is not None:
+ path_to_download = check_the_folder_path(path_to_download)
+ loaded_file_name = path_to_download + loaded_file_name
+ with requests.get(url_to_zip, stream=True) as r:
+ with open(loaded_file_name, 'wb') as f:
+ shutil.copyfileobj(r.raw, f)
+ return loaded_file_name
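+
+# Note: shutil.copyfileobj(r.raw, f) above copies the raw socket stream, so any Content-Encoding (e.g. gzip)
+# applied by the server is not decoded. A commented-out alternative sketch using iter_content (which does
+# decode) is given below; the chunk size is an arbitrary choice:
+# with requests.get(url_to_zip, stream=True) as r:
+#     r.raise_for_status()
+#     with open(loaded_file_name, 'wb') as f:
+#         for chunk in r.iter_content(chunk_size=8192):
+#             f.write(chunk)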
+
+
+def check_and_extract_zip_files_in_folder(root_folder: str,
+ files: [],
+ depth: int = 1,
+ use_root: bool = USE_ROOT,
+ max_depth: int = RECURSION_LIMIT):
+ """
+ Checks if files in folder are zip files, and extracts them recursively
+ :param root_folder: the name of the root folder
+ :param files: list of files
+ :param depth: current depth of recursion
+ :param use_root: use root folder for extraction
+ :param max_depth: max allowed recursion depth
+ """
+ root_folder = check_the_folder_path(root_folder)
+ for file_name in files:
+ full_file_name = root_folder + file_name
+ file_extension = os.path.splitext(full_file_name)[-1]
+        # If the file is a .xml, or this zip already has an extracted .xml counterpart, treat the folder as extracted
+        xml_file = os.path.splitext(file_name)[0] + ".xml"
+        if file_extension == ".xml" or xml_file in files:
+            return
+ if zipfile.is_zipfile(full_file_name) and file_extension not in UNWANTED_FILE_TYPES:
+ extract_zip_file(current_zip_file=full_file_name,
+ root_folder=root_folder,
+ use_root=use_root,
+ depth=depth + 1,
+ max_depth=max_depth)
+
+
+def extract_zip_file(current_zip_file: str,
+ root_folder: str = None,
+ use_root: bool = USE_ROOT,
+ depth: int = 1,
+ max_depth: int = RECURSION_LIMIT):
+ """
+ Extracts content of the zip file to the root.
+ :param current_zip_file: zip file to be extracted
+ :param root_folder: folder where to extract
+ :param use_root: use root folder for extraction
+ :param depth: current depth of recursion
+ :param max_depth: max allowed recursion depth
+ """
+    # Stop the recursion before going too deep
+ if depth > max_depth:
+ return
+ if root_folder is None or use_root is False:
+ root_folder = os.path.splitext(current_zip_file)[0]
+ root_folder = check_the_folder_path(root_folder)
+ logger.info(f"Extracting {current_zip_file} to {root_folder}")
+ with zipfile.ZipFile(current_zip_file, 'r') as level_one_zip_file:
+ # level_one_zip_file.extractall(path=root_folder)
+ for info in level_one_zip_file.infolist():
+ zip_file_name = info.filename
+ try:
+ level_one_zip_file.extract(zip_file_name, path=root_folder)
+ except FileNotFoundError:
+ # Workaround for extracting long file names
+ output_path = root_folder + zip_file_name
+ check_and_create_the_folder_path(os.path.dirname(output_path))
+ output_path_unicode = output_path.encode('unicode_escape').decode()
+ file_path = os.path.abspath(os.path.normpath(output_path_unicode))
+ file_path = LONG_FILENAME_SUFFIX + file_path
+ buffer_size = 16 * 1024
+ with level_one_zip_file.open(info) as f_in, open(file_path, 'wb') as f_out:
+ while True:
+ buffer = f_in.read(buffer_size)
+ if not buffer:
+ break
+ f_out.write(buffer)
+ except Exception as e:
+ logger.error(f"Uncaught exception: {e}")
+ os.remove(current_zip_file)
+ # Getting relevant paths
+ all_elements = [x for x in os.walk(root_folder)]
+ for root, folders, files in all_elements:
+ # Don't go to system specific folders or generate endless recursion
+ if any(root in system_folder for system_folder in SYSTEM_SPECIFIC_FOLDERS) or root == root_folder:
+ continue
+ check_and_extract_zip_files_in_folder(root_folder=root,
+ use_root=use_root,
+ files=files,
+ depth=depth,
+ max_depth=max_depth)
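+
+# Illustrative (commented-out) usage of extract_zip_file; the archive path is hypothetical. Note that the
+# function removes the source zip after extraction, and that with use_root=False (or root_folder=None) the
+# content is extracted into a folder named after the zip file:
+# extract_zip_file(current_zip_file='./example_models/QoCDC_v3.2.1_test_models.zip',
+#                  root_folder='./example_models/',
+#                  use_root=True)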
+
+
+def search_directory(root_folder: str, search_path: str):
+ """
+ Searches the search_path starting from the root_folder. Note that the requested path has to end with the search_path
+ :param root_folder: root folder from where to start looking
+ :param search_path: the part of the path to search from the root_folder
+ :return full path from root_folder to search_path if found, raise exception otherwise
+ """
+ all_folders = [check_the_folder_path(x[0]) for x in os.walk(root_folder)]
+ search_path = check_the_folder_path(search_path)
+ matches = [path_name for path_name in all_folders if str(path_name).endswith(search_path)]
+ matches_count = len(matches)
+ if matches_count == 1:
+ return matches[0]
+ elif matches_count == 0:
+ raise LocalFileLoaderError(f"{search_path} not found in {root_folder}")
+ else:
+ raise LocalFileLoaderError(f"{search_path} is too broad, found {matches_count} possible matches")
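+
+# Illustrative (commented-out) usage of search_directory; the folder names are examples used elsewhere in this
+# module. The search path must be unique enough, otherwise LocalFileLoaderError is raised:
+# path = search_directory('./example_models/', 'TC3_T1_Conform')
+# path = search_directory('./example_models/', 'TC1_T11_NonConform_L1/Combinations')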
+
+
+def check_and_get_examples(path_to_search: str,
+ use_root: bool = USE_ROOT,
+ local_folder_for_examples: str = ENTSOE_EXAMPLES_LOCAL,
+ url_for_examples: str = ENTSOE_EXAMPLES_EXTERNAL,
+ recursion_depth: int = RECURSION_LIMIT):
+ """
+    Checks if the examples are present; if not, downloads and extracts them
+ :param local_folder_for_examples: path to the examples
+ :param use_root: use root folder for extraction
+ :param url_for_examples: path to online storage
+ :param recursion_depth: the max allowed iterations for the recursion
+ :param path_to_search: folder to search
+ """
+ file_name = url_for_examples.split('/')[-1]
+ local_folder_for_examples = check_the_folder_path(local_folder_for_examples)
+ full_file_name = local_folder_for_examples + file_name
+ # Check if folder exists, create it otherwise
+ if not os.path.exists(local_folder_for_examples):
+ os.makedirs(local_folder_for_examples)
+ try:
+ # Try to get the directory, catch error if not found
+ directory_needed = search_directory(local_folder_for_examples, path_to_search)
+ return directory_needed
+ except LocalFileLoaderError:
+ # Check if directory contains necessary file and it is zip file
+ if not os.path.isfile(full_file_name) or not zipfile.is_zipfile(full_file_name):
+ # Download the file
+ logger.info(f"Downloading examples from {url_for_examples} to {local_folder_for_examples}")
+ full_file_name = download_zip_file(url_for_examples, local_folder_for_examples)
+ # Now, there should be a zip present, extract it
+ extract_zip_file(current_zip_file=full_file_name,
+ root_folder=local_folder_for_examples,
+ use_root=use_root,
+ max_depth=recursion_depth)
+ # And try to find the necessary path
+ return search_directory(local_folder_for_examples, path_to_search)
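+
+# Illustrative (commented-out) usage of check_and_get_examples: it either finds the requested folder under
+# ENTSOE_EXAMPLES_LOCAL or first downloads and extracts the archive from ENTSOE_EXAMPLES_EXTERNAL; the folder
+# name is an example:
+# examples_path = check_and_get_examples('TC3_T1_Conform')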
+
+
+def group_files_by_origin(list_of_files: list, root_folder: str = None, allow_merging_entities: bool = True):
+ """
+    When the input is a directory containing the .xml and .zip files of all the TSOs and the boundaries, and
+    the files follow the standard naming convention, this sorts them by TSO and by boundary.
+    The assumption is that one TSO can have each file type only once (e.g. one TSO cannot have two 'TP' files)
+    and that there is only one set of boundaries
+    :param list_of_files: list of files to divide
+    :param root_folder: root folder for relative or absolute paths
+    :param allow_merging_entities: if True, allow merged-entity cases (e.g. TECNET-CE-ELIA) into the list of models
+    :return: dictionary of TSO files, dictionary of boundary files
+ """
+ tso_files = {}
+    # Assume that there is only one boundary set
+ boundaries = {}
+ igm_file_types = [file_type.replace('_', '') for file_type in IGM_FILE_TYPES]
+ boundary_file_types = [file_type.strip("_") for file_type in BOUNDARY_FILE_TYPES]
+ if root_folder is not None:
+ root_folder = check_the_folder_path(root_folder)
+ for file_name in list_of_files:
+ file_extension = os.path.splitext(file_name)[-1]
+ file_base = os.path.splitext(file_name)[0]
+ # Check if file is supported file
+ if file_extension not in PREFERRED_FILE_TYPES:
+ continue
+        # Check if the file follows the standard naming convention; refer to helper.get_metadata_from_filename for details
+ file_name_meta = get_meta_from_filename(file_name)
+ if root_folder is not None:
+ file_name = root_folder + file_name
+ tso_name = file_name_meta.get(MODEL_MODELING_ENTITY) or file_name_meta.get(MODEL_PART_REFERENCE)
+ file_type_name = file_name_meta.get(MODEL_MESSAGE_TYPE) or file_name_meta.get(CGMES_PROFILE)
+ merging_entity = file_name_meta.get(PMD_MERGING_ENTITY, '') or file_name_meta.get(MODEL_MERGING_ENTITY, '')
+ merging_entity = None if merging_entity == '' else merging_entity
+ modeling_entity = file_name_meta.get(MODEL_FOR_ENTITY, '')
+ modeling_entity = None if modeling_entity == '' else modeling_entity
+        # If needed, skip cases where both a merging entity and a part reference are present (pypowsybl did not like those)
+ if not allow_merging_entities and tso_name and merging_entity and tso_name not in SPECIAL_TSO_NAME:
+ continue
+ if not tso_name:
+ tso_name = modeling_entity or merging_entity
+ if tso_name and file_type_name:
+ # Handle TSOs
+ if file_type_name in igm_file_types:
+ if tso_name not in tso_files.keys():
+ tso_files[tso_name] = []
+ # Check if file without the extension is already present
+ if not any(file_base in file_listed for file_listed in tso_files[tso_name]):
+ tso_files[tso_name].append(file_name)
+ # Handle boundaries
+ elif file_type_name in boundary_file_types:
+ if tso_name not in boundaries.keys():
+ boundaries[tso_name] = []
+ # Check if file without the extension is already present
+ if not any(file_base in file_listed for file_listed in boundaries[tso_name]):
+ boundaries[tso_name].append(file_name)
+ else:
+                logger.warning(f"Name follows the convention but unable to categorize it: {file_name}")
+ else:
+ logger.warning(f"Unrecognized file: {file_name}")
+ return tso_files, boundaries
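+
+# Illustrative (commented-out) usage of group_files_by_origin; the file names are hypothetical but are meant to
+# follow the naming convention parsed by get_meta_from_filename. TSO files end up keyed by TSO name, boundary
+# files in a separate dictionary:
+# tso_files, boundaries = group_files_by_origin(list_of_files=['20240405T2230Z_1D_TSO1_EQ_001.xml',
+#                                                              '20240405T2230Z_1D_TSO1_SSH_001.xml',
+#                                                              '20240405T2230Z_ENTSOE_EQBD_001.xml'],
+#                                               root_folder='./path_to_data/')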
+
+
+def check_model_completeness(model_data: list | dict, file_types: list | str):
+ """
+ Skips models which do not contain necessary files
+ :param model_data: models to be checked
+ :param file_types: list of file types to search
+ :return updated file list
+ """
+ checked_models = []
+ if isinstance(file_types, str):
+ file_types = [file_types]
+ if isinstance(model_data, dict):
+ model_data = [model_data]
+ for model_datum in model_data:
+ existing_types = [item[OPDM_PROFILE_KEYWORD][CGMES_PROFILE] for item in model_datum[OPDE_COMPONENT_KEYWORD]]
+ if all(file_type in existing_types for file_type in file_types):
+ checked_models.append(model_datum)
+ return checked_models
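+
+# Illustrative (commented-out) usage of check_model_completeness: keep only models that contain at least an EQ
+# and an SSH profile (the profile list and the available_models variable are placeholders):
+# complete_models = check_model_completeness(model_data=available_models, file_types=['EQ', 'SSH'])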
+
+
+def get_local_entsoe_files(path_to_directory: str | list,
+ allow_merging_entities: bool = True,
+ igm_files_needed: list = None,
+ boundary_files_needed: list = None):
+ """
+    Gets the list of files in the directory and divides them into model and boundary data
+    :param path_to_directory: path(s) to the directory from where to search
+    :param allow_merging_entities: if True, allow merged-entity cases (e.g. TECNET-CE-ELIA) into the list of models
+    :param igm_files_needed: specify explicitly the file types needed (to avoid pypowsybl's missing "EQ" error)
+    :param boundary_files_needed: specify explicitly the file types needed for boundary data
+    :return: list of models and the boundary data
+ """
+ if isinstance(path_to_directory, str):
+ path_to_directory = [path_to_directory]
+ models = []
+ all_boundaries = []
+ boundary = None
+ for single_path in path_to_directory:
+ try:
+ full_path = check_and_get_examples(single_path)
+ except Exception as ex:
+ logger.error(f"FATAL ERROR WHEN GETTING FILES: {ex}")
+ sys.exit()
+ full_path = check_the_folder_path(full_path)
+ file_names = next(os.walk(full_path), (None, None, []))[2]
+ models_data, boundary_data = group_files_by_origin(list_of_files=file_names,
+ root_folder=full_path,
+ allow_merging_entities=allow_merging_entities)
+ if models_data:
+ models_transformed = get_local_igm_data(models_data, IGM_FILE_TYPES)
+ models.extend(models_transformed)
+ if boundary_data:
+ try:
+ boundary_transformed = get_local_boundary_data(boundary_data, BOUNDARY_FILE_TYPES)
+ except NameError:
+ boundary_transformed = None
+ if boundary_transformed is not None:
+ all_boundaries.append(boundary_transformed)
+ if len(all_boundaries) == 0:
+ logger.warning(f"No boundaries found")
+ else:
+ if len(all_boundaries) > 1:
+ logger.warning(f"Multiple boundaries detected, taking first occurrence")
+ boundary = all_boundaries[0]
+ if igm_files_needed is not None:
+ models = check_model_completeness(models, igm_files_needed)
+ if boundary_files_needed is not None:
+ boundary = check_model_completeness(boundary, boundary_files_needed)
+ return models, boundary
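+
+# Illustrative (commented-out) usage of get_local_entsoe_files, mirroring the test block below; the folder name
+# refers to the ENTSO-E QoCDC test set:
+# models, boundary = get_local_entsoe_files(path_to_directory='TC3_T1_Conform',
+#                                           allow_merging_entities=False,
+#                                           igm_files_needed=['EQ'])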
+
+
+"""-----------------END OF CONTENT RELATED TO LOADING DATA FROM LOCAL STORAGE----------------------------------------"""
+
# TEST
if __name__ == "__main__":
import sys
from emf.common.integrations.opdm import OPDM
+
logging.basicConfig(
format='%(levelname)-10s %(asctime)s.%(msecs)03d %(name)-30s %(funcName)-35s %(lineno)-5d: %(message)s',
datefmt='%Y-%m-%dT%H:%M:%S',
level=logging.INFO,
handlers=[logging.StreamHandler(sys.stdout)]
)
- #logging.getLogger('powsybl').setLevel(1)
-
- opdm = OPDM()
+ # logging.getLogger('powsybl').setLevel(1)
+ # Add a pypowsybl log gatherer
+ # Set up the log gatherer:
+ # topic name: currently used as a start of a file name
+    # send_to_elastic: send the triggered log entry to elastic (parameters are defined in custom_logger.properties)
+ # upload_to_minio: upload log file to minio (parameters are defined in custom_logger.properties)
+ # report_on_command: trigger reporting explicitly
+ # logging policy: choose according to the need. Currently:
+    #   ALL_ENTRIES: gathers all log entries regardless of level
+ # ENTRIES_IF_LEVEL_REACHED: gathers all log entries when at least one entry was at least on the level specified
+ # ENTRY_ON_LEVEL: gathers only entry which was at least on the level specified
+ # ENTRIES_ON_LEVEL: gathers all entries that were at least on the level specified
+ # ENTRIES_COLLECTED_TO_LEVEL: gathers all entries to the first entry that was at least on the level specified
+ # print_to_console: propagate log to parent
+ # reporting_level: level that triggers policy
+ pypowsybl_log_gatherer = PyPowsyblLogGatherer(topic_name='IGM_validation',
+ send_to_elastic=False,
+ upload_to_minio=False,
+ report_on_command=False,
+ logging_policy=PyPowsyblLogReportingPolicy.ENTRIES_IF_LEVEL_REACHED,
+ print_to_console=False,
+ reporting_level=logging.ERROR)
- latest_boundary = opdm.get_latest_boundary()
- available_models = opdm.get_latest_models_and_download(time_horizon='1D', scenario_date="2023-08-16T09:30")#, tso="ELERING")
+ # Switch this to True if files from local storage are used
+ load_data_from_local_storage = True
+ try:
+ if load_data_from_local_storage:
+ # available_models, latest_boundary = get_local_files()
+            # Change this according to the test case to be used. Note that it must reference the end folder that will
+            # be used. It must also be unique enough to be distinguished from other folders (for example, instead of
+            # using 'Combinations' use 'TC1_T11_NonConform_L1/Combinations' etc.)
+ # Some examples for
+ # https://www.entsoe.eu/Documents/CIM_documents/Grid_Model_CIM/QoCDC_v3.2.1_test_models.zip
+ folder_to_study = 'TC3_T1_Conform'
+ # folder_to_study = 'TC3_T3_Conform'
+ # folder_to_study = 'TC4_T1_Conform/Initial'
+ # Some examples for
+ # https://www.entsoe.eu/Documents/CIM_documents/Grid_Model_CIM/TestConfigurations_packageCASv2.0.zip
+ # folder_to_study = ['CGMES_v2.4.15_MicroGridTestConfiguration_T1_BE_Complete_v2',
+ # 'CGMES_v2.4.15_MicroGridTestConfiguration_T1_NL_Complete_v2',
+ # 'Type1_T1/CGMES_v2.4.15_MicroGridTestConfiguration_BD_v2']
+            # In general this function checks if the paths (path_to_directory) exist in ENTSOE_EXAMPLES_LOCAL;
+            # if not, it tries to download and extract the zip from ENTSOE_EXAMPLES_EXTERNAL. If this fails or the
+            # path is still not found, it carries on as usual.
+            # Note that the zip can also be downloaded and extracted manually, but in that case it must be
+            # extracted under ENTSOE_EXAMPLES_LOCAL so that the path can be found.
+            # path_to_directory: string or list, end of the path from where to load the files
+            # (starting from ENTSOE_EXAMPLES_LOCAL). Note that these must be unique enough (errors are thrown when
+            # two or more paths are found)
+            # allow_merging_entities: whether to allow merged entities; pypowsybl validation was not happy about them
+            # igm_files_needed: in order for pypowsybl validation to work, at least these files should be present
+            # in the igm
+ available_models, latest_boundary = get_local_entsoe_files(path_to_directory=folder_to_study,
+ allow_merging_entities=False,
+ igm_files_needed=['EQ'])
+ else:
+ raise LocalFileLoaderError
+ except FileNotFoundError:
+ # if needed catch and handle LocalFileLoaderError separately
+ logger.info(f"Fetching data from external resources")
+ opdm = OPDM()
+ latest_boundary = opdm.get_latest_boundary()
+ available_models = opdm.get_latest_models_and_download(time_horizon='ID',
+ scenario_date='2024-04-05T22:30',
+ # tso='ELERING'
+ )
+ # available_models = opdm.get_latest_models_and_download(time_horizon='1D',
+ # scenario_date='2024-03-14T09:30',
+ # # tso='ELERING'
+ # )
validated_models = []
-
-
# Validate models
for model in available_models:
-
+ tso = model['pmd:TSO']
+ pypowsybl_log_gatherer.set_tso(tso)
try:
- response = validate_model([model, latest_boundary])
- model["VALIDATION_STATUS"] = response
+ if isinstance(latest_boundary, dict):
+ response = validate_model([model, latest_boundary])
+ else:
+ response = validate_model([model])
+ model[VALIDATION_STATUS_KEYWORD] = response
+            # Example of manually triggering the posting of the logs. The value given must be truthy:
+ log_post_trigger = model.get(VALIDATION_STATUS_KEYWORD, {}).get('valid') is False
+ # Note that this switch is governed by report_on_command in PyPowsyblLogGatherer initialization
+ pypowsybl_log_gatherer.trigger_to_report_externally(log_post_trigger)
validated_models.append(model)
-
except Exception as error:
validated_models.append(model)
- #logger.error("Validation failed", error)
-
+        logger.error(f"Validation failed for {model.get('pmd:TSO')}: {error}")
+ pypowsybl_log_gatherer.stop_working()
# Print validation statuses
- [print(dict(tso=model['pmd:TSO'], valid=model.get('VALIDATION_STATUS', {}).get('VALID'), duration=model.get('VALIDATION_STATUS', {}).get('VALIDATION_DURATION_S'))) for model in validated_models]
+ [print(dict(tso=model['pmd:TSO'], valid=model.get('VALIDATION_STATUS', {}).get('valid'),
+ duration=model.get('VALIDATION_STATUS', {}).get('validation_duration_s'))) for model in
+ validated_models]
# With EMF IGM Validation settings
# {'tso': '50Hertz', 'valid': True, 'duration': 6.954386234283447}
@@ -128,5 +988,3 @@ def validate_model(opdm_objects, loadflow_parameters=CGM_RELAXED_2, run_element_
# {'tso': 'TTG', 'valid': True, 'duration': 5.204774856567383}
# {'tso': 'PSE', 'valid': True, 'duration': 1.555201530456543}
-
-
diff --git a/emf/model_retriever/model_retriever.py b/emf/model_retriever/model_retriever.py
index e0322c2..4c94918 100644
--- a/emf/model_retriever/model_retriever.py
+++ b/emf/model_retriever/model_retriever.py
@@ -4,10 +4,14 @@
from zipfile import ZipFile
from typing import List
import json
+
from emf.common.config_parser import parse_app_properties
from emf.common.integrations import edx, elastic, opdm, minio
from emf.common.converters import opdm_metadata_to_json
from emf.loadflow_tool.validator import validate_model
+from emf.loadflow_tool.helper import load_opdm_data
+from emf.loadflow_tool.model_statistics import get_system_metrics
+
logger = logging.getLogger(__name__)
@@ -29,10 +33,23 @@ def handle(self, opdm_objects: dict, **kwargs):
for opdm_object in opdm_objects:
# Get model from OPDM
- response = self.opdm_service.download_object(opdm_object=opdm_object)
+ self.opdm_service.download_object(opdm_object=opdm_object)
# Put all components to bytesio zip (each component to different zip)
- for component in response['opde:Component']:
+ for component in opdm_object['opde:Component']:
+
+ # Sanitize content-reference url
+ content_reference = component['opdm:Profile']['pmd:content-reference']
+ content_reference = content_reference.replace('//', '/')
+
+ # Check whether profile already exist in object storage (Minio)
+ if component['opdm:Profile']['pmd:cgmesProfile'] == "EQ": # TODO currently only for EQ
+ profile_exist = self.minio_service.object_exists(bucket_name=MINIO_BUCKET, object_name=content_reference)
+ if profile_exist:
+ logger.info(f"Profile already stored in object storage: {content_reference}")
+ continue
+
+ # Put content data into bytes object
output_object = BytesIO()
with ZipFile(output_object, "w") as component_zip:
with ZipFile(BytesIO(component['opdm:Profile']['DATA'])) as profile_zip:
@@ -41,8 +58,7 @@ def handle(self, opdm_objects: dict, **kwargs):
component_zip.writestr(file_name, profile_zip.open(file_name).read())
# Upload components to minio storage
- output_object.name = component['opdm:Profile']['pmd:content-reference']
- output_object.name = output_object.name.replace('//', '/') # sanitize double slash in url
+ output_object.name = content_reference
logger.info(f"Uploading component to object storage: {output_object.name}")
self.minio_service.upload_object(file_path_or_file_object=output_object, bucket_name=MINIO_BUCKET)
@@ -65,6 +81,55 @@ def handle(self, opdm_objects: dict, **kwargs):
return updated_opdm_objects
+ def handle_reduced(self, opdm_objects: List[dict], **kwargs):
+ # Download each OPDM object network model from OPDE
+ updated_opdm_objects = []
+ for opdm_object in opdm_objects:
+ # Put all components to bytesio zip (each component to different zip)
+ for component in opdm_object['opde:Component']:
+ # Sanitize content-reference url
+ content_reference = component['opdm:Profile']['pmd:content-reference']
+ content_reference = content_reference.replace('//', '/')
+ # Check whether profile already exist in object storage (Minio)
+ if component['opdm:Profile']['pmd:cgmesProfile'] == "EQ": # TODO currently only for EQ
+ profile_exist = self.minio_service.object_exists(bucket_name=MINIO_BUCKET, object_name=content_reference)
+ if profile_exist:
+ logger.info(f"Profile already stored in object storage: {content_reference}")
+ continue
+ # Put content data into bytes object
+ output_object = BytesIO()
+ with ZipFile(output_object, "w") as component_zip:
+ with ZipFile(BytesIO(component['opdm:Profile']['DATA'])) as profile_zip:
+ for file_name in profile_zip.namelist():
+ logger.debug(f"Adding file: {file_name}")
+ component_zip.writestr(file_name, profile_zip.open(file_name).read())
+
+ # Upload components to minio storage
+ output_object.name = content_reference
+ logger.info(f"Uploading component to object storage: {output_object.name}")
+ self.minio_service.upload_object(file_path_or_file_object=output_object, bucket_name=MINIO_BUCKET)
+ updated_opdm_objects.append(opdm_object)
+ return updated_opdm_objects
+
+
+class HandlerModelsStat:
+
+ def handle(self, opdm_objects: List[dict], **kwargs):
+ # Get the latest boundary set for validation
+ latest_boundary = self.opdm_service.get_latest_boundary() # TODO - get BDS from ELK+MINIO
+
+ # Extract statistics
+ for opdm_object in opdm_objects:
+ stat = load_opdm_data(opdm_objects=[opdm_object, latest_boundary])
+ opdm_object['total_load'] = stat['total_load']
+ opdm_object['generation'] = stat['generation']
+ opdm_object['losses'] = stat['losses']
+ opdm_object['losses_coefficient'] = stat['losses_coefficient']
+ opdm_object['acnp'] = stat['tieflow_acnp']['EquivalentInjection.p']
+ opdm_object['hvdc'] = {key: value['EquivalentInjection.p'] for key, value in stat["tieflow_hvdc"].items()}
+
+ return opdm_objects
+
class HandlerModelsValidator:
@@ -73,14 +138,14 @@ def __init__(self):
def handle(self, opdm_objects: List[dict], **kwargs):
# Get the latest boundary set for validation
- latest_boundary = self.opdm_service.get_latest_boundary()
+ latest_boundary = kwargs.get('latest_boundary')
+ if not latest_boundary:
+ latest_boundary = self.opdm_service.get_latest_boundary()
# Run network model validation
for opdm_object in opdm_objects:
response = validate_model(opdm_objects=[opdm_object, latest_boundary])
opdm_object["valid"] = response["valid"] # taking only relevant data from validation step
- for component in opdm_object['opde:Component']: # pop out initial binary network model data
- component['opdm:Profile'].pop('DATA')
return opdm_objects
@@ -92,6 +157,12 @@ def __init__(self):
self.elastic_service = elastic.HandlerSendToElastic(index=ELK_INDEX, id_from_metadata=True, id_metadata_list=['opde:Id'])
def handle(self, opdm_objects: List[dict], **kwargs):
+
+ # pop out initial binary network model data
+ for opdm_object in opdm_objects:
+ for component in opdm_object['opde:Component']:
+ component['opdm:Profile'].pop('DATA')
+
self.elastic_service.handle(byte_string=json.dumps(opdm_objects, default=str).encode('utf-8'),
properties=kwargs.get('properties'))
logger.info(f"Network model metadata sent to object-storage.elk")