Make startup more robust and prevent auto topic creation when using CruiseControlMetricsReporterSampler #2211

Open · wants to merge 4 commits into base: main
@@ -906,6 +906,7 @@ public static <K, KT extends Deserializer<K>, V, VT extends Deserializer<V>> Con
consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer.getName());
consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer.getName());
consumerProps.setProperty(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, configs.get(RECONNECT_BACKOFF_MS_CONFIG).toString());
consumerProps.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
return new KafkaConsumer<>(consumerProps);
}
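For context, the added property stops the sampler's consumer from triggering broker-side auto-creation of the metrics topic on metadata fetches. A minimal, self-contained sketch of the relevant setting (plain `java.util.Properties`; the literal key `"allow.auto.create.topics"` is the string behind `ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG`, while the class name and bootstrap address below are illustrative):

```java
import java.util.Properties;

public class MetricConsumerPropsSketch {
    // Builds the subset of consumer settings this change touches.
    public static Properties metricConsumerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Without this, a consumer subscribing to a missing topic can cause the
        // broker to auto-create it (when auto.create.topics.enable=true broker-side).
        props.setProperty("allow.auto.create.topics", "false");
        return props;
    }

    public static void main(String[] args) {
        Properties p = metricConsumerProps("localhost:9092");
        System.out.println(p.getProperty("allow.auto.create.topics"));
    }
}
```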

@@ -6,6 +6,7 @@

import com.linkedin.kafka.cruisecontrol.exception.SamplingException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsUtils;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import java.time.Duration;
import java.util.Collections;
@@ -36,6 +37,10 @@ public class CruiseControlMetricsReporterSampler extends AbstractMetricSampler {
// Configurations
public static final String METRIC_REPORTER_SAMPLER_BOOTSTRAP_SERVERS = "metric.reporter.sampler.bootstrap.servers";
public static final String METRIC_REPORTER_TOPIC = "metric.reporter.topic";
public static final String METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS = "metric.reporter.topic.assert.attempts";
Contributor:

nit: we should call the config metric.reporter.sampler.topic.assert.attempts as it is a config of the sampler and not the reporter.

Also, please update the proper section of the Configurations.md file with your configuration. I think you should also mention that this uses exponential backoff, and that too-large values will cause infrequent retries with long backoffs between them after a while.

Author:

Thx, I have fixed it!


public static final int METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS_DEFAULT = 5;

@Deprecated
public static final String METRIC_REPORTER_SAMPLER_GROUP_ID = "metric.reporter.sampler.group.id";
public static final Duration METRIC_REPORTER_CONSUMER_POLL_TIMEOUT = Duration.ofMillis(5000L);
@@ -151,13 +156,26 @@ protected boolean refreshPartitionAssignment() {
return false;
}

private boolean isMetricsTopicExists() {
return _metricConsumer.listTopics().containsKey(_metricReporterTopic);
}

@Override
public void configure(Map<String, ?> configs) {
super.configure(configs);
_metricReporterTopic = (String) configs.get(METRIC_REPORTER_TOPIC);
if (_metricReporterTopic == null) {
_metricReporterTopic = CruiseControlMetricsReporterConfig.DEFAULT_CRUISE_CONTROL_METRICS_TOPIC;
}
String metricTopicAssertAttemptsStr = (String) configs.get(METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS);
int metricTopicAssertAttempts = METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS_DEFAULT;
if (metricTopicAssertAttemptsStr != null) {
metricTopicAssertAttempts = Integer.parseInt(metricTopicAssertAttemptsStr);
}
CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false);
_acceptableMetricRecordProduceDelayMs = ACCEPTABLE_NETWORK_DELAY_MS
+ Math.max(reporterConfig.getLong(CruiseControlMetricsReporterConfig
@@ -166,9 +184,16 @@ public void configure(Map<String, ?> configs) {
.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG));
_metricConsumer = createMetricConsumer(configs, CONSUMER_CLIENT_ID_PREFIX);
_currentPartitionAssignment = Collections.emptySet();

LOG.info("Waiting for metrics reporter topic [{}] to be available in the target cluster.", _metricReporterTopic);
Contributor:

nit: calling the cluster "target" is a bit misleading as it is rather a source cluster (source of the metrics), but I think we should just say "...to be available in the Kafka cluster".

Author:

Actually, I did not change that; this was the original form (with "target").
I reused that in the new exception message.
I am happy to change it if you think it is better to use the mentioned form.

Author:

Changed to Kafka cluster

if (!CruiseControlMetricsUtils.retry(() -> !this.isMetricsTopicExists(), metricTopicAssertAttempts)) {
Contributor:

As I see, it uses exponential backoff periods. At the 7th retry the cumulative wait is already a bit more than 10 minutes, which I think may be unreasonably long, and the 8th retry waits 21 minutes, which may be too long between retries. I'm not sure this is the right approach, at least not with the current parameters.
I think we should either use 1 as the base, so we retry every 5 seconds n times, or something closer to 1, like 1.25. With that parameter we can try 18 times before reaching 22 minutes total; rounding up, 20 tries gives a 35-minute total retry time. I think it's more reasonable to try 20 times in 35 minutes than 6, as it allows quicker startup.
Another approach is to use constant retry intervals with a given timeout. I think that is a more user-friendly approach, as it's easier to calculate with than exponents, especially since listTopics() is just a metadata call that is OK to make every 10 seconds or so with a single consumer. Overall I'm OK with the exponential approach if you or others agree on it, but I favor the interval+timeout one for usability reasons. Hopefully, if we set a good default, users won't have to change it too often and then it may matter less.

Author:

In the end, I applied your suggestion and changed the base to 1 and the scale to 5, so it retries every 5s.
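To make the numbers in this thread concrete, here is a small standalone illustration (not Cruise Control code) of the cumulative wait for a schedule of the form waitMs = scaleMs × base^attempt, one plausible indexing of the retries discussed above: with scale 5s and base 2, the first seven waits already sum to 635,000 ms (about 10.6 minutes), while base 1 gives the constant 5s interval the author settled on.

```java
public class BackoffMath {
    // Total sleep time across `attempts` retries where the i-th wait
    // (0-indexed) is scaleMs * base^i.
    static long totalWaitMs(long scaleMs, double base, int attempts) {
        long total = 0;
        for (int i = 0; i < attempts; i++) {
            total += (long) (scaleMs * Math.pow(base, i));
        }
        return total;
    }

    public static void main(String[] args) {
        // base 2: 5000 * (1+2+4+8+16+32+64) = 635000 ms, roughly 10.6 minutes
        System.out.println(totalWaitMs(5000, 2.0, 7));
        // base 1: constant 5s interval, 5 attempts => 25000 ms total
        System.out.println(totalWaitMs(5000, 1.0, 5));
    }
}
```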

throw new IllegalStateException("Cruise Control cannot find the metrics reporter topic that matches [" + _metricReporterTopic
+ "] in the target cluster.");
Contributor:

Same thing about "target" as above.

Author:

Changed to Kafka cluster

}

if (refreshPartitionAssignment()) {
-      throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches "
-          + _metricReporterTopic + " in the target cluster.");
+      throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches ["
+          + _metricReporterTopic + "] in the target cluster.");
Contributor:

Another "target".

Author:

Changed to Kafka cluster

}
}
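The startup behavior added to configure() can be summarized with a self-contained sketch (hypothetical helper names; the real code delegates to CruiseControlMetricsUtils.retry and isMetricsTopicExists()): poll the topic-existence condition up to a configured number of attempts with a fixed interval, and fail startup with an IllegalStateException if the topic never appears.

```java
import java.util.function.BooleanSupplier;

public class TopicAssertSketch {
    // Polls `condition` up to maxAttempts times, sleeping intervalMs between
    // checks; returns true as soon as the condition holds, false if exhausted.
    static boolean waitUntil(BooleanSupplier condition, int maxAttempts, long intervalMs)
            throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(intervalMs);
        }
        return false;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for isMetricsTopicExists(): flips to true on the third check.
        int[] calls = {0};
        BooleanSupplier topicExists = () -> ++calls[0] >= 3;
        if (!waitUntil(topicExists, 5, 1L)) {  // short interval for the demo
            throw new IllegalStateException("Cruise Control cannot find the metrics reporter topic.");
        }
        System.out.println("attempts=" + calls[0]);
    }
}
```

With the merged settings (base 1, scale 5), this corresponds to a constant 5-second interval, so the default of 5 attempts bounds the startup wait at roughly 25 seconds before failing fast.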
