diff --git a/karapace.config.json b/karapace.config.json index 3a4fe0a43..f07af6710 100644 --- a/karapace.config.json +++ b/karapace.config.json @@ -28,7 +28,11 @@ "topic_name": "_schemas", "protobuf_runtime_directory": "runtime", "session_timeout_ms": 10000, + "stats_service": "statsd", "metrics_extended": true, "statsd_host": "127.0.0.1", - "statsd_port": 8125 + "statsd_port": 8125, + "prometheus_host": "127.0.0.1", + "prometheus_port": 8005, + } diff --git a/karapace/base_stats.py b/karapace/base_stats.py new file mode 100644 index 000000000..b6fe87a0c --- /dev/null +++ b/karapace/base_stats.py @@ -0,0 +1,57 @@ +""" +karapace - basestats + +Supports base class for statsd and prometheus protocols: + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from contextlib import contextmanager +from karapace.config import Config +from karapace.sentry import get_sentry_client +from typing import Final, Iterator + +import time + + +class StatsClient(ABC): + @abstractmethod + def __init__( + self, + config: Config, + ) -> None: + self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None)) + + @contextmanager + def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]: + start_time = time.monotonic() + yield + self.timing(metric, time.monotonic() - start_time, tags) + + @abstractmethod + def gauge(self, metric: str, value: float, tags: dict | None = None) -> None: + pass + + @abstractmethod + def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None: + pass + + @abstractmethod + def timing(self, metric: str, value: float, tags: dict | None = None) -> None: + pass + + def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None: + all_tags = { + "exception": ex.__class__.__name__, + "where": where, + } + all_tags.update(tags or {}) + self.increase("exception", tags=all_tags) + scope_args = {**(tags or {}), "where": where} + self.sentry_client.unexpected_exception(error=ex, where=where, tags=scope_args) + + def close(self) -> None: + self.sentry_client.close() diff --git a/karapace/config.py b/karapace/config.py index 4c93a2b0c..b3016eb07 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -75,9 +75,12 @@ class Config(TypedDict): karapace_registry: bool master_election_strategy: str protobuf_runtime_directory: str + stats_service: str metrics_extended: bool statsd_host: str statsd_port: int + prometheus_host: str | None + prometheus_port: int | None sentry: NotRequired[Mapping[str, object]] tags: NotRequired[Mapping[str, object]] @@ -147,9 +150,12 @@ class ConfigDefaults(Config, total=False): "karapace_registry": False, "master_election_strategy": "lowest", "protobuf_runtime_directory": "runtime", + "stats_service": "statsd", "metrics_extended": True, "statsd_host": "127.0.0.1", "statsd_port": 8125, + "prometheus_host": "127.0.0.1", + "prometheus_port": 8005, } SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD] diff --git a/karapace/metrics.py b/karapace/metrics.py index bf45bec3a..0cfaba056 100644 --- a/karapace/metrics.py +++ b/karapace/metrics.py @@ -10,8 +10,10 @@ """ from __future__ import annotations +from karapace.base_stats import StatsClient from karapace.config import Config -from karapace.statsd import StatsClient +from karapace.prometheus import PrometheusClient +from karapace.statsd import StatsdClient import os import psutil @@ -20,6 +22,10 @@ import time +class MetricsException(Exception): + pass + + class Singleton(type): _instance: Singleton | None = None @@ -31,68 +37,71 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: class Metrics(metaclass=Singleton): - def __init__(self) -> None: - self.active = False - self.stats_client: StatsClient | None = None + stats_client: StatsClient + + def __init__( + self, + ) -> None: self.is_ready = False self.stop_event = threading.Event() self.worker_thread = threading.Thread(target=self.worker) self.lock = threading.Lock() - def setup(self, stats_client: StatsClient, config: Config) -> None: - self.active = config.get("metrics_extended") or False - if not self.active: - return + def setup(self, config: Config) -> None: with self.lock: if self.is_ready: return - self.is_ready = True - if not self.stats_client: - self.stats_client = stats_client - else: - self.active = False - return - schedule.every(10).seconds.do(self.connections) - self.worker_thread.start() + stats_service = config.get("stats_service") + if not config.get("metrics_extended"): + return + if stats_service == "statsd": + self.stats_client = StatsdClient(config=config) + elif stats_service == "prometheus": + self.stats_client = PrometheusClient(config=config) + else: + raise MetricsException('Config variable "stats_service" is not defined') + self.is_ready = True + schedule.every(10).seconds.do(self.connections) + self.worker_thread.start() def request(self, size: int) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.gauge("request-size", size) def response(self, size: int) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.gauge("response-size", size) def are_we_master(self, is_master: bool) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.gauge("master-slave-role", int(is_master)) def latency(self, latency_ms: float) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.timing("latency_ms", latency_ms) def error(self) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.increase("error_total", 1) def connections(self) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") @@ -112,7 +121,9 @@ def worker(self) -> None: time.sleep(1) def cleanup(self) -> None: - if not self.active: + if self.stats_client: + self.stats_client.close() + if not self.is_ready: return self.stop_event.set() if self.worker_thread.is_alive(): diff --git a/karapace/prometheus.py b/karapace/prometheus.py new file mode 100644 index 000000000..2ecd78c37 --- /dev/null +++ b/karapace/prometheus.py @@ -0,0 +1,69 @@ +""" +karapace - prometheus + +Supports telegraf's statsd protocol extension for 'key=value' tags: + + https://github.com/influxdata/telegraf/tree/master/plugins/inputs/statsd + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from karapace.base_stats import StatsClient +from karapace.config import Config +from prometheus_client import Counter, Gauge, start_http_server, Summary +from typing import Final + +import logging + +LOG = logging.getLogger(__name__) +HOST: Final = "127.0.0.1" +PORT: Final = 8005 + + +class PrometheusException(Exception): + pass + + +class PrometheusClient(StatsClient): + server_is_active = False + + def __init__(self, config: Config, host: str = HOST, port: int = PORT) -> None: + super().__init__(config) + + _host = config.get("prometheus_host") if "prometheus_host" in config else host + _port = config.get("prometheus_port") if "prometheus_port" in config else port + if _host is None: + raise PrometheusException("prometheus_host host is undefined") + if _port is None: + raise PrometheusException("prometheus_host port is undefined") + if not self.server_is_active: + start_http_server(_port, _host) + self.server_is_active = True + else: + raise PrometheusException("Double instance of Prometheus interface") + self._gauge: dict[str, Gauge] = dict() + self._summary: dict[str, Summary] = dict() + self._counter: dict[str, Counter] = dict() + + def gauge(self, metric: str, value: float, tags: dict | None = None) -> None: + m = self._gauge.get(metric) + if m is None: + m = Gauge(metric, metric) + self._gauge[metric] = m + m.set(value) + + def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None: + m = self._counter.get(metric) + if m is None: + m = Counter(metric, metric) + self._counter[metric] = m + m.inc(inc_value) + + def timing(self, metric: str, value: float, tags: dict | None = None) -> None: + m = self._summary.get(metric) + if m is None: + m = Summary(metric, metric) + self._summary[metric] = m + m.observe(value) diff --git a/karapace/rapu.py b/karapace/rapu.py index 5978d3c30..6ab3b2419 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -10,7 +10,6 @@ from http import HTTPStatus from karapace.config import Config, create_server_ssl_context from karapace.metrics import Metrics -from karapace.statsd import StatsClient from karapace.utils import json_decode, json_encode from karapace.version import __version__ from typing import Callable, Dict, NoReturn, Optional, overload, Union @@ -169,10 +168,10 @@ def __init__( self.app_request_metric = f"{app_name}_request" self.app = self._create_aiohttp_application(config=config) self.log = logging.getLogger(self.app_name) - self.stats = StatsClient(config=config) + Metrics().setup(config) + self.stats = Metrics().stats_client self.app.on_cleanup.append(self.close_by_app) self.not_ready_handler = not_ready_handler - Metrics().setup(self.stats, config) def _create_aiohttp_application(self, *, config: Config) -> aiohttp.web.Application: return aiohttp.web.Application(client_max_size=config["http_request_max_size"]) @@ -188,7 +187,6 @@ async def close(self) -> None: created by the aiohttp library. """ Metrics().cleanup() - self.stats.close() @staticmethod def cors_and_server_headers_for_request(*, request, origin="*"): # pylint: disable=unused-argument diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 3dec4a887..e7dab0448 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -26,11 +26,11 @@ from karapace.in_memory_database import InMemoryDatabase from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator +from karapace.metrics import Metrics from karapace.offset_watcher import OffsetWatcher from karapace.protobuf.schema import ProtobufSchema from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents -from karapace.statsd import StatsClient from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient from threading import Event, Thread @@ -127,7 +127,8 @@ def __init__( self.topic_replication_factor = self.config["replication_factor"] self.consumer: KafkaConsumer | None = None self._offset_watcher = offset_watcher - self.stats = StatsClient(config=config) + Metrics().setup(config=config) + self.stats = Metrics().stats_client # Thread synchronization objects # - offset is used by the REST API to wait until this thread has diff --git a/karapace/statsd.py b/karapace/statsd.py index 8115f76f9..36fdc24a6 100644 --- a/karapace/statsd.py +++ b/karapace/statsd.py @@ -10,40 +10,32 @@ """ from __future__ import annotations -from contextlib import contextmanager +from karapace.base_stats import StatsClient from karapace.config import Config -from karapace.sentry import get_sentry_client -from typing import Any, Final, Iterator +from typing import Any, Final import datetime import logging import socket -import time STATSD_HOST: Final = "127.0.0.1" STATSD_PORT: Final = 8125 LOG = logging.getLogger(__name__) -class StatsClient: +class StatsdClient(StatsClient): def __init__( self, config: Config, host: str = STATSD_HOST, port: int = STATSD_PORT, ) -> None: + super().__init__(config) + self._tags: Final = config.get("tags", {}) _host = config.get("statsd_host") if "statsd_host" in config else host _port = config.get("statsd_port") if "statsd_port" in config else port self._dest_addr: Final = (_host, _port) self._socket: Final = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self._tags: Final = config.get("tags", {}) - self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None)) - - @contextmanager - def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]: - start_time = time.monotonic() - yield - self.timing(metric, time.monotonic() - start_time, tags) def gauge(self, metric: str, value: float, tags: dict | None = None) -> None: self._send(metric, b"g", value, tags) @@ -54,16 +46,6 @@ def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> def timing(self, metric: str, value: float, tags: dict | None = None) -> None: self._send(metric, b"ms", value, tags) - def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None: - all_tags = { - "exception": ex.__class__.__name__, - "where": where, - } - all_tags.update(tags or {}) - self.increase("exception", tags=all_tags) - scope_args = {**(tags or {}), "where": where} - self.sentry_client.unexpected_exception(error=ex, where=where, tags=scope_args) - def _send(self, metric: str, metric_type: bytes, value: Any, tags: dict | None) -> None: if None in self._dest_addr: # stats sending is disabled @@ -95,4 +77,4 @@ def _send(self, metric: str, metric_type: bytes, value: Any, tags: dict | None) def close(self) -> None: self._socket.close() - self.sentry_client.close() + super().close() diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index fedc61ebf..f023fcc5b 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -14,11 +14,11 @@ aiosignal==1.3.1 # via # -r requirements.txt # aiohttp -anyio==3.7.1 +anyio==4.0.0 # via # -r requirements.txt # watchfiles -async-timeout==4.0.2 +async-timeout==4.0.3 # via # -r requirements.txt # aiohttp @@ -48,7 +48,7 @@ charset-normalizer==3.2.0 # -r requirements.txt # aiohttp # requests -click==8.1.6 +click==8.1.7 # via flask commonmark==0.9.1 # via @@ -56,7 +56,7 @@ commonmark==0.9.1 # rich configargparse==1.7 # via locust -exceptiongroup==1.1.2 +exceptiongroup==1.1.3 # via # -r requirements.txt # anyio @@ -66,9 +66,9 @@ execnet==2.0.2 # via pytest-xdist fancycompleter==0.9.1 # via pdbpp -filelock==3.12.2 +filelock==3.12.3 # via -r requirements-dev.in -flask==2.3.2 +flask==2.3.3 # via # flask-basicauth # flask-cors @@ -82,15 +82,15 @@ frozenlist==1.4.0 # -r requirements.txt # aiohttp # aiosignal -gevent==23.7.0 +gevent==23.9.0.post1 # via # geventhttpclient # locust -geventhttpclient==2.0.9 +geventhttpclient==2.0.10 # via locust greenlet==2.0.2 # via gevent -hypothesis==6.82.7 +hypothesis==6.84.1 # via -r requirements-dev.in idna==3.4 # via @@ -149,8 +149,10 @@ pkgutil-resolve-name==1.3.10 # via # -r requirements.txt # jsonschema -pluggy==1.2.0 +pluggy==1.3.0 # via pytest +prometheus-client==0.17.1 + # via -r requirements.txt protobuf==3.20.3 # via -r requirements.txt psutil==5.9.5 @@ -166,7 +168,7 @@ pygments==2.16.1 # rich pyrepl==0.9.0 # via fancycompleter -pytest==7.4.0 +pytest==7.4.1 # via # -r requirements-dev.in # pytest-timeout @@ -177,7 +179,7 @@ pytest-xdist[psutil]==3.3.1 # via -r requirements-dev.in python-dateutil==2.8.2 # via -r requirements.txt -pyzmq==25.1.0 +pyzmq==25.1.1 # via locust referencing==0.30.2 # via @@ -192,7 +194,7 @@ rich==12.6.0 # via -r requirements.txt roundrobin==0.0.4 # via locust -rpds-py==0.9.2 +rpds-py==0.10.2 # via # -r requirements.txt # jsonschema @@ -213,13 +215,14 @@ sniffio==1.3.0 # anyio sortedcontainers==2.4.0 # via hypothesis -tenacity==8.2.2 +tenacity==8.2.3 # via -r requirements.txt tomli==2.0.1 # via pytest typing-extensions==4.7.1 # via # -r requirements.txt + # filelock # locust # rich ujson==5.8.0 @@ -230,7 +233,7 @@ urllib3==2.0.4 # sentry-sdk watchfiles==0.20.0 # via -r requirements.txt -werkzeug==2.3.6 +werkzeug==2.3.7 # via # flask # locust diff --git a/requirements/requirements-typing.txt b/requirements/requirements-typing.txt index 8e58e4e53..f035ad784 100644 --- a/requirements/requirements-typing.txt +++ b/requirements/requirements-typing.txt @@ -8,7 +8,7 @@ certifi==2023.7.22 # via # -c requirements-dev.txt # sentry-sdk -mypy==1.4.1 +mypy==1.5.1 # via -r requirements-typing.in mypy-extensions==1.0.0 # via mypy @@ -24,6 +24,8 @@ types-cachetools==5.3.0.6 # via -r requirements-typing.in types-jsonschema==4.17.0.10 # via -r requirements-typing.in +types-psutil==5.9.5.16 + # via -r requirements-typing.in typing-extensions==4.7.1 # via # -c requirements-dev.txt diff --git a/requirements/requirements.in b/requirements/requirements.in index 8e6893e2b..f4e95c895 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -13,6 +13,7 @@ ujson<6 watchfiles<1 schedule psutil +prometheus-client xxhash~=3.3 rich~=12.6.0 cachetools==5.3.1 diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 7fe54d377..3651ae066 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -12,9 +12,9 @@ aiokafka==0.8.1 # via -r requirements.in aiosignal==1.3.1 # via aiohttp -anyio==3.7.1 +anyio==4.0.0 # via watchfiles -async-timeout==4.0.2 +async-timeout==4.0.3 # via # aiohttp # aiokafka @@ -31,7 +31,7 @@ charset-normalizer==3.2.0 # via aiohttp commonmark==0.9.1 # via rich -exceptiongroup==1.1.2 +exceptiongroup==1.1.3 # via anyio frozenlist==1.4.0 # via @@ -65,6 +65,8 @@ packaging==23.1 # via aiokafka pkgutil-resolve-name==1.3.10 # via jsonschema +prometheus-client==0.17.1 + # via -r requirements.in protobuf==3.20.3 # via -r requirements.in psutil==5.9.5 @@ -79,7 +81,7 @@ referencing==0.30.2 # jsonschema-specifications rich==12.6.0 # via -r requirements.in -rpds-py==0.9.2 +rpds-py==0.10.2 # via # jsonschema # referencing @@ -91,7 +93,7 @@ six==1.16.0 # python-dateutil sniffio==1.3.0 # via anyio -tenacity==8.2.2 +tenacity==8.2.3 # via -r requirements.in typing-extensions==4.7.1 # via