diff --git a/plugins/ufm_events_grafana_dashboard_plugin/build/Dockerfile b/plugins/ufm_events_grafana_dashboard_plugin/build/Dockerfile index 47995f93..6933d930 100644 --- a/plugins/ufm_events_grafana_dashboard_plugin/build/Dockerfile +++ b/plugins/ufm_events_grafana_dashboard_plugin/build/Dockerfile @@ -7,15 +7,19 @@ ARG BASE_PATH=/opt/ufm/ufm_plugin_${PLUGIN_NAME} ARG SRC_BASE_DIR=${PLUGIN_NAME}_plugin ARG ETC_ALTERNATIVE_PATH=/var/etc ARG SUPERVISOR_PATH=${ETC_ALTERNATIVE_PATH}/supervisor +ARG LOKI_VERSION=3.1.0 +ARG PROMETHEUS_VERSION=2.54.0 +ARG GRAFANA_VERSION=11.1.0 ENV DEBIAN_FRONTEND=noninteractive ENV REQUIRED_UFM_VERSION=6.12.0 COPY ${SRC_BASE_DIR}/ ${BASE_PATH}/${SRC_BASE_DIR}/ +COPY utils/config_parser.py utils/singleton.py utils/logger.py ${BASE_PATH}/utils/ COPY ${SRC_BASE_DIR}/scripts/ / RUN apt-get update && apt-get upgrade -y && \ # Install plugin dependacies - apt-get install -y supervisor vim tzdata wget unzip curl \ + apt-get install -y supervisor vim tzdata wget unzip curl python3 python3-pip \ # Install Fluentd prerequisites gnupg build-essential ruby ruby-dev \ # Install Grafana prerequisites @@ -27,25 +31,32 @@ RUN apt-get update && apt-get upgrade -y && \ apt-get remove --purge -y ruby-dev build-essential && \ apt-get autoremove -y && \ # Install Loki - wget https://github.com/grafana/loki/releases/download/v3.1.0/loki-linux-amd64.zip && \ + wget https://github.com/grafana/loki/releases/download/v"${LOKI_VERSION}"/loki-linux-amd64.zip && \ unzip loki-linux-amd64.zip && \ mv loki-linux-amd64 /usr/local/bin/loki && \ rm loki-linux-amd64.zip && \ + # Install Prometheus + wget https://github.com/prometheus/prometheus/releases/download/v"${PROMETHEUS_VERSION}"/prometheus-"${PROMETHEUS_VERSION}".linux-amd64.tar.gz && \ + tar -xvf prometheus-"${PROMETHEUS_VERSION}".linux-amd64.tar.gz && \ + mv prometheus-"${PROMETHEUS_VERSION}".linux-amd64/prometheus /usr/local/bin/prometheus && \ + rm -rf prometheus-"${PROMETHEUS_VERSION}".linux-amd64.tar.gz && \ # Install Grafana - wget https://dl.grafana.com/oss/release/grafana_11.1.0_amd64.deb && \ - dpkg -i grafana_11.1.0_amd64.deb && \ - rm grafana_11.1.0_amd64.deb && \ + wget https://dl.grafana.com/oss/release/grafana_"${GRAFANA_VERSION}"_amd64.deb && \ + dpkg -i grafana_"${GRAFANA_VERSION}"_amd64.deb && \ + rm grafana_"${GRAFANA_VERSION}"_amd64.deb && \ # Final cleanup apt-get clean && \ rm -rf /var/lib/apt/lists/* -# move /etc/supervisor from the /etc, /etc dir will be overridden by the shared volume -RUN mkdir -p ${ETC_ALTERNATIVE_PATH} && mv /etc/supervisor ${ETC_ALTERNATIVE_PATH} +# install the python packages +RUN python3 -m pip install -r ${BASE_PATH}/${SRC_BASE_DIR}/src/${PLUGIN_NAME}/requirements.txt -RUN sed -i "s|/etc/supervisor/conf.d/\*.conf|${SUPERVISOR_PATH}/conf.d/\*.conf|g" ${SUPERVISOR_PATH}/supervisord.conf +# move /etc/supervisor from the /etc, /etc dir will be overridden by the shared volume +RUN mkdir -p ${ETC_ALTERNATIVE_PATH} && \ + mv /etc/supervisor ${ETC_ALTERNATIVE_PATH} && \ + sed -i "s|/etc/supervisor/conf.d/\*.conf|${SUPERVISOR_PATH}/conf.d/\*.conf|g" ${SUPERVISOR_PATH}/supervisord.conf # Copy Supervisor configuration file COPY ${SRC_BASE_DIR}/conf/supervisord.conf ${SUPERVISOR_PATH}/conf.d/ -# Start services using supervisord -CMD ["/usr/bin/supervisord", "-c", "/var/etc/supervisor/supervisord.conf"] +CMD ["/bin/bash", "-c", "/entrypoint.sh"] diff --git a/plugins/ufm_events_grafana_dashboard_plugin/build/docker_build.sh b/plugins/ufm_events_grafana_dashboard_plugin/build/docker_build.sh index d2fa8476..69a43ff3 100755 --- a/plugins/ufm_events_grafana_dashboard_plugin/build/docker_build.sh +++ b/plugins/ufm_events_grafana_dashboard_plugin/build/docker_build.sh @@ -115,6 +115,7 @@ echo ${IMAGE_VERSION} > ../../${PLUGIN_NAME}_plugin/version BUILD_DIR=$(create_out_dir) cp Dockerfile ${BUILD_DIR} +cp -r ../../../utils ${BUILD_DIR} cp -r ../../${PLUGIN_NAME}_plugin ${BUILD_DIR} echo "BUILD_DIR : [${BUILD_DIR}]" diff --git a/plugins/ufm_events_grafana_dashboard_plugin/conf/prometheus/prometheus-local-config.yaml b/plugins/ufm_events_grafana_dashboard_plugin/conf/prometheus/prometheus-local-config.yaml new file mode 100644 index 00000000..4a447e10 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/conf/prometheus/prometheus-local-config.yaml @@ -0,0 +1,18 @@ +global: + scrape_interval: 60s + +scrape_configs: + # The job name is added as a label `job=` to any timeseries scraped from this config. + - job_name: "prometheus" + + # metrics_path defaults to '/metrics' + # scheme defaults to 'http'. + + static_configs: + - targets: ["127.0.0.1:@@PROMETHEUS_PORT@@"] + + metric_relabel_configs: + # Keep only the specified metrics and drop all others. + - source_labels: [__name__] + regex: '^(prometheus_tsdb_head_samples_appended_total|prometheus_tsdb_compaction_chunk_size_bytes_sum|prometheus_tsdb_compaction_chunk_samples_sum|prometheus_tsdb_storage_blocks_bytes|prometheus_tsdb_head_chunks_storage_size_bytes|prometheus_tsdb_wal_storage_size_bytes)$' + action: keep diff --git a/plugins/ufm_events_grafana_dashboard_plugin/conf/supervisord.conf b/plugins/ufm_events_grafana_dashboard_plugin/conf/supervisord.conf index e9c9bf52..54d87ba0 100644 --- a/plugins/ufm_events_grafana_dashboard_plugin/conf/supervisord.conf +++ b/plugins/ufm_events_grafana_dashboard_plugin/conf/supervisord.conf @@ -5,6 +5,18 @@ logfile=/opt/ufm/files/log/plugins/ufm_events_grafana_dashboard/supervisord.log logfile_backups=5 logfile_maxbytes=1048576 +[program:collector_service] +directory=/opt/ufm/ufm_plugin_ufm_events_grafana_dashboard +command=python3 /opt/ufm/ufm_plugin_ufm_events_grafana_dashboard/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/app.py +user=root +priority=100 +autostart=true +autorestart=true +startretries=1 +startsecs=1 +killasgroup=true +stopasgroup=true + [program:loki] command=/usr/local/bin/loki -config.file=/config/loki/loki-local-config.yaml user=root @@ -20,10 +32,24 @@ stderr_logfile_maxbytes=1048576 stdout_logfile_backups=5 stderr_logfile_backups=5 +[program:prometheus_server] +command=/usr/local/bin/prometheus --enable-feature=memory-snapshot-on-shutdown --web.enable-remote-write-receiver --web.enable-lifecycle --storage.tsdb.retention.time=@@prometheus_db_data_retention_time@@ --storage.tsdb.retention.size=@@prometheus_db_data_retention_size@@ --storage.tsdb.path=@@prometheus_db_folder@@ --config.file=@@prometheus_config_file@@ --web.listen-address=@@prometheus_ip@@:@@prometheus_port@@ +user=root +autostart=true +autorestart=true +startretries=1 +startsecs=1 +stdout_logfile=/opt/ufm/files/log/plugins/ufm_events_grafana_dashboard/prometheus.log +stderr_logfile=/opt/ufm/files/log/plugins/ufm_events_grafana_dashboard/prometheus.log +stdout_logfile_maxbytes=1048576 +stderr_logfile_maxbytes=1048576 +stdout_logfile_backups=5 +stderr_logfile_backups=5 + [program:fluentd] command=/usr/local/bin/fluentd -c /config/fluentd/fluentd.conf user=root -priority=200 +priority=250 autostart=true autorestart=true startretries=1 diff --git a/plugins/ufm_events_grafana_dashboard_plugin/conf/ufm_events_grafana_dashboard_plugin.conf b/plugins/ufm_events_grafana_dashboard_plugin/conf/ufm_events_grafana_dashboard_plugin.conf new file mode 100644 index 00000000..07dac5ae --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/conf/ufm_events_grafana_dashboard_plugin.conf @@ -0,0 +1,18 @@ +[telemetry] +url=http://127.0.0.1:9002/csv/xcset/low_freq_debug +interval=300 +enabled=True +labels_to_export_to_prometheus=Node_GUID,port_guid,Port_Number,Device_ID,node_description,link_partner_node_guid,link_partner_port_num,link_partner_description +metrics_to_export_to_prometheus=Link_Down + +[prometheus] +prometheus_ip=0.0.0.0 +prometheus_port=9292 +prometheus_db_data_retention_size=500MB +prometheus_db_data_retention_time=15d + +[logs-config] +logs_file_name = /opt/ufm/files/log/plugins/ufm_events_grafana_dashboard/plugin_console.log +logs_level = INFO +log_file_max_size = 10485760 +log_file_backup_count = 5 diff --git a/plugins/ufm_events_grafana_dashboard_plugin/scripts/entrypoint.sh b/plugins/ufm_events_grafana_dashboard_plugin/scripts/entrypoint.sh new file mode 100755 index 00000000..1010d994 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/scripts/entrypoint.sh @@ -0,0 +1,93 @@ +#!/bin/bash +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# @author: Anan Al-Aghbar +# @date: Aug 25, 2024 +# + +CONFIG_FOLDER="/config" + +PLUGIN_CONF="${CONFIG_FOLDER}/ufm_events_grafana_dashboard_plugin.conf" + +ETC_PATH="/var/etc" +SUPERVISORD_PATH="${ETC_PATH}/supervisor" +SUPERVISORD_CONF="${SUPERVISORD_PATH}/conf.d/supervisord.conf" +PROMETHEUS_DATA_FOLDER="${CONFIG_FOLDER}/prometheus/prometheus_db" +PROMETHEUS_CONF_FILE="${CONFIG_FOLDER}/prometheus/prometheus-local-config.yaml" +PROMETHEUS_IP="127.0.0.1" +PROMETHEUS_PORT="9292" +PROMETHEUS_DB_DATA_RETENTION_SIZE="500MB" +PROMETHEUS_DB_DATA_RETENTION_TIME="15d" + +# Function to read parameters from a configuration file +read_param_from_conf_file() { + local config_file="$1" + local param_key="$2" + local default_value="$3" + local value="" + + # Check if the config file exists + if [[ ! -f "${config_file}" ]]; then + echo "${default_value}" + fi + + # Read the value associated with the key + value=$(sed -n -e "s/^${param_key}=\(.*\)/\1/p" "${config_file}") + + # Check if the value was found + if [[ -z "${value}" ]]; then + value="${default_value}" + fi + echo "${value}" +} + +modify_prometheus_server_conf_file() { + local prometheus_port="$1" + local prometheus_yaml_path="${PROMETHEUS_CONF_FILE}" + + echo "Updating the Prometheus port in ${prometheus_yaml_path} to ${prometheus_port}" + + sed -i "s|@@PROMETHEUS_PORT@@|${prometheus_port}|g" "${prometheus_yaml_path}" +} + +modify_prometheus_service_in_supervisor() { + local prometheus_ip="$1" + local prometheus_port="$2" + local prometheus_db_data_retention_size="$3" + local prometheus_db_data_retention_time="$4" + local prometheus_db_folder="${PROMETHEUS_DATA_FOLDER}" + local prometheus_config_file="${PROMETHEUS_CONF_FILE}" + local supervisor_conf="${SUPERVISORD_CONF}" + + sed -i "s|@@prometheus_ip@@|${prometheus_ip}|g" "${supervisor_conf}" + sed -i "s|@@prometheus_port@@|${prometheus_port}|g" "${supervisor_conf}" + sed -i "s|@@prometheus_db_data_retention_size@@|${prometheus_db_data_retention_size}|g" "${supervisor_conf}" + sed -i "s|@@prometheus_db_data_retention_time@@|${prometheus_db_data_retention_time}|g" "${supervisor_conf}" + sed -i "s|@@prometheus_db_folder@@|${prometheus_db_folder}|g" "${supervisor_conf}" + sed -i "s|@@prometheus_config_file@@|${prometheus_config_file}|g" "${supervisor_conf}" +} + +# Read configuration parameters +PROMETHEUS_IP="$(read_param_from_conf_file "${PLUGIN_CONF}" "prometheus_ip" "${PROMETHEUS_IP}")" +PROMETHEUS_PORT="$(read_param_from_conf_file "${PLUGIN_CONF}" "prometheus_port" "${PROMETHEUS_PORT}")" +PROMETHEUS_DB_DATA_RETENTION_SIZE="$(read_param_from_conf_file "${PLUGIN_CONF}" "prometheus_db_data_retention_size" "${PROMETHEUS_DB_DATA_RETENTION_SIZE}")" +PROMETHEUS_DB_DATA_RETENTION_TIME="$(read_param_from_conf_file "${PLUGIN_CONF}" "prometheus_db_data_retention_time" "${PROMETHEUS_DB_DATA_RETENTION_TIME}")" + + +# Update Prometheus server conf in supervisord +modify_prometheus_service_in_supervisor "${PROMETHEUS_IP}" "${PROMETHEUS_PORT}" "${PROMETHEUS_DB_DATA_RETENTION_SIZE}" "${PROMETHEUS_DB_DATA_RETENTION_TIME}" + +# Update prometheus-local-config.yaml file +modify_prometheus_server_conf_file "${PROMETHEUS_PORT}" + +echo "Starting supervisord..." +# Start services using supervisord +/usr/bin/supervisord -c "${SUPERVISORD_PATH}/supervisord.conf" diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/__init__.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/__init__.py new file mode 100644 index 00000000..26680606 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/__init__.py @@ -0,0 +1,11 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/app.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/app.py new file mode 100644 index 00000000..01d6d76b --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/app.py @@ -0,0 +1,49 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# +import os +import sys +sys.path.append(os.getcwd()) + +import time + +import data.manager as dm +from utils.logger import Logger, LOG_LEVELS +from mgr.configurations_mgr import UFMEventsGrafanaConfigParser +from data.collectors.collectors_mgr import CollectorMgr + + +def _init_logs(config_parser: UFMEventsGrafanaConfigParser) -> None: + # init logs configs + logs_file_name = config_parser.get_logs_file_name() + logs_level = config_parser.get_logs_level() + max_log_file_size = config_parser.get_log_file_max_size() + log_file_backup_count = config_parser.get_log_file_backup_count() + Logger.init_logs_config(logs_file_name, logs_level, max_log_file_size, log_file_backup_count) + + +if __name__ == '__main__': + + conf = None + try: + conf = UFMEventsGrafanaConfigParser.getInstance() + _init_logs(conf) + ####### + data_mgr = dm.DataManager() + collector_mgr = CollectorMgr(data_manager=data_mgr) + while True: + time.sleep(1) + except ValueError as ve: + Logger.log_message(f'Error occurred during the plugin initialization process : {str(ve)}', + LOG_LEVELS.ERROR) + except Exception as ex: + Logger.log_message(f'Error occurred during the plugin initialization process : {str(ex)}', + LOG_LEVELS.ERROR) diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/constants/__init__.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/constants/__init__.py new file mode 100644 index 00000000..9034195f --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/constants/__init__.py @@ -0,0 +1,36 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# +from enum import Enum + + +class DataType(Enum): + """ + DataType Enums Class + """ + TELEMETRY = 1 + + +class ModelListeners(Enum): + """ + ModelListeners Enums class + """ + TELEMETRY_PROMETHEUS_EXPORTER = 1 + + +class Prometheus: + """ + Prometheus Constants Class + """ + LABELS = "labels" + COUNTER_VALUE = "counter_value" + TIMESTAMP = "timestamp" + diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/__init__.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/__init__.py new file mode 100644 index 00000000..26680606 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/__init__.py @@ -0,0 +1,11 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/collectors/__init__.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/collectors/__init__.py new file mode 100644 index 00000000..26680606 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/collectors/__init__.py @@ -0,0 +1,11 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/collectors/base_collector.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/collectors/base_collector.py new file mode 100644 index 00000000..c725c2ac --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/collectors/base_collector.py @@ -0,0 +1,87 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# +import json +import httpx + +from abc import ABC, abstractmethod +from typing import Any + +from mgr.configurations_mgr import UFMEventsGrafanaConfigParser +from utils.logger import Logger, LOG_LEVELS +from data.models.base_model import BaseModel + + +class BaseCollector(ABC): + """Base class for a collector that collects data at a given interval""" + + def __init__(self, model: BaseModel, is_enabled: bool, interval: int): + if not isinstance(interval, int) or interval < 0: + raise RuntimeError(f"Invalid interval value {interval}. Please use non-negative int values") + self.model = model + self.interval = interval + self.is_enabled = is_enabled + + @abstractmethod + async def collect(self) -> None: + """Method that collects data""" + pass + + +class HttpCollector(BaseCollector): + """Base class that collects data from an HTTP URL""" + + def __init__(self, model: BaseModel, is_enabled: bool, + interval: int, url: str, jsonify: bool = False): + super().__init__(model, is_enabled, interval) + self.url = url + self.jsonify = jsonify + + async def collect(self): + """Method that collects data from an HTTP endpoint""" + try: + data = await self.do_http_get() + if self.model: + self.model.on_data(data) + except Exception as ex: + error_msg = f"Failed to collect data from {self.url} : {ex}" + Logger.log_message(error_msg, LOG_LEVELS.ERROR) + + async def do_http_get(self) -> Any: + """Method that performs an HTTP GET request""" + async with httpx.AsyncClient(verify=False) as client: + try: + Logger.log_message(f'Requesting URL: {self.url}', LOG_LEVELS.DEBUG) + response = await client.get(self.url) + Logger.log_message(f'Requesting URL: {self.url} ' + f'completed with status [{str(response.status_code)}]', LOG_LEVELS.DEBUG) + response.raise_for_status() + if self.jsonify: + return json.loads(response.text) + return response.text + except (ConnectionError, httpx.ConnectError) as con_err: + error_msg = f"Failed to GET from {self.url} : {con_err}" + Logger.log_message(error_msg, LOG_LEVELS.ERROR) + except Exception as ex: + error_msg = f"Failed to GET from {self.url} : {ex}" + Logger.log_message(error_msg, LOG_LEVELS.ERROR) + + +class TelemetryHttpCollector(HttpCollector): + """Class that collects telemetry metrics from a given URL""" + + def __init__(self, model: BaseModel): + conf = UFMEventsGrafanaConfigParser.getInstance() + is_enabled = conf.get_telemetry_enabled() + url = conf.get_telemetry_url() + interval = conf.get_telemetry_interval() + super(TelemetryHttpCollector, self).__init__(model=model, url=url, + interval=interval, is_enabled=is_enabled) diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/collectors/collectors_mgr.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/collectors/collectors_mgr.py new file mode 100644 index 00000000..c714aa02 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/collectors/collectors_mgr.py @@ -0,0 +1,69 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# +import asyncio +import datetime +import threading +from typing import Dict +from apscheduler.job import Job +from apscheduler.schedulers.background import BackgroundScheduler + + +from constants import DataType +from data.collectors.base_collector import BaseCollector,\ + TelemetryHttpCollector +from data.manager import DataManager + + +class CollectorMgr: + """Class that manages data collection""" + + DATA_TYPE_TO_COLLECTOR = { + DataType.TELEMETRY: TelemetryHttpCollector + } + + def __init__(self, data_manager: DataManager): + self.data_manager = data_manager + self.scheduler = BackgroundScheduler() + self.collectors: Dict[DataType, BaseCollector] = self._init_collectors() + self.jobs: Dict[DataType, Job] = {} + thread = threading.Thread(target=self.scheduler_jobs) + thread.start() + + def _init_collectors(self) -> Dict[DataType, BaseCollector]: + """Method that init the collectors objects""" + self.collectors = {} + for data_type, collector_cls in self.DATA_TYPE_TO_COLLECTOR.items(): + model = self.data_manager.get_model_by_data_type(data_type) + self.collectors[data_type] = collector_cls(model=model) + return self.collectors + + def scheduler_jobs(self): + jobs = {} + for collector_type, collector in self.collectors.items(): + if collector.is_enabled: + job = self.scheduler.add_job(self.do_collection, args=(collector,), + name=collector_type.name, + trigger='interval', seconds=collector.interval, + next_run_time=datetime.datetime.now()) + jobs[collector_type] = job + if not self.scheduler.running: + self.scheduler.start() + self.jobs = jobs + + def stop_job(self, data_type: DataType): + job = self.jobs.get(data_type) + if job and self.scheduler.running: + self.scheduler.remove_job(job.id) + + def do_collection(self, collector: BaseCollector): + """Method that performs data collection""" + asyncio.run(collector.collect()) diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/listeners/__init__.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/listeners/__init__.py new file mode 100644 index 00000000..26680606 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/listeners/__init__.py @@ -0,0 +1,11 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/listeners/base_listener.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/listeners/base_listener.py new file mode 100644 index 00000000..92864a9a --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/listeners/base_listener.py @@ -0,0 +1,35 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# +from abc import ABC, abstractmethod +from typing import List + +from constants import DataType +from data.models.base_model import BaseModel + + +class BaseListener(ABC): + def __init__(self, data_manager: 'Type[DataManager]', data_types: List[DataType]): + self.data_manager = data_manager + self.data_types = data_types + self._subscribe_to_models() + + @abstractmethod + def update_data(self): + """ + The implementation for the data logic upon data arrival + such as storing in database, trigger events, etc... + """ + + def _subscribe_to_models(self): + for data_type in self.data_types: + model: BaseModel = self.data_manager.get_model_by_data_type(data_type) + model.attach(self) diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/listeners/telemetry_prometheus_exporter.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/listeners/telemetry_prometheus_exporter.py new file mode 100644 index 00000000..ec48289a --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/listeners/telemetry_prometheus_exporter.py @@ -0,0 +1,74 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# + +import pandas as pd +import time +from io import StringIO + +from mgr.configurations_mgr import UFMEventsGrafanaConfigParser +from data.listeners.base_listener import BaseListener +from data.models.telemetry_metrics_model import TelemetryMetricsModel +from constants import DataType, Prometheus +from prometheus.remote_write_utils import write_from_list_metrics_in_chunks +from utils.logger import Logger, LOG_LEVELS + + +class TelemetryPrometheusExporter(BaseListener): + + def __init__(self, data_manager: 'Type[DataManager]'): + super().__init__(data_manager, [DataType.TELEMETRY]) + conf: UFMEventsGrafanaConfigParser = UFMEventsGrafanaConfigParser.getInstance() + self.prometheus_ip = conf.get_prometheus_ip() + self.prometheus_port = conf.get_prometheus_port() + self.prometheus_max_chunk_size = conf.get_prometheus_request_max_chunk_size() + self.telemetry_prometheus_labels = conf.get_telemetry_labels_to_export_to_prometheus() + self.telemetry_prometheus_metrics = conf.get_telemetry_metrics_to_export_to_prometheus() + + def update_data(self): + telemetry_model: TelemetryMetricsModel = self.data_manager.get_model_by_data_type(DataType.TELEMETRY) + with telemetry_model.lock: + try: + prometheus_labels = self.telemetry_prometheus_labels + prometheus_metrics = self.telemetry_prometheus_metrics + data = telemetry_model.last_metrics_csv + Logger.log_message('Start processing telemetry metrics csv for export to Prometheus DB') + pst = time.time() + df = pd.read_csv(StringIO(data)) + df.fillna('', inplace=True) + data_dict = df.to_dict(orient='records') + metrics = [] + for row in data_dict: + metrics_dict = {} + basic_metric = { + Prometheus.TIMESTAMP: int(round(row.get('timestamp', time.time() * 1000))), + Prometheus.LABELS: {label: str(row.get(label, '')) for label in prometheus_labels} + } + for metric in prometheus_metrics: + value = row.get(metric, None) + if value is not None: + metrics_dict[metric] = { + **basic_metric, + Prometheus.COUNTER_VALUE: value + } + metrics.append(metrics_dict) + pet = time.time() + proc_time = round((pet - pst), 2) + Logger.log_message(f'Processing telemetry metrics csv for export to ' + f'Prometheus DB for {len(metrics)} ports' + f' completed successfully in {proc_time}') + write_from_list_metrics_in_chunks(metrics_data=metrics, + target_id=self.prometheus_ip, + target_port=self.prometheus_port, + max_chunk_size=self.prometheus_max_chunk_size) + except Exception as ex: + Logger.log_message(f'Failed to export telemetry metrics csv to Prometheus DB: {str(ex)}', + LOG_LEVELS.ERROR) diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/manager.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/manager.py new file mode 100644 index 00000000..3d3c0ff5 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/manager.py @@ -0,0 +1,53 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# +from typing import Dict +from constants import DataType, ModelListeners + +from data.models.base_model import BaseModel +from data.models.telemetry_metrics_model import TelemetryMetricsModel + +from data.listeners.base_listener import BaseListener +from data.listeners.telemetry_prometheus_exporter import TelemetryPrometheusExporter + + +class DataManager: + DATA_TYPE_TO_MODEL_CLS = { + DataType.TELEMETRY: TelemetryMetricsModel + } + + DATA_LISTENER_TO_CLS = { + ModelListeners.TELEMETRY_PROMETHEUS_EXPORTER: TelemetryPrometheusExporter + } + + def __init__(self): + self.models: Dict[DataType, BaseModel] = self._init_data_models() + self.listeners: Dict[ModelListeners, BaseListener] = self._init_data_listeners() + + def _init_data_models(self) -> Dict[DataType, BaseModel]: + models: Dict[DataType, BaseModel] = {} + for dtype, model_cls in self.DATA_TYPE_TO_MODEL_CLS.items(): + model: BaseModel = model_cls() + models[dtype] = model + return models + + def _init_data_listeners(self) -> Dict[ModelListeners, BaseListener]: + listeners: Dict[ModelListeners, BaseListener] = {} + for dtype, listener_cls in self.DATA_LISTENER_TO_CLS.items(): + listener: BaseListener = listener_cls(self) + listeners[dtype] = listener + return listeners + + def get_model_by_data_type(self, data_type: DataType): + """ + Returns the Model object for the given DataType + """ + return self.models.get(data_type) diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/models/__init__.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/models/__init__.py new file mode 100644 index 00000000..26680606 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/models/__init__.py @@ -0,0 +1,11 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/models/base_model.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/models/base_model.py new file mode 100644 index 00000000..20f7b3cc --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/models/base_model.py @@ -0,0 +1,50 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# +import threading +from abc import ABC, abstractmethod +from typing import Set, Any + + +class Notifier: + + def __init__(self, listeners: Set['Type[BaseListener]'] = None): + self.listeners: Set['Type[BaseListener]'] = listeners + if self.listeners is None: + self.listeners = set() + + def attach(self, listener): + if listener not in self.listeners: + self.listeners.add(listener) + + def notify_listeners(self): + for listener in self.listeners: + listener.update_data() + + +class BaseModel(Notifier, ABC): + """ + BaseModel is base class for all data model types + """ + def __init__(self): + super().__init__() + self.lock = threading.Lock() + self.ts = 0 + + @abstractmethod + def on_data(self, data: Any): + """ + called by designated data collector + """ + + def get_ts_milliseconds(self) -> int: + """Get the timestamp in milliseconds""" + return int(round(self.ts * 1000)) diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/models/telemetry_metrics_model.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/models/telemetry_metrics_model.py new file mode 100644 index 00000000..8a825795 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/data/models/telemetry_metrics_model.py @@ -0,0 +1,32 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# +import time + +from typing import Any + +from data.models.base_model import BaseModel + + +class TelemetryMetricsModel(BaseModel): + + def __init__(self): + super().__init__() + self.ts = 0 + self.last_metrics_csv: str = '' + + def on_data(self, data: Any): + if not data: + return + with self.lock: + self.ts = time.time() + self.last_metrics_csv = data + self.notify_listeners() diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/mgr/__init__.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/mgr/__init__.py new file mode 100644 index 00000000..26680606 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/mgr/__init__.py @@ -0,0 +1,11 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/mgr/configurations_mgr.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/mgr/configurations_mgr.py new file mode 100644 index 00000000..a35d72fa --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/mgr/configurations_mgr.py @@ -0,0 +1,86 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# +from typing import List + +from utils.config_parser import ConfigParser +from utils.singleton import Singleton + + +class UFMEventsGrafanaConfigParser(ConfigParser, Singleton): + + # for debugging + # CONFIG_FILE = '../../conf/ufm_events_grafana_dashboard_plugin.conf' + + CONFIG_FILE = '/config/ufm_events_grafana_dashboard_plugin.conf' + + TELEMETRY_CONFIG_SECTION = 'telemetry' + TELEMETRY_CONFIG_URL = 'url' + TELEMETRY_CONFIG_INTERVAL = 'interval' + TELEMETRY_CONFIG_ENABLED = 'enabled' + TELEMETRY_CONFIG_PROMETHEUS_LABELS = 'labels_to_export_to_prometheus' + TELEMETRY_CONFIG_PROMETHEUS_METRICS = 'metrics_to_export_to_prometheus' + ############ + PROMETHEUS_CONFIG_SECTION = "prometheus" + PROMETHEUS_SECTION_IP = "prometheus_ip" + PROMETHEUS_SECTION_PORT = "prometheus_port" + PROMETHEUS_REMOTE_WRITE_MAX_CHUNK_SIZE = "prometheus_remote_write_max_chunk_size" + + def __init__(self): + super().__init__(read_sdk_config=False) + self.sdk_config.read(self.CONFIG_FILE) + + def get_telemetry_url(self) -> str: + return self.get_config_value(None, + self.TELEMETRY_CONFIG_SECTION, + self.TELEMETRY_CONFIG_URL, + 'http://127.0.0.1:9002/csv/xcset/low_freq_debug') + + def get_telemetry_interval(self) -> int: + return self.safe_get_int(None, + self.TELEMETRY_CONFIG_SECTION, + self.TELEMETRY_CONFIG_INTERVAL, + 300) + + def get_telemetry_enabled(self) -> bool: + return self.safe_get_bool(None, + self.TELEMETRY_CONFIG_SECTION, + self.TELEMETRY_CONFIG_ENABLED, + True) + + def get_telemetry_labels_to_export_to_prometheus(self) -> List[str]: + return self.safe_get_list(None, + self.TELEMETRY_CONFIG_SECTION, + self.TELEMETRY_CONFIG_PROMETHEUS_LABELS, + 'Node_GUID,port_guid,Port_Number,Device_ID,node_description') + + def get_telemetry_metrics_to_export_to_prometheus(self) -> List[str]: + return self.safe_get_list(None, + self.TELEMETRY_CONFIG_SECTION, + self.TELEMETRY_CONFIG_PROMETHEUS_METRICS, + 'Link_Down') + + ##################################### + + def get_prometheus_ip(self) -> str: + return self.get_config_value(None, + self.PROMETHEUS_CONFIG_SECTION, + self.PROMETHEUS_SECTION_IP, "127.0.0.1") + + def get_prometheus_port(self) -> str: + return self.get_config_value(None, + self.PROMETHEUS_CONFIG_SECTION, + self.PROMETHEUS_SECTION_PORT, "9292") + + def get_prometheus_request_max_chunk_size(self) -> int: + return self.safe_get_int(None, + self.PROMETHEUS_CONFIG_SECTION, + self.PROMETHEUS_REMOTE_WRITE_MAX_CHUNK_SIZE, 10000) diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/prometheus/__init__.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/prometheus/__init__.py new file mode 100644 index 00000000..26680606 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/prometheus/__init__.py @@ -0,0 +1,11 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/prometheus/prometheus.proto b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/prometheus/prometheus.proto new file mode 100644 index 00000000..ba217d47 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/prometheus/prometheus.proto @@ -0,0 +1,82 @@ +// This file cloned from https://github.com/prometheus/prometheus/blob/v2.24.0/prompb/remote.proto +// Copyright 2016 Prometheus Team +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +syntax = "proto3"; +package prometheus; + +option go_package = "prompb"; + +message WriteRequest { + repeated prometheus.TimeSeries timeseries = 1; +} + +message ReadRequest { + repeated Query queries = 1; +} + +message ReadResponse { + // In same order as the request's queries. + repeated QueryResult results = 1; +} + +message Query { + int64 start_timestamp_ms = 1; + int64 end_timestamp_ms = 2; + repeated prometheus.LabelMatcher matchers = 3; + prometheus.ReadHints hints = 4; +} + +message QueryResult { + // Samples within a time series must be ordered by time. + repeated prometheus.TimeSeries timeseries = 1; +} + +message Sample { + double value = 1; + int64 timestamp = 2; +} + +message TimeSeries { + repeated Label labels = 1; + repeated Sample samples = 2; +} + +message Label { + string name = 1; + string value = 2; +} + +message Labels { + repeated Label labels = 1; +} + +// Matcher specifies a rule, which can match or set of labels or not. +message LabelMatcher { + enum Type { + EQ = 0; + NEQ = 1; + RE = 2; + NRE = 3; + } + Type type = 1; + string name = 2; + string value = 3; +} + +message ReadHints { + int64 step_ms = 1; // Query step size in milliseconds. + string func = 2; // String representation of surrounding function or aggregation. + int64 start_ms = 3; // Start time in milliseconds. + int64 end_ms = 4; // End time in milliseconds. +} diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/prometheus/prometheus_pb2.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/prometheus/prometheus_pb2.py new file mode 100644 index 00000000..2e522196 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/prometheus/prometheus_pb2.py @@ -0,0 +1,581 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: prometheus.proto +# pylint: skip-file + +import sys +_b=sys.version_info[0]<3 and (lambda x:x) or (lambda x:x.encode('latin1')) +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor.FileDescriptor( + name='prometheus.proto', + package='prometheus', + syntax='proto3', + serialized_options=_b('Z\006prompb'), + serialized_pb=_b('\n\x10prometheus.proto\x12\nprometheus\":\n\x0cWriteRequest\x12*\n\ntimeseries\x18\x01 \x03(\x0b\x32\x16.prometheus.TimeSeries\"1\n\x0bReadRequest\x12\"\n\x07queries\x18\x01 \x03(\x0b\x32\x11.prometheus.Query\"8\n\x0cReadResponse\x12(\n\x07results\x18\x01 \x03(\x0b\x32\x17.prometheus.QueryResult\"\x8f\x01\n\x05Query\x12\x1a\n\x12start_timestamp_ms\x18\x01 \x01(\x03\x12\x18\n\x10\x65nd_timestamp_ms\x18\x02 \x01(\x03\x12*\n\x08matchers\x18\x03 \x03(\x0b\x32\x18.prometheus.LabelMatcher\x12$\n\x05hints\x18\x04 \x01(\x0b\x32\x15.prometheus.ReadHints\"9\n\x0bQueryResult\x12*\n\ntimeseries\x18\x01 \x03(\x0b\x32\x16.prometheus.TimeSeries\"*\n\x06Sample\x12\r\n\x05value\x18\x01 \x01(\x01\x12\x11\n\ttimestamp\x18\x02 \x01(\x03\"T\n\nTimeSeries\x12!\n\x06labels\x18\x01 \x03(\x0b\x32\x11.prometheus.Label\x12#\n\x07samples\x18\x02 \x03(\x0b\x32\x12.prometheus.Sample\"$\n\x05Label\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05value\x18\x02 \x01(\t\"+\n\x06Labels\x12!\n\x06labels\x18\x01 \x03(\x0b\x32\x11.prometheus.Label\"\x82\x01\n\x0cLabelMatcher\x12+\n\x04type\x18\x01 \x01(\x0e\x32\x1d.prometheus.LabelMatcher.Type\x12\x0c\n\x04name\x18\x02 \x01(\t\x12\r\n\x05value\x18\x03 \x01(\t\"(\n\x04Type\x12\x06\n\x02\x45Q\x10\x00\x12\x07\n\x03NEQ\x10\x01\x12\x06\n\x02RE\x10\x02\x12\x07\n\x03NRE\x10\x03\"L\n\tReadHints\x12\x0f\n\x07step_ms\x18\x01 \x01(\x03\x12\x0c\n\x04\x66unc\x18\x02 \x01(\t\x12\x10\n\x08start_ms\x18\x03 \x01(\x03\x12\x0e\n\x06\x65nd_ms\x18\x04 \x01(\x03\x42\x08Z\x06prompbb\x06proto3') +) + + + +_LABELMATCHER_TYPE = _descriptor.EnumDescriptor( + name='Type', + full_name='prometheus.LabelMatcher.Type', + filename=None, + file=DESCRIPTOR, + values=[ + _descriptor.EnumValueDescriptor( + name='EQ', index=0, number=0, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='NEQ', index=1, number=1, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='RE', index=2, number=2, + serialized_options=None, + type=None), + _descriptor.EnumValueDescriptor( + name='NRE', index=3, number=3, + serialized_options=None, + type=None), + ], + containing_type=None, + serialized_options=None, + serialized_start=710, + serialized_end=750, +) +_sym_db.RegisterEnumDescriptor(_LABELMATCHER_TYPE) + + +_WRITEREQUEST = _descriptor.Descriptor( + name='WriteRequest', + full_name='prometheus.WriteRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='timeseries', full_name='prometheus.WriteRequest.timeseries', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=32, + serialized_end=90, +) + + +_READREQUEST = _descriptor.Descriptor( + name='ReadRequest', + full_name='prometheus.ReadRequest', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='queries', full_name='prometheus.ReadRequest.queries', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=92, + serialized_end=141, +) + + +_READRESPONSE = _descriptor.Descriptor( + name='ReadResponse', + full_name='prometheus.ReadResponse', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='results', full_name='prometheus.ReadResponse.results', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=143, + serialized_end=199, +) + + +_QUERY = _descriptor.Descriptor( + name='Query', + full_name='prometheus.Query', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='start_timestamp_ms', full_name='prometheus.Query.start_timestamp_ms', index=0, + number=1, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='end_timestamp_ms', full_name='prometheus.Query.end_timestamp_ms', index=1, + number=2, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='matchers', full_name='prometheus.Query.matchers', index=2, + number=3, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='hints', full_name='prometheus.Query.hints', index=3, + number=4, type=11, cpp_type=10, label=1, + has_default_value=False, default_value=None, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=202, + serialized_end=345, +) + + +_QUERYRESULT = _descriptor.Descriptor( + name='QueryResult', + full_name='prometheus.QueryResult', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='timeseries', full_name='prometheus.QueryResult.timeseries', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=347, + serialized_end=404, +) + + +_SAMPLE = _descriptor.Descriptor( + name='Sample', + full_name='prometheus.Sample', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='value', full_name='prometheus.Sample.value', index=0, + number=1, type=1, cpp_type=5, label=1, + has_default_value=False, default_value=float(0), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='timestamp', full_name='prometheus.Sample.timestamp', index=1, + number=2, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=406, + serialized_end=448, +) + + +_TIMESERIES = _descriptor.Descriptor( + name='TimeSeries', + full_name='prometheus.TimeSeries', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='labels', full_name='prometheus.TimeSeries.labels', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='samples', full_name='prometheus.TimeSeries.samples', index=1, + number=2, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=450, + serialized_end=534, +) + + +_LABEL = _descriptor.Descriptor( + name='Label', + full_name='prometheus.Label', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='name', full_name='prometheus.Label.name', index=0, + number=1, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='value', full_name='prometheus.Label.value', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=536, + serialized_end=572, +) + + +_LABELS = _descriptor.Descriptor( + name='Labels', + full_name='prometheus.Labels', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='labels', full_name='prometheus.Labels.labels', index=0, + number=1, type=11, cpp_type=10, label=3, + has_default_value=False, default_value=[], + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=574, + serialized_end=617, +) + + +_LABELMATCHER = _descriptor.Descriptor( + name='LabelMatcher', + full_name='prometheus.LabelMatcher', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='type', full_name='prometheus.LabelMatcher.type', index=0, + number=1, type=14, cpp_type=8, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='name', full_name='prometheus.LabelMatcher.name', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='value', full_name='prometheus.LabelMatcher.value', index=2, + number=3, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + _LABELMATCHER_TYPE, + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=620, + serialized_end=750, +) + + +_READHINTS = _descriptor.Descriptor( + name='ReadHints', + full_name='prometheus.ReadHints', + filename=None, + file=DESCRIPTOR, + containing_type=None, + fields=[ + _descriptor.FieldDescriptor( + name='step_ms', full_name='prometheus.ReadHints.step_ms', index=0, + number=1, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='func', full_name='prometheus.ReadHints.func', index=1, + number=2, type=9, cpp_type=9, label=1, + has_default_value=False, default_value=_b("").decode('utf-8'), + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='start_ms', full_name='prometheus.ReadHints.start_ms', index=2, + number=3, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + _descriptor.FieldDescriptor( + name='end_ms', full_name='prometheus.ReadHints.end_ms', index=3, + number=4, type=3, cpp_type=2, label=1, + has_default_value=False, default_value=0, + message_type=None, enum_type=None, containing_type=None, + is_extension=False, extension_scope=None, + serialized_options=None, file=DESCRIPTOR), + ], + extensions=[ + ], + nested_types=[], + enum_types=[ + ], + serialized_options=None, + is_extendable=False, + syntax='proto3', + extension_ranges=[], + oneofs=[ + ], + serialized_start=752, + serialized_end=828, +) + +_WRITEREQUEST.fields_by_name['timeseries'].message_type = _TIMESERIES +_READREQUEST.fields_by_name['queries'].message_type = _QUERY +_READRESPONSE.fields_by_name['results'].message_type = _QUERYRESULT +_QUERY.fields_by_name['matchers'].message_type = _LABELMATCHER +_QUERY.fields_by_name['hints'].message_type = _READHINTS +_QUERYRESULT.fields_by_name['timeseries'].message_type = _TIMESERIES +_TIMESERIES.fields_by_name['labels'].message_type = _LABEL +_TIMESERIES.fields_by_name['samples'].message_type = _SAMPLE +_LABELS.fields_by_name['labels'].message_type = _LABEL +_LABELMATCHER.fields_by_name['type'].enum_type = _LABELMATCHER_TYPE +_LABELMATCHER_TYPE.containing_type = _LABELMATCHER +DESCRIPTOR.message_types_by_name['WriteRequest'] = _WRITEREQUEST +DESCRIPTOR.message_types_by_name['ReadRequest'] = _READREQUEST +DESCRIPTOR.message_types_by_name['ReadResponse'] = _READRESPONSE +DESCRIPTOR.message_types_by_name['Query'] = _QUERY +DESCRIPTOR.message_types_by_name['QueryResult'] = _QUERYRESULT +DESCRIPTOR.message_types_by_name['Sample'] = _SAMPLE +DESCRIPTOR.message_types_by_name['TimeSeries'] = _TIMESERIES +DESCRIPTOR.message_types_by_name['Label'] = _LABEL +DESCRIPTOR.message_types_by_name['Labels'] = _LABELS +DESCRIPTOR.message_types_by_name['LabelMatcher'] = _LABELMATCHER +DESCRIPTOR.message_types_by_name['ReadHints'] = _READHINTS +_sym_db.RegisterFileDescriptor(DESCRIPTOR) + +WriteRequest = _reflection.GeneratedProtocolMessageType('WriteRequest', (_message.Message,), dict( + DESCRIPTOR = _WRITEREQUEST, + __module__ = 'prometheus_pb2' + # @@protoc_insertion_point(class_scope:prometheus.WriteRequest) + )) +_sym_db.RegisterMessage(WriteRequest) + +ReadRequest = _reflection.GeneratedProtocolMessageType('ReadRequest', (_message.Message,), dict( + DESCRIPTOR = _READREQUEST, + __module__ = 'prometheus_pb2' + # @@protoc_insertion_point(class_scope:prometheus.ReadRequest) + )) +_sym_db.RegisterMessage(ReadRequest) + +ReadResponse = _reflection.GeneratedProtocolMessageType('ReadResponse', (_message.Message,), dict( + DESCRIPTOR = _READRESPONSE, + __module__ = 'prometheus_pb2' + # @@protoc_insertion_point(class_scope:prometheus.ReadResponse) + )) +_sym_db.RegisterMessage(ReadResponse) + +Query = _reflection.GeneratedProtocolMessageType('Query', (_message.Message,), dict( + DESCRIPTOR = _QUERY, + __module__ = 'prometheus_pb2' + # @@protoc_insertion_point(class_scope:prometheus.Query) + )) +_sym_db.RegisterMessage(Query) + +QueryResult = _reflection.GeneratedProtocolMessageType('QueryResult', (_message.Message,), dict( + DESCRIPTOR = _QUERYRESULT, + __module__ = 'prometheus_pb2' + # @@protoc_insertion_point(class_scope:prometheus.QueryResult) + )) +_sym_db.RegisterMessage(QueryResult) + +Sample = _reflection.GeneratedProtocolMessageType('Sample', (_message.Message,), dict( + DESCRIPTOR = _SAMPLE, + __module__ = 'prometheus_pb2' + # @@protoc_insertion_point(class_scope:prometheus.Sample) + )) +_sym_db.RegisterMessage(Sample) + +TimeSeries = _reflection.GeneratedProtocolMessageType('TimeSeries', (_message.Message,), dict( + DESCRIPTOR = _TIMESERIES, + __module__ = 'prometheus_pb2' + # @@protoc_insertion_point(class_scope:prometheus.TimeSeries) + )) +_sym_db.RegisterMessage(TimeSeries) + +Label = _reflection.GeneratedProtocolMessageType('Label', (_message.Message,), dict( + DESCRIPTOR = _LABEL, + __module__ = 'prometheus_pb2' + # @@protoc_insertion_point(class_scope:prometheus.Label) + )) +_sym_db.RegisterMessage(Label) + +Labels = _reflection.GeneratedProtocolMessageType('Labels', (_message.Message,), dict( + DESCRIPTOR = _LABELS, + __module__ = 'prometheus_pb2' + # @@protoc_insertion_point(class_scope:prometheus.Labels) + )) +_sym_db.RegisterMessage(Labels) + +LabelMatcher = _reflection.GeneratedProtocolMessageType('LabelMatcher', (_message.Message,), dict( + DESCRIPTOR = _LABELMATCHER, + __module__ = 'prometheus_pb2' + # @@protoc_insertion_point(class_scope:prometheus.LabelMatcher) + )) +_sym_db.RegisterMessage(LabelMatcher) + +ReadHints = _reflection.GeneratedProtocolMessageType('ReadHints', (_message.Message,), dict( + DESCRIPTOR = _READHINTS, + __module__ = 'prometheus_pb2' + # @@protoc_insertion_point(class_scope:prometheus.ReadHints) + )) +_sym_db.RegisterMessage(ReadHints) + + +DESCRIPTOR._options = None +# @@protoc_insertion_point(module_scope) diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/prometheus/remote_write_utils.py b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/prometheus/remote_write_utils.py new file mode 100644 index 00000000..85c62888 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/prometheus/remote_write_utils.py @@ -0,0 +1,141 @@ +# +# Copyright © 2013-2024 NVIDIA CORPORATION & AFFILIATES. ALL RIGHTS RESERVED. +# +# This software product is a proprietary product of Nvidia Corporation and its affiliates +# (the "Company") and all right, title, and interest in and to the software +# product, including all associated intellectual property rights, are and +# shall remain exclusively with the Company. +# +# This software product is governed by the End User License Agreement +# provided with the software product. +# +import requests +import snappy +import calendar + +from datetime import datetime +from concurrent.futures import ThreadPoolExecutor, as_completed + +from prometheus.prometheus_pb2 import WriteRequest +from utils.logger import Logger, LOG_LEVELS + +BASE_LABELS = ['Node_GUID', 'port_guid', 'Port_Number', 'Device_ID', 'node_description'] +DEFAULT_TARGET_IP = "localhost" +DEFAULT_TARGET_PORT = "9292" + + +def dt2ts(dt): + """Converts a datetime object to UTC timestamp + naive datetime will be considered UTC. + """ + return calendar.timegm(dt.utctimetuple()) + + +def send(write_request, target_id=DEFAULT_TARGET_IP, target_port=DEFAULT_TARGET_PORT): + """ + Send Write Request to Prometheus DB + """ + uncompressed = write_request.SerializeToString() + compressed = snappy.compress(uncompressed) + url = f"http://{target_id}:{target_port}/api/v1/write" + headers = { + "Content-Encoding": "snappy", + "Content-Type": "application/x-protobuf", + "X-Prometheus-Remote-Write-Version": "0.1.0", + "User-Agent": "metrics-worker" + } + try: + # pylint: disable=missing-timeout + response = requests.post(url, headers=headers, data=compressed) + if response.status_code in [204, 200]: + return True + err_msg = f'failed to send metrics to server {target_id}:{target_port}, ' \ + f'with response info: {response.status_code} {response.text}' + Logger.log_message(err_msg, LOG_LEVELS.WARNING) + return False + # pylint: disable=broad-exception-caught + except Exception as e: + Logger.log_message(e, LOG_LEVELS.ERROR) + return False + + +def add_metric(write_request, name: str, labels: dict, value: float, timestamp: int = None): + """ + Add counter/metric to write_request object + """ + series = write_request.timeseries.add() + + # name label always required + label = series.labels.add() + label.name = "__name__" + label.value = name + + for label_name, label_value in labels.items(): + label = series.labels.add() + label.name = label_name + label.value = label_value + + sample = series.samples.add() + sample.value = value + sample.timestamp = timestamp or (dt2ts(datetime.utcnow()) * 1000) + + +def split_metrics_into_chunks(data: list, chunk_size: int): + """Yield successive chunk_size chunks from data.""" + for i in range(0, len(data), chunk_size): + yield data[i:i + chunk_size] + + +def write_from_list_metrics(metrics_data: list, + target_id=DEFAULT_TARGET_IP, + target_port=DEFAULT_TARGET_PORT): + """ + :param metrics_data: + [{counter_name:{labels:{label:value,..},counter_value:value,timestamp:value}] + :param target_id: + :param target_port: + :return: + """ + write_request = WriteRequest() + for data in metrics_data: + for counter, counter_data in data.items(): + labels = counter_data.get('labels') + counter_value = counter_data.get('counter_value') + timestamp = counter_data.get('timestamp') + + add_metric(write_request, counter, labels, counter_value, timestamp) + # Send to remote write endpoint + return send(write_request, target_id, target_port) + + +def write_from_chunk_metrics(metrics_data: list, chunk_index: int, + target_id=DEFAULT_TARGET_IP, target_port=DEFAULT_TARGET_PORT): + debug_msg = f"Start processing metrics chunk [{chunk_index}]" + Logger.log_message(debug_msg, LOG_LEVELS.DEBUG) + success = write_from_list_metrics(metrics_data, target_id, target_port) + if success: + debug_msg = f"Successfully sent metrics chunk [{chunk_index}]" + Logger.log_message(debug_msg, LOG_LEVELS.DEBUG) + else: + err_msg = f"Failed to send metrics chunk [{chunk_index}]" + Logger.log_message(err_msg, LOG_LEVELS.WARNING) + return success + + +def write_from_list_metrics_in_chunks(metrics_data: list, + target_id=DEFAULT_TARGET_IP, target_port=DEFAULT_TARGET_PORT, + max_chunk_size=10000, max_workers=5): + """ + Send metrics_data list in chunks to Prometheus DB + """ + metrics_chunks = list(split_metrics_into_chunks(metrics_data, max_chunk_size)) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + write_requests = { + executor.submit(write_from_chunk_metrics, chunk, i, target_id, target_port): + i for i, chunk in enumerate(metrics_chunks) + } + for request in as_completed(write_requests): + chunk_index = write_requests[request] + if not request.result(): + err_msg = f"Metrics Chunk [{chunk_index}] failed to send." + Logger.log_message(err_msg, LOG_LEVELS.ERROR) diff --git a/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/requirements.txt b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/requirements.txt new file mode 100644 index 00000000..f0e1e366 --- /dev/null +++ b/plugins/ufm_events_grafana_dashboard_plugin/src/ufm_events_grafana_dashboard/requirements.txt @@ -0,0 +1,10 @@ +apscheduler==3.10.1 +httpx==0.24.1 +protobuf==3.20.2 +prometheus-client==0.17.1 +jsonschema==4.18.4 +requests==2.31.0 +python-snappy==0.6.1 +# installing this version of numpy as its dependency for pandas +numpy==1.26.4 +pandas==2.0.3