Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kafka add gauge v1 #33408

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open

Conversation

Naireen
Copy link
Contributor

@Naireen Naireen commented Dec 17, 2024

Add per worker gauge support to add per backlog partition for kafka with java legacy worker for Dataflow

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

Copy link

codecov bot commented Dec 18, 2024

Codecov Report

All modified and coverable lines are covered by tests ✅

Project coverage is 58.95%. Comparing base (8da9bbe) to head (b7b7e2a).
Report is 190 commits behind head on master.

Additional details and impacted files
@@             Coverage Diff              @@
##             master   #33408      +/-   ##
============================================
+ Coverage     58.86%   58.95%   +0.08%     
- Complexity     3112     3183      +71     
============================================
  Files          1130     1133       +3     
  Lines        174419   174576     +157     
  Branches       3343     3366      +23     
============================================
+ Hits         102680   102916     +236     
+ Misses        68392    68312      -80     
- Partials       3347     3348       +1     
Flag Coverage Δ
java 70.33% <ø> (+0.43%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Naireen Naireen force-pushed the kafka_add_counters_V1 branch 4 times, most recently from 2791b23 to b91a77f Compare December 18, 2024 01:21
@Naireen Naireen marked this pull request as ready for review December 18, 2024 04:18
@Naireen
Copy link
Contributor Author

Naireen commented Dec 18, 2024

R: @sjvanrossum for the kafka io part, thanks in advance!

Copy link
Contributor

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control. If you'd like to restart, comment assign set of reviewers

@Naireen Naireen force-pushed the kafka_add_counters_V1 branch from b91a77f to 948dfe6 Compare December 18, 2024 07:34
@Naireen
Copy link
Contributor Author

Naireen commented Dec 18, 2024

Run Java PreCommit

@Naireen
Copy link
Contributor Author

Naireen commented Dec 18, 2024

R: @johnjcasey for the sdk portion of it.

@Naireen
Copy link
Contributor Author

Naireen commented Dec 19, 2024

Run Java_Pulsar_IO_Direct PreCommit

@Naireen Naireen force-pushed the kafka_add_counters_V1 branch from 11bee27 to b9d2f2b Compare December 19, 2024 19:54
@Naireen
Copy link
Contributor Author

Naireen commented Dec 19, 2024

Run Java PreCommit

@Naireen
Copy link
Contributor Author

Naireen commented Dec 19, 2024

Run Java_GCP_IO_Direct PreCommit

@Naireen
Copy link
Contributor Author

Naireen commented Dec 19, 2024

Run Java PreCommit

@Naireen
Copy link
Contributor Author

Naireen commented Dec 19, 2024

Run Java PreCommit

@Naireen
Copy link
Contributor Author

Naireen commented Dec 19, 2024

Run Java_GCP_IO_Direct PreCommit

1 similar comment
@Naireen
Copy link
Contributor Author

Naireen commented Dec 19, 2024

Run Java_GCP_IO_Direct PreCommit

@Naireen
Copy link
Contributor Author

Naireen commented Dec 19, 2024

Run Java PreCommit

@Naireen Naireen mentioned this pull request Jan 6, 2025
3 tasks
Comment on lines 110 to 112
/**
* @param topicName topicName
* @param partitionId partitionId for the topic Only included in the metric key if
* 'supportsMetricsDeletion' is enabled.
* @param backlog backlog for the topic Only included in the metric key if
* 'supportsMetricsDeletion' is enabled.
*/
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

" Only" -> ". Only"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed, thanks for catching that.

@@ -71,11 +79,17 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {

abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();

static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>();

abstract HashMap<String, Long> perTopicPartitionBacklogs();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If an instance of this class may be concurrently updated, then HashMap needs to be replaced (ditto for the existing HashMap fields). Use ConcurrentHashMap instead.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly unrelated, but why doesn't perTopicRpcLatencies use a gauge or sum as the value type?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would the sum represent? the sum of latencies? but each individual one is important, and a sum would lose information.
A gauge isn't quite clear either, if you have two concurrent rpcs that completed, what value do you return?

A histogram of values provides more information (and allows us to see the spread of values)

@@ -743,6 +747,16 @@ private void reportBacklog() {
backlogElementsOfSplit.set(splitBacklogMessages);
}

private void reportBacklogMetrics() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this can be merged with reportBacklog (potentially rename that method to reportBacklogMetrics updateBacklogMetrics).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I explicitly moved it out to be separate, since reportBacklog() is called twice, and we only need to do this once (when we advance to the next record).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

@Naireen Naireen force-pushed the kafka_add_counters_V1 branch 3 times, most recently from ae2d885 to 0972ff3 Compare January 13, 2025 17:37
Comment on lines +77 to +78
static ConcurrentHashMap<String, Histogram> latencyHistograms =
new ConcurrentHashMap<String, Histogram>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static ConcurrentHashMap<String, Histogram> latencyHistograms =
new ConcurrentHashMap<String, Histogram>();
private static final Map<String, Histogram> LATENCY_HISTOGRAMS =
new ConcurrentHashMap<>();


abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();
static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>();
private static final Map<String, Gauge> BACKLOG_GAUGES = new ConcurrentHashMap<>();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... I searched for backlogGauges, but I'm not finding any uses of this it.
Is this still needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed.

@@ -86,13 +100,26 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
if (latencies == null) {
latencies = new ConcurrentLinkedQueue<Duration>();
latencies.add(elapsedTime);
perTopicRpcLatencies().put(topic, latencies);
perTopicRpcLatencies().putIfAbsent(topic, latencies);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I'm not mistaken, L99-106 can be replaced with perTopicRpcLatencies().computeIfAbsent(topic, ConcurrentLinkedQueue::new).add(elapsedTime); which reduces locking operations. It also changes what happens in L100-104, which currently drops elapsedTime if the call at 103 observes that a write for that key had happened in between.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It won't accept
perTopicRpcLatencies().computeIfAbsent(topic, ConcurrentLinkedQueue::new).add(elapsedTime);

saying that it's unable to infer the type of the argument. I tried a couple of different variations, and couldn't get it to work with a lambda. It think the issue is that it takes in a string, and we want a ConcurrentQueue of Duration

This is what the compiler wants, and I don't know how to pass in a lambda matching that signature
String,Function<? super String,? extends ConcurrentLinkedQueue<Duration>> (my java knowledge is a limited.)

@@ -743,6 +747,16 @@ private void reportBacklog() {
backlogElementsOfSplit.set(splitBacklogMessages);
}

private void reportBacklogMetrics() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM

@Naireen
Copy link
Contributor Author

Naireen commented Jan 17, 2025

Run Java_IOs_Direct PreCommit

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants