Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make startup more robust and prevent auto topic creation when using CruiseControlMetricsReporterSampler #2211

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -906,6 +906,7 @@ public static <K, KT extends Deserializer<K>, V, VT extends Deserializer<V>> Con
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());
consumerProps.setProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, configs.get(RECONNECT_BACKOFF_MS_CONFIG).toString());
consumerProps.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
return new KafkaConsumer<>(consumerProps);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.linkedin.kafka.cruisecontrol.exception.SamplingException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsUtils;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import java.time.Duration;
import java.util.Collections;
Expand Down Expand Up @@ -36,6 +37,10 @@ public class CruiseControlMetricsReporterSampler extends AbstractMetricSampler {
// Configurations
public static final String METRIC_REPORTER_SAMPLER_BOOTSTRAP_SERVERS = "metric.reporter.sampler.bootstrap.servers";
public static final String METRIC_REPORTER_TOPIC = "metric.reporter.topic";
public static final String METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS = "metric.reporter.sampler.topic.assert.attempts";

public static final int METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS_DEFAULT = 5;

@Deprecated
public static final String METRIC_REPORTER_SAMPLER_GROUP_ID = "metric.reporter.sampler.group.id";
public static final Duration METRIC_REPORTER_CONSUMER_POLL_TIMEOUT = Duration.ofMillis(5000L);
Expand Down Expand Up @@ -151,13 +156,26 @@ protected boolean refreshPartitionAssignment() {
return false;
}

private boolean isMetricsTopicExists() {
Map<String, List<PartitionInfo>> topics = _metricConsumer.listTopics();
if (!topics.containsKey(_metricReporterTopic)) {
return false;
}
return true;
}

@Override
public void configure(Map<String, ?> configs) {
super.configure(configs);
_metricReporterTopic = (String) configs.get(METRIC_REPORTER_TOPIC);
if (_metricReporterTopic == null) {
_metricReporterTopic = CruiseControlMetricsReporterConfig.DEFAULT_CRUISE_CONTROL_METRICS_TOPIC;
}
String metricTopicAssertAttemptsStr = (String) configs.get(METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS);
int metricTopicAssertAttempts = METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS_DEFAULT;
if (metricTopicAssertAttemptsStr != null) {
metricTopicAssertAttempts = Integer.parseInt(metricTopicAssertAttemptsStr);
}
CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false);
_acceptableMetricRecordProduceDelayMs = ACCEPTABLE_NETWORK_DELAY_MS
+ Math.max(reporterConfig.getLong(CruiseControlMetricsReporterConfig
Expand All @@ -166,9 +184,16 @@ public void configure(Map<String, ?> configs) {
.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG));
_metricConsumer = createMetricConsumer(configs, CONSUMER_CLIENT_ID_PREFIX);
_currentPartitionAssignment = Collections.emptySet();

LOG.info("Waiting for metrics reporter topic [{}] to be available in the Kafka cluster.", _metricReporterTopic);
if (!CruiseControlMetricsUtils.retry(() -> !this.isMetricsTopicExists(), 5, 1, metricTopicAssertAttempts)) {
throw new IllegalStateException("Cruise Control cannot find the metrics reporter topic that matches [" + _metricReporterTopic
+ "] in the Kafka cluster.");
}

if (refreshPartitionAssignment()) {
throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches "
+ _metricReporterTopic + " in the target cluster.");
throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches ["
+ _metricReporterTopic + "] in the Kafka cluster.");
}
}

Expand Down
11 changes: 6 additions & 5 deletions docs/wiki/User Guide/Configurations.md
Original file line number Diff line number Diff line change
Expand Up @@ -287,11 +287,12 @@ We are still trying to improve cruise control. And following are some configurat
## Configurations of pluggable classes

### CruiseControlMetricsReporterSampler configurations
| Name | Type | Required? | Default Value | Description |
|-------------------------------------------|--------|-----------|------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------|
| metric.reporter.sampler.bootstrap.servers | String | N | The same as `bootstrap.servers` config from Cruise Control | The Kafka cluster to consume the interested metrics collected by CruiseControlMetricsReporter. |
| metric.reporter.topic | String | N | "__CruiseControlMetrics" | The exact topic name from which the sampler should be consuming the interested metrics from. |
| metric.reporter.sampler.group.id | String | N | 60,000 | The consumer group id to use for the consumers to consume from the Kafka cluster. |
| Name | Type | Required? | Default Value | Description |
|------------------------------------------------|---------|-----------|------------------------------------------------------------|------------------------------------------------------------------------------------------------------|
| metric.reporter.sampler.bootstrap.servers | String | N | The same as `bootstrap.servers` config from Cruise Control | The Kafka cluster to consume the interested metrics collected by CruiseControlMetricsReporter. |
| metric.reporter.topic | String | N | "__CruiseControlMetrics" | The exact topic name from which the sampler should be consuming the interested metrics from. |
| metric.reporter.sampler.group.id | String | N | 60,000 | The consumer group id to use for the consumers to consume from the Kafka cluster. |
| metric.reporter.sampler.topic.assert.attempts | Integer | N | 5 | Number of attempts while waiting for metrics topic to appear in the Kafka cluster during the startup.|

### PrometheusMetricSampler configurations
| Name | Type | Required? | Default Value | Description |
Expand Down