Skip to content

Commit

Permalink
feature: support standalone seamless ha
Browse files Browse the repository at this point in the history
Signed-off-by: bodong.ybd <[email protected]>
  • Loading branch information
yangbodong22011 committed Jul 5, 2024
1 parent 764569d commit f750caf
Show file tree
Hide file tree
Showing 12 changed files with 508 additions and 7 deletions.
30 changes: 30 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,29 @@ replicaof localhost 6388
extended-redis-compatibility yes
endef

define VALKEY12_CONF
daemonize yes
protected-mode no
port 6390
pidfile /tmp/valkey12.pid
logfile /tmp/valkey12.log
save ""
appendonly no
extended-redis-compatibility yes
endef

define VALKEY13_CONF
daemonize yes
protected-mode no
port 6391
pidfile /tmp/valkey13.pid
logfile /tmp/valkey13.log
save ""
appendonly no
replicaof localhost 6390
extended-redis-compatibility yes
endef

# SENTINELS
define VALKEY_SENTINEL1
port 26379
Expand Down Expand Up @@ -409,6 +432,8 @@ export VALKEY8_CONF
export VALKEY9_CONF
export VALKEY10_CONF
export VALKEY11_CONF
export VALKEY12_CONF
export VALKEY13_CONF
export VALKEY_SENTINEL1
export VALKEY_SENTINEL2
export VALKEY_SENTINEL3
Expand Down Expand Up @@ -445,6 +470,8 @@ start: stunnel cleanup compile-module
echo "$$VALKEY9_CONF" | valkey-server -
echo "$$VALKEY10_CONF" | valkey-server -
echo "$$VALKEY11_CONF" | valkey-server -
echo "$$VALKEY12_CONF" | valkey-server -
echo "$$VALKEY13_CONF" | valkey-server -
echo "$$VALKEY_SENTINEL1" > /tmp/sentinel1.conf && valkey-server /tmp/sentinel1.conf --sentinel
@sleep 0.5
echo "$$VALKEY_SENTINEL2" > /tmp/sentinel2.conf && valkey-server /tmp/sentinel2.conf --sentinel
Expand All @@ -465,6 +492,7 @@ start: stunnel cleanup compile-module
echo "$$VALKEY_STABLE_CLUSTER_NODE3_CONF" | valkey-server -
echo "$$VALKEY_UDS" | valkey-server -
echo "$$VALKEY_UNAVAILABLE_CONF" | valkey-server -
@sleep 0.5
valkey-cli -a cluster --cluster create 127.0.0.1:7479 127.0.0.1:7480 127.0.0.1:7481 --cluster-yes
cleanup:
- rm -vf /tmp/valkey_cluster_node*.conf 2>/dev/null
Expand All @@ -487,6 +515,8 @@ stop:
kill `cat /tmp/valkey9.pid`
kill `cat /tmp/valkey10.pid`
kill `cat /tmp/valkey11.pid`
kill `cat /tmp/valkey12.pid`
kill `cat /tmp/valkey13.pid`
kill `cat /tmp/sentinel1.pid`
kill `cat /tmp/sentinel2.pid`
kill `cat /tmp/sentinel3.pid`
Expand Down
30 changes: 30 additions & 0 deletions src/main/java/io/jackey/ClientCapaConfig.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.jackey;

public class ClientCapaConfig {
private final boolean disabled;
private final boolean redirect;

public static final ClientCapaConfig DEFAULT = new ClientCapaConfig();
public static final ClientCapaConfig DISABLED = new ClientCapaConfig(true, false);

public ClientCapaConfig() {
this(false, true);
}

public ClientCapaConfig(boolean disabled, boolean redirect) {
this.disabled = disabled;
this.redirect = redirect;
}

public final boolean isDisabled() {
return disabled;
}

public boolean isRedirect() {
return redirect;
}

public static ClientCapaConfig withRedirect() {
return new ClientCapaConfig(false, true);
}
}
15 changes: 15 additions & 0 deletions src/main/java/io/jackey/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ final HostAndPort getHostAndPort() {
return ((DefaultJedisSocketFactory) socketFactory).getHostAndPort();
}

public final ConnectionPool getMemberOf() {
return memberOf;
}

public int getSoTimeout() {
return soTimeout;
}
Expand Down Expand Up @@ -414,6 +418,7 @@ private void initializeFromClientConfig(final JedisClientConfig config) {
fireAndForgetMsg.add(new CommandArguments(Command.CLIENT).add(Keyword.SETNAME).add(clientName));
}

// CLIENT SETINFO LIB-NAME & LIB-VER
ClientSetInfoConfig setInfoConfig = config.getClientSetInfoConfig();
if (setInfoConfig == null) setInfoConfig = ClientSetInfoConfig.DEFAULT;

Expand All @@ -435,6 +440,16 @@ private void initializeFromClientConfig(final JedisClientConfig config) {
}
}

// CLIENT CAPA REDIRECT
ClientCapaConfig capaConfig = config.getClientCapaConfig();
if (capaConfig == null) capaConfig = ClientCapaConfig.DEFAULT;
if (!capaConfig.isDisabled()) {
if (capaConfig.isRedirect()) {
fireAndForgetMsg.add(new CommandArguments(Command.CLIENT).add(Keyword.CAPA)
.add(ClientAttributeOption.REDIRECT.getRaw()));
}
}

for (CommandArguments arg : fireAndForgetMsg) {
sendCommand(arg);
}
Expand Down
21 changes: 17 additions & 4 deletions src/main/java/io/jackey/DefaultJedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ public final class DefaultJedisClientConfig implements JedisClientConfig {
private final HostAndPortMapper hostAndPortMapper;

private final ClientSetInfoConfig clientSetInfoConfig;
private final ClientCapaConfig clientCapaConfig;

private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMillis, int soTimeoutMillis,
int blockingSocketTimeoutMillis, Supplier<RedisCredentials> credentialsProvider, int database,
String clientName, boolean ssl, SSLSocketFactory sslSocketFactory, SSLParameters sslParameters,
HostnameVerifier hostnameVerifier, HostAndPortMapper hostAndPortMapper,
ClientSetInfoConfig clientSetInfoConfig) {
ClientSetInfoConfig clientSetInfoConfig, ClientCapaConfig clientCapaConfig) {
this.redisProtocol = protocol;
this.connectionTimeoutMillis = connectionTimeoutMillis;
this.socketTimeoutMillis = soTimeoutMillis;
Expand All @@ -44,6 +45,7 @@ private DefaultJedisClientConfig(RedisProtocol protocol, int connectionTimeoutMi
this.hostnameVerifier = hostnameVerifier;
this.hostAndPortMapper = hostAndPortMapper;
this.clientSetInfoConfig = clientSetInfoConfig;
this.clientCapaConfig = clientCapaConfig;
}

@Override
Expand Down Expand Up @@ -122,6 +124,11 @@ public ClientSetInfoConfig getClientSetInfoConfig() {
return clientSetInfoConfig;
}

@Override
public ClientCapaConfig getClientCapaConfig() {
return clientCapaConfig;
}

public static Builder builder() {
return new Builder();
}
Expand All @@ -148,6 +155,7 @@ public static class Builder {
private HostAndPortMapper hostAndPortMapper = null;

private ClientSetInfoConfig clientSetInfoConfig = ClientSetInfoConfig.DEFAULT;
private ClientCapaConfig clientCapaConfig = ClientCapaConfig.DEFAULT;

private Builder() {
}
Expand All @@ -160,7 +168,7 @@ public DefaultJedisClientConfig build() {

return new DefaultJedisClientConfig(redisProtocol, connectionTimeoutMillis, socketTimeoutMillis,
blockingSocketTimeoutMillis, credentialsProvider, database, clientName, ssl,
sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig);
sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, clientSetInfoConfig, clientCapaConfig);
}

/**
Expand Down Expand Up @@ -255,6 +263,11 @@ public Builder clientSetInfoConfig(ClientSetInfoConfig setInfoConfig) {
this.clientSetInfoConfig = setInfoConfig;
return this;
}

public Builder clientCapaConfig(ClientCapaConfig capaConfig) {
this.clientCapaConfig = capaConfig;
return this;
}
}

public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int soTimeoutMillis,
Expand All @@ -264,7 +277,7 @@ public static DefaultJedisClientConfig create(int connectionTimeoutMillis, int s
return new DefaultJedisClientConfig(null,
connectionTimeoutMillis, soTimeoutMillis, blockingSocketTimeoutMillis,
new DefaultRedisCredentialsProvider(new DefaultRedisCredentials(user, password)), database,
clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null);
clientName, ssl, sslSocketFactory, sslParameters, hostnameVerifier, hostAndPortMapper, null, null);
}

public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
Expand All @@ -273,6 +286,6 @@ public static DefaultJedisClientConfig copyConfig(JedisClientConfig copy) {
copy.getBlockingSocketTimeoutMillis(), copy.getCredentialsProvider(),
copy.getDatabase(), copy.getClientName(), copy.isSsl(), copy.getSslSocketFactory(),
copy.getSslParameters(), copy.getHostnameVerifier(), copy.getHostAndPortMapper(),
copy.getClientSetInfoConfig());
copy.getClientSetInfoConfig(), copy.getClientCapaConfig());
}
}
4 changes: 4 additions & 0 deletions src/main/java/io/jackey/JedisClientConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,8 @@ default HostAndPortMapper getHostAndPortMapper() {
default ClientSetInfoConfig getClientSetInfoConfig() {
return ClientSetInfoConfig.DEFAULT;
}

default ClientCapaConfig getClientCapaConfig() {
return ClientCapaConfig.DEFAULT;
}
}
11 changes: 10 additions & 1 deletion src/main/java/io/jackey/Protocol.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public final class Protocol {
private static final String NOAUTH_PREFIX = "NOAUTH";
private static final String WRONGPASS_PREFIX = "WRONGPASS";
private static final String NOPERM_PREFIX = "NOPERM";
private static final String REDIRECT_PREFIX = "REDIRECT ";

private Protocol() {
throw new InstantiationError("Must not instantiate this class");
Expand Down Expand Up @@ -100,6 +101,9 @@ private static void processError(final RedisInputStream is) {
// throw new JedisAskDataException(message, new HostAndPort(askInfo[1],
// Integer.parseInt(askInfo[2])), Integer.parseInt(askInfo[0]));
throw new JedisAskDataException(message, HostAndPort.from(askInfo[1]), Integer.parseInt(askInfo[0]));
} else if (message.startsWith(REDIRECT_PREFIX)) {
String host = parseTargetHost(message);
throw new JedisRedirectionException(message, HostAndPort.from(host), -1); // slot -1 means standalone
} else if (message.startsWith(CLUSTERDOWN_PREFIX)) {
throw new JedisClusterException(message);
} else if (message.startsWith(BUSY_PREFIX)) {
Expand Down Expand Up @@ -140,6 +144,11 @@ private static String[] parseTargetHostAndSlot(String clusterRedirectResponse) {
return response;
}

private static String parseTargetHost(String clusterRedirectResponse) {
String[] messageInfo = clusterRedirectResponse.split(" ");
return messageInfo[1];
}

private static Object process(final RedisInputStream is) {
final byte b = is.readByte();
//System.out.println((char) b);
Expand Down Expand Up @@ -309,7 +318,7 @@ public static enum Keyword implements Rawable {
DELETE, LIBRARYNAME, WITHCODE, DESCRIPTION, GETKEYS, GETKEYSANDFLAGS, DOCS, FILTERBY, DUMP,
MODULE, ACLCAT, PATTERN, DOCTOR, LATEST, HISTORY, USAGE, SAMPLES, PURGE, STATS, LOADEX, CONFIG, ARGS, RANK,
NOW, VERSION, ADDR, SKIPME, USER, LADDR,
CHANNELS, NUMPAT, NUMSUB, SHARDCHANNELS, SHARDNUMSUB, NOVALUES, MAXAGE;
CHANNELS, NUMPAT, NUMSUB, SHARDCHANNELS, SHARDNUMSUB, NOVALUES, MAXAGE, CAPA;

private final byte[] raw;

Expand Down
26 changes: 26 additions & 0 deletions src/main/java/io/jackey/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,32 @@ protected UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Dura
new ClusterCommandObjects(), protocol);
}

/**
* Constructs a {@code UnifiedJedis} instance with the given parameters.
*
* @param provider the connection provider for redirect handling
* @param maxAttempts the maximum number of attempts to execute a command before giving up
* @param maxTotalRetriesDuration the maximum total duration to retry commands before giving up
*/
public UnifiedJedis(RedirectConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) {
this(new RedirectCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider);
}

/**
* A constructor with an additional parameter for customizing the Redis communication protocol.
*
* @param provider the connection provider for redirect handling
* @param maxAttempts the maximum number of attempts to execute a command before giving up
* @param maxTotalRetriesDuration the maximum total duration to retry commands before giving up
* @param protocol the Redis protocol implementation to use
*/
protected UnifiedJedis(RedirectConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol) {
this(new RedirectCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider, new CommandObjects(),
protocol);
}


/**
* @deprecated Sharding/Sharded feature will be removed in next major release.
*/
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/jackey/args/ClientAttributeOption.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import io.jackey.util.SafeEncoder;

/**
* CLIENT SETINFO command attr option
* CLIENT command attr option
* since redis 7.2
*/
public enum ClientAttributeOption implements Rawable {
LIB_NAME("LIB-NAME"),
LIB_VER("LIB-VER");
LIB_VER("LIB-VER"),
REDIRECT("REDIRECT");

private final byte[] raw;

Expand Down
Loading

0 comments on commit f750caf

Please sign in to comment.