Skip to content

Commit

Permalink
Implement: CLUSTER REPLICATE NO ONE
Browse files Browse the repository at this point in the history
Signed-off-by: Sergey Kolosov <[email protected]>
  • Loading branch information
skolosov-snap committed Feb 20, 2025
1 parent 591ae9a commit 95c0b42
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 8 deletions.
31 changes: 30 additions & 1 deletion src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -7029,7 +7029,36 @@ int clusterCommandSpecial(client *c) {
clusterDelNode(n);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
addReply(c, shared.ok);
} else if (!strcasecmp(c->argv[1]->ptr, "replicate") && c->argc == 3) {
} else if (!strcasecmp(c->argv[1]->ptr, "replicate") && (c->argc == 3 || c->argc == 4)) {
/* CLUSTER REPLICATE (<NODE ID> | NO ONE)*/
if (c->argc == 4) {
/* CLUSTER REPLICATE NO ONE */
if (strcasecmp(c->argv[2]->ptr, "NO") != 0 || strcasecmp(c->argv[3]->ptr, "ONE") != 0) {
addReplySubcommandSyntaxError(c);
return 1;
}
if (nodeIsPrimary(myself)) {
addReply(c, shared.ok);
return 1;
}
sds client = catClientInfoShortString(sdsempty(), c, server.hide_user_data_from_log);
serverLog(LL_NOTICE, "Stop replication and turning myself into empty primary (request from '%s').", client);
sdsfree(client);
clusterSetNodeAsPrimary(myself);
clusterPromoteSelfToPrimary();
emptyData(-1, server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, NULL);
clusterCloseAllSlots();
resetManualFailover();

// moving new primary to its own shard.
char new_shard_id[CLUSTER_NAMELEN];
getRandomHexChars(new_shard_id, CLUSTER_NAMELEN);
updateShardId(myself, new_shard_id);

clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_BROADCAST_ALL);
addReply(c, shared.ok);
return 1;
}
/* CLUSTER REPLICATE <NODE ID> */
/* Lookup the specified node in our table. */
clusterNode *n = clusterLookupNode(c->argv[2]->ptr, sdslen(c->argv[2]->ptr));
Expand Down
20 changes: 17 additions & 3 deletions src/commands.def
Original file line number Diff line number Diff line change
Expand Up @@ -768,7 +768,9 @@ struct COMMAND_ARG CLUSTER_REPLICAS_Args[] = {

#ifndef SKIP_CMD_HISTORY_TABLE
/* CLUSTER REPLICATE history */
#define CLUSTER_REPLICATE_History NULL
commandHistory CLUSTER_REPLICATE_History[] = {
{"8.1.0","Added support of 'NO ONE' arg instead of <node-id> resulting into detaching replica from primary node."},
};
#endif

#ifndef SKIP_CMD_TIPS_TABLE
Expand All @@ -781,9 +783,21 @@ struct COMMAND_ARG CLUSTER_REPLICAS_Args[] = {
#define CLUSTER_REPLICATE_Keyspecs NULL
#endif

/* CLUSTER REPLICATE args no_one argument table */
struct COMMAND_ARG CLUSTER_REPLICATE_args_no_one_Subargs[] = {
{MAKE_ARG("no",ARG_TYPE_PURE_TOKEN,-1,"NO",NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("one",ARG_TYPE_PURE_TOKEN,-1,"ONE",NULL,NULL,CMD_ARG_NONE,0,NULL)},
};

/* CLUSTER REPLICATE args argument table */
struct COMMAND_ARG CLUSTER_REPLICATE_args_Subargs[] = {
{MAKE_ARG("node-id",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("no-one",ARG_TYPE_BLOCK,-1,NULL,NULL,"8.1.0",CMD_ARG_NONE,2,NULL),.subargs=CLUSTER_REPLICATE_args_no_one_Subargs},
};

/* CLUSTER REPLICATE argument table */
struct COMMAND_ARG CLUSTER_REPLICATE_Args[] = {
{MAKE_ARG("node-id",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE,0,NULL)},
{MAKE_ARG("args",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_NONE,2,NULL),.subargs=CLUSTER_REPLICATE_args_Subargs},
};

/********** CLUSTER RESET ********************/
Expand Down Expand Up @@ -1024,7 +1038,7 @@ struct COMMAND_STRUCT CLUSTER_Subcommands[] = {
{MAKE_CMD("myshardid","Returns the shard ID of a node.","O(1)","7.2.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_MYSHARDID_History,0,CLUSTER_MYSHARDID_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_MYSHARDID_Keyspecs,0,NULL,0)},
{MAKE_CMD("nodes","Returns the cluster configuration for a node.","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_NODES_History,0,CLUSTER_NODES_Tips,1,clusterCommand,2,CMD_LOADING|CMD_STALE,0,CLUSTER_NODES_Keyspecs,0,NULL,0)},
{MAKE_CMD("replicas","Lists the replica nodes of a primary node.","O(N) where N is the number of replicas.","5.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_REPLICAS_History,0,CLUSTER_REPLICAS_Tips,1,clusterCommand,3,CMD_ADMIN|CMD_STALE,0,CLUSTER_REPLICAS_Keyspecs,0,NULL,1),.args=CLUSTER_REPLICAS_Args},
{MAKE_CMD("replicate","Configure a node as replica of a primary node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_REPLICATE_History,0,CLUSTER_REPLICATE_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_REPLICATE_Keyspecs,0,NULL,1),.args=CLUSTER_REPLICATE_Args},
{MAKE_CMD("replicate","Configure a node as replica of a primary node or detaches replica from primary.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_REPLICATE_History,1,CLUSTER_REPLICATE_Tips,0,clusterCommand,-3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_REPLICATE_Keyspecs,0,NULL,1),.args=CLUSTER_REPLICATE_Args},
{MAKE_CMD("reset","Resets a node.","O(N) where N is the number of known nodes. The command may execute a FLUSHALL as a side effect.","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_RESET_History,0,CLUSTER_RESET_Tips,0,clusterCommand,-2,CMD_ADMIN|CMD_STALE|CMD_NOSCRIPT,0,CLUSTER_RESET_Keyspecs,0,NULL,1),.args=CLUSTER_RESET_Args},
{MAKE_CMD("saveconfig","Forces a node to save the cluster configuration to disk.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SAVECONFIG_History,0,CLUSTER_SAVECONFIG_Tips,0,clusterCommand,2,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SAVECONFIG_Keyspecs,0,NULL,0)},
{MAKE_CMD("set-config-epoch","Sets the configuration epoch for a new node.","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,"cluster",COMMAND_GROUP_CLUSTER,CLUSTER_SET_CONFIG_EPOCH_History,0,CLUSTER_SET_CONFIG_EPOCH_Tips,0,clusterCommand,3,CMD_NO_ASYNC_LOADING|CMD_ADMIN|CMD_STALE,0,CLUSTER_SET_CONFIG_EPOCH_Keyspecs,0,NULL,1),.args=CLUSTER_SET_CONFIG_EPOCH_Args},
Expand Down
37 changes: 33 additions & 4 deletions src/commands/cluster-replicate.json
Original file line number Diff line number Diff line change
@@ -1,21 +1,50 @@
{
"REPLICATE": {
"summary": "Configure a node as replica of a primary node.",
"summary": "Configure a node as replica of a primary node or detaches replica from primary.",
"complexity": "O(1)",
"group": "cluster",
"since": "3.0.0",
"arity": 3,
"arity": -3,
"container": "CLUSTER",
"function": "clusterCommand",
"history": [
[
"8.1.0",
"Added support of 'NO ONE' arg instead of <node-id> resulting into detaching replica from primary node."
]
],
"command_flags": [
"NO_ASYNC_LOADING",
"ADMIN",
"STALE"
],
"arguments": [
{
"name": "node-id",
"type": "string"
"name": "args",
"type": "oneof",
"arguments": [
{
"name": "node-id",
"type": "string"
},
{
"name": "no-one",
"type": "block",
"since": "8.1.0",
"arguments": [
{
"name": "no",
"type": "pure-token",
"token": "NO"
},
{
"name": "one",
"type": "pure-token",
"token": "ONE"
}
]
}
]
}
],
"reply_schema": {
Expand Down
53 changes: 53 additions & 0 deletions tests/cluster/tests/12-replication.tcl
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Replication test.
#
# Check CLUSTER REPLICATE commands.

source "../tests/includes/init-tests.tcl"

# Create a cluster with 3 master and 6 slaves, to make sure there are no
# empty masters and make rebalancing simpler to handle during the test.
test "Create a 3 shards cluster" {
cluster_create_with_continuous_slots 3 6
}

test "Cluster is up" {
assert_cluster_state ok
}

set replica_node 3

test "CLUSTER REPLICATE NO ONE should turn node into empty primary" {
set replica_node_id [R $replica_node CLUSTER MYID]
# make sure node is replica
wait_for_condition 100 100 {
[string first "slave" [R $replica_node ROLE]] >= 0
} else {
puts "R $replica_node ROLE: [R $replica_node ROLE]"
fail "R $replica_node didn't assume replica role in time"
}

assert_equal "OK" [R $replica_node CLUSTER REPLICATE NO ONE]

# make sure node is turned into primary
wait_for_condition 100 100 {
[string first "master" [R $replica_node ROLE]] >= 0
} else {
puts "R $replica_node ROLE: [R $replica_node ROLE]"
fail "R $replica_node didn't assume primary role in time"
}

# checking that new primary has no slots
set shards_cfg [R $replica_node CLUSTER SHARDS]
foreach shard_cfg $shards_cfg {
set slots [dict get $shard_cfg slots]
if {[llength $slots] > 0} {
set nodes [dict get $shard_cfg nodes]
foreach node $nodes {
if {[dict get $node id] eq $replica_node_id} {
puts "R $replica_node/$replica_node_id owns some slots: $slots"
fail "R $replica_node/$replica_node_id should not own any slots"
}
}
}
}
}

0 comments on commit 95c0b42

Please sign in to comment.