diff --git a/Makefile b/Makefile index 385038ce1..e214114de 100644 --- a/Makefile +++ b/Makefile @@ -145,6 +145,29 @@ replicaof localhost 6388 extended-redis-compatibility yes endef +define VALKEY12_CONF +daemonize yes +protected-mode no +port 6390 +pidfile /tmp/valkey12.pid +logfile /tmp/valkey12.log +save "" +appendonly no +extended-redis-compatibility yes +endef + +define VALKEY13_CONF +daemonize yes +protected-mode no +port 6391 +pidfile /tmp/valkey13.pid +logfile /tmp/valkey13.log +save "" +appendonly no +replicaof localhost 6390 +extended-redis-compatibility yes +endef + # SENTINELS define VALKEY_SENTINEL1 port 26379 @@ -409,6 +432,8 @@ export VALKEY8_CONF export VALKEY9_CONF export VALKEY10_CONF export VALKEY11_CONF +export VALKEY12_CONF +export VALKEY13_CONF export VALKEY_SENTINEL1 export VALKEY_SENTINEL2 export VALKEY_SENTINEL3 @@ -445,6 +470,8 @@ start: stunnel cleanup compile-module echo "$$VALKEY9_CONF" | valkey-server - echo "$$VALKEY10_CONF" | valkey-server - echo "$$VALKEY11_CONF" | valkey-server - + echo "$$VALKEY12_CONF" | valkey-server - + echo "$$VALKEY13_CONF" | valkey-server - echo "$$VALKEY_SENTINEL1" > /tmp/sentinel1.conf && valkey-server /tmp/sentinel1.conf --sentinel @sleep 0.5 echo "$$VALKEY_SENTINEL2" > /tmp/sentinel2.conf && valkey-server /tmp/sentinel2.conf --sentinel @@ -465,6 +492,7 @@ start: stunnel cleanup compile-module echo "$$VALKEY_STABLE_CLUSTER_NODE3_CONF" | valkey-server - echo "$$VALKEY_UDS" | valkey-server - echo "$$VALKEY_UNAVAILABLE_CONF" | valkey-server - + @sleep 0.5 valkey-cli -a cluster --cluster create 127.0.0.1:7479 127.0.0.1:7480 127.0.0.1:7481 --cluster-yes cleanup: - rm -vf /tmp/valkey_cluster_node*.conf 2>/dev/null @@ -487,6 +515,8 @@ stop: kill `cat /tmp/valkey9.pid` kill `cat /tmp/valkey10.pid` kill `cat /tmp/valkey11.pid` + kill `cat /tmp/valkey12.pid` + kill `cat /tmp/valkey13.pid` kill `cat /tmp/sentinel1.pid` kill `cat /tmp/sentinel2.pid` kill `cat /tmp/sentinel3.pid` diff --git a/src/main/java/io/jackey/ClientCapaConfig.java b/src/main/java/io/jackey/ClientCapaConfig.java new file mode 100644 index 000000000..ac6c5b834 --- /dev/null +++ b/src/main/java/io/jackey/ClientCapaConfig.java @@ -0,0 +1,30 @@ +package io.jackey; + +public class ClientCapaConfig { + private final boolean disabled; + private final boolean redirect; + + public static final ClientCapaConfig DEFAULT = new ClientCapaConfig(); + public static final ClientCapaConfig DISABLED = new ClientCapaConfig(true, false); + + public ClientCapaConfig() { + this(false, true); + } + + public ClientCapaConfig(boolean disabled, boolean redirect) { + this.disabled = disabled; + this.redirect = redirect; + } + + public final boolean isDisabled() { + return disabled; + } + + public boolean isRedirect() { + return redirect; + } + + public static ClientCapaConfig withRedirect() { + return new ClientCapaConfig(false, true); + } +} diff --git a/src/main/java/io/jackey/Connection.java b/src/main/java/io/jackey/Connection.java index 930d2afc0..b9dd38b0b 100644 --- a/src/main/java/io/jackey/Connection.java +++ b/src/main/java/io/jackey/Connection.java @@ -84,6 +84,10 @@ final HostAndPort getHostAndPort() { return ((DefaultJedisSocketFactory) socketFactory).getHostAndPort(); } + public final ConnectionPool getMemberOf() { + return memberOf; + } + public int getSoTimeout() { return soTimeout; } @@ -414,6 +418,7 @@ private void initializeFromClientConfig(final JedisClientConfig config) { fireAndForgetMsg.add(new CommandArguments(Command.CLIENT).add(Keyword.SETNAME).add(clientName)); } + // CLIENT SETINFO LIB-NAME & LIB-VER ClientSetInfoConfig setInfoConfig = config.getClientSetInfoConfig(); if (setInfoConfig == null) setInfoConfig = ClientSetInfoConfig.DEFAULT; @@ -435,6 +440,16 @@ private void initializeFromClientConfig(final JedisClientConfig config) { } } + // CLIENT CAPA REDIRECT + ClientCapaConfig capaConfig = config.getClientCapaConfig(); + if (capaConfig == null) capaConfig = ClientCapaConfig.DEFAULT; + if (!capaConfig.isDisabled()) { + if (capaConfig.isRedirect()) { + fireAndForgetMsg.add(new CommandArguments(Command.CLIENT).add(Keyword.CAPA) + .add(ClientAttributeOption.REDIRECT.getRaw())); + } + } + for (CommandArguments arg : fireAndForgetMsg) { sendCommand(arg); } diff --git a/src/main/java/io/jackey/DefaultJedisClientConfig.java b/src/main/java/io/jackey/DefaultJedisClientConfig.java index 486c5f2ab..84452767e 100644 --- a/src/main/java/io/jackey/DefaultJedisClientConfig.java +++ b/src/main/java/io/jackey/DefaultJedisClientConfig.java @@ -25,12 +25,13 @@ public final class DefaultJedisClientConfig implements JedisClientConfig { private final HostAndPortMapper hostAndPortMapper; private final ClientSetInfoConfig clientSetInfoConfig; + private final ClientCapaConfig clientCapaConfig; private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMillis, int soTimeoutMillis, int blockingSocketTimeoutMillis, Supplier credentialsProvider, int database, String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters, HostnameVerifier hostnameVerifier, HostAndPortMapper hostAndPortMapper, - ClientSetInfoConfig clientSetInfoConfig) { + ClientSetInfoConfig clientSetInfoConfig, ClientCapaConfig clientCapaConfig) { this.redisProtocol = protocol; this.connectionTimeoutMillis = connectionTimeoutMillis; this.socketTimeoutMillis = soTimeoutMillis; @@ -44,6 +45,7 @@ private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMi this.hostnameVerifier = hostnameVerifier; this.hostAndPortMapper = hostAndPortMapper; this.clientSetInfoConfig = clientSetInfoConfig; + this.clientCapaConfig = clientCapaConfig; } @Override @@ -122,6 +124,11 @@ public ClientSetInfoConfig getClientSetInfoConfig() { return clientSetInfoConfig; } + @Override + public ClientCapaConfig getClientCapaConfig() { + return clientCapaConfig; + } + public static Builder builder() { return new Builder(); } @@ -148,6 +155,7 @@ public static class Builder { private HostAndPortMapper hostAndPortMapper = null; private ClientSetInfoConfig clientSetInfoConfig = ClientSetInfoConfig.DEFAULT; + private ClientCapaConfig clientCapaConfig = ClientCapaConfig.DEFAULT; private Builder() { } @@ -160,7 +168,7 @@ public DefaultJedisClientConfig build() { return new DefaultJedisClientConfig(redisProtocol, connectionTimeoutMillis, socketTimeoutMillis, blockingSocketTimeoutMillis, credentialsProvider, database, clientName, ssl, - sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig); + sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig, clientCapaConfig); } /** @@ -255,6 +263,11 @@ public Builder clientSetInfoConfig(ClientSetInfoConfig setInfoConfig) { this.clientSetInfoConfig = setInfoConfig; return this; } + + public Builder clientCapaConfig(ClientCapaConfig capaConfig) { + this.clientCapaConfig = capaConfig; + return this; + } } public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int soTimeoutMillis, @@ -264,7 +277,7 @@ public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int s return new DefaultJedisClientConfig(null, connectionTimeoutMillis, soTimeoutMillis, blockingSocketTimeoutMillis, new DefaultRedisCredentialsProvider(new DefaultRedisCredentials(user, password)), database, - clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null); + clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null, null); } public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) { @@ -273,6 +286,6 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) { copy.getBlockingSocketTimeoutMillis(), copy.getCredentialsProvider(), copy.getDatabase(), copy.getClientName(), copy.isSsl(), copy.getSslSocketFactory(), copy.getSslParameters(), copy.getHostnameVerifier(), copy.getHostAndPortMapper(), - copy.getClientSetInfoConfig()); + copy.getClientSetInfoConfig(), copy.getClientCapaConfig()); } } diff --git a/src/main/java/io/jackey/JedisClientConfig.java b/src/main/java/io/jackey/JedisClientConfig.java index 494dec520..5d3f8b67f 100644 --- a/src/main/java/io/jackey/JedisClientConfig.java +++ b/src/main/java/io/jackey/JedisClientConfig.java @@ -87,4 +87,8 @@ default HostAndPortMapper getHostAndPortMapper() { default ClientSetInfoConfig getClientSetInfoConfig() { return ClientSetInfoConfig.DEFAULT; } + + default ClientCapaConfig getClientCapaConfig() { + return ClientCapaConfig.DEFAULT; + } } diff --git a/src/main/java/io/jackey/Protocol.java b/src/main/java/io/jackey/Protocol.java index 1a2edbb8e..5f30e8800 100644 --- a/src/main/java/io/jackey/Protocol.java +++ b/src/main/java/io/jackey/Protocol.java @@ -65,6 +65,7 @@ public final class Protocol { private static final String NOAUTH_PREFIX = "NOAUTH"; private static final String WRONGPASS_PREFIX = "WRONGPASS"; private static final String NOPERM_PREFIX = "NOPERM"; + private static final String REDIRECT_PREFIX = "REDIRECT "; private Protocol() { throw new InstantiationError("Must not instantiate this class"); @@ -100,6 +101,9 @@ private static void processError(final RedisInputStream is) { // throw new JedisAskDataException(message, new HostAndPort(askInfo[1], // Integer.parseInt(askInfo[2])), Integer.parseInt(askInfo[0])); throw new JedisAskDataException(message, HostAndPort.from(askInfo[1]), Integer.parseInt(askInfo[0])); + } else if (message.startsWith(REDIRECT_PREFIX)) { + String host = parseTargetHost(message); + throw new JedisRedirectionException(message, HostAndPort.from(host), -1); // slot -1 means standalone } else if (message.startsWith(CLUSTERDOWN_PREFIX)) { throw new JedisClusterException(message); } else if (message.startsWith(BUSY_PREFIX)) { @@ -140,6 +144,11 @@ private static String[] parseTargetHostAndSlot(String clusterRedirectResponse) { return response; } + private static String parseTargetHost(String clusterRedirectResponse) { + String[] messageInfo = clusterRedirectResponse.split(" "); + return messageInfo[1]; + } + private static Object process(final RedisInputStream is) { final byte b = is.readByte(); //System.out.println((char) b); @@ -309,7 +318,7 @@ public static enum Keyword implements Rawable { DELETE, LIBRARYNAME, WITHCODE, DESCRIPTION, GETKEYS, GETKEYSANDFLAGS, DOCS, FILTERBY, DUMP, MODULE, ACLCAT, PATTERN, DOCTOR, LATEST, HISTORY, USAGE, SAMPLES, PURGE, STATS, LOADEX, CONFIG, ARGS, RANK, NOW, VERSION, ADDR, SKIPME, USER, LADDR, - CHANNELS, NUMPAT, NUMSUB, SHARDCHANNELS, SHARDNUMSUB, NOVALUES, MAXAGE; + CHANNELS, NUMPAT, NUMSUB, SHARDCHANNELS, SHARDNUMSUB, NOVALUES, MAXAGE, CAPA; private final byte[] raw; diff --git a/src/main/java/io/jackey/UnifiedJedis.java b/src/main/java/io/jackey/UnifiedJedis.java index cee7cdc7f..dac2524d7 100644 --- a/src/main/java/io/jackey/UnifiedJedis.java +++ b/src/main/java/io/jackey/UnifiedJedis.java @@ -251,6 +251,32 @@ protected UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Dura new ClusterCommandObjects(), protocol); } + /** + * Constructs a {@code UnifiedJedis} instance with the given parameters. + * + * @param provider the connection provider for redirect handling + * @param maxAttempts the maximum number of attempts to execute a command before giving up + * @param maxTotalRetriesDuration the maximum total duration to retry commands before giving up + */ + public UnifiedJedis(RedirectConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) { + this(new RedirectCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider); + } + + /** + * A constructor with an additional parameter for customizing the Redis communication protocol. + * + * @param provider the connection provider for redirect handling + * @param maxAttempts the maximum number of attempts to execute a command before giving up + * @param maxTotalRetriesDuration the maximum total duration to retry commands before giving up + * @param protocol the Redis protocol implementation to use + */ + protected UnifiedJedis(RedirectConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration, + RedisProtocol protocol) { + this(new RedirectCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider, new CommandObjects(), + protocol); + } + + /** * @deprecated Sharding/Sharded feature will be removed in next major release. */ diff --git a/src/main/java/io/jackey/args/ClientAttributeOption.java b/src/main/java/io/jackey/args/ClientAttributeOption.java index 33cb70642..551c5e6b8 100644 --- a/src/main/java/io/jackey/args/ClientAttributeOption.java +++ b/src/main/java/io/jackey/args/ClientAttributeOption.java @@ -3,12 +3,13 @@ import io.jackey.util.SafeEncoder; /** - * CLIENT SETINFO command attr option + * CLIENT command attr option * since redis 7.2 */ public enum ClientAttributeOption implements Rawable { LIB_NAME("LIB-NAME"), - LIB_VER("LIB-VER"); + LIB_VER("LIB-VER"), + REDIRECT("REDIRECT"); private final byte[] raw; diff --git a/src/main/java/io/jackey/executors/RedirectCommandExecutor.java b/src/main/java/io/jackey/executors/RedirectCommandExecutor.java new file mode 100644 index 000000000..5d864bd53 --- /dev/null +++ b/src/main/java/io/jackey/executors/RedirectCommandExecutor.java @@ -0,0 +1,143 @@ +package io.jackey.executors; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; + +import io.jackey.CommandObject; +import io.jackey.Connection; +import io.jackey.annots.VisibleForTesting; +import io.jackey.exceptions.JedisConnectionException; +import io.jackey.exceptions.JedisException; +import io.jackey.exceptions.JedisRedirectionException; +import io.jackey.providers.RedirectConnectionProvider; +import io.jackey.util.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RedirectCommandExecutor implements CommandExecutor { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + public final RedirectConnectionProvider provider; + protected final int maxAttempts; + protected final Duration maxTotalRetriesDuration; + + public RedirectCommandExecutor(RedirectConnectionProvider provider, int maxAttempts, + Duration maxTotalRetriesDuration) { + this.provider = provider; + this.maxAttempts = maxAttempts; + this.maxTotalRetriesDuration = maxTotalRetriesDuration; + } + + @Override + public void close() { + this.provider.close(); + } + + @Override + public final T executeCommand(CommandObject commandObject) { + Instant deadline = Instant.now().plus(maxTotalRetriesDuration); + + int consecutiveConnectionFailures = 0; + Exception lastException = null; + for (int attemptsLeft = this.maxAttempts; attemptsLeft > 0; attemptsLeft--) { + Connection connection = null; + try { + connection = provider.getConnection(); + return execute(connection, commandObject); + + } catch (JedisConnectionException jce) { + lastException = jce; + ++consecutiveConnectionFailures; + log.debug("Failed connecting to Redis: {}", connection, jce); + // "- 1" because we just did one, but the attemptsLeft counter hasn't been decremented yet + boolean reset = handleConnectionProblem(attemptsLeft - 1, consecutiveConnectionFailures, deadline); + if (reset) { + consecutiveConnectionFailures = 0; + } + } catch (JedisRedirectionException jre) { + // avoid updating lastException if it is a connection exception + if (lastException == null || lastException instanceof JedisRedirectionException) { + lastException = jre; + } + log.debug("Redirected by server to {}", jre.getTargetNode()); + consecutiveConnectionFailures = 0; + provider.renewPool(connection, jre.getTargetNode()); + } finally { + IOUtils.closeQuietly(connection); + } + if (Instant.now().isAfter(deadline)) { + throw new JedisException("Redirect retry deadline exceeded."); + } + } + + JedisException maxRedirectException = new JedisException("No more redirect attempts left."); + maxRedirectException.addSuppressed(lastException); + throw maxRedirectException; + } + + /** + * WARNING: This method is accessible for the purpose of testing. + * This should not be used or overriden. + */ + @VisibleForTesting + protected T execute(Connection connection, CommandObject commandObject) { + return connection.executeCommand(commandObject); + } + + /** + * Related values should be reset if TRUE is returned. + * + * @param attemptsLeft + * @param consecutiveConnectionFailures + * @param doneDeadline + * @return true - if some actions are taken + *
false - if no actions are taken + */ + private boolean handleConnectionProblem(int attemptsLeft, int consecutiveConnectionFailures, Instant doneDeadline) { + if (this.maxAttempts < 3) { + if (attemptsLeft == 0) { + provider.renewPool(null, null); + return true; + } + return false; + } + + if (consecutiveConnectionFailures < 2) { + return false; + } + + sleep(getBackoffSleepMillis(attemptsLeft, doneDeadline)); + provider.renewPool(null, null); + return true; + } + + private static long getBackoffSleepMillis(int attemptsLeft, Instant deadline) { + if (attemptsLeft <= 0) { + return 0; + } + + long millisLeft = Duration.between(Instant.now(), deadline).toMillis(); + if (millisLeft < 0) { + throw new JedisException("Redirect retry deadline exceeded."); + } + + long maxBackOff = millisLeft / (attemptsLeft * attemptsLeft); + return ThreadLocalRandom.current().nextLong(maxBackOff + 1); + } + + /** + * WARNING: This method is accessible for the purpose of testing. + * This should not be used or overriden. + */ + @VisibleForTesting + protected void sleep(long sleepMillis) { + try { + TimeUnit.MILLISECONDS.sleep(sleepMillis); + } catch (InterruptedException e) { + throw new JedisException(e); + } + } +} diff --git a/src/main/java/io/jackey/providers/RedirectConnectionProvider.java b/src/main/java/io/jackey/providers/RedirectConnectionProvider.java new file mode 100644 index 000000000..8fbd54dde --- /dev/null +++ b/src/main/java/io/jackey/providers/RedirectConnectionProvider.java @@ -0,0 +1,116 @@ +package io.jackey.providers; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import io.jackey.CommandArguments; +import io.jackey.Connection; +import io.jackey.ConnectionPool; +import io.jackey.DefaultJedisClientConfig; +import io.jackey.HostAndPort; +import io.jackey.JedisClientConfig; +import io.jackey.exceptions.JedisException; +import io.jackey.util.Pool; +import org.apache.commons.pool2.impl.GenericObjectPoolConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RedirectConnectionProvider implements ConnectionProvider { + private static final Logger logger = LoggerFactory.getLogger(RedirectConnectionProvider.class); + + private Pool pool; + private HostAndPort hostAndPort; + private JedisClientConfig clientConfig; + private GenericObjectPoolConfig poolConfig; + + private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); + private final Lock r = rwl.readLock(); + private final Lock w = rwl.writeLock(); + private final Lock rediscoverLock = new ReentrantLock(); + + public RedirectConnectionProvider(HostAndPort hostAndPort) { + this(hostAndPort, DefaultJedisClientConfig.builder().build()); + } + + public RedirectConnectionProvider(HostAndPort hostAndPort, JedisClientConfig clientConfig) { + this(hostAndPort, clientConfig, new GenericObjectPoolConfig<>()); + } + + public RedirectConnectionProvider(HostAndPort hostAndPort, JedisClientConfig clientConfig, + GenericObjectPoolConfig poolConfig) { + this.hostAndPort = hostAndPort; + this.clientConfig = clientConfig; + this.poolConfig = poolConfig; + this.pool = new ConnectionPool(hostAndPort, clientConfig, poolConfig); + } + + public void renewPool(Connection connection, HostAndPort targetNode) { + if (rediscoverLock.tryLock()) { + try { + // if memberOf is closed, means old flight connection return redirect exception, just return. + if (connection != null && connection.getMemberOf().isClosed()) { + return; + } + // if targetNode is not null, update the new address + HostAndPort oldNode = hostAndPort; + if (targetNode != null) { + this.hostAndPort = targetNode; + } + // close old pool and create new pool + w.lock(); + try { + if (!pool.isClosed()) { + try { + pool.close(); + } catch (JedisException e) { + logger.warn("close pool get exception, hostAndPort:{}", oldNode, e); + } + } + this.pool = new ConnectionPool(hostAndPort, clientConfig, poolConfig); + } finally { + w.unlock(); + } + } finally { + rediscoverLock.unlock(); + } + } + } + + @Override + public void close() { + w.lock(); + try { + pool.close(); + } finally { + w.unlock(); + } + } + + @Override + public Connection getConnection(CommandArguments args) { + return getConnection(); + } + + @Override + public Connection getConnection() { + r.lock(); + try { + return pool.getResource(); + } finally { + r.unlock(); + } + } + + @Override + public Map> getConnectionMap() { + r.lock(); + try { + return Collections.singletonMap(hostAndPort, pool); + } finally { + r.unlock(); + } + } +} diff --git a/src/test/java/io/jackey/HostAndPorts.java b/src/test/java/io/jackey/HostAndPorts.java index 5f385dda0..18272489a 100644 --- a/src/test/java/io/jackey/HostAndPorts.java +++ b/src/test/java/io/jackey/HostAndPorts.java @@ -22,6 +22,8 @@ public final class HostAndPorts { redisHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_PORT + 8)); redisHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_PORT + 9)); redisHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_PORT + 10)); + redisHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_PORT + 11)); + redisHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_PORT + 12)); sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT)); sentinelHostAndPortList.add(new HostAndPort("localhost", Protocol.DEFAULT_SENTINEL_PORT + 1)); diff --git a/src/test/java/io/jackey/RedirectPoolTest.java b/src/test/java/io/jackey/RedirectPoolTest.java new file mode 100644 index 000000000..c5af70a8b --- /dev/null +++ b/src/test/java/io/jackey/RedirectPoolTest.java @@ -0,0 +1,112 @@ +package io.jackey; + +import java.time.Duration; + +import io.jackey.args.ClientPauseMode; +import io.jackey.providers.RedirectConnectionProvider; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.fail; + +public class RedirectPoolTest { + private static final Logger logger = LoggerFactory.getLogger(RedirectPoolTest.class); + + private static final HostAndPort node1 = HostAndPorts.getRedisServers().get(11); + private static final HostAndPort node2 = HostAndPorts.getRedisServers().get(12); + + private HostAndPort masterAddress; + private HostAndPort replicaAddress; + + @Before + public void prepare() { + String role1, role2; + try (Jedis jedis1 = new Jedis(node1)) { + role1 = (String)jedis1.role().get(0); + } + try (Jedis jedis2 = new Jedis(node2)) { + role2 = (String)jedis2.role().get(0); + } + + if ("master".equals(role1) && "slave".equals(role2)) { + masterAddress = node1; + replicaAddress = node2; + } else if ("master".equals(role2) && "slave".equals(role1)) { + masterAddress = node2; + replicaAddress = node1; + } else { + fail("role not match"); + } + } + + private void changeMaster() throws Exception { + prepare(); + Jedis masterJedis = new Jedis(masterAddress); + Jedis replicaJedis = new Jedis(replicaAddress); + replicaJedis.readonly(); + // pause master + masterJedis.clientPause(60000, ClientPauseMode.WRITE); + // check replication + do { + Thread.sleep(1000); + } while (masterJedis.dbSize() != replicaJedis.dbSize()); + // change master and replica + replicaJedis.slaveofNoOne(); + masterJedis.slaveof(replicaAddress.getHost(), replicaAddress.getPort()); + // unpause + masterJedis.clientUnpause(); + } + + @Test + public void basicRedirect() throws Exception { + UnifiedJedis unifiedJedis = new UnifiedJedis(new RedirectConnectionProvider(masterAddress, + DefaultJedisClientConfig.builder().socketTimeoutMillis(60000).build()), 60, + Duration.ofSeconds(60)); + Thread[] threads = new Thread[10]; + for (int i = 0; i < 10; i++) { + threads[i] = new Thread(new Runnable() { + @Override + public void run() { + try { + for (int i = 0; i < 600; i++) { + Thread.sleep(100); + Assert.assertEquals("OK", unifiedJedis.set(i + "", i + "")); + Assert.assertEquals(i + "", unifiedJedis.get(i + "")); + } + } catch (Exception e) { + e.printStackTrace(); + fail("Redirect should not throw exception"); + } + } + }); + } + + Thread thread2 = new Thread(new Runnable() { + @Override + public void run() { + try { + for (int i = 0; i < 10; i++) { + Thread.sleep(5000); + changeMaster(); + logger.info("This is {} times change master", i); + } + } catch (Exception e) { + e.printStackTrace(); + fail("Redirect should not throw exception"); + } + } + }); + for (int i = 0; i < 10; i++) { + threads[i].start(); + } + thread2.start(); + for (int i = 0; i < 10; i++) { + threads[i].join(); + } + thread2.join(); + unifiedJedis.close(); + } +}