
Commit

small fixes
root committed Nov 13, 2024
1 parent 4bc6bc2 commit 47c0b47
Showing 2 changed files with 17 additions and 16 deletions.
14 changes: 8 additions & 6 deletions plugins/fluentd_telemetry_plugin/src/streamer.py
@@ -79,7 +79,7 @@ def ufm_telemetry_xdr_ports_types(self):
@property
def streaming_interval(self):
return self.config_parser.get_streaming_interval()

@property
def fluentd_msg_tag(self):
return self.config_parser.get_fluentd_msg_tag()
@@ -161,7 +161,6 @@ def fluent_sender(self):
timeout=timeout,
use_c=_use_c)
return self._fluent_sender


def init_streaming_attributes(self): # pylint: disable=too-many-locals
Logger.log_message('Updating The streaming attributes', LOG_LEVELS.DEBUG)
@@ -180,14 +179,14 @@ def init_streaming_attributes(self): # pylint: disable=too-many-locals
if not is_processed:
telemetry_data = self.telem_parser.get_metrics(_host, _port, _url, _msg_tag)
if telemetry_data:

# CSV format
rows = telemetry_data.split("\n")
if len(rows):
headers = rows[0].split(",")
for attribute in headers:
self.attributes_mngr.add_streaming_attribute(attribute)

processed_endpoints[endpoint_id] = True
# update the streaming attributes files
self.attributes_mngr.update_saved_streaming_attributes()
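As a standalone illustration of the header-discovery step above: the endpoint's CSV payload is split into rows, and the comma-separated names in the first row become the streaming attributes. A minimal sketch; only the split logic comes from the diff, the function name and sample payload are hypothetical.

# Sketch of the CSV header discovery used by init_streaming_attributes.
def discover_streaming_attributes(telemetry_data):
    rows = telemetry_data.split("\n")
    if not rows:
        return []
    return rows[0].split(",")

# Hypothetical payload such as a telemetry endpoint might return:
sample = "timestamp,node_guid,port_num,PortXmitData\n1731483000,0xab12,1,42"
print(discover_streaming_attributes(sample))
# -> ['timestamp', 'node_guid', 'port_num', 'PortXmitData']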
@@ -249,8 +248,11 @@ def stream_data(self, telemetry_endpoint): # pylint: disable=too-many-locals
self.last_streamed_data_sample_per_endpoint[msg_tag] = {}
logging.info('Start Processing The Received Response From %s', msg_tag)
start_time = time.time()
-        data_to_stream, new_data_timestamp, num_of_counters = self.telem_parser.parse_telemetry_csv_metrics_to_json(telemetry_data, msg_tag, is_xdr_mode, self.stream_only_new_samples)
+        data_to_stream, new_data_timestamp, num_of_counters = \
+            self.telem_parser.parse_telemetry_csv_metrics_to_json(telemetry_data, msg_tag,
+                                                                  is_xdr_mode,
+                                                                  self.stream_only_new_samples)

end_time = time.time()
data_len = len(data_to_stream)
resp_process_time = round(end_time - start_time, 6)
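For clarity, the reflowed call above unpacks a three-tuple from the parser: the JSON-ready records, the newest sample timestamp, and the number of counters. A self-contained stub honoring that contract (the stub's internals are assumptions, not the plugin's actual parser):

import time

# Hypothetical stand-in with the same (data, timestamp, counters) contract.
def parse_csv_metrics_stub(csv_text):
    rows = csv_text.split("\n")
    headers = rows[0].split(",")
    records = [dict(zip(headers, line.split(","))) for line in rows[1:] if line]
    newest_ts = max((r.get("timestamp") for r in records), default=None)
    return records, newest_ts, len(headers)

start_time = time.time()
data_to_stream, new_data_timestamp, num_of_counters = \
    parse_csv_metrics_stub("timestamp,node_guid,PortXmitData\n1731483000,0xab12,42")
resp_process_time = round(time.time() - start_time, 6)
print(len(data_to_stream), new_data_timestamp, num_of_counters, resp_process_time)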
19 changes: 9 additions & 10 deletions plugins/fluentd_telemetry_plugin/src/telemetry_parser.py
@@ -26,7 +26,7 @@ class TelemetryParser:
"""
UFM TelemetryParser class - to fetch and parse the telemetry data
"""

PORT_CONSTANTS_KEYS = {
'timestamp': 'timestamp', 'source_id': 'source_id', 'tag': 'tag',
'node_guid': 'node_guid', 'port_guid': 'port_guid',
@@ -40,14 +40,14 @@ class TelemetryParser:
NORMAL_PORT_ID_KEYS = {'node_guid', 'Node_GUID', 'port_guid', 'port_num', 'Port_Number', 'Port'}
AGG_PORT_ID_KEYS = {'sys_image_guid', 'aport'}
PORT_TYPE_KEY = 'port_type'

def __init__(self, conf_parser, monitor_streaming_mgr, _last_streamed_data_sample_per_endpoint, attr_mngr):
self.config_parser = conf_parser
self.streaming_metrics_mgr = monitor_streaming_mgr
self.last_streamed_data_sample_per_endpoint = _last_streamed_data_sample_per_endpoint
self.meta_fields = self.config_parser.get_meta_fields()
self.attributes_mngr = attr_mngr

@staticmethod
def append_filters_to_telemetry_url(url: str, xdr_mode: bool, port_types: List[str]):
"""
@@ -97,7 +97,6 @@ def get_metrics(self, _host, _port, _url, msg_tag):
except Exception as ex: # pylint: disable=broad-except
logging.error(ex)
return None
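The visible tail of get_metrics is a fetch-or-None pattern: any failure is logged and swallowed so the caller can skip that streaming cycle. A hedged sketch of that shape; the requests-based transport and URL layout are assumptions, only the except/log/return-None tail appears in this hunk.

import logging
import requests

def fetch_metrics(host, port, url):
    try:
        # Assumed transport; the plugin's real request construction is not shown.
        resp = requests.get("http://%s:%s/%s" % (host, port, url.lstrip("/")),
                            timeout=60)
        resp.raise_for_status()
        return resp.text  # the raw CSV payload
    except Exception as ex:  # pylint: disable=broad-except
        logging.error(ex)
        return None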


def _parse_telemetry_csv_metrics_to_json_with_delta(self, available_headers, rows,
port_key_generator, port_key_generator_args,
@@ -188,7 +187,7 @@ def _parse_telemetry_csv_metrics_to_json_with_delta(self, available_headers, rows,
dic = self._append_meta_fields_to_dict(dic)
output.append(dic)
return output, None

def _parse_telemetry_csv_metrics_to_json_without_delta(self, available_headers, rows,
port_key_generator, port_key_generator_args,
is_meta_fields_available, endpoint_key):
@@ -291,7 +290,7 @@ def parse_telemetry_csv_metrics_to_json(self, data, msg_tag, is_xdr_mode, stream_only_new_samples):
is_meta_fields_available, msg_tag)

return parsed_data, new_timestamp, len(keys)

def _append_meta_fields_to_dict(self, dic):
keys = dic.keys()
aliases_meta_fields, custom_meta_fields = self.meta_fields
@@ -307,7 +306,7 @@ def _append_meta_fields_to_dict(self, dic):
for custom_field in custom_meta_fields:
dic[custom_field["key"]] = custom_field["value"]
return dic
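The custom-meta-fields loop above simply stamps each configured key/value pair onto every record. Isolated here for illustration; the data shapes are inferred from the subscript usage in the hunk.

# Each configured {"key": ..., "value": ...} entry is copied onto the record.
def append_custom_meta_fields(dic, custom_meta_fields):
    for custom_field in custom_meta_fields:
        dic[custom_field["key"]] = custom_field["value"]
    return dic

record = {"timestamp": "1731483000", "PortXmitData": "42"}
extras = [{"key": "site", "value": "lab-a"}, {"key": "rack", "value": "r7"}]
print(append_custom_meta_fields(record, extras))
# -> {'timestamp': '1731483000', 'PortXmitData': '42', 'site': 'lab-a', 'rack': 'r7'}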

def _get_filtered_counters(self, counters):
"""
:desc:
Expand Down Expand Up @@ -337,7 +336,7 @@ def _convert_str_to_num(str_val):
except ValueError:
return str_val
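Only the ValueError fallback of _convert_str_to_num is visible in this hunk; the usual full shape of such a helper tries int first, then float, then returns the original string. A sketch under that assumption:

def convert_str_to_num(str_val):
    """Best-effort numeric conversion; falls back to the original string.
    The int-then-float order is an assumption, only the fallback is in the hunk."""
    try:
        return int(str_val)
    except ValueError:
        try:
            return float(str_val)
        except ValueError:
            return str_val

print(convert_str_to_num("42"), convert_str_to_num("3.5"), convert_str_to_num("N/A"))
# -> 42 3.5 N/A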

@staticmethod
def _get_port_keys_indexes_from_csv_headers(headers: List[str]):
"""
Extracts the indexes of specific port keys from CSV headers.
@@ -368,7 +367,7 @@ def _get_port_keys_indexes_from_csv_headers(headers: List[str]):
if key == TelemetryParser.PORT_TYPE_KEY and port_type_key_index == -1:
port_type_key_index = i
return normal_port_id_keys_indexes, aggr_port_id_keys_indexes, port_type_key_index
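Put together, the index-extraction pass above maps each CSV header into one of the key sets declared at the top of the class, remembering the first port_type column it sees. A runnable sketch using those same sets; the branch structure before the visible port_type check is inferred.

NORMAL_PORT_ID_KEYS = {'node_guid', 'Node_GUID', 'port_guid', 'port_num', 'Port_Number', 'Port'}
AGG_PORT_ID_KEYS = {'sys_image_guid', 'aport'}
PORT_TYPE_KEY = 'port_type'

def get_port_keys_indexes(headers):
    normal_idx, aggr_idx, port_type_idx = [], [], -1
    for i, key in enumerate(headers):
        if key in NORMAL_PORT_ID_KEYS:
            normal_idx.append(i)
        elif key in AGG_PORT_ID_KEYS:
            aggr_idx.append(i)
        elif key == PORT_TYPE_KEY and port_type_idx == -1:
            port_type_idx = i
    return normal_idx, aggr_idx, port_type_idx

headers = ["timestamp", "node_guid", "port_num", "sys_image_guid", "aport", "port_type"]
print(get_port_keys_indexes(headers))
# -> ([1, 2], [3, 4], 5)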

@staticmethod
def _get_xdr_port_id_from_csv_row(port_values,
normal_port_id_keys_indexes,
@@ -396,7 +395,7 @@ def _get_xdr_port_id_from_csv_row(port_values,
if port_type == PortType.AGGREGATED.value:
port_id_keys_indexes = aggr_port_id_keys_indexes
return TelemetryParser._get_port_id_from_csv_row(port_values, port_id_keys_indexes)
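The selection logic above: if the row's port_type column marks an aggregated (XDR) port, the aggregated key indexes identify it; otherwise the normal node/port indexes do. A sketch follows; the PortType enum value is an assumption mirroring the comparison in the hunk.

from enum import Enum

class PortType(Enum):
    AGGREGATED = "aggregated"  # assumed value; only the enum name appears in the hunk

def pick_port_id_indexes(port_values, normal_idx, aggr_idx, port_type_idx):
    port_type = port_values[port_type_idx] if port_type_idx != -1 else None
    if port_type == PortType.AGGREGATED.value:
        return aggr_idx
    return normal_idx

row = ["1731483000", "0xab12", "1", "0xcd34", "2", "aggregated"]
print(pick_port_id_indexes(row, [1, 2], [3, 4], 5))
# -> [3, 4]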

@staticmethod
def _get_port_id_from_csv_row(port_values, port_indexes):
"""
