MR: Merge Worker issue #57 to main #63

Closed
wants to merge 22 commits into from

Commits (22)
c2d5a46
updates validator to use local igm and boundary files
mr0321 Mar 7, 2024
5775327
Update .gitignore, avoid dragging zip files along
mr0321 Mar 7, 2024
842ce13
resolves issues from transfer
mr0321 Mar 7, 2024
c54d15d
Update validator.py
mr0321 Mar 8, 2024
161e628
Adds comments
mr0321 Mar 8, 2024
1602990
Updates validator.py to load and parse local files from ENTSOE zip
mr0321 Mar 15, 2024
25eec8c
#45 refactors code to use in other scripts. when using, note ENTSOE z…
mr0321 Mar 20, 2024
632897d
#55 Gathers and saves pypowsybl logs to files (elastic: todo). It is …
mr0321 Mar 22, 2024
19ea862
Reports entry that triggers to elastic, and log if requested to minio
mr0321 Mar 26, 2024
ab8480e
adds minio file name and bucket to elastic
mr0321 Mar 27, 2024
9b02ffb
enables to trigger log reporting manually (if triggered by log entry)
mr0321 Mar 27, 2024
7313039
Adds file downloading, TODO check it and resolve recursive unzipping
mr0321 Mar 27, 2024
1822e68
Loads zip from ENTSOE, unzips it, looks for given directory, sorts th…
mr0321 Mar 28, 2024
f72cc80
bug fix for limiting the depth of the recursion
mr0321 Mar 28, 2024
640179c
Adds custom path for when storing pypowsybl logs to local storage
mr0321 Apr 1, 2024
c8ac951
bug fix: handle long path/filenames when unzipping
mr0321 Apr 2, 2024
1b94b49
reverts gitignore back to original state
mr0321 Apr 2, 2024
3b2a4e8
Check-in: refactors merge.py cgm_compose class in model_merger.py, in…
mr0321 Apr 16, 2024
0849b10
filters models before merge by tsos and bugfixes
mr0321 Apr 19, 2024
bebab20
reverts debugging settings
mr0321 Apr 19, 2024
2a6c334
fixes additional bugs for version numbers
mr0321 Apr 19, 2024
05f4fbe
some additional local bugging fixes
mr0321 Apr 19, 2024
3 changes: 2 additions & 1 deletion .gitignore
@@ -1,2 +1,3 @@
.idea/
containers.txt
containers.txt
.DS_Store
9 changes: 9 additions & 0 deletions config/cgm_worker/model_merge.properties
@@ -0,0 +1,9 @@
[MAIN]
RMQ_CGM_QUEUE = rabbit-task-test
EMF_OS_MINIO_BUCKET = opde-confidential-models
EMF_OS_MINIO_FOLDER = EMF_test_merge_find_better_place
EMF_OS_MINIO_BOUNDARY_BUCKET = opdm-data
EMF_OS_MINIO_BOUNDARY_FOLDER = CGMES/ENTSOE
ELK_VERSION_INDEX = emfos-logs*
MERGE_TYPES = {'CGM': {'AREA': 'EU', 'INCLUDED':[], 'EXCLUDED': ['APG'], 'LOCAL_IMPORT': []}, 'RMM': {'AREA':'BA', 'INCLUDED': ['ELERING', 'AST', 'LITGRID','PSE'], 'EXCLUDED': [], 'LOCAL_IMPORT': []}}
MERGING_ENTITY = BALTICRSC
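
The MERGE_TYPES value is a Python-style dict literal (single quotes), not JSON, so it cannot be read with json.loads. A minimal sketch of how a worker could parse it, assuming ast.literal_eval is acceptable for trusted configuration (the actual parsing in model_merger.py may differ):

```python
# Illustrative parsing of the MERGE_TYPES property; variable names are placeholders.
import ast

raw_merge_types = ("{'CGM': {'AREA': 'EU', 'INCLUDED': [], 'EXCLUDED': ['APG'], 'LOCAL_IMPORT': []}, "
                   "'RMM': {'AREA': 'BA', 'INCLUDED': ['ELERING', 'AST', 'LITGRID', 'PSE'], "
                   "'EXCLUDED': [], 'LOCAL_IMPORT': []}}")

merge_types = ast.literal_eval(raw_merge_types)  # dict literal -> dict
cgm_settings = merge_types['CGM']
print(cgm_settings['AREA'], cgm_settings['EXCLUDED'])  # EU ['APG']
```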
4 changes: 3 additions & 1 deletion config/cgm_worker/validator.properties
@@ -1,2 +1,4 @@
[MAIN]
ELK_INDEX = emfos-igm-validation
ELK_INDEX = emfos-igm-validation
ENTSOE_EXAMPLES_EXTERNAL=https://www.entsoe.eu/Documents/CIM_documents/Grid_Model_CIM/QoCDC_v3.2.1_test_models.zip
ENTSOE_EXAMPLES_LOCAL=./example_models/
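
ENTSOE_EXAMPLES_EXTERNAL points at the QoCDC test-model archive and ENTSOE_EXAMPLES_LOCAL at the folder the validator reads from. A minimal sketch of fetching and unpacking the archive, assuming requests is available; the helper name is hypothetical and the real validator may handle nested zips differently:

```python
# Hypothetical download helper for the example models configured above.
import io
import zipfile
import requests

ENTSOE_EXAMPLES_EXTERNAL = "https://www.entsoe.eu/Documents/CIM_documents/Grid_Model_CIM/QoCDC_v3.2.1_test_models.zip"
ENTSOE_EXAMPLES_LOCAL = "./example_models/"

def download_entsoe_examples(url: str = ENTSOE_EXAMPLES_EXTERNAL,
                             target: str = ENTSOE_EXAMPLES_LOCAL) -> None:
    """Download the ENTSOE example-model archive and extract it locally."""
    response = requests.get(url, timeout=60)
    response.raise_for_status()
    with zipfile.ZipFile(io.BytesIO(response.content)) as archive:
        archive.extractall(target)
```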
11 changes: 10 additions & 1 deletion config/logging/custom_logger.properties
@@ -2,4 +2,13 @@
LOGGING_INDEX = emfos-logs
LOGGING_FORMAT = %(levelname) -10s %(asctime) -10s %(name) -50s %(funcName) -50s %(lineno) -5d: %(message)s
LOGGING_DATEFMT = %Y-%m-%d %H:%M:%S
LOGGING_LEVEL = INFO
LOGGING_LEVEL = INFO
LOCAL_FOLDER_FOR_PYPOWSYBL_LOGS = ./pypowsybl-logs
ELASTIC_INDEX_FOR_PYPOWSYBL_LOGS = emfos-pypowsybl-logs
MINIO_BUCKET_FOR_PYPOWSYBL_LOGS = external
MINIO_FOLDER_FOR_PYPOWSYBL_LOGS = EMF_OS_pypowsybl_logs
ELASTIC_FIELD_FOR_LOG_DATA = log_data
ELASTIC_FIELD_FOR_TSO = tso
ELASTIC_FIELD_FOR_TOPIC = topic
ELASTIC_FIELD_FOR_FILENAME = log_file_name
ELASTIC_FIELD_FOR_MINIO_BUCKET = minio_bucket
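
These properties describe where pypowsybl logs go (local folder, MinIO bucket and folder) and which Elasticsearch fields carry the metadata. A sketch of a log document assembled from those fields and pushed with the Elastic helper from this PR; the assembly function itself is hypothetical:

```python
# Hypothetical assembly of a pypowsybl log document using the configured field names.
from emf.common.integrations.elastic import Elastic  # import path assumed from repo layout

def report_pypowsybl_log(tso: str, topic: str, log_text: str, file_name: str,
                         minio_bucket: str = "external") -> None:
    document = {
        "log_data": log_text,          # ELASTIC_FIELD_FOR_LOG_DATA
        "tso": tso,                    # ELASTIC_FIELD_FOR_TSO
        "topic": topic,                # ELASTIC_FIELD_FOR_TOPIC
        "log_file_name": file_name,    # ELASTIC_FIELD_FOR_FILENAME
        "minio_bucket": minio_bucket,  # ELASTIC_FIELD_FOR_MINIO_BUCKET
    }
    Elastic.send_to_elastic(index="emfos-pypowsybl-logs", json_message=document)
```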
79 changes: 72 additions & 7 deletions emf/common/integrations/elastic.py
@@ -11,12 +11,21 @@

import warnings
from elasticsearch.exceptions import ElasticsearchWarning

warnings.simplefilter('ignore', ElasticsearchWarning)

logger = logging.getLogger(__name__)

parse_app_properties(caller_globals=globals(), path=config.paths.integrations.elastic)

SCROLL_ID_FIELD = '_scroll_id'
RESULT_FIELD = 'hits'
DOCUMENT_COUNT = 10000
DEFAULT_COLUMNS = ["value"]
INITIAL_SCROLL_TIME = "15m"
CONSECUTIVE_SCROLL_TIME = "12m"
MAGIC_KEYWORD = '_source'


class Elastic:

@@ -60,7 +69,10 @@ def send_to_elastic(index,
# Executing POST to push message into ELK
if debug:
logger.debug(f"Sending data to {url}")
response = requests.post(url=url, json=json_message)
if json_message.get('args', None): # TODO revise if this is best solution
json_message.pop('args')
json_data = json.dumps(json_message, default=str)
response = requests.post(url=url, data=json_data.encode(), headers={"Content-Type": "application/json"})
if debug:
logger.debug(f"ELK response: {response.content}")

@@ -92,17 +104,24 @@ def send_to_elastic_bulk(index,

if id_from_metadata:
id_separator = "_"
json_message_list = [value for element in json_message_list for value in ({"index": {"_index": index, "_id": id_separator.join([str(element.get(key, '')) for key in id_metadata_list])}}, element)]
json_message_list = [value for element in json_message_list for value in ({"index": {"_index": index,
"_id": id_separator.join(
[str(element.get(
key, '')) for
key in
id_metadata_list])}},
element)]
else:
json_message_list = [value for element in json_message_list for value in ({"index": {"_index": index}}, element)]
json_message_list = [value for element in json_message_list for value in
({"index": {"_index": index}}, element)]

response_list = []
for batch in range(0, len(json_message_list), batch_size):
# Executing POST to push messages into ELK
if debug:
logger.debug(f"Sending batch ({batch}-{batch + batch_size})/{len(json_message_list)} to {url}")
response = requests.post(url=url,
data=(ndjson.dumps(json_message_list[batch:batch + batch_size])+"\n").encode(),
data=(ndjson.dumps(json_message_list[batch:batch + batch_size]) + "\n").encode(),
timeout=None,
headers={"Content-Type": "application/x-ndjson"})
if debug:
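
The bulk payload alternates an index-action line with each source document before being serialized as NDJSON; a sketch of the interleaving with placeholder documents and index name:

```python
# Illustrative two-document batch in the shape expected by the Elasticsearch _bulk API.
documents = [
    {"tso": "ELERING", "status": "valid"},
    {"tso": "AST", "status": "invalid"},
]
index = "emfos-igm-validation"
id_metadata_list = ["tso"]
id_separator = "_"

bulk_lines = []
for element in documents:
    doc_id = id_separator.join(str(element.get(key, "")) for key in id_metadata_list)
    bulk_lines.append({"index": {"_index": index, "_id": doc_id}})
    bulk_lines.append(element)
# bulk_lines now alternates action/metadata entries with source documents,
# which is the structure that ndjson.dumps(...) turns into the request body above.
```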
@@ -128,6 +147,55 @@ def get_docs_by_query(self, index, query, size=None, return_df=True):

return response

def get_data_by_scrolling(self,
query: dict,
index: str,
fields: []):
"""
Gets a large bulk of data from elastic
:param query: dictionary with parameters by which to select data from elastic
:param index: table name in elastic
:param fields: fields or columns to return from query
:return: dataframe with results
"""
result = self.client.search(index=index,
query=query,
source=fields,
size=DOCUMENT_COUNT,
scroll=INITIAL_SCROLL_TIME)

scroll_id = result[SCROLL_ID_FIELD]
# Extract and return the relevant data from the initial response
hits = result[RESULT_FIELD][RESULT_FIELD]
yield hits
# Continue scrolling through the results until there are no more
while hits:
result = self.client.scroll(scroll_id=scroll_id, scroll=CONSECUTIVE_SCROLL_TIME)
hits = result[RESULT_FIELD][RESULT_FIELD]
yield hits
# Clear the scroll context after processing all results
self.client.clear_scroll(scroll_id=scroll_id)
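
Because get_data_by_scrolling is a generator, callers consume results batch by batch (up to DOCUMENT_COUNT hits per batch). A usage sketch with placeholder server, index and query:

```python
# Hypothetical consumption of the scrolling generator.
from emf.common.integrations.elastic import Elastic  # import path assumed from repo layout

elastic_client = Elastic(server="http://test-rcc-logs-master.elering.sise:9200")
query = {"match": {"tso": "ELERING"}}

total_hits = 0
for batch in elastic_client.get_data_by_scrolling(query=query,
                                                  index="emfos-logs*",
                                                  fields=["tso", "topic"]):
    total_hits += len(batch)
print(f"Fetched {total_hits} documents")
```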

def get_data(self,
query: dict,
index: str,
fields: [] = None):
"""
Gets data from elastic
:param query: dictionary with parameters by which to select data from elastic
:param index: table name in elastic
:param fields: fields or columns to return from query
:return: dataframe with results
"""
# Gather all the results to list (of dictionaries)
list_of_lines = []
for hits in self.get_data_by_scrolling(query=query, index=index, fields=fields):
for hit in hits:
list_of_lines.append({field: hit[MAGIC_KEYWORD][field] for field in fields})
# convert list (of dictionaries) to pandas dataframe
data_frame = pd.DataFrame(list_of_lines)
return data_frame
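
get_data wraps the scrolling generator and flattens each hit's _source into rows of a DataFrame; note that fields must be supplied even though it defaults to None, since the field list drives both the source filter and the row construction. A usage sketch with example index and fields:

```python
# Hypothetical query for a time window, returned as a pandas DataFrame.
elastic_client = Elastic(server="http://test-rcc-logs-master.elering.sise:9200")
time_query = {"range": {"@timestamp": {"gte": "2024-04-01T00:00:00", "lte": "2024-04-19T23:59:59"}}}

log_frame = elastic_client.get_data(query=time_query,
                                    index="emfos-pypowsybl-logs",
                                    fields=["tso", "topic", "log_file_name"])
print(log_frame.head())
```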

def query_schedules_from_elk(self,
index: str,
utc_start: str,
@@ -187,7 +255,6 @@ def __init__(self,
auth=None,
verify=False,
debug=False):

self.index = index
self.server = server
self.id_from_metadata = id_from_metadata
@@ -203,7 +270,6 @@ def __init__(self,
self.session.auth = auth

def handle(self, byte_string, properties):

Elastic.send_to_elastic_bulk(index=self.index,
json_message_list=json.loads(byte_string),
id_from_metadata=self.id_from_metadata,
@@ -215,7 +281,6 @@ def handle(self, byte_string, properties):


if __name__ == '__main__':

# Create client
server = "http://test-rcc-logs-master.elering.sise:9200"
service = Elastic(server=server)
3 changes: 2 additions & 1 deletion emf/common/integrations/object_storage/object_storage.py
@@ -68,9 +68,10 @@ def get_content(metadata: dict, bucket_name=MINIO_BUCKET_NAME):
logger.info(f"Getting data from MinIO")
for component in metadata["opde:Component"]:
content_reference = component.get("opdm:Profile").get("pmd:content-reference")
# Minio considers // as /
content_reference = content_reference.replace('//', '/')
logger.info(f"Downloading {content_reference}")
component["opdm:Profile"]["DATA"] = minio_service.download_object(bucket_name, content_reference)

return metadata
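
The replace call normalizes double slashes in the content reference before download, since MinIO treats `//` and `/` as the same key separator. A tiny illustration with a made-up object key:

```python
# Illustrative only: the normalization applied to a hypothetical content reference.
content_reference = "CGMES/ENTSOE//20240419T1030Z__ENTSOE_EQBD_001.zip"
normalized = content_reference.replace("//", "/")
# -> "CGMES/ENTSOE/20240419T1030Z__ENTSOE_EQBD_001.zip"
```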

