[fix][broker] Fix rate limiting causing connections to time out #23930

Open · wants to merge 18 commits into base: master

Conversation

@lhotari (Member) commented Feb 5, 2025

Fixes #23920

Motivation

The current rate limiting implementation in the Pulsar broker can cause connection timeouts when the state of the AsyncTokenBucket becomes invalid due to clock source issues: in some virtualized environments, System.nanoTime() isn't strictly monotonic and consistent across multiple CPUs.

Here's a Gen AI summary of the various timing issues (it might not be fully accurate):

Virtualization Timing Issues Summary

KVM

  • Affected by TSC frequency differences between CPUs
  • Live migration between different CPU models can cause time jumps
  • Power management features can affect TSC stability
  • Provides kvmclock as a more stable alternative
  • Modern versions handle timing better with TSC scaling

VMware

  • Historical issues with System.nanoTime() during guest/host time sync
  • Can experience jumps during vMotion (live migration)
  • Time discontinuities possible during suspend/resume
  • Newer versions have improved clock sources and time keeping
  • Can be affected by host TSC frequency changes

Xen

  • Paravirtualization can affect clock source stability
  • Host/guest time synchronization can cause jumps
  • Migration between hosts can affect timing
  • Can have issues with different CPU architectures/frequencies
  • Provides different clock source options for guests

Common Issues Across All Platforms

  • Live migration between different CPU models/frequencies
  • Host/guest time synchronization effects
  • Suspend/resume operations can cause time discontinuities
  • Power management features can affect timing stability
  • Need for careful testing in timing-sensitive applications

Best Practices

  • Test timing behavior in your specific virtualization environment
  • Consider using platform-specific stable clock sources
  • Be prepared to handle time discontinuities in critical applications
  • Ensure consistent CPU configurations when possible
  • Monitor and validate timing in production environments

If anyone is interested, https://docs.kernel.org/virt/kvm/x86/timekeeping.html contains low-level details about timekeeping; however, some of the information might be outdated.

This PR addresses the problems that arise when System.nanoTime() isn't strictly monotonic and consistent across multiple CPUs.

Modifications

  1. AsyncTokenBucket improvements:

    • Improved the token calculation logic so that backward clock leaps can no longer cause an invalid state
    • Improved the token consumption logic, and added more test cases for the class (unrelated to the actual problem), to handle a negative balance together with eventual consistency:
      • subtract pendingConsumedTokens from the new tokens before capping at the bucket capacity
  2. DefaultMonotonicSnapshotClock enhancements (see the sketch after this list):

    • Reimplemented using a single dedicated thread to reduce the chance of clock leaps, since a single CPU is used in most cases unless the OS migrates the thread to another CPU
    • Added logic to handle clock source leaps (backward/forward) gracefully
    • Added support for the use case where a consistent, up-to-date value of the clock is requested
      • This is not on the hot path, so synchronization and Object monitor wait/notify/notifyAll provide a feasible solution.
  3. Rate limiter implementations:

    • Updated PublishRateLimiterImpl to use a proper executor for unthrottling (the EventLoopGroup will pick a new scheduler each time)
    • Added error handling for producer unthrottling
    • Modified DispatchRateLimiter and SubscribeRateLimiter to use the monotonic clock instance, which handles the clock source issues
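
To make item 2 more concrete, here is a simplified sketch of the leap-handling idea. Variable names such as baseSnapshotTickNanos, previousSnapshotTickNanos and maxDelta mirror the review excerpts further down; the surrounding structure is illustrative only and is not the actual DefaultMonotonicSnapshotClock implementation.

// Simplified, illustrative sketch of the leap-handling idea (not the actual implementation).
class LeapTolerantTickerSketch {
    private final long snapshotIntervalNanos;
    private final long maxDelta; // largest tolerated jump between consecutive updates
    private long referenceClockSourceValue = System.nanoTime(); // clock source value at the last reset
    private long baseSnapshotTickNanos;     // normalized tick value at the last reset
    private long previousSnapshotTickNanos; // last published tick value

    LeapTolerantTickerSketch(long snapshotIntervalNanos, long maxDelta) {
        this.snapshotIntervalNanos = snapshotIntervalNanos;
        this.maxDelta = maxDelta;
    }

    // Called periodically by the single dedicated updater thread.
    synchronized long updateSnapshotTickNanos() {
        long clockSourceValue = System.nanoTime();
        long durationSinceReference = clockSourceValue - referenceClockSourceValue;
        long newSnapshotTickNanos = baseSnapshotTickNanos + durationSinceReference;
        // If the clock source leaped backward or forward beyond the tolerated window,
        // re-anchor the reference instead of publishing the leaped value.
        if (newSnapshotTickNanos < previousSnapshotTickNanos - maxDelta
                || newSnapshotTickNanos > previousSnapshotTickNanos + maxDelta) {
            referenceClockSourceValue = clockSourceValue;
            // keep the published value moving forward by one snapshot interval
            baseSnapshotTickNanos = previousSnapshotTickNanos + snapshotIntervalNanos;
            newSnapshotTickNanos = baseSnapshotTickNanos;
        }
        previousSnapshotTickNanos = newSnapshotTickNanos;
        return newSnapshotTickNanos;
    }
}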

Added test coverage:

  1. New AsyncTokenBucketTest cases:

    • Fraction handling and leftover token retention
    • Negative balance with eventual consistency
    • Token bucket size limits
    • Clock source lag scenarios
  2. New DefaultMonotonicSnapshotClockTest (a sketch of such a test follows this list):

    • Time leap handling (backward/forward)
    • Snapshot request validation
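
A sketch of the kind of test meant here, for the backward-leap case. The constructor and method signatures used below (an injectable clock source supplier and getTickNanos(boolean requestSnapshot)) are assumptions in this sketch and should be adapted to the actual DefaultMonotonicSnapshotClock API.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.testng.Assert;
import org.testng.annotations.Test;

public class MonotonicClockLeapSketchTest {
    @Test
    public void backwardLeapDoesNotMoveSnapshotValueBackwards() {
        AtomicLong fakeNanos = new AtomicLong(0);
        // assumed constructor: snapshot interval plus an injectable clock source supplier
        DefaultMonotonicSnapshotClock clock =
                new DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(1), fakeNanos::get);
        try {
            long before = clock.getTickNanos(true); // assumed API: request a consistent snapshot
            fakeNanos.addAndGet(-TimeUnit.SECONDS.toNanos(10)); // simulate a backward clock source leap
            long after = clock.getTickNanos(true);
            Assert.assertTrue(after >= before, "the snapshot value must never move backwards");
        } finally {
            clock.close();
        }
    }
}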

The JMH benchmark AsyncTokenBucketBenchmark shows that the implementation performs very well (results on an Apple M3 Max):

Benchmark                                                    Mode  Cnt           Score            Error  Units
AsyncTokenBucketBenchmark.consumeTokensBenchmark001Threads  thrpt    3   229855966.880 ± 22973513.211  ops/s
AsyncTokenBucketBenchmark.consumeTokensBenchmark010Threads  thrpt    3  2089791132.070 ± 66282218.899  ops/s
AsyncTokenBucketBenchmark.consumeTokensBenchmark100Threads  thrpt    3  2470109163.780 ± 69435765.779  ops/s

With 100 threads, it achieves about 2470 million ops/s.

Results on a Dell XPS 15 7590 (i9-9980HK) running Linux:

Benchmark                                                    Mode  Cnt          Score           Error  Units
AsyncTokenBucketBenchmark.consumeTokensBenchmark001Threads  thrpt    3   57236511.585 ±   1382872.930  ops/s
AsyncTokenBucketBenchmark.consumeTokensBenchmark010Threads  thrpt    3  400928422.190 ± 273973383.460  ops/s
AsyncTokenBucketBenchmark.consumeTokensBenchmark100Threads  thrpt    3  497853221.317 ± 110878844.019  ops/s

With 100 threads, it achieves about 497 million ops/s.
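
For reference, a throughput benchmark of this shape can be reproduced with a minimal JMH harness like the sketch below. The LongAdder is only a stand-in for the bucket's eventually consistent fast path; the real AsyncTokenBucketBenchmark measures consumeTokens on an actual AsyncTokenBucket instance.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Threads;
import org.openjdk.jmh.annotations.Warmup;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(1)
@Warmup(iterations = 3)
@Measurement(iterations = 3)
public class TokenConsumeBenchmarkSketch {
    // stand-in for AsyncTokenBucket's fast path, which adds to a pendingConsumedTokens LongAdder
    private final LongAdder pendingConsumedTokens = new LongAdder();

    @Benchmark
    @Threads(100)
    public void consumeTokensBenchmark100Threads() {
        // in the real benchmark this would be a call like asyncTokenBucket.consumeTokens(1)
        pendingConsumedTokens.add(1);
    }
}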

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@codecov-commenter commented Feb 5, 2025

Codecov Report

Attention: Patch coverage is 90.08264% with 12 lines in your changes missing coverage. Please review.

Project coverage is 74.27%. Comparing base (bbc6224) to head (58f8b35).
Report is 883 commits behind head on master.

Files with missing lines Patch % Lines
...lsar/broker/qos/DefaultMonotonicSnapshotClock.java 89.79% 8 Missing and 2 partials ⚠️
.../pulsar/broker/service/PublishRateLimiterImpl.java 66.66% 2 Missing ⚠️
Additional details and impacted files


@@             Coverage Diff              @@
##             master   #23930      +/-   ##
============================================
+ Coverage     73.57%   74.27%   +0.69%     
+ Complexity    32624    32272     -352     
============================================
  Files          1877     1853      -24     
  Lines        139502   143832    +4330     
  Branches      15299    16345    +1046     
============================================
+ Hits         102638   106826    +4188     
+ Misses        28908    28610     -298     
- Partials       7956     8396     +440     
Flag Coverage Δ
inttests 26.74% <46.28%> (+2.16%) ⬆️
systests 23.24% <46.28%> (-1.08%) ⬇️
unittests 73.78% <90.08%> (+0.94%) ⬆️

Flags with carried forward coverage won't be shown.

Files with missing lines Coverage Δ
...org/apache/pulsar/broker/qos/AsyncTokenBucket.java 93.05% <100.00%> (+0.09%) ⬆️
...broker/service/persistent/DispatchRateLimiter.java 79.83% <100.00%> (+1.20%) ⬆️
...roker/service/persistent/SubscribeRateLimiter.java 55.69% <100.00%> (+1.15%) ⬆️
.../pulsar/broker/service/PublishRateLimiterImpl.java 91.30% <66.66%> (+5.15%) ⬆️
...lsar/broker/qos/DefaultMonotonicSnapshotClock.java 90.00% <89.79%> (+6.66%) ⬆️

... and 1035 files with indirect coverage changes

}
}

@Override
public void close() {
thread.interrupt();
tickUpdaterThread.interrupt();

Contributor:

Can we terminate the while loop with a stop variable and update it here, instead of depending on interruption? Relying on the interrupt could lead to a leak if someone changes the interrupt logic in the previous method in the future.

Member Author:

I don't think there's a problem with using .interrupt() in this case. Internally, the interrupt status is just a flag in the Thread class.
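
For illustration, here is a minimal, hypothetical sketch of the pattern under discussion: the updater loop checks the interrupt flag, so close() only needs to call interrupt() to terminate it.

public class InterruptStopSketch {
    public static void main(String[] args) throws InterruptedException {
        Thread tickUpdaterThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // ... update the snapshot value here ...
                    Thread.sleep(1); // stands in for waiting one snapshot interval
                } catch (InterruptedException e) {
                    // restore the flag so the while condition observes it and the loop exits
                    Thread.currentThread().interrupt();
                }
            }
        }, "monotonic-clock-updater-sketch");
        tickUpdaterThread.start();
        Thread.sleep(10);
        tickUpdaterThread.interrupt(); // what close() relies on: the interrupt flag terminates the loop
        tickUpdaterThread.join();
    }
}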

baseSnapshotTickNanos = previousSnapshotTickNanos;
if (!snapshotRequested) {
// if the snapshot value is not requested, increment by the snapshot interval
baseSnapshotTickNanos += snapshotIntervalNanos;

Contributor:

Here we assume that updateSnapshotTickNanos gets invoked every snapshotIntervalNanos, which is why we add it to baseSnapshotTickNanos.
If we instead passed snapshotIntervalNanos as a parameter to this method, the method would be less error prone and could also be used in unit tests if needed.

Member Author:

I'll refactor the separate concern of detecting leaps in the clock source values and normalizing the output values into a separate class, since that would make it easier to unit test.

long newSnapshotTickNanos = baseSnapshotTickNanos + durationSinceReference;

// reset the reference clock source value if the clock source value leaps backward or forward
if (newSnapshotTickNanos < previousSnapshotTickNanos - maxDelta

Contributor:

Now that we are using a single thread for updates anyway, can't we just avoid this complex check and directly use the System.nanoTime() value read from that single CPU as the incremented count?

Member Author:

A single thread could get migrated from one CPU to another. In certain virtualized environments, the counter can leap forward or backward. It can also happen on real multi-core/multi-socket hardware with the TSC clock source when the hardware doesn't support the CPU instructions required to address the challenges of synchronizing TSC values across CPUs. Some of the low-level details are explained in "TSC hardware".

The purpose of this class is to address any such problems where the time leaps forward or backward; removing this logic would make the solution pointless.

Although this logic might seem complex, the problem it addresses is also a very low-level one. The logic is performant and has been validated with JMH benchmarks.


// reset the reference clock source value if the clock source value leaps backward or forward
if (newSnapshotTickNanos < previousSnapshotTickNanos - maxDelta
|| newSnapshotTickNanos > previousSnapshotTickNanos + maxDelta) {

Contributor:

By the way, can we document what exactly we are doing with baseSnapshotTickNanos to explain the logic here? It's a little hard to understand what is happening.

Member Author:

Sure, I'll add some more comments to describe the logic.

try {
producer.decrementThrottleCount();
} catch (Exception e) {
log.error("Failed to unthrottle producer {}", producer, e);

Contributor:

It would be worth adding the topic name here.

Member Author:

The topic name is already included in the producer's toString() method implementation:

public String toString() {
    return MoreObjects.toStringHelper(this).add("topic", topic).add("client", cnx.toString())
            .add("producerName", producerName).add("producerId", producerId).toString();
}

}
}

private void updateSnapshotTickNanos(boolean snapshotRequested) {

Contributor:

It would be great if we could add a unit test for this method and try to get maximum code coverage from unit tests, since this method's code has high cyclomatic complexity.

Member Author:

I added a few tests; line coverage is now 97% (73/75) and branch coverage is 90% (18/20) for the current DefaultMonotonicSnapshotClock. The only uncovered parts are 2 lines and 2 branches, on lines 196 and 209, which handle InterruptedException. Adding test coverage for those lines isn't reasonable, so the maximum practical code coverage has been reached.

Based on the metrics, the complexity of the method code isn't high since the methods are fairly small.

[screenshot: method complexity metrics]

For example, PersistentTopic contains methods which have high complexity according to the metrics:
[screenshot: PersistentTopic method complexity metrics]

Metrics calculated with IntelliJ plugin "MetricsReloaded"

@lhotari (Member Author) commented Feb 6, 2025

Although clock sources aren't necessarily monotonic on all platforms, it seems that the root cause of this issue could be different.
The DefaultMonotonicSnapshotClock was originally added to address a performance bottleneck in calling System.nanoTime() on MacOS; this wasn't a significant bottleneck on Linux. The solution was to update the value in a single thread and add eventual consistency to the rate limiter. The problem seen in issue #23920 could occur when this thread gets starved and the updates are delayed. This PR already contains 2 mitigations for this problem: raising the thread priority to MAX_PRIORITY and modifying the logic for how new tokens are added to the bucket when the token update happens.
However, I'll need to find a solution where the problem is properly avoided without adding contention, since contention would give the rate limiting solution a relatively high overhead.
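
A tiny sketch of the thread-priority mitigation mentioned above; the thread name and setup details here are illustrative, not the actual DefaultMonotonicSnapshotClock code.

public class UpdaterPrioritySketch {
    public static void main(String[] args) {
        Thread tickUpdaterThread = new Thread(() -> {
            // periodically refresh the snapshot tick value here
        }, "pulsar-monotonic-snapshot-clock");
        tickUpdaterThread.setDaemon(true);
        tickUpdaterThread.setPriority(Thread.MAX_PRIORITY); // reduce the chance of the updater thread being starved
        tickUpdaterThread.start();
    }
}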

@lhotari (Member Author) commented Feb 6, 2025


I found a solution to the problem. It's possible to detect backward leaps, but forward leaps should be left unhandled: the tick updater thread could starve, and in that case time should be able to leap forward when it finally gets updated.
Time leaping forward would cause issues with the current changes in this PR. Before the changes made in this PR, there was an issue which caused negative tokens. I'll continue adding more tests to describe the scenario.

My assumption currently is that the minimal fix for #23920 is this diff:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
index ac9a1f03e59..36edb1c5d8a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/qos/AsyncTokenBucket.java
@@ -159,16 +159,15 @@ public abstract class AsyncTokenBucket {
         if (shouldUpdateTokensImmediately(currentNanos, forceUpdateTokens)) {
             // calculate the number of new tokens since the last update
             long newTokens = calculateNewTokensSinceLastUpdate(currentNanos);
-            // calculate the total amount of tokens to consume in this update
             // flush the pendingConsumedTokens by calling "sumThenReset"
-            long totalConsumedTokens = consumeTokens + pendingConsumedTokens.sumThenReset();
+            long currentPendingConsumedTokens = pendingConsumedTokens.sumThenReset();
             // update the tokens and return the current token value
             return TOKENS_UPDATER.updateAndGet(this,
-                    currentTokens ->
-                            // after adding new tokens, limit the tokens to the capacity
-                            Math.min(currentTokens + newTokens, getCapacity())
-                                    // subtract the consumed tokens
-                                    - totalConsumedTokens);
+                    // after adding new tokens subtract the pending consumed tokens and
+                    // limit the tokens to the capacity of the bucket
+                    currentTokens -> Math.min(currentTokens + newTokens - currentPendingConsumedTokens, getCapacity())
+                            // subtract the consumed tokens
+                            - consumeTokens);
         } else {
             // eventual consistent fast path, tokens are not updated immediately

The problem with the previous solution was that it first added the new tokens and capped the result at the capacity. With the eventual consistency logic in AsyncTokenBucket, that's the wrong order. I'll provide a separate explanation for this in later updates.
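
As a rough illustration of why the order of capping and subtracting matters, here is a runnable sketch with made-up numbers (capacity 100); it is not taken from the actual code.

public class TokenUpdateOrderingSketch {
    public static void main(String[] args) {
        long capacity = 100;
        long currentTokens = 100;         // bucket was full at the previous update
        long newTokens = 50;              // tokens earned since the previous update
        long pendingConsumedTokens = 120; // tokens consumed eventually-consistently over the same period
        long consumeTokens = 0;           // tokens consumed by this call

        // previous logic: cap first, then subtract everything that was consumed
        long oldResult = Math.min(currentTokens + newTokens, capacity)
                - (consumeTokens + pendingConsumedTokens);
        // fixed logic: let the new tokens offset the pending consumption before capping
        long newResult = Math.min(currentTokens + newTokens - pendingConsumedTokens, capacity)
                - consumeTokens;

        System.out.println("previous logic: " + oldResult); // -20: the bucket goes unnecessarily negative
        System.out.println("fixed logic:    " + newResult); // 30: matches the tokens actually available
    }
}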

Successfully merging this pull request may close these issues: [Bug] Producers getting timed out intermittently