Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replica flushes the old data after the RDB file is OK in disk-based replication #926

Merged
merged 5 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 23 additions & 6 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2173,11 +2173,6 @@ void readSyncBulkPayload(connection *conn) {
temp_functions_lib_ctx = functionsLibCtxCreate();

moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, NULL);
} else {
replicationAttachToNewPrimary();

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
}

/* Before loading the DB into memory we need to delete the readable
Expand All @@ -2186,7 +2181,6 @@ void readSyncBulkPayload(connection *conn) {
* time for non blocking loading. */
connSetReadHandler(conn, NULL);

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
rio rdb;
Expand All @@ -2206,6 +2200,14 @@ void readSyncBulkPayload(connection *conn) {
dbarray = diskless_load_tempDb;
functions_lib_ctx = temp_functions_lib_ctx;
} else {
/* We will soon start loading the RDB from socket, the replication history is changed,
* we must discard the cached primary structure and force resync of sub-replicas. */
replicationAttachToNewPrimary();

/* Even though we are on-empty-db and the database is empty, we still call emptyData. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);

dbarray = server.db;
functions_lib_ctx = functionsLibCtxGetCurrent();
functionsLibCtxClear(functions_lib_ctx);
Expand All @@ -2217,6 +2219,8 @@ void readSyncBulkPayload(connection *conn) {
* We'll restore it when the RDB is received. */
connBlock(conn);
connRecvTimeout(conn, server.repl_timeout * 1000);

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);

int loadingFailed = 0;
Expand Down Expand Up @@ -2249,6 +2253,7 @@ void readSyncBulkPayload(connection *conn) {
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background");
} else {
/* Remove the half-loaded data in case we started with an empty replica. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
}

Expand Down Expand Up @@ -2325,6 +2330,17 @@ void readSyncBulkPayload(connection *conn) {
return;
}

/* We will soon start loading the RDB from disk, the replication history is changed,
* we must discard the cached primary structure and force resync of sub-replicas. */
replicationAttachToNewPrimary();

/* Empty the databases only once the RDB file is known to be ok, that is, right
* before the RDB file is actually loaded. If we emptied them earlier and then hit
* an error that drops the replication stream, we would be left with an empty database. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
if (rdbLoad(server.rdb_filename, &rsi, RDBFLAGS_REPLICATION) != RDB_OK) {
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization "
"DB from disk, check server logs.");
Expand All @@ -2337,6 +2353,7 @@ void readSyncBulkPayload(connection *conn) {
}

/* If disk-based RDB loading fails, remove the half-loaded dataset. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);

/* Note that there's no point in restarting the AOF on sync failure,
Expand Down
40 changes: 40 additions & 0 deletions tests/integration/replication.tcl
Original file line number Diff line number Diff line change
Expand Up @@ -1477,3 +1477,43 @@ start_server {tags {"repl external:skip"}} {
}
}
}

# Regression test for disk-based replication: a replica must keep its existing
# dataset when the received RDB cannot be saved, and must only replace it with
# the primary's dataset once the sync finally succeeds.
#
# The outer server plays the replica and is seeded with its own key.
start_server {tags {"repl external:skip"}} {
set replica [srv 0 client]
$replica set replica_key replica_value

# The inner server plays the primary and is seeded with a different key, so
# the two datasets can be told apart after the sync attempts.
start_server {} {
set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]
$primary set primary_key primary_value

test {Replica keep the old data if RDB file save fails in disk-based replication} {
# Create a directory named 'dump.rdb' inside the replica's configured dir.
# Renaming the temp RDB file onto it fails, so the RDB file save fails
# at the rename step.
set dump_rdb [file join [lindex [$replica config get dir] 1] dump.rdb]
# NOTE(review): 'rm -f' only removes a regular file; if a directory is left
# over from an earlier run this exec would presumably error out - confirm.
if {[file exists $dump_rdb]} { exec rm -f $dump_rdb }
exec mkdir -p $dump_rdb

$replica replicaof $primary_host $primary_port

# Wait for the rename failure to show up in the log.
# NOTE(review): the -1 presumably selects the outer (replica) server's log,
# and the trailing numbers look like start line / retries / delay - confirm
# against the wait_for_log_messages helper.
wait_for_log_messages -1 {"*Failed trying to rename the temp DB into dump.rdb*"} 0 1000 10

# The sync has not completed: the replica must not have the primary's key
# and must still hold its own old data.
assert_equal {} [$replica get primary_key]
assert_equal {replica_value} [$replica get replica_key]

# Remove the blocking directory so that the rename can succeed, then wait
# until the replica holds the primary's key and its old key is gone.
# NOTE(review): 500 and 100 are presumably max tries and delay in ms - confirm.
exec rm -rf $dump_rdb
wait_for_condition 500 100 {
[$replica get primary_key] == {primary_value} &&
[$replica get replica_key] == {}
} else {
puts [$primary keys *]
puts [$replica keys *]
fail "Replication failed."
}
}
}
}
Loading