diff --git a/br/pkg/checkpoint/BUILD.bazel b/br/pkg/checkpoint/BUILD.bazel
index c8679787db4a8..dc3471726625b 100644
--- a/br/pkg/checkpoint/BUILD.bazel
+++ b/br/pkg/checkpoint/BUILD.bazel
@@ -45,7 +45,7 @@ go_test(
     srcs = ["checkpoint_test.go"],
     flaky = True,
     race = "on",
-    shard_count = 8,
+    shard_count = 9,
     deps = [
         ":checkpoint",
         "//br/pkg/gluetidb",
diff --git a/br/pkg/checkpoint/checkpoint.go b/br/pkg/checkpoint/checkpoint.go
index 765ede725fb98..78f9a7587d255 100644
--- a/br/pkg/checkpoint/checkpoint.go
+++ b/br/pkg/checkpoint/checkpoint.go
@@ -67,13 +67,7 @@ type RangeType struct {
 	*rtree.Range
 }
 
-func (r RangeType) IdentKey() []byte {
-	return r.StartKey
-}
-
-type ValueType interface {
-	IdentKey() []byte
-}
+type ValueType any
 
 type CheckpointMessage[K KeyType, V ValueType] struct {
 	// start-key of the origin range
@@ -261,7 +255,6 @@ func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool)
 	// wait the range flusher exit
 	r.wg.Wait()
 	// remove the checkpoint lock
-	r.checkpointStorage.deleteLock(ctx)
 	r.checkpointStorage.close()
 }
 
diff --git a/br/pkg/checkpoint/checkpoint_test.go b/br/pkg/checkpoint/checkpoint_test.go
index c6756f8058c5c..b02402005aa43 100644
--- a/br/pkg/checkpoint/checkpoint_test.go
+++ b/br/pkg/checkpoint/checkpoint_test.go
@@ -75,7 +75,7 @@ func TestCheckpointMetaForRestore(t *testing.T) {
 			},
 		},
 	}
-	err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, checkpointMetaForSnapshotRestore)
+	err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, checkpointMetaForSnapshotRestore)
 	require.NoError(t, err)
 	checkpointMetaForSnapshotRestore2, err := checkpoint.LoadCheckpointMetadataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
 	require.NoError(t, err)
@@ -278,9 +278,9 @@ func TestCheckpointRestoreRunner(t *testing.T) {
 	se, err := g.CreateSession(s.Mock.Storage)
 	require.NoError(t, err)
 
-	err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
+	err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{})
 	require.NoError(t, err)
-	checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 5*time.Second, 3*time.Second)
+	checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 5*time.Second, 3*time.Second)
 	require.NoError(t, err)
 
 	data := map[string]struct {
@@ -310,7 +310,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {
 	}
 
 	for _, d := range data {
-		err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, d.RangeKey)
+		err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(1, d.RangeKey))
 		require.NoError(t, err)
 	}
 
@@ -320,7 +320,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {
 	checkpointRunner.FlushChecksum(ctx, 4, 4, 4, 4)
 
 	for _, d := range data2 {
-		err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, d.RangeKey)
+		err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(2, d.RangeKey))
 		require.NoError(t, err)
 	}
 
@@ -343,7 +343,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {
 		respCount += 1
 	}
 
-	_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checker)
+	_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.SnapshotRestoreCheckpointDatabaseName, checker)
 	require.NoError(t, err)
 
 	require.Equal(t, 4, respCount)
@@ -355,10 +355,10 @@ func TestCheckpointRestoreRunner(t *testing.T) {
 		require.Equal(t, checksum[i].Crc64xor, uint64(i))
 	}
 
-	err = checkpoint.RemoveCheckpointDataForSnapshotRestore(ctx, s.Mock.Domain, se)
+	err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.SnapshotRestoreCheckpointDatabaseName)
 	require.NoError(t, err)
 
-	exists := checkpoint.ExistsSnapshotRestoreCheckpoint(ctx, s.Mock.Domain)
+	exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
 	require.False(t, exists)
 	exists = s.Mock.Domain.InfoSchema().SchemaExists(pmodel.NewCIStr(checkpoint.SnapshotRestoreCheckpointDatabaseName))
 	require.False(t, exists)
@@ -371,9 +371,9 @@ func TestCheckpointRunnerRetry(t *testing.T) {
 	se, err := g.CreateSession(s.Mock.Storage)
 	require.NoError(t, err)
 
-	err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
+	err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{})
 	require.NoError(t, err)
-	checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond)
+	checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 100*time.Millisecond, 300*time.Millisecond)
 	require.NoError(t, err)
 
 	err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes", "return(true)")
@@ -382,9 +382,9 @@ func TestCheckpointRunnerRetry(t *testing.T) {
 		err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes")
 		require.NoError(t, err)
 	}()
-	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123")
+	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(1, "123"))
 	require.NoError(t, err)
-	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456")
+	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(2, "456"))
 	require.NoError(t, err)
 	err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
 	require.NoError(t, err)
@@ -392,7 +392,7 @@ func TestCheckpointRunnerRetry(t *testing.T) {
 	time.Sleep(time.Second)
 	err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes")
 	require.NoError(t, err)
-	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 3, "789")
+	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(3, "789"))
 	require.NoError(t, err)
 	err = checkpointRunner.FlushChecksum(ctx, 3, 3, 3, 3)
 	require.NoError(t, err)
@@ -400,14 +400,15 @@ func TestCheckpointRunnerRetry(t *testing.T) {
 	se, err = g.CreateSession(s.Mock.Storage)
 	require.NoError(t, err)
 	recordSet := make(map[string]int)
-	_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
-		func(tableID int64, rangeKey checkpoint.RestoreValueType) {
-			recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1
+	_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
+		checkpoint.SnapshotRestoreCheckpointDatabaseName,
+		func(tableID int64, v checkpoint.RestoreValueType) {
+			recordSet[fmt.Sprintf("%d_%s", tableID, v.RangeKey)] += 1
 		})
 	require.NoError(t, err)
-	require.LessOrEqual(t, 1, recordSet["1_{123}"])
-	require.LessOrEqual(t, 1, recordSet["2_{456}"])
-	require.LessOrEqual(t, 1, recordSet["3_{789}"])
+	require.LessOrEqual(t, 1, recordSet["1_123"])
+	require.LessOrEqual(t, 1, recordSet["2_456"])
+	require.LessOrEqual(t, 1, recordSet["3_789"])
 	items, _, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
 	require.NoError(t, err)
 	require.Equal(t, fmt.Sprintf("%d_%d_%d", items[1].Crc64xor, items[1].TotalBytes, items[1].TotalKvs), "1_1_1")
@@ -422,14 +423,14 @@ func TestCheckpointRunnerNoRetry(t *testing.T) {
 	se, err := g.CreateSession(s.Mock.Storage)
 	require.NoError(t, err)
 
-	err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
+	err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{})
 	require.NoError(t, err)
-	checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond)
+	checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 100*time.Millisecond, 300*time.Millisecond)
 	require.NoError(t, err)
 
-	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123")
+	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(1, "123"))
 	require.NoError(t, err)
-	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456")
+	err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(2, "456"))
 	require.NoError(t, err)
 	err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
 	require.NoError(t, err)
@@ -440,13 +441,14 @@ func TestCheckpointRunnerNoRetry(t *testing.T) {
 	se, err = g.CreateSession(s.Mock.Storage)
 	require.NoError(t, err)
 	recordSet := make(map[string]int)
-	_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
-		func(tableID int64, rangeKey checkpoint.RestoreValueType) {
-			recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1
+	_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
+		checkpoint.SnapshotRestoreCheckpointDatabaseName,
+		func(tableID int64, v checkpoint.RestoreValueType) {
+			recordSet[fmt.Sprintf("%d_%s", tableID, v.RangeKey)] += 1
 		})
 	require.NoError(t, err)
-	require.Equal(t, 1, recordSet["1_{123}"])
-	require.Equal(t, 1, recordSet["2_{456}"])
+	require.Equal(t, 1, recordSet["1_123"])
+	require.Equal(t, 1, recordSet["2_456"])
 	items, _, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
 	require.NoError(t, err)
 	require.Equal(t, fmt.Sprintf("%d_%d_%d", items[1].Crc64xor, items[1].TotalBytes, items[1].TotalKvs), "1_1_1")
@@ -584,3 +586,57 @@ func TestCheckpointRunnerLock(t *testing.T) {
 
 	runner.WaitForFinish(ctx, true)
 }
+
+func TestCheckpointCompactedRestoreRunner(t *testing.T) {
+	ctx := context.Background()
+	s := utiltest.CreateRestoreSchemaSuite(t)
+	g := gluetidb.New()
+	se, err := g.CreateSession(s.Mock.Storage)
+	require.NoError(t, err)
+
+	err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName, nil)
+	require.NoError(t, err)
+	checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName, 500*time.Millisecond, time.Second)
+	require.NoError(t, err)
+
+	data := map[string]struct {
+		Name string
+	}{
+		"a": {Name: "a"},
+		"A": {Name: "A"},
+		"1": {Name: "1"},
+	}
+
+	for _, d := range data {
+		err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointFileItem(1, d.Name))
+		require.NoError(t, err)
+	}
+
+	checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
+	checkpointRunner.FlushChecksum(ctx, 2, 2, 2, 2)
+
+	checkpointRunner.WaitForFinish(ctx, true)
+
+	se, err = g.CreateSession(s.Mock.Storage)
+	require.NoError(t, err)
+	respCount := 0
+	checker := func(tableID int64, resp checkpoint.RestoreValueType) {
+		require.NotNil(t, resp)
+		d, ok := data[resp.Name]
+		require.True(t, ok)
+		require.Equal(t, d.Name, resp.Name)
+		respCount++
+	}
+
+	_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.CustomSSTRestoreCheckpointDatabaseName, checker)
+	require.NoError(t, err)
+	require.Equal(t, 3, respCount)
+
+	err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
+	require.NoError(t, err)
+
+	exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
+	require.False(t, exists)
+	exists = s.Mock.Domain.InfoSchema().SchemaExists(pmodel.NewCIStr(checkpoint.CustomSSTRestoreCheckpointDatabaseName))
+	require.False(t, exists)
+}
diff --git a/br/pkg/checkpoint/external_storage.go b/br/pkg/checkpoint/external_storage.go
index 47d8ed0296624..f7d365068e6cf 100644
--- a/br/pkg/checkpoint/external_storage.go
+++ b/br/pkg/checkpoint/external_storage.go
@@ -187,12 +187,3 @@ func (s *externalCheckpointStorage) updateLock(ctx context.Context) error {
 
 	return nil
 }
-
-func (s *externalCheckpointStorage) deleteLock(ctx context.Context) {
-	if s.lockId > 0 {
-		err := s.storage.DeleteFile(ctx, s.CheckpointLockPath)
-		if err != nil {
-			log.Warn("failed to remove the checkpoint lock", zap.Error(err))
-		}
-	}
-}
diff --git a/br/pkg/checkpoint/log_restore.go b/br/pkg/checkpoint/log_restore.go
index b2ae3c398a3c8..0fd046b67ad7c 100644
--- a/br/pkg/checkpoint/log_restore.go
+++ b/br/pkg/checkpoint/log_restore.go
@@ -17,11 +17,9 @@ package checkpoint
 import (
 	"context"
 	"encoding/json"
-	"fmt"
 	"time"
 
 	"github.com/pingcap/errors"
-	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/br/pkg/glue"
 	"github.com/pingcap/tidb/pkg/domain"
 	"github.com/pingcap/tidb/pkg/meta/model"
@@ -39,10 +37,6 @@ type LogRestoreValueType struct {
 	Foff int
 }
 
-func (l LogRestoreValueType) IdentKey() []byte {
-	return []byte(fmt.Sprint(l.Goff, '.', l.Foff, '.', l.TableID))
-}
-
 type LogRestoreValueMarshaled struct {
 	// group index in the metadata
 	Goff int `json:"goff"`
 	// file index in the group
 	Foffs map[int64][]int `json:"foffs"`
 }
 
-func (l LogRestoreValueMarshaled) IdentKey() []byte {
-	log.Fatal("unimplement!")
-	return nil
-}
-
 // valueMarshalerForLogRestore convert the checkpoint data‘s format to an smaller space-used format
 // input format :
 //
@@ -299,7 +288,7 @@ func TryToGetCheckpointTaskInfo(
 			return nil, errors.Trace(err)
 		}
 	}
-	hasSnapshotMetadata := ExistsSnapshotRestoreCheckpoint(ctx, dom)
+	hasSnapshotMetadata := ExistsSstRestoreCheckpoint(ctx, dom, SnapshotRestoreCheckpointDatabaseName)
 
 	return &CheckpointTaskInfoForLogRestore{
 		Metadata: metadata,
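Note on the checkpoint API generalized by the next file: the runner is now keyed by a checkpoint database name, so the same machinery serves snapshot restore and compacted-SST restore. A minimal sketch of the resulting call sequence, modeled on TestCheckpointCompactedRestoreRunner above — `se`, `ctx`, `tableID` and `fileName` are assumed to come from the caller, and error handling is abbreviated:

```go
// Sketch only, not part of the patch: drive the SST-restore checkpoint end to end.
dbName := checkpoint.CustomSSTRestoreCheckpointDatabaseName

// Create the checkpoint tables; nil metadata is now accepted for SST restore.
if err := checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, dbName, nil); err != nil {
	return errors.Trace(err)
}
runner, err := checkpoint.StartCheckpointRunnerForRestore(ctx, se, dbName)
if err != nil {
	return errors.Trace(err)
}
// Record each restored SST by file name rather than by range key.
if err := checkpoint.AppendRangesForRestore(ctx, runner,
	checkpoint.NewCheckpointFileItem(tableID, fileName)); err != nil {
	return errors.Trace(err)
}
// Flush any pending checkpoint data and stop the runner.
runner.WaitForFinish(ctx, true)
```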
diff --git a/br/pkg/checkpoint/restore.go b/br/pkg/checkpoint/restore.go
index 88ff6f8f204de..c05ec3df8c2f2 100644
--- a/br/pkg/checkpoint/restore.go
+++ b/br/pkg/checkpoint/restore.go
@@ -30,11 +30,31 @@ import (
 type RestoreKeyType = int64
 type RestoreValueType struct {
 	// the file key of a range
-	RangeKey string
+	RangeKey string `json:"range-key,omitempty"`
+	// the file name, used for compacted restore
+	Name string `json:"name,omitempty"`
 }
 
-func (rv RestoreValueType) IdentKey() []byte {
-	return []byte(rv.RangeKey)
+type CheckpointItem struct {
+	tableID RestoreKeyType
+	// used for table full backup restore
+	rangeKey string
+	// used for table raw/txn/compacted SST restore
+	name string
+}
+
+func NewCheckpointRangeKeyItem(tableID RestoreKeyType, rangeKey string) *CheckpointItem {
+	return &CheckpointItem{
+		tableID:  tableID,
+		rangeKey: rangeKey,
+	}
+}
+
+func NewCheckpointFileItem(tableID RestoreKeyType, fileName string) *CheckpointItem {
+	return &CheckpointItem{
+		tableID: tableID,
+		name:    fileName,
+	}
 }
 
 func valueMarshalerForRestore(group *RangeGroup[RestoreKeyType, RestoreValueType]) ([]byte, error) {
@@ -45,11 +65,12 @@ func StartCheckpointRestoreRunnerForTest(
 	ctx context.Context,
 	se glue.Session,
+	dbName string,
 	tick time.Duration,
 	retryDuration time.Duration,
 ) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error) {
 	runner := newCheckpointRunner[RestoreKeyType, RestoreValueType](
-		newTableCheckpointStorage(se, SnapshotRestoreCheckpointDatabaseName),
+		newTableCheckpointStorage(se, dbName),
 		nil, valueMarshalerForRestore)
 
 	runner.startCheckpointMainLoop(ctx, tick, tick, 0, retryDuration)
@@ -60,9 +81,10 @@ func StartCheckpointRestoreRunnerForTest(
 func StartCheckpointRunnerForRestore(
 	ctx context.Context,
 	se glue.Session,
+	dbName string,
 ) (*CheckpointRunner[RestoreKeyType, RestoreValueType], error) {
 	runner := newCheckpointRunner[RestoreKeyType, RestoreValueType](
-		newTableCheckpointStorage(se, SnapshotRestoreCheckpointDatabaseName),
+		newTableCheckpointStorage(se, dbName),
 		nil, valueMarshalerForRestore)
 
 	// for restore, no need to set lock
@@ -75,25 +97,33 @@ func StartCheckpointRunnerForRestore(
 func AppendRangesForRestore(
 	ctx context.Context,
 	r *CheckpointRunner[RestoreKeyType, RestoreValueType],
-	tableID RestoreKeyType,
-	rangeKey string,
+	c *CheckpointItem,
 ) error {
+	var group RestoreValueType
+	if len(c.rangeKey) != 0 {
+		group.RangeKey = c.rangeKey
+	} else if len(c.name) != 0 {
+		group.Name = c.name
+	} else {
+		return errors.New("either rangekey or name should be used in checkpoint append")
+	}
 	return r.Append(ctx, &CheckpointMessage[RestoreKeyType, RestoreValueType]{
-		GroupKey: tableID,
+		GroupKey: c.tableID,
 		Group: []RestoreValueType{
-			{RangeKey: rangeKey},
+			group,
 		},
 	})
 }
 
 // load the whole checkpoint range data and retrieve the metadata of restored ranges
 // and return the total time cost in the past executions
-func LoadCheckpointDataForSnapshotRestore[K KeyType, V ValueType](
+func LoadCheckpointDataForSstRestore[K KeyType, V ValueType](
 	ctx context.Context,
 	execCtx sqlexec.RestrictedSQLExecutor,
+	dbName string,
 	fn func(K, V),
 ) (time.Duration, error) {
-	return selectCheckpointData(ctx, execCtx, SnapshotRestoreCheckpointDatabaseName, fn)
+	return selectCheckpointData(ctx, execCtx, dbName, fn)
 }
 
 func LoadCheckpointChecksumForRestore(
@@ -118,28 +148,33 @@ func LoadCheckpointMetadataForSnapshotRestore(
 	return m, err
 }
 
-func SaveCheckpointMetadataForSnapshotRestore(
+func SaveCheckpointMetadataForSstRestore(
 	ctx context.Context,
 	se glue.Session,
+	dbName string,
 	meta *CheckpointMetadataForSnapshotRestore,
 ) error {
-	err := initCheckpointTable(ctx, se, SnapshotRestoreCheckpointDatabaseName,
+	err := initCheckpointTable(ctx, se, dbName,
 		[]string{checkpointDataTableName, checkpointChecksumTableName})
 	if err != nil {
 		return errors.Trace(err)
 	}
-	return insertCheckpointMeta(ctx, se, SnapshotRestoreCheckpointDatabaseName, checkpointMetaTableName, meta)
+	if meta != nil {
+		return insertCheckpointMeta(ctx, se, dbName, checkpointMetaTableName, meta)
+	}
+	return nil
 }
 
-func ExistsSnapshotRestoreCheckpoint(
+func ExistsSstRestoreCheckpoint(
 	ctx context.Context,
 	dom *domain.Domain,
+	dbName string,
 ) bool {
 	return dom.InfoSchema().
-		TableExists(pmodel.NewCIStr(SnapshotRestoreCheckpointDatabaseName), pmodel.NewCIStr(checkpointMetaTableName))
+		TableExists(pmodel.NewCIStr(dbName), pmodel.NewCIStr(checkpointMetaTableName))
 }
 
-func RemoveCheckpointDataForSnapshotRestore(ctx context.Context, dom *domain.Domain, se glue.Session) error {
-	return dropCheckpointTables(ctx, dom, se, SnapshotRestoreCheckpointDatabaseName,
+func RemoveCheckpointDataForSstRestore(ctx context.Context, dom *domain.Domain, se glue.Session, dbName string) error {
+	return dropCheckpointTables(ctx, dom, se, dbName,
 		[]string{checkpointDataTableName, checkpointChecksumTableName, checkpointMetaTableName})
 }
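Because RestoreValueType now carries either a RangeKey or a Name, readers of the checkpoint tables distinguish the two kinds of records inside the callback. A hedged sketch of how a caller might rebuild its "already restored" skip-set from the data loaded by the functions above — `ctx` and `execCtx` are assumed to be supplied by the caller:

```go
// Sketch only: restored maps tableID -> set of restored identities (range keys or file names).
restored := make(map[int64]map[string]struct{})
_, err := checkpoint.LoadCheckpointDataForSstRestore(ctx, execCtx,
	checkpoint.SnapshotRestoreCheckpointDatabaseName,
	func(tableID int64, v checkpoint.RestoreValueType) {
		set, ok := restored[tableID]
		if !ok {
			set = make(map[string]struct{})
			restored[tableID] = set
		}
		// RangeKey is filled for snapshot-restore records, Name for SST-file records.
		if v.RangeKey != "" {
			set[v.RangeKey] = struct{}{}
		} else {
			set[v.Name] = struct{}{}
		}
	})
if err != nil {
	return errors.Trace(err)
}
```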
diff --git a/br/pkg/checkpoint/storage.go b/br/pkg/checkpoint/storage.go
index 465924f8300f4..4a37a14b0da12 100644
--- a/br/pkg/checkpoint/storage.go
+++ b/br/pkg/checkpoint/storage.go
@@ -38,7 +38,6 @@ type checkpointStorage interface {
 
 	initialLock(ctx context.Context) error
 	updateLock(ctx context.Context) error
-	deleteLock(ctx context.Context)
 	close()
 }
 
@@ -48,8 +47,9 @@ type checkpointStorage interface {
 // 2. BR regards the metadata table as a file so that it is not empty if the table exists.
 // 3. BR regards the checkpoint table as a directory which is managed by metadata table.
 const (
-	LogRestoreCheckpointDatabaseName      string = "__TiDB_BR_Temporary_Log_Restore_Checkpoint"
-	SnapshotRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint"
+	LogRestoreCheckpointDatabaseName       string = "__TiDB_BR_Temporary_Log_Restore_Checkpoint"
+	SnapshotRestoreCheckpointDatabaseName  string = "__TiDB_BR_Temporary_Snapshot_Restore_Checkpoint"
+	CustomSSTRestoreCheckpointDatabaseName string = "__TiDB_BR_Temporary_Custom_SST_Restore_Checkpoint"
 
 	// directory level table
 	checkpointDataTableName string = "cpt_data"
@@ -90,7 +90,9 @@ const (
 
 // IsCheckpointDB checks whether the dbname is checkpoint database.
 func IsCheckpointDB(dbname pmodel.CIStr) bool {
-	return dbname.O == LogRestoreCheckpointDatabaseName || dbname.O == SnapshotRestoreCheckpointDatabaseName
+	return dbname.O == LogRestoreCheckpointDatabaseName ||
+		dbname.O == SnapshotRestoreCheckpointDatabaseName ||
+		dbname.O == CustomSSTRestoreCheckpointDatabaseName
 }
 
 const CheckpointIdMapBlockSize int = 524288
@@ -142,8 +144,6 @@ func (s *tableCheckpointStorage) updateLock(ctx context.Context) error {
 	return nil
 }
 
-func (s *tableCheckpointStorage) deleteLock(ctx context.Context) {}
-
 func (s *tableCheckpointStorage) flushCheckpointData(ctx context.Context, data []byte) error {
 	sqls, argss := chunkInsertCheckpointSQLs(s.checkpointDBName, checkpointDataTableName, data)
 	for i, sql := range sqls {
diff --git a/br/pkg/restore/import_mode_switcher.go b/br/pkg/restore/import_mode_switcher.go
index be01389c19e5f..0bec6a4d1e384 100644
--- a/br/pkg/restore/import_mode_switcher.go
+++ b/br/pkg/restore/import_mode_switcher.go
@@ -31,7 +31,9 @@ type ImportModeSwitcher struct {
 	switchModeInterval time.Duration
 	tlsConf            *tls.Config
 
-	switchCh chan struct{}
+	mu     sync.Mutex
+	cancel context.CancelFunc // Manages goroutine lifecycle
+	wg     sync.WaitGroup
 }
 
 func NewImportModeSwitcher(
@@ -43,15 +45,23 @@ func NewImportModeSwitcher(
 		pdClient:           pdClient,
 		switchModeInterval: switchModeInterval,
 		tlsConf:            tlsConf,
-		switchCh:           make(chan struct{}),
 	}
 }
 
-var closeOnce sync.Once
-
 // switchToNormalMode switch tikv cluster to normal mode.
 func (switcher *ImportModeSwitcher) SwitchToNormalMode(ctx context.Context) error {
-	closeOnce.Do(func() { close(switcher.switchCh) })
+	switcher.mu.Lock()
+	defer switcher.mu.Unlock()
+
+	if switcher.cancel == nil {
+		log.Info("TiKV is already in normal mode")
+		return nil
+	}
+	log.Info("Stopping the import mode goroutine")
+	switcher.cancel()
+	switcher.cancel = nil
+	// wait for the switch goroutine to exit
+	switcher.wg.Wait()
 	return switcher.switchTiKVMode(ctx, import_sstpb.SwitchMode_Normal)
 }
 
@@ -116,26 +126,43 @@ func (switcher *ImportModeSwitcher) switchTiKVMode(
 	return nil
 }
 
-// SwitchToImportMode switch tikv cluster to import mode.
-func (switcher *ImportModeSwitcher) SwitchToImportMode(
+// GoSwitchToImportMode switches the tikv cluster to import mode.
+func (switcher *ImportModeSwitcher) GoSwitchToImportMode(
 	ctx context.Context,
-) {
+) error {
+	switcher.mu.Lock()
+	defer switcher.mu.Unlock()
+
+	if switcher.cancel != nil {
+		log.Info("TiKV is already in import mode")
+		return nil
+	}
+
+	// Create a new context for the goroutine
+	ctx, cancel := context.WithCancel(context.Background())
+	switcher.cancel = cancel
+
+	// [important!] switch tikv mode into import at the beginning
+	log.Info("switch to import mode at beginning")
+	err := switcher.switchTiKVMode(ctx, import_sstpb.SwitchMode_Import)
+	if err != nil {
+		log.Warn("switch to import mode failed", zap.Error(err))
+		return errors.Trace(err)
+	}
+	switcher.wg.Add(1)
 	// tikv automatically switch to normal mode in every 10 minutes
 	// so we need ping tikv in less than 10 minute
 	go func() {
 		tick := time.NewTicker(switcher.switchModeInterval)
-		defer tick.Stop()
-
-		// [important!] switch tikv mode into import at the beginning
-		log.Info("switch to import mode at beginning")
-		err := switcher.switchTiKVMode(ctx, import_sstpb.SwitchMode_Import)
-		if err != nil {
-			log.Warn("switch to import mode failed", zap.Error(err))
-		}
+		defer func() {
+			switcher.wg.Done()
+			tick.Stop()
+		}()
 		for {
 			select {
 			case <-ctx.Done():
+				log.Info("stop automatic switch to import mode when context done")
 				return
 			case <-tick.C:
 				log.Info("switch to import mode")
@@ -143,12 +170,10 @@ func (switcher *ImportModeSwitcher) SwitchToImportMode(
 				if err != nil {
 					log.Warn("switch to import mode failed", zap.Error(err))
 				}
-			case <-switcher.switchCh:
-				log.Info("stop automatic switch to import mode")
-				return
 			}
 		}
 	}()
+	return nil
 }
 
 // RestorePreWork executes some prepare work before restore.
@@ -166,7 +191,10 @@ func RestorePreWork(
 
 	if switchToImport {
 		// Switch TiKV cluster to import mode (adjust rocksdb configuration).
-		switcher.SwitchToImportMode(ctx)
+		err := switcher.GoSwitchToImportMode(ctx)
+		if err != nil {
+			return pdutil.Nop, nil, err
+		}
 	}
 
 	return mgr.RemoveSchedulersWithConfig(ctx)
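With this change, GoSwitchToImportMode performs the first mode switch synchronously and returns an error, while SwitchToNormalMode cancels the background ticker and waits for it to exit, so the pair is safe to call repeatedly. A sketch of the caller contract, mirroring RestoreCompactedSstFiles in the next file — `switcher` is assumed to be an *ImportModeSwitcher built elsewhere:

```go
// Sketch only, not part of the patch.
if err := switcher.GoSwitchToImportMode(ctx); err != nil {
	// The initial switch now fails fast instead of only being logged inside the goroutine.
	return errors.Trace(err)
}
defer func() {
	// Stops the ticker goroutine, waits for it, then switches TiKV back to normal mode.
	if err := switcher.SwitchToNormalMode(ctx); err != nil {
		log.Warn("failed to switch TiKV back to normal mode", zap.Error(err))
	}
}()
```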
diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go
index 4faf59a316657..ec3e3539324ad 100644
--- a/br/pkg/restore/log_client/client.go
+++ b/br/pkg/restore/log_client/client.go
@@ -166,7 +166,7 @@ func NewSstRestoreManager(
 		return nil, errors.Trace(err)
 	}
 	if se != nil {
-		checkpointRunner, err := checkpoint.StartCheckpointRunnerForRestore(ctx, se)
+		checkpointRunner, err := checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
@@ -284,7 +284,10 @@ func (rc *LogClient) RestoreCompactedSstFiles(
 		log.Info("[Compacted SST Restore] No SST files found for restoration.")
 		return nil
 	}
-	importModeSwitcher.SwitchToImportMode(ctx)
+	err := importModeSwitcher.GoSwitchToImportMode(ctx)
+	if err != nil {
+		return errors.Trace(err)
+	}
 	defer func() {
 		switchErr := importModeSwitcher.SwitchToNormalMode(ctx)
 		if switchErr != nil {
@@ -298,11 +301,9 @@ func (rc *LogClient) RestoreCompactedSstFiles(
 	// where batch processing may lead to increased complexity and potential inefficiencies.
 	// TODO: Future enhancements may explore the feasibility of reintroducing batch restoration
 	// while maintaining optimal performance and resource utilization.
-	for _, i := range backupFileSets {
-		err := rc.sstRestoreManager.restorer.GoRestore(onProgress, []restore.BackupFileSet{i})
-		if err != nil {
-			return errors.Trace(err)
-		}
+	err = rc.sstRestoreManager.restorer.GoRestore(onProgress, backupFileSets)
+	if err != nil {
+		return errors.Trace(err)
 	}
 	return rc.sstRestoreManager.restorer.WaitUntilFinish()
 }
diff --git a/br/pkg/restore/restorer.go b/br/pkg/restore/restorer.go
index 75a21b583eb1f..4e36086eee1c2 100644
--- a/br/pkg/restore/restorer.go
+++ b/br/pkg/restore/restorer.go
@@ -203,7 +203,16 @@ func (s *SimpleRestorer) GoRestore(onProgress func(int64), batchFileSets ...Batc
 			if err != nil {
 				return errors.Trace(err)
 			}
-			// TODO handle checkpoint
+			if s.checkpointRunner != nil {
+				// The checkpoint shows that these files have been restored into
+				// the table corresponding to the table-id.
+				for _, f := range set.SSTFiles {
+					if err := checkpoint.AppendRangesForRestore(s.ectx, s.checkpointRunner,
+						checkpoint.NewCheckpointFileItem(set.TableID, f.GetName())); err != nil {
+						return errors.Trace(err)
+					}
+				}
+			}
 			return nil
 		})
 	}
@@ -302,7 +311,8 @@ func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ..
 			for rangeKey := range rangeKeySet {
 				// The checkpoint range shows this ranges of kvs has been restored into
 				// the table corresponding to the table-id.
-				if err := checkpoint.AppendRangesForRestore(m.ectx, m.checkpointRunner, filesGroup.TableID, rangeKey); err != nil {
+				if err := checkpoint.AppendRangesForRestore(m.ectx, m.checkpointRunner,
+					checkpoint.NewCheckpointRangeKeyItem(filesGroup.TableID, rangeKey)); err != nil {
 					return errors.Trace(err)
 				}
 			}
@@ -317,9 +327,11 @@ func (m *MultiTablesRestorer) GoRestore(onProgress func(int64), batchFileSets ..
 	return m.ectx.Err()
 }
 
+// GetFileRangeKey is used to reduce the number of checkpoint records: during full restore the write cf and
+// default cf files are combined into one restore file group, so their common file-name prefix can serve as a single checkpoint key.
 func GetFileRangeKey(f string) string {
 	// the backup date file pattern is `{store_id}_{region_id}_{epoch_version}_{key}_{ts}_{cf}.sst`
-	// so we need to compare with out the `_{cf}.sst` suffix
+	// so we need to compare without the `_{cf}.sst` suffix
 	idx := strings.LastIndex(f, "_")
 	if idx < 0 {
 		panic(fmt.Sprintf("invalid backup data file name: '%s'", f))
 	}
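Assuming GetFileRangeKey returns the file name up to that last underscore (the return statement itself lies outside this hunk), the default and write CF files of one backup range collapse to a single checkpoint key. A small worked example with illustrative file names following the pattern in the comment:

```go
// Both CF files of the same store/region/key/ts share one range key.
restore.GetFileRangeKey("6_48_10_abc123_449300000_default.sst") // "6_48_10_abc123_449300000"
restore.GetFileRangeKey("6_48_10_abc123_449300000_write.sst")   // "6_48_10_abc123_449300000"
```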
diff --git a/br/pkg/restore/snap_client/client.go b/br/pkg/restore/snap_client/client.go
index b56ab9e882f55..3b5e29c5a1927 100644
--- a/br/pkg/restore/snap_client/client.go
+++ b/br/pkg/restore/snap_client/client.go
@@ -346,7 +346,7 @@ func (rc *SnapClient) InitCheckpoint(
 		}
 
 		// t1 is the latest time the checkpoint ranges persisted to the external storage.
-		t1, err := checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, execCtx, func(tableID int64, v checkpoint.RestoreValueType) {
+		t1, err := checkpoint.LoadCheckpointDataForSstRestore(ctx, execCtx, checkpoint.SnapshotRestoreCheckpointDatabaseName, func(tableID int64, v checkpoint.RestoreValueType) {
 			checkpointSet, exists := checkpointSetWithTableID[tableID]
 			if !exists {
 				checkpointSet = make(map[string]struct{})
@@ -379,7 +379,7 @@ func (rc *SnapClient) InitCheckpoint(
 		if config != nil {
 			meta.SchedulersConfig = &pdutil.ClusterConfig{Schedulers: config.Schedulers, ScheduleCfg: config.ScheduleCfg}
 		}
-		if err := checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, rc.db.Session(), meta); err != nil {
+		if err := checkpoint.SaveCheckpointMetadataForSstRestore(ctx, rc.db.Session(), checkpoint.SnapshotRestoreCheckpointDatabaseName, meta); err != nil {
 			return checkpointSetWithTableID, nil, errors.Trace(err)
 		}
 	}
@@ -388,7 +388,7 @@ func (rc *SnapClient) InitCheckpoint(
 	if err != nil {
 		return checkpointSetWithTableID, nil, errors.Trace(err)
 	}
-	rc.checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, se)
+	rc.checkpointRunner, err = checkpoint.StartCheckpointRunnerForRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName)
 
 	return checkpointSetWithTableID, checkpointClusterConfig, errors.Trace(err)
 }
diff --git a/br/pkg/storage/BUILD.bazel b/br/pkg/storage/BUILD.bazel
index 9084fc41db3cd..60c587893af9f 100644
--- a/br/pkg/storage/BUILD.bazel
+++ b/br/pkg/storage/BUILD.bazel
@@ -77,6 +77,7 @@ go_library(
         "@org_golang_google_api//transport/http",
         "@org_golang_x_net//http2",
         "@org_golang_x_oauth2//google",
+        "@org_golang_x_sync//errgroup",
        "@org_uber_go_atomic//:atomic",
        "@org_uber_go_multierr//:multierr",
        "@org_uber_go_zap//:zap",
diff --git a/br/pkg/storage/helper.go b/br/pkg/storage/helper.go
index c0c5c63ba0747..d9c864cf7fa66 100644
--- a/br/pkg/storage/helper.go
+++ b/br/pkg/storage/helper.go
@@ -7,8 +7,12 @@ import (
 	"sync/atomic"
 
 	"github.com/pingcap/errors"
+	"github.com/pingcap/log"
 	"github.com/pingcap/tidb/br/pkg/utils/iter"
 	"github.com/pingcap/tidb/pkg/sessionctx/variable"
+	"github.com/pingcap/tidb/pkg/util"
+	"go.uber.org/zap"
+	"golang.org/x/sync/errgroup"
 )
 
 func init() {
@@ -48,22 +52,32 @@ func UnmarshalDir[T any](ctx context.Context, walkOpt *WalkOption, s ExternalSto
 	errCh := make(chan error, 1)
 	reader := func() {
 		defer close(ch)
-		err := s.WalkDir(ctx, walkOpt, func(path string, size int64) error {
-			metaBytes, err := s.ReadFile(ctx, path)
-			if err != nil {
-				return errors.Annotatef(err, "failed during reading file %s", path)
-			}
-			var meta T
-			if err := unmarshal(&meta, path, metaBytes); err != nil {
-				return errors.Annotatef(err, "failed to parse subcompaction meta of file %s", path)
-			}
-			select {
-			case ch <- &meta:
-			case <-ctx.Done():
-				return ctx.Err()
-			}
+		pool := util.NewWorkerPool(128, "metadata")
+		eg, ectx := errgroup.WithContext(ctx)
+		err := s.WalkDir(ectx, walkOpt, func(path string, size int64) error {
+			pool.ApplyOnErrorGroup(eg, func() error {
+				metaBytes, err := s.ReadFile(ectx, path)
+				if err != nil {
+					log.Error("failed to read file", zap.String("file", path))
+					return errors.Annotatef(err, "during reading meta file %s from storage", path)
+				}
+
+				var meta T
+				if err := unmarshal(&meta, path, metaBytes); err != nil {
+					return errors.Annotatef(err, "failed to unmarshal file %s", path)
+				}
+				select {
+				case ch <- &meta:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+				return nil
+			})
 			return nil
 		})
+		if err == nil {
+			err = eg.Wait()
+		}
 		if err != nil {
 			select {
 			case errCh <- err:
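UnmarshalDir above now fans the per-file read and unmarshal work out to a bounded worker pool and fails fast through an errgroup. The same shape, reduced to a standalone sketch: the 128-worker limit and ApplyOnErrorGroup come from the hunk, while `paths`, `readFile`, and `Meta` are placeholders introduced only for this illustration.

```go
// Sketch only: concurrent read-and-decode with bounded parallelism and fail-fast errors.
eg, ectx := errgroup.WithContext(ctx)
pool := util.NewWorkerPool(128, "metadata")
ch := make(chan *Meta)
go func() {
	// The deferred close runs after eg.Wait below, so the consumer of ch sees
	// every successfully decoded item before the channel is closed.
	defer close(ch)
	for _, path := range paths {
		path := path
		pool.ApplyOnErrorGroup(eg, func() error {
			raw, err := readFile(ectx, path) // placeholder for storage.ReadFile
			if err != nil {
				return errors.Annotatef(err, "during reading meta file %s", path)
			}
			var m Meta
			if err := json.Unmarshal(raw, &m); err != nil {
				return errors.Annotatef(err, "failed to unmarshal file %s", path)
			}
			select {
			case ch <- &m:
			case <-ectx.Done():
				return ectx.Err()
			}
			return nil
		})
	}
	if err := eg.Wait(); err != nil {
		log.Error("failed to load metadata", zap.Error(err))
	}
}()
```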
diff --git a/br/pkg/task/operator/migrate_to.go b/br/pkg/task/operator/migrate_to.go
index 20f76b0f86967..282e82784ecb9 100644
--- a/br/pkg/task/operator/migrate_to.go
+++ b/br/pkg/task/operator/migrate_to.go
@@ -5,7 +5,7 @@ import (
 
 	"github.com/fatih/color"
 	"github.com/pingcap/errors"
-	backup "github.com/pingcap/kvproto/pkg/brpb"
+	backuppb "github.com/pingcap/kvproto/pkg/brpb"
 	"github.com/pingcap/tidb/br/pkg/glue"
 	"github.com/pingcap/tidb/br/pkg/storage"
 	"github.com/pingcap/tidb/br/pkg/stream"
@@ -39,7 +39,7 @@ func (cx migrateToCtx) printErr(errs []error, msg string) {
 	}
 }
 
-func (cx migrateToCtx) askForContinue(targetMig *backup.Migration) bool {
+func (cx migrateToCtx) askForContinue(targetMig *backuppb.Migration) bool {
 	tbl := cx.console.CreateTable()
 	stream.AddMigrationToTable(targetMig, tbl)
 	cx.console.Println("The migration going to be executed will be like: ")
@@ -124,7 +124,7 @@ func RunMigrateTo(ctx context.Context, cfg MigrateToConfig) error {
 	}
 
 	return run(func(est stream.MigrationExt) stream.MergeAndMigratedTo {
-		return est.MergeAndMigrateTo(ctx, targetVersion, stream.MMOptInteractiveCheck(func(ctx context.Context, m *backup.Migration) bool {
+		return est.MergeAndMigrateTo(ctx, targetVersion, stream.MMOptInteractiveCheck(func(ctx context.Context, m *backuppb.Migration) bool {
 			return cfg.Yes || cx.askForContinue(m)
 		}))
 	})
diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go
index 4250533a248ba..f53b7b72a9121 100644
--- a/br/pkg/task/restore.go
+++ b/br/pkg/task/restore.go
@@ -725,12 +725,16 @@ func RunRestore(c context.Context, g glue.Glue, cmdName string, cfg *RestoreConf
 		if err != nil {
 			log.Warn("failed to remove checkpoint data for log restore", zap.Error(err))
 		}
-		err = checkpoint.RemoveCheckpointDataForSnapshotRestore(c, mgr.GetDomain(), se)
+		err = checkpoint.RemoveCheckpointDataForSstRestore(c, mgr.GetDomain(), se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
+		if err != nil {
+			log.Warn("failed to remove checkpoint data for compacted restore", zap.Error(err))
+		}
+		err = checkpoint.RemoveCheckpointDataForSstRestore(c, mgr.GetDomain(), se, checkpoint.SnapshotRestoreCheckpointDatabaseName)
 		if err != nil {
 			log.Warn("failed to remove checkpoint data for snapshot restore", zap.Error(err))
 		}
 	} else {
-		err = checkpoint.RemoveCheckpointDataForSnapshotRestore(c, mgr.GetDomain(), se)
+		err = checkpoint.RemoveCheckpointDataForSstRestore(c, mgr.GetDomain(), se, checkpoint.SnapshotRestoreCheckpointDatabaseName)
 		if err != nil {
 			log.Warn("failed to remove checkpoint data for snapshot restore", zap.Error(err))
 		}
@@ -875,7 +879,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s
 	if cfg.UseCheckpoint {
 		// if the checkpoint metadata exists in the checkpoint storage, the restore is not
 		// for the first time.
-		existsCheckpointMetadata := checkpoint.ExistsSnapshotRestoreCheckpoint(ctx, mgr.GetDomain())
+		existsCheckpointMetadata := checkpoint.ExistsSstRestoreCheckpoint(ctx, mgr.GetDomain(), checkpoint.SnapshotRestoreCheckpointDatabaseName)
 		checkpointFirstRun = !existsCheckpointMetadata
 	}