diff --git a/.gitignore b/.gitignore index ec43f5a..4a503a5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .idea/ -containers.txt \ No newline at end of file +containers.txt +.DS_Store \ No newline at end of file diff --git a/config/cgm_worker/model_merge.properties b/config/cgm_worker/model_merge.properties new file mode 100644 index 0000000..c3aa7d3 --- /dev/null +++ b/config/cgm_worker/model_merge.properties @@ -0,0 +1,9 @@ +[MAIN] +RMQ_CGM_QUEUE = rabbit-task-test +EMF_OS_MINIO_BUCKET = opde-confidential-models +EMF_OS_MINIO_FOLDER = EMF_test_merge_find_better_place +EMF_OS_MINIO_BOUNDARY_BUCKET = opdm-data +EMF_OS_MINIO_BOUNDARY_FOLDER = CGMES/ENTSOE +ELK_VERSION_INDEX = emfos-logs* +MERGE_TYPES = {'CGM': {'AREA': 'EU', 'INCLUDED':[], 'EXCLUDED': ['APG'], 'LOCAL_IMPORT': []}, 'RMM': {'AREA':'BA', 'INCLUDED': ['ELERING', 'AST', 'LITGRID','PSE'], 'EXLUDED': [], 'LOCAL_IMPORT': []}} +MERGING_ENTITY = BALTICRSC \ No newline at end of file diff --git a/config/cgm_worker/validator.properties b/config/cgm_worker/validator.properties index 78a4236..0dbf3a1 100644 --- a/config/cgm_worker/validator.properties +++ b/config/cgm_worker/validator.properties @@ -1,2 +1,4 @@ [MAIN] -ELK_INDEX = emfos-igm-validation \ No newline at end of file +ELK_INDEX = emfos-igm-validation +ENTSOE_EXAMPLES_EXTERNAL=https://www.entsoe.eu/Documents/CIM_documents/Grid_Model_CIM/QoCDC_v3.2.1_test_models.zip +ENTSOE_EXAMPLES_LOCAL=./example_models/ \ No newline at end of file diff --git a/config/logging/custom_logger.properties b/config/logging/custom_logger.properties index 2fffffa..d5d6f21 100644 --- a/config/logging/custom_logger.properties +++ b/config/logging/custom_logger.properties @@ -2,4 +2,13 @@ LOGGING_INDEX = emfos-logs LOGGING_FORMAT = %(levelname) -10s %(asctime) -10s %(name) -50s %(funcName) -50s %(lineno) -5d: %(message)s LOGGING_DATEFMT = %Y-%m-%d %H:%M:%S -LOGGING_LEVEL = INFO \ No newline at end of file +LOGGING_LEVEL = INFO +LOCAL_FOLDER_FOR_PYPOWSYBL_LOGS = ./pypowsybl-logs +ELASTIC_INDEX_FOR_PYPOWSYBL_LOGS = emfos-pypowsybl-logs +MINIO_BUCKET_FOR_PYPOWSYBL_LOGS = external +MINIO_FOLDER_FOR_PYPOWSYBL_LOGS = EMF_OS_pypowsybl_logs +ELASTIC_FIELD_FOR_LOG_DATA = log_data +ELASTIC_FIELD_FOR_TSO = tso +ELASTIC_FIELD_FOR_TOPIC = topic +ELASTIC_FIELD_FOR_FILENAME = log_file_name +ELASTIC_FIELD_FOR_MINIO_BUCKET = minio_bucket \ No newline at end of file diff --git a/emf/common/integrations/elastic.py b/emf/common/integrations/elastic.py index 7853676..8dbb0cf 100644 --- a/emf/common/integrations/elastic.py +++ b/emf/common/integrations/elastic.py @@ -11,12 +11,21 @@ import warnings from elasticsearch.exceptions import ElasticsearchWarning + warnings.simplefilter('ignore', ElasticsearchWarning) logger = logging.getLogger(__name__) parse_app_properties(caller_globals=globals(), path=config.paths.integrations.elastic) +SCROLL_ID_FIELD = '_scroll_id' +RESULT_FIELD = 'hits' +DOCUMENT_COUNT = 10000 +DEFAULT_COLUMNS = ["value"] +INITIAL_SCROLL_TIME = "15m" +CONSECUTIVE_SCROLL_TIME = "12m" +MAGIC_KEYWORD = '_source' + class Elastic: @@ -60,7 +69,10 @@ def send_to_elastic(index, # Executing POST to push message into ELK if debug: logger.debug(f"Sending data to {url}") - response = requests.post(url=url, json=json_message) + if json_message.get('args', None): # TODO revise if this is best solution + json_message.pop('args') + json_data = json.dumps(json_message, default=str) + response = requests.post(url=url, data=json_data.encode(), headers={"Content-Type": "application/json"}) if debug: 
logger.debug(f"ELK response: {response.content}")
@@ -92,9 +104,16 @@ def send_to_elastic_bulk(index,
     if id_from_metadata:
         id_separator = "_"
-        json_message_list = [value for element in json_message_list for value in ({"index": {"_index": index, "_id": id_separator.join([str(element.get(key, '')) for key in id_metadata_list])}}, element)]
+        json_message_list = [value for element in json_message_list
+                             for value in ({"index": {"_index": index,
+                                                      "_id": id_separator.join([str(element.get(key, ''))
+                                                                                for key in id_metadata_list])}},
+                                           element)]
     else:
-        json_message_list = [value for element in json_message_list for value in ({"index": {"_index": index}}, element)]
+        json_message_list = [value for element in json_message_list for value in
+                             ({"index": {"_index": index}}, element)]
 
     response_list = []
     for batch in range(0, len(json_message_list), batch_size):
@@ -102,7 +121,7 @@ def send_to_elastic_bulk(index,
         if debug:
             logger.debug(f"Sending batch ({batch}-{batch + batch_size})/{len(json_message_list)} to {url}")
         response = requests.post(url=url,
-                                 data=(ndjson.dumps(json_message_list[batch:batch + batch_size])+"\n").encode(),
+                                 data=(ndjson.dumps(json_message_list[batch:batch + batch_size]) + "\n").encode(),
                                  timeout=None,
                                  headers={"Content-Type": "application/x-ndjson"})
         if debug:
@@ -128,6 +147,55 @@ def get_docs_by_query(self, index, query, size=None, return_df=True):
 
         return response
 
+    def get_data_by_scrolling(self,
+                              query: dict,
+                              index: str,
+                              fields: list):
+        """
+        Gets a large bulk of data from elastic by scrolling
+        :param query: dictionary with parameters by which to select data from elastic
+        :param index: table name in elastic
+        :param fields: fields or columns to return from query
+        :return: generator that yields batches of hits
+        """
+        result = self.client.search(index=index,
+                                    query=query,
+                                    source=fields,
+                                    size=DOCUMENT_COUNT,
+                                    scroll=INITIAL_SCROLL_TIME)
+
+        scroll_id = result[SCROLL_ID_FIELD]
+        # Extract and return the relevant data from the initial response
+        hits = result[RESULT_FIELD][RESULT_FIELD]
+        yield hits
+        # Continue scrolling through the results until there are no more
+        while hits:
+            result = self.client.scroll(scroll_id=scroll_id, scroll=CONSECUTIVE_SCROLL_TIME)
+            hits = result[RESULT_FIELD][RESULT_FIELD]
+            yield hits
+        # Clear the scroll context after processing all results
+        self.client.clear_scroll(scroll_id=scroll_id)
+
+    def get_data(self,
+                 query: dict,
+                 index: str,
+                 fields: list = None):
+        """
+        Gets data from elastic
+        :param query: dictionary with parameters by which to select data from elastic
+        :param index: table name in elastic
+        :param fields: fields or columns to return from query; the whole '_source' is returned if not given
+        :return: dataframe with results
+        """
+        # Gather all the results to a list (of dictionaries)
+        list_of_lines = []
+        for hits in self.get_data_by_scrolling(query=query, index=index, fields=fields):
+            for hit in hits:
+                list_of_lines.append({field: hit[MAGIC_KEYWORD][field] for field in fields}
+                                     if fields else hit[MAGIC_KEYWORD])
+        # Convert the list (of dictionaries) to a pandas dataframe
+        data_frame = pd.DataFrame(list_of_lines)
+        return data_frame
+
     def query_schedules_from_elk(self,
                                  index: str,
                                  utc_start: str,
@@ -187,7 +255,6 @@ def __init__(self,
                  auth=None,
                  verify=False,
                  debug=False):
-
         self.index = index
         self.server = server
         self.id_from_metadata = id_from_metadata
@@ -203,7 +270,6 @@ def __init__(self,
            self.session.auth = auth
 
     def handle(self, byte_string, properties):
-
         Elastic.send_to_elastic_bulk(index=self.index,
                                      json_message_list=json.loads(byte_string),
                                      id_from_metadata=self.id_from_metadata,
@@ -215,7 +281,6 @@ def 
handle(self, byte_string, properties): if __name__ == '__main__': - # Create client server = "http://test-rcc-logs-master.elering.sise:9200" service = Elastic(server=server) diff --git a/emf/common/integrations/object_storage/object_storage.py b/emf/common/integrations/object_storage/object_storage.py index 8e0c3ae..ce3c676 100644 --- a/emf/common/integrations/object_storage/object_storage.py +++ b/emf/common/integrations/object_storage/object_storage.py @@ -68,9 +68,10 @@ def get_content(metadata: dict, bucket_name=MINIO_BUCKET_NAME): logger.info(f"Getting data from MinIO") for component in metadata["opde:Component"]: content_reference = component.get("opdm:Profile").get("pmd:content-reference") + # Minio considers // as / + content_reference = content_reference.replace('//', '/') logger.info(f"Downloading {content_reference}") component["opdm:Profile"]["DATA"] = minio_service.download_object(bucket_name, content_reference) - return metadata diff --git a/emf/common/logging/custom_logger.py b/emf/common/logging/custom_logger.py index ca00cff..e16427c 100644 --- a/emf/common/logging/custom_logger.py +++ b/emf/common/logging/custom_logger.py @@ -1,13 +1,61 @@ +import os.path import sys import logging +from datetime import datetime, timedelta +from enum import Enum +from io import BytesIO +from zipfile import ZipFile + import requests + from emf.common.integrations import elastic import config from emf.common.config_parser import parse_app_properties +from emf.common.integrations.minio import ObjectStorage logger = logging.getLogger(__name__) parse_app_properties(caller_globals=globals(), path=config.paths.logging.custom_logger) +PYPOWSYBL_LOGGER = 'powsybl' +PYPOWSYBL_LOGGER_DEFAULT_LEVEL = 1 +CUSTOM_LOG_BUFFER_LINE_BREAK = '\r\n' +# Max allowed lifespan of link to file in minio bucket +DAYS_TO_STORE_DATA_IN_MINIO = 7 # Max allowed by Minio +# Default name of the subfolder for storing the results if needed +SEPARATOR_SYMBOL = '/' +WINDOWS_SEPARATOR = '\\' + + +def check_the_folder_path(folder_path: str): + """ + Checks folder path for special characters + :param folder_path: input given + :return checked folder path + """ + if not folder_path.endswith(SEPARATOR_SYMBOL): + folder_path = folder_path + SEPARATOR_SYMBOL + double_separator = SEPARATOR_SYMBOL + SEPARATOR_SYMBOL + # Escape '//' + folder_path = folder_path.replace(double_separator, SEPARATOR_SYMBOL) + # Escape '\' + folder_path = folder_path.replace(WINDOWS_SEPARATOR, SEPARATOR_SYMBOL) + return folder_path + + +def save_content_to_zip_file(content: {}): + """ + Saves content to zip file (in memory) + :param content: the content of zip file (key: file name, value: file content) + :return: byte array + """ + output_object = BytesIO() + with ZipFile(output_object, "w") as output_zip: + if content: + for file_name in content: + logger.info(f"Converting {file_name} to zip container") + output_zip.writestr(file_name, content[file_name]) + output_object.seek(0) + return output_object.getvalue() def initialize_custom_logger( @@ -18,8 +66,7 @@ def initialize_custom_logger( index: str = LOGGING_INDEX, extra: None | dict = None, fields_filter: None | list = None, - ): - +): root_logger = logging.getLogger() root_logger.setLevel(level) root_logger.propagate = True @@ -65,9 +112,11 @@ def elk_connection(self): logger.info(f"Connection to {self.server} successful") return True else: - logger.warning(f"ELK server response: [{response.status_code}] {response.reason}. 
Disabling ELK logging.")
+                logger.warning(
+                    f"ELK server response: [{response.status_code}] {response.reason}. Disabling ELK logging.")
         except requests.exceptions.ConnectTimeout:
-            logger.warning(f"ELK server {self.server} does not responding with ConnectTimeout error. Disabling ELK logging.")
+            logger.warning(
+                f"ELK server {self.server} is not responding (ConnectTimeout). Disabling ELK logging.")
         except Exception as e:
             logger.warning(f"ELK server {self.server} returned unknown error: {e}")
@@ -89,6 +138,515 @@ def emit(self, record):
         elastic.Elastic.send_to_elastic(index=self.index, json_message=elk_record, server=self.server)
 
 
+class LogStream(object):
+    """
+    Custom container for collecting log records emitted by pypowsybl
+    """
+
+    def __init__(self, formatter):
+        # self.logs = ''
+        self.logs = []
+        self.formatter = formatter
+        self.single_entry = None
+
+    def write(self, message):
+        """
+        Writes the log message to the buffer
+        :param message: log message
+        """
+        # formatted_message = self.formatter.format(message)
+        # self.logs += formatted_message
+        # self.logs += CUSTOM_LOG_BUFFER_LINE_BREAK
+        self.logs.append(message)
+
+    def format_for_writing(self):
+        """
+        Reduces double line breaks to single line breaks and makes some small adjustments
+        """
+        # double_of_line_break = CUSTOM_LOG_BUFFER_LINE_BREAK + CUSTOM_LOG_BUFFER_LINE_BREAK
+        # self.logs.replace(double_of_line_break, CUSTOM_LOG_BUFFER_LINE_BREAK)
+        # self.logs = '\n'.join(self.logs.splitlines())
+        pass
+
+    def flush(self):
+        pass
+
+    def reset(self):
+        """
+        Resets the internal buffers
+        """
+        self.logs = []
+        self.single_entry = None
+
+    def get_logs(self):
+        """
+        Gets the content
+        :return: tuple of logs and the entry that triggered the reporting process
+        """
+        return self.logs, self.single_entry
+
+
+class PyPowsyblLogReportingPolicy(Enum):
+    """
+    Reporting policies that determine which collected pypowsybl log entries are reported and when
+    """
+    """
+    Gathers all the pypowsybl output and reports everything when stop_working is called
+    """
+    ALL_ENTRIES = "all_entries"
+    """
+    Gathers all the pypowsybl output and reports when at least one entry reached the logging level
+    """
+    ENTRIES_IF_LEVEL_REACHED = "entries_if_level_was_reached"
+    """
+    Reports a single logging record that was over the logging level
+    """
+    ENTRY_ON_LEVEL = "entry_on_level"
+    """
+    Gathers only entries that are on the level or higher
+    """
+    ENTRIES_ON_LEVEL = "entries_on_level_and_higher"
+    """
+    Reports all collected logging records from the last point when the level was reached
+    """
+    ENTRIES_COLLECTED_TO_LEVEL = "entries_collected_to_level"
+
+
+def get_buffer_size(buffer):
+    """
+    Returns the length of the buffer
+    :param buffer: input buffer
+    :return: length of the buffer
+    """
+    return len(buffer.encode('utf-8'))
+
+
+class PyPowsyblLogGatherer:
+    """
+    Governing class for the PyPowsyblLogGatheringHandler
+    Note that for posting the data to elastic and minio the default configuration (elastic.properties,
+    minio.properties) is used
+    """
+
+    def __init__(self,
+                 topic_name: str = None,
+                 reporting_level=None,
+                 tso: str = None,
+                 print_to_console: bool = False,
+                 send_to_elastic: bool = True,
+                 upload_to_minio: bool = False,
+                 report_on_command: bool = True,
+                 path_to_local_folder: str = LOCAL_FOLDER_FOR_PYPOWSYBL_LOGS,
+                 minio_bucket: str = MINIO_BUCKET_FOR_PYPOWSYBL_LOGS,
+                 logging_policy: PyPowsyblLogReportingPolicy = PyPowsyblLogReportingPolicy.ENTRIES_IF_LEVEL_REACHED,
+                 elk_server=elastic.ELK_SERVER,
+                 index=ELASTIC_INDEX_FOR_PYPOWSYBL_LOGS):
+        """
+        Initializes the pypowsybl log gatherer.
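+        The gatherer attaches a PyPowsyblLogGatheringHandler to the 'powsybl' logger and, depending on the
+        reporting policy, forwards the collected records to elastic, minio or a local log file.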
+ :param topic_name: name (string) that can be used to distinguish log files + :param reporting_level: logging.level which triggers reporting + :param tso: the name of the tso (for naming the files) + :param print_to_console: If True then prints the pypowsybl log to console, false: consume it internally + :param send_to_elastic: If True then posts a log entry that triggered the gathering to elastic + :param upload_to_minio: If True then posts a log buffer to minio as .log file + :param report_on_command: If true then log entries/buffer are reported when entry has triggered gathering + and dedicated function is called manually (e.g. report when all validation failed) + :param minio_bucket: the name of the bucket in minio + :param logging_policy: determines which and how to collect (entire log or only entries on the level etc) + :param elk_server: name of the elk server instance + :index: name of the index in elastic search where to post the log entries + """ + self.topic_name = topic_name + self.formatter = logging.Formatter(LOGGING_FORMAT) + self.package_logger = logging.getLogger(PYPOWSYBL_LOGGER) + self.package_logger.setLevel(PYPOWSYBL_LOGGER_DEFAULT_LEVEL) + self.reporting_level = reporting_level + self.tso = tso + self.report_on_command = report_on_command + self.reporting_triggered_externally = True + self.reset_triggers_for_reporting() + if self.reporting_level is None: + self.reporting_level = logging.ERROR + self.gathering_handler = PyPowsyblLogGatheringHandler(formatter=self.formatter, + parent=self, + report_level=self.reporting_level) + self.package_logger.addHandler(self.gathering_handler) + # Switch reporting to console on or off + self.package_logger.propagate = print_to_console + self.path_to_local_folder = check_the_folder_path(path_to_local_folder) + # Initialize the elk instance + self.elastic_server = elk_server + self.index = index + self.send_to_elastic = send_to_elastic + self.report_to = True + self.logging_policy = None + self.set_reporting_policy(logging_policy) + + self.minio_instance = None + self.minio_bucket = minio_bucket + if upload_to_minio: + try: + self.minio_instance = ObjectStorage() + except Exception as ex: + # Check the exception + logger.warning(f"Cannot connect to Minio, staying offline") + self.identifier = datetime.now().strftime("%d-%m-%Y_%H-%M-%S") + # if needed use identifier + # self.identifier = uuid.uuid4() + + def set_report_on_command(self, report_on_command: bool = False): + """ + Sets manual reporting status + if report_on_command is true then reporting happens when self.report_to is true and trigger_to_report_externally + is called with value true + if report_on_command is false then reporting (posting the logs) happens when self.report_to is true + :param report_on_command: new status for manual reporting + """ + self.report_on_command = report_on_command + self.reporting_triggered_externally = False if self.report_on_command else True + + def reset_triggers_for_reporting(self): + """ + Resets the triggers used: + reporting_to which triggered by the log entry + reporting_triggered_externally which triggered outside manually + """ + self.reporting_triggered_externally = False if self.report_on_command else True + self.report_to = False + + @property + def elastic_is_connected(self): + """ + Borrowed from elastic handler above + Do check up to elastic, handle errors + """ + try: + response = requests.get(self.elastic_server, timeout=5) + if response.status_code == 200: + return True + else: + logger.warning( + f"ELK server response: 
[{response.status_code}] {response.reason}. Disabling ELK logging.") + except requests.exceptions.ConnectTimeout: + logger.warning(f"{self.elastic_server}: Timeout. Disabling ELK logging.") + except Exception as e: + logger.warning(f"{self.elastic_server}: unknown error: {e}") + return False + + def post_log_report(self, buffer='', single_entry=None): + """ + Handles the created report by sending it to elastic or saving it to local storage + Checks if send_to_elastic is enabled and instance of elastic is available. Composes a message where fields + are standard field of a logging.Record. Adds the entire log as a string to log_data field + :param buffer: buffer containing log entries + :param single_entry: first entry that reached to required level + """ + try: + if self.send_to_elastic and self.elastic_is_connected: + elastic_content = self.compose_elastic_message(buffer, single_entry) + if elastic_content is not None: + response = elastic.Elastic.send_to_elastic(index=self.index, + json_message=elastic_content, + server=self.elastic_server) + if response.ok: + # TODO: Is message pending needed? + # For example if sending message failed, keep it somewhere and send it when connection is + # available + self.reset_triggers_for_reporting() + return + raise ConnectionError + except ConnectionError: + if not self.send_to_elastic: + logger.info("Saving log to local storage") + else: + logger.error(f"Sending log to elastic failed, saving to local storage...") + self.compose_log_file(buffer, single_entry) + self.reset_triggers_for_reporting() + # except Exception: + # logger.error(f"Unable to post log report: {Exception}") + + def set_reporting_policy(self, new_policy: PyPowsyblLogReportingPolicy): + """ + Updates logging policy to new value + :param new_policy: new policy value + """ + self.logging_policy = new_policy + self.gathering_handler.set_reporting_policy(self.logging_policy) + if self.logging_policy != PyPowsyblLogReportingPolicy.ALL_ENTRIES: + self.reset_triggers_for_reporting() + + def set_tso(self, tso_name: str): + """ + Sets the tso to new tso, handles the log of previous tso + :param tso_name: name of tso + """ + self.stop_working() + self.gathering_handler.start_gathering() + self.tso = tso_name + + def trigger_to_report_externally(self, trigger_reporting: bool = True): + """ + Calls reporting when self.report_on_command is set to true + NOTE: That this works on policies which report at the end + :param trigger_reporting: if true then if self.report_to is true (set by log entry) the log entry/buffers + are reported otherwise not + """ + if self.report_on_command: + self.reporting_triggered_externally = trigger_reporting + + def set_to_reporting(self): + """ + Handles the logging event, decides whether and what to post: + Posts if + 1) log entry that reached to level when policy is set to PyPowsyblLogReportingPolicy.ENTRY_ON_LEVEL + 2) log entry and log buffer when policy is set to PyPowsyblLogReportingPolicy.ENTRIES_COLLECTED_TO_LEVEL + """ + self.report_to = True + # logger.info(f"{self.topic_name}: {self.get_reporting_level()} from Pypowsybl, setting to report") + if self.logging_policy == PyPowsyblLogReportingPolicy.ENTRY_ON_LEVEL: + logger.info(f"Passing at once") + buffer, single_entry = self.get_logs() + self.post_log_report(single_entry=single_entry) + elif self.logging_policy == PyPowsyblLogReportingPolicy.ENTRIES_COLLECTED_TO_LEVEL: + logger.info(f"Sending content gathered") + buffer, single_entry = self.get_logs() + self.post_log_report(buffer=buffer, 
single_entry=single_entry) + + def start_working(self): + """ + Starts gathering the logs + """ + self.gathering_handler.start_gathering() + + def stop_working(self): + """ + Stops gathering the logs, retrieves them from buffer and decides whether to post them: + posts if + 1) self.logging_policy is set to PyPowsyblLogReportingPolicy.ALL_ENTRIES or + 2) self.logging_policy is set to PyPowsyblLogReportingPolicy.ENTRIES_IF_LEVEL_REACHED and level was reached + (self.report_to is True) + : return: None + """ + self.gathering_handler.stop_gathering() + buffer, single_entry = self.get_logs() + if (buffer is None or buffer == '') and single_entry is None: + return + # Check if post is needed + # 1. If reporting was set to be triggered externally and no triggering case occurred + if self.report_on_command is False or self.reporting_triggered_externally is True: + # 2. If other conditions are met + if (self.logging_policy == PyPowsyblLogReportingPolicy.ALL_ENTRIES or + (self.report_to and + (self.logging_policy == PyPowsyblLogReportingPolicy.ENTRIES_IF_LEVEL_REACHED or + self.logging_policy == PyPowsyblLogReportingPolicy.ENTRIES_ON_LEVEL))): + self.post_log_report(buffer, single_entry) + self.reset_triggers_for_reporting() + + def get_logs(self): + """ + Gets and formats logs + """ + log_list, single_log = self.gathering_handler.get_buffer() + buffer = self.format_buffer_to_string(log_list) + single_entry = single_log + return buffer, single_entry + + def format_buffer_to_string(self, buffer): + """ + Returns log buffer combined to a string + Note! Be aware of the line break, it is currently set to Windows style! + """ + return CUSTOM_LOG_BUFFER_LINE_BREAK.join([self.formatter.format(message) for message in buffer]) + + def get_reporting_level(self): + """ + Gets required logging.Loglevel as a string + : return log level as a string + """ + return logging.getLevelName(self.reporting_level) + + def compose_log_file(self, buffer: str = '', single_entry: logging.LogRecord = None, file_name: str = None): + """ + Saves buffer to local log file: buffer if exists, last entry otherwise + :param buffer: buffer containing log entries + :param single_entry: first entry that reached to required level + :param file_name: name of the file where the content should be saved. 
Note that if not specified,
+        a default file name will be used (combination of topic, tso and date and time of the analysis)
+        :return: name of the file the log was written to, or None if there was nothing to write
+        """
+        file_name = self.check_and_get_file_name(file_name)
+        if buffer != '' and buffer is not None:
+            payload = '\n'.join(buffer.splitlines())
+        elif single_entry is not None:
+            payload = self.formatter.format(single_entry)
+        else:
+            return None
+        # And create directories
+        directory_name = os.path.dirname(file_name)
+        if not os.path.exists(directory_name):
+            os.makedirs(directory_name)
+        with open(file_name, mode='w', encoding="utf-8") as log_file:
+            log_file.write(payload)
+        return file_name
+
+    def check_and_get_file_name(self, file_name: str = None, use_folders: bool = True, use_local: bool = True):
+        """
+        Gets a predefined file name to be used when saving the logs
+        :param file_name: file name to use; a default name is generated if None or empty
+        :param use_folders: create sub folders for storing the file
+        :param use_local: store as a relative path when saving to the local computer
+        :return: file name
+        """
+        if file_name is None or file_name == '':
+            time_moment_now = datetime.now().strftime("%d-%m-%Y_%H-%M-%S")
+            file_name = f"{self.topic_name}_pypowsybl_error_log_for_{self.tso}_from_{time_moment_now}.log"
+        if use_folders:
+            file_name = (MINIO_FOLDER_FOR_PYPOWSYBL_LOGS +
+                         SEPARATOR_SYMBOL +
+                         str(self.identifier) +
+                         SEPARATOR_SYMBOL + file_name)
+        if use_local:
+            file_name = self.path_to_local_folder + file_name
+        return file_name
+
+    def post_log_to_minio(self, buffer='', file_name: str = None):
+        """
+        Posts the log as a file to minio
+        :param buffer: logs as a string
+        :param file_name: name for the file; generated if not given
+        :return: tuple of file name and presigned link to the file
+        """
+        link_to_file = None
+        if self.minio_instance is not None and buffer != '' and buffer is not None:
+            # check if the given bucket exists
+            if not self.minio_instance.client.bucket_exists(bucket_name=self.minio_bucket):
+                logger.warning(f"{self.minio_bucket} does not exist")
+                return file_name, link_to_file
+            # Fall back to the default file name if one was not given
+            file_name = self.check_and_get_file_name(file_name, use_local=False)
+            file_object = BytesIO(str.encode(buffer))
+            file_object.name = file_name
+            self.minio_instance.upload_object(file_path_or_file_object=file_object,
+                                              bucket_name=self.minio_bucket,
+                                              metadata=None)
+            time_to_expire = timedelta(days=DAYS_TO_STORE_DATA_IN_MINIO)
+            link_to_file = self.minio_instance.client.get_presigned_url(method="GET",
+                                                                        bucket_name=self.minio_bucket,
+                                                                        object_name=file_object.name,
+                                                                        expires=time_to_expire)
+        return file_name, link_to_file
+
+    def compose_elastic_message(self, buffer: str = '', single_entry: logging.LogRecord = None):
+        """
+        Puts together a dictionary consisting of the first pypowsybl log entry that reached the reporting level,
+        extended with the log file name, minio link, tso and topic of the process
+        :param buffer: buffer containing log entries
+        :param single_entry: first entry that reached the required level
+        :return: log message dictionary
+        """
+        message_dict = {}
+        # Add the first log entry that reached the level as the content of the payload
+        if single_entry is not None and isinstance(single_entry, logging.LogRecord):
+            message_dict = single_entry.__dict__
+        file_name, link_to_log_file = self.post_log_to_minio(buffer=buffer)
+        message_dict[ELASTIC_FIELD_FOR_FILENAME] = file_name
+        if link_to_log_file != '' and link_to_log_file is not None:
+            message_dict[ELASTIC_FIELD_FOR_MINIO_BUCKET] = MINIO_BUCKET_FOR_PYPOWSYBL_LOGS
+            message_dict[ELASTIC_FIELD_FOR_LOG_DATA] = link_to_log_file +
message_dict[ELASTIC_FIELD_FOR_TSO] = self.tso + message_dict[ELASTIC_FIELD_FOR_TOPIC] = self.topic_name + return message_dict + + +class PyPowsyblLogGatheringHandler(logging.StreamHandler): + """ + Initializes custom log handler to start and gather logs. + Depending on the policy either gathers logs to buffer or looks out for log entry which on the report level or + does both + """ + + def __init__(self, + formatter: logging.Formatter, + parent: PyPowsyblLogGatherer = None, + logging_policy: PyPowsyblLogReportingPolicy = PyPowsyblLogReportingPolicy.ALL_ENTRIES, + report_level=logging.ERROR): + """ + Constructor: + :param formatter: the formatter for converting the log entries + :param parent: the parent to whom to report to + :logging_policy: check if buffer is needed or not + :report_level: log level when caught propagates to parent to trigger event + """ + super().__init__() + self.parent = parent + self.gathering = False + self.originator_type = 'IGM_validation' + self.formatter = formatter + if self.formatter is None: + self.formatter = logging.Formatter(LOGGING_FORMAT) + self.gathering_buffer = LogStream(self.formatter) + self.report_level = report_level + self.logging_policy = None + self.write_all = False + self.write_only_levels = False + self.set_reporting_policy(logging_policy) + + def set_reporting_policy(self, new_policy: PyPowsyblLogReportingPolicy): + """ + Sets reporting policy to new value + :param new_policy: new policy value + """ + self.logging_policy = new_policy + self.write_all = (self.logging_policy != PyPowsyblLogReportingPolicy.ENTRY_ON_LEVEL and + self.logging_policy != PyPowsyblLogReportingPolicy.ENTRIES_ON_LEVEL) + self.write_only_levels = self.logging_policy == PyPowsyblLogReportingPolicy.ENTRIES_ON_LEVEL + + def emit(self, record: logging.LogRecord) -> None: + """ + Stores the log output from pypowsybl to internal buffer. 
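+        Whether records are buffered at all depends on the reporting policy (see set_reporting_policy).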
Looks for log level as event to trigger reporting + in parent + :param record: log record + """ + if self.gathering: + # Bypass the buffer if the entire log is not required + if self.write_all: + self.gathering_buffer.write(message=record) + if record.levelno >= self.report_level: + if self.write_only_levels: + self.gathering_buffer.write(message=record) + self.gathering_buffer.single_entry = record + self.parent.set_to_reporting() + + def start_gathering(self): + """ + Resets the buffer to empty and turns gathering on + :return: None + """ + self.reset_gathering() + self.gathering = True + + def stop_gathering(self): + """ + Stops the gathering, leaves the content to buffer + :return: None + """ + self.gathering = False + + def reset_gathering(self): + """ + Resets the gathering status to default + :return: None + """ + self.gathering_buffer.reset() + + def get_buffer(self): + """ + Returns gathering buffer and last entry, resets the buffer + :return: log stream instance + """ + return self.gathering_buffer.get_logs() + + if __name__ == '__main__': # Start root logger STREAM_LOG_FORMAT = "%(levelname) -10s %(asctime) -10s %(name) -35s %(funcName) -30s %(lineno) -5d: %(message)s" @@ -105,3 +663,4 @@ def emit(self, record): if elk_handler.connected: logger.addHandler(elk_handler) logger.info(f"Info message", extra={'extra': 'logger testing'}) + diff --git a/emf/loadflow_tool/helper.py b/emf/loadflow_tool/helper.py index 36c65e7..1cfc1cb 100644 --- a/emf/loadflow_tool/helper.py +++ b/emf/loadflow_tool/helper.py @@ -1,3 +1,4 @@ +import zipfile from zipfile import ZipFile, ZIP_DEFLATED from uuid import uuid4 from io import BytesIO @@ -28,6 +29,10 @@ "iidm.export.cgmes.modeling-authority-set": "powsybl.org" } +NETWORK_KEYWORD = 'network' +NETWORK_META_KEYWORD = 'network_meta' +NETWORK_VALID_KEYWORD = 'network_valid' + # TODO - Add comments and docstring def package_for_pypowsybl(opdm_objects, return_zip: bool = False): @@ -153,15 +158,24 @@ def load_model(opdm_objects: List[dict]): model_data["network_valid"] = network.validate().name # Network model import reporter data - # model_data["import_report"] = json.loads(import_report.to_json()) + model_data["import_report"] = json.loads(import_report.to_json()) # model_data["import_report_str"] = str(import_report) return model_data def opdmprofile_to_bytes(opdm_profile): + # Temporary fix: input data (['opdm:Profile']['DATA']) can be a zip file, figure it out and extract + # before proceeding further data = BytesIO(opdm_profile['opdm:Profile']['DATA']) - data.name = opdm_profile['opdm:Profile']['pmd:fileName'] + file_name = opdm_profile['opdm:Profile']['pmd:fileName'] + if zipfile.is_zipfile(data) and not file_name.endswith('.zip'): + xml_tree_file = get_xml_from_zip(data) + bytes_object = BytesIO() + xml_tree_file.write(bytes_object, encoding='utf-8') + bytes_object.seek(0) + data = bytes_object + data.name = file_name return data @@ -339,16 +353,17 @@ def export_model(network: pypowsybl.network, opdm_object_meta, profiles=None): profiles = "SV,SSH,TP,EQ" file_base_name = filename_from_metadata(opdm_object_meta).split(".xml")[0] - - bytes_object = network.save_to_binary_buffer( - format="CGMES", - parameters={ - "iidm.export.cgmes.modeling-authority-set": opdm_object_meta['pmd:modelingAuthoritySet'], - "iidm.export.cgmes.base-name": file_base_name, - "iidm.export.cgmes.profiles": profiles, - "iidm.export.cgmes.naming-strategy": "cgmes-fix-all-invalid-ids", # identity, cgmes, cgmes-fix-all-invalid-ids - }) - - bytes_object.name = 
f"{file_base_name}_{uuid.uuid4()}.zip" - - return bytes_object + try: + bytes_object = network.save_to_binary_buffer( + format="CGMES", + parameters={ + "iidm.export.cgmes.modeling-authority-set": opdm_object_meta['pmd:modelingAuthoritySet'], + "iidm.export.cgmes.base-name": file_base_name, + "iidm.export.cgmes.profiles": profiles, + "iidm.export.cgmes.naming-strategy": "cgmes-fix-all-invalid-ids", # identity, cgmes, cgmes-fix-all-invalid-ids + }) + bytes_object.name = f"{file_base_name}_{uuid.uuid4()}.zip" + return bytes_object + except pypowsybl._pypowsybl.PyPowsyblError as p_error: + logger.error(f"Pypowsybl error on export: {p_error}") + raise Exception(p_error) diff --git a/emf/loadflow_tool/model_merge_handlers.py b/emf/loadflow_tool/model_merge_handlers.py new file mode 100644 index 0000000..1450455 --- /dev/null +++ b/emf/loadflow_tool/model_merge_handlers.py @@ -0,0 +1,412 @@ +import datetime +import logging +import time +import os + +import config +import json +import sys +from json import JSONDecodeError + +from emf.common.config_parser import parse_app_properties +from emf.common.integrations import elastic +from aniso8601 import parse_datetime +from emf.common.logging.custom_logger import ElkLoggingHandler +from emf.loadflow_tool.model_merger import (CgmModelComposer, get_models, get_local_models, PROCESS_ID_KEYWORD, + RUN_ID_KEYWORD, JOB_ID_KEYWORD, save_merged_model_to_local_storage, + publish_merged_model_to_opdm, save_merged_model_to_minio, + publish_metadata_to_elastic, DEFAULT_MERGE_TYPES, AREA_KEYWORD, + DEFAULT_AREA, INCLUDED_TSO_KEYWORD, DownloadModels, EXCLUDED_TSO_KEYWORD, + get_version_number) +from emf.task_generator.time_helper import parse_duration + +logger = logging.getLogger(__name__) + +parse_app_properties(caller_globals=globals(), path=config.paths.cgm_worker.model_merge) + +# TODO handle these constants +NUMBER_OF_CGM_TRIES = 3 +NUMBER_OF_CGM_TRIES_KEYWORD = 'task_retry_count' +TASK_TIMEOUT = 'PT5M' +TASK_TIMEOUT_KEYWORD = 'task_timeout' +SLEEP_BETWEEN_TRIES = 'PT1M' + +TASK_PROPERTIES_KEYWORD = 'task_properties' +TIMESTAMP_KEYWORD = 'timestamp_utc' +MERGE_TYPE_KEYWORD = 'merge_type' +TIME_HORIZON_KEYWORD = 'time_horizon' + +SAVE_MERGED_MODEL_TO_LOCAL_STORAGE = False +PUBLISH_MERGED_MODEL_TO_MINIO = True +PUBLISH_MERGED_MODEL_TO_OPDM = True +PUBLISH_METADATA_TO_ELASTIC = False + +failed_cases_collector = [] +succeeded_cases_collector = [] + + +def running_in_local_machine(): + """ + For debugging purposes only + """ + return "PYCHARM_HOSTED" in os.environ + + +def flatten_tuple(data): + """ + Flattens the nested tuple to eventually a single level tuple. + Use this when passing args as is from one handler to another + :param data: tuple of arguments + :return levelled tuple + """ + if isinstance(data, tuple): + if len(data) == 0: + return () + else: + return flatten_tuple(data[0]) + flatten_tuple(data[1:]) + else: + return (data,) + + +def find_key(input_dictionary: dict, key): + """ + Searches in depth for a key in dictionary + :param input_dictionary: from where to search + :param key: key to be searched + :return value of the key if found, None otherwise + """ + if key in input_dictionary: + return input_dictionary[key] + for value in input_dictionary.values(): + if isinstance(value, dict): + result = find_key(input_dictionary=value, key=key) + if result is not None: + return result + return None + + +def get_payload(args, keyword: str = MERGE_TYPE_KEYWORD): + """ + Searches keyword from args. 
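+    Arguments may arrive as nested tuples, so they are flattened with flatten_tuple before checking.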
Tries to parse the arg to dict and checks if keyword is present. if it + is returns the arg + :param args: tuple of args + :param keyword: keyword to be searched + :return argument which is dictionary and has the keyword or None + """ + args = flatten_tuple(args) + if args and len(args) > 0: + for argument in args: + try: + if isinstance(argument, dict): + dict_value = argument + else: + dict_value = json.loads(argument.decode('utf-8')) + if not find_key(dict_value, keyword): + raise UnknownArgumentException + return dict_value + except JSONDecodeError: + continue + return None + + +def run_sleep_timer(time_value: any = None): + """ + Waits for some given time + :param time_value: seconds to wait + """ + if time_value is not None: + if isinstance(time_value, float) and time_value > 0: + time.sleep(time_value) + + +class UnknownArgumentException(JSONDecodeError): + pass + + +def handle_not_received_case(message): + """ + Do something if models were not found + TODO: report rabbit context if available + :param message: log error message + """ + logger.error(message) + # currently in the debugging mode do not consume more messages + if running_in_local_machine(): + failed_cases_collector.append(message) + # raise SystemExit + + +class HandlerGetModels: + """ + This one gathers the necessary data + """ + + def __init__(self, + logger_handler: ElkLoggingHandler = None, + number_of_igm_tries: int = NUMBER_OF_CGM_TRIES, + default_area: str = DEFAULT_AREA, + cgm_minio_bucket: str = EMF_OS_MINIO_BUCKET, + cgm_minio_prefix: str = EMF_OS_MINIO_FOLDER, + merge_types: str |dict = MERGE_TYPES, + merging_entity: str = MERGING_ENTITY, + sleep_between_tries: str = SLEEP_BETWEEN_TRIES, + elk_index_version_number: str = ELK_VERSION_INDEX): + """ + :param logger_handler: attach rabbit context to it + :param number_of_igm_tries: max allowed tries before quitting + :param default_area: default merging area + :param cgm_minio_bucket: bucket where combined models are stored + :param cgm_minio_prefix: prefix of models + :param merge_types: the default dict consisting areas, included tsos and excluded tsos + :param merging_entity: the name of the merging entity + :param sleep_between_tries: sleep between igm requests if failed + :param elk_index_version_number: elastic index from where look version number + """ + self.number_of_igm_tries = number_of_igm_tries + self.logger_handler = logger_handler + self.opdm_service = None + merge_types = merge_types or DEFAULT_MERGE_TYPES + if isinstance(merge_types, str): + merge_types = merge_types.replace("'", "\"") + merge_types = json.loads(merge_types) + self.merge_types = merge_types + self.merging_entity = merging_entity + self.cgm_minio_bucket = cgm_minio_bucket + self.cgm_minio_prefix = cgm_minio_prefix + self.sleep_between_tries = parse_duration(sleep_between_tries) + self.elk_index_for_version_number = elk_index_version_number + self.default_area = default_area + + def handle(self, *args, **kwargs): + """ + Checks and parses the json, gathers necessary data and stores it to CGM_Composer and passes it on + """ + # Check the args: if there is a dict, json that can be converted to dict and consists a keyword + unnamed_args = args + input_data = get_payload(unnamed_args, keyword=MERGE_TYPE_KEYWORD) + # For debugging + manual_testing = False + if input_data is not None: + if self.logger_handler is not None: + # Pack rabbit context to elastic log handler + self.logger_handler.extra.update({PROCESS_ID_KEYWORD: input_data.get(PROCESS_ID_KEYWORD), + RUN_ID_KEYWORD: 
input_data.get(RUN_ID_KEYWORD), + JOB_ID_KEYWORD: input_data.get(JOB_ID_KEYWORD)}) + logger.info(f"Logger was updated with process_id, run_id and job_id (under extra fields)") + number_of_tries = self.number_of_igm_tries + + if TASK_PROPERTIES_KEYWORD in input_data and isinstance(input_data[TASK_PROPERTIES_KEYWORD], dict): + # Unpack the properties section + scenario_date = input_data[TASK_PROPERTIES_KEYWORD].get(TIMESTAMP_KEYWORD) + time_horizon = input_data[TASK_PROPERTIES_KEYWORD].get(TIME_HORIZON_KEYWORD) + merge_type = input_data[TASK_PROPERTIES_KEYWORD].get(MERGE_TYPE_KEYWORD) + # Get some models, allow only max tries + get_igms_try = 1 + available_models, latest_boundary = None, None + # Extract tso and area data + area = self.merge_types.get(merge_type, {}).get(AREA_KEYWORD, self.default_area) + included_tsos = self.merge_types.get(merge_type, {}).get(INCLUDED_TSO_KEYWORD, []) + excluded_tsos = self.merge_types.get(merge_type, {}).get(EXCLUDED_TSO_KEYWORD, []) + while get_igms_try <= number_of_tries: + if running_in_local_machine() and manual_testing: + available_models, latest_boundary = get_local_models(time_horizon=time_horizon, + scenario_date=scenario_date, + download_policy=DownloadModels.MINIO, + use_local_files=True) + else: + available_models, latest_boundary = get_models(time_horizon=time_horizon, + scenario_date=scenario_date, + download_policy=DownloadModels.MINIO, + included_tsos=included_tsos, + excluded_tsos=excluded_tsos) + available_models = [model for model in available_models + if model.get('pmd:TSO') not in ['APG', 'SEPS', '50Hertz']] + if available_models and latest_boundary: + break + message = [] + if not available_models: + message.append('models') + if not latest_boundary: + message.append('latest_boundary') + sleepy_message = f"Going to sleep {self.sleep_between_tries}" + logger.warning(f"Failed get {' and '.join(message)}. 
{sleepy_message}")
+                    time.sleep(self.sleep_between_tries)
+                    get_igms_try += 1
+                # If no luck report to elastic and call it a day
+                if not available_models and not latest_boundary:
+                    handle_not_received_case("Get Models: no models and no boundary data were received")
+                # Get the version number
+                version_number = get_version_number(scenario_date=scenario_date,
+                                                    time_horizon=time_horizon,
+                                                    modeling_entity=f"{self.merging_entity}-{area}")
+                # Pack everything and pass it on
+                cgm_input = CgmModelComposer(igm_models=available_models,
+                                             boundary_data=latest_boundary,
+                                             time_horizon=time_horizon,
+                                             scenario_date=scenario_date,
+                                             area=area,
+                                             merging_entity=self.merging_entity,
+                                             rabbit_data=input_data,
+                                             version=version_number)
+                return cgm_input, args, kwargs
+        return args, kwargs
+
+
+class HandlerMergeModels:
+    def __init__(self,
+                 publish_to_opdm: bool = PUBLISH_MERGED_MODEL_TO_OPDM,
+                 publish_to_minio: bool = PUBLISH_MERGED_MODEL_TO_MINIO,
+                 minio_bucket: str = EMF_OS_MINIO_BUCKET,
+                 folder_in_bucket: str = EMF_OS_MINIO_FOLDER,
+                 save_to_local_storage: bool = SAVE_MERGED_MODEL_TO_LOCAL_STORAGE,
+                 publish_to_elastic: bool = PUBLISH_METADATA_TO_ELASTIC,
+                 elk_server: str = elastic.ELK_SERVER,
+                 cgm_index: str = ELK_VERSION_INDEX):
+        """
+        Initializes the handler which sends out the merged models
+        :param publish_to_opdm: publish cgm to opdm
+        :param publish_to_minio: save cgm to minio
+        :param minio_bucket: bucket where to store models
+        :param folder_in_bucket: prefix for models
+        :param save_to_local_storage: whether to save to local storage
+        :param publish_to_elastic: save metadata to elastic
+        :param elk_server: name of the elastic server
+        :param cgm_index: index in the elastic where to send the metadata
+        """
+        self.minio_bucket = minio_bucket
+        self.folder_in_bucket = folder_in_bucket
+        self.use_folders = True
+        self.elastic_server = elk_server
+        self.cgm_index = cgm_index
+        self.save_to_local_storage = save_to_local_storage
+        self.send_to_minio = publish_to_minio
+        self.send_to_opdm = publish_to_opdm
+        self.send_to_elastic = publish_to_elastic
+
+    def handle(self, *args, **kwargs):
+        """
+        Calls the merge and posts the results
+        """
+        # Flatten the args and look for a CgmModelComposer instance
+        args = flatten_tuple(args)
+
+        cgm_compose = None
+        # Check if CgmModelComposer is in args
+        for item in args:
+            if isinstance(item, CgmModelComposer):
+                cgm_compose = item
+                break
+        # check if CgmModelComposer is in kwargs
+        if cgm_compose is None:
+            for key in kwargs:
+                if isinstance(kwargs[key], CgmModelComposer):
+                    cgm_compose = kwargs[key]
+                    break
+        # If there was nothing, report and return
+        if cgm_compose is None:
+            handle_not_received_case("Merger: no inputs received")
+            logger.error("Pipeline failed, no dataclass present with igms")
+            return args, kwargs
+        # else merge the model and start sending it out
+        try:
+            cgm_compose.compose_cgm()
+            # Get the files
+            cgm_files = cgm_compose.cgm
+            folder_name = cgm_compose.get_folder_name()
+            # And send them out
+            if self.send_to_opdm:
+                publish_merged_model_to_opdm(cgm_files=cgm_files)
+            if self.send_to_minio:
+                save_merged_model_to_minio(cgm_files=cgm_files,
+                                           minio_bucket=self.minio_bucket,
+                                           folder_in_bucket=self.folder_in_bucket)
+            # For future reference, store the merge metadata to elastic
+            if self.send_to_elastic:
+                consolidated_metadata = cgm_compose.get_consolidated_metadata()
+                publish_metadata_to_elastic(metadata=consolidated_metadata,
+                                            cgm_index=self.cgm_index,
+                                            elastic_server=self.elastic_server)
+            if self.save_to_local_storage: +
save_merged_model_to_local_storage(cgm_files=cgm_files, + cgm_folder_name=folder_name) + if running_in_local_machine(): + succeeded_cases_collector.append(cgm_compose.get_log_message()) + except Exception as ex_msg: + handle_not_received_case(f"Merger: {cgm_compose.get_log_message()} exception: {ex_msg}") + return args, kwargs + + +if __name__ == "__main__": + + logging.basicConfig( + format='%(levelname) -10s %(asctime) -20s %(name) -45s %(funcName) -35s %(lineno) -5d: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.INFO, + handlers=[logging.StreamHandler(sys.stdout)] + ) + + testing_time_horizon = 'ID' + testing_merging_type = 'CGM' + start_date = parse_datetime("2024-04-11T00:30:00+00:00") + end_date = parse_datetime("2024-04-12T00:00:00+00:00") + + delta = end_date - start_date + delta_sec = delta.days * 24 * 3600 + delta.seconds + # Generate time array with 1h interval + time_step = 3600 + testing_scenario_dates = [(start_date + datetime.timedelta(0, t)).isoformat() + for t in range(0, delta_sec, time_step)] + + for testing_scenario_date in testing_scenario_dates: + example_input_from_rabbit = { + "@context": "https://example.com/task_context.jsonld", + "@type": "Task", + "@id": "urn:uuid:f9d476ec-2507-4ad2-8a37-72afdcd68bbf", + "process_id": "https://example.com/processes/CGM_CREATION", + "run_id": "https://example.com/runs/IntraDayCGM/1", + "job_id": "urn:uuid:00815bce-5cb5-4f45-8541-c5642680d474", + "task_type": "automatic", + "task_initiator": "some.body", + "task_priority": "normal", + "task_creation_time": "2024-04-04T06:57:51.018050", + "task_status": "created", + "task_status_trace": + [ + { + "status": "created", + "timestamp": "2024-04-04T06:57:51.018050" + } + ], + "task_dependencies": [], + "task_tags": [], + "task_retry_count": 0, + "task_timeout": "PT1H", + "task_gate_open": "2024-04-04T04:00:00+00:00", + "task_gate_close": "2024-04-04T04:15:00+00:00", + "job_period_start": "2024-04-04T05:00:00+00:00", + "job_period_end": "2024-04-04T13:00:00+00:00", + "task_properties": + { + "timestamp_utc": testing_scenario_date, + "merge_type": testing_merging_type, + "time_horizon": testing_time_horizon + } + } + + message_handlers = [HandlerGetModels(), + HandlerMergeModels(publish_to_opdm=False, + publish_to_minio=False, + save_to_local_storage=True)] + body = (example_input_from_rabbit,) + properties = {} + for message_handler in message_handlers: + try: + logger.info(f"---Handling message with {message_handler.__class__.__name__}") + body = message_handler.handle(body, properties=properties) + except Exception as ex: + logger.error(f"Message handling failed: {ex}") + if running_in_local_machine(): + print("FAILED:") + print('\r\n'.join(failed_cases_collector)) + print("SUCCESS:") + print('\r\n'.join(succeeded_cases_collector)) diff --git a/emf/loadflow_tool/model_merge_worker.py b/emf/loadflow_tool/model_merge_worker.py new file mode 100644 index 0000000..78f0023 --- /dev/null +++ b/emf/loadflow_tool/model_merge_worker.py @@ -0,0 +1,28 @@ +import logging +import config +import uuid +from emf.common.integrations import rabbit +from emf.common.logging import custom_logger +from emf.common.config_parser import parse_app_properties +from emf.common.converters import opdm_metadata_to_json +from emf.loadflow_tool.model_merge_handlers import HandlerGetModels, HandlerMergeModels + +# Initialize custom logger +elk_handler = custom_logger.initialize_custom_logger(extra={'worker': 'model-merger', 'worker_uuid': str(uuid.uuid4())}) +logger = logging.getLogger(__name__) + 
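+# Load the model merge settings (RMQ queue name, Minio bucket/folder, merge types) into module globals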
+parse_app_properties(caller_globals=globals(), path=config.paths.cgm_worker.model_merge) + +testing = True + +# RabbitMQ's consumer for CGM + +consumer = rabbit.RMQConsumer( + que=RMQ_CGM_QUEUE, + message_converter=opdm_metadata_to_json, + message_handlers=[HandlerGetModels(logger_handler=elk_handler), HandlerMergeModels()]) + +try: + consumer.run() +except KeyboardInterrupt: + consumer.stop() diff --git a/emf/loadflow_tool/model_merger.py b/emf/loadflow_tool/model_merger.py new file mode 100644 index 0000000..a9a5df3 --- /dev/null +++ b/emf/loadflow_tool/model_merger.py @@ -0,0 +1,1217 @@ +import math +from datetime import timedelta +from enum import Enum + +import pypowsybl +import zeep.exceptions + +import config +from emf.common.config_parser import parse_app_properties +from emf.common.integrations import minio, opdm, elastic +from emf.common.integrations.elastic import Elastic +from emf.common.integrations.object_storage.object_storage import query_data, get_content +from emf.common.logging.custom_logger import SEPARATOR_SYMBOL, check_the_folder_path +from emf.loadflow_tool.helper import (load_model, load_opdm_data, filename_from_metadata, export_model, + NETWORK_KEYWORD, NETWORK_META_KEYWORD, get_metadata_from_filename, attr_to_dict) +from emf.loadflow_tool.validator import (get_local_entsoe_files, LocalFileLoaderError, + parse_boundary_message_type_profile, OPDE_COMPONENT_KEYWORD, + MODEL_MESSAGE_TYPE, + OPDM_PROFILE_KEYWORD, DATA_KEYWORD, validate_models) +import logging +import json +from emf.loadflow_tool import loadflow_settings +import sys +from emf.common.integrations.opdm import OPDM +from aniso8601 import parse_datetime +import os +import triplets +import pandas +import datetime +from uuid import uuid4 + +from emf.model_retriever.model_retriever import HandlerModelsToMinio, HandlerMetadataToElastic, HandlerModelsValidator + +# Update SSH + +logger = logging.getLogger(__name__) + +parse_app_properties(caller_globals=globals(), path=config.paths.cgm_worker.model_merge) + +logging.basicConfig( + format='%(levelname)-10s %(asctime)s.%(msecs)03d %(name)-30s %(funcName)-35s %(lineno)-5d: %(message)s', + datefmt='%Y-%m-%dT%H:%M:%S', + level=logging.INFO, + handlers=[logging.StreamHandler(sys.stdout)] +) + +UPDATE_MAP = [ + { + "from_class": "SvPowerFlow", + "from_ID": "Terminal.ConductingEquipment", + "from_attribute": "SvPowerFlow.p", + "to_attribute": "EnergyConsumer.p", + }, + { + "from_class": "SvPowerFlow", + "from_ID": "Terminal.ConductingEquipment", + "from_attribute": "SvPowerFlow.q", + "to_attribute": "EnergyConsumer.q", + }, + { + "from_class": "SvPowerFlow", + "from_ID": "Terminal.ConductingEquipment", + "from_attribute": "SvPowerFlow.p", + "to_attribute": "RotatingMachine.p", + }, + { + "from_class": "SvPowerFlow", + "from_ID": "Terminal.ConductingEquipment", + "from_attribute": "SvPowerFlow.q", + "to_attribute": "RotatingMachine.q", + }, + { + "from_class": "SvTapStep", + "from_ID": "SvTapStep.TapChanger", + "from_attribute": "SvTapStep.position", + "to_attribute": "TapChanger.step", + }, + { + "from_class": "SvShuntCompensatorSections", + "from_ID": "SvShuntCompensatorSections.ShuntCompensator", + "from_attribute": "SvShuntCompensatorSections.sections", + "to_attribute": "ShuntCompensator.sections", + } +] + +FILENAME_MASK = ("{scenarioTime:%Y%m%dT%H%MZ}_{processType}_" + "{mergingEntity}-{domain}-{forEntity}_{messageType}_{version:03d}") + +NAMESPACE_MAP = { + "rdf": "http://www.w3.org/1999/02/22-rdf-syntax-ns#", + "cim": 
"http://iec.ch/TC57/2013/CIM-schema-cim16#", + "md": "http://iec.ch/TC57/61970-552/ModelDescription/1#", + "entsoe": "http://entsoe.eu/CIM/SchemaExtension/3/1#", + # "cgmbp": "http://entsoe.eu/CIM/Extensions/CGM-BP/2020#" +} +RDF_MAP_JSON = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'entsoe_v2.4.15_2014-08-07.json') +PATTERN_WITHOUT_TIMEZONE = '%Y-%m-%dT%H:%M:%S' + +CGM_CREATION_DATE_KEYWORD = "pmd:creationDate" +CGM_MERGING_ENTITY_KEYWORD = "pmd:mergingEntity" +CGM_MERGING_ENTITY = "BALTICRSC" +CGM_VERSION_NUMBER_KEYWORD = "pmd:versionNumber" +CGM_TIME_HORIZON_KEYWORD = 'pmd:timeHorizon' +CGM_MERGING_AREA_KEYWORD = 'pmd:mergingArea' +CGM_VALID_FROM_KEYWORD = 'pmd:validFrom' + +DEFAULT_INDEX_NAME = "emfos-logs*" + +# Variables used for local testing +TIME_HORIZON = '1D' +SCENARIO_DATE = '2024-03-14T09:30' +DEFAULT_AREA = 'EU' +VERSION = "104" +PUBLISH_TO_OPDM = False +USE_LOCAL_FILES = True +LOCAL_FOLDER = 'TC3_T1_Conform' + +PROCESS_ID_KEYWORD = "process_id" +RUN_ID_KEYWORD = 'run_id' +JOB_ID_KEYWORD = 'job_id' + +FULL_PATH_KEYWORD = 'full_path' +AREA_KEYWORD = 'AREA' +INCLUDED_TSO_KEYWORD = 'INCLUDED' +EXCLUDED_TSO_KEYWORD = 'EXCLUDED' +DEFAULT_MERGE_TYPES = {'CGM': {AREA_KEYWORD: 'EU', + INCLUDED_TSO_KEYWORD: [], + EXCLUDED_TSO_KEYWORD: ['APG']}, + 'RMM': {AREA_KEYWORD: 'BA', + INCLUDED_TSO_KEYWORD: ['ELERING', 'AST', 'LITGRID', 'PSE'], + EXCLUDED_TSO_KEYWORD: []}} + +LOCAL_STORAGE_LOCATION = './merged_examples/' + +DEFAULT_TSO = [] + + +class DownloadModels(Enum): + """ + For determining from where to download files + """ + OPDM = 1 + MINIO = 2 + OPDM_AND_MINIO = 3 + + +def load_rdf_map(file_name: str = RDF_MAP_JSON): + """ + loads rdf map file + :param file_name: file from where to load + :return: rdf map + """ + with open(file_name, 'r') as file_object: + rdf_map = json.load(file_object) + return rdf_map + + +def check_dataframe(first_input=None, second_input=None): + """ + Escapes first input if not given + :param first_input: first element to be checked + :param second_input: second element to be checked + :return: first_input ro second_input (for dataframes) + """ + if first_input is not None and isinstance(first_input, pandas.DataFrame): + return first_input + return second_input + + +def get_local_models(time_horizon: str = TIME_HORIZON, + scenario_date: str = SCENARIO_DATE, + use_local_files: bool = USE_LOCAL_FILES, + local_file_folder: str = LOCAL_FOLDER, + download_policy: DownloadModels = DownloadModels.OPDM_AND_MINIO, + allow_merging_entities: bool = False, + igm_files_needed=None, + opdm_client: OPDM = None): + """ + For local testing only. Takes the files from the folder specified that can be passed to the next steps. 
Fallback + is set to getting the files from opdm + :param time_horizon: time horizon of the igms + :param scenario_date: the date of the scenario for which the igm was created + :param use_local_files: true, uses local files + :param local_file_folder: unique folder where to find files + :param download_policy: from where to download models + :param allow_merging_entities: true escapes already merged files + :param igm_files_needed: specify specific igm files needed + :param opdm_client: client for the opdm + """ + if igm_files_needed is None: + igm_files_needed = ['EQ'] + try: + if use_local_files: + available_models, latest_boundary = get_local_entsoe_files(path_to_directory=local_file_folder, + allow_merging_entities=allow_merging_entities, + igm_files_needed=igm_files_needed) + else: + raise LocalFileLoaderError + except FileNotFoundError: + logger.info(f"Getting data from OPDM") + available_models, latest_boundary = get_models(time_horizon=time_horizon, + scenario_date=scenario_date, + download_policy=download_policy, + opdm_client=opdm_client) + return available_models, latest_boundary + + +class CgmModelType(Enum): + BOUNDARY = 1 + IGM = 2 + + +def run_model_retriever_pipeline(opdm_models: dict | list, + latest_boundary: dict = None, + model_type: CgmModelType = CgmModelType.IGM): + """ + Initializes model_retriever pipeline to download, validate and push the models to minio/elastic + THIS IS A HACK!!! DO NOT USE IT ANYWHERE ELSE THAN IN TESTING MODE + :param opdm_models: dictionary of opdm_models + :param latest_boundary: + :param model_type: specify whether the files are boundary data or igm data + : return updated opdm models + """ + minio_handler = HandlerModelsToMinio() + validator_handler = HandlerModelsValidator() + metadata_handler = HandlerMetadataToElastic() + if isinstance(opdm_models, dict): + opdm_models = [opdm_models] + opdm_models = minio_handler.handle_reduced(opdm_objects=opdm_models) + if model_type == CgmModelType.IGM: + opdm_models = validator_handler.handle(opdm_objects=opdm_models, latest_boundary=latest_boundary) + opdm_models = metadata_handler.handle(opdm_objects=opdm_models) + return opdm_models + + +def get_latest_boundary(opdm_client: OPDM = None, download_policy: DownloadModels = DownloadModels.OPDM_AND_MINIO): + """ + Tries to get the boundary data from OPDM, if not successful, fallback to Minio and take the latest + Alternative would be to check depends on + :param opdm_client: OPDM client + :param download_policy: where to first download the boundary data + :return boundary data + """ + boundary_data = None + if download_policy == DownloadModels.MINIO: + # Not the quickest way to get it + # boundary_data = get_boundary_data_from_minio() + return boundary_data + try: + opdm_client = opdm_client or OPDM() + boundary_data = opdm_client.get_latest_boundary() + # if model_retriever_pipeline: + # boundary_data = run_model_retriever_pipeline(opdm_models=boundary_data, model_type=CgmModelType.BOUNDARY) + # raise zeep.exceptions.Fault + except zeep.exceptions.Fault as fault: + logger.error(f"Could not get boundary data from OPDM: {fault}") + # boundary_data = get_boundary_data_from_minio() + # should be query_data, but for now ask it minio + except Exception as ex: + logger.error(f"Undefined exception when getting boundary data: {ex}") + # boundary_data = get_boundary_data_from_minio() + finally: + return boundary_data + + +def get_models(time_horizon: str = TIME_HORIZON, + scenario_date: str = SCENARIO_DATE, + included_tsos: list | str = None, + 
excluded_tsos: list | str = None, + download_policy: DownloadModels = DownloadModels.OPDM_AND_MINIO, + model_retriever_pipeline: bool = False, + opdm_client: OPDM = None): + """ + Gets models from opdm and/or minio + NB! Priority is given to Minio! + Workflow: + 1) Get models from opdm if selected + 2) Get models from minio if selected or opdm failed + 3) If requested from both, take data from minio and extend it from opdm + 4) By default get boundary from opdm + 5) Fallback: get boundary from minio + :param time_horizon: time horizon of the igms + :param scenario_date: the date of the scenario for which the igm was created + :param included_tsos: list or string of tso names, that should be included + :param excluded_tsos: list or string of tso names, that should be excluded + :param download_policy: from where to download models + :param model_retriever_pipeline + :param opdm_client: client for the opdm + """ + opdm_models = None + minio_models = None + # 1 Get boundary data + boundary_data = get_latest_boundary(opdm_client=opdm_client, download_policy=download_policy) + # 1 if opdm is selected, try to download from there + if download_policy == DownloadModels.OPDM or download_policy == DownloadModels.OPDM_AND_MINIO: + opdm_models = get_models_from_opdm(time_horizon=time_horizon, + scenario_date=scenario_date, + included_tsos=included_tsos, + excluded_tsos=excluded_tsos, + opdm_client=opdm_client) + # Validate raw input models + if not model_retriever_pipeline: + opdm_models = validate_models(available_models=opdm_models, latest_boundary=boundary_data) + # 2 if minio is selected or opdm failed, download data from there + if download_policy == DownloadModels.MINIO or download_policy == DownloadModels.OPDM_AND_MINIO or not opdm_models: + minio_models = get_models_from_elastic_minio(time_horizon=time_horizon, + scenario_date=scenario_date, + included_tsos=included_tsos, + excluded_tsos=excluded_tsos) + # If getting boundary failed try to get it from the dependencies + if not boundary_data: + boundary_data = get_boundary_from_dependencies(igm_models=minio_models) + # If something was got from opdm, run through it model_retriever pipeline + if download_policy == DownloadModels.OPDM: + if model_retriever_pipeline and opdm_models: + opdm_models = run_model_retriever_pipeline(opdm_models=opdm_models) + igm_models = opdm_models or minio_models + elif download_policy == DownloadModels.MINIO: + igm_models = minio_models + else: + # 3. 
When merge is requested, give priority to minio, update it from opdm + igm_models = minio_models + existing_tso_names = [model.get('pmd:TSO') for model in minio_models] + if opdm_models: + additional_tso_models = [model for model in opdm_models if model.get('pmd:TSO') not in existing_tso_names] + if model_retriever_pipeline and additional_tso_models: + additional_tso_models = run_model_retriever_pipeline(opdm_models=additional_tso_models) + igm_models.extend(additional_tso_models) + return igm_models, boundary_data + + +def filter_models_by_tsos(igm_models: list, included_tsos: list | str = None, excluded_tsos: list | str = None): + """ + Filters the list of models to include or to exclude specific tsos if they are given + :param igm_models: list of igm models + :param included_tsos: list or string of tso names, if given, only matching models are returned + :param excluded_tsos: list or string of tso names, if given, matching models will be discarded + :return updated list of igms + """ + if included_tsos: + included_tsos = [included_tsos] if isinstance(included_tsos, str) else included_tsos + igm_models = [model for model in igm_models if model.get('pmd:TSO') in included_tsos] + if excluded_tsos: + excluded_tsos = [excluded_tsos] if isinstance(excluded_tsos, str) else excluded_tsos + igm_models = [model for model in igm_models if not model.get('pmd:TSO') in excluded_tsos] + return igm_models + + +def get_models_from_opdm(time_horizon: str, + scenario_date: str, + included_tsos: list | str = None, + excluded_tsos: list | str = None, + opdm_client: OPDM = None): + """ + Gets models from opdm + :param time_horizon: time horizon of the igms + :param scenario_date: the date of the scenario for which the igm was created + :param included_tsos: list or string of tso names, if given, only matching models are returned + :param excluded_tsos: list or string of tso names, if given, matching models will be discarded + :param opdm_client: client for the opdm + :return list of models if found, None otherwise + """ + available_models = None + try: + opdm_client = opdm_client or OPDM() + scenario_date_iso = datetime.datetime.fromisoformat(scenario_date) + converted_scenario_date = scenario_date_iso.strftime(PATTERN_WITHOUT_TIMEZONE) + received_models = opdm_client.get_latest_models_and_download(time_horizon=time_horizon, + scenario_date=converted_scenario_date) + available_models = filter_models_by_tsos(igm_models=received_models, + included_tsos=included_tsos, + excluded_tsos=excluded_tsos) + except zeep.exceptions.Fault as fault: + logger.error(f"Could not connect to OPDM: {fault}") + except Exception as ex: + logger.error(f"Unknown exception when getting data from opdm: {ex}") + finally: + return available_models + + +def get_boundary_from_dependencies(igm_models: list): + """ + Gets boundary data from dependencies + Lists all dependencies from models, filters those which are BDS, takes the latest, unpacks it, downloads files to it + and if everything went well then returns the result + :param igm_models: list of igm models + :return: boundary data if everything went successfully, None otherwise + """ + # Get all dependencies + try: + dependencies = [model.get('opde:Dependencies', {}).get('opde:DependsOn') for model in igm_models] + boundaries = [dependency for dependency in dependencies + if dependency.get('opdm:OPDMObject', {}).get('opde:Object-Type') == 'BDS'] + latest_date = max([parse_datetime(entry.get('opdm:OPDMObject', {}).get('pmd:scenarioDate')) + for entry in boundaries]) + 
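+        # Note: of all BDS-type dependencies referenced by the IGMs, only those whose 'pmd:scenarioDate'
+        # equals the newest date found above are kept; the first of them is then downloaded in full via
+        # get_content() and returned only if every one of its components has DATA attached.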
latest_boundaries = [boundary for boundary in boundaries + if + parse_datetime(boundary.get('opdm:OPDMObject', {}).get('pmd:scenarioDate')) == latest_date] + if len(latest_boundaries) > 0 and (latest_boundary_value := (latest_boundaries[0]).get('opdm:OPDMObject')): + latest_boundary_value = get_content(metadata=latest_boundary_value) + if all(profile.get('opdm:Profile', {}).get('DATA') + for profile in dict(latest_boundary_value).get('opde:Component', [])): + return latest_boundary_value + except ValueError: + logger.warning(f"Dependencies do not contain any boundary data") + return None + + +def get_models_from_elastic_minio(time_horizon: str, + scenario_date: str, + included_tsos: list | str = None, + excluded_tsos: list | str = None): + """ + Asks metadata from elastic, attaches files from minio + NB! currently only those models are returned which have files in minio + :param included_tsos: list or string of tso names, if given, only matching models are returned + :param excluded_tsos: list or string of tso names, if given, matching models will be discarded + :param time_horizon: the time horizon + :param scenario_date: the date requested + :return: list of models + """ + query = {'pmd:scenarioDate': scenario_date, 'valid': True} + + # If time horizon is not ID, query by time horizon + if time_horizon != 'ID': + query['pmd:timeHorizon'] = time_horizon + + query_response = query_data(metadata_query=query, return_payload=True) + + # filter out duds: igms that are missing file(s) + files_present = [model for model in query_response + if all(field.get('opdm:Profile', {}).get('DATA') for field in model.get('opde:Component', {}))] + query_response = files_present + + # If time horizon is ID query everything and filter the smallest run ids per tso + # TODO check if this is valid + if time_horizon == 'ID': + logger.warning(f"Selected time horizon {time_horizon}, smallest number of the runs") + time_horizon = [f"{time_h:02}" for time_h in range(1, 31)] + query_response = [response for response in query_response if response.get("pmd:timeHorizon") in time_horizon] + tsos = set([model.get('pmd:TSO') for model in query_response]) + latest_ids = [] + for tso in tsos: + smallest_id = sorted([model.get('pmd:timeHorizon') + for model in query_response if model.get('pmd:TSO') == tso], key=lambda x: int(x))[0] + igms_by_id = [model for model in query_response + if model.get('pmd:TSO') == tso and model.get('pmd:timeHorizon') == smallest_id] + latest_ids.extend(igms_by_id) + query_response = latest_ids + + # Drop duplicates: take the latest igm if there are multiple for the same scenario date and time horizon + latest_versions = [sorted([model for model in query_response if model.get('pmd:TSO') == tso], + key=lambda x: int(x.get('pmd:versionNumber')), reverse=True)[0] + for tso in set([model.get('pmd:TSO') for model in query_response])] + query_response = latest_versions + + return filter_models_by_tsos(igm_models=query_response, included_tsos=included_tsos, excluded_tsos=excluded_tsos) + + +def get_version_number_from_minio(minio_bucket: str = EMF_OS_MINIO_BUCKET, + sub_folder: str = EMF_OS_MINIO_FOLDER, + minio_client: minio.ObjectStorage = None, + scenario_date: str = f"{CGM_MERGING_ENTITY}-EU", + modeling_entity: str = None, + time_horizon: str = None): + """ + Gets file list from minio, explodes it and retrieves the biggest matched version number + :param minio_client: if given + :param minio_bucket: the name of the bucket + :param sub_folder: prefix + :param scenario_date: date of the merge + :param 
modeling_entity: name of the merging entity + :param time_horizon: the time horizon + """ + new_version_number = 1 + try: + exploded_results = get_filename_dataframe_from_minio(minio_bucket=minio_bucket, + minio_client=minio_client, + sub_folder=sub_folder) + new_version_number = get_largest_version_from_filename_dataframe(exploded_results=exploded_results, + scenario_date=scenario_date, + time_horizon=time_horizon, + modeling_entity=modeling_entity) + except (ValueError, KeyError): + logger.info(f"No previous entries found, starting with version number {new_version_number:03}") + except Exception as ex: + logger.warning(f"Got minio error: {ex}, starting with version number {new_version_number:03}") + return f"{new_version_number:03}" + + +def get_filename_dataframe_from_minio(minio_bucket: str, + minio_client: minio.ObjectStorage = None, + sub_folder: str = None): + """ + Gets file list from minio bucket (prefix can be specified with sub folder) and converts to dataframe following + the standard naming convention (see get_metadata_from_filename for more details) + :param minio_client: if given + :param minio_bucket: the name of the bucket + :param sub_folder: prefix + """ + minio_client = minio_client or minio.ObjectStorage() + if sub_folder: + list_of_files = minio_client.list_objects(bucket_name=minio_bucket, + prefix=sub_folder, + recursive=True) + else: + list_of_files = minio_client.list_objects(bucket_name=minio_bucket, recursive=True) + file_name_list = [] + for file_name in list_of_files: + try: + # Escape prefixes + if not file_name.object_name.endswith(SEPARATOR_SYMBOL): + path_list = file_name.object_name.split(SEPARATOR_SYMBOL) + file_metadata = get_metadata_from_filename(path_list[-1]) + file_metadata[FULL_PATH_KEYWORD] = file_name.object_name + file_name_list.append(file_metadata) + except ValueError: + continue + except Exception as ex: + logger.warning(f"Exception when parsing the filename: {ex}") + continue + exploded_results = pandas.DataFrame(file_name_list) + return exploded_results + + +def get_boundary_data_from_minio(minio_bucket: str = 'opdm-data', + sub_folder: str = 'CGMES/ENTSOE/', + minio_client: minio.ObjectStorage = None): + """ + Searches given bucket for boundary data (ENTSOE files) takes the last entries by message types + :param minio_bucket: bucket where to search from + :param sub_folder: ease the search by giving prefix + :param minio_client: instance on minio ObjectStorage if given + :return boundary data + """ + minio_client = minio_client or minio.ObjectStorage() + boundary_value = {OPDE_COMPONENT_KEYWORD: []} + file_list = get_filename_dataframe_from_minio(minio_bucket=minio_bucket, + sub_folder=sub_folder, + minio_client=minio_client) + boundary_list = file_list[file_list['Model.modelingEntity'] == 'ENTSOE'] + filtered = boundary_list.loc[boundary_list.groupby('Model.messageType')['Model.scenarioTime'].idxmax()] + # Check if input is valid + if len(filtered.index) != 2 or sorted(filtered['Model.messageType']) != ['EQBD', 'TPBD']: + return None + filtered_elements = filtered.to_dict('records') + for opdm_profile_content in filtered_elements: + object_name = opdm_profile_content[FULL_PATH_KEYWORD] + downloaded_file = minio_client.download_object(bucket_name=minio_bucket, object_name=object_name) + opdm_profile_content[MODEL_MESSAGE_TYPE] = parse_boundary_message_type_profile( + opdm_profile_content[MODEL_MESSAGE_TYPE]) + opdm_profile_content[DATA_KEYWORD] = downloaded_file + opdm_profile_content.pop(FULL_PATH_KEYWORD) + 
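+        # Note: each downloaded boundary file is wrapped into the same {'opdm:Profile': {...}} component
+        # structure that OPDM returns, so that downstream consumers can treat Minio and OPDM boundary sets
+        # the same way.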
boundary_value[OPDE_COMPONENT_KEYWORD].append({OPDM_PROFILE_KEYWORD: opdm_profile_content}) + return boundary_value + + +def get_version_number_from_elastic(index_name: str = DEFAULT_INDEX_NAME, + start_looking: datetime.datetime | str = datetime.datetime.today(), + scenario_date: str = None, + time_horizon: str = None, + modeling_entity: str = None): + """ + Checks and gets the version number from elastic + Note that it works only if logger.info(f"Publishing {instance_file.name} to OPDM") + is used when publishing files to OPDM + :param index_name: index from where to search + :param start_looking: datetime instance from where to look, if not set then takes current day + :param scenario_date: filter the file names by scenario date + :param time_horizon: filter file names by time horizon + :param modeling_entity: filter file names by modeling entity + :return version number as a string + """ + must_elements = [] + query_part = {"query_string": {"default_field": "message", "query": "*Publishing* AND *to OPDM"}} + must_elements.append(query_part) + new_version_number = 1 + if start_looking: + if isinstance(start_looking, datetime.datetime): + start_looking = start_looking.strftime("%Y-%m-%dT%H:%M:%S") + range_part = {"range": {"log_timestamp": {"gte": start_looking}}} + must_elements.append(range_part) + previous_cgm_query = {"bool": {"must": must_elements}} + try: + elastic_client = Elastic() + results = elastic_client.get_data(index=index_name, + query=previous_cgm_query, + fields=['message']) + if results.empty: + raise NoContentFromElasticException + # Get the filenames and explode them + exploded_results = (results["message"]. + str.removesuffix(' to OPDM'). + str.removeprefix('Publishing '). + map(get_metadata_from_filename). + apply(pandas.Series)) + # Filter the results if needed + new_version_number = get_largest_version_from_filename_dataframe(exploded_results=exploded_results, + scenario_date=scenario_date, + time_horizon=time_horizon, + modeling_entity=modeling_entity) + except (NoContentFromElasticException, KeyError, ValueError): + logger.info(f"No previous entries found, starting with version number {new_version_number:03}") + except Exception as ex: + logger.warning(f"Got elastic error: {ex}, starting with version number {new_version_number:03}") + finally: + return f"{new_version_number:03}" + + +def get_largest_version_from_filename_dataframe(exploded_results: pandas.DataFrame, + scenario_date: str = None, + time_horizon: str = None, + modeling_entity: str = None): + """ + Searches largest version number from a dict. 
Optionally the dict can be filtered beforehand + :param exploded_results: the dictionary containing exploded filenames (used get_metadata_from_filename) + :param scenario_date: optionally filter filenames by scenario date + :param time_horizon: optionally filter filenames by time horizon + :param modeling_entity: optionally filter filenames by checking if modelling entity is in the field + :return: largest found file number or 1 if key error or not found + """ + try: + if modeling_entity is not None: + exploded_results = exploded_results[exploded_results['Model.modelingEntity'].str.contains(modeling_entity)] + if scenario_date is not None: + scenario_date = f"{parse_datetime(scenario_date):%Y%m%dT%H%MZ}" + exploded_results = exploded_results[exploded_results['Model.scenarioTime'].str.contains(scenario_date)] + if time_horizon is not None: + exploded_results = exploded_results[exploded_results['Model.processType'].str.contains(time_horizon)] + # Get the largest version number and increment it by 1 + new_version_number = max(pandas.to_numeric(exploded_results["Model.version"])) + 1 + logger.info(f"Continuing with version number {new_version_number:03}") + except KeyError as key_error: + logger.info(f"{key_error}") + new_version_number = 1 + return new_version_number + + +class NoContentFromElasticException(Exception): + pass + + +def get_version_number(scenario_date: str, + time_horizon: str, + modeling_entity: str, + start_looking: str | datetime.date = None, + use_elastic: bool = True, + use_minio: bool = True, + default_version_number='104'): + """ + Gets a version number from elastic and or minio. + :param scenario_date: the date by which to look the version number + :param time_horizon: the time horizon + :param modeling_entity: the author of the previous versions + :param start_looking: can be used to cut down the elastic logs + :param use_elastic: search version number from elastic + :param use_minio: search version number from minio + :param default_version_number: return value if not found + :return largest version number from minio, elastic or default one + """ + version_number = default_version_number + version_number_minio = None + version_number_elastic = None + if use_minio: + version_number_minio = get_version_number_from_minio(time_horizon=time_horizon, scenario_date=scenario_date) + if use_elastic: + if start_looking: + version_number_elastic = get_version_number_from_elastic(start_looking=start_looking, + modeling_entity=modeling_entity, + time_horizon=time_horizon, + scenario_date=scenario_date) + else: + version_number_elastic = get_version_number_from_elastic(modeling_entity=modeling_entity, + time_horizon=time_horizon, + scenario_date=scenario_date) + if version_number_minio and version_number_elastic: + version_number = version_number_minio if int(version_number_minio) > int(version_number_elastic) \ + else version_number_elastic + else: + version_number = version_number_minio or version_number_elastic or version_number + return version_number + + +def get_time_horizon_for_intra_day(time_horizon: str, scenario_date: str, skip_past_scenario_dates: bool = False): + """ + Taken as is from previous code + :param time_horizon: time_horizon of the merged model + :param scenario_date: scenario date of the merged model + :param skip_past_scenario_dates: either to skip past intra day scenarios + :return updated time horizon value + """ + if time_horizon == "ID": + utc_now = datetime.datetime.now(datetime.timezone.utc) + parsed_date = parse_datetime(scenario_date) + time_delta = 
parsed_date.replace(tzinfo=None) - utc_now.replace(tzinfo=None) + if skip_past_scenario_dates and utc_now > parsed_date.replace(tzinfo=datetime.timezone.utc): + raise IntraDayPastScenarioDateException + time_horizon = f"{int(time_delta.seconds / 3600) + 1 :02d}" + return time_horizon + + +class IntraDayPastScenarioDateException(Exception): + pass + + +class CgmModelComposer: + """ + Class for gathering the data and running the merge function (copy from merge.py) + """ + + def __init__(self, + igm_models=None, + boundary_data=None, + version: str = VERSION, + time_horizon: str = TIME_HORIZON, + area: str = DEFAULT_AREA, + scenario_date: str = SCENARIO_DATE, + merging_entity: str = CGM_MERGING_ENTITY, + namespace_map=None, + rdf_map_loc: str = RDF_MAP_JSON, + rabbit_data: dict = None): + """ + Constructor, note that data gathering and filtering must be done beforehand + This just stores and merges + A refactored version of merge.py + :param igm_models: the individual grid models of the tso's + :param boundary_data: the boundary data of the region + :param version: the version number to use for the merged model + :param time_horizon: the time horizon for the merge + :param area: the area of the merge + :param scenario_date: the date of the scenario + :param merging_entity: the author of the merged model + :param namespace_map: + :param rdf_map_loc: + :param rabbit_data: + """ + if namespace_map is None: + namespace_map = NAMESPACE_MAP + self.igm_models = igm_models + if self.igm_models is None: + self.igm_models = [] + self.boundary_data = boundary_data + self.sv_data = None + self.ssh_data = None + + self.time_horizon = time_horizon + self.area = area + self.scenario_date = scenario_date + + self._version = version + self.merging_entity = merging_entity + self._merged_model = None + self.merge_report = {} + self._opdm_data = None + self._opdm_object_meta = None + self.namespace_map = namespace_map + self.cgm = None + self.rdf_map = load_rdf_map(rdf_map_loc) + self.rabbit_data = rabbit_data + + def get_tso_list(self): + return ', '.join([model.get('pmd:TSO', '') for model in self.igm_models]) + + def get_log_message(self): + return f"Merge at {self.scenario_date}, time horizon {self.time_horizon}, tsos: {self.get_tso_list()}" + + @property + def merged_model(self): + """ + Gets merged model + """ + if self._merged_model is None and self.igm_models and self.boundary_data: + self._merged_model = load_model(self.igm_models + [self.boundary_data]) + # Run LF + self.merge_report = {} + loadflow_report = pypowsybl.report.Reporter() + try: + loadflow_result = pypowsybl.loadflow.run_ac(network=self._merged_model[NETWORK_KEYWORD], + parameters=loadflow_settings.CGM_DEFAULT, + reporter=loadflow_report) + loadflow_result_dict = [attr_to_dict(island) for island in loadflow_result] + self.merge_report["LOADFLOW_REPORT"] = json.loads(loadflow_report.to_json()) + self.merge_report["LOADFLOW_RESULTS"] = loadflow_result_dict + except pypowsybl._pypowsybl.PyPowsyblError as p_error: + logger.error(f"Error at calculating loadflow: {p_error}") + raise Exception(p_error) + return self._merged_model + + @property + def opdm_data(self): + """ + Gets opdm data (igm models and boundary data combined) + """ + if isinstance(self._opdm_data, pandas.DataFrame): + return self._opdm_data + if self.igm_models and self.boundary_data: + self._opdm_data = load_opdm_data(self.igm_models + [self.boundary_data]) + return self._opdm_data + + @property + def opdm_object_meta(self): + """ + Gets base for opdm object meta + """ + if 
self._opdm_object_meta is None: + sv_id = self.merged_model[NETWORK_META_KEYWORD]['id'].split("uuid:")[-1] + self.time_horizon = get_time_horizon_for_intra_day(self.time_horizon, self.scenario_date) + self._opdm_object_meta = {'pmd:fullModel_ID': sv_id, + 'pmd:creationDate': f"{datetime.datetime.utcnow():%Y-%m-%dT%H:%M:%S.%fZ}", + 'pmd:timeHorizon': self.time_horizon, + 'pmd:cgmesProfile': 'SV', + 'pmd:contentType': 'CGMES', + 'pmd:modelPartReference': '', + 'pmd:mergingEntity': f"{self.merging_entity}", + 'pmd:mergingArea': self.area, + 'pmd:validFrom': f"{parse_datetime(self.scenario_date):%Y%m%dT%H%MZ}", + 'pmd:modelingAuthoritySet': 'http://www.baltic-rsc.eu/OperationalPlanning', + 'pmd:scenarioDate': f"{parse_datetime(self.scenario_date):%Y-%m-%dT%H:%M:00Z}", + 'pmd:modelid': sv_id, + 'pmd:description': f""" + {self.time_horizon} + pypowsybl_{pypowsybl.__version__} + {self.merging_entity} + """, + 'pmd:versionNumber': self.version, + 'file_type': "xml"} + return self._opdm_object_meta + + @property + def version(self): + """ + Gets version + """ + return self._version + + def set_sv_file(self, + merged_model=None, + opdm_object_meta=None): + merged_model = merged_model or self.merged_model + opdm_object_meta = opdm_object_meta or self.opdm_object_meta + export_report = pypowsybl.report.Reporter() + exported_model = export_model(network=merged_model[NETWORK_KEYWORD], + opdm_object_meta=opdm_object_meta, + profiles=["SV"]) + logger.info(f"Exporting merged model to {exported_model.name}") + # Load SV data + sv_data = pandas.read_RDF([exported_model]) + # Update SV filename + sv_data.set_VALUE_at_KEY(key='label', value=filename_from_metadata(opdm_object_meta)) + # Update SV description + sv_data.set_VALUE_at_KEY(key='Model.description', value=opdm_object_meta['pmd:description']) + # Update SV created time + sv_data.set_VALUE_at_KEY(key='Model.created', value=opdm_object_meta['pmd:creationDate']) + # Update SSH Model.scenarioTime + sv_data.set_VALUE_at_KEY('Model.scenarioTime', opdm_object_meta['pmd:scenarioDate']) + # Update SV metadata + sv_data = triplets.cgmes_tools.update_FullModel_from_filename(sv_data) + self.sv_data = sv_data + return sv_data, opdm_object_meta + + def set_ssh_files(self, + valid_models=None, + latest_boundary=None, + sv_data=None, + opdm_object_meta=None, + update_map=None): + + valid_models = valid_models or self.igm_models + latest_boundary = latest_boundary or self.boundary_data + sv_data = check_dataframe(sv_data, self.sv_data) + opdm_object_meta = opdm_object_meta or self.opdm_object_meta + update_map = update_map or UPDATE_MAP + + ssh_data = load_opdm_data(valid_models, "SSH") + ssh_data = triplets.cgmes_tools.update_FullModel_from_filename(ssh_data) + + # Update SSH Model.scenarioTime + ssh_data.set_VALUE_at_KEY('Model.scenarioTime', opdm_object_meta['pmd:scenarioDate']) + + # Load full original data to fix issues + data = load_opdm_data(valid_models + [latest_boundary]) + terminals = data.type_tableview("Terminal") + + # Update SSH data from SV + updated_ssh_data = ssh_data.copy() + for update in update_map: + source_data = sv_data.type_tableview(update['from_class']).reset_index(drop=True) + + # Merge with terminal, if needed + if terminal_reference := \ + [column_name if ".Terminal" in column_name else None for column_name in source_data.columns][0]: + source_data = source_data.merge(terminals, left_on=terminal_reference, right_on='ID') + logger.debug(f"Added Terminals to {update['from_class']}") + + updated_ssh_data = 
updated_ssh_data.update_triplet_from_triplet( + source_data.rename(columns={update['from_ID']: 'ID', update['from_attribute']: update['to_attribute']})[ + ['ID', update['to_attribute']]].set_index('ID').tableview_to_triplet(), add=False) + + # Generate new UUID for updated SSH + updated_ssh_id_map = {} + for old_id in updated_ssh_data.query("KEY == 'Type' and VALUE == 'FullModel'").ID.unique(): + new_id = str(uuid4()) + updated_ssh_id_map[old_id] = new_id + logger.info(f"Assigned new UUID for updated SSH: {old_id} -> {new_id}") + + # Update SSH ID-s + updated_ssh_data = updated_ssh_data.replace(updated_ssh_id_map) + + # Update in SV SSH references + sv_data = sv_data.replace(updated_ssh_id_map) + + # Add SSH supersedes reference to old SSH + ssh_supersedes_data = pandas.DataFrame( + [{"ID": item[1], "KEY": "Model.Supersedes", "VALUE": item[0]} for item in updated_ssh_id_map.items()]) + ssh_supersedes_data['INSTANCE_ID'] = updated_ssh_data.query("KEY == 'Type'").merge(ssh_supersedes_data.ID)[ + 'INSTANCE_ID'] + updated_ssh_data = updated_ssh_data.update_triplet_from_triplet(ssh_supersedes_data) + + # Update SSH metadata + updated_ssh_data = triplets.cgmes_tools.update_FullModel_from_dict(updated_ssh_data, { + "Model.version": opdm_object_meta['pmd:versionNumber'], + "Model.created": opdm_object_meta['pmd:creationDate'], + "Model.mergingEntity": opdm_object_meta['pmd:mergingEntity'], + "Model.domain": opdm_object_meta['pmd:mergingArea'] + }) + self.ssh_data = updated_ssh_data + self.sv_data = sv_data + return updated_ssh_data, sv_data + + def set_cgm(self, updated_ssh_data=None, + sv_data=None, + valid_models=None, + latest_boundary=None, + opdm_object_meta=None, + filename_mask: str = FILENAME_MASK, + namespace_map=None): + # Update SSH filenames + updated_ssh_data = check_dataframe(updated_ssh_data, self.ssh_data) + sv_data = check_dataframe(sv_data, self.sv_data) + valid_models = valid_models or self.igm_models + latest_boundary = latest_boundary or self.boundary_data + opdm_object_meta = opdm_object_meta or self.opdm_object_meta + namespace_map = namespace_map or NAMESPACE_MAP + data = load_opdm_data(valid_models + [latest_boundary]) + updated_ssh_data = triplets.cgmes_tools.update_filename_from_FullModel(updated_ssh_data, + filename_mask=filename_mask) + + # Update SV metadata + sv_metadata = {"Model.version": opdm_object_meta['pmd:versionNumber'], + "Model.created": opdm_object_meta['pmd:creationDate']} + sv_data = triplets.cgmes_tools.update_FullModel_from_dict(sv_data, sv_metadata) + + # Fix SV - Remove Shunt Sections for EQV Shunts + equiv_shunt = data.query("KEY == 'Type' and VALUE == 'EquivalentShunt'") + if len(equiv_shunt) > 0: + shunts_to_remove = sv_data.merge(sv_data.query("KEY == 'SvShuntCompensatorSections.ShuntCompensator'"). 
+ merge(equiv_shunt.ID, + left_on='VALUE', + right_on="ID", how='inner', + suffixes=('', '_EQVShunt')).ID) + if len(shunts_to_remove) > 0: + logger.warning(f'Removing invalid SvShuntCompensatorSections for EquivalentShunt') + sv_data = triplets.rdf_parser.remove_triplet_from_triplet(sv_data, shunts_to_remove) + + # Fix SV - add missing SV Tap Steps + ssh_tap_steps = updated_ssh_data.query("KEY == 'TapChanger.step'") + sv_tap_steps = sv_data.query("KEY == 'SvTapStep.TapChanger'") + missing_sv_tap_steps = ssh_tap_steps.merge(sv_tap_steps[['VALUE']], left_on='ID', right_on="VALUE", how='left', + indicator=True, suffixes=('', '_SV')).query("_merge == 'left_only'") + + tap_steps_to_be_added = [] + sv_instance_id = sv_data.INSTANCE_ID.iloc[0] + for tap_changer in missing_sv_tap_steps.itertuples(): + id_value = str(uuid4()) + logger.warning(f'Missing SvTapStep for {tap_changer.ID}, adding SvTapStep {id_value} ' + f'and taking tap value {tap_changer.VALUE} from SSH') + tap_steps_to_be_added.extend([ + (id_value, 'Type', 'SvTapStep', sv_instance_id), + (id_value, 'SvTapStep.TapChanger', tap_changer.ID, sv_instance_id), + (id_value, 'SvTapStep.position', tap_changer.VALUE, sv_instance_id), + ]) + + sv_data = pandas.concat( + [sv_data, pandas.DataFrame(tap_steps_to_be_added, columns=['ID', 'KEY', 'VALUE', 'INSTANCE_ID'])], + ignore_index=True) + + export = (pandas.concat([updated_ssh_data, sv_data], ignore_index=True). + export_to_cimxml(rdf_map=self.rdf_map, + namespace_map=namespace_map, + export_undefined=False, + export_type="xml_per_instance_zip_per_xml", + debug=False, + export_to_memory=True)) + self.cgm = export + return export + + def compose_cgm(self): + """ + Composes the cgm + """ + logger.info(f"Merging at {self.scenario_date}, " + f"time horizon: {self.time_horizon}, " + f"version: {self.version}, " + f"area: {self.area}, " + f"tsos: {', '.join([model.get('pmd:TSO') for model in self.igm_models])}") + self.set_sv_file() + self.set_ssh_files() + self.set_cgm() + return self.cgm + + def get_consolidated_metadata(self, rabbit_data: dict = None, additional_fields: dict = None): + """ + Combines existing metadata with rabbit data for reporting + NOTE! Change this + """ + if not rabbit_data: + rabbit_data = self.rabbit_data + consolidated_data = self.opdm_object_meta + consolidated_data[PROCESS_ID_KEYWORD] = rabbit_data.get(PROCESS_ID_KEYWORD) + consolidated_data[RUN_ID_KEYWORD] = rabbit_data.get(RUN_ID_KEYWORD) + consolidated_data[JOB_ID_KEYWORD] = rabbit_data.get(JOB_ID_KEYWORD) + if additional_fields: + consolidated_data.update(additional_fields) + return consolidated_data + + def get_folder_name(self): + model_date = f"{parse_datetime(self.scenario_date):%Y%m%dT%H%MZ}" + operator_name = '-'.join([self.merging_entity, self.area]) + folder_name = '_'.join([model_date, self.time_horizon, operator_name, self._version]) + return folder_name + + +def save_merged_model_to_local_storage(cgm_files, + cgm_folder_name: str = None, + local_storage_location: str = LOCAL_STORAGE_LOCATION): + """ + Saves merged cgm to local storage. 
This is meant for testing purposes only + :param cgm_files: list of cgm_files + :param cgm_folder_name: sub folder name where to gather files + :param local_storage_location: path to store + :return: None + """ + if not local_storage_location: + return + if cgm_folder_name is not None: + local_storage_location = local_storage_location + '/' + cgm_folder_name + local_storage_location = check_the_folder_path(local_storage_location) + if not os.path.exists(local_storage_location): + os.makedirs(local_storage_location) + for cgm_file in cgm_files: + full_file_name = local_storage_location + cgm_file.name + with open(full_file_name, 'wb') as f: + f.write(cgm_file.getbuffer()) + + +def publish_merged_model_to_opdm(opdm_client: opdm.OPDM = None, + cgm_files: list = None): + """ + Sends files to opdm + :param opdm_client: opdm client + :param cgm_files: list of files to be sent + :return tuple of results + """ + # Post files if given + result = () + # Send files out if given + if cgm_files and len(cgm_files) > 0: + opdm_publication_responses = [] + for instance_file in cgm_files: + try: + opdm_client = opdm_client or opdm.OPDM() + logger.info(f"Publishing {instance_file.name} to OPDM") + file_response = opdm_client.publication_request(instance_file, "CGMES") + opdm_publication_responses.append({"name": instance_file.name, "response": file_response}) + if "OperationFailure" in file_response.tag: + logger.error(f"Failed to publish {instance_file.name} to OPDM, OPDM OperationFailure") + except zeep.exceptions.Fault as fault: + logger.error(f"Failed to publish {instance_file.name} to OPDM, connection failure: {fault}") + except Exception as ex_message: + logger.error(f"Failed to publish {instance_file.name} to OPDM, unknown error: {ex_message}") + logger.info(f"Publishing results: {opdm_publication_responses}") + result = result + (opdm_publication_responses,) + return result + + +def save_merged_model_to_minio(minio_bucket: str = EMF_OS_MINIO_BUCKET, + folder_in_bucket: str = EMF_OS_MINIO_FOLDER, + minio_client: minio.ObjectStorage = None, + time_horizon: str = None, + scenario_datetime: str = None, + merging_entity: str = None, + area: str = None, + version: str = None, + cgm_files: [] = None): + """ + Posts cgm files to minio + implementation of path ///cgm_files + :param minio_bucket: bucket in minio + :param minio_client: the instance of object storage client + :param time_horizon: time_horizon for the path tree + :param scenario_datetime: scenario_date for the path tree + :param merging_entity: the entity responsible for the merge + :param version: the version number + :param area: the area where the merge was done + :param cgm_files: list of individual cgm files + :param folder_in_bucket: general folder (prefix) in bucket where + :return: file name and link to file, the link to the file + """ + links_to_file = {} + minio_client = minio_client or minio.ObjectStorage() + if cgm_files is not None: + # check if the given bucket exists + if not minio_client.client.bucket_exists(bucket_name=minio_bucket): + logger.warning(f"{minio_bucket} does not exist") + return links_to_file + for cgm_file in cgm_files: + file_name = cgm_file.name + file_name_exploded = get_metadata_from_filename(file_name) + time_horizon = time_horizon or file_name_exploded.get("Model.processType", '') + # TODO Keep intra day merged model in one folder? 
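+            # Note: the object key built below follows the pattern
+            # <folder_in_bucket>/<time_horizon>/<merging_entity>/<area>/<YYYYMMDD>/<HHMM00>/<version>/<file_type>/<file_name>;
+            # any field not passed in explicitly is recovered from the file name metadata.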
+ file_scenario_datetime = scenario_datetime or file_name_exploded.get("Model.scenarioTime", None) + if file_scenario_datetime: + file_scenario_datetime = parse_datetime(file_scenario_datetime) + merging_entity = merging_entity or file_name_exploded.get("Model.mergingEntity", '') + area = area or file_name_exploded.get("Model.domain", '') + version = version or file_name_exploded.get("Model.version") + scenario_date = '' + scenario_time = '' + if file_scenario_datetime: + scenario_date = f"{file_scenario_datetime:%Y%m%d}" + scenario_time = f"{file_scenario_datetime:%H%M00}" + file_type = file_name_exploded.get("Model.messageType") + file_path_elements = [folder_in_bucket, time_horizon, merging_entity, area, + scenario_date, scenario_time, version, file_type, cgm_file.name] + full_file_name = SEPARATOR_SYMBOL.join(file_path_elements) + full_file_name = full_file_name.replace('//', '/') + cgm_file.name = full_file_name + minio_client.upload_object(file_path_or_file_object=cgm_file, bucket_name=minio_bucket) + time_to_expire = timedelta(days=7) + link_to_file = minio_client.client.get_presigned_url(method="GET", + bucket_name=minio_bucket, + object_name=cgm_file.name, + expires=time_to_expire) + cgm_file.name = file_name + links_to_file[file_name] = link_to_file + return links_to_file + + +def publish_metadata_to_elastic(cgm_index: str, metadata: dict, elastic_server: str = elastic.ELK_SERVER): + """ + Publishes metadata to elastic + :param cgm_index: table name + :param metadata: metadata information + :param elastic_server: address to elastic server + :return response + """ + if metadata: + response = elastic.Elastic.send_to_elastic(index=cgm_index, + json_message=metadata, + server=elastic_server) + return response + + +if __name__ == '__main__': + # Run the entire pipeline in functions + logging.basicConfig( + format='%(levelname) -10s %(asctime) -20s %(name) -45s %(funcName) -35s %(lineno) -5d: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + level=logging.INFO, + handlers=[logging.StreamHandler(sys.stdout)] + ) + # testing_time_horizon = 'ID' + testing_time_horizon = '1D' + # testing_scenario_date = "2024-04-05T08:30:00+00:00" + # testing_scenario_date = "2024-04-12T22:30:00+00:00" + # testing_scenario_date = "2024-04-12T21:30:00+00:00" + # testing_scenario_date = "2024-04-11T21:30:00+00:00" + # testing_scenario_date = "2024-04-12T03:30:00+00:00" + testing_scenario_date = "2024-04-11T11:30:00+00:00" + testing_area = 'EU' + take_data_from_local = False + testing_merging_entity = MERGING_ENTITY + + wanted_tsos = [] + unwanted_tsos = ['APG', '50Hertz', 'SEPS'] + + if take_data_from_local: + folder_to_study = 'test_case' + igm_model_data, latest_boundary_data = get_local_entsoe_files(path_to_directory=folder_to_study, + allow_merging_entities=False, + igm_files_needed=['EQ']) + else: + + igm_model_data, latest_boundary_data = get_models(time_horizon=testing_time_horizon, + scenario_date=testing_scenario_date, + included_tsos=wanted_tsos, + excluded_tsos=unwanted_tsos, + download_policy=DownloadModels.OPDM_AND_MINIO) + test_version_number = get_version_number(scenario_date=testing_scenario_date, + time_horizon=testing_time_horizon, + modeling_entity=f"{testing_merging_entity}-{testing_area}") + + if not igm_model_data or not latest_boundary_data: + logger.error(f"Terminating") + sys.exit() + cgm_input = CgmModelComposer(igm_models=igm_model_data, + boundary_data=latest_boundary_data, + time_horizon=testing_time_horizon, + scenario_date=testing_scenario_date, + area=testing_area, + 
merging_entity=testing_merging_entity, + version=test_version_number) + cgm = cgm_input.compose_cgm() + test_folder_name = cgm_input.get_folder_name() + save_merged_model_to_local_storage(cgm_files=cgm, cgm_folder_name=test_folder_name) + # save_merged_model_to_minio(cgm_files=cgm) + # publish_merged_model_to_opdm(cgm_files=cgm) diff --git a/emf/loadflow_tool/validator.py b/emf/loadflow_tool/validator.py index 3b2c255..f7a1fbd 100644 --- a/emf/loadflow_tool/validator.py +++ b/emf/loadflow_tool/validator.py @@ -1,12 +1,23 @@ -import pypowsybl +import os.path +import shutil +import zipfile +from enum import Enum +from io import BytesIO +from os import listdir +from os.path import join +from zipfile import ZipFile +import ntpath + import logging -import json import time import math + +import requests + import config +from emf.common.logging.custom_logger import PyPowsyblLogGatherer, PyPowsyblLogReportingPolicy, check_the_folder_path from emf.loadflow_tool.loadflow_settings import * -from emf.loadflow_tool.helper import attr_to_dict, load_model -from emf.common.logging import custom_logger +from emf.loadflow_tool.helper import attr_to_dict, load_model, metadata_from_filename from emf.common.config_parser import parse_app_properties from emf.common.integrations import elastic @@ -19,6 +30,73 @@ # TODO - record AC NP and DC Flows to metadata storage (and more), this is useful for replacement logic and scaling # note - multiple islands wo load or generation can be an issue +ENTSOE_FOLDER = './path_to_ENTSOE_zip/TestConfigurations_packageCASv2.0' + +TSO_KEYWORD = 'pmd:TSO' +DATA_KEYWORD = 'DATA' +FILENAME_KEYWORD = 'pmd:fileName' +CGMES_PROFILE = 'pmd:cgmesProfile' +MODEL_PART_REFERENCE = 'pmd:modelPartReference' +MODEL_MESSAGE_TYPE = 'Model.messageType' +XML_KEYWORD = '.xml' +ZIP_KEYWORD = '.zip' +MODEL_MODELING_ENTITY = 'Model.modelingEntity' +MODEL_MERGING_ENTITY = 'Model.mergingEntity' +PMD_MERGING_ENTITY = 'pmd:mergingEntity' +MODEL_DOMAIN = 'Model.domain' +PMD_MERGING_AREA = 'pmd:mergingArea' +MODEL_FOR_ENTITY = 'Model.forEntity' +OPDE_COMPONENT_KEYWORD = 'opde:Component' +OPDM_PROFILE_KEYWORD = 'opdm:Profile' +MISSING_TSO_NAME = 'UnknownTSO' +LONG_FILENAME_SUFFIX = u"\\\\?\\" + +VALIDATION_STATUS_KEYWORD = 'VALIDATION_STATUS' +VALID_KEYWORD = 'valid' +VALIDATION_DURATION_KEYWORD = 'validation_duration_s' +LOADFLOW_RESULTS_KEYWORD = 'loadflow_results' + +PREFERRED_FILE_TYPES = [XML_KEYWORD, ZIP_KEYWORD] +IGM_FILE_TYPES = ['_EQ_', '_TP_', '_SV_', '_SSH_'] +BOUNDARY_FILE_TYPES = ['_EQBD_', '_TPBD_', '_EQ_BD_', '_TP_BD_'] +BOUNDARY_FILE_TYPE_FIX = {'_EQ_BD_': '_EQBD_', '_TP_BD_': '_TPBD_'} +SPECIAL_TSO_NAME = ['ENTSO-E'] + +"""Mapper for elements of the file name to igm profile""" +IGM_FILENAME_MAPPING_TO_OPDM = {FILENAME_KEYWORD: FILENAME_KEYWORD, + 'Model.scenarioTime': 'pmd:scenarioDate', + 'Model.processType': 'pmd:timeHorizon', + MODEL_MODELING_ENTITY: MODEL_PART_REFERENCE, + MODEL_MESSAGE_TYPE: CGMES_PROFILE, + 'Model.version': 'pmd:versionNumber'} + +"""Mapper for the elements of the file name to boundary profile""" +BOUNDARY_FILENAME_MAPPING_TO_OPDM = {FILENAME_KEYWORD: FILENAME_KEYWORD, + 'Model.scenarioTime': 'pmd:scenarioDate', + MODEL_MODELING_ENTITY: MODEL_PART_REFERENCE, + MODEL_MESSAGE_TYPE: CGMES_PROFILE, + 'Model.version': 'pmd:versionNumber'} +SYSTEM_SPECIFIC_FOLDERS = ['__MACOSX'] +UNWANTED_FILE_TYPES = ['.xlsx', '.docx', '.pptx'] +RECURSION_LIMIT = 2 +USE_ROOT = False # extracts to root, not to folder specified to zip. Note that some zip examples may not work! 
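+# Editorial sketch (not part of the original patch): the mapping tables above are intended to translate
+# metadata parsed from a CGMES file name into OPDM-style keys via map_meta_dict_to_dict(), defined further
+# below (the call sites in this patch keep that usage commented out). The metadata values in this example
+# are invented for illustration only.
+#
+#   parsed_meta = {'Model.scenarioTime': '20240411T1130Z',
+#                  'Model.processType': '1D',
+#                  'Model.modelingEntity': 'TSO1',
+#                  'Model.messageType': 'SSH',
+#                  'Model.version': '001'}
+#   opdm_like = map_meta_dict_to_dict(input_dict={},
+#                                     meta_dict=parsed_meta,
+#                                     key_dict=IGM_FILENAME_MAPPING_TO_OPDM)
+#   # -> {'pmd:scenarioDate': '20240411T1130Z', 'pmd:timeHorizon': '1D',
+#   #     'pmd:modelPartReference': 'TSO1', 'pmd:cgmesProfile': 'SSH', 'pmd:versionNumber': '001'}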
+ + +class LocalInputType(Enum): + """ + Enum for different data loads (igm and boundary) + """ + BOUNDARY = 'boundary', + IGM = 'igm' + UNDEFINED = 'undefined' + + +class LocalFileLoaderError(FileNotFoundError): + """ + For throwing when errors occur during the process of loading local files + """ + pass + def validate_model(opdm_objects, loadflow_parameters=CGM_RELAXED_2, run_element_validations=True): # Load data @@ -38,7 +116,9 @@ def validate_model(opdm_objects, loadflow_parameters=CGM_RELAXED_2, run_element_ logger.info(f"Running validation: {validation_type}") try: # TODO figure out how to store full validation results if needed. Currently only status is taken - model_data["validations"][validation] = pypowsybl.loadflow.run_validation(network=network, validation_types=[validation_type])._valid.__bool__() + model_data["validations"][validation] = pypowsybl.loadflow.run_validation(network=network, + validation_types=[ + validation_type])._valid.__bool__() except Exception as error: logger.error(f"Failed {validation_type} validation with error: {error}") continue @@ -50,14 +130,14 @@ def validate_model(opdm_objects, loadflow_parameters=CGM_RELAXED_2, run_element_ parameters=loadflow_parameters, reporter=loadflow_report) - # Parsing loadflow results # TODO move sanitization to Elastic integration loadflow_result_dict = {} for island in loadflow_result: island_results = attr_to_dict(island) island_results['status'] = island_results.get('status').name - island_results['distributed_active_power'] = 0.0 if math.isnan(island_results['distributed_active_power']) else island_results['distributed_active_power'] + island_results['distributed_active_power'] = 0.0 if math.isnan(island_results['distributed_active_power']) else \ + island_results['distributed_active_power'] loadflow_result_dict[f"component_{island.connected_component_num}"] = island_results model_data["loadflow_results"] = loadflow_result_dict # model_data["loadflow_report"] = json.loads(loadflow_report.to_json()) @@ -67,10 +147,11 @@ def validate_model(opdm_objects, loadflow_parameters=CGM_RELAXED_2, run_element_ # TODO check only main island component 0? 
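+    # Note: an IGM is considered valid as soon as at least one connected component (island) of the
+    # load flow converged; see the TODO above about possibly restricting this check to the main island only.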
model_valid = any([True if val["status"] == "CONVERGED" else False for key, val in loadflow_result_dict.items()]) model_data["valid"] = model_valid - model_data["validation_duration_s"] = time.time() - start_time + model_data["validation_duration_s"] = round(time.time() - start_time, 3) + logger.info(f"Load flow validation status: {model_valid} [duration {model_data['validation_duration_s']}s]") # Pop out pypowsybl network object - model_data.pop('network') + # model_data.pop('network') # Send validation data to Elastic try: @@ -81,41 +162,820 @@ def validate_model(opdm_objects, loadflow_parameters=CGM_RELAXED_2, run_element_ return model_data +def validate_models(available_models: list = None, latest_boundary: list = None): + """ + Validates the raw output from the opdm + :param available_models: list of igm models + :param latest_boundary: dictionary containing the boundary data + :return list of validated models + """ + valid_models = [] + invalid_models = [] + # Validate models + if not available_models or not latest_boundary: + logger.error(f"Missing input data") + return valid_models + for model in available_models: + + try: + response = validate_model([model, latest_boundary]) + model[VALIDATION_STATUS_KEYWORD] = response + if response[VALID_KEYWORD]: + valid_models.append(model) + else: + invalid_models.append(model) + except: + invalid_models.append(model) + logger.error("Validation failed") + return valid_models + + +"""-----------------CONTENT RELATED TO LOADING DATA FROM LOCAL STORAGE-----------------------------------------------""" + + +def read_in_zip_file(zip_file_path: str, file_types: [] = None) -> {}: + """ + Reads in files from the given zip file + :param zip_file_path: path to the zip file (relative or absolute) + :param file_types: list of file types + :return: dictionary with file names as keys and file contents as values + """ + content = {} + with ZipFile(zip_file_path, 'r') as zip_file: + for file_name in zip_file.namelist(): + if file_types is None or any([file_keyword in file_name for file_keyword in file_types]): + logger.info(f"Reading {file_name} from {zip_file_path}") + content[file_name] = zip_file.read(file_name) + return content + + +def read_in_xml_file(xml_file_path: str, file_types: [] = None) -> {}: + """ + Reads in data from the given xml file + :param xml_file_path: path to the xml file (relative or absolute) + :param file_types: list of file types + :return: dictionary with file names as keys and file contents as values + """ + content = {} + file_name = os.path.basename(xml_file_path) + if file_types is None or any([file_keyword in file_name for file_keyword in file_types]): + logger.info(f"Reading {file_name}") + with open(xml_file_path, 'r', encoding='utf8') as file_content: + content[file_name] = file_content.read() + return content + + +def save_content_to_zip_file(content: {}): + """ + Saves content to zip file (in memory) + :param content: the content of zip file (key: file name, value: file content) + :return: byte array + """ + output_object = BytesIO() + with ZipFile(output_object, "w") as output_zip: + if content: + for file_name in content: + logger.info(f"Converting {file_name} to zip container") + output_zip.writestr(file_name, content[file_name]) + output_object.seek(0) + return output_object.getvalue() + + +def parse_boundary_message_type_profile(message_type_value: str) -> str: + """ + Slices the 4-letter string to add _ in the middle: 'EQBD' to 'EQ_BD' + :param message_type_value: input string + :return: updated string if it was 4 
chars long + """ + if len(message_type_value) == 4: + return message_type_value[:2] + '_' + message_type_value[2:] + return message_type_value + + +def map_meta_dict_to_dict(input_dict: {}, meta_dict: {}, key_dict: {}) -> {}: + """ + Maps values from meta_dict to input dict based on key value pairs from key_dict + input_dict[key_dict[key]] = meta_dict[key] + :param input_dict: input and output dictionary (OPDM profile) + :param meta_dict: metadata (parameters from file name) + :param key_dict: mapper, values are keys for input dict, keys are keys for meta dict + :return: updated input_dict + """ + if meta_dict != {} and key_dict != {}: + for key in meta_dict.keys(): + if key in key_dict: + input_dict[key_dict[key]] = meta_dict[key] + return input_dict + + +def get_meta_from_filename(file_name: str): + """ + Extends the 'get_metadata_from_filename(file_name)' from helper by adding file name to metadata dictionary + :param file_name: file name to be parsed + :return: dictionary with metadata + """ + try: + fixed_file_name = file_name + for key in BOUNDARY_FILE_TYPE_FIX: + if key in fixed_file_name: + fixed_file_name = fixed_file_name.replace(key, BOUNDARY_FILE_TYPE_FIX[key]) + # meta_data = get_metadata_from_filename(fixed_file_name) + meta_data = metadata_from_filename(fixed_file_name) + # Revert back cases where there is a '-' in TSO's name like ENTSO-E + for case in SPECIAL_TSO_NAME: + if case in fixed_file_name: + meta_data[MODEL_PART_REFERENCE] = case + if "-".join([meta_data.get(PMD_MERGING_ENTITY,''), meta_data.get(PMD_MERGING_AREA, '')]) == case: + meta_data[PMD_MERGING_ENTITY] = None + meta_data[PMD_MERGING_AREA] = None + break + except ValueError as err: + logger.warning(f"Unable to parse file name: {err}, trying to salvage") + meta_data = salvage_data_from_file_name(file_name=file_name) + meta_data[FILENAME_KEYWORD] = file_name + return meta_data + + +def salvage_data_from_file_name(file_name: str): + """ + Function to try to extract something from the file name + param file_name: name of the file as string + return dictionary with metadata + """ + meta_data = {} + for element in IGM_FILE_TYPES: + if element in file_name: + meta_data[MODEL_MESSAGE_TYPE] = element.replace("_", "") + return meta_data + + +def load_data(file_name: str, file_types: list = None): + """ + Loads data from given file. + :param file_name: file from where to load (with relative or absolute path) + :param file_types: list of file types + :return: dictionary with filenames as keys, contents as values, if something was found, none otherwise + """ + data = None + if zipfile.is_zipfile(file_name): + data = read_in_zip_file(file_name, file_types) + elif file_name.endswith(XML_KEYWORD): + data = read_in_xml_file(file_name, file_types) + return data + + +def get_one_set_of_igms_from_local_storage(file_names: [], tso_name: str = None, file_types: [] = None): + """ + Loads igm data from local storage. 
+ :param file_names: list of file names + :param tso_name: the name of the tso if given + :param file_types: list of file types + :return: dictionary that wants to be similar to OPDM profile + """ + igm_value = {OPDE_COMPONENT_KEYWORD: []} + if tso_name is not None: + igm_value[TSO_KEYWORD] = tso_name + for file_name in file_names: + if (data := load_data(file_name, file_types)) is None: + continue + meta_for_data = {key: get_meta_from_filename(key) for key in data.keys()} + for datum in data: + if MODEL_MODELING_ENTITY in meta_for_data[datum] and TSO_KEYWORD not in igm_value: + igm_value[TSO_KEYWORD] = meta_for_data[datum][MODEL_MODELING_ENTITY] + opdm_profile_content = meta_for_data[datum] + # opdm_profile_content = map_meta_dict_to_dict(input_dict={}, + # meta_dict=meta_for_data[datum], + # key_dict=IGM_FILENAME_MAPPING_TO_OPDM) + opdm_profile_content[DATA_KEYWORD] = save_content_to_zip_file({datum: data[datum]}) + igm_value[OPDE_COMPONENT_KEYWORD].append({OPDM_PROFILE_KEYWORD: opdm_profile_content}) + return igm_value + + +def get_one_set_of_boundaries_from_local_storage(file_names: [], file_types: [] = None): + """ + Loads boundary data from local storage. + :param file_names: list of file names + :param file_types: list of file types + :return: dictionary that wants to be similar to OPDM profile + """ + boundary_value = {OPDE_COMPONENT_KEYWORD: []} + for file_name in file_names: + if (data := load_data(file_name, file_types)) is None: + continue + meta_for_data = {key: get_meta_from_filename(key) for key in data.keys()} + for datum in data: + if MODEL_MESSAGE_TYPE in meta_for_data: + meta_for_data[MODEL_MESSAGE_TYPE] = parse_boundary_message_type_profile( + meta_for_data[MODEL_MESSAGE_TYPE]) + elif CGMES_PROFILE in meta_for_data: + meta_for_data[CGMES_PROFILE] = parse_boundary_message_type_profile(meta_for_data[CGMES_PROFILE]) + opdm_profile_content = meta_for_data[datum] + # opdm_profile_content = map_meta_dict_to_dict(input_dict={}, + # meta_dict=meta_for_data[datum], + # key_dict=BOUNDARY_FILENAME_MAPPING_TO_OPDM) + opdm_profile_content[DATA_KEYWORD] = save_content_to_zip_file({datum: data[datum]}) + boundary_value[OPDE_COMPONENT_KEYWORD].append({OPDM_PROFILE_KEYWORD: opdm_profile_content}) + return boundary_value + + +def get_zip_file_list_from_dir(path_to_dir: str): + """ + Lists names of zip files from the given directory + :param path_to_dir: search directory + :return: list of file names + """ + file_list = [join(path_to_dir, file_name) + for file_name in listdir(path_to_dir) + if zipfile.is_zipfile(join(path_to_dir, file_name))] + return file_list + + +def get_xml_file_list_from_dir(path_to_dir: str): + """ + Lists names of .xml files from the given directory + :param path_to_dir: search directory + :return: list of file names + """ + file_list = [join(path_to_dir, file_name) + for file_name in listdir(path_to_dir) + if file_name.endswith(XML_KEYWORD)] + return file_list + + +def get_list_of_content_files(paths: str | list) -> []: + """ + Gets list of file names of interest (.zip, .xml) from the given path or paths (list or string) + :param paths: either directory (get multiple files) or single file name + :return: list of file names + """ + path_list = paths + if isinstance(paths, str): + path_list = [paths] + list_of_files = [] + for element in path_list: + if os.path.isdir(element): + zip_files = get_zip_file_list_from_dir(element) + xml_files = get_xml_file_list_from_dir(element) + list_of_files.extend(zip_files) + list_of_files.extend(xml_files) + elif 
os.path.isfile(element) and (zipfile.is_zipfile(element) or element.endswith(XML_KEYWORD)): + list_of_files.append(element) + else: + logger.error(f"{element} is not a path nor a .xml or a .zip file") + raise LocalFileLoaderError + return list_of_files + + +def get_data_from_files(file_locations: list | str | dict, + get_type: LocalInputType = LocalInputType.IGM, + file_keywords: list = None): + """ + Extracts and parses data to necessary profile + :param file_locations: list of files or their locations, one element per tso + :param get_type: type of data to be extracted + :param file_keywords: list of identifiers that are in file names that should be loaded + :return: dictionary wanting to be similar to opdm profile + """ + all_models = [] + tso_counter = 1 + if isinstance(file_locations, str): + file_locations = [file_locations] + if isinstance(file_locations, dict): + for element in file_locations: + file_set = get_list_of_content_files(file_locations[element]) + if get_type is LocalInputType.BOUNDARY: + all_models.append(get_one_set_of_boundaries_from_local_storage(file_names=file_set, + file_types=file_keywords)) + else: + all_models.append(get_one_set_of_igms_from_local_storage(file_names=file_set, + tso_name=element, + file_types=file_keywords)) + elif isinstance(file_locations, list): + for element in file_locations: + file_set = get_list_of_content_files(element) + if get_type is LocalInputType.BOUNDARY: + all_models.append(get_one_set_of_boundaries_from_local_storage(file_names=file_set, + file_types=file_keywords)) + else: + igm_value = get_one_set_of_igms_from_local_storage(file_names=file_set, + file_types=file_keywords) + if TSO_KEYWORD not in igm_value: + tso_name = f"{MISSING_TSO_NAME}-{tso_counter}" + tso_counter += 1 + logger.warning(f"TSO name not found assigning default name as {tso_name}") + igm_value[TSO_KEYWORD] = tso_name + all_models.append(igm_value) + else: + logger.error(f"Unsupported input") + raise LocalFileLoaderError + + return all_models + + +def filter_file_list_by_file_keywords(file_list: list | str | dict, file_keywords: list = None): + """ + Ables to filter the file list by file identifying keywords ('TP', 'SSH', 'EQ', 'SV') + :param file_list: list of file names + :param file_keywords: list of file identifiers + :return updated file list if file_keywords was provided, file_list otherwise + """ + if file_keywords is None: + return file_list + new_file_list = [] + for file_name in file_list: + if any([file_keyword in file_name for file_keyword in file_keywords]): + new_file_list.append(file_name) + return new_file_list + + +def get_local_igm_data(file_locations: list | str | dict, file_keywords: list = None): + """ + Call this with a list of files/directories to load igm data + :param file_locations: list of files or their locations, one element per tso + :param file_keywords: list of identifiers that are in file names that should be loaded + :return: dictionary wanting to be similar to opdm profile if something useful was found + """ + output = get_data_from_files(file_locations=file_locations, + get_type=LocalInputType.IGM, + file_keywords=file_keywords) + if len(output) == 0: + logger.error(f"Data for igms were not valid, no igms were extracted") + raise LocalFileLoaderError + return output + + +def get_local_boundary_data(file_locations: list | str, file_keywords: list = None): + """ + Call this with a list of files/directories to load boundary data + :param file_locations: list of files or their locations, one element per tso + :param file_keywords: 
+    :return: dictionary resembling an opdm profile if something useful was found
+    """
+    boundaries = get_data_from_files(file_locations=file_locations,
+                                     get_type=LocalInputType.BOUNDARY,
+                                     file_keywords=file_keywords)
+    try:
+        return boundaries[0]
+    except IndexError:
+        logger.error("Data for boundaries were not valid, no boundaries were extracted")
+        raise LocalFileLoaderError
+
+
+def get_local_files():
+    """
+    This is just an example
+    The input is a list or dictionary (tso name: path(s) to tso igm files) of elements when there is more than one
+    TSO or boundary; otherwise it can be a single string entry.
+    For each element in the list of inputs, the value can be a single path to a directory, zip file or xml file,
+    or a list of them
+    Note that inputs are not deduplicated during loading. For example, if the element of one TSO contains a zip file
+    and the directory of that zip file (something like ['c:/Path_to_zip/', 'c:/Path_to_zip/zip_file.zip']),
+    then the zip file (zip_file.zip) is read in twice and sent to the validator (probably ending with a pypowsybl error).
+    NB! If the tso name is not given (input type is not a dictionary), then it is extracted from the name of the first
+    file which is processed and which follows the standard described in helper.get_metadata_from_filename()
+    NB! Directories and file names used here (./path_to_data/, etc.) are for illustration purposes only.
+    To use local files, specify the paths to the data accordingly (absolute or relative path)
+    """
+    # Addresses can be relative or absolute.
+    # 1. load in by directory per tso which contains zip files
+    # igm_files = ['./path_to_data/case_1_TSO1_zip_files/',
+    #              './path_to_data/case_1_TSO2_zip_files/',
+    #              './path_to_data/case_1_TSO3_zip_files/']
+    # boundary_file = './path_to_data/case_1_BOUNDARY_zip_files/'
+    # 2. load in by zip files per tso
+    # igm_files = ['./path_to_data/case_2_combined/TSO1_ZIP_OF_XMLS.zip',
+    #              './path_to_data/case_2_combined/TSO2_ZIP_OF_XMLS.zip',
+    #              './path_to_data/case_2_combined/TSO3_ZIP_OF_XMLS.zip']
+    # boundary_file = './path_to_data/case_2_combined/BOUNDARY_ZIP_OF_XMLS.zip'
+    # 3. load in by directory per tso which stores xml files
+    # igm_files = ['./path_to_data/case_3_TSO1_xml_files/',
+    #              './path_to_data/case_3_TSO2_xml_files/',
+    #              './path_to_data/case_3_TSO3_xml_files/']
+    # boundary_file = './path_to_data/case_3_BOUNDARY_xml_files/'
+    # 4. Load data in as dictionary in form of TSO name: paths
+    igm_files = {'TSO1': './path_to_data/case_3_TSO1_xml_files/',
+                 'TSO2': './path_to_data/case_3_TSO2_xml_files/',
+                 'TSO3': './path_to_data/case_3_TSO3_xml_files/'}
+    boundary_file = './path_to_data/case_3_BOUNDARY_xml_files/'
+    # Get data and carry on
+    models = get_local_igm_data(igm_files, IGM_FILE_TYPES)
+    try:
+        boundary = get_local_boundary_data(boundary_file, BOUNDARY_FILE_TYPES)
+    except NameError:
+        boundary = None
+    return models, boundary
+
+
+def check_and_create_the_folder_path(folder_path: str):
+    """
+    Checks that the folder path does not contain any excessive special characters and that it exists;
+    creates it if it does not
+    :param folder_path: input given
+    :return checked folder path
+    """
+    folder_path = check_the_folder_path(folder_path)
+    if not os.path.exists(folder_path):
+        os.makedirs(folder_path)
+    return folder_path
+
+
+def download_zip_file(url_to_zip: str, path_to_download: str = None):
+    """
+    Downloads a zip file from url.
+    Note that the file may be rather large, so it is downloaded as a stream
+    :param url_to_zip: url of the zip file
+    :param path_to_download: location to download the file
+    :return loaded_file_name: the path to the downloaded zip file
+    """
+    loaded_file_name = url_to_zip.split('/')[-1]
+    if path_to_download is not None:
+        path_to_download = check_the_folder_path(path_to_download)
+        loaded_file_name = path_to_download + loaded_file_name
+    with requests.get(url_to_zip, stream=True) as r:
+        with open(loaded_file_name, 'wb') as f:
+            shutil.copyfileobj(r.raw, f)
+    return loaded_file_name
+
+
+def check_and_extract_zip_files_in_folder(root_folder: str,
+                                          files: [],
+                                          depth: int = 1,
+                                          use_root: bool = USE_ROOT,
+                                          max_depth: int = RECURSION_LIMIT):
+    """
+    Checks if files in the folder are zip files and extracts them recursively
+    :param root_folder: the name of the root folder
+    :param files: list of files
+    :param depth: current depth of recursion
+    :param use_root: use root folder for extraction
+    :param max_depth: max allowed recursion depth
+    """
+    root_folder = check_the_folder_path(root_folder)
+    for file_name in files:
+        full_file_name = root_folder + file_name
+        file_extension = os.path.splitext(full_file_name)[-1]
+        xml_file = os.path.splitext(file_name)[0] + ".xml"
+        if file_extension == ".xml" or xml_file in files:
+            return
+        if zipfile.is_zipfile(full_file_name) and file_extension not in UNWANTED_FILE_TYPES:
+            extract_zip_file(current_zip_file=full_file_name,
+                             root_folder=root_folder,
+                             use_root=use_root,
+                             depth=depth + 1,
+                             max_depth=max_depth)
+
+
+def extract_zip_file(current_zip_file: str,
+                     root_folder: str = None,
+                     use_root: bool = USE_ROOT,
+                     depth: int = 1,
+                     max_depth: int = RECURSION_LIMIT):
+    """
+    Extracts content of the zip file to the root.
+    :param current_zip_file: zip file to be extracted
+    :param root_folder: folder where to extract
+    :param use_root: use root folder for extraction
+    :param depth: current depth of recursion
+    :param max_depth: max allowed recursion depth
+    """
+    # Stop the recursion before going too deep
+    if depth > max_depth:
+        return
+    if root_folder is None or use_root is False:
+        root_folder = os.path.splitext(current_zip_file)[0]
+    root_folder = check_the_folder_path(root_folder)
+    logger.info(f"Extracting {current_zip_file} to {root_folder}")
+    with zipfile.ZipFile(current_zip_file, 'r') as level_one_zip_file:
+        # level_one_zip_file.extractall(path=root_folder)
+        for info in level_one_zip_file.infolist():
+            zip_file_name = info.filename
+            try:
+                level_one_zip_file.extract(zip_file_name, path=root_folder)
+            except FileNotFoundError:
+                # Workaround for extracting long file names
+                output_path = root_folder + zip_file_name
+                check_and_create_the_folder_path(os.path.dirname(output_path))
+                output_path_unicode = output_path.encode('unicode_escape').decode()
+                file_path = os.path.abspath(os.path.normpath(output_path_unicode))
+                file_path = LONG_FILENAME_SUFFIX + file_path
+                buffer_size = 16 * 1024
+                with level_one_zip_file.open(info) as f_in, open(file_path, 'wb') as f_out:
+                    while True:
+                        buffer = f_in.read(buffer_size)
+                        if not buffer:
+                            break
+                        f_out.write(buffer)
+            except Exception as e:
+                logger.error(f"Uncaught exception: {e}")
+    os.remove(current_zip_file)
+    # Getting relevant paths
+    all_elements = [x for x in os.walk(root_folder)]
+    for root, folders, files in all_elements:
+        # Don't go to system specific folders or generate endless recursion
+        if any(root in system_folder for system_folder in SYSTEM_SPECIFIC_FOLDERS) or root == root_folder:
+            continue
+        check_and_extract_zip_files_in_folder(root_folder=root,
+                                              use_root=use_root,
+                                              files=files,
+                                              depth=depth,
+                                              max_depth=max_depth)
+
+
+def search_directory(root_folder: str, search_path: str):
+    """
+    Searches the search_path starting from the root_folder.
+    Note that the requested path has to end with the search_path
+    :param root_folder: root folder from where to start looking
+    :param search_path: the part of the path to search from the root_folder
+    :return full path from root_folder to search_path if found, raise exception otherwise
+    """
+    all_folders = [check_the_folder_path(x[0]) for x in os.walk(root_folder)]
+    search_path = check_the_folder_path(search_path)
+    matches = [path_name for path_name in all_folders if str(path_name).endswith(search_path)]
+    matches_count = len(matches)
+    if matches_count == 1:
+        return matches[0]
+    elif matches_count == 0:
+        raise LocalFileLoaderError(f"{search_path} not found in {root_folder}")
+    else:
+        raise LocalFileLoaderError(f"{search_path} is too broad, found {matches_count} possible matches")
+
+
+def check_and_get_examples(path_to_search: str,
+                           use_root: bool = USE_ROOT,
+                           local_folder_for_examples: str = ENTSOE_EXAMPLES_LOCAL,
+                           url_for_examples: str = ENTSOE_EXAMPLES_EXTERNAL,
+                           recursion_depth: int = RECURSION_LIMIT):
+    """
+    Checks if the examples are present; if not, downloads and extracts them
+    :param local_folder_for_examples: path to the examples
+    :param use_root: use root folder for extraction
+    :param url_for_examples: path to online storage
+    :param recursion_depth: the max allowed iterations for the recursion
+    :param path_to_search: folder to search
+    """
+    file_name = url_for_examples.split('/')[-1]
+    local_folder_for_examples = check_the_folder_path(local_folder_for_examples)
+    full_file_name = local_folder_for_examples + file_name
+    # Check if the folder exists, create it otherwise
+    if not os.path.exists(local_folder_for_examples):
+        os.makedirs(local_folder_for_examples)
+    try:
+        # Try to get the directory, catch the error if not found
+        directory_needed = search_directory(local_folder_for_examples, path_to_search)
+        return directory_needed
+    except LocalFileLoaderError:
+        # Check if the directory contains the necessary file and that it is a zip file
+        if not os.path.isfile(full_file_name) or not zipfile.is_zipfile(full_file_name):
+            # Download the file
+            logger.info(f"Downloading examples from {url_for_examples} to {local_folder_for_examples}")
+            full_file_name = download_zip_file(url_for_examples, local_folder_for_examples)
+        # Now, there should be a zip present, extract it
+        extract_zip_file(current_zip_file=full_file_name,
+                         root_folder=local_folder_for_examples,
+                         use_root=use_root,
+                         max_depth=recursion_depth)
+        # And try to find the necessary path
+        return search_directory(local_folder_for_examples, path_to_search)
+
+
+def group_files_by_origin(list_of_files: [], root_folder: str = None, allow_merging_entities: bool = True):
+    """
+    When the input is a directory containing the .xml and .zip files for all the TSOs and the boundaries, and
+    the files follow the standard naming convention, this sorts them by TSO and by boundary.
+    The assumption is that one tso can have each file type only once (e.g. one tso cannot have two 'TP' files)
+    and there is only one list of boundaries
+    :param list_of_files: list of files to divide
+    :param root_folder: root folder for relative or absolute paths
+    :param allow_merging_entities: when true, allow merged models (cases like TECNET-CE-ELIA) into the list of models
+    :return: dictionaries containing TSO files and boundary files
+    """
+    tso_files = {}
+    # Take the assumption that we have only one boundary
+    boundaries = {}
+    igm_file_types = [file_type.replace('_', '') for file_type in IGM_FILE_TYPES]
+    boundary_file_types = [file_type.strip("_") for file_type in BOUNDARY_FILE_TYPES]
+    if root_folder is not None:
+        root_folder = check_the_folder_path(root_folder)
+    for file_name in list_of_files:
+        file_extension = os.path.splitext(file_name)[-1]
+        file_base = os.path.splitext(file_name)[0]
+        # Check if the file type is supported
+        if file_extension not in PREFERRED_FILE_TYPES:
+            continue
+        # Check if the file follows the standard naming convention, refer to helper.get_metadata_from_filename for more details
+        file_name_meta = get_meta_from_filename(file_name)
+        if root_folder is not None:
+            file_name = root_folder + file_name
+        tso_name = file_name_meta.get(MODEL_MODELING_ENTITY) or file_name_meta.get(MODEL_PART_REFERENCE)
+        file_type_name = file_name_meta.get(MODEL_MESSAGE_TYPE) or file_name_meta.get(CGMES_PROFILE)
+        merging_entity = file_name_meta.get(PMD_MERGING_ENTITY, '') or file_name_meta.get(MODEL_MERGING_ENTITY, '')
+        merging_entity = None if merging_entity == '' else merging_entity
+        modeling_entity = file_name_meta.get(MODEL_FOR_ENTITY, '')
+        modeling_entity = None if modeling_entity == '' else modeling_entity
+        # If needed, skip the cases where both merging entity and part_reference are present; pypowsybl did not like them
+        if not allow_merging_entities and tso_name and merging_entity and tso_name not in SPECIAL_TSO_NAME:
+            continue
+        if not tso_name:
+            tso_name = modeling_entity or merging_entity
+        if tso_name and file_type_name:
+            # Handle TSOs
+            if file_type_name in igm_file_types:
+                if tso_name not in tso_files.keys():
+                    tso_files[tso_name] = []
+                # Check if the file without the extension is already present
+                if not any(file_base in file_listed for file_listed in tso_files[tso_name]):
+                    tso_files[tso_name].append(file_name)
+            # Handle boundaries
+            elif file_type_name in boundary_file_types:
+                if tso_name not in boundaries.keys():
+                    boundaries[tso_name] = []
+                # Check if the file without the extension is already present
+                if not any(file_base in file_listed for file_listed in boundaries[tso_name]):
+                    boundaries[tso_name].append(file_name)
+            else:
+                logger.warning(f"Name follows the convention but unable to categorize it: {file_name}")
+        else:
+            logger.warning(f"Unrecognized file: {file_name}")
+    return tso_files, boundaries
+
+
+def check_model_completeness(model_data: list | dict, file_types: list | str):
+    """
+    Skips models which do not contain the necessary files
+    :param model_data: models to be checked
+    :param file_types: list of file types to search
+    :return updated model list
+    """
+    checked_models = []
+    if isinstance(file_types, str):
+        file_types = [file_types]
+    if isinstance(model_data, dict):
+        model_data = [model_data]
+    for model_datum in model_data:
+        existing_types = [item[OPDM_PROFILE_KEYWORD][CGMES_PROFILE] for item in model_datum[OPDE_COMPONENT_KEYWORD]]
+        if all(file_type in existing_types for file_type in file_types):
+            checked_models.append(model_datum)
+    return checked_models
+
+
+def get_local_entsoe_files(path_to_directory: str | list,
+                           allow_merging_entities: bool = True,
+                           igm_files_needed: list = None,
+                           boundary_files_needed: list = None):
+    """
+    Gets the list of files in the directory and divides them into model and boundary data
+    :param path_to_directory: path to the directory from where to search
+    :param allow_merging_entities: when true, allow merged models (cases like TECNET-CE-ELIA) into the list of models
+    :param igm_files_needed: specify explicitly the file types needed (to avoid the pypowsybl missing "EQ" error)
+    :param boundary_files_needed: specify explicitly the file types needed for boundary data
+    :return list of models and the boundary data
+    """
+    if isinstance(path_to_directory, str):
+        path_to_directory = [path_to_directory]
+    models = []
+    all_boundaries = []
+    boundary = None
+    for single_path in path_to_directory:
+        try:
+            full_path = check_and_get_examples(single_path)
+        except Exception as ex:
+            logger.error(f"FATAL ERROR WHEN GETTING FILES: {ex}")
+            sys.exit()
+        full_path = check_the_folder_path(full_path)
+        file_names = next(os.walk(full_path), (None, None, []))[2]
+        models_data, boundary_data = group_files_by_origin(list_of_files=file_names,
+                                                           root_folder=full_path,
+                                                           allow_merging_entities=allow_merging_entities)
+        if models_data:
+            models_transformed = get_local_igm_data(models_data, IGM_FILE_TYPES)
+            models.extend(models_transformed)
+        if boundary_data:
+            try:
+                boundary_transformed = get_local_boundary_data(boundary_data, BOUNDARY_FILE_TYPES)
+            except NameError:
+                boundary_transformed = None
+            if boundary_transformed is not None:
+                all_boundaries.append(boundary_transformed)
+    if len(all_boundaries) == 0:
+        logger.warning("No boundaries found")
+    else:
+        if len(all_boundaries) > 1:
+            logger.warning("Multiple boundaries detected, taking the first occurrence")
+        boundary = all_boundaries[0]
+    if igm_files_needed is not None:
+        models = check_model_completeness(models, igm_files_needed)
+    if boundary_files_needed is not None:
+        boundary = check_model_completeness(boundary, boundary_files_needed)
+    return models, boundary
+
+
+"""-----------------END OF CONTENT RELATED TO LOADING DATA FROM LOCAL STORAGE----------------------------------------"""
+
 # TEST
 if __name__ == "__main__":
     import sys
     from emf.common.integrations.opdm import OPDM
+
     logging.basicConfig(
         format='%(levelname)-10s %(asctime)s.%(msecs)03d %(name)-30s %(funcName)-35s %(lineno)-5d: %(message)s',
         datefmt='%Y-%m-%dT%H:%M:%S',
         level=logging.INFO,
         handlers=[logging.StreamHandler(sys.stdout)]
     )
-    #logging.getLogger('powsybl').setLevel(1)
-
-    opdm = OPDM()
+    # logging.getLogger('powsybl').setLevel(1)
+    # Add a pypowsybl log gatherer
+    # Set up the log gatherer:
+    # topic_name: currently used as the start of a file name
+    # send_to_elastic: send the triggered log entry to elastic (parameters are defined in custom_logger.properties)
+    # upload_to_minio: upload the log file to minio (parameters are defined in custom_logger.properties)
+    # report_on_command: trigger reporting explicitly
+    # logging_policy: choose according to the need. Currently:
+    # ALL_ENTRIES: gathers all log entries regardless of level
+    # ENTRIES_IF_LEVEL_REACHED: gathers all log entries when at least one entry reached the specified level
+    # ENTRY_ON_LEVEL: gathers only the entry that was at least on the specified level
+    # ENTRIES_ON_LEVEL: gathers all entries that were at least on the specified level
+    # ENTRIES_COLLECTED_TO_LEVEL: gathers all entries up to the first entry that was at least on the specified level
+    # print_to_console: propagate the log to the parent
+    # reporting_level: level that triggers the policy
+    pypowsybl_log_gatherer = PyPowsyblLogGatherer(topic_name='IGM_validation',
+                                                  send_to_elastic=False,
+                                                  upload_to_minio=False,
+                                                  report_on_command=False,
+                                                  logging_policy=PyPowsyblLogReportingPolicy.ENTRIES_IF_LEVEL_REACHED,
+                                                  print_to_console=False,
+                                                  reporting_level=logging.ERROR)
-    latest_boundary = opdm.get_latest_boundary()
-    available_models = opdm.get_latest_models_and_download(time_horizon='1D', scenario_date="2023-08-16T09:30")#, tso="ELERING")
+    # Switch this to True if files from local storage are used
+    load_data_from_local_storage = True
+    try:
+        if load_data_from_local_storage:
+            # available_models, latest_boundary = get_local_files()
+            # Change this according to the test case to be used. Note that it must reference the end folder that will
+            # be used. Also, it must be unique enough to be distinguished from other folders (for example instead of
+            # using 'Combinations' use 'TC1_T11_NonConform_L1/Combinations' etc)
+            # Some examples for
+            # https://www.entsoe.eu/Documents/CIM_documents/Grid_Model_CIM/QoCDC_v3.2.1_test_models.zip
+            folder_to_study = 'TC3_T1_Conform'
+            # folder_to_study = 'TC3_T3_Conform'
+            # folder_to_study = 'TC4_T1_Conform/Initial'
+            # Some examples for
+            # https://www.entsoe.eu/Documents/CIM_documents/Grid_Model_CIM/TestConfigurations_packageCASv2.0.zip
+            # folder_to_study = ['CGMES_v2.4.15_MicroGridTestConfiguration_T1_BE_Complete_v2',
+            #                    'CGMES_v2.4.15_MicroGridTestConfiguration_T1_NL_Complete_v2',
+            #                    'Type1_T1/CGMES_v2.4.15_MicroGridTestConfiguration_BD_v2']
+            # In general this function checks if the paths (path_to_directory) exist in ENTSOE_EXAMPLES_LOCAL;
+            # if not, it tries to download and extract the zip from ENTSOE_EXAMPLES_EXTERNAL. If this fails or the
+            # path is still not found, it carries on as usual.
+            # Note that the zip can also be downloaded and extracted beforehand, but in that case it must be
+            # extracted to the expected path
+            # path_to_directory: string or list, end of the path from where to load the files
+            # (starting from ENTSOE_EXAMPLES_LOCAL). Note that these must be unique enough (errors are thrown when
Note that these must be unique enough (errors are thrown when + # two or more paths are found) + # allow_merging_entities: Whether to allow merging entities, pypowsybl validation was not happy about that + # igm_files_needed: in order for pypowsybl validation to work, atleast these files should be present in + # igm + available_models, latest_boundary = get_local_entsoe_files(path_to_directory=folder_to_study, + allow_merging_entities=False, + igm_files_needed=['EQ']) + else: + raise LocalFileLoaderError + except FileNotFoundError: + # if needed catch and handle LocalFileLoaderError separately + logger.info(f"Fetching data from external resources") + opdm = OPDM() + latest_boundary = opdm.get_latest_boundary() + available_models = opdm.get_latest_models_and_download(time_horizon='ID', + scenario_date='2024-04-05T22:30', + # tso='ELERING' + ) + # available_models = opdm.get_latest_models_and_download(time_horizon='1D', + # scenario_date='2024-03-14T09:30', + # # tso='ELERING' + # ) validated_models = [] - - # Validate models for model in available_models: - + tso = model['pmd:TSO'] + pypowsybl_log_gatherer.set_tso(tso) try: - response = validate_model([model, latest_boundary]) - model["VALIDATION_STATUS"] = response + if isinstance(latest_boundary, dict): + response = validate_model([model, latest_boundary]) + else: + response = validate_model([model]) + model[VALIDATION_STATUS_KEYWORD] = response + # Example for manual triggering for posting the logs. The value given must be positive: + log_post_trigger = model.get(VALIDATION_STATUS_KEYWORD, {}).get('valid') is False + # Note that this switch is governed by report_on_command in PyPowsyblLogGatherer initialization + pypowsybl_log_gatherer.trigger_to_report_externally(log_post_trigger) validated_models.append(model) - except Exception as error: validated_models.append(model) - #logger.error("Validation failed", error) - + logger.error(f"For {model.get('pmd:TSO')} validation failed", error) + pypowsybl_log_gatherer.stop_working() # Print validation statuses - [print(dict(tso=model['pmd:TSO'], valid=model.get('VALIDATION_STATUS', {}).get('VALID'), duration=model.get('VALIDATION_STATUS', {}).get('VALIDATION_DURATION_S'))) for model in validated_models] + [print(dict(tso=model['pmd:TSO'], valid=model.get('VALIDATION_STATUS', {}).get('valid'), + duration=model.get('VALIDATION_STATUS', {}).get('validation_duration_s'))) for model in + validated_models] # With EMF IGM Validation settings # {'tso': '50Hertz', 'valid': True, 'duration': 6.954386234283447} @@ -128,5 +988,3 @@ def validate_model(opdm_objects, loadflow_parameters=CGM_RELAXED_2, run_element_ # {'tso': 'TTG', 'valid': True, 'duration': 5.204774856567383} # {'tso': 'PSE', 'valid': True, 'duration': 1.555201530456543} - - diff --git a/emf/model_retriever/model_retriever.py b/emf/model_retriever/model_retriever.py index e0322c2..4c94918 100644 --- a/emf/model_retriever/model_retriever.py +++ b/emf/model_retriever/model_retriever.py @@ -4,10 +4,14 @@ from zipfile import ZipFile from typing import List import json + from emf.common.config_parser import parse_app_properties from emf.common.integrations import edx, elastic, opdm, minio from emf.common.converters import opdm_metadata_to_json from emf.loadflow_tool.validator import validate_model +from emf.loadflow_tool.helper import load_opdm_data +from emf.loadflow_tool.model_statistics import get_system_metrics + logger = logging.getLogger(__name__) @@ -29,10 +33,23 @@ def handle(self, opdm_objects: dict, **kwargs): for opdm_object in 
opdm_objects: # Get model from OPDM - response = self.opdm_service.download_object(opdm_object=opdm_object) + self.opdm_service.download_object(opdm_object=opdm_object) # Put all components to bytesio zip (each component to different zip) - for component in response['opde:Component']: + for component in opdm_object['opde:Component']: + + # Sanitize content-reference url + content_reference = component['opdm:Profile']['pmd:content-reference'] + content_reference = content_reference.replace('//', '/') + + # Check whether profile already exist in object storage (Minio) + if component['opdm:Profile']['pmd:cgmesProfile'] == "EQ": # TODO currently only for EQ + profile_exist = self.minio_service.object_exists(bucket_name=MINIO_BUCKET, object_name=content_reference) + if profile_exist: + logger.info(f"Profile already stored in object storage: {content_reference}") + continue + + # Put content data into bytes object output_object = BytesIO() with ZipFile(output_object, "w") as component_zip: with ZipFile(BytesIO(component['opdm:Profile']['DATA'])) as profile_zip: @@ -41,8 +58,7 @@ def handle(self, opdm_objects: dict, **kwargs): component_zip.writestr(file_name, profile_zip.open(file_name).read()) # Upload components to minio storage - output_object.name = component['opdm:Profile']['pmd:content-reference'] - output_object.name = output_object.name.replace('//', '/') # sanitize double slash in url + output_object.name = content_reference logger.info(f"Uploading component to object storage: {output_object.name}") self.minio_service.upload_object(file_path_or_file_object=output_object, bucket_name=MINIO_BUCKET) @@ -65,6 +81,55 @@ def handle(self, opdm_objects: dict, **kwargs): return updated_opdm_objects + def handle_reduced(self, opdm_objects: List[dict], **kwargs): + # Download each OPDM object network model from OPDE + updated_opdm_objects = [] + for opdm_object in opdm_objects: + # Put all components to bytesio zip (each component to different zip) + for component in opdm_object['opde:Component']: + # Sanitize content-reference url + content_reference = component['opdm:Profile']['pmd:content-reference'] + content_reference = content_reference.replace('//', '/') + # Check whether profile already exist in object storage (Minio) + if component['opdm:Profile']['pmd:cgmesProfile'] == "EQ": # TODO currently only for EQ + profile_exist = self.minio_service.object_exists(bucket_name=MINIO_BUCKET, object_name=content_reference) + if profile_exist: + logger.info(f"Profile already stored in object storage: {content_reference}") + continue + # Put content data into bytes object + output_object = BytesIO() + with ZipFile(output_object, "w") as component_zip: + with ZipFile(BytesIO(component['opdm:Profile']['DATA'])) as profile_zip: + for file_name in profile_zip.namelist(): + logger.debug(f"Adding file: {file_name}") + component_zip.writestr(file_name, profile_zip.open(file_name).read()) + + # Upload components to minio storage + output_object.name = content_reference + logger.info(f"Uploading component to object storage: {output_object.name}") + self.minio_service.upload_object(file_path_or_file_object=output_object, bucket_name=MINIO_BUCKET) + updated_opdm_objects.append(opdm_object) + return updated_opdm_objects + + +class HandlerModelsStat: + + def handle(self, opdm_objects: List[dict], **kwargs): + # Get the latest boundary set for validation + latest_boundary = self.opdm_service.get_latest_boundary() # TODO - get BDS from ELK+MINIO + + # Extract statistics + for opdm_object in opdm_objects: + stat = 
load_opdm_data(opdm_objects=[opdm_object, latest_boundary]) + opdm_object['total_load'] = stat['total_load'] + opdm_object['generation'] = stat['generation'] + opdm_object['losses'] = stat['losses'] + opdm_object['losses_coefficient'] = stat['losses_coefficient'] + opdm_object['acnp'] = stat['tieflow_acnp']['EquivalentInjection.p'] + opdm_object['hvdc'] = {key: value['EquivalentInjection.p'] for key, value in stat["tieflow_hvdc"].items()} + + return opdm_objects + class HandlerModelsValidator: @@ -73,14 +138,14 @@ def __init__(self): def handle(self, opdm_objects: List[dict], **kwargs): # Get the latest boundary set for validation - latest_boundary = self.opdm_service.get_latest_boundary() + latest_boundary = kwargs.get('latest_boundary') + if not latest_boundary: + latest_boundary = self.opdm_service.get_latest_boundary() # Run network model validation for opdm_object in opdm_objects: response = validate_model(opdm_objects=[opdm_object, latest_boundary]) opdm_object["valid"] = response["valid"] # taking only relevant data from validation step - for component in opdm_object['opde:Component']: # pop out initial binary network model data - component['opdm:Profile'].pop('DATA') return opdm_objects @@ -92,6 +157,12 @@ def __init__(self): self.elastic_service = elastic.HandlerSendToElastic(index=ELK_INDEX, id_from_metadata=True, id_metadata_list=['opde:Id']) def handle(self, opdm_objects: List[dict], **kwargs): + + # pop out initial binary network model data + for opdm_object in opdm_objects: + for component in opdm_object['opde:Component']: + component['opdm:Profile'].pop('DATA') + self.elastic_service.handle(byte_string=json.dumps(opdm_objects, default=str).encode('utf-8'), properties=kwargs.get('properties')) logger.info(f"Network model metadata sent to object-storage.elk")
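Editor's note: a minimal usage sketch of the local-loading path introduced in the validator changes above. It is an illustration only, not part of the change set; it assumes that get_local_entsoe_files and validate_model are importable from emf.loadflow_tool.validator (as this diff indicates), that ENTSOE_EXAMPLES_LOCAL/ENTSOE_EXAMPLES_EXTERNAL from validator.properties are reachable (the example archive may be downloaded on first use), and that the folder name 'TC3_T1_Conform' is unique within the extracted examples.

    import logging
    import sys

    from emf.loadflow_tool.validator import get_local_entsoe_files, validate_model

    logging.basicConfig(level=logging.INFO, handlers=[logging.StreamHandler(sys.stdout)])

    # Load IGMs and the boundary set from local storage; requiring 'EQ' avoids the
    # pypowsybl missing-equipment-profile error mentioned in the docstrings above.
    models, boundary = get_local_entsoe_files(path_to_directory='TC3_T1_Conform',
                                              allow_merging_entities=False,
                                              igm_files_needed=['EQ'])

    # Validate each model together with the boundary set, mirroring the __main__ block above.
    for model in models:
        model['VALIDATION_STATUS'] = validate_model([model, boundary] if isinstance(boundary, dict) else [model])
        print(model['pmd:TSO'], model['VALIDATION_STATUS'].get('valid'))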