Make startup more robust and prevent auto topic creation when using CruiseControlMetricsReporterSampler #2211
base: main
Changes from 1 commit
Original file line number | Diff line number | Diff line change | ||
---|---|---|---|---|
|
@@ -6,6 +6,7 @@ | |||
|
||||
import com.linkedin.kafka.cruisecontrol.exception.SamplingException; | ||||
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporterConfig; | ||||
import com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsUtils; | ||||
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric; | ||||
import java.time.Duration; | ||||
import java.util.Collections; | ||||
|
@@ -36,6 +37,10 @@ public class CruiseControlMetricsReporterSampler extends AbstractMetricSampler { | |||
// Configurations | ||||
public static final String METRIC_REPORTER_SAMPLER_BOOTSTRAP_SERVERS = "metric.reporter.sampler.bootstrap.servers"; | ||||
public static final String METRIC_REPORTER_TOPIC = "metric.reporter.topic"; | ||||
public static final String METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS = "metric.reporter.topic.assert.attempts"; | ||||
|
||||
public static final int METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS_DEFAULT = 5; | ||||
|
||||
@Deprecated | ||||
public static final String METRIC_REPORTER_SAMPLER_GROUP_ID = "metric.reporter.sampler.group.id"; | ||||
public static final Duration METRIC_REPORTER_CONSUMER_POLL_TIMEOUT = Duration.ofMillis(5000L); | ||||
|
@@ -151,13 +156,26 @@ protected boolean refreshPartitionAssignment() { | |||
return false; | ||||
} | ||||
|
||||
private boolean isMetricsTopicExists() { | ||||
Map<String, List<PartitionInfo>> topics = _metricConsumer.listTopics(); | ||||
if (!topics.containsKey(_metricReporterTopic)) { | ||||
return false; | ||||
} | ||||
return true; | ||||
} | ||||
|
||||
@Override | ||||
public void configure(Map<String, ?> configs) { | ||||
super.configure(configs); | ||||
_metricReporterTopic = (String) configs.get(METRIC_REPORTER_TOPIC); | ||||
if (_metricReporterTopic == null) { | ||||
_metricReporterTopic = CruiseControlMetricsReporterConfig.DEFAULT_CRUISE_CONTROL_METRICS_TOPIC; | ||||
} | ||||
String metricTopicAssertAttemptsStr = (String) configs.get(METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS); | ||||
int metricTopicAssertAttempts = METRIC_REPORTER_TOPIC_ASSERT_ATTEMPTS_DEFAULT; | ||||
if (metricTopicAssertAttemptsStr != null) { | ||||
metricTopicAssertAttempts = Integer.parseInt(metricTopicAssertAttemptsStr); | ||||
} | ||||
CruiseControlMetricsReporterConfig reporterConfig = new CruiseControlMetricsReporterConfig(configs, false); | ||||
_acceptableMetricRecordProduceDelayMs = ACCEPTABLE_NETWORK_DELAY_MS | ||||
+ Math.max(reporterConfig.getLong(CruiseControlMetricsReporterConfig | ||||
|
@@ -166,9 +184,16 @@ public void configure(Map<String, ?> configs) { | |||
.CRUISE_CONTROL_METRICS_REPORTER_LINGER_MS_CONFIG)); | ||||
_metricConsumer = createMetricConsumer(configs, CONSUMER_CLIENT_ID_PREFIX); | ||||
_currentPartitionAssignment = Collections.emptySet(); | ||||
|
||||
LOG.info("Waiting for metrics reporter topic [{}] to be available in the target cluster.", _metricReporterTopic); | ||||
nit: calling the cluster "target" is a bit misleading, as it is really the source cluster (the source of the metrics). I think we should just say "...to be available in the Kafka cluster".
Actually, I did not change that; this was the original wording (with "target"), as you can see at line 171 in 2b81fb1. I reused it in the new exception message. I am happy to change it if you think the suggested form is better.
Changed to Kafka cluster |
||||
if (!CruiseControlMetricsUtils.retry(()->!this.isMetricsTopicExists(), metricTopicAssertAttempts)) { | ||||
As I see it, this uses exponential backoff. By the 7th retry the wait is already a bit more than 10 minutes, which I think may be unreasonably long, and for the 8th retry it waits 21 minutes, which may be too long between retries. I'm not sure this is a good approach, at least not with the current parameters.
In the end, I applied your suggestion and changed the base to 1 and the scale to 5, so that it retries every 5s. |
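For reference, the waits quoted in this thread are consistent with a delay of scale * base^attempt using a 5-second scale and base 2. That formula is inferred from the numbers in the comment, not copied from CruiseControlMetricsUtils, so treat the following as a rough sketch; the class and method names are hypothetical.

```java
import java.time.Duration;

// Hypothetical helper, not part of Cruise Control.
// ASSUMPTION: the wait before a retry is scaleMs * base^attempt, inferred from the review comments.
class BackoffSketch {
  static long delayMs(long scaleMs, int base, int attempt) {
    long delay = scaleMs;
    for (int i = 0; i < attempt; i++) {
      // multiplyExact throws ArithmeticException instead of silently overflowing.
      delay = Math.multiplyExact(delay, (long) base);
    }
    return delay;
  }

  public static void main(String[] args) {
    // Compare base 2 (exponential) with base 1 (fixed interval) for a 5-second scale.
    for (int attempt = 1; attempt <= 8; attempt++) {
      System.out.printf("attempt %d: base 2 -> %s, base 1 -> %s%n", attempt,
          Duration.ofMillis(delayMs(5000L, 2, attempt)),
          Duration.ofMillis(delayMs(5000L, 1, attempt)));
    }
  }
}
```

Under this assumption, attempt 7 with base 2 waits 640 seconds (about 10.7 minutes) and attempt 8 waits 1280 seconds (about 21.3 minutes), while base 1 keeps every wait at 5 seconds.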
||||
throw new IllegalStateException("Cruise Control cannot find the metrics reporter topic that matches [" + _metricReporterTopic | ||||
+ "] in the target cluster."); | ||||
Same thing about "target" as above.
Changed to Kafka cluster |
||||
} | ||||
|
||||
if (refreshPartitionAssignment()) { | ||||
throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches " | ||||
+ _metricReporterTopic + " in the target cluster."); | ||||
throw new IllegalStateException("Cruise Control cannot find partitions for the metrics reporter that topic matches [" | ||||
+ _metricReporterTopic + "] in the target cluster."); | ||||
Another "target".
Changed to Kafka cluster |
||||
} | ||||
} | ||||
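The control flow of the startup check in this diff (look for the topic name in a topic listing, retry a bounded number of times, then fail fast) can be sketched without a Kafka dependency. This is only an illustration: TopicWaitSketch, waitForTopic, and the Supplier standing in for the consumer's topic listing are hypothetical, and the fixed wait between attempts is an assumption rather than the behavior of CruiseControlMetricsUtils.retry.

```java
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical stand-in for the startup check; not the Cruise Control implementation.
class TopicWaitSketch {
  /**
   * Polls topicLister until it reports the topic or maxAttempts is exhausted.
   * Returns true if the topic was seen, false if attempts ran out or the thread was interrupted.
   */
  static boolean waitForTopic(Supplier<Set<String>> topicLister, String topic,
                              int maxAttempts, long waitMs) {
    if (maxAttempts <= 0) {
      throw new IllegalArgumentException("maxAttempts must be positive, got " + maxAttempts);
    }
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (topicLister.get().contains(topic)) {
        return true;
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(waitMs);  // ASSUMPTION: fixed interval between attempts
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();  // preserve the interrupt for the caller
          return false;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // The listing here never contains the topic, so the caller fails fast after 3 attempts.
    boolean found = waitForTopic(() -> Set.of("some.other.topic"), "metrics.topic", 3, 10L);
    if (!found) {
      System.out.println("Topic not found; a real caller would throw IllegalStateException here.");
    }
  }
}
```

A caller would throw IllegalStateException when waitForTopic returns false, which mirrors the exception raised in the diff.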
|
||||
|
nit: we should call the config
metric.reporter.sampler.topic.assert.attempts
as it is a config of the sampler and not the reporter.
Also, please update the proper section of the Configurations.md file with your configuration. I think you should also mention that this uses exponential backoff, and that values that are too large will eventually cause long, infrequent waits between retries.
Thx, I have fixed it!
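As a rough illustration of reading the renamed config, the sketch below uses the key suggested in this thread and the default of 5 from the diff. Accepting a Number as well as a String and rejecting non-positive values are assumptions that go beyond what the diff does; the class and method names are hypothetical.

```java
import java.util.Map;

// Hypothetical sketch; the key is the name suggested in review, the default is taken from the diff.
class AssertAttemptsConfigSketch {
  static final String ATTEMPTS_CONFIG = "metric.reporter.sampler.topic.assert.attempts";
  static final int ATTEMPTS_DEFAULT = 5;

  static int assertAttempts(Map<String, ?> configs) {
    Object raw = configs.get(ATTEMPTS_CONFIG);
    if (raw == null) {
      return ATTEMPTS_DEFAULT;
    }
    // ASSUMPTION: the value may arrive as a String (properties file) or as a Number.
    int attempts = raw instanceof Number
        ? ((Number) raw).intValue()
        : Integer.parseInt(raw.toString().trim());
    if (attempts <= 0) {
      throw new IllegalArgumentException(ATTEMPTS_CONFIG + " must be positive, got " + attempts);
    }
    return attempts;
  }

  public static void main(String[] args) {
    System.out.println(assertAttempts(Map.of()));                      // falls back to the default
    System.out.println(assertAttempts(Map.of(ATTEMPTS_CONFIG, "3")));  // parsed from a String
  }
}
```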