Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prometheus support #30

Open
wants to merge 12 commits into
base: karapace-metrics
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion karapace.config.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,11 @@
"topic_name": "_schemas",
"protobuf_runtime_directory": "runtime",
"session_timeout_ms": 10000,
"stats_service": "statsd",
"metrics_extended": true,
"statsd_host": "127.0.0.1",
"statsd_port": 8125
"statsd_port": 8125,
"prometheus_host": "127.0.0.1",
"prometheus_port": 8005,

}
57 changes: 57 additions & 0 deletions karapace/base_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
"""
karapace - basestats

Supports base class for statsd and prometheus protocols:

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from contextlib import contextmanager
from karapace.config import Config
from karapace.sentry import get_sentry_client
from typing import Final, Iterator

import time


class StatsClient(ABC):
@abstractmethod
def __init__(
self,
config: Config,
) -> None:
self.sentry_client: Final = get_sentry_client(sentry_config=config.get("sentry", None))

@contextmanager
def timing_manager(self, metric: str, tags: dict | None = None) -> Iterator[None]:
start_time = time.monotonic()
yield
self.timing(metric, time.monotonic() - start_time, tags)

@abstractmethod
def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
pass

@abstractmethod
def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
pass

@abstractmethod
def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
pass

def unexpected_exception(self, ex: Exception, where: str, tags: dict | None = None) -> None:
all_tags = {
"exception": ex.__class__.__name__,
"where": where,
}
all_tags.update(tags or {})
self.increase("exception", tags=all_tags)
scope_args = {**(tags or {}), "where": where}
self.sentry_client.unexpected_exception(error=ex, where=where, tags=scope_args)

def close(self) -> None:
self.sentry_client.close()
6 changes: 6 additions & 0 deletions karapace/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,12 @@ class Config(TypedDict):
karapace_registry: bool
master_election_strategy: str
protobuf_runtime_directory: str
stats_service: str
metrics_extended: bool
statsd_host: str
statsd_port: int
prometheus_host: str | None
prometheus_port: int | None

sentry: NotRequired[Mapping[str, object]]
tags: NotRequired[Mapping[str, object]]
Expand Down Expand Up @@ -147,9 +150,12 @@ class ConfigDefaults(Config, total=False):
"karapace_registry": False,
"master_election_strategy": "lowest",
"protobuf_runtime_directory": "runtime",
"stats_service": "statsd",
"metrics_extended": True,
"statsd_host": "127.0.0.1",
"statsd_port": 8125,
"prometheus_host": "127.0.0.1",
"prometheus_port": 8005,
}
SECRET_CONFIG_OPTIONS = [SASL_PLAIN_PASSWORD]

Expand Down
57 changes: 34 additions & 23 deletions karapace/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
"""
from __future__ import annotations

from karapace.base_stats import StatsClient
from karapace.config import Config
from karapace.statsd import StatsClient
from karapace.prometheus import PrometheusClient
from karapace.statsd import StatsdClient

import os
import psutil
Expand All @@ -20,6 +22,10 @@
import time


class MetricsException(Exception):
pass


class Singleton(type):
_instance: Singleton | None = None

Expand All @@ -31,68 +37,71 @@ def __call__(cls, *args: str, **kwargs: int) -> Singleton:


class Metrics(metaclass=Singleton):
def __init__(self) -> None:
self.active = False
self.stats_client: StatsClient | None = None
stats_client: StatsClient

def __init__(
self,
) -> None:
self.is_ready = False
self.stop_event = threading.Event()
self.worker_thread = threading.Thread(target=self.worker)
self.lock = threading.Lock()

def setup(self, stats_client: StatsClient, config: Config) -> None:
self.active = config.get("metrics_extended") or False
if not self.active:
return
def setup(self, config: Config) -> None:
with self.lock:
if self.is_ready:
return
self.is_ready = True
if not self.stats_client:
self.stats_client = stats_client
else:
self.active = False
return

schedule.every(10).seconds.do(self.connections)
self.worker_thread.start()
stats_service = config.get("stats_service")
if not config.get("metrics_extended"):
return
if stats_service == "statsd":
self.stats_client = StatsdClient(config=config)
elif stats_service == "prometheus":
self.stats_client = PrometheusClient(config=config)
else:
raise MetricsException('Config variable "stats_service" is not defined')
self.is_ready = True
schedule.every(10).seconds.do(self.connections)
self.worker_thread.start()

def request(self, size: int) -> None:
if not self.active:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.gauge("request-size", size)

def response(self, size: int) -> None:
if not self.active:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.gauge("response-size", size)

def are_we_master(self, is_master: bool) -> None:
if not self.active:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.gauge("master-slave-role", int(is_master))

def latency(self, latency_ms: float) -> None:
if not self.active:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.timing("latency_ms", latency_ms)

def error(self) -> None:
if not self.active:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
self.stats_client.increase("error_total", 1)

def connections(self) -> None:
if not self.active:
if not self.is_ready or self.stats_client is None:
return
if not isinstance(self.stats_client, StatsClient):
raise RuntimeError("no StatsClient available")
Expand All @@ -112,7 +121,9 @@ def worker(self) -> None:
time.sleep(1)

def cleanup(self) -> None:
if not self.active:
if self.stats_client:
self.stats_client.close()
if not self.is_ready:
return
self.stop_event.set()
if self.worker_thread.is_alive():
Expand Down
69 changes: 69 additions & 0 deletions karapace/prometheus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
karapace - prometheus

Supports telegraf's statsd protocol extension for 'key=value' tags:

https://github.com/influxdata/telegraf/tree/master/plugins/inputs/statsd

Copyright (c) 2023 Aiven Ltd
See LICENSE for details
"""
from __future__ import annotations

from karapace.base_stats import StatsClient
from karapace.config import Config
from prometheus_client import Counter, Gauge, start_http_server, Summary
from typing import Final

import logging

LOG = logging.getLogger(__name__)
HOST: Final = "127.0.0.1"
PORT: Final = 8005


class PrometheusException(Exception):
pass


class PrometheusClient(StatsClient):
server_is_active = False

def __init__(self, config: Config, host: str = HOST, port: int = PORT) -> None:
super().__init__(config)

_host = config.get("prometheus_host") if "prometheus_host" in config else host
_port = config.get("prometheus_port") if "prometheus_port" in config else port
if _host is None:
raise PrometheusException("prometheus_host host is undefined")
if _port is None:
raise PrometheusException("prometheus_host port is undefined")
if not self.server_is_active:
start_http_server(_port, _host)
self.server_is_active = True
else:
raise PrometheusException("Double instance of Prometheus interface")
self._gauge: dict[str, Gauge] = dict()
self._summary: dict[str, Summary] = dict()
self._counter: dict[str, Counter] = dict()

def gauge(self, metric: str, value: float, tags: dict | None = None) -> None:
m = self._gauge.get(metric)
if m is None:
m = Gauge(metric, metric)
self._gauge[metric] = m
m.set(value)

def increase(self, metric: str, inc_value: int = 1, tags: dict | None = None) -> None:
m = self._counter.get(metric)
if m is None:
m = Counter(metric, metric)
self._counter[metric] = m
m.inc(inc_value)

def timing(self, metric: str, value: float, tags: dict | None = None) -> None:
m = self._summary.get(metric)
if m is None:
m = Summary(metric, metric)
self._summary[metric] = m
m.observe(value)
6 changes: 2 additions & 4 deletions karapace/rapu.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from http import HTTPStatus
from karapace.config import Config, create_server_ssl_context
from karapace.metrics import Metrics
from karapace.statsd import StatsClient
from karapace.utils import json_decode, json_encode
from karapace.version import __version__
from typing import Callable, Dict, NoReturn, Optional, overload, Union
Expand Down Expand Up @@ -169,10 +168,10 @@ def __init__(
self.app_request_metric = f"{app_name}_request"
self.app = self._create_aiohttp_application(config=config)
self.log = logging.getLogger(self.app_name)
self.stats = StatsClient(config=config)
Metrics().setup(config)
self.stats = Metrics().stats_client
self.app.on_cleanup.append(self.close_by_app)
self.not_ready_handler = not_ready_handler
Metrics().setup(self.stats, config)

def _create_aiohttp_application(self, *, config: Config) -> aiohttp.web.Application:
return aiohttp.web.Application(client_max_size=config["http_request_max_size"])
Expand All @@ -188,7 +187,6 @@ async def close(self) -> None:
created by the aiohttp library.
"""
Metrics().cleanup()
self.stats.close()

@staticmethod
def cors_and_server_headers_for_request(*, request, origin="*"): # pylint: disable=unused-argument
Expand Down
5 changes: 3 additions & 2 deletions karapace/schema_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@
from karapace.in_memory_database import InMemoryDatabase
from karapace.key_format import is_key_in_canonical_format, KeyFormatter, KeyMode
from karapace.master_coordinator import MasterCoordinator
from karapace.metrics import Metrics
from karapace.offset_watcher import OffsetWatcher
from karapace.protobuf.schema import ProtobufSchema
from karapace.schema_models import parse_protobuf_schema_definition, SchemaType, TypedSchema, ValidatedTypedSchema
from karapace.schema_references import LatestVersionReference, Reference, reference_from_mapping, Referents
from karapace.statsd import StatsClient
from karapace.typing import JsonObject, ResolvedVersion, SchemaId, Subject
from karapace.utils import json_decode, JSONDecodeError, KarapaceKafkaClient
from threading import Event, Thread
Expand Down Expand Up @@ -127,7 +127,8 @@ def __init__(
self.topic_replication_factor = self.config["replication_factor"]
self.consumer: KafkaConsumer | None = None
self._offset_watcher = offset_watcher
self.stats = StatsClient(config=config)
Metrics().setup(config=config)
self.stats = Metrics().stats_client

# Thread synchronization objects
# - offset is used by the REST API to wait until this thread has
Expand Down
Loading