Skip to content

Commit

Permalink
Remove HTTPServer from YammerPrometheusMetricsReporter (#39)
Browse files Browse the repository at this point in the history
Signed-off-by: Mickael Maison <[email protected]>
  • Loading branch information
mimaison authored Aug 14, 2024
1 parent 586860c commit b572d91
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import com.yammer.metrics.core.MetricsRegistry;
import com.yammer.metrics.core.MetricsRegistryListener;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.exporter.httpserver.HTTPServer;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import kafka.metrics.KafkaMetricsReporter;
import kafka.utils.VerifiableProperties;
Expand All @@ -19,7 +18,6 @@
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Optional;

/**
* KafkaMetricsReporter to export Kafka broker metrics in the Prometheus format.
Expand All @@ -32,9 +30,7 @@ public class YammerPrometheusMetricsReporter implements KafkaMetricsReporter, Me
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the init method
private YammerMetricsCollector collector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the init method
private PrometheusMetricsReporterConfig config;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private Optional<HTTPServer> httpServer;
/* test */ PrometheusMetricsReporterConfig config;

/**
* Constructor
Expand All @@ -56,7 +52,6 @@ public void init(VerifiableProperties props) {
for (MetricsRegistry yammerRegistry : Arrays.asList(KafkaYammerMetrics.defaultRegistry(), Metrics.defaultRegistry())) {
yammerRegistry.addListener(this);
}
httpServer = config.startHttpServer();
LOG.debug("YammerPrometheusMetricsReporter configured with {}", config);
}

Expand All @@ -75,9 +70,4 @@ public void onMetricAdded(MetricName name, Metric metric) {
public void onMetricRemoved(MetricName name) {
collector.removeMetric(name);
}

// for testing
Optional<Integer> getPort() {
return Optional.ofNullable(httpServer.isPresent() ? httpServer.get().getPort() : null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.yammer.metrics.core.Metric;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import io.prometheus.metrics.exporter.httpserver.HTTPServer;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import kafka.utils.VerifiableProperties;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -21,11 +22,9 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class YammerPrometheusMetricsReporterTest {

Expand All @@ -46,25 +45,26 @@ public void testLifeCycle() throws Exception {
configs.put(PrometheusMetricsReporterConfig.ALLOWLIST_CONFIG, "kafka_server_group_type.*");
reporter.init(new VerifiableProperties(configs));

Optional<Integer> port = reporter.getPort();
assertTrue(port.isPresent());
assertEquals(0, getMetrics(port.get()).size());
try (HTTPServer httpServer = reporter.config.startHttpServer().orElseThrow()) {
int port = httpServer.getPort();
assertEquals(0, getMetrics(port).size());

// Adding a metric not matching the allowlist does nothing
newCounter("other", "type", "name");
List<String> metrics = getMetrics(port.get());
assertEquals(0, metrics.size());
// Adding a metric not matching the allowlist does nothing
newCounter("other", "type", "name");
List<String> metrics = getMetrics(port);
assertEquals(0, metrics.size());

// Adding a metric that matches the allowlist
newCounter("group", "type", "name");
metrics = getMetrics(port.get());
assertEquals(1, metrics.size());
assertEquals("kafka_server_group_type_name_total 0.0", metrics.get(0));
// Adding a metric that matches the allowlist
newCounter("group", "type", "name");
metrics = getMetrics(port);
assertEquals(1, metrics.size());
assertEquals("kafka_server_group_type_name_total 0.0", metrics.get(0));

// Removing the metric
removeMetric("group", "type", "name");
metrics = getMetrics(port.get());
assertEquals(0, metrics.size());
// Removing the metric
removeMetric("group", "type", "name");
metrics = getMetrics(port);
assertEquals(0, metrics.size());
}
}

private List<String> getMetrics(int port) throws Exception {
Expand Down

0 comments on commit b572d91

Please sign in to comment.