From 9cdcba75ebd12ea20d3a549dd80a8855dd6bd04a Mon Sep 17 00:00:00 2001 From: libretto Date: Sat, 2 Sep 2023 18:56:27 +0300 Subject: [PATCH 01/10] add prometheus support --- karapace.config.json | 7 +++- karapace/base_stats.py | 58 ++++++++++++++++++++++++++ karapace/config.py | 11 ++++- karapace/karapacemetrics.py | 22 ++++++++-- karapace/prometheus.py | 69 +++++++++++++++++++++++++++++++ karapace/rapu.py | 8 ++-- karapace/schema_reader.py | 4 +- karapace/statsd.py | 28 ++----------- requirements/requirements-dev.txt | 2 + requirements/requirements.in | 1 + requirements/requirements.txt | 2 + 11 files changed, 175 insertions(+), 37 deletions(-) create mode 100644 karapace/base_stats.py create mode 100644 karapace/prometheus.py diff --git a/karapace.config.json b/karapace.config.json index 3a4fe0a43..798ab2ffe 100644 --- a/karapace.config.json +++ b/karapace.config.json @@ -28,7 +28,12 @@ "topic_name": "_schemas", "protobuf_runtime_directory": "runtime", "session_timeout_ms": 10000, + "stats_service": "statsd", "metrics_extended": true, "statsd_host": "127.0.0.1", - "statsd_port": 8125 + "statsd_port": 8125, + "prometheus_host": "127.0.0.1", + "prometheus_port": 8005 + + } diff --git a/karapace/base_stats.py b/karapace/base_stats.py new file mode 100644 index 000000000..0c467abaf --- /dev/null +++ b/karapace/base_stats.py @@ -0,0 +1,58 @@ +""" +karapace - basestats + +Supports base class for statsd and prometheus protocols: + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from abc import ABC, abstractmethod +from contextlib import contextmanager +from karapace.config import Config +from karapace.sentry import get_sentry_client +from typing import Final, Iterator + +import time + + +class StatsClient(ABC): + @abstractmethod + def __init__( + self, + config: Config, + ) -> None: + self._tags: Final = config.get("tags", {}) + self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None)) + + @contextmanager + def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]: + start_time = time.monotonic() + yield + self.timing(metric, time.monotonic() - start_time, tags) + + @abstractmethod + def gauge(self, metric: str, value: float, tags: dict | None = None) -> None: + pass + + @abstractmethod + def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None: + pass + + @abstractmethod + def timing(self, metric: str, value: float, tags: dict | None = None) -> None: + pass + + def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None: + all_tags = { + "exception": ex.__class__.__name__, + "where": where, + } + all_tags.update(tags or {}) + self.increase("exception", tags=all_tags) + scope_args = {**(tags or {}), "where": where} + self.sentry_client.unexpected_exception(error=ex, where=where, tags=scope_args) + + def close(self) -> None: + self.sentry_client.close() diff --git a/karapace/config.py b/karapace/config.py index 4c93a2b0c..1fc7f1a59 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -75,12 +75,16 @@ class Config(TypedDict): karapace_registry: bool master_election_strategy: str protobuf_runtime_directory: str + stats_service: str metrics_extended: bool statsd_host: str statsd_port: int + prometheus_host: str | None + prometheus_port: int | None - sentry: NotRequired[Mapping[str, object]] - tags: NotRequired[Mapping[str, object]] + +sentry: NotRequired[Mapping[str, object]] +tags: NotRequired[Mapping[str, object]] class ConfigDefaults(Config, total=False): @@ -147,9 +151,12 @@ class ConfigDefaults(Config, total=False): "karapace_registry": False, "master_election_strategy": "lowest", "protobuf_runtime_directory": "runtime", + "stats_service": "statsd", "metrics_extended": True, "statsd_host": "127.0.0.1", "statsd_port": 8125, + "prometheus_host": "127.0.0.1", + "prometheus_port": 8005, } SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD] diff --git a/karapace/karapacemetrics.py b/karapace/karapacemetrics.py index 8f90436f9..5e3acaa89 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/karapacemetrics.py @@ -10,8 +10,10 @@ """ from __future__ import annotations +from karapace.base_stats import StatsClient from karapace.config import Config -from karapace.statsd import StatsClient +from karapace.prometheus import PrometheusClient +from karapace.statsd import StatsdClient import os import psutil @@ -20,6 +22,10 @@ import time +class MetricsException(Exception): + pass + + class Singleton(type): _instance: Singleton | None = None @@ -33,14 +39,22 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: class KarapaceMetrics(metaclass=Singleton): def __init__(self) -> None: self.active = False - self.stats_client: StatsClient | None = None + self.stats_client: StatsClient | None self.is_ready = False self.stop_event = threading.Event() self.worker_thread = threading.Thread(target=self.worker) self.lock = threading.Lock() def setup(self, stats_client: StatsClient, config: Config) -> None: - self.active = config.get("metrics_extended") or False + stats_service = config.get("stats_service") + if stats_service == "statsd": + self.stats_client = StatsdClient(config=config) + elif stats_service == "prometheus": + self.stats_client = PrometheusClient(config=config) + else: + raise MetricsException('Config variable "stats_service" is not defined') + + self.active = config.get("metrics_extended") if not self.active: return with self.lock: @@ -112,6 +126,8 @@ def worker(self) -> None: time.sleep(1) def cleanup(self) -> None: + if self.stats_client: + self.stats_client.close() if not self.active: return self.stop_event.set() diff --git a/karapace/prometheus.py b/karapace/prometheus.py new file mode 100644 index 000000000..2ecd78c37 --- /dev/null +++ b/karapace/prometheus.py @@ -0,0 +1,69 @@ +""" +karapace - prometheus + +Supports telegraf's statsd protocol extension for 'key=value' tags: + + https://github.com/influxdata/telegraf/tree/master/plugins/inputs/statsd + +Copyright (c) 2023 Aiven Ltd +See LICENSE for details +""" +from __future__ import annotations + +from karapace.base_stats import StatsClient +from karapace.config import Config +from prometheus_client import Counter, Gauge, start_http_server, Summary +from typing import Final + +import logging + +LOG = logging.getLogger(__name__) +HOST: Final = "127.0.0.1" +PORT: Final = 8005 + + +class PrometheusException(Exception): + pass + + +class PrometheusClient(StatsClient): + server_is_active = False + + def __init__(self, config: Config, host: str = HOST, port: int = PORT) -> None: + super().__init__(config) + + _host = config.get("prometheus_host") if "prometheus_host" in config else host + _port = config.get("prometheus_port") if "prometheus_port" in config else port + if _host is None: + raise PrometheusException("prometheus_host host is undefined") + if _port is None: + raise PrometheusException("prometheus_host port is undefined") + if not self.server_is_active: + start_http_server(_port, _host) + self.server_is_active = True + else: + raise PrometheusException("Double instance of Prometheus interface") + self._gauge: dict[str, Gauge] = dict() + self._summary: dict[str, Summary] = dict() + self._counter: dict[str, Counter] = dict() + + def gauge(self, metric: str, value: float, tags: dict | None = None) -> None: + m = self._gauge.get(metric) + if m is None: + m = Gauge(metric, metric) + self._gauge[metric] = m + m.set(value) + + def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None: + m = self._counter.get(metric) + if m is None: + m = Counter(metric, metric) + self._counter[metric] = m + m.inc(inc_value) + + def timing(self, metric: str, value: float, tags: dict | None = None) -> None: + m = self._summary.get(metric) + if m is None: + m = Summary(metric, metric) + self._summary[metric] = m + m.observe(value) diff --git a/karapace/rapu.py b/karapace/rapu.py index c506445a4..8157f22cb 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -10,7 +10,6 @@ from http import HTTPStatus from karapace.config import Config, create_server_ssl_context from karapace.karapacemetrics import KarapaceMetrics -from karapace.statsd import StatsClient from karapace.utils import json_decode, json_encode from karapace.version import __version__ from typing import Callable, Dict, NoReturn, Optional, overload, Union @@ -19,7 +18,7 @@ import aiohttp.web import aiohttp.web_exceptions import asyncio -import cgi # pylint: disable=deprecated-module +import cgi import hashlib import logging import re @@ -169,10 +168,10 @@ def __init__( self.app_request_metric = f"{app_name}_request" self.app = self._create_aiohttp_application(config=config) self.log = logging.getLogger(self.app_name) - self.stats = StatsClient(config=config) + KarapaceMetrics().setup(config) + self.stats = KarapaceMetrics().stats_client self.app.on_cleanup.append(self.close_by_app) self.not_ready_handler = not_ready_handler - KarapaceMetrics().setup(self.stats, config) def _create_aiohttp_application(self, *, config: Config) -> aiohttp.web.Application: return aiohttp.web.Application(client_max_size=config["http_request_max_size"]) @@ -188,7 +187,6 @@ async def close(self) -> None: created by the aiohttp library. """ KarapaceMetrics().cleanup() - self.stats.close() @staticmethod def cors_and_server_headers_for_request(*, request, origin="*"): # pylint: disable=unused-argument diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 3dec4a887..0bc8175be 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -24,13 +24,13 @@ from karapace.dependency import Dependency from karapace.errors import InvalidReferences, InvalidSchema from karapace.in_memory_database import InMemoryDatabase +from karapace.karapacemetrics import KarapaceMetrics from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator from karapace.offset_watcher import OffsetWatcher from karapace.protobuf.schema import ProtobufSchema from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents -from karapace.statsd import StatsClient from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient from threading import Event, Thread @@ -127,7 +127,7 @@ def __init__( self.topic_replication_factor = self.config["replication_factor"] self.consumer: KafkaConsumer | None = None self._offset_watcher = offset_watcher - self.stats = StatsClient(config=config) + self.stats = KarapaceMetrics().stats_client # Thread synchronization objects # - offset is used by the REST API to wait until this thread has diff --git a/karapace/statsd.py b/karapace/statsd.py index 8115f76f9..b54bcd5bd 100644 --- a/karapace/statsd.py +++ b/karapace/statsd.py @@ -10,22 +10,20 @@ """ from __future__ import annotations -from contextlib import contextmanager +from karapace.base_stats import StatsClient from karapace.config import Config -from karapace.sentry import get_sentry_client -from typing import Any, Final, Iterator +from typing import Any, Final import datetime import logging import socket -import time STATSD_HOST: Final = "127.0.0.1" STATSD_PORT: Final = 8125 LOG = logging.getLogger(__name__) -class StatsClient: +class StatsdClient(StatsClient): def __init__( self, config: Config, @@ -36,14 +34,6 @@ def __init__( _port = config.get("statsd_port") if "statsd_port" in config else port self._dest_addr: Final = (_host, _port) self._socket: Final = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - self._tags: Final = config.get("tags", {}) - self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None)) - - @contextmanager - def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]: - start_time = time.monotonic() - yield - self.timing(metric, time.monotonic() - start_time, tags) def gauge(self, metric: str, value: float, tags: dict | None = None) -> None: self._send(metric, b"g", value, tags) @@ -54,16 +44,6 @@ def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> def timing(self, metric: str, value: float, tags: dict | None = None) -> None: self._send(metric, b"ms", value, tags) - def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None: - all_tags = { - "exception": ex.__class__.__name__, - "where": where, - } - all_tags.update(tags or {}) - self.increase("exception", tags=all_tags) - scope_args = {**(tags or {}), "where": where} - self.sentry_client.unexpected_exception(error=ex, where=where, tags=scope_args) - def _send(self, metric: str, metric_type: bytes, value: Any, tags: dict | None) -> None: if None in self._dest_addr: # stats sending is disabled @@ -95,4 +75,4 @@ def _send(self, metric: str, metric_type: bytes, value: Any, tags: dict | None) def close(self) -> None: self._socket.close() - self.sentry_client.close() + super().close() diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 1d3a947d2..864dc0f3b 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -151,6 +151,8 @@ pkgutil-resolve-name==1.3.10 # jsonschema pluggy==1.2.0 # via pytest +prometheus-client==0.17.0 + # via -r requirements.txt protobuf==3.20.3 # via -r requirements.txt psutil==5.9.5 diff --git a/requirements/requirements.in b/requirements/requirements.in index 8e6893e2b..f4e95c895 100644 --- a/requirements/requirements.in +++ b/requirements/requirements.in @@ -13,6 +13,7 @@ ujson<6 watchfiles<1 schedule psutil +prometheus-client xxhash~=3.3 rich~=12.6.0 cachetools==5.3.1 diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 912af3587..857ffb3d0 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -65,6 +65,8 @@ packaging==23.1 # via aiokafka pkgutil-resolve-name==1.3.10 # via jsonschema +prometheus-client==0.17.0 + # via -r requirements.in protobuf==3.20.3 # via -r requirements.in psutil==5.9.5 From 30a30ad53990e1689a989e41ce14a6383940a056 Mon Sep 17 00:00:00 2001 From: libretto Date: Sat, 2 Sep 2023 20:41:28 +0300 Subject: [PATCH 02/10] refactoring --- karapace/{karapacemetrics.py => metrics.py} | 2 +- karapace/rapu.py | 18 +++++++++--------- karapace/schema_reader.py | 4 ++-- karapace/schema_registry.py | 4 ++-- 4 files changed, 14 insertions(+), 14 deletions(-) rename karapace/{karapacemetrics.py => metrics.py} (98%) diff --git a/karapace/karapacemetrics.py b/karapace/metrics.py similarity index 98% rename from karapace/karapacemetrics.py rename to karapace/metrics.py index 5e3acaa89..c05465d9f 100644 --- a/karapace/karapacemetrics.py +++ b/karapace/metrics.py @@ -36,7 +36,7 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: return cls._instance -class KarapaceMetrics(metaclass=Singleton): +class Metrics(metaclass=Singleton): def __init__(self) -> None: self.active = False self.stats_client: StatsClient | None diff --git a/karapace/rapu.py b/karapace/rapu.py index 8157f22cb..cecd19381 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -9,7 +9,7 @@ from accept_types import get_best_match from http import HTTPStatus from karapace.config import Config, create_server_ssl_context -from karapace.karapacemetrics import KarapaceMetrics +from karapace.metrics import Metrics from karapace.utils import json_decode, json_encode from karapace.version import __version__ from typing import Callable, Dict, NoReturn, Optional, overload, Union @@ -135,7 +135,7 @@ def __init__( self.headers["Content-Type"] = content_type super().__init__(f"HTTPResponse {status.value}") if not is_success(status): - KarapaceMetrics().error() + Metrics().error() def ok(self) -> bool: """True if resposne has a 2xx status_code""" @@ -168,8 +168,8 @@ def __init__( self.app_request_metric = f"{app_name}_request" self.app = self._create_aiohttp_application(config=config) self.log = logging.getLogger(self.app_name) - KarapaceMetrics().setup(config) - self.stats = KarapaceMetrics().stats_client + Metrics().setup(config) + self.stats = Metrics().stats_client self.app.on_cleanup.append(self.close_by_app) self.not_ready_handler = not_ready_handler @@ -186,7 +186,7 @@ async def close(self) -> None: set as hook because the awaitables have to run inside the event loop created by the aiohttp library. """ - KarapaceMetrics().cleanup() + Metrics().cleanup() @staticmethod def cors_and_server_headers_for_request(*, request, origin="*"): # pylint: disable=unused-argument @@ -284,9 +284,9 @@ async def _handle_request( body = await request.read() if body: - KarapaceMetrics().request(len(body)) + Metrics().request(len(body)) else: - KarapaceMetrics().request(0) + Metrics().request(0) if json_request: if not body: raise HTTPResponse(body="Missing request JSON body", status=HTTPStatus.BAD_REQUEST) @@ -403,8 +403,8 @@ async def _handle_request( self.log.exception("Unexpected error handling user request: %s %s", request.method, request.url) resp = aiohttp.web.Response(text="Internal Server Error", status=HTTPStatus.INTERNAL_SERVER_ERROR.value) finally: - KarapaceMetrics().response(resp.content_length) - KarapaceMetrics().latency((time.monotonic() - start_time) * 1000) + Metrics().response(resp.content_length) + Metrics().latency((time.monotonic() - start_time) * 1000) self.stats.timing( self.app_request_metric, time.monotonic() - start_time, diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 0bc8175be..8b060a37b 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -24,9 +24,9 @@ from karapace.dependency import Dependency from karapace.errors import InvalidReferences, InvalidSchema from karapace.in_memory_database import InMemoryDatabase -from karapace.karapacemetrics import KarapaceMetrics from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode from karapace.master_coordinator import MasterCoordinator +from karapace.metrics import Metrics from karapace.offset_watcher import OffsetWatcher from karapace.protobuf.schema import ProtobufSchema from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema @@ -127,7 +127,7 @@ def __init__( self.topic_replication_factor = self.config["replication_factor"] self.consumer: KafkaConsumer | None = None self._offset_watcher = offset_watcher - self.stats = KarapaceMetrics().stats_client + self.stats = Metrics().stats_client # Thread synchronization objects # - offset is used by the REST API to wait until this thread has diff --git a/karapace/schema_registry.py b/karapace/schema_registry.py index 8f251d3d8..e29e167c7 100644 --- a/karapace/schema_registry.py +++ b/karapace/schema_registry.py @@ -22,10 +22,10 @@ VersionNotFoundException, ) from karapace.in_memory_database import InMemoryDatabase -from karapace.karapacemetrics import KarapaceMetrics from karapace.key_format import KeyFormatter from karapace.master_coordinator import MasterCoordinator from karapace.messaging import KarapaceProducer +from karapace.metrics import Metrics from karapace.offset_watcher import OffsetWatcher from karapace.schema_models import ParsedTypedSchema, SchemaType, SchemaVersion, TypedSchema, ValidatedTypedSchema from karapace.schema_reader import KafkaSchemaReader @@ -124,7 +124,7 @@ async def get_master(self, ignore_readiness: bool = False) -> tuple[bool, str | elif not ignore_readiness and self.schema_reader.ready is False: LOG.info("Schema reader isn't ready yet: %r", self.schema_reader.ready) else: - KarapaceMetrics().are_we_master(are_we_master) + Metrics().are_we_master(are_we_master) return are_we_master, master_url await asyncio.sleep(1.0) From 33f6cebe84badfe185ad6ebea372b49d924e240b Mon Sep 17 00:00:00 2001 From: libretto Date: Tue, 5 Sep 2023 15:22:08 +0300 Subject: [PATCH 03/10] fixup issues --- karapace/metrics.py | 8 ++------ karapace/statsd.py | 1 - 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/karapace/metrics.py b/karapace/metrics.py index 5bbcb6b29..a0aff7e0d 100644 --- a/karapace/metrics.py +++ b/karapace/metrics.py @@ -52,8 +52,9 @@ def setup(self, config: Config) -> None: elif stats_service == "prometheus": self.stats_client = PrometheusClient(config=config) else: + self.active = False raise MetricsException('Config variable "stats_service" is not defined') - if config.get("metrics_extended"): # for mypy check pass + if config.get("metrics_extended"): # for mypy check pass self.active = True if not self.active: return @@ -61,11 +62,6 @@ def setup(self, config: Config) -> None: if self.is_ready: return self.is_ready = True - if not self.stats_client: - self.stats_client = stats_client - else: - self.active = False - return schedule.every(10).seconds.do(self.connections) self.worker_thread.start() diff --git a/karapace/statsd.py b/karapace/statsd.py index 4c1cefe50..fcd3c8572 100644 --- a/karapace/statsd.py +++ b/karapace/statsd.py @@ -30,7 +30,6 @@ def __init__( host: str = STATSD_HOST, port: int = STATSD_PORT, ) -> None: - super().__init__(config) self._tags: Final[dict] = config.get("tags", {}) _host = config.get("statsd_host") if "statsd_host" in config else host From 34fa7dcef5a363f21d50e12432fb8430488b7637 Mon Sep 17 00:00:00 2001 From: libretto Date: Tue, 5 Sep 2023 19:58:37 +0300 Subject: [PATCH 04/10] fixup --- karapace/metrics.py | 22 +++++++++------------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/karapace/metrics.py b/karapace/metrics.py index a0aff7e0d..03ee39718 100644 --- a/karapace/metrics.py +++ b/karapace/metrics.py @@ -38,7 +38,6 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: class Metrics(metaclass=Singleton): def __init__(self) -> None: - self.active = False self.stats_client: StatsClient self.is_ready = False self.stop_event = threading.Event() @@ -47,17 +46,14 @@ def __init__(self) -> None: def setup(self, config: Config) -> None: stats_service = config.get("stats_service") + if not config.get("metrics_extended"): + return if stats_service == "statsd": self.stats_client = StatsdClient(config=config) elif stats_service == "prometheus": self.stats_client = PrometheusClient(config=config) else: - self.active = False raise MetricsException('Config variable "stats_service" is not defined') - if config.get("metrics_extended"): # for mypy check pass - self.active = True - if not self.active: - return with self.lock: if self.is_ready: return @@ -67,42 +63,42 @@ def setup(self, config: Config) -> None: self.worker_thread.start() def request(self, size: int) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.gauge("request-size", size) def response(self, size: int) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.gauge("response-size", size) def are_we_master(self, is_master: bool) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.gauge("master-slave-role", int(is_master)) def latency(self, latency_ms: float) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.timing("latency_ms", latency_ms) def error(self) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") self.stats_client.increase("error_total", 1) def connections(self) -> None: - if not self.active: + if not self.is_ready or self.stats_client is None: return if not isinstance(self.stats_client, StatsClient): raise RuntimeError("no StatsClient available") @@ -124,7 +120,7 @@ def worker(self) -> None: def cleanup(self) -> None: if self.stats_client: self.stats_client.close() - if not self.active: + if not self.is_ready: return self.stop_event.set() if self.worker_thread.is_alive(): From 46605f4a24d09825b6a6cfb35ef05041ad10bccf Mon Sep 17 00:00:00 2001 From: libretto Date: Tue, 5 Sep 2023 20:50:08 +0300 Subject: [PATCH 05/10] fixup --- karapace.config.json | 3 +-- karapace/config.py | 5 ++--- karapace/statsd.py | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/karapace.config.json b/karapace.config.json index 798ab2ffe..f07af6710 100644 --- a/karapace.config.json +++ b/karapace.config.json @@ -33,7 +33,6 @@ "statsd_host": "127.0.0.1", "statsd_port": 8125, "prometheus_host": "127.0.0.1", - "prometheus_port": 8005 - + "prometheus_port": 8005, } diff --git a/karapace/config.py b/karapace/config.py index 1fc7f1a59..b3016eb07 100644 --- a/karapace/config.py +++ b/karapace/config.py @@ -82,9 +82,8 @@ class Config(TypedDict): prometheus_host: str | None prometheus_port: int | None - -sentry: NotRequired[Mapping[str, object]] -tags: NotRequired[Mapping[str, object]] + sentry: NotRequired[Mapping[str, object]] + tags: NotRequired[Mapping[str, object]] class ConfigDefaults(Config, total=False): diff --git a/karapace/statsd.py b/karapace/statsd.py index fcd3c8572..36fdc24a6 100644 --- a/karapace/statsd.py +++ b/karapace/statsd.py @@ -31,7 +31,7 @@ def __init__( port: int = STATSD_PORT, ) -> None: super().__init__(config) - self._tags: Final[dict] = config.get("tags", {}) + self._tags: Final = config.get("tags", {}) _host = config.get("statsd_host") if "statsd_host" in config else host _port = config.get("statsd_port") if "statsd_port" in config else port self._dest_addr: Final = (_host, _port) From f670782e4c50f360dfd64e0737dda7cce78d1c94 Mon Sep 17 00:00:00 2001 From: libretto Date: Tue, 5 Sep 2023 23:39:15 +0300 Subject: [PATCH 06/10] fixup --- karapace/rapu.py | 2 +- requirements/requirements-dev.txt | 33 ++++++++++++++-------------- requirements/requirements-typing.txt | 4 +++- requirements/requirements.txt | 12 +++++----- 4 files changed, 27 insertions(+), 24 deletions(-) diff --git a/karapace/rapu.py b/karapace/rapu.py index cecd19381..ed8bb43c8 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -18,7 +18,7 @@ import aiohttp.web import aiohttp.web_exceptions import asyncio -import cgi +import cgi # pylint: disable=deprecated-module import hashlib import logging import re diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 1461090b1..f023fcc5b 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -14,11 +14,11 @@ aiosignal==1.3.1 # via # -r requirements.txt # aiohttp -anyio==3.7.1 +anyio==4.0.0 # via # -r requirements.txt # watchfiles -async-timeout==4.0.2 +async-timeout==4.0.3 # via # -r requirements.txt # aiohttp @@ -48,7 +48,7 @@ charset-normalizer==3.2.0 # -r requirements.txt # aiohttp # requests -click==8.1.6 +click==8.1.7 # via flask commonmark==0.9.1 # via @@ -56,7 +56,7 @@ commonmark==0.9.1 # rich configargparse==1.7 # via locust -exceptiongroup==1.1.2 +exceptiongroup==1.1.3 # via # -r requirements.txt # anyio @@ -66,9 +66,9 @@ execnet==2.0.2 # via pytest-xdist fancycompleter==0.9.1 # via pdbpp -filelock==3.12.2 +filelock==3.12.3 # via -r requirements-dev.in -flask==2.3.2 +flask==2.3.3 # via # flask-basicauth # flask-cors @@ -82,15 +82,15 @@ frozenlist==1.4.0 # -r requirements.txt # aiohttp # aiosignal -gevent==23.7.0 +gevent==23.9.0.post1 # via # geventhttpclient # locust -geventhttpclient==2.0.9 +geventhttpclient==2.0.10 # via locust greenlet==2.0.2 # via gevent -hypothesis==6.82.7 +hypothesis==6.84.1 # via -r requirements-dev.in idna==3.4 # via @@ -149,9 +149,9 @@ pkgutil-resolve-name==1.3.10 # via # -r requirements.txt # jsonschema -pluggy==1.2.0 +pluggy==1.3.0 # via pytest -prometheus-client==0.17.0 +prometheus-client==0.17.1 # via -r requirements.txt protobuf==3.20.3 # via -r requirements.txt @@ -168,7 +168,7 @@ pygments==2.16.1 # rich pyrepl==0.9.0 # via fancycompleter -pytest==7.4.0 +pytest==7.4.1 # via # -r requirements-dev.in # pytest-timeout @@ -179,7 +179,7 @@ pytest-xdist[psutil]==3.3.1 # via -r requirements-dev.in python-dateutil==2.8.2 # via -r requirements.txt -pyzmq==25.1.0 +pyzmq==25.1.1 # via locust referencing==0.30.2 # via @@ -194,7 +194,7 @@ rich==12.6.0 # via -r requirements.txt roundrobin==0.0.4 # via locust -rpds-py==0.9.2 +rpds-py==0.10.2 # via # -r requirements.txt # jsonschema @@ -215,13 +215,14 @@ sniffio==1.3.0 # anyio sortedcontainers==2.4.0 # via hypothesis -tenacity==8.2.2 +tenacity==8.2.3 # via -r requirements.txt tomli==2.0.1 # via pytest typing-extensions==4.7.1 # via # -r requirements.txt + # filelock # locust # rich ujson==5.8.0 @@ -232,7 +233,7 @@ urllib3==2.0.4 # sentry-sdk watchfiles==0.20.0 # via -r requirements.txt -werkzeug==2.3.6 +werkzeug==2.3.7 # via # flask # locust diff --git a/requirements/requirements-typing.txt b/requirements/requirements-typing.txt index 8e58e4e53..f035ad784 100644 --- a/requirements/requirements-typing.txt +++ b/requirements/requirements-typing.txt @@ -8,7 +8,7 @@ certifi==2023.7.22 # via # -c requirements-dev.txt # sentry-sdk -mypy==1.4.1 +mypy==1.5.1 # via -r requirements-typing.in mypy-extensions==1.0.0 # via mypy @@ -24,6 +24,8 @@ types-cachetools==5.3.0.6 # via -r requirements-typing.in types-jsonschema==4.17.0.10 # via -r requirements-typing.in +types-psutil==5.9.5.16 + # via -r requirements-typing.in typing-extensions==4.7.1 # via # -c requirements-dev.txt diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 34a4b11d0..3651ae066 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -12,9 +12,9 @@ aiokafka==0.8.1 # via -r requirements.in aiosignal==1.3.1 # via aiohttp -anyio==3.7.1 +anyio==4.0.0 # via watchfiles -async-timeout==4.0.2 +async-timeout==4.0.3 # via # aiohttp # aiokafka @@ -31,7 +31,7 @@ charset-normalizer==3.2.0 # via aiohttp commonmark==0.9.1 # via rich -exceptiongroup==1.1.2 +exceptiongroup==1.1.3 # via anyio frozenlist==1.4.0 # via @@ -65,7 +65,7 @@ packaging==23.1 # via aiokafka pkgutil-resolve-name==1.3.10 # via jsonschema -prometheus-client==0.17.0 +prometheus-client==0.17.1 # via -r requirements.in protobuf==3.20.3 # via -r requirements.in @@ -81,7 +81,7 @@ referencing==0.30.2 # jsonschema-specifications rich==12.6.0 # via -r requirements.in -rpds-py==0.9.2 +rpds-py==0.10.2 # via # jsonschema # referencing @@ -93,7 +93,7 @@ six==1.16.0 # python-dateutil sniffio==1.3.0 # via anyio -tenacity==8.2.2 +tenacity==8.2.3 # via -r requirements.in typing-extensions==4.7.1 # via From ff8bf58d42b3d8e27c9e73a5d3f3615ba4e23a6e Mon Sep 17 00:00:00 2001 From: libretto Date: Wed, 6 Sep 2023 13:52:12 +0300 Subject: [PATCH 07/10] fixup --- karapace/rapu.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/karapace/rapu.py b/karapace/rapu.py index ed8bb43c8..6ab3b2419 100644 --- a/karapace/rapu.py +++ b/karapace/rapu.py @@ -18,7 +18,7 @@ import aiohttp.web import aiohttp.web_exceptions import asyncio -import cgi # pylint: disable=deprecated-module +import cgi # pylint: disable=deprecated-module import hashlib import logging import re From a87a9892d629a44646dadba98f866310c1db2612 Mon Sep 17 00:00:00 2001 From: libretto Date: Wed, 6 Sep 2023 22:54:42 +0300 Subject: [PATCH 08/10] fixup --- karapace/metrics.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/karapace/metrics.py b/karapace/metrics.py index 03ee39718..c1c35f3e2 100644 --- a/karapace/metrics.py +++ b/karapace/metrics.py @@ -37,8 +37,9 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: class Metrics(metaclass=Singleton): + stats_client: StatsClient + def __init__(self) -> None: - self.stats_client: StatsClient self.is_ready = False self.stop_event = threading.Event() self.worker_thread = threading.Thread(target=self.worker) From ec8473b8a08593298e12474c18e18cb50fe4e9fe Mon Sep 17 00:00:00 2001 From: libretto Date: Wed, 6 Sep 2023 23:23:34 +0300 Subject: [PATCH 09/10] fixup --- karapace/metrics.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/karapace/metrics.py b/karapace/metrics.py index c1c35f3e2..ab95d492c 100644 --- a/karapace/metrics.py +++ b/karapace/metrics.py @@ -38,8 +38,10 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton: class Metrics(metaclass=Singleton): stats_client: StatsClient - - def __init__(self) -> None: + + def __init__( + self, + ) -> None: self.is_ready = False self.stop_event = threading.Event() self.worker_thread = threading.Thread(target=self.worker) From 3ee04dcc2a4b21d200c8c8c81276c550bd015e75 Mon Sep 17 00:00:00 2001 From: libretto Date: Thu, 7 Sep 2023 01:26:52 +0300 Subject: [PATCH 10/10] fixup --- karapace/metrics.py | 24 ++++++++++++------------ karapace/schema_reader.py | 1 + 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/karapace/metrics.py b/karapace/metrics.py index ab95d492c..0cfaba056 100644 --- a/karapace/metrics.py +++ b/karapace/metrics.py @@ -48,22 +48,22 @@ def __init__( self.lock = threading.Lock() def setup(self, config: Config) -> None: - stats_service = config.get("stats_service") - if not config.get("metrics_extended"): - return - if stats_service == "statsd": - self.stats_client = StatsdClient(config=config) - elif stats_service == "prometheus": - self.stats_client = PrometheusClient(config=config) - else: - raise MetricsException('Config variable "stats_service" is not defined') with self.lock: if self.is_ready: return - self.is_ready = True - schedule.every(10).seconds.do(self.connections) - self.worker_thread.start() + stats_service = config.get("stats_service") + if not config.get("metrics_extended"): + return + if stats_service == "statsd": + self.stats_client = StatsdClient(config=config) + elif stats_service == "prometheus": + self.stats_client = PrometheusClient(config=config) + else: + raise MetricsException('Config variable "stats_service" is not defined') + self.is_ready = True + schedule.every(10).seconds.do(self.connections) + self.worker_thread.start() def request(self, size: int) -> None: if not self.is_ready or self.stats_client is None: diff --git a/karapace/schema_reader.py b/karapace/schema_reader.py index 8b060a37b..e7dab0448 100644 --- a/karapace/schema_reader.py +++ b/karapace/schema_reader.py @@ -127,6 +127,7 @@ def __init__( self.topic_replication_factor = self.config["replication_factor"] self.consumer: KafkaConsumer | None = None self._offset_watcher = offset_watcher + Metrics().setup(config=config) self.stats = Metrics().stats_client # Thread synchronization objects