[fix][broker] Fix rate limiter token bucket and clock consistency issues causing excessive throttling and connection timeouts #23930

Merged

Changes from 7 commits

63 commits
cccc564
[fix][broker] Fix rate limiting causing connections to time out
lhotari Feb 5, 2025
79c9cce
Fix test
lhotari Feb 5, 2025
9fa43a9
Add test that checks that clock leaping backward or forward would be …
lhotari Feb 5, 2025
f28af85
Improve DefaultMonotonicSnapshotClock so that requests don't get dela…
lhotari Feb 5, 2025
a99fe0c
Also test with small offsets
lhotari Feb 5, 2025
9072713
Improve test case
lhotari Feb 5, 2025
f719dbc
Use JMH blackhole in test
lhotari Feb 5, 2025
c60c4b8
Improve code coverage
lhotari Feb 6, 2025
696f9fb
Refactor: Split out logic for leap detection and monotonic tick updating
lhotari Feb 6, 2025
d6d60f7
Remove invalid test
lhotari Feb 6, 2025
0886054
Revert "Remove invalid test"
lhotari Feb 6, 2025
f4feda1
Add test mode to DefaultMonotonicSnapshotClock so that thread updates…
lhotari Feb 6, 2025
13aa2f7
Use test mode to fix test
lhotari Feb 6, 2025
8b36d71
Improve testability
lhotari Feb 6, 2025
d9fb30f
Add failing test case
lhotari Feb 6, 2025
8c851e2
Fix test
lhotari Feb 6, 2025
846cc89
Add failing test case for the negative tokens case
lhotari Feb 6, 2025
58f8b35
Don't handle leaps forward since those cannot be detected properly
lhotari Feb 6, 2025
91ae4c6
Remove separate test mode since it's not needed
lhotari Feb 7, 2025
487419e
Fix parameter
lhotari Feb 7, 2025
571b2c2
Enable batching of update requests by using a AtomicLong for the requ…
lhotari Feb 7, 2025
532998d
Removing remaining parts of the test mode
lhotari Feb 7, 2025
a21eee3
Remove synchronization from tick updater since the usual path is sing…
lhotari Feb 7, 2025
eb82ab9
Improve comments
lhotari Feb 7, 2025
d1dd4b7
Reorder methods
lhotari Feb 7, 2025
d5ea482
Improve javadoc
lhotari Feb 7, 2025
b19e0de
Reduce excessive logging in test
lhotari Feb 7, 2025
b46fbab
Reduce duplication in JMH test
lhotari Feb 7, 2025
73c2b8c
Add JMH benchmark for DefaultMonotonicSnapshotClock
lhotari Feb 7, 2025
a22da98
Add more instructions for running JMH benchmarks
lhotari Feb 7, 2025
4287838
Add unit tests for leap detection
lhotari Feb 7, 2025
12cb400
Move updating of requestCount outside of the synchronization blocks t…
lhotari Feb 7, 2025
31a1b30
Fix typo and improve comment
lhotari Feb 7, 2025
6e4a94f
Optimize token calculation performance and correctness
lhotari Feb 7, 2025
7463891
Improve eventual consistency test
lhotari Feb 7, 2025
b04752d
If condition when newTokens should be 0
lhotari Feb 7, 2025
ea4f7d5
Improve logic to update lastNanos so that races are prevented with CAS
lhotari Feb 7, 2025
ca7cae4
Add "getTokensUpdatesTokens" to AsyncTokenBucket to reduce eventual c…
lhotari Feb 7, 2025
543c978
Reduce test flakiness
lhotari Feb 7, 2025
d159198
Let MessageDispatchThrottlingTest#reset handle deletion
lhotari Feb 7, 2025
936cdb0
Reduce test flakiness for waiting to new rate to be applied
lhotari Feb 7, 2025
dae8220
Prevent NPEs in DispatchRateLimiter when limit has changed
lhotari Feb 7, 2025
d62f97e
Fix switchToConsistentTokensView behavior
lhotari Feb 7, 2025
1d68522
Revert using getTokensUpdatesTokens mode by default since eventual co…
lhotari Feb 7, 2025
38e9f7b
Rename getTokensUpdatesTokens to consistentTokensView
lhotari Feb 7, 2025
f5c3e57
Use consistent tokens view for SubscribeRateLimiter
lhotari Feb 7, 2025
b79ed85
Fix issue with restartBroker in tests
lhotari Feb 7, 2025
5910b3c
Ignore metadata change when broker isn't running
lhotari Feb 7, 2025
fda2337
Move dispatch throttling tests to broker-api group
lhotari Feb 7, 2025
759fafe
Use AssertJ for better error message
lhotari Feb 7, 2025
5e0a327
Improve test cleanup for retries
lhotari Feb 7, 2025
f024203
Use unique namespaces
lhotari Feb 7, 2025
9277208
Extract common base class to avoid test duplication
lhotari Feb 7, 2025
96781e9
Reduce flakiness
lhotari Feb 7, 2025
234cfc9
Refactor common config
lhotari Feb 7, 2025
8ef12ca
Fix flakiness
lhotari Feb 7, 2025
68ba451
Move MessagePublishThrottlingTest to broker-api test group
lhotari Feb 7, 2025
2d99778
Fix issue in lookupUrl change in test class
lhotari Feb 7, 2025
c67ceb0
Revisit startBroker method in test base class
lhotari Feb 7, 2025
6de794b
Revisit logic one more time in test class
lhotari Feb 7, 2025
f9aab2e
Refactor consistency settings in AsyncTokenBucket and add Javadocs
lhotari Feb 8, 2025
e4f4689
Attempt to fix flaky test MessageDispatchThrottlingTest
lhotari Feb 8, 2025
42fb876
Use consistent tokens view in flaky RGUsageMTAggrWaitForAllMsgsTest
lhotari Feb 8, 2025
@@ -33,6 +33,7 @@
 import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
 
 @Fork(3)
 @BenchmarkMode(Mode.Throughput)
@@ -59,23 +60,26 @@ public void teardown() {
     @Benchmark
     @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
     @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
-    public void consumeTokensBenchmark001Threads() {
+    public void consumeTokensBenchmark001Threads(Blackhole blackhole) {
         asyncTokenBucket.consumeTokens(1);
+        blackhole.consume(asyncTokenBucket.getTokens());
     }
 
     @Threads(10)
     @Benchmark
     @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
     @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
-    public void consumeTokensBenchmark010Threads() {
+    public void consumeTokensBenchmark010Threads(Blackhole blackhole) {
         asyncTokenBucket.consumeTokens(1);
+        blackhole.consume(asyncTokenBucket.getTokens());
     }
 
     @Threads(100)
     @Benchmark
     @Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
     @Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
-    public void consumeTokensBenchmark100Threads() {
+    public void consumeTokensBenchmark100Threads(Blackhole blackhole) {
         asyncTokenBucket.consumeTokens(1);
+        blackhole.consume(asyncTokenBucket.getTokens());
     }
 }
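The benchmark change above passes a JMH `Blackhole` into each benchmark method and sinks the value returned by `getTokens()` into it. Without that, the JIT compiler could treat the unused read as dead code and eliminate it, so the benchmark would measure less work than the rate limiter actually performs. The following is a minimal, self-contained sketch of the same pattern; the class and field names are hypothetical and a plain `AtomicLong` stands in for `AsyncTokenBucket`.

```java
import java.util.concurrent.atomic.AtomicLong;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.infra.Blackhole;

@State(Scope.Benchmark)
@BenchmarkMode(Mode.Throughput)
public class TokenReadBenchmark {
    private final AtomicLong tokens = new AtomicLong(1_000_000L);

    @Benchmark
    public void consumeAndRead(Blackhole blackhole) {
        tokens.addAndGet(-1);            // consume one token
        blackhole.consume(tokens.get()); // sink the read so the JIT cannot drop it
    }
}
```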
@@ -159,16 +159,15 @@ private long consumeTokensAndMaybeUpdateTokensBalance(long consumeTokens, boolea
         if (shouldUpdateTokensImmediately(currentNanos, forceUpdateTokens)) {
             // calculate the number of new tokens since the last update
             long newTokens = calculateNewTokensSinceLastUpdate(currentNanos);
-            // calculate the total amount of tokens to consume in this update
-            long totalConsumedTokens = consumeTokens + pendingConsumedTokens.sumThenReset();
+            // flush the pendingConsumedTokens by calling "sumThenReset"
+            long currentPendingConsumedTokens = pendingConsumedTokens.sumThenReset();
             // update the tokens and return the current token value
             return TOKENS_UPDATER.updateAndGet(this,
-                    currentTokens ->
-                            // after adding new tokens, limit the tokens to the capacity
-                            Math.min(currentTokens + newTokens, getCapacity())
-                                    // subtract the consumed tokens
-                                    - totalConsumedTokens);
+                    // after adding new tokens subtract the pending consumed tokens and
+                    // limit the tokens to the capacity of the bucket
+                    currentTokens -> Math.min(currentTokens + newTokens - currentPendingConsumedTokens, getCapacity())
+                            // subtract the consumed tokens
+                            - consumeTokens);
         } else {
             // eventual consistent fast path, tokens are not updated immediately
 
@@ -211,8 +210,10 @@ private boolean shouldUpdateTokensImmediately(long currentNanos, boolean forceUp
      */
     private long calculateNewTokensSinceLastUpdate(long currentNanos) {
         long newTokens;
-        long previousLastNanos = LAST_NANOS_UPDATER.getAndSet(this, currentNanos);
-        if (previousLastNanos == 0) {
+        // update lastNanos if currentNanos is greater than the current lastNanos
+        long previousLastNanos = LAST_NANOS_UPDATER.getAndUpdate(this,
+                currentLastNanos -> Math.max(currentNanos, currentLastNanos));
+        if (previousLastNanos == 0 || previousLastNanos >= currentNanos) {
             newTokens = 0;
         } else {
             long durationNanos = currentNanos - previousLastNanos + REMAINDER_NANOS_UPDATER.getAndSet(this, 0);
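The central change in this hunk replaces the unconditional `getAndSet` on `lastNanos` with a `getAndUpdate` that only moves the timestamp forward via `Math.max`, and treats `previousLastNanos >= currentNanos` as "no new tokens". This closes a race in which a thread holding an older timestamp could rewind `lastNanos` and let the bucket add tokens for the same interval twice. A standalone sketch of the pattern, using a plain `AtomicLong` instead of Pulsar's atomic field updater (names below are illustrative, not Pulsar APIs):

```java
import java.util.concurrent.atomic.AtomicLong;

class MonotonicLastNanos {
    private final AtomicLong lastNanos = new AtomicLong();

    /** Returns the elapsed nanos to credit; never negative, even when calls race. */
    long advance(long currentNanos) {
        // only move lastNanos forward; a caller with a stale timestamp cannot rewind it
        long previous = lastNanos.getAndUpdate(prev -> Math.max(currentNanos, prev));
        if (previous == 0 || previous >= currentNanos) {
            // first call, or another thread already advanced past currentNanos
            return 0;
        }
        return currentNanos - previous;
    }
}
```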
@@ -29,13 +29,17 @@
  *
  * Starts a daemon thread that updates the snapshot value periodically with a configured interval. The close method
  * should be called to stop the thread.
+ * A single thread is used to update the monotonic clock value so that the snapshot value is always increasing,
+ * even if the clock source is not strictly monotonic across all CPUs. This might be the case in some virtualized
+ * environments.
  */
 public class DefaultMonotonicSnapshotClock implements MonotonicSnapshotClock, AutoCloseable {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultMonotonicSnapshotClock.class);
     private final long sleepMillis;
     private final int sleepNanos;
     private final LongSupplier clockSource;
-    private final Thread thread;
+    private final TickUpdaterThread tickUpdaterThread;
+    private final long snapshotIntervalNanos;
     private volatile long snapshotTickNanos;
 
     public DefaultMonotonicSnapshotClock(long snapshotIntervalNanos, LongSupplier clockSource) {
@@ -45,45 +49,170 @@ public DefaultMonotonicSnapshotClock(long snapshotIntervalNanos, LongSupplier cl
         this.sleepMillis = TimeUnit.NANOSECONDS.toMillis(snapshotIntervalNanos);
         this.sleepNanos = (int) (snapshotIntervalNanos - TimeUnit.MILLISECONDS.toNanos(sleepMillis));
         this.clockSource = clockSource;
-        updateSnapshotTickNanos();
-        thread = new Thread(this::snapshotLoop, getClass().getSimpleName() + "-update-loop");
-        thread.setDaemon(true);
-        thread.start();
+        this.snapshotIntervalNanos = snapshotIntervalNanos;
+        tickUpdaterThread = new TickUpdaterThread();
+        tickUpdaterThread.start();
     }
 
     /** {@inheritDoc} */
     @Override
     public long getTickNanos(boolean requestSnapshot) {
         if (requestSnapshot) {
-            updateSnapshotTickNanos();
+            tickUpdaterThread.requestUpdate();
         }
         return snapshotTickNanos;
     }
 
-    private void updateSnapshotTickNanos() {
-        snapshotTickNanos = clockSource.getAsLong();
-    }
+    /**
+     * A thread that updates snapshotTickNanos value periodically with a configured interval.
+     * The thread is started when the DefaultMonotonicSnapshotClock is created and runs until the close method is
+     * called.
+     * A single thread is used to read the clock source value since on some hardware of virtualized platforms,
+     * System.nanoTime() isn't strictly monotonic across all CPUs. Reading by a single thread will improve the
+     * stability of the read value since a single thread is scheduled on a single CPU. If the thread is migrated
+     * to another CPU, the clock source value might leap backward or forward, but logic in this class will handle it.
+     */
+    private class TickUpdaterThread extends Thread {
+        private final Object tickUpdateDelayMonitor = new Object();
+        private final Object tickUpdatedMonitor = new Object();
+        private final long maxDelta;
+        private long referenceClockSourceValue;
+        private long baseSnapshotTickNanos;
+        private long previousSnapshotTickNanos;
+        private volatile boolean running;
+        private boolean tickUpdateDelayMonitorNotified;
+        private long requestCount;
+
+        TickUpdaterThread() {
+            super(DefaultMonotonicSnapshotClock.class.getSimpleName() + "-update-loop");
+            // set as daemon thread so that it doesn't prevent the JVM from exiting
+            setDaemon(true);
+            // set the highest priority
+            setPriority(MAX_PRIORITY);
+            this.maxDelta = 2 * snapshotIntervalNanos;
+        }
+
+        @Override
+        public void run() {
+            try {
+                running = true;
+                long updatedForRequestCount = -1;
+                while (!isInterrupted()) {
+                    try {
+                        boolean snapshotRequested = false;
+                        // sleep for the configured interval on a monitor that can be notified to stop the sleep
+                        // and update the tick value immediately. This is used in requestUpdate method.
+                        synchronized (tickUpdateDelayMonitor) {
+                            tickUpdateDelayMonitorNotified = false;
+                            // only wait if no explicit request has been made since the last update
+                            if (requestCount == updatedForRequestCount) {
+                                // if no request has been made, sleep for the configured interval
+                                tickUpdateDelayMonitor.wait(sleepMillis, sleepNanos);
+                                snapshotRequested = tickUpdateDelayMonitorNotified;
+                            }
+                            updatedForRequestCount = requestCount;
+                        }
+                        updateSnapshotTickNanos(snapshotRequested);
+                        notifyAllTickUpdated();
+                    } catch (InterruptedException e) {
+                        interrupt();
+                        break;
+                    }
+                }
+            } catch (Throwable t) {
+                // report unexpected error since this would be a fatal error when the clock doesn't progress anymore
+                // this is very unlikely to happen, but it's better to log it in any case
+                LOG.error("Unexpected fatal error that stopped the clock.", t);
+            } finally {
+                LOG.info("DefaultMonotonicSnapshotClock's TickUpdaterThread stopped. {},tid={}", this, getId());
+                running = false;
+                notifyAllTickUpdated();
+            }
+        }

+        private void updateSnapshotTickNanos(boolean snapshotRequested) {
+            long clockValue = clockSource.getAsLong();
+
+            // Initialization
+            if (referenceClockSourceValue == 0) {
+                referenceClockSourceValue = clockValue;
+                baseSnapshotTickNanos = clockValue;
+                snapshotTickNanos = clockValue;
+                previousSnapshotTickNanos = clockValue;
+                return;
+            }
+
+            // calculate the duration since the reference clock source value
+            // so that the snapshot value is always increasing and tolerates it when the clock source is not strictly
+            // monotonic across all CPUs and leaps backward or forward
+            long durationSinceReference = clockValue - referenceClockSourceValue;
+            long newSnapshotTickNanos = baseSnapshotTickNanos + durationSinceReference;
+
+            // reset the reference clock source value if the clock source value leaps backward or forward
+            if (newSnapshotTickNanos < previousSnapshotTickNanos - maxDelta
+                    || newSnapshotTickNanos > previousSnapshotTickNanos + maxDelta) {
+                referenceClockSourceValue = clockValue;
+                baseSnapshotTickNanos = previousSnapshotTickNanos;
+                if (!snapshotRequested) {
+                    // if the snapshot value is not requested, increment by the snapshot interval
+                    baseSnapshotTickNanos += snapshotIntervalNanos;
+                }
+                newSnapshotTickNanos = baseSnapshotTickNanos;
+            }
+
+            // update snapshotTickNanos value if the new value is greater than the previous value
+            if (newSnapshotTickNanos > previousSnapshotTickNanos) {
+                snapshotTickNanos = newSnapshotTickNanos;
+                // store into a field so that we don't need to do a volatile read to find out the previous value
+                previousSnapshotTickNanos = newSnapshotTickNanos;
+            }
+        }

+        private void notifyAllTickUpdated() {
+            synchronized (tickUpdatedMonitor) {
+                // notify all threads that are waiting for the tick value to be updated
+                tickUpdatedMonitor.notifyAll();
+            }
+        }
+
+        public void requestUpdate() {
+            if (!running) {
+                // thread has stopped running, fallback to update the value directly without any optimizations
+                snapshotTickNanos = clockSource.getAsLong();
+                return;
+            }
+            synchronized (tickUpdatedMonitor) {
+                // notify the thread to stop waiting and update the tick value
+                synchronized (tickUpdateDelayMonitor) {
+                    tickUpdateDelayMonitorNotified = true;
+                    requestCount++;
+                    tickUpdateDelayMonitor.notify();
+                }
+                // wait until the tick value has been updated
+                try {
+                    tickUpdatedMonitor.wait();
+                } catch (InterruptedException e) {
+                    currentThread().interrupt();
+                }
+            }
+        }

-    private void snapshotLoop() {
-        try {
-            while (!Thread.currentThread().isInterrupted()) {
-                updateSnapshotTickNanos();
-                try {
-                    Thread.sleep(sleepMillis, sleepNanos);
-                } catch (InterruptedException e) {
-                    Thread.currentThread().interrupt();
-                    break;
-                }
-            }
-        } catch (Throwable t) {
-            // report unexpected error since this would be a fatal error when the clock doesn't progress anymore
-            // this is very unlikely to happen, but it's better to log it in any case
-            LOG.error("Unexpected fatal error that stopped the clock.", t);
-        }
-    }
+        @Override
+        public synchronized void start() {
+            super.start();
+            // wait until the thread is started and the tick value has been updated
+            synchronized (tickUpdatedMonitor) {
+                try {
+                    tickUpdatedMonitor.wait();
+                } catch (InterruptedException e) {
+                    currentThread().interrupt();
+                }
+            }
+        }
+    }
 
     @Override
     public void close() {
-        thread.interrupt();
+        tickUpdaterThread.interrupt();
     }
}
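The new `TickUpdaterThread` keeps `snapshotTickNanos` non-decreasing: one dedicated thread reads the clock source, detects leaps larger than `2 * snapshotIntervalNanos`, and rebases the snapshot instead of propagating the jump, while `requestUpdate()` lets a caller that needs a fresh value wake the thread and wait for the next snapshot rather than reading the clock itself. Below is a usage sketch built only from the constructor and methods visible in this diff; the 8 ms interval is an arbitrary example, and the `org.apache.pulsar.broker.qos` package in the import is assumed from the other imports in this PR.

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.qos.DefaultMonotonicSnapshotClock;

public class ClockUsageSketch {
    public static void main(String[] args) {
        long snapshotIntervalNanos = TimeUnit.MILLISECONDS.toNanos(8);
        // one shared clock instance; its daemon updater thread is stopped by close()
        try (DefaultMonotonicSnapshotClock clock =
                     new DefaultMonotonicSnapshotClock(snapshotIntervalNanos, System::nanoTime)) {
            // cheap read: returns the cached snapshot maintained by the tick updater thread
            long approximateNanos = clock.getTickNanos(false);
            // consistent read: asks the updater thread to refresh the snapshot before returning
            long freshNanos = clock.getTickNanos(true);
            System.out.println(approximateNanos + " -> " + freshNanos);
        }
    }
}
```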
@@ -20,18 +20,19 @@
 package org.apache.pulsar.broker.service;
 
 import com.google.common.annotations.VisibleForTesting;
-import io.netty.channel.EventLoopGroup;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.broker.qos.AsyncTokenBucket;
 import org.apache.pulsar.broker.qos.MonotonicSnapshotClock;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.jctools.queues.MessagePassingQueue;
 import org.jctools.queues.MpscUnboundedArrayQueue;
 
+@Slf4j
 public class PublishRateLimiterImpl implements PublishRateLimiter {
     private volatile AsyncTokenBucket tokenBucketOnMessage;
     private volatile AsyncTokenBucket tokenBucketOnByte;
@@ -80,7 +81,7 @@ private void scheduleDecrementThrottleCount(Producer producer) {
         // schedule unthrottling when the throttling count is incremented to 1
         // this is to avoid scheduling unthrottling multiple times for concurrent producers
         if (throttledProducersCount.incrementAndGet() == 1) {
-            EventLoopGroup executor = producer.getCnx().getBrokerService().executor();
+            ScheduledExecutorService executor = producer.getCnx().getBrokerService().executor().next();
             scheduleUnthrottling(executor, calculateThrottlingDurationNanos());
         }
     }
@@ -134,7 +135,11 @@ private void unthrottleQueuedProducers(ScheduledExecutorService executor) {
         // unthrottle as many producers as possible while there are token available
         while ((throttlingDuration = calculateThrottlingDurationNanos()) == 0L
                 && (producer = unthrottlingQueue.poll()) != null) {
-            producer.decrementThrottleCount();
+            try {
+                producer.decrementThrottleCount();
+            } catch (Exception e) {
+                log.error("Failed to unthrottle producer {}", producer, e);
+            }
             throttledProducersCount.decrementAndGet();
         }
         // if there are still producers to be unthrottled, schedule unthrottling again
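Two defensive changes are visible in this file: unthrottling is now scheduled on a single `ScheduledExecutorService` obtained with `executor().next()` instead of the whole `EventLoopGroup`, and each `decrementThrottleCount()` call is wrapped in try/catch so that one failing producer (for example, one whose connection has already gone away) cannot abort the drain loop and leave the producers queued behind it throttled. A JDK-only sketch of that drain pattern follows; the broker itself uses a JCTools MPSC queue and the `Producer` API, and the names below are illustrative.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.LongSupplier;

class UnthrottleLoopSketch {
    private final Queue<Runnable> unthrottlingQueue = new ConcurrentLinkedQueue<>();

    void enqueue(Runnable unthrottleAction) {
        unthrottlingQueue.offer(unthrottleAction);
    }

    void unthrottleQueued(LongSupplier throttlingDurationNanos) {
        Runnable unthrottleAction;
        // drain while tokens are available (a duration of 0 means no further throttling is needed)
        while (throttlingDurationNanos.getAsLong() == 0L
                && (unthrottleAction = unthrottlingQueue.poll()) != null) {
            try {
                unthrottleAction.run();
            } catch (Exception e) {
                // never let one broken entry stop the whole drain
                System.err.println("Failed to unthrottle: " + e);
            }
        }
    }
}
```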
@@ -23,6 +23,7 @@
 import java.util.concurrent.TimeUnit;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.qos.AsyncTokenBucket;
+import org.apache.pulsar.broker.qos.MonotonicSnapshotClock;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
@@ -216,18 +217,22 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
         long msgRate = dispatchRate.getDispatchThrottlingRateInMsg();
         long byteRate = dispatchRate.getDispatchThrottlingRateInByte();
         long ratePeriodNanos = TimeUnit.SECONDS.toNanos(Math.max(dispatchRate.getRatePeriodInSecond(), 1));
+        MonotonicSnapshotClock clock = brokerService.getPulsar().getMonotonicSnapshotClock();
 
         // update msg-rateLimiter
         if (msgRate > 0) {
             if (dispatchRate.isRelativeToPublishRate()) {
                 this.dispatchRateLimiterOnMessage =
                         AsyncTokenBucket.builderForDynamicRate()
+                                .clock(clock)
                                 .rateFunction(() -> getRelativeDispatchRateInMsg(dispatchRate))
                                 .ratePeriodNanosFunction(() -> ratePeriodNanos)
                                 .build();
             } else {
                 this.dispatchRateLimiterOnMessage =
-                        AsyncTokenBucket.builder().rate(msgRate).ratePeriodNanos(ratePeriodNanos)
+                        AsyncTokenBucket.builder()
+                                .clock(clock)
+                                .rate(msgRate).ratePeriodNanos(ratePeriodNanos)
                                 .build();
             }
         } else {
@@ -239,12 +244,15 @@ public synchronized void updateDispatchRate(DispatchRate dispatchRate) {
             if (dispatchRate.isRelativeToPublishRate()) {
                 this.dispatchRateLimiterOnByte =
                         AsyncTokenBucket.builderForDynamicRate()
+                                .clock(clock)
                                 .rateFunction(() -> getRelativeDispatchRateInByte(dispatchRate))
                                 .ratePeriodNanosFunction(() -> ratePeriodNanos)
                                 .build();
             } else {
                 this.dispatchRateLimiterOnByte =
-                        AsyncTokenBucket.builder().rate(byteRate).ratePeriodNanos(ratePeriodNanos)
+                        AsyncTokenBucket.builder()
+                                .clock(clock)
+                                .rate(byteRate).ratePeriodNanos(ratePeriodNanos)
                                 .build();
             }
         } else {
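The functional change in this file is that every dispatch token bucket is now built against the broker's shared `MonotonicSnapshotClock` (obtained from `PulsarService`) instead of an implicit default clock, so all rate limiters observe the same monotonic time source. Below is a sketch of the two builder paths using only calls that appear in this diff; the rate values and the locally constructed clock are illustrative stand-ins.

```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
import org.apache.pulsar.broker.qos.DefaultMonotonicSnapshotClock;

class DispatchBucketSketch {
    void build() {
        DefaultMonotonicSnapshotClock clock =
                new DefaultMonotonicSnapshotClock(TimeUnit.MILLISECONDS.toNanos(8), System::nanoTime);
        long ratePeriodNanos = TimeUnit.SECONDS.toNanos(1);

        // fixed rate: 1000 permits per rate period
        AsyncTokenBucket fixed = AsyncTokenBucket.builder()
                .clock(clock)
                .rate(1000).ratePeriodNanos(ratePeriodNanos)
                .build();

        // dynamic rate: the limit is re-read from functions, e.g. relative to the publish rate
        AsyncTokenBucket dynamic = AsyncTokenBucket.builderForDynamicRate()
                .clock(clock)
                .rateFunction(() -> 500L)
                .ratePeriodNanosFunction(() -> ratePeriodNanos)
                .build();

        // consume and observe tokens as in the benchmark earlier in this PR
        fixed.consumeTokens(1);
        long remaining = fixed.getTokens();
    }
}
```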
@@ -117,7 +117,9 @@ private synchronized void updateSubscribeRate(ConsumerIdentifier consumerIdentif
         // update subscribe-rateLimiter
         if (ratePerConsumer > 0) {
             AsyncTokenBucket tokenBucket =
-                    AsyncTokenBucket.builder().rate(ratePerConsumer).ratePeriodNanos(ratePeriodNanos).build();
+                    AsyncTokenBucket.builder()
+                            .clock(brokerService.getPulsar().getMonotonicSnapshotClock())
+                            .rate(ratePerConsumer).ratePeriodNanos(ratePeriodNanos).build();
             this.subscribeRateLimiter.put(consumerIdentifier, tokenBucket);
         } else {
             // subscribe-rate should be disable and close