-
Notifications
You must be signed in to change notification settings - Fork 4.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Kafka add gauge v1 #33408
base: master
Are you sure you want to change the base?
Kafka add gauge v1 #33408
Conversation
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## master #33408 +/- ##
============================================
+ Coverage 58.86% 58.95% +0.08%
- Complexity 3112 3183 +71
============================================
Files 1130 1133 +3
Lines 174419 174576 +157
Branches 3343 3366 +23
============================================
+ Hits 102680 102916 +236
+ Misses 68392 68312 -80
- Partials 3347 3348 +1
Flags with carried forward coverage won't be shown. Click here to find out more. ☔ View full report in Codecov by Sentry. |
2791b23
to
b91a77f
Compare
R: @sjvanrossum for the kafka io part, thanks in advance! |
Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment |
b91a77f
to
948dfe6
Compare
Run Java PreCommit |
R: @johnjcasey for the sdk portion of it. |
.../java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverter.java
Outdated
Show resolved
Hide resolved
Run Java_Pulsar_IO_Direct PreCommit |
11bee27
to
b9d2f2b
Compare
Run Java PreCommit |
Run Java_GCP_IO_Direct PreCommit |
Run Java PreCommit |
Run Java PreCommit |
Run Java_GCP_IO_Direct PreCommit |
1 similar comment
Run Java_GCP_IO_Direct PreCommit |
Run Java PreCommit |
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/NoOpGauge.java
Outdated
Show resolved
Hide resolved
...a/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java
Outdated
Show resolved
Hide resolved
/** | ||
* @param topicName topicName | ||
* @param partitionId partitionId for the topic Only included in the metric key if | ||
* 'supportsMetricsDeletion' is enabled. | ||
* @param backlog backlog for the topic Only included in the metric key if | ||
* 'supportsMetricsDeletion' is enabled. | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
" Only" -> ". Only"?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed, thanks for catching that.
@@ -71,11 +79,17 @@ abstract class KafkaMetricsImpl implements KafkaMetrics { | |||
|
|||
abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies(); | |||
|
|||
static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>(); | |||
|
|||
abstract HashMap<String, Long> perTopicPartitionBacklogs(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If an instance of this class may be concurrently updated, then HashMap
needs to be replaced (ditto for the existing HashMap
fields). Use ConcurrentHashMap
instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Slightly unrelated, but why doesn't perTopicRpcLatencies
use a gauge or sum as the value type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would the sum represent? the sum of latencies? but each individual one is important, and a sum would lose information.
A gauge isn't quite clear either, if you have two concurrent rpcs that completed, what value do you return?
A histogram of values provides more information (and allows us to see the spread of values)
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java
Outdated
Show resolved
Hide resolved
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java
Outdated
Show resolved
Hide resolved
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java
Outdated
Show resolved
Hide resolved
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java
Outdated
Show resolved
Hide resolved
@@ -743,6 +747,16 @@ private void reportBacklog() { | |||
backlogElementsOfSplit.set(splitBacklogMessages); | |||
} | |||
|
|||
private void reportBacklogMetrics() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like this can be merged with reportBacklog
(potentially rename that method to reportBacklogMetrics
updateBacklogMetrics
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, I explicitly moved it out to be separate, since reportBacklog() is called twice, and we only need to do this once (when we advance to the next record).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM
ae2d885
to
0972ff3
Compare
...a/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java
Outdated
Show resolved
Hide resolved
static ConcurrentHashMap<String, Histogram> latencyHistograms = | ||
new ConcurrentHashMap<String, Histogram>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static ConcurrentHashMap<String, Histogram> latencyHistograms = | |
new ConcurrentHashMap<String, Histogram>(); | |
private static final Map<String, Histogram> LATENCY_HISTOGRAMS = | |
new ConcurrentHashMap<>(); |
|
||
abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies(); | ||
static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>(); | |
private static final Map<String, Gauge> BACKLOG_GAUGES = new ConcurrentHashMap<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm... I searched for backlogGauges
, but I'm not finding any uses of this it.
Is this still needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed.
@@ -86,13 +100,26 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) { | |||
if (latencies == null) { | |||
latencies = new ConcurrentLinkedQueue<Duration>(); | |||
latencies.add(elapsedTime); | |||
perTopicRpcLatencies().put(topic, latencies); | |||
perTopicRpcLatencies().putIfAbsent(topic, latencies); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If I'm not mistaken, L99-106 can be replaced with perTopicRpcLatencies().computeIfAbsent(topic, ConcurrentLinkedQueue::new).add(elapsedTime);
which reduces locking operations. It also changes what happens in L100-104, which currently drops elapsedTime
if the call at 103 observes that a write for that key had happened in between.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It won't accept
perTopicRpcLatencies().computeIfAbsent(topic, ConcurrentLinkedQueue::new).add(elapsedTime);
saying that it's unable to infer the type of the argument. I tried a couple of different variations, and couldn't get it to work with a lambda. It think the issue is that it takes in a string, and we want a ConcurrentQueue
of Duration
This is what the compiler wants, and I don't know how to pass in a lambda matching that signature
String,Function<? super String,? extends ConcurrentLinkedQueue<Duration>>
(my java knowledge is a limited.)
@@ -743,6 +747,16 @@ private void reportBacklog() { | |||
backlogElementsOfSplit.set(splitBacklogMessages); | |||
} | |||
|
|||
private void reportBacklogMetrics() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
SGTM
f33b235
to
925290b
Compare
Run Java_IOs_Direct PreCommit |
Add per worker gauge support to add per backlog partition for kafka with java legacy worker for Dataflow
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.