From 273ccbf4aeac779633fe2ef5381998ebf6e58e27 Mon Sep 17 00:00:00 2001 From: Binbin Date: Sat, 31 Aug 2024 15:59:18 +0800 Subject: [PATCH 1/2] Fix data loss when the old primary takes over the slots after online There is a race in clusterHandleConfigEpochCollision, which may cause the old primary node to take over the slots again after coming online and cause data loss. It happens when the old primary and the new primary have the same config epoch, and the old primary has a smaller node id and win the collision. In this case, the old primary and the new primary are in the same shard, we are not sure which is strictly the latest. To prevent data loss, now in clusterHandleConfigEpochCollision we will let the node with the larger offset win the conflict. In addition to this change, when a node increments the config epoch throught conflicts, or CLUSTER FAILOVER TAKEOVER, or CLUSTER BUMPEPOCH, we will send PONGs to all ndoes to allow the cluster to reach consensus on the new config epoch more quickly. This also can closes #969. Signed-off-by: Binbin Signed-off-by: Binbin --- src/cluster_legacy.c | 39 +++++++++++++++++++++++--- tests/unit/cluster/manual-takeover.tcl | 4 +++ 2 files changed, 39 insertions(+), 4 deletions(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 21bdd09919..79fba255bb 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -122,6 +122,10 @@ int verifyClusterNodeId(const char *name, int length); sds clusterEncodeOpenSlotsAuxField(int rdbflags); int clusterDecodeOpenSlotsAuxField(int rdbflags, sds s); +#define CLUSTER_BROADCAST_ALL 0 +#define CLUSTER_BROADCAST_LOCAL_REPLICAS 1 +void clusterBroadcastPong(int target); + /* Only primaries that own slots have voting rights. * Returns 1 if the node has voting rights, otherwise returns 0. */ static inline int clusterNodeIsVotingPrimary(clusterNode *n) { @@ -1830,6 +1834,7 @@ int clusterBumpConfigEpochWithoutConsensus(void) { if (myself->configEpoch == 0 || myself->configEpoch != maxEpoch) { server.cluster->currentEpoch++; myself->configEpoch = server.cluster->currentEpoch; + clusterBroadcastPong(CLUSTER_BROADCAST_ALL); clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG); serverLog(LL_NOTICE, "New configEpoch set to %llu", (unsigned long long)myself->configEpoch); return C_OK; @@ -1880,6 +1885,14 @@ int clusterBumpConfigEpochWithoutConsensus(void) { * with the conflicting epoch (the 'sender' node), it will assign itself * the greatest configuration epoch currently detected among nodes plus 1. * + * The above is an optimistic scenario. It this node and the sender node + * are in the same shard, their conflict in configEpoch indicates that a + * node has experienced a partition. Or for example, the old primary node + * was down then up again, and the new primary node won the election. In + * this case, we need to take the replication offset into consideration, + * otherwise, if the old primary wins the collision, we will lose some of + * the new primary's data. + * * This means that even if there are multiple nodes colliding, the node * with the greatest Node ID never moves forward, so eventually all the nodes * end with a different configuration epoch. @@ -1888,11 +1901,31 @@ void clusterHandleConfigEpochCollision(clusterNode *sender) { /* Prerequisites: nodes have the same configEpoch and are both primaries. */ if (sender->configEpoch != myself->configEpoch || !clusterNodeIsPrimary(sender) || !clusterNodeIsPrimary(myself)) return; - /* Don't act if the colliding node has a smaller Node ID. */ - if (memcmp(sender->name, myself->name, CLUSTER_NAMELEN) <= 0) return; + + /* If sender and myself are in the same shard, the one with the + * bigger offset will win. Otherwise if sender and myself are not + * in the same shard, the one will the lexicographically small + * Node ID will win.*/ + if (areInSameShard(sender, myself)) { + long long sender_offset = getNodeReplicationOffset(sender); + long long myself_offset = getNodeReplicationOffset(myself); + if (sender_offset > myself_offset) { + /* Don't act if the colliding node has a bigger offset. */ + return; + } else if (sender_offset == myself_offset) { + /* If the offset are the same, we fall back to Node ID logic. + * Don't act if the colliding node has a smaller Node ID. */ + if (memcmp(sender->name, myself->name, CLUSTER_NAMELEN) <= 0) return; + } + } else { + /* Don't act if the colliding node has a smaller Node ID. */ + if (memcmp(sender->name, myself->name, CLUSTER_NAMELEN) <= 0) return; + } + /* Get the next ID available at the best of this node knowledge. */ server.cluster->currentEpoch++; myself->configEpoch = server.cluster->currentEpoch; + clusterBroadcastPong(CLUSTER_BROADCAST_ALL); clusterSaveConfigOrDie(1); serverLog(LL_NOTICE, "configEpoch collision with node %.40s (%s). configEpoch set to %llu", sender->name, sender->human_nodename, (unsigned long long)myself->configEpoch); @@ -4001,8 +4034,6 @@ void clusterSendPing(clusterLink *link, int type) { * CLUSTER_BROADCAST_ALL -> All known instances. * CLUSTER_BROADCAST_LOCAL_REPLICAS -> All replicas in my primary-replicas ring. */ -#define CLUSTER_BROADCAST_ALL 0 -#define CLUSTER_BROADCAST_LOCAL_REPLICAS 1 void clusterBroadcastPong(int target) { dictIterator *di; dictEntry *de; diff --git a/tests/unit/cluster/manual-takeover.tcl b/tests/unit/cluster/manual-takeover.tcl index a175f79342..dcbbcd1c59 100644 --- a/tests/unit/cluster/manual-takeover.tcl +++ b/tests/unit/cluster/manual-takeover.tcl @@ -22,8 +22,12 @@ set paused_pid [srv 0 pid] set paused_pid1 [srv -1 pid] set paused_pid2 [srv -2 pid] test "Killing majority of master nodes" { + # Bumping the epochs to increase the chance of conflicts. + R 0 cluster bumpepoch pause_process $paused_pid + R 1 cluster bumpepoch pause_process $paused_pid1 + R 2 cluster bumpepoch pause_process $paused_pid2 } From b395193f3c3bf2c62ec7d894a8152d2cc36a59a6 Mon Sep 17 00:00:00 2001 From: Binbin Date: Sat, 31 Aug 2024 19:42:23 +0800 Subject: [PATCH 2/2] print the logs first and then save the config, in case the save fails and we lost the context Signed-off-by: Binbin --- src/cluster_legacy.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index 79fba255bb..352b4ff4e2 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1926,9 +1926,9 @@ void clusterHandleConfigEpochCollision(clusterNode *sender) { server.cluster->currentEpoch++; myself->configEpoch = server.cluster->currentEpoch; clusterBroadcastPong(CLUSTER_BROADCAST_ALL); - clusterSaveConfigOrDie(1); serverLog(LL_NOTICE, "configEpoch collision with node %.40s (%s). configEpoch set to %llu", sender->name, sender->human_nodename, (unsigned long long)myself->configEpoch); + clusterSaveConfigOrDie(1); } /* -----------------------------------------------------------------------------