diff --git a/README.md b/README.md index 50e7e0b..27dc99c 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,23 @@ consumer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporte consumer.auto.include.jmx.reporter=false ``` +When setting configurations for the Prometheus metrics reporter, they also need to be set with the `admin.`, `producer.` and `consumer.`. +For example, to set the `listener` to `http://:8081`: +```properties +metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter +prometheus.metrics.reporter.listener=http://:8081 +auto.include.jmx.reporter=false +admin.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter +admin.prometheus.metrics.reporter.listener=http://:8081 +admin.auto.include.jmx.reporter=false +producer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter +producer.prometheus.metrics.reporter.listener=http://:8081 +producer.auto.include.jmx.reporter=false +consumer.metric.reporters=io.strimzi.kafka.metrics.KafkaPrometheusMetricsReporter +consumer.prometheus.metrics.reporter.listener=http://:8081 +consumer.auto.include.jmx.reporter=false +``` + ## Accessing Metrics Metrics are exposed on the configured listener on the `GET /metrics` endpoint. For example, by default this is `http://localhost:8080/metrics`. diff --git a/src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java b/src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java deleted file mode 100644 index 84f73dd..0000000 --- a/src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.kafka.metrics; - -import io.prometheus.metrics.model.registry.MultiCollector; -import io.prometheus.metrics.model.snapshots.GaugeSnapshot; -import io.prometheus.metrics.model.snapshots.InfoSnapshot; -import io.prometheus.metrics.model.snapshots.Labels; -import io.prometheus.metrics.model.snapshots.MetricSnapshot; -import io.prometheus.metrics.model.snapshots.MetricSnapshots; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.metrics.KafkaMetric; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Prometheus Collector to store and export metrics retrieved by {@link KafkaPrometheusMetricsReporter}. - */ -public class KafkaMetricsCollector implements MultiCollector { - - private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsCollector.class); - private final Map metrics; - - /** - * Constructs a new KafkaMetricsCollector with provided configuration. - */ - public KafkaMetricsCollector() { - this.metrics = new ConcurrentHashMap<>(); - } - - /** - * This method is used to add a Kafka metric to the collection for reporting. - * The metric is wrapped in a MetricWrapper object which contains additional information - * such as the prometheus name of the metric. - * - * @param name The name of the metric in the Kafka system. This is used as the key in the metrics map. - * @param metric The Kafka metric to add. This is wrapped in a MetricWrapper object. - */ - public void addMetric(MetricName name, MetricWrapper metric) { - metrics.put(name, metric); - } - - /** - * Removes a Kafka metric from collection. - * - * @param name The Kafka metric to remove. - */ - public void removeMetric(MetricName name) { - metrics.remove(name); - } - - /** - * Called when the Prometheus server scrapes metrics. - * @return MetricSnapshots object that contains snapshots of metrics - */ - @Override - public MetricSnapshots collect() { - Map gaugeBuilders = new HashMap<>(); - Map infoBuilders = new HashMap<>(); - - for (Map.Entry entry : metrics.entrySet()) { - MetricWrapper metricWrapper = entry.getValue(); - String prometheusMetricName = metricWrapper.prometheusName(); - Object metricValue = ((KafkaMetric) metricWrapper.metric()).metricValue(); - Labels labels = metricWrapper.labels(); - LOG.debug("Collecting metric {} with the following labels: {}", prometheusMetricName, labels); - - if (metricValue instanceof Number) { - double value = ((Number) metricValue).doubleValue(); - GaugeSnapshot.Builder builder = gaugeBuilders.computeIfAbsent(prometheusMetricName, k -> GaugeSnapshot.builder().name(prometheusMetricName)); - builder.dataPoint(DataPointSnapshotBuilder.gaugeDataPoint(labels, value)); - } else { - InfoSnapshot.Builder builder = infoBuilders.computeIfAbsent(prometheusMetricName, k -> InfoSnapshot.builder().name(prometheusMetricName)); - builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, metricValue, metricWrapper.attribute())); - } - } - List snapshots = new ArrayList<>(); - for (GaugeSnapshot.Builder builder : gaugeBuilders.values()) { - snapshots.add(builder.build()); - } - for (InfoSnapshot.Builder builder : infoBuilders.values()) { - snapshots.add(builder.build()); - } - return new MetricSnapshots(snapshots); - } -} diff --git a/src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java b/src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java index f6a1134..7190911 100644 --- a/src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java +++ b/src/main/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporter.java @@ -5,7 +5,6 @@ package io.strimzi.kafka.metrics; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import io.prometheus.metrics.instrumentation.jvm.JvmMetrics; import io.prometheus.metrics.model.registry.PrometheusRegistry; import io.prometheus.metrics.model.snapshots.PrometheusNaming; import org.apache.kafka.common.config.ConfigException; @@ -23,15 +22,14 @@ /** * MetricsReporter implementation that expose Kafka metrics in the Prometheus format. - * * This can be used by Kafka brokers and clients. */ public class KafkaPrometheusMetricsReporter implements MetricsReporter { private static final Logger LOG = LoggerFactory.getLogger(KafkaPrometheusMetricsReporter.class); + private final PrometheusRegistry registry; - @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method - private KafkaMetricsCollector collector; + private final PrometheusCollector collector; @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method private PrometheusMetricsReporterConfig config; @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method @@ -44,26 +42,24 @@ public class KafkaPrometheusMetricsReporter implements MetricsReporter { */ public KafkaPrometheusMetricsReporter() { registry = PrometheusRegistry.defaultRegistry; + collector = PrometheusCollector.register(registry); } // for testing - KafkaPrometheusMetricsReporter(PrometheusRegistry registry) { + KafkaPrometheusMetricsReporter(PrometheusRegistry registry, PrometheusCollector collector) { this.registry = registry; + this.collector = collector; } @Override public void configure(Map map) { config = new PrometheusMetricsReporterConfig(map, registry); - collector = new KafkaMetricsCollector(); - // Add JVM metrics - JvmMetrics.builder().register(); httpServer = config.startHttpServer(); LOG.debug("KafkaPrometheusMetricsReporter configured with {}", config); } @Override public void init(List metrics) { - registry.register(collector); for (KafkaMetric metric : metrics) { metricChange(metric); } @@ -75,18 +71,17 @@ public void metricChange(KafkaMetric metric) { LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName); } else { MetricWrapper metricWrapper = new MetricWrapper(prometheusName, metric, metric.metricName().name()); - collector.addMetric(metric.metricName(), metricWrapper); + collector.addKafkaMetric(metric.metricName(), metricWrapper); } } @Override public void metricRemoval(KafkaMetric metric) { - collector.removeMetric(metric.metricName()); + collector.removeKafkaMetric(metric.metricName()); } @Override public void close() { - registry.unregister(collector); httpServer.ifPresent(HttpServers::release); } diff --git a/src/main/java/io/strimzi/kafka/metrics/YammerMetricsCollector.java b/src/main/java/io/strimzi/kafka/metrics/PrometheusCollector.java similarity index 58% rename from src/main/java/io/strimzi/kafka/metrics/YammerMetricsCollector.java rename to src/main/java/io/strimzi/kafka/metrics/PrometheusCollector.java index 1bab0f1..838502c 100644 --- a/src/main/java/io/strimzi/kafka/metrics/YammerMetricsCollector.java +++ b/src/main/java/io/strimzi/kafka/metrics/PrometheusCollector.java @@ -8,10 +8,11 @@ import com.yammer.metrics.core.Gauge; import com.yammer.metrics.core.Histogram; import com.yammer.metrics.core.Meter; -import com.yammer.metrics.core.MetricName; import com.yammer.metrics.core.Sampling; import com.yammer.metrics.core.Timer; +import io.prometheus.metrics.instrumentation.jvm.JvmMetrics; import io.prometheus.metrics.model.registry.MultiCollector; +import io.prometheus.metrics.model.registry.PrometheusRegistry; import io.prometheus.metrics.model.snapshots.CounterSnapshot; import io.prometheus.metrics.model.snapshots.GaugeSnapshot; import io.prometheus.metrics.model.snapshots.InfoSnapshot; @@ -21,6 +22,8 @@ import io.prometheus.metrics.model.snapshots.Quantile; import io.prometheus.metrics.model.snapshots.Quantiles; import io.prometheus.metrics.model.snapshots.SummarySnapshot; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.KafkaMetric; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,43 +33,135 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; /** - * Prometheus Collector to store and export metrics retrieved by {@link YammerPrometheusMetricsReporter}. + * Prometheus Collector to store and export metrics retrieved by {@link KafkaPrometheusMetricsReporter} + * and {@link YammerPrometheusMetricsReporter}. */ @SuppressWarnings("ClassFanOutComplexity") -public class YammerMetricsCollector implements MultiCollector { +public class PrometheusCollector implements MultiCollector { - private static final Logger LOG = LoggerFactory.getLogger(YammerMetricsCollector.class); + private static final Logger LOG = LoggerFactory.getLogger(PrometheusCollector.class); + private static final AtomicBoolean REGISTERED = new AtomicBoolean(false); + private static final PrometheusCollector INSTANCE = new PrometheusCollector(); private static final List QUANTILES = Arrays.asList(0.50, 0.75, 0.95, 0.98, 0.99, 0.999); - private final Map metrics; + private final Map kafkaMetrics = new ConcurrentHashMap<>(); + private final Map yammerMetrics = new ConcurrentHashMap<>(); + + /* test */ PrometheusCollector() { } /** - * Constructor + * Add a Kafka metric to be collected. + * + * @param name The name of the Kafka metric to add. + * @param metric The Kafka metric to add. */ - public YammerMetricsCollector() { - this.metrics = new ConcurrentHashMap<>(); + public void addKafkaMetric(MetricName name, MetricWrapper metric) { + kafkaMetrics.put(name, metric); + } + + /** + * Remove a Kafka metric from collection. + * + * @param name The name of Kafka metric to remove. + */ + public void removeKafkaMetric(MetricName name) { + kafkaMetrics.remove(name); + } + + /** + * Add a Yammer metric to be collected. + * + * @param name The name of the Yammer metric to add. + * @param metric The Yammer metric to add. + */ + public void addYammerMetric(com.yammer.metrics.core.MetricName name, MetricWrapper metric) { + yammerMetrics.put(name, metric); + } + + /** + * Remove a Yammer metric from collection. + * + * @param name The name of the Yammer metric to remove. + */ + public void removeYammerMetric(com.yammer.metrics.core.MetricName name) { + yammerMetrics.remove(name); + } + + /** + * Register this Collector with the provided Prometheus registry + * @param registry The Prometheus registry + * @return The PrometheusCollector instance + */ + public static PrometheusCollector register(PrometheusRegistry registry) { + if (REGISTERED.compareAndSet(false, true)) { + // Add JVM metrics + JvmMetrics.builder().register(registry); + registry.register(INSTANCE); + } + return INSTANCE; } /** * Called when the Prometheus server scrapes metrics. - * @return metrics that match the configured allowlist + * @return MetricSnapshots that contains the metrics */ @Override - @SuppressWarnings({"CyclomaticComplexity", "NPathComplexity", "JavaNCSS"}) public MetricSnapshots collect() { Map counterBuilders = new HashMap<>(); Map gaugeBuilders = new HashMap<>(); Map infoBuilders = new HashMap<>(); Map summaryBuilders = new HashMap<>(); - for (Map.Entry entry : metrics.entrySet()) { - MetricWrapper metricWrapper = entry.getValue(); + collectKafkaMetrics(gaugeBuilders, infoBuilders); + collectYammerMetrics(counterBuilders, gaugeBuilders, infoBuilders, summaryBuilders); + + List snapshots = new ArrayList<>(); + for (GaugeSnapshot.Builder builder : gaugeBuilders.values()) { + snapshots.add(builder.build()); + } + for (CounterSnapshot.Builder builder : counterBuilders.values()) { + snapshots.add(builder.build()); + } + for (InfoSnapshot.Builder builder : infoBuilders.values()) { + snapshots.add(builder.build()); + } + for (SummarySnapshot.Builder builder : summaryBuilders.values()) { + snapshots.add(builder.build()); + } + return new MetricSnapshots(snapshots); + } + + private void collectKafkaMetrics(Map gaugeBuilders, + Map infoBuilders) { + for (MetricWrapper metricWrapper : kafkaMetrics.values()) { + String prometheusMetricName = metricWrapper.prometheusName(); + Object metricValue = ((KafkaMetric) metricWrapper.metric()).metricValue(); + Labels labels = metricWrapper.labels(); + LOG.debug("Collecting Kafka metric {} with the following labels: {}", prometheusMetricName, labels); + + if (metricValue instanceof Number) { + double value = ((Number) metricValue).doubleValue(); + GaugeSnapshot.Builder builder = gaugeBuilders.computeIfAbsent(prometheusMetricName, k -> GaugeSnapshot.builder().name(prometheusMetricName)); + builder.dataPoint(DataPointSnapshotBuilder.gaugeDataPoint(labels, value)); + } else { + InfoSnapshot.Builder builder = infoBuilders.computeIfAbsent(prometheusMetricName, k -> InfoSnapshot.builder().name(prometheusMetricName)); + builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, metricValue, metricWrapper.attribute())); + } + } + } + + private void collectYammerMetrics(Map counterBuilders, + Map gaugeBuilders, + Map infoBuilders, + Map summaryBuilders) { + for (MetricWrapper metricWrapper : yammerMetrics.values()) { String prometheusMetricName = metricWrapper.prometheusName(); Object metric = metricWrapper.metric(); Labels labels = metricWrapper.labels(); - LOG.debug("Collecting metric {} with the following labels: {}", prometheusMetricName, labels); + LOG.debug("Collecting Yammer metric {} with the following labels: {}", prometheusMetricName, labels); if (metric instanceof Counter) { Counter counter = (Counter) metric; @@ -98,39 +193,6 @@ public MetricSnapshots collect() { LOG.error("The metric {} has an unexpected type: {}", prometheusMetricName, metric.getClass().getName()); } } - List snapshots = new ArrayList<>(); - for (GaugeSnapshot.Builder builder : gaugeBuilders.values()) { - snapshots.add(builder.build()); - } - for (CounterSnapshot.Builder builder : counterBuilders.values()) { - snapshots.add(builder.build()); - } - for (InfoSnapshot.Builder builder : infoBuilders.values()) { - snapshots.add(builder.build()); - } - for (SummarySnapshot.Builder builder : summaryBuilders.values()) { - snapshots.add(builder.build()); - } - return new MetricSnapshots(snapshots); - } - - /** - * Add a Yammer metric to be collected. - * - * @param name The name of the Yammer metric to add. - * @param metric The Yammer metric to add. - */ - public void addMetric(MetricName name, MetricWrapper metric) { - metrics.put(name, metric); - } - - /** - * Remove a Yammer metric from collection. - * - * @param name The name of the Yammer metric to remove. - */ - public void removeMetric(MetricName name) { - metrics.remove(name); } private static Quantiles quantiles(Sampling sampling) { diff --git a/src/main/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporter.java b/src/main/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporter.java index 79701a7..0f12d70 100644 --- a/src/main/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporter.java +++ b/src/main/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporter.java @@ -27,8 +27,7 @@ public class YammerPrometheusMetricsReporter implements KafkaMetricsReporter, Me private static final Logger LOG = LoggerFactory.getLogger(YammerPrometheusMetricsReporter.class); private final PrometheusRegistry registry; - @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the init method - private YammerMetricsCollector collector; + private final PrometheusCollector collector; @SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the init method /* test */ PrometheusMetricsReporterConfig config; @@ -36,19 +35,19 @@ public class YammerPrometheusMetricsReporter implements KafkaMetricsReporter, Me * Constructor */ public YammerPrometheusMetricsReporter() { - this(PrometheusRegistry.defaultRegistry); + this.registry = PrometheusRegistry.defaultRegistry; + this.collector = PrometheusCollector.register(registry); } // for testing - YammerPrometheusMetricsReporter(PrometheusRegistry registry) { + YammerPrometheusMetricsReporter(PrometheusRegistry registry, PrometheusCollector collector) { this.registry = registry; + this.collector = collector; } @Override public void init(VerifiableProperties props) { config = new PrometheusMetricsReporterConfig(props.props(), registry); - collector = new YammerMetricsCollector(); - registry.register(collector); for (MetricsRegistry yammerRegistry : Arrays.asList(KafkaYammerMetrics.defaultRegistry(), Metrics.defaultRegistry())) { yammerRegistry.addListener(this); } @@ -62,12 +61,12 @@ public void onMetricAdded(MetricName name, Metric metric) { LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName); } else { MetricWrapper metricWrapper = new MetricWrapper(prometheusName, name.getScope(), metric, name.getName()); - collector.addMetric(name, metricWrapper); + collector.addYammerMetric(name, metricWrapper); } } @Override public void onMetricRemoved(MetricName name) { - collector.removeMetric(name); + collector.removeYammerMetric(name); } } diff --git a/src/test/java/io/strimzi/kafka/metrics/KafkaMetricsCollectorTest.java b/src/test/java/io/strimzi/kafka/metrics/KafkaMetricsCollectorTest.java deleted file mode 100644 index 6844d0c..0000000 --- a/src/test/java/io/strimzi/kafka/metrics/KafkaMetricsCollectorTest.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.kafka.metrics; - -import io.prometheus.metrics.model.snapshots.GaugeSnapshot; -import io.prometheus.metrics.model.snapshots.InfoSnapshot; -import io.prometheus.metrics.model.snapshots.Labels; -import io.prometheus.metrics.model.snapshots.MetricSnapshot; -import io.prometheus.metrics.model.snapshots.MetricSnapshots; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.metrics.Gauge; -import org.apache.kafka.common.metrics.KafkaMetric; -import org.apache.kafka.common.metrics.Measurable; -import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.utils.Time; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.LinkedHashMap; -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; - -public class KafkaMetricsCollectorTest { - - private static final String METRIC_PREFIX = "kafka.server"; - private final MetricConfig metricConfig = new MetricConfig(); - private final Time time = Time.SYSTEM; - private Map tagsMap; - private Labels labels; - - @BeforeEach - public void setup() { - tagsMap = new LinkedHashMap<>(); - tagsMap.put("k1", "v1"); - tagsMap.put("k2", "v2"); - Labels.Builder labelsBuilder = Labels.builder(); - for (Map.Entry tag : tagsMap.entrySet()) { - labelsBuilder.label(tag.getKey(), tag.getValue()); - } - labels = labelsBuilder.build(); - } - - @Test - public void testCollect() { - KafkaMetricsCollector collector = new KafkaMetricsCollector(); - - MetricSnapshots metrics = collector.collect(); - assertEquals(0, metrics.size()); - - // Adding a metric - MetricName metricName = new MetricName("name", "group", "description", tagsMap); - MetricWrapper metricWrapper = newMetric(metricName, 1.0); - - collector.addMetric(metricName, metricWrapper); - metrics = collector.collect(); - assertEquals(1, metrics.size()); - - MetricSnapshot snapshot = metrics.get(0); - assertGaugeSnapshot(snapshot, 1.0, labels); - - // Updating the value of the metric - collector.addMetric(metricName, newMetric(metricName, 3.0)); - metrics = collector.collect(); - assertEquals(1, metrics.size()); - - MetricSnapshot updatedSnapshot = metrics.get(0); - assertGaugeSnapshot(updatedSnapshot, 3.0, labels); - - // Removing a metric - collector.removeMetric(metricName); - metrics = collector.collect(); - assertEquals(0, metrics.size()); - } - - @Test - public void testCollectNonNumericMetric() { - KafkaMetricsCollector collector = new KafkaMetricsCollector(); - - MetricSnapshots metrics = collector.collect(); - assertEquals(0, metrics.size()); - - // Adding a non-numeric metric converted - String nonNumericValue = "myValue"; - MetricName metricName = new MetricName("name", "group", "description", tagsMap); - MetricWrapper metricWrapper = newNonNumericMetric(metricName, nonNumericValue); - collector.addMetric(metricName, metricWrapper); - metrics = collector.collect(); - - assertEquals(1, metrics.size()); - MetricSnapshot snapshot = metrics.get(0); - assertInstanceOf(InfoSnapshot.class, snapshot); - assertEquals(1, snapshot.getDataPoints().size()); - Labels expectedLabels = labels.add("name", nonNumericValue); - assertEquals(expectedLabels, snapshot.getDataPoints().get(0).getLabels()); - } - - private void assertGaugeSnapshot(MetricSnapshot snapshot, double expectedValue, Labels expectedLabels) { - assertInstanceOf(GaugeSnapshot.class, snapshot); - GaugeSnapshot gaugeSnapshot = (GaugeSnapshot) snapshot; - assertEquals(1, gaugeSnapshot.getDataPoints().size()); - GaugeSnapshot.GaugeDataPointSnapshot datapoint = gaugeSnapshot.getDataPoints().get(0); - assertEquals(expectedValue, datapoint.getValue()); - assertEquals(expectedLabels, datapoint.getLabels()); - } - - private MetricWrapper newMetric(MetricName metricName, double value) { - Measurable measurable = (config, now) -> value; - KafkaMetric kafkaMetric = new KafkaMetric( - new Object(), - metricName, - measurable, - metricConfig, - time); - String prometheusName = MetricWrapper.prometheusName(METRIC_PREFIX, metricName); - return new MetricWrapper(prometheusName, kafkaMetric, metricName.name()); - } - - private MetricWrapper newNonNumericMetric(MetricName metricName, String value) { - Gauge gauge = (config, now) -> value; - KafkaMetric kafkaMetric = new KafkaMetric( - new Object(), - metricName, - gauge, - metricConfig, - time); - String prometheusName = MetricWrapper.prometheusName(METRIC_PREFIX, metricName); - return new MetricWrapper(prometheusName, kafkaMetric, metricName.name()); - } - -} diff --git a/src/test/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporterTest.java b/src/test/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporterTest.java index bf0e487..407e0fa 100644 --- a/src/test/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporterTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/KafkaPrometheusMetricsReporterTest.java @@ -5,138 +5,104 @@ package io.strimzi.kafka.metrics; import io.prometheus.metrics.model.registry.PrometheusRegistry; -import org.apache.kafka.common.MetricName; -import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.KafkaMetricsContext; -import org.apache.kafka.common.metrics.Measurable; -import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.utils.Time; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import static io.strimzi.kafka.metrics.MetricsUtils.getMetrics; +import static io.strimzi.kafka.metrics.MetricsUtils.newKafkaMetric; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; public class KafkaPrometheusMetricsReporterTest { - private final MetricConfig metricConfig = new MetricConfig(); - private final Time time = Time.SYSTEM; + private final Map labels = Collections.singletonMap("key", "value"); + private Map configs; + private PrometheusRegistry registry; + private PrometheusCollector collector; + + @BeforeEach + public void setup() { + configs = new HashMap<>(); + configs.put(PrometheusMetricsReporterConfig.LISTENER_CONFIG, "http://:0"); + registry = new PrometheusRegistry(); + collector = new PrometheusCollector(); + registry.register(collector); + } @Test public void testLifeCycle() throws Exception { - KafkaPrometheusMetricsReporter reporter = new KafkaPrometheusMetricsReporter(new PrometheusRegistry()); - Map configs = new HashMap<>(); - configs.put(PrometheusMetricsReporterConfig.LISTENER_CONFIG, "http://:0"); + KafkaPrometheusMetricsReporter reporter = new KafkaPrometheusMetricsReporter(registry, collector); configs.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_name.*"); reporter.configure(configs); reporter.contextChange(new KafkaMetricsContext("kafka.server")); int port = reporter.getPort().orElseThrow(); - // initialMetrics is used because JVM metrics are added to the registry - int initialMetrics = getMetrics(port).size(); + assertEquals(0, getMetrics(port).size()); // Adding a metric not matching the allowlist does nothing - KafkaMetric metric1 = buildMetric("other", "group", 0); + KafkaMetric metric1 = newKafkaMetric("other", "group", (config, now) -> 0, labels); reporter.init(Collections.singletonList(metric1)); List metrics = getMetrics(port); - assertEquals(initialMetrics, metrics.size()); + assertEquals(0, metrics.size()); // Adding a metric that matches the allowlist - KafkaMetric metric2 = buildMetric("name", "group", 0); + KafkaMetric metric2 = newKafkaMetric("name", "group", (config, now) -> 0, labels); reporter.metricChange(metric2); metrics = getMetrics(port); - assertEquals(initialMetrics + 1, metrics.size()); + assertEquals(1, metrics.size()); // Adding a non-numeric metric - KafkaMetric metric3 = buildNonNumericMetric("name1", "group"); + KafkaMetric metric3 = newKafkaMetric("name1", "group", (config, now) -> "hello", labels); reporter.metricChange(metric3); metrics = getMetrics(port); - assertEquals(initialMetrics + 2, metrics.size()); + assertEquals(2, metrics.size()); // Removing a metric reporter.metricRemoval(metric3); metrics = getMetrics(port); - assertEquals(initialMetrics + 1, metrics.size()); + assertEquals(1, metrics.size()); reporter.close(); } @Test public void testMultipleReporters() throws Exception { - Map configs = new HashMap<>(); - configs.put(PrometheusMetricsReporterConfig.LISTENER_CONFIG, "http://:0"); - - KafkaPrometheusMetricsReporter reporter1 = new KafkaPrometheusMetricsReporter(new PrometheusRegistry()); + KafkaPrometheusMetricsReporter reporter1 = new KafkaPrometheusMetricsReporter(registry, collector); reporter1.configure(configs); reporter1.contextChange(new KafkaMetricsContext("kafka.server")); - Optional port = reporter1.getPort(); - assertTrue(port.isPresent()); - int initialMetrics = getMetrics(port.get()).size(); + Optional port1 = reporter1.getPort(); + assertTrue(port1.isPresent()); + assertEquals(0, getMetrics(port1.get()).size()); - KafkaPrometheusMetricsReporter reporter2 = new KafkaPrometheusMetricsReporter(new PrometheusRegistry()); - configs.put(PrometheusMetricsReporterConfig.LISTENER_CONFIG, "http://:0"); + KafkaPrometheusMetricsReporter reporter2 = new KafkaPrometheusMetricsReporter(registry, collector); reporter2.configure(configs); reporter2.contextChange(new KafkaMetricsContext("kafka.server")); + Optional port2 = reporter1.getPort(); + assertTrue(port2.isPresent()); + assertEquals(0, getMetrics(port2.get()).size()); - KafkaMetric metric1 = buildMetric("name1", "group", 0); - reporter1.init(Collections.singletonList(metric1)); + assertEquals(port1, port2); - KafkaMetric metric2 = buildMetric("name2", "group", 0); + KafkaMetric metric1 = newKafkaMetric("name", "group", (config, now) -> 0, Collections.singletonMap("name", "metric1")); + reporter1.init(Collections.singletonList(metric1)); + KafkaMetric metric2 = newKafkaMetric("name", "group", (config, now) -> 0, Collections.singletonMap("name", "metric2")); reporter2.init(Collections.singletonList(metric2)); + assertEquals(2, getMetrics(port1.get()).size()); - int endMetrics = getMetrics(port.get()).size(); - assertTrue(initialMetrics < endMetrics); - + reporter1.metricRemoval(metric1); reporter1.close(); - reporter2.close(); - } - private KafkaMetric buildMetric(String name, String group, double value) { - Measurable measurable = (config, now) -> value; - return new KafkaMetric( - new Object(), - new MetricName(name, group, "", labels), - measurable, - metricConfig, - time); - } - - private KafkaMetric buildNonNumericMetric(String name, String group) { - Gauge measurable = (config, now) -> "hello"; - return new KafkaMetric( - new Object(), - new MetricName(name, group, "", labels), - measurable, - metricConfig, - time); - } + assertEquals(1, getMetrics(port1.get()).size()); - private List getMetrics(int port) throws Exception { - List metrics = new ArrayList<>(); - URL url = new URL("http://localhost:" + port + "/metrics"); - HttpURLConnection con = (HttpURLConnection) url.openConnection(); - con.setRequestMethod("GET"); - - try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) { - String inputLine; - while ((inputLine = in.readLine()) != null) { - if (!inputLine.startsWith("#")) { - metrics.add(inputLine); - } - } - } - return metrics; + reporter2.close(); } } diff --git a/src/test/java/io/strimzi/kafka/metrics/MetricWrapperTest.java b/src/test/java/io/strimzi/kafka/metrics/MetricWrapperTest.java index 0b43ae1..4e41b87 100644 --- a/src/test/java/io/strimzi/kafka/metrics/MetricWrapperTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/MetricWrapperTest.java @@ -4,16 +4,11 @@ */ package io.strimzi.kafka.metrics; -import com.yammer.metrics.Metrics; -import com.yammer.metrics.core.Counter; +import com.yammer.metrics.core.Gauge; import com.yammer.metrics.core.MetricName; -import com.yammer.metrics.core.MetricsRegistry; import io.prometheus.metrics.model.snapshots.Labels; import io.prometheus.metrics.model.snapshots.PrometheusNaming; -import org.apache.kafka.common.metrics.Gauge; import org.apache.kafka.common.metrics.KafkaMetric; -import org.apache.kafka.common.metrics.MetricConfig; -import org.apache.kafka.common.utils.Time; import org.junit.jupiter.api.Test; import java.util.Collections; @@ -21,6 +16,7 @@ import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; +import static io.strimzi.kafka.metrics.MetricsUtils.newKafkaMetric; import static org.junit.jupiter.api.Assertions.assertEquals; public class MetricWrapperTest { @@ -71,10 +67,8 @@ public void testKafkaMetricName() { @Test public void testKafkaMetric() { AtomicInteger value = new AtomicInteger(0); - org.apache.kafka.common.MetricName name = new org.apache.kafka.common.MetricName("name", "kafka.server", "", Collections.emptyMap()); - KafkaMetric metric = new KafkaMetric(new Object(), name, (Gauge) (metricConfig, l) -> value.get(), new MetricConfig(), Time.SYSTEM); - String prometheusName = MetricWrapper.prometheusName("kafka_server", name); - MetricWrapper wrapper = new MetricWrapper(prometheusName, metric, "name"); + KafkaMetric metric = newKafkaMetric("name", "group", (config, now) -> value.get(), Collections.emptyMap()); + MetricWrapper wrapper = new MetricWrapper(MetricWrapper.prometheusName("kafka_server", metric.metricName()), metric, "name"); assertEquals(value.get(), ((KafkaMetric) wrapper.metric()).metricValue()); value.incrementAndGet(); assertEquals(value.get(), ((KafkaMetric) wrapper.metric()).metricValue()); @@ -82,13 +76,12 @@ public void testKafkaMetric() { @Test public void testYammerMetric() { + AtomicInteger value = new AtomicInteger(0); MetricName name = new MetricName("group", "type", "name"); - MetricsRegistry registry = Metrics.defaultRegistry(); - Counter counter = registry.newCounter(name); - String prometheusName = MetricWrapper.prometheusName(name); - MetricWrapper wrapper = new MetricWrapper(prometheusName, "", counter, "name"); - assertEquals(counter.count(), ((Counter) wrapper.metric()).count()); - counter.inc(); - assertEquals(counter.count(), ((Counter) wrapper.metric()).count()); + Gauge metric = MetricsUtils.newYammerMetric(value::get); + MetricWrapper wrapper = new MetricWrapper(MetricWrapper.prometheusName(name), "", metric, "name"); + assertEquals(value.get(), ((Gauge) wrapper.metric()).value()); + value.incrementAndGet(); + assertEquals(value.get(), ((Gauge) wrapper.metric()).value()); } } diff --git a/src/test/java/io/strimzi/kafka/metrics/MetricsUtils.java b/src/test/java/io/strimzi/kafka/metrics/MetricsUtils.java new file mode 100644 index 0000000..68dde3f --- /dev/null +++ b/src/test/java/io/strimzi/kafka/metrics/MetricsUtils.java @@ -0,0 +1,81 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.metrics; + +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.apache.kafka.common.metrics.MetricConfig; +import org.apache.kafka.common.utils.Time; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +/** + * Utility class to create and retrieve metrics + */ +public class MetricsUtils { + + /** + * Query the HTTP endpoint and returns the output + * @param port The port to query + * @return The lines from the output + * @throws Exception If any error occurs + */ + public static List getMetrics(int port) throws Exception { + List metrics = new ArrayList<>(); + URL url = new URL("http://localhost:" + port + "/metrics"); + HttpURLConnection con = (HttpURLConnection) url.openConnection(); + con.setRequestMethod("GET"); + + try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) { + String inputLine; + while ((inputLine = in.readLine()) != null) { + if (!inputLine.startsWith("#")) { + metrics.add(inputLine); + } + } + } + return metrics; + } + + /** + * Create a new Kafka metric + * @param name The name of the metric + * @param group The group of the metric + * @param gauge The gauge providing the value of the metric + * @param labels The labels of the metric + * @return The Kafka metric + */ + public static KafkaMetric newKafkaMetric(String name, String group, Gauge gauge, Map labels) { + return new KafkaMetric( + new Object(), + new MetricName(name, group, "", labels), + gauge, + new MetricConfig(), + Time.SYSTEM); + } + + /** + * Create a new Yammer metric + * @param valueSupplier The supplier providing the value of the metric + * @return The Yammer metric + */ + public static com.yammer.metrics.core.Gauge newYammerMetric(Supplier valueSupplier) { + return new com.yammer.metrics.core.Gauge<>() { + @Override + public T value() { + return valueSupplier.get(); + } + }; + } + +} diff --git a/src/test/java/io/strimzi/kafka/metrics/PrometheusCollectorTest.java b/src/test/java/io/strimzi/kafka/metrics/PrometheusCollectorTest.java new file mode 100644 index 0000000..c7cebd1 --- /dev/null +++ b/src/test/java/io/strimzi/kafka/metrics/PrometheusCollectorTest.java @@ -0,0 +1,190 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.kafka.metrics; + +import io.prometheus.metrics.model.registry.PrometheusRegistry; +import io.prometheus.metrics.model.snapshots.GaugeSnapshot; +import io.prometheus.metrics.model.snapshots.InfoSnapshot; +import io.prometheus.metrics.model.snapshots.Labels; +import io.prometheus.metrics.model.snapshots.MetricSnapshot; +import io.prometheus.metrics.model.snapshots.MetricSnapshots; +import org.apache.kafka.common.MetricName; +import org.apache.kafka.common.metrics.Gauge; +import org.apache.kafka.common.metrics.KafkaMetric; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; + +import static io.strimzi.kafka.metrics.MetricsUtils.newKafkaMetric; +import static io.strimzi.kafka.metrics.MetricsUtils.newYammerMetric; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@SuppressWarnings("ClassFanOutComplexity") +public class PrometheusCollectorTest { + + private static final String METRIC_PREFIX = "kafka.server"; + private Map tagsMap; + private Labels labels; + private String scope; + + @BeforeEach + public void setup() { + tagsMap = new LinkedHashMap<>(); + Labels.Builder labelsBuilder = Labels.builder(); + scope = ""; + for (int i = 0; i < 2; i++) { + labelsBuilder.label("k" + i, "v" + i); + tagsMap.put("k" + i, "v" + i); + scope += "k" + i + ".v" + i + "."; + } + labels = labelsBuilder.build(); + } + + @Test + public void testRegister() { + PrometheusRegistry registry = new PrometheusRegistry(); + PrometheusCollector pc1 = PrometheusCollector.register(registry); + PrometheusCollector pc2 = PrometheusCollector.register(registry); + assertSame(pc1, pc2); + assertTrue(registry.scrape().size() > 0); + } + + @Test + public void testCollectKafkaMetrics() { + PrometheusCollector collector = new PrometheusCollector(); + + MetricSnapshots metrics = collector.collect(); + assertEquals(0, metrics.size()); + + // Adding a metric + AtomicInteger value = new AtomicInteger(1); + MetricName metricName = new MetricName("name", "group", "description", tagsMap); + MetricWrapper metricWrapper = newKafkaMetricWrapper(metricName, (config, now) -> value.get()); + collector.addKafkaMetric(metricName, metricWrapper); + + metrics = collector.collect(); + assertEquals(1, metrics.size()); + MetricSnapshot snapshot = metrics.get(0); + assertGaugeSnapshot(snapshot, value.get(), labels); + + // Updating the value of the metric + value.set(3); + metrics = collector.collect(); + assertEquals(1, metrics.size()); + MetricSnapshot updatedSnapshot = metrics.get(0); + assertGaugeSnapshot(updatedSnapshot, 3, labels); + + // Removing a metric + collector.removeKafkaMetric(metricName); + metrics = collector.collect(); + assertEquals(0, metrics.size()); + } + + @Test + public void testCollectNonNumericKafkaMetric() { + PrometheusCollector collector = new PrometheusCollector(); + + MetricSnapshots metrics = collector.collect(); + assertEquals(0, metrics.size()); + + // Adding a non-numeric metric converted + String nonNumericValue = "myValue"; + MetricName metricName = new MetricName("name", "group", "description", tagsMap); + MetricWrapper metricWrapper = newKafkaMetricWrapper(metricName, (config, now) -> nonNumericValue); + collector.addKafkaMetric(metricName, metricWrapper); + metrics = collector.collect(); + + assertEquals(1, metrics.size()); + MetricSnapshot snapshot = metrics.get(0); + assertEquals(metricWrapper.prometheusName(), snapshot.getMetadata().getName()); + assertInfoSnapshot(snapshot, "name", nonNumericValue); + } + + @Test + public void testCollectYammerMetrics() { + PrometheusCollector collector = new PrometheusCollector(); + + MetricSnapshots metrics = collector.collect(); + assertEquals(0, metrics.size()); + + // Adding a metric + AtomicInteger value = new AtomicInteger(1); + com.yammer.metrics.core.MetricName metricName = new com.yammer.metrics.core.MetricName("group", "type", "name", scope); + MetricWrapper metricWrapper = newYammerMetricWrapper(metricName, value::get); + collector.addYammerMetric(metricName, metricWrapper); + + metrics = collector.collect(); + assertEquals(1, metrics.size()); + MetricSnapshot snapshot = metrics.get(0); + assertGaugeSnapshot(snapshot, value.get(), labels); + + // Updating the value of the metric + value.set(3); + metrics = collector.collect(); + assertEquals(1, metrics.size()); + MetricSnapshot updatedSnapshot = metrics.get(0); + assertGaugeSnapshot(updatedSnapshot, 3, labels); + + // Removing the metric + collector.removeYammerMetric(metricName); + metrics = collector.collect(); + assertEquals(0, metrics.size()); + } + + @Test + public void testCollectNonNumericYammerMetrics() { + PrometheusCollector collector = new PrometheusCollector(); + + MetricSnapshots metrics = collector.collect(); + assertEquals(0, metrics.size()); + + String nonNumericValue = "value"; + com.yammer.metrics.core.MetricName metricName = new com.yammer.metrics.core.MetricName("group", "type", "name", scope); + MetricWrapper metricWrapper = newYammerMetricWrapper(metricName, () -> nonNumericValue); + collector.addYammerMetric(metricName, metricWrapper); + metrics = collector.collect(); + + assertEquals(1, metrics.size()); + MetricSnapshot snapshot = metrics.get(0); + assertEquals(metricWrapper.prometheusName(), snapshot.getMetadata().getName()); + assertInfoSnapshot(snapshot, "name", nonNumericValue); + } + + private MetricWrapper newYammerMetricWrapper(com.yammer.metrics.core.MetricName metricName, Supplier valueSupplier) { + com.yammer.metrics.core.Gauge gauge = newYammerMetric(valueSupplier); + String prometheusName = MetricWrapper.prometheusName(metricName); + return new MetricWrapper(prometheusName, metricName.getScope(), gauge, metricName.getName()); + } + + private MetricWrapper newKafkaMetricWrapper(MetricName metricName, Gauge gauge) { + KafkaMetric kafkaMetric = newKafkaMetric(metricName.name(), metricName.group(), gauge, metricName.tags()); + String prometheusName = MetricWrapper.prometheusName(METRIC_PREFIX, metricName); + return new MetricWrapper(prometheusName, kafkaMetric, metricName.name()); + } + + private void assertGaugeSnapshot(MetricSnapshot snapshot, double expectedValue, Labels expectedLabels) { + assertInstanceOf(GaugeSnapshot.class, snapshot); + GaugeSnapshot gaugeSnapshot = (GaugeSnapshot) snapshot; + assertEquals(1, gaugeSnapshot.getDataPoints().size()); + GaugeSnapshot.GaugeDataPointSnapshot datapoint = gaugeSnapshot.getDataPoints().get(0); + assertEquals(expectedValue, datapoint.getValue()); + assertEquals(expectedLabels, datapoint.getLabels()); + } + + private void assertInfoSnapshot(MetricSnapshot snapshot, String newLabelName, String newLabelValue) { + assertInstanceOf(InfoSnapshot.class, snapshot); + assertEquals(1, snapshot.getDataPoints().size()); + Labels expectedLabels = labels.add(newLabelName, newLabelValue); + assertEquals(expectedLabels, snapshot.getDataPoints().get(0).getLabels()); + } + +} diff --git a/src/test/java/io/strimzi/kafka/metrics/YammerMetricsCollectorTest.java b/src/test/java/io/strimzi/kafka/metrics/YammerMetricsCollectorTest.java deleted file mode 100644 index e95eee2..0000000 --- a/src/test/java/io/strimzi/kafka/metrics/YammerMetricsCollectorTest.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Copyright Strimzi authors. - * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). - */ -package io.strimzi.kafka.metrics; - -import com.yammer.metrics.Metrics; -import com.yammer.metrics.core.Counter; -import com.yammer.metrics.core.Gauge; -import com.yammer.metrics.core.Metric; -import com.yammer.metrics.core.MetricName; -import com.yammer.metrics.core.MetricsRegistry; -import io.prometheus.metrics.model.snapshots.CounterSnapshot; -import io.prometheus.metrics.model.snapshots.InfoSnapshot; -import io.prometheus.metrics.model.snapshots.Labels; -import io.prometheus.metrics.model.snapshots.MetricSnapshot; -import io.prometheus.metrics.model.snapshots.MetricSnapshots; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.util.Map; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertInstanceOf; - -public class YammerMetricsCollectorTest { - - private final MetricsRegistry registry = Metrics.defaultRegistry(); - private String scope; - private Labels labels; - - @BeforeEach - public void setup() { - Labels.Builder labelsBuilder = Labels.builder(); - scope = ""; - for (int i = 0; i < 2; i++) { - labelsBuilder.label("k" + i, "v" + i); - scope += "k" + i + ".v" + i + "."; - } - labels = labelsBuilder.build(); - for (Map.Entry entry : registry.allMetrics().entrySet()) { - registry.removeMetric(entry.getKey()); - } - } - - @Test - public void testCollect() { - YammerMetricsCollector collector = new YammerMetricsCollector(); - - MetricSnapshots metrics = collector.collect(); - assertEquals(0, metrics.size()); - - // Add a metric - MetricName metricName = new MetricName("group", "type", "name", scope); - MetricWrapper metricWrapper = newMetric(metricName); - collector.addMetric(metricName, metricWrapper); - metrics = collector.collect(); - assertEquals(1, metrics.size()); - - MetricSnapshot snapshot = metrics.get(0); - assertEquals(metricWrapper.prometheusName(), snapshot.getMetadata().getName()); - assertInstanceOf(CounterSnapshot.class, snapshot); - CounterSnapshot counterSnapshot = (CounterSnapshot) snapshot; - - assertEquals(1, counterSnapshot.getDataPoints().size()); - CounterSnapshot.CounterDataPointSnapshot datapoint = counterSnapshot.getDataPoints().get(0); - assertEquals(0.0, datapoint.getValue(), 0.1); - assertEquals(labels, datapoint.getLabels()); - - // Update the value of the metric - ((Counter) metricWrapper.metric()).inc(10); - metrics = collector.collect(); - - assertEquals(1, metrics.size()); - snapshot = metrics.get(0); - assertEquals(metricWrapper.prometheusName(), snapshot.getMetadata().getName()); - assertInstanceOf(CounterSnapshot.class, snapshot); - counterSnapshot = (CounterSnapshot) snapshot; - assertEquals(1, counterSnapshot.getDataPoints().size()); - datapoint = counterSnapshot.getDataPoints().get(0); - assertEquals(10.0, datapoint.getValue(), 0.1); - - // Remove the metric - collector.removeMetric(metricName); - metrics = collector.collect(); - assertEquals(0, metrics.size()); - } - - @Test - public void testCollectNonNumericMetric() { - YammerMetricsCollector collector = new YammerMetricsCollector(); - - MetricSnapshots metrics = collector.collect(); - assertEquals(0, metrics.size()); - - String nonNumericValue = "value"; - MetricName metricName = new MetricName("group", "type", "name", scope); - MetricWrapper metricWrapper = newNonNumericMetric(metricName, nonNumericValue); - collector.addMetric(metricName, metricWrapper); - metrics = collector.collect(); - - assertEquals(1, metrics.size()); - MetricSnapshot snapshot = metrics.get(0); - assertEquals(metricWrapper.prometheusName(), snapshot.getMetadata().getName()); - assertInstanceOf(InfoSnapshot.class, snapshot); - Labels expectedLabels = labels.add("name", nonNumericValue); - assertEquals(1, snapshot.getDataPoints().size()); - assertEquals(expectedLabels, snapshot.getDataPoints().get(0).getLabels()); - } - - private MetricWrapper newMetric(MetricName metricName) { - Counter counter = registry.newCounter(metricName); - String prometheusName = MetricWrapper.prometheusName(metricName); - return new MetricWrapper(prometheusName, metricName.getScope(), counter, metricName.getName()); - } - - private MetricWrapper newNonNumericMetric(MetricName metricName, String value) { - Gauge gauge = registry.newGauge(metricName, new Gauge<>() { - @Override - public String value() { - return value; - } - }); - String prometheusName = MetricWrapper.prometheusName(metricName); - return new MetricWrapper(prometheusName, metricName.getScope(), gauge, metricName.getName()); - } - -} diff --git a/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java b/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java index 7844dae..fce15c0 100644 --- a/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java +++ b/src/test/java/io/strimzi/kafka/metrics/YammerPrometheusMetricsReporterTest.java @@ -8,39 +8,39 @@ import com.yammer.metrics.core.Counter; import com.yammer.metrics.core.Metric; import com.yammer.metrics.core.MetricName; -import com.yammer.metrics.core.MetricsRegistry; import io.prometheus.metrics.model.registry.PrometheusRegistry; import kafka.utils.VerifiableProperties; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import java.io.BufferedReader; -import java.io.InputStreamReader; -import java.net.HttpURLConnection; -import java.net.URL; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; +import static io.strimzi.kafka.metrics.MetricsUtils.getMetrics; import static org.junit.jupiter.api.Assertions.assertEquals; public class YammerPrometheusMetricsReporterTest { - private final MetricsRegistry registry = Metrics.defaultRegistry(); + private Properties configs; + private PrometheusRegistry registry; + private PrometheusCollector collector; @BeforeEach public void setup() { - for (Map.Entry entry : registry.allMetrics().entrySet()) { - registry.removeMetric(entry.getKey()); + registry = new PrometheusRegistry(); + collector = new PrometheusCollector(); + registry.register(collector); + configs = new Properties(); + configs.put(PrometheusMetricsReporterConfig.LISTENER_CONFIG, "http://:0"); + for (Map.Entry entry : Metrics.defaultRegistry().allMetrics().entrySet()) { + Metrics.defaultRegistry().removeMetric(entry.getKey()); } } @Test public void testLifeCycle() throws Exception { - YammerPrometheusMetricsReporter reporter = new YammerPrometheusMetricsReporter(new PrometheusRegistry()); - Properties configs = new Properties(); - configs.put(PrometheusMetricsReporterConfig.LISTENER_CONFIG, "http://:0"); + YammerPrometheusMetricsReporter reporter = new YammerPrometheusMetricsReporter(registry, collector); configs.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_type.*"); reporter.init(new VerifiableProperties(configs)); @@ -70,30 +70,13 @@ public void testLifeCycle() throws Exception { } } - private List getMetrics(int port) throws Exception { - List metrics = new ArrayList<>(); - URL url = new URL("http://localhost:" + port + "/metrics"); - HttpURLConnection con = (HttpURLConnection) url.openConnection(); - con.setRequestMethod("GET"); - - try (BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()))) { - String inputLine; - while ((inputLine = in.readLine()) != null) { - if (!inputLine.startsWith("#")) { - metrics.add(inputLine); - } - } - } - return metrics; - } - private Counter newCounter(String group, String type, String name) { MetricName metricName = new MetricName(group, type, name, ""); - return registry.newCounter(metricName); + return Metrics.defaultRegistry().newCounter(metricName); } private void removeMetric(String group, String type, String name) { MetricName metricName = new MetricName(group, type, name, ""); - registry.removeMetric(metricName); + Metrics.defaultRegistry().removeMetric(metricName); } }