diff --git a/plugins/fluentd_telemetry_plugin/src/streamer.py b/plugins/fluentd_telemetry_plugin/src/streamer.py
index bfdd58d8..2c051582 100644
--- a/plugins/fluentd_telemetry_plugin/src/streamer.py
+++ b/plugins/fluentd_telemetry_plugin/src/streamer.py
@@ -79,7 +79,7 @@ def ufm_telemetry_xdr_ports_types(self):
     @property
     def streaming_interval(self):
         return self.config_parser.get_streaming_interval()
-    
+
     @property
     def fluentd_msg_tag(self):
         return self.config_parser.get_fluentd_msg_tag()
@@ -161,7 +161,6 @@ def fluent_sender(self):
                                                       timeout=timeout,
                                                       use_c=_use_c)
         return self._fluent_sender
-
 
     def init_streaming_attributes(self): # pylint: disable=too-many-locals
         Logger.log_message('Updating The streaming attributes', LOG_LEVELS.DEBUG)
@@ -180,14 +179,14 @@ def init_streaming_attributes(self): # pylint: disable=too-many-locals
             if not is_processed:
                 telemetry_data = self.telem_parser.get_metrics(_host, _port, _url, _msg_tag)
                 if telemetry_data:
-                    
+
                     # CSV format
                     rows = telemetry_data.split("\n")
                     if len(rows):
                         headers = rows[0].split(",")
                         for attribute in headers:
                             self.attributes_mngr.add_streaming_attribute(attribute)
-                
+
                 processed_endpoints[endpoint_id] = True
         # update the streaming attributes files
         self.attributes_mngr.update_saved_streaming_attributes()
@@ -249,8 +248,11 @@ def stream_data(self, telemetry_endpoint): # pylint: disable=too-many-locals
             self.last_streamed_data_sample_per_endpoint[msg_tag] = {}
         logging.info('Start Processing The Received Response From %s', msg_tag)
         start_time = time.time()
-        data_to_stream, new_data_timestamp, num_of_counters = self.telem_parser.parse_telemetry_csv_metrics_to_json(telemetry_data, msg_tag, is_xdr_mode, self.stream_only_new_samples)
-
+        data_to_stream, new_data_timestamp, num_of_counters = \
+            self.telem_parser.parse_telemetry_csv_metrics_to_json(telemetry_data, msg_tag,
+                                                                  is_xdr_mode,
+                                                                  self.stream_only_new_samples)
+
         end_time = time.time()
         data_len = len(data_to_stream)
         resp_process_time = round(end_time - start_time, 6)
diff --git a/plugins/fluentd_telemetry_plugin/src/telemetry_parser.py b/plugins/fluentd_telemetry_plugin/src/telemetry_parser.py
index edd5a322..f124149b 100644
--- a/plugins/fluentd_telemetry_plugin/src/telemetry_parser.py
+++ b/plugins/fluentd_telemetry_plugin/src/telemetry_parser.py
@@ -26,7 +26,7 @@ class TelemetryParser:
     """
     UFM TelemetryParser class - to fetch and parse the telemetry data
     """
-    
+
     PORT_CONSTANTS_KEYS = {
         'timestamp': 'timestamp', 'source_id': 'source_id', 'tag': 'tag',
         'node_guid': 'node_guid', 'port_guid': 'port_guid',
@@ -40,14 +40,14 @@ class TelemetryParser:
     NORMAL_PORT_ID_KEYS = {'node_guid', 'Node_GUID', 'port_guid', 'port_num', 'Port_Number', 'Port'}
     AGG_PORT_ID_KEYS = {'sys_image_guid', 'aport'}
     PORT_TYPE_KEY = 'port_type'
-    
+
     def __init__(self, conf_parser, monitor_streaming_mgr, _last_streamed_data_sample_per_endpoint, attr_mngr):
         self.config_parser = conf_parser
         self.streaming_metrics_mgr = monitor_streaming_mgr
         self.last_streamed_data_sample_per_endpoint = _last_streamed_data_sample_per_endpoint
         self.meta_fields = self.config_parser.get_meta_fields()
         self.attributes_mngr = attr_mngr
-    
+
     @staticmethod
     def append_filters_to_telemetry_url(url: str, xdr_mode: bool, port_types: List[str]):
         """
@@ -97,7 +97,6 @@ def get_metrics(self, _host, _port, _url, msg_tag):
         except Exception as ex:  # pylint: disable=broad-except
             logging.error(ex)
             return None
-
 
     def _parse_telemetry_csv_metrics_to_json_with_delta(self, available_headers, rows,
                                                         port_key_generator, port_key_generator_args,
@@ -188,7 +187,7 @@ def _parse_telemetry_csv_metrics_to_json_with_delta(self, available_headers, row
                 dic = self._append_meta_fields_to_dict(dic)
             output.append(dic)
         return output, None
-    
+
     def _parse_telemetry_csv_metrics_to_json_without_delta(self, available_headers, rows,
                                                            port_key_generator, port_key_generator_args,
                                                            is_meta_fields_available, endpoint_key):
@@ -291,7 +290,7 @@ def parse_telemetry_csv_metrics_to_json(self, data, msg_tag, is_xdr_mode, stream
                                                                                  is_meta_fields_available,
                                                                                  msg_tag)
         return parsed_data, new_timestamp, len(keys)
-    
+
     def _append_meta_fields_to_dict(self, dic):
         keys = dic.keys()
         aliases_meta_fields, custom_meta_fields = self.meta_fields
@@ -307,7 +306,7 @@ def _append_meta_fields_to_dict(self, dic):
         for custom_field in custom_meta_fields:
             dic[custom_field["key"]] = custom_field["value"]
         return dic
-    
+
     def _get_filtered_counters(self, counters):
         """
         :desc:
@@ -337,7 +336,7 @@ def _convert_str_to_num(str_val):
         except ValueError:
             return str_val
 
-    @staticmethod 
+    @staticmethod
    def _get_port_keys_indexes_from_csv_headers(headers: List[str]):
        """
        Extracts the indexes of specific port keys from CSV headers.
@@ -368,7 +367,7 @@ def _get_port_keys_indexes_from_csv_headers(headers: List[str]):
             if key == TelemetryParser.PORT_TYPE_KEY and port_type_key_index == -1:
                 port_type_key_index = i
         return normal_port_id_keys_indexes, aggr_port_id_keys_indexes, port_type_key_index
-    
+
     @staticmethod
     def _get_xdr_port_id_from_csv_row(port_values,
                                       normal_port_id_keys_indexes,
@@ -396,7 +395,7 @@ def _get_xdr_port_id_from_csv_row(port_values,
         if port_type == PortType.AGGREGATED.value:
             port_id_keys_indexes = aggr_port_id_keys_indexes
         return TelemetryParser._get_port_id_from_csv_row(port_values, port_id_keys_indexes)
-    
+
     @staticmethod
     def _get_port_id_from_csv_row(port_values, port_indexes):
         """