Skip to content

Commit

Permalink
Merge pull request #7 from valkey-io/feature-for-throttle
Browse files Browse the repository at this point in the history
Feature: support retry when throttled
  • Loading branch information
yangbodong22011 authored Feb 27, 2025
2 parents 52bb5a1 + 0ccc1f9 commit e6791d5
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/main/java/io/valkey/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ public final class Protocol {
private static final String WRONGPASS_PREFIX = "WRONGPASS";
private static final String NOPERM_PREFIX = "NOPERM";
private static final String REDIRECT_PREFIX = "REDIRECT ";
private static final String THROTTLED_PREFIX = "THROTTLED ";

private Protocol() {
throw new InstantiationError("Must not instantiate this class");
Expand Down Expand Up @@ -114,6 +115,8 @@ private static void processError(final RedisInputStream is) {
|| message.startsWith(WRONGPASS_PREFIX)
|| message.startsWith(NOPERM_PREFIX)) {
throw new JedisAccessControlException(message);
} else if (message.startsWith(THROTTLED_PREFIX)) {
throw new JedisThrottledDataException(message);
}
throw new JedisDataException(message);
}
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/io/valkey/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -239,12 +239,24 @@ public UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duratio
new ClusterCommandObjects());
}

public UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
boolean retryOnThrottled) {
this(new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration, retryOnThrottled), provider,
new ClusterCommandObjects());
}

protected UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol) {
this(new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider,
new ClusterCommandObjects(), protocol);
}

public UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
boolean retryOnThrottled, RedisProtocol protocol) {
this(new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration, retryOnThrottled), provider,
new ClusterCommandObjects(), protocol);
}

/**
* Constructs a {@code UnifiedJedis} instance with the given parameters.
*
Expand Down Expand Up @@ -291,6 +303,11 @@ public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTo
this(new RetryableCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider);
}

public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
boolean retryOnThrottled) {
this(new RetryableCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration, retryOnThrottled), provider);
}

/**
* Constructor which supports multiple cluster/database endpoints each with their own isolated connection pool.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.valkey.exceptions;

public class JedisThrottledDataException extends JedisDataException {

private static final long serialVersionUID = 3900401231195549148L;

public JedisThrottledDataException(String message) {
super(message);
}

public JedisThrottledDataException(Throwable cause) {
super(cause);
}

public JedisThrottledDataException(String message, Throwable cause) {
super(message, cause);
}
}
13 changes: 13 additions & 0 deletions src/main/java/io/valkey/executors/ClusterCommandExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.valkey.exceptions.JedisConnectionException;
import io.valkey.exceptions.JedisMovedDataException;
import io.valkey.exceptions.JedisRedirectionException;
import io.valkey.exceptions.JedisThrottledDataException;
import io.valkey.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -31,12 +32,19 @@ public class ClusterCommandExecutor implements CommandExecutor {
public final ClusterConnectionProvider provider;
protected final int maxAttempts;
protected final Duration maxTotalRetriesDuration;
protected final boolean retryOnThrottled;

public ClusterCommandExecutor(ClusterConnectionProvider provider, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(provider, maxAttempts, maxTotalRetriesDuration, false);
}

public ClusterCommandExecutor(ClusterConnectionProvider provider, int maxAttempts,
Duration maxTotalRetriesDuration, boolean retryOnThrottled) {
this.provider = provider;
this.maxAttempts = maxAttempts;
this.maxTotalRetriesDuration = maxTotalRetriesDuration;
this.retryOnThrottled = retryOnThrottled;
}

@Override
Expand Down Expand Up @@ -124,6 +132,11 @@ public final <T> T executeCommand(CommandObject<T> commandObject) {
// it rebuilds cluster's slot cache recommended by Redis cluster specification
provider.renewSlotCache(connection);
}
} catch (JedisThrottledDataException ex) {
if (retryOnThrottled) {
lastException = ex;
sleep(getBackoffSleepMillis(attemptsLeft, deadline));
}
} finally {
IOUtils.closeQuietly(connection);
}
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/io/valkey/executors/RetryableCommandExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import io.valkey.CommandObject;
import io.valkey.Connection;
import io.valkey.annots.VisibleForTesting;
import io.valkey.exceptions.JedisThrottledDataException;
import io.valkey.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -22,12 +23,19 @@ public class RetryableCommandExecutor implements CommandExecutor {
protected final ConnectionProvider provider;
protected final int maxAttempts;
protected final Duration maxTotalRetriesDuration;
protected final boolean retryOnThrottled;

public RetryableCommandExecutor(ConnectionProvider provider, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(provider, maxAttempts, maxTotalRetriesDuration, false);
}

public RetryableCommandExecutor(ConnectionProvider provider, int maxAttempts,
Duration maxTotalRetriesDuration, boolean retryOnThrottled) {
this.provider = provider;
this.maxAttempts = maxAttempts;
this.maxTotalRetriesDuration = maxTotalRetriesDuration;
this.retryOnThrottled = retryOnThrottled;
}

@Override
Expand Down Expand Up @@ -58,6 +66,11 @@ public final <T> T executeCommand(CommandObject<T> commandObject) {
if (reset) {
consecutiveConnectionFailures = 0;
}
} catch (JedisThrottledDataException ex) {
if (retryOnThrottled) {
lastException = ex;
sleep(getBackoffSleepMillis(attemptsLeft, deadline));
}
} finally {
if (connection != null) {
connection.close();
Expand Down

0 comments on commit e6791d5

Please sign in to comment.