Skip to content

Commit

Permalink
bucket4j#501 fix bug in merging multiple tryConsume(1) int tryConsume…
Browse files Browse the repository at this point in the history
…AsMatchAsPossible
  • Loading branch information
Vladimir Buhtoyarov committed Jun 21, 2024
1 parent 3027f6b commit ee37f58
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,8 @@ public int getMergedCommandsCount() {

@Override
public CommandResult<?> unwrapOneResult(Long consumedTokens, int indice) {
boolean wasConsumed = indice <= consumedTokens;
return CommandResult.success(wasConsumed, BOOLEAN_HANDLE);
boolean wasConsumed = indice < consumedTokens;
return wasConsumed ? CommandResult.TRUE : CommandResult.FALSE;
}

public void setMerged(boolean merged) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import io.github.bucket4j.TokensInheritanceStrategy
import io.github.bucket4j.distributed.proxy.ProxyManager
import io.github.bucket4j.distributed.proxy.optimization.DefaultOptimizationListener
import io.github.bucket4j.distributed.proxy.optimization.Optimization
import io.github.bucket4j.distributed.proxy.optimization.Optimizations
import io.github.bucket4j.distributed.remote.MultiResult
import io.github.bucket4j.distributed.remote.RemoteCommand
import io.github.bucket4j.distributed.remote.Request
Expand All @@ -21,6 +22,8 @@ import java.time.Duration
import java.util.concurrent.Callable
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicLong
import java.util.function.Supplier

import static org.junit.jupiter.api.Assertions.fail

Expand All @@ -36,7 +39,7 @@ class BatchingCommandExecutorSpecification extends Specification {
setup:
DefaultOptimizationListener listener = new DefaultOptimizationListener();
ProxyManagerMock proxyManager = new ProxyManagerMock(clock)
Bucket bucket = createBucket(verbose, proxyManager, listener)
Bucket bucket = createBucket(versioned, proxyManager, listener)
when:
bucket.getAvailableTokens()
Expand Down Expand Up @@ -78,7 +81,7 @@ class BatchingCommandExecutorSpecification extends Specification {
setup:
DefaultOptimizationListener listener = new DefaultOptimizationListener();
ProxyManagerMock proxyManager = new ProxyManagerMock(clock)
Bucket bucket = createBucket(verbose, proxyManager, listener)
Bucket bucket = createBucket(versioned, proxyManager, listener)
when:
bucket.getAvailableTokens()
Expand Down Expand Up @@ -124,7 +127,7 @@ class BatchingCommandExecutorSpecification extends Specification {
setup:
DefaultOptimizationListener listener = new DefaultOptimizationListener();
ProxyManagerMock proxyManager = new ProxyManagerMock(clock)
Bucket bucket = createBucket(verbose, proxyManager, listener)
Bucket bucket = createBucket(versioned, proxyManager, listener)
when:
bucket.getAvailableTokens()
Expand Down Expand Up @@ -170,7 +173,7 @@ class BatchingCommandExecutorSpecification extends Specification {
setup:
DefaultOptimizationListener listener = new DefaultOptimizationListener();
ProxyManagerMock proxyManager = new ProxyManagerMock(clock)
Bucket bucket = createBucket(verbose, proxyManager, listener)
Bucket bucket = createBucket(versioned, proxyManager, listener)
when:
bucket.getAvailableTokens()
Expand Down Expand Up @@ -220,7 +223,7 @@ class BatchingCommandExecutorSpecification extends Specification {
when:
DefaultOptimizationListener listener = new DefaultOptimizationListener();
ProxyManagerMock proxyManager = new ProxyManagerMock(clock)
Bucket bucket = createBucket(verbose, proxyManager, listener)
Bucket bucket = createBucket(versioned, proxyManager, listener)
bucket.getAvailableTokens()
proxyManager.blockExecution()
Expand Down Expand Up @@ -287,9 +290,9 @@ class BatchingCommandExecutorSpecification extends Specification {
return future
}
Bucket createBucket(boolean verbose, ProxyManager proxyManager, DefaultOptimizationListener listener) {
Bucket createBucket(boolean versioned, ProxyManager proxyManager, DefaultOptimizationListener listener) {
Optimization optimization = new BatchingOptimization(listener)
if (verbose) {
if (versioned) {
return proxyManager.builder()
.withOptimization(optimization)
.build(1L, configuration)
Expand All @@ -301,4 +304,59 @@ class BatchingCommandExecutorSpecification extends Specification {
}
}
@Unroll
def "#n regression test for https://github.com/bucket4j/bucket4j/issues/501"(int n, boolean verbose, boolean versioned) {
setup:
ProxyManagerMock proxyManager = new ProxyManagerMock(clock)
Bucket bucket = null

Supplier<BucketConfiguration> configSupplier = () -> BucketConfiguration.builder()
// configure always empty bucket
.addLimit(limit -> limit.capacity(100).refillGreedy(1, Duration.ofDays(1)).initialTokens(0))
.build()
if (versioned) {
bucket = proxyManager.builder()
.withOptimization(Optimizations.batching())
.withImplicitConfigurationReplacement(1, TokensInheritanceStrategy.AS_IS)
.build(42, configSupplier)
} else {
bucket = proxyManager.builder()
.withOptimization(Optimizations.batching())
.build(42, configSupplier)
}
AtomicLong consumedTokens = new AtomicLong()
when:
int processors = Runtime.getRuntime().availableProcessors()
CountDownLatch startLatch = new CountDownLatch(processors)
CountDownLatch stopLatch = new CountDownLatch(processors)
for (int i = 0; i < processors; i++) {
new Thread(() -> {
startLatch.countDown()
startLatch.await()
try {
for (int j = 0; j < 1_000; j++) {
boolean consumed = verbose ? bucket.asVerbose().tryConsume(1).value : bucket.tryConsume(1)
if (consumed) {
consumedTokens.addAndGet(1)
}
}
} catch (Throwable t) {
t.printStackTrace()
} finally {
stopLatch.countDown()
}
}).start()
}
stopLatch.await()
then:
consumedTokens.get() == 0
where:
[n, verbose, versioned] << [
[1, false, false],
[2, true, false],
[3, false, true],
[4, true, true]
]
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package io.github.bucket4j.distributed.remote.commands

import io.github.bucket4j.distributed.remote.CommandResult
import spock.lang.Specification
import spock.lang.Unroll

class ConsumeAsMuchAsPossibleCommandSpecification extends Specification {

@Unroll
def "#n test unwrapOneResult"(int n, int consumedTokens, int indice, CommandResult expectedResult) {
expect:
ConsumeAsMuchAsPossibleCommand command = new ConsumeAsMuchAsPossibleCommand(Long.MAX_VALUE)
assert expectedResult == command.unwrapOneResult(consumedTokens, indice)
where:
[n, consumedTokens, indice, expectedResult] << [
[1, 0, 0, CommandResult.FALSE],
[2, 0, 1, CommandResult.FALSE],
[3, 1, 0, CommandResult.TRUE],
[4, 1, 1, CommandResult.FALSE],
[5, 2, 1, CommandResult.TRUE],
]
}
}

0 comments on commit ee37f58

Please sign in to comment.