diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go index 7dcd37189a246..5883ce0e2e13f 100644 --- a/br/pkg/restore/snap_client/client.go +++ b/br/pkg/restore/snap_client/client.go @@ -76,6 +76,8 @@ const minBatchDdlSize = 1 type SnapClient struct { restorer restore.SstRestorer + // Use a closure to lazy load checkpoint runner + getRestorerFn func(*checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]) restore.SstRestorer // Tool clients used by SnapClient pdClient pd.Client pdHTTPClient pdhttp.Client @@ -148,7 +150,8 @@ type SnapClient struct { rewriteMode RewriteMode // checkpoint information for snapshot restore - checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType] + checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType] + checkpointChecksum map[int64]*checkpoint.ChecksumItem } @@ -168,7 +171,10 @@ func NewRestoreClient( } } -func (rc *SnapClient) GetRestorer() restore.SstRestorer { +func (rc *SnapClient) GetRestorer(checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]) restore.SstRestorer { + if rc.restorer == nil { + rc.restorer = rc.getRestorerFn(checkpointRunner) + } return rc.restorer } @@ -389,7 +395,10 @@ func (rc *SnapClient) InitCheckpoint( return checkpointSetWithTableID, nil, errors.Trace(err) } rc.checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName) - return checkpointSetWithTableID, checkpointClusterConfig, errors.Trace(err) + if err != nil { + return checkpointSetWithTableID, nil, errors.Trace(err) + } + return checkpointSetWithTableID, checkpointClusterConfig, nil } func (rc *SnapClient) WaitForFinishCheckpoint(ctx context.Context, flush bool) { @@ -539,7 +548,9 @@ func (rc *SnapClient) initClients(ctx context.Context, backend *backuppb.Storage return errors.Trace(err) } // Raw/Txn restore are not support checkpoint for now - rc.restorer = restore.NewSimpleSstRestorer(ctx, fileImporter, rc.workerPool, nil) + rc.getRestorerFn = func(checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]) restore.SstRestorer { + return restore.NewSimpleSstRestorer(ctx, fileImporter, rc.workerPool, nil) + } } else { // or create a fileImporter with the cluster API version fileImporter, err = NewSnapFileImporter( @@ -547,7 +558,9 @@ func (rc *SnapClient) initClients(ctx context.Context, backend *backuppb.Storage if err != nil { return errors.Trace(err) } - rc.restorer = restore.NewMultiTablesRestorer(ctx, fileImporter, rc.workerPool, rc.checkpointRunner) + rc.getRestorerFn = func(checkpointRunner *checkpoint.CheckpointRunner[checkpoint.RestoreKeyType, checkpoint.RestoreValueType]) restore.SstRestorer { + return restore.NewMultiTablesRestorer(ctx, fileImporter, rc.workerPool, checkpointRunner) + } } return nil } diff --git a/br/pkg/restore/snap_client/tikv_sender.go b/br/pkg/restore/snap_client/tikv_sender.go index 57f73835beda7..66909d8ac5a7b 100644 --- a/br/pkg/restore/snap_client/tikv_sender.go +++ b/br/pkg/restore/snap_client/tikv_sender.go @@ -385,9 +385,10 @@ func (rc *SnapClient) RestoreSSTFiles( } }) - retErr = rc.restorer.GoRestore(onProgress, tableIDWithFilesGroup...) + r := rc.GetRestorer(rc.checkpointRunner) + retErr = r.GoRestore(onProgress, tableIDWithFilesGroup...) if retErr != nil { return retErr } - return rc.restorer.WaitUntilFinish() + return r.WaitUntilFinish() } diff --git a/br/pkg/task/restore_raw.go b/br/pkg/task/restore_raw.go index acb2e48041e64..1680e60472f7b 100644 --- a/br/pkg/task/restore_raw.go +++ b/br/pkg/task/restore_raw.go @@ -163,11 +163,11 @@ func RunRestoreRaw(c context.Context, g glue.Glue, cmdName string, cfg *RestoreR defer restore.RestorePostWork(ctx, importModeSwitcher, restoreSchedulers, cfg.Online) start := time.Now() - err = client.GetRestorer().GoRestore(onProgress, restore.CreateUniqueFileSets(files)) + err = client.GetRestorer(nil).GoRestore(onProgress, restore.CreateUniqueFileSets(files)) if err != nil { return errors.Trace(err) } - err = client.GetRestorer().WaitUntilFinish() + err = client.GetRestorer(nil).WaitUntilFinish() if err != nil { return errors.Trace(err) } diff --git a/br/pkg/task/restore_txn.go b/br/pkg/task/restore_txn.go index 2af64a59602cc..16d8e099f659d 100644 --- a/br/pkg/task/restore_txn.go +++ b/br/pkg/task/restore_txn.go @@ -102,11 +102,11 @@ func RunRestoreTxn(c context.Context, g glue.Glue, cmdName string, cfg *Config) } defer restore.RestorePostWork(ctx, importModeSwitcher, restoreSchedulers, false) - err = client.GetRestorer().GoRestore(onProgress, restore.CreateUniqueFileSets(files)) + err = client.GetRestorer(nil).GoRestore(onProgress, restore.CreateUniqueFileSets(files)) if err != nil { return errors.Trace(err) } - err = client.GetRestorer().WaitUntilFinish() + err = client.GetRestorer(nil).WaitUntilFinish() if err != nil { return errors.Trace(err) } diff --git a/br/tests/br_restore_checkpoint/run.sh b/br/tests/br_restore_checkpoint/run.sh index da45692cdcb62..7fe1a0e678773 100644 --- a/br/tests/br_restore_checkpoint/run.sh +++ b/br/tests/br_restore_checkpoint/run.sh @@ -69,16 +69,20 @@ if [ $restore_fail -ne 1 ]; then fi # PITR with checkpoint but failed in the log restore metakv stage -export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/snap_client/corrupt-files=return(\"only-last-table-files\");\ -github.com/pingcap/tidb/br/pkg/restore/log_client/failed-after-id-maps-saved=return(true)" +export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/restore/snap_client/corrupt-files=return(\"only-last-table-files\")" +export GO_FAILPOINTS=$GO_FAILPOINTS";github.com/pingcap/tidb/br/pkg/restore/log_client/failed-after-id-maps-saved=return(true)" restore_fail=0 run_br --pd $PD_ADDR restore point --full-backup-storage "local://$TEST_DIR/$PREFIX/full" -s "local://$TEST_DIR/$PREFIX/log" || restore_fail=1 export GO_FAILPOINTS="" if [ $restore_fail -ne 1 ]; then - echo 'PITR success' + echo 'PITR success, but should fail' exit 1 fi +# check the snapshot restore has checkpoint data +run_sql 'select count(*) from '"__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint"'.`cpt_data`;' +check_contains "count(*): 1" + # PITR with checkpoint but failed in the log restore datakv stage # skip the snapshot restore stage export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/task/corrupt-files=return(\"corrupt-last-table-files\")"