[fix][broker] Fix rate limiting causing connections to time out #23930
base: master
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #23930 +/- ##
============================================
+ Coverage 73.57% 74.27% +0.69%
+ Complexity 32624 32272 -352
============================================
Files 1877 1853 -24
Lines 139502 143832 +4330
Branches 15299 16345 +1046
============================================
+ Hits 102638 106826 +4188
+ Misses 28908 28610 -298
- Partials 7956 8396 +440
Flags with carried forward coverage won't be shown.
@Override
public void close() {
    thread.interrupt();
    tickUpdaterThread.interrupt();
}
Can we terminate the while loop with a stop variable and update it here, instead of depending on interruption? Relying on interrupts could lead to a leak if someone changes the interrupt logic in the previous method in the future.
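For illustration, the suggested stop-flag approach could look roughly like this (hypothetical names, not the PR's code):

```java
// Hypothetical sketch of the suggested approach: a volatile stop flag
// checked by the update loop instead of relying on the interrupt status.
class TickUpdater implements AutoCloseable {
    private volatile boolean stopped = false;
    private final Thread thread = new Thread(this::run, "tick-updater");

    void start() {
        thread.start();
    }

    private void run() {
        while (!stopped) {
            // update the snapshot tick value here
        }
    }

    @Override
    public void close() {
        stopped = true;     // signal the loop to exit
        thread.interrupt(); // also wake the thread if it is blocked waiting
    }
}
```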
I don't think that there's a problem in using .interrupt() in this case. Internally, the interrupt status is just a flag in the Thread class.
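A minimal sketch of interrupt-based termination, assuming the update loop waits in an interruptible sleep (names are illustrative, not the PR's exact code):

```java
// Minimal sketch of interrupt-based loop termination.
class TickUpdaterThread extends Thread {
    @Override
    public void run() {
        // isInterrupted() reads the interrupt status flag set by interrupt()
        while (!isInterrupted()) {
            updateTick();
            try {
                Thread.sleep(1); // interruptible wait between tick updates
            } catch (InterruptedException e) {
                // restore the interrupt status so the loop condition observes it
                interrupt();
            }
        }
    }

    private void updateTick() {
        // update the snapshot tick value here
    }
}
// close() simply calls interrupt(), which both sets the flag and wakes
// the thread from a blocking wait.
```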
baseSnapshotTickNanos = previousSnapshotTickNanos;
if (!snapshotRequested) {
    // if the snapshot value is not requested, increment by the snapshot interval
    baseSnapshotTickNanos += snapshotIntervalNanos;
}
Here we assume that updateSnapshotTickNanos gets invoked every snapshotIntervalNanos, so we add it to baseSnapshotTickNanos. If we instead pass snapshotIntervalNanos as a parameter to this method, the method could be less error prone and could also be used in unit tests if needed.
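For illustration, the suggested change might just thread the interval through as a parameter (hypothetical signature, not the PR's code):

```java
// Hypothetical refactoring the comment suggests: take the interval as a
// parameter so unit tests can drive the method with arbitrary interval values.
private void updateSnapshotTickNanos(boolean snapshotRequested, long snapshotIntervalNanos) {
    baseSnapshotTickNanos = previousSnapshotTickNanos;
    if (!snapshotRequested) {
        // increment by the interval that was passed in explicitly
        baseSnapshotTickNanos += snapshotIntervalNanos;
    }
    // ... rest of the update logic
}
```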
I'll refactor the separate concern of detecting leaps in the clock source values and normalizing the output values into a separate class, since that could help unit test it separately.
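A hedged sketch of what such a separate leap-detecting/normalizing class could look like; class and field names here are illustrative, not the PR's code:

```java
// Sketch: normalizes a raw clock source into a leap-free tick value.
public class LeapDetectingTicker {
    private final long maxDeltaNanos; // tolerated difference between consecutive values

    private long referenceClockValue; // raw clock reading used as the reference point
    private long baseTickNanos;       // normalized value at the reference point
    private long previousTickNanos;   // last normalized value handed out
    private boolean initialized;

    public LeapDetectingTicker(long maxDeltaNanos) {
        this.maxDeltaNanos = maxDeltaNanos;
    }

    // rawClockValue would typically come from System.nanoTime()
    public synchronized long tick(long rawClockValue) {
        if (!initialized) {
            referenceClockValue = rawClockValue;
            initialized = true;
        }
        long candidate = baseTickNanos + (rawClockValue - referenceClockValue);
        // a jump beyond the tolerance in either direction means the raw clock
        // source leaped (e.g. TSC desync across CPUs); re-anchor the reference
        // instead of propagating the leap to callers
        if (candidate < previousTickNanos - maxDeltaNanos
                || candidate > previousTickNanos + maxDeltaNanos) {
            referenceClockValue = rawClockValue;
            baseTickNanos = previousTickNanos;
            candidate = previousTickNanos;
        }
        previousTickNanos = candidate;
        return previousTickNanos;
    }
}
```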
long newSnapshotTickNanos = baseSnapshotTickNanos + durationSinceReference;

// reset the reference clock source value if the clock source value leaps backward or forward
if (newSnapshotTickNanos < previousSnapshotTickNanos - maxDelta
Now that we are using a single update thread anyway, can't we avoid this complex check and just directly use System.nanoTime() retrieved from the single CPU, and use that incremented count?
A single thread could get migrated from one CPU to another. In certain virtualized environments, the counter can leap forward or backward. It can also happen on real multi-core/multi-socket hardware with a TSC clock source when the hardware doesn't support the CPU instructions which are required to address the challenges of synchronizing TSC values across CPUs. Some of the low-level details are explained in "TSC hardware".
The purpose of this class is to address any such problems where the time leaps forward or backward; removing this logic would make this solution pointless.
Although this logic might seem complex, it addresses a very low-level problem. The logic is performant and has been validated with JMH benchmarks.
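As a concrete example of how the guard behaves (illustrative numbers only, not from the PR):

```java
public class LeapCheckExample {
    public static void main(String[] args) {
        long maxDelta = 1_000_000_000L;                   // tolerance: 1 second
        long previousSnapshotTickNanos = 10_000_000_000L; // last normalized value: 10s
        long newSnapshotTickNanos = 3_000_000_000L;       // raw clock leaped backward to 3s

        // 3s < 10s - 1s, so the leap is detected and the reference clock source
        // value is reset instead of letting the snapshot value jump back by 7s
        boolean leapDetected = newSnapshotTickNanos < previousSnapshotTickNanos - maxDelta
                || newSnapshotTickNanos > previousSnapshotTickNanos + maxDelta;
        System.out.println(leapDetected); // prints: true
    }
}
```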
// reset the reference clock source value if the clock source value leaps backward or forward
if (newSnapshotTickNanos < previousSnapshotTickNanos - maxDelta
        || newSnapshotTickNanos > previousSnapshotTickNanos + maxDelta) {
By the way, can we document what exactly we are doing with baseSnapshotTickNanos to explain the logic here? It's a little hard to understand what is happening.
Sure, I'll add some more comments to describe the logic.
try {
    producer.decrementThrottleCount();
} catch (Exception e) {
    log.error("Failed to unthrottle producer {}", producer, e);
}
It would be worth adding the topic name here.
The topic name is already included in producer's toString() method implementation.
pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
Lines 659 to 662 in 3d0625b
public String toString() {
    return MoreObjects.toStringHelper(this).add("topic", topic).add("client", cnx.toString())
            .add("producerName", producerName).add("producerId", producerId).toString();
}
private void updateSnapshotTickNanos(boolean snapshotRequested) {
It would be great if we could add a unit test for this method and try to get maximum code coverage from unit tests, since this method has high cyclomatic complexity.
I added a few tests, and line coverage is now 97% (73/75) and branch coverage 90% (18/20) for the current DefaultMonotonicSnapshotClock. There are only 2 lines and 2 branches that aren't covered, lines 196 and 209, where there's handling for InterruptedException. Adding test coverage for those lines isn't reasonable, so maximum code coverage has been reached.
Based on metrics, the complexity of the method code isn't high since the methods are fairly small.
[screenshot of complexity metrics omitted]
For example, PersistentTopic contains methods which have high complexity according to these metrics.
Metrics calculated with the IntelliJ plugin "MetricsReloaded".
Although clock sources aren't necessarily monotonic on all platforms, it seems that the root cause of this issue could be different.
I found a solution to the problem. It's possible to detect leaps backwards, but forward leaps should be left unhandled: the tick updater thread could starve, and time should be able to leap forward in that case when it finally gets updated.

My assumption currently is that the minimal fix for #23920 is this diff:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
index ac9a1f03e59..36edb1c5d8a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
@@ -159,16 +159,15 @@ public abstract class AsyncTokenBucket {
         if (shouldUpdateTokensImmediately(currentNanos, forceUpdateTokens)) {
             // calculate the number of new tokens since the last update
             long newTokens = calculateNewTokensSinceLastUpdate(currentNanos);
-            // calculate the total amount of tokens to consume in this update
             // flush the pendingConsumedTokens by calling "sumThenReset"
-            long totalConsumedTokens = consumeTokens + pendingConsumedTokens.sumThenReset();
+            long currentPendingConsumedTokens = pendingConsumedTokens.sumThenReset();
             // update the tokens and return the current token value
             return TOKENS_UPDATER.updateAndGet(this,
-                    currentTokens ->
-                            // after adding new tokens, limit the tokens to the capacity
-                            Math.min(currentTokens + newTokens, getCapacity())
-                                    // subtract the consumed tokens
-                                    - totalConsumedTokens);
+                    // after adding new tokens, subtract the pending consumed tokens and
+                    // limit the tokens to the capacity of the bucket
+                    currentTokens -> Math.min(currentTokens + newTokens - currentPendingConsumedTokens, getCapacity())
+                            // subtract the consumed tokens
+                            - consumeTokens);
         } else {
             // eventual consistent fast path, tokens are not updated immediately

The problem with the previous solution was that it would first add the new tokens and cap the result to the capacity before subtracting the consumed tokens. With the eventual consistency logic in AsyncTokenBucket, that's a wrong solution. I'll provide a separate explanation for this in later updates.
Fixes #23920
Motivation
The current rate limiting implementation in the Pulsar broker can cause connection timeouts when the state of the AsyncTokenBucket becomes invalid due to clock source issues in some virtualized environments where System.nanoTime() isn't strictly monotonic and consistent across multiple CPUs.
If anyone is interested, https://docs.kernel.org/virt/kvm/x86/timekeeping.html contains low-level details about timekeeping; however, some of the information might be outdated.
This PR addresses problems when System.nanoTime() isn't strictly monotonic and consistent across multiple CPUs.

Modifications
- AsyncTokenBucket improvements
- DefaultMonotonicSnapshotClock enhancements
- Rate limiter implementations
- Added test coverage:
  - New AsyncTokenBucketTest cases
  - New DefaultMonotonicSnapshotClockTest
AsyncTokenBucketBenchmark with JMH performs very well (test results with an Apple M3 Max): with 100 threads, it achieves 2470 million ops/s.
Results with a Dell XPS 15 7590 (i9-9980HK) on Linux: with 100 threads, 497 million ops/s.
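For reference, a throughput benchmark of this kind would look roughly like the following JMH sketch. The real benchmark is AsyncTokenBucketBenchmark in the Pulsar repository; the builder calls below are assumptions for illustration, not a verified API.

```java
import java.util.concurrent.TimeUnit;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Threads(100) // matches the 100-thread results quoted above
public class TokenBucketThroughputBenchmark {
    private AsyncTokenBucket tokenBucket;

    @Setup
    public void setup() {
        // assumed builder-style construction; check AsyncTokenBucket for the actual API
        tokenBucket = AsyncTokenBucket.builder()
                .rate(100_000_000L) // high rate so the benchmark itself isn't throttled
                .build();
    }

    @Benchmark
    public void consumeTokens() {
        // the hot path being measured: eventually consistent token consumption
        tokenBucket.consumeTokens(1);
    }
}
```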
Documentation
- doc
- doc-required
- doc-not-needed
- doc-complete