Skip to content

Commit

Permalink
feature: support standalone seamless ha
Browse files Browse the repository at this point in the history
Signed-off-by: bodong.ybd <[email protected]>
  • Loading branch information
yangbodong22011 committed May 9, 2024
1 parent 12974bc commit b78a3fe
Show file tree
Hide file tree
Showing 8 changed files with 443 additions and 0 deletions.
32 changes: 32 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,31 @@ replicaof localhost 6388
extended-redis-compatibility yes
endef

define VALKEY12_CONF
daemonize yes
protected-mode no
port 6390
pidfile /tmp/valkey12.pid
logfile /tmp/valkey12.log
save ""
appendonly no
extended-redis-compatibility yes
replica-enable-redirect yes
endef

define VALKEY13_CONF
daemonize yes
protected-mode no
port 6391
pidfile /tmp/valkey13.pid
logfile /tmp/valkey13.log
save ""
appendonly no
replicaof localhost 6390
extended-redis-compatibility yes
replica-enable-redirect yes
endef

# SENTINELS
define VALKEY_SENTINEL1
port 26379
Expand Down Expand Up @@ -409,6 +434,8 @@ export VALKEY8_CONF
export VALKEY9_CONF
export VALKEY10_CONF
export VALKEY11_CONF
export VALKEY12_CONF
export VALKEY13_CONF
export VALKEY_SENTINEL1
export VALKEY_SENTINEL2
export VALKEY_SENTINEL3
Expand Down Expand Up @@ -445,6 +472,8 @@ start: stunnel cleanup compile-module
echo "$$VALKEY9_CONF" | valkey-server -
echo "$$VALKEY10_CONF" | valkey-server -
echo "$$VALKEY11_CONF" | valkey-server -
echo "$$VALKEY12_CONF" | valkey-server -
echo "$$VALKEY13_CONF" | valkey-server -
echo "$$VALKEY_SENTINEL1" > /tmp/sentinel1.conf && valkey-server /tmp/sentinel1.conf --sentinel
@sleep 0.5
echo "$$VALKEY_SENTINEL2" > /tmp/sentinel2.conf && valkey-server /tmp/sentinel2.conf --sentinel
Expand All @@ -465,6 +494,7 @@ start: stunnel cleanup compile-module
echo "$$VALKEY_STABLE_CLUSTER_NODE3_CONF" | valkey-server -
echo "$$VALKEY_UDS" | valkey-server -
echo "$$VALKEY_UNAVAILABLE_CONF" | valkey-server -
@sleep 0.5
valkey-cli -a cluster --cluster create 127.0.0.1:7479 127.0.0.1:7480 127.0.0.1:7481 --cluster-yes
cleanup:
- rm -vf /tmp/valkey_cluster_node*.conf 2>/dev/null
Expand All @@ -487,6 +517,8 @@ stop:
kill `cat /tmp/valkey9.pid`
kill `cat /tmp/valkey10.pid`
kill `cat /tmp/valkey11.pid`
kill `cat /tmp/valkey12.pid`
kill `cat /tmp/valkey13.pid`
kill `cat /tmp/sentinel1.pid`
kill `cat /tmp/sentinel2.pid`
kill `cat /tmp/sentinel3.pid`
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/io/jackey/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ final HostAndPort getHostAndPort() {
return ((DefaultJedisSocketFactory) socketFactory).getHostAndPort();
}

public final ConnectionPool getMemberOf() {
return memberOf;
}

public int getSoTimeout() {
return soTimeout;
}
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/io/jackey/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public final class Protocol {
private static final String NOAUTH_PREFIX = "NOAUTH";
private static final String WRONGPASS_PREFIX = "WRONGPASS";
private static final String NOPERM_PREFIX = "NOPERM";
private static final String REDIRECT_PREFIX = "REDIRECT ";

private Protocol() {
throw new InstantiationError("Must not instantiate this class");
Expand Down Expand Up @@ -100,6 +101,9 @@ private static void processError(final RedisInputStream is) {
// throw new JedisAskDataException(message, new HostAndPort(askInfo[1],
// Integer.parseInt(askInfo[2])), Integer.parseInt(askInfo[0]));
throw new JedisAskDataException(message, HostAndPort.from(askInfo[1]), Integer.parseInt(askInfo[0]));
} else if (message.startsWith(REDIRECT_PREFIX)) {
String host = parseTargetHost(message);
throw new JedisRedirectionException(message, HostAndPort.from(host), -1); // slot -1 means standalone
} else if (message.startsWith(CLUSTERDOWN_PREFIX)) {
throw new JedisClusterException(message);
} else if (message.startsWith(BUSY_PREFIX)) {
Expand Down Expand Up @@ -140,6 +144,11 @@ private static String[] parseTargetHostAndSlot(String clusterRedirectResponse) {
return response;
}

private static String parseTargetHost(String clusterRedirectResponse) {
String[] messageInfo = clusterRedirectResponse.split(" ");
return messageInfo[1];
}

private static Object process(final RedisInputStream is) {
final byte b = is.readByte();
//System.out.println((char) b);
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/io/jackey/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,32 @@ protected UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Dura
new ClusterCommandObjects(), protocol);
}

/**
* Constructs a {@code UnifiedJedis} instance with the given parameters.
*
* @param provider the connection provider for redirect handling
* @param maxAttempts the maximum number of attempts to execute a command before giving up
* @param maxTotalRetriesDuration the maximum total duration to retry commands before giving up
*/
public UnifiedJedis(RedirectConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) {
this(new RedirectCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider);
}

/**
* A constructor with an additional parameter for customizing the Redis communication protocol.
*
* @param provider the connection provider for redirect handling
* @param maxAttempts the maximum number of attempts to execute a command before giving up
* @param maxTotalRetriesDuration the maximum total duration to retry commands before giving up
* @param protocol the Redis protocol implementation to use
*/
protected UnifiedJedis(RedirectConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol) {
this(new RedirectCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider, new CommandObjects(),
protocol);
}


/**
* @deprecated Sharding/Sharded feature will be removed in next major release.
*/
Expand Down
143 changes: 143 additions & 0 deletions src/main/java/io/jackey/executors/RedirectCommandExecutor.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package io.jackey.executors;

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import io.jackey.CommandObject;
import io.jackey.Connection;
import io.jackey.annots.VisibleForTesting;
import io.jackey.exceptions.JedisConnectionException;
import io.jackey.exceptions.JedisException;
import io.jackey.exceptions.JedisRedirectionException;
import io.jackey.providers.RedirectConnectionProvider;
import io.jackey.util.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedirectCommandExecutor implements CommandExecutor {

private final Logger log = LoggerFactory.getLogger(getClass());

public final RedirectConnectionProvider provider;
protected final int maxAttempts;
protected final Duration maxTotalRetriesDuration;

public RedirectCommandExecutor(RedirectConnectionProvider provider, int maxAttempts,
Duration maxTotalRetriesDuration) {
this.provider = provider;
this.maxAttempts = maxAttempts;
this.maxTotalRetriesDuration = maxTotalRetriesDuration;
}

@Override
public void close() {
this.provider.close();
}

@Override
public final <T> T executeCommand(CommandObject<T> commandObject) {
Instant deadline = Instant.now().plus(maxTotalRetriesDuration);

int consecutiveConnectionFailures = 0;
Exception lastException = null;
for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) {
Connection connection = null;
try {
connection = provider.getConnection();
return execute(connection, commandObject);

} catch (JedisConnectionException jce) {
lastException = jce;
++consecutiveConnectionFailures;
log.debug("Failed connecting to Redis: {}", connection, jce);
// "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet
boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline);
if (reset) {
consecutiveConnectionFailures = 0;
}
} catch (JedisRedirectionException jre) {
// avoid updating lastException if it is a connection exception
if (lastException == null || lastException instanceof JedisRedirectionException) {
lastException = jre;
}
log.debug("Redirected by server to {}", jre.getTargetNode());
consecutiveConnectionFailures = 0;
provider.renewPool(connection, jre.getTargetNode());
} finally {
IOUtils.closeQuietly(connection);
}
if (Instant.now().isAfter(deadline)) {
throw new JedisException("Redirect retry deadline exceeded.");
}
}

JedisException maxRedirectException = new JedisException("No more redirect attempts left.");
maxRedirectException.addSuppressed(lastException);
throw maxRedirectException;
}

/**
* WARNING: This method is accessible for the purpose of testing.
* This should not be used or overriden.
*/
@VisibleForTesting
protected <T> T execute(Connection connection, CommandObject<T> commandObject) {
return connection.executeCommand(commandObject);
}

/**
* Related values should be reset if <code>TRUE</code> is returned.
*
* @param attemptsLeft
* @param consecutiveConnectionFailures
* @param doneDeadline
* @return true - if some actions are taken
* <br /> false - if no actions are taken
*/
private boolean handleConnectionProblem(int attemptsLeft, int consecutiveConnectionFailures, Instant doneDeadline) {
if (this.maxAttempts < 3) {
if (attemptsLeft == 0) {
provider.renewPool(null, null);
return true;
}
return false;
}

if (consecutiveConnectionFailures < 2) {
return false;
}

sleep(getBackoffSleepMillis(attemptsLeft, doneDeadline));
provider.renewPool(null, null);
return true;
}

private static long getBackoffSleepMillis(int attemptsLeft, Instant deadline) {
if (attemptsLeft <= 0) {
return 0;
}

long millisLeft = Duration.between(Instant.now(), deadline).toMillis();
if (millisLeft < 0) {
throw new JedisException("Redirect retry deadline exceeded.");
}

long maxBackOff = millisLeft / (attemptsLeft * attemptsLeft);
return ThreadLocalRandom.current().nextLong(maxBackOff + 1);
}

/**
* WARNING: This method is accessible for the purpose of testing.
* This should not be used or overriden.
*/
@VisibleForTesting
protected void sleep(long sleepMillis) {
try {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
} catch (InterruptedException e) {
throw new JedisException(e);
}
}
}
Loading

0 comments on commit b78a3fe

Please sign in to comment.