Fix data loss when a replica does a failover with an old history repl offset (#885)
enjoy-binbin authored Aug 21, 2024
1 parent 829243e commit 910fd54
Showing 4 changed files with 212 additions and 25 deletions.
57 changes: 38 additions & 19 deletions src/cluster_legacy.c
@@ -69,7 +69,7 @@ int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
-void clusterSetPrimary(clusterNode *n, int closeSlots);
+static void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_required);
void clusterHandleReplicaFailover(void);
void clusterHandleReplicaMigration(int max_replicas);
int bitmapTestBit(unsigned char *bitmap, int pos);
@@ -2370,7 +2370,7 @@ int nodeUpdateAddressIfNeeded(clusterNode *node, clusterLink *link, clusterMsg *
/* Check if this is our primary and we have to change the
* replication target as well. */
if (nodeIsReplica(myself) && myself->replicaof == node)
-replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node));
+replicationSetPrimary(node->ip, getNodeDefaultReplicationPort(node), 0);
return 1;
}

@@ -2432,6 +2432,9 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
return;
}

+/* Sender and myself in the same shard? */
+int are_in_same_shard = areInSameShard(sender, myself);
+
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(slots, j)) {
sender_slots++;
@@ -2474,7 +2477,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* the same shard and we should retain the migrating_slots_to state
* for the slot in question */
if (server.cluster->migrating_slots_to[j] != NULL) {
-if (!areInSameShard(sender, myself)) {
+if (!are_in_same_shard) {
serverLog(LL_NOTICE, "Slot %d is no longer being migrated to node %.40s (%s) in shard %.40s.",
j, server.cluster->migrating_slots_to[j]->name,
server.cluster->migrating_slots_to[j]->human_nodename,
@@ -2595,7 +2598,7 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* the new primary if my current config epoch is lower than the
* sender's. */
if (!new_primary && myself->replicaof != sender && sender_slots == 0 && myself->numslots == 0 &&
-nodeEpoch(myself) < senderConfigEpoch && areInSameShard(sender, myself)) {
+nodeEpoch(myself) < senderConfigEpoch && are_in_same_shard) {
new_primary = sender;
}

@@ -2619,16 +2622,18 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
* sender. In this case we don't reconfigure ourselves as a replica
* of the sender. */
if (new_primary && cur_primary->numslots == 0) {
-if (server.cluster_allow_replica_migration || areInSameShard(sender, myself)) {
+if (server.cluster_allow_replica_migration || are_in_same_shard) {
serverLog(LL_NOTICE,
"Configuration change detected. Reconfiguring myself "
"as a replica of node %.40s (%s) in shard %.40s",
sender->name, sender->human_nodename, sender->shard_id);
/* Don't clear the migrating/importing states if this is a replica that
- * just gets promoted to the new primary in the shard. */
-clusterSetPrimary(sender, !areInSameShard(sender, myself));
+ * just gets promoted to the new primary in the shard.
+ *
+ * If the sender and myself are in the same shard, try psync. */
+clusterSetPrimary(sender, !are_in_same_shard, !are_in_same_shard);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
-} else if ((sender_slots >= migrated_our_slots) && !areInSameShard(sender, myself)) {
+} else if ((sender_slots >= migrated_our_slots) && !are_in_same_shard) {
/* When all our slots are lost to the sender and the sender belongs to
* a different shard, this is likely due to a client triggered slot
* migration. Don't reconfigure this node to migrate to the new shard
@@ -3383,12 +3388,19 @@ int clusterProcessPacket(clusterLink *link) {
/* Explicitly check for a replication loop before attempting the replication
* chain folding logic. */
if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
-/* Safeguard against sub-replicas. A replica's primary can turn itself
- * into a replica if its last slot is removed. If no other node takes
- * over the slot, there is nothing else to trigger replica migration. */
+/* Safeguard against sub-replicas.
+ *
+ * A replica's primary can turn itself into a replica if its last slot
+ * is removed. If no other node takes over the slot, there is nothing
+ * else to trigger replica migration. In this case, they are not in the
+ * same shard, so a full sync is required.
+ *
+ * Or a replica's primary can turn itself into a replica of its other
+ * replica during a failover. In this case, they are in the same shard,
+ * so we can try a psync. */
serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
myself->replicaof->replicaof->name, myself->replicaof->name);
-clusterSetPrimary(myself->replicaof->replicaof, 1);
+clusterSetPrimary(myself->replicaof->replicaof, 1, !areInSameShard(myself->replicaof->replicaof, myself));
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

@@ -4692,7 +4704,9 @@ void clusterHandleReplicaMigration(int max_replicas) {
!(server.cluster_module_flags & CLUSTER_MODULE_FLAG_NO_FAILOVER)) {
serverLog(LL_NOTICE, "Migrating to orphaned primary %.40s (%s) in shard %.40s", target->name,
target->human_nodename, target->shard_id);
-clusterSetPrimary(target, 1);
+/* We are migrating to a different shard that has a completely different
+ * replication history, so a full sync is required. */
+clusterSetPrimary(target, 1, 1);
}
}

@@ -5005,7 +5019,7 @@ void clusterCron(void) {
* enable it if we know the address of our primary and it appears to
* be up. */
if (nodeIsReplica(myself) && server.primary_host == NULL && myself->replicaof && nodeHasAddr(myself->replicaof)) {
-replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof));
+replicationSetPrimary(myself->replicaof->ip, getNodeDefaultReplicationPort(myself->replicaof), 0);
}

/* Abort a manual failover if the timeout is reached. */
@@ -5398,7 +5412,7 @@ static inline void removeAllNotOwnedShardChannelSubscriptions(void) {

/* Set the specified node 'n' as primary for this node.
* If this node is currently a primary, it is turned into a replica. */
-void clusterSetPrimary(clusterNode *n, int closeSlots) {
+static void clusterSetPrimary(clusterNode *n, int closeSlots, int full_sync_required) {
serverAssert(n != myself);
serverAssert(myself->numslots == 0);

@@ -5412,7 +5426,7 @@ void clusterSetPrimary(clusterNode *n, int closeSlots) {
myself->replicaof = n;
updateShardId(myself, n->shard_id);
clusterNodeAddReplica(n, myself);
-replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n));
+replicationSetPrimary(n->ip, getNodeDefaultReplicationPort(n), full_sync_required);
removeAllNotOwnedShardChannelSubscriptions();
resetManualFailover();

@@ -6343,7 +6357,9 @@ void clusterCommandSetSlot(client *c) {
"Lost my last slot during slot migration. Reconfiguring myself "
"as a replica of %.40s (%s) in shard %.40s",
n->name, n->human_nodename, n->shard_id);
-clusterSetPrimary(n, 1);
+/* We are migrating to a different shard that has a completely different
+ * replication history, so a full sync is required. */
+clusterSetPrimary(n, 1, 1);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}

@@ -6548,8 +6564,11 @@ int clusterCommandSpecial(client *c) {
return 1;
}

-/* Set the primary. */
-clusterSetPrimary(n, 1);
+/* Set the primary.
+ * If the instance is a primary, it is an empty primary.
+ * If the instance is a replica, it has a totally different replication history.
+ * In both cases, myself as a replica has to do a full sync. */
+clusterSetPrimary(n, 1, 1);
clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
addReply(c, shared.ok);
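Taken together, the cluster_legacy.c changes apply one rule at every clusterSetPrimary() call site: attempt a psync only when the new primary is in the same shard (and therefore shares this node's replication history); cross-shard reconfigurations, and the explicit CLUSTER REPLICATE path, always force a full sync. A minimal sketch of that rule, reusing the areInSameShard() helper from this file (illustrative only, not code from the commit):

```c
/* Sketch: derive the full_sync_required argument for clusterSetPrimary().
 * Same shard  -> shared replication history, a psync may succeed.
 * Other shard -> unrelated history; force a full sync so this node
 * reports offset 0 and is deprioritized in any subsequent election. */
static int fullSyncRequiredFor(clusterNode *new_primary) {
    return !areInSameShard(new_primary, myself);
}
```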
19 changes: 14 additions & 5 deletions src/replication.c
@@ -3726,7 +3726,7 @@ int cancelReplicationHandshake(int reconnect) {
}

/* Set replication to the specified primary address and port. */
-void replicationSetPrimary(char *ip, int port) {
+void replicationSetPrimary(char *ip, int port, int full_sync_required) {
int was_primary = server.primary_host == NULL;

sdsfree(server.primary_host);
@@ -3752,13 +3752,22 @@ void replicationSetPrimary(char *ip, int port) {
* sync with new primary. */

cancelReplicationHandshake(0);
+
/* Before destroying our primary state, create a cached primary using
* our own parameters, to later PSYNC with the new primary. */
-if (was_primary) {
+if (was_primary && !full_sync_required) {
replicationDiscardCachedPrimary();
replicationCachePrimaryUsingMyself();
}
+
+/* If full sync is required, drop the cached primary. Doing so increases
+ * this replica node's election rank (delay) and reduces its chance of
+ * winning the election. If a replica requiring a full sync wins the
+ * election, it will flush valid data in the shard, causing data loss. */
+if (full_sync_required) {
+replicationDiscardCachedPrimary();
+}

/* Fire the role change modules event. */
moduleFireServerEvent(VALKEYMODULE_EVENT_REPLICATION_ROLE_CHANGED, VALKEYMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
NULL);
@@ -3896,7 +3905,7 @@ void replicaofCommand(client *c) {
}
/* There was no previous primary or the user specified a different one,
* we can continue. */
-replicationSetPrimary(c->argv[1]->ptr, port);
+replicationSetPrimary(c->argv[1]->ptr, port, 0);
sds client = catClientInfoString(sdsempty(), c);
serverLog(LL_NOTICE, "REPLICAOF %s:%d enabled (user request from '%s')", server.primary_host,
server.primary_port, client);
@@ -4907,7 +4916,7 @@ void updateFailoverStatus(void) {
server.target_replica_port);
server.failover_state = FAILOVER_IN_PROGRESS;
/* If timeout has expired force a failover if requested. */
-replicationSetPrimary(server.target_replica_host, server.target_replica_port);
+replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0);
return;
} else {
/* Force was not requested, so timeout. */
@@ -4950,6 +4959,6 @@ void updateFailoverStatus(void) {
serverLog(LL_NOTICE, "Failover target %s:%d is synced, failing over.", server.target_replica_host,
server.target_replica_port);
/* Designated replica is caught up, failover to it. */
-replicationSetPrimary(server.target_replica_host, server.target_replica_port);
+replicationSetPrimary(server.target_replica_host, server.target_replica_port, 0);
}
}
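The comment added in replicationSetPrimary() is the heart of the fix: in cluster failover elections a replica's priority derives from its replication offset, so a node that discarded its cached primary reports offset 0 and falls to the back of the candidate queue. A simplified model of that ranking, loosely patterned on the replica-rank logic in cluster_legacy.c (the function shape and names here are assumed for illustration, not an excerpt from this commit):

```c
#include <stddef.h>

/* Simplified model: a replica's election rank is the number of sibling
 * replicas claiming a larger replication offset. Rank 0 starts its
 * election first; each additional rank adds a fixed delay. A replica
 * forced to full-sync reports offset 0, so every sibling with real
 * history outranks it -- which is what prevents the data loss. */
static int replica_rank(long long my_offset, const long long *peer_offsets, size_t n) {
    int rank = 0;
    for (size_t i = 0; i < n; i++)
        if (peer_offsets[i] > my_offset) rank++;
    return rank;
}
```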
2 changes: 1 addition & 1 deletion src/server.h
@@ -3029,7 +3029,7 @@ void replicationStartPendingFork(void);
void replicationHandlePrimaryDisconnection(void);
void replicationCachePrimary(client *c);
void resizeReplicationBacklog(void);
-void replicationSetPrimary(char *ip, int port);
+void replicationSetPrimary(char *ip, int port, int full_sync_required);
void replicationUnsetPrimary(void);
void refreshGoodReplicasCount(void);
int checkGoodReplicasStatus(void);
159 changes: 159 additions & 0 deletions tests/unit/cluster/replica_migration.tcl
@@ -0,0 +1,159 @@
# Allocate slot 0 to the last primary and evenly distribute the remaining
# slots to the remaining primaries.
proc my_slot_allocation {masters replicas} {
set avg [expr double(16384) / [expr $masters-1]]
set slot_start 1
for {set j 0} {$j < $masters-1} {incr j} {
set slot_end [expr int(ceil(($j + 1) * $avg) - 1)]
R $j cluster addslotsrange $slot_start $slot_end
set slot_start [expr $slot_end + 1]
}
R [expr $masters-1] cluster addslots 0
}
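With the 4 primaries started below, avg is 16384/3 ≈ 5461.3, so the loop assigns slots 1-5461, 5462-10922, and 10923-16383 to the first three primaries, leaving the last primary holding only slot 0; the first test then rebalances that lone slot away so shard 3 loses its last slot.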

start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
test "Migrated replica reports zero repl offset and rank, and fails to win election" {
# Write some data to primary 0, slot 1, make a small repl_offset.
for {set i 0} {$i < 1024} {incr i} {
R 0 incr key_991803
}
assert_equal {1024} [R 0 get key_991803]

# Write some data to primary 3, slot 0, make a big repl_offset.
for {set i 0} {$i < 10240} {incr i} {
R 3 incr key_977613
}
assert_equal {10240} [R 3 get key_977613]

# 100s per key, make sure primary 0 will hang in the save.
R 0 config set rdb-key-save-delay 100000000

# Move the slot 0 from primary 3 to primary 0
set addr "[srv 0 host]:[srv 0 port]"
set myid [R 3 CLUSTER MYID]
set code [catch {
exec src/valkey-cli {*}[valkeycli_tls_config "./tests"] --cluster rebalance $addr --cluster-weight $myid=0
} result]
if {$code != 0} {
fail "valkey-cli --cluster rebalance returns non-zero exit code, output below:\n$result"
}

# Validate that shard 3's primary and replica can convert to replicas after
# they lose the last slot.
R 3 config set cluster-replica-validity-factor 0
R 7 config set cluster-replica-validity-factor 0
R 3 config set cluster-allow-replica-migration yes
R 7 config set cluster-allow-replica-migration yes

# Shutdown primary 0.
catch {R 0 shutdown nosave}

# Wait for the replica to become a primary, and make sure
# the other primary becomes a replica.
wait_for_condition 1000 50 {
[s -4 role] eq {master} &&
[s -3 role] eq {slave} &&
[s -7 role] eq {slave}
} else {
puts "s -4 role: [s -4 role]"
puts "s -3 role: [s -3 role]"
puts "s -7 role: [s -7 role]"
fail "Failover does not happened"
}

# Make sure the offset of server 3 / 7 is 0.
verify_log_message -3 "*Start of election*offset 0*" 0
verify_log_message -7 "*Start of election*offset 0*" 0

# Make sure the right replica gets the best rank (rank #0).
verify_log_message -4 "*Start of election*rank #0*" 0

# Wait for the cluster to be ok.
wait_for_condition 1000 50 {
[CI 3 cluster_state] eq "ok" &&
[CI 4 cluster_state] eq "ok" &&
[CI 7 cluster_state] eq "ok"
} else {
puts "R 3: [R 3 cluster info]"
puts "R 4: [R 4 cluster info]"
puts "R 7: [R 7 cluster info]"
fail "Cluster is down"
}

# Make sure the key exists and is consistent.
R 3 readonly
R 7 readonly
wait_for_condition 1000 50 {
[R 3 get key_991803] == 1024 && [R 3 get key_977613] == 10240 &&
[R 4 get key_991803] == 1024 && [R 4 get key_977613] == 10240 &&
[R 7 get key_991803] == 1024 && [R 7 get key_977613] == 10240
} else {
puts "R 3: [R 3 keys *]"
puts "R 4: [R 4 keys *]"
puts "R 7: [R 7 keys *]"
fail "Key not consistent"
}
}
} my_slot_allocation cluster_allocate_replicas ;# start_cluster

start_cluster 4 4 {tags {external:skip cluster} overrides {cluster-node-timeout 1000 cluster-migration-barrier 999}} {
test "New non-empty replica reports zero repl offset and rank, and fails to win election" {
# Write some data to primary 0, slot 1, make a small repl_offset.
for {set i 0} {$i < 1024} {incr i} {
R 0 incr key_991803
}
assert_equal {1024} [R 0 get key_991803]

# Write some data to primary 3, slot 0, make a big repl_offset.
for {set i 0} {$i < 10240} {incr i} {
R 3 incr key_977613
}
assert_equal {10240} [R 3 get key_977613]

# 100s per key, make sure primary 0 will hang in the save.
R 0 config set rdb-key-save-delay 100000000

# Make server 7 a replica of server 0.
R 7 config set cluster-replica-validity-factor 0
R 7 config set cluster-allow-replica-migration yes
R 7 cluster replicate [R 0 cluster myid]

# Shutdown primary 0.
catch {R 0 shutdown nosave}

# Wait for the replica to become a primary.
wait_for_condition 1000 50 {
[s -4 role] eq {master} &&
[s -7 role] eq {slave}
} else {
puts "s -4 role: [s -4 role]"
puts "s -7 role: [s -7 role]"
fail "Failover does not happened"
}

# Make sure server 7 gets the lower rank and its offset is 0.
verify_log_message -4 "*Start of election*rank #0*" 0
verify_log_message -7 "*Start of election*offset 0*" 0

# Wait for the cluster to be ok.
wait_for_condition 1000 50 {
[CI 4 cluster_state] eq "ok" &&
[CI 7 cluster_state] eq "ok"
} else {
puts "R 4: [R 4 cluster info]"
puts "R 7: [R 7 cluster info]"
fail "Cluster is down"
}

# Make sure the key exists and is consistent.
R 7 readonly
wait_for_condition 1000 50 {
[R 4 get key_991803] == 1024 &&
[R 7 get key_991803] == 1024
} else {
puts "R 4: [R 4 get key_991803]"
puts "R 7: [R 7 get key_991803]"
fail "Key not consistent"
}
}
} my_slot_allocation cluster_allocate_replicas ;# start_cluster
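Assuming the new file is registered with the test runner (the suite list lives in tests/test_helper.tcl), the cases above would typically be run with something like `./runtest --single unit/cluster/replica_migration`.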
