Skip to content

Commit

Permalink
Replica flush the old data after RDB file is ok in disk-based replica…
Browse files Browse the repository at this point in the history
…tion

Call emptyData right before rdbLoad to prevent errors in the middle
and we drop the replication stream and leaving an empty database.

Signed-off-by: Binbin <[email protected]>
  • Loading branch information
enjoy-binbin committed Aug 20, 2024
1 parent 7795152 commit 931f6e6
Showing 1 changed file with 21 additions and 6 deletions.
27 changes: 21 additions & 6 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -2173,11 +2173,6 @@ void readSyncBulkPayload(connection *conn) {
temp_functions_lib_ctx = functionsLibCtxCreate();

moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, NULL);
} else {
replicationAttachToNewPrimary();

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
}

/* Before loading the DB into memory we need to delete the readable
Expand All @@ -2186,7 +2181,6 @@ void readSyncBulkPayload(connection *conn) {
* time for non blocking loading. */
connSetReadHandler(conn, NULL);

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
rio rdb;
Expand All @@ -2206,6 +2200,14 @@ void readSyncBulkPayload(connection *conn) {
dbarray = diskless_load_tempDb;
functions_lib_ctx = temp_functions_lib_ctx;
} else {
/* We will soon start loading the RDB from socket, the replication history is changed,
* we must discard the cached primary structure and force resync of sub-replicas. */
replicationAttachToNewPrimary();

/* Even though we are on-empty-db and the database is empty, we still call emptyData. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);

dbarray = server.db;
functions_lib_ctx = functionsLibCtxGetCurrent();
functionsLibCtxClear(functions_lib_ctx);
Expand All @@ -2217,6 +2219,8 @@ void readSyncBulkPayload(connection *conn) {
* We'll restore it when the RDB is received. */
connBlock(conn);
connRecvTimeout(conn, server.repl_timeout * 1000);

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);

int loadingFailed = 0;
Expand Down Expand Up @@ -2325,6 +2329,17 @@ void readSyncBulkPayload(connection *conn) {
return;
}

/* We will soon start loading the RDB from disk, the replication history is changed,
* we must discard the cached primary structure and force resync of sub-replicas. */
replicationAttachToNewPrimary();

/* Empty the databases only after the RDB file is ok, that is, before the RDB file
* is actually loaded, in case we encounter an error and drop the replication stream
* and leave an empty database. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);

serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
if (rdbLoad(server.rdb_filename, &rsi, RDBFLAGS_REPLICATION) != RDB_OK) {
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization "
"DB from disk, check server logs.");
Expand Down

0 comments on commit 931f6e6

Please sign in to comment.