From ee5d3479fd84a5ac6533ab913ced8736fec4b419 Mon Sep 17 00:00:00 2001 From: Wenqi Mou Date: Thu, 2 Jan 2025 19:40:59 -0500 Subject: [PATCH] remove filter and use tracker Signed-off-by: Wenqi Mou --- br/pkg/restore/log_client/BUILD.bazel | 1 - br/pkg/restore/log_client/client.go | 30 +++++++++++--------------- br/pkg/task/common.go | 4 ++-- br/pkg/task/restore.go | 31 ++++++++++++++------------- br/pkg/task/stream.go | 17 +++++++++------ br/pkg/utils/filter.go | 2 +- br/pkg/utils/filter_test.go | 6 +++--- br/tests/config/tikv.toml | 2 +- 8 files changed, 46 insertions(+), 47 deletions(-) diff --git a/br/pkg/restore/log_client/BUILD.bazel b/br/pkg/restore/log_client/BUILD.bazel index 86faca1208bdf..3930edd209e96 100644 --- a/br/pkg/restore/log_client/BUILD.bazel +++ b/br/pkg/restore/log_client/BUILD.bazel @@ -49,7 +49,6 @@ go_library( "//pkg/util/codec", "//pkg/util/redact", "//pkg/util/sqlexec", - "//pkg/util/table-filter", "@com_github_fatih_color//:color", "@com_github_gogo_protobuf//proto", "@com_github_opentracing_opentracing_go//:opentracing-go", diff --git a/br/pkg/restore/log_client/client.go b/br/pkg/restore/log_client/client.go index 8d1eda42a9c62..5bec371878568 100644 --- a/br/pkg/restore/log_client/client.go +++ b/br/pkg/restore/log_client/client.go @@ -65,7 +65,6 @@ import ( "github.com/pingcap/tidb/pkg/meta/model" tidbutil "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/sqlexec" - filter "github.com/pingcap/tidb/pkg/util/table-filter" "github.com/tikv/client-go/v2/config" kvutil "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" @@ -856,10 +855,12 @@ func (rc *LogClient) loadSchemasMap( func readFilteredFullBackupTables( ctx context.Context, s storage.ExternalStorage, - tableFilter filter.Filter, piTRTableFilter *utils.PiTRTableTracker, cipherInfo *backuppb.CipherInfo, ) (map[int64]*metautil.Table, error) { + if piTRTableFilter == nil { + return nil, errors.Errorf("missing pitr table tracker information") + } metaData, err := s.ReadFile(ctx, metautil.MetaFile) if err != nil { return nil, errors.Trace(err) @@ -885,11 +886,7 @@ func readFilteredFullBackupTables( tables := make(map[int64]*metautil.Table) for _, db := range databases { - dbName := db.Info.Name.O - if name, ok := utils.StripTempTableNamePrefixIfNeeded(db.Info.Name.O); utils.IsSysDB(name) && ok { - dbName = name - } - if !tableFilter.MatchSchema(dbName) && !(piTRTableFilter != nil && piTRTableFilter.ContainsDB(db.Info.ID)) { + if !piTRTableFilter.ContainsDB(db.Info.ID) { continue } @@ -901,8 +898,7 @@ func readFilteredFullBackupTables( tableAdded = true continue } - if !tableFilter.MatchTable(dbName, table.Info.Name.O) && - !(piTRTableFilter != nil && piTRTableFilter.ContainsTable(db.Info.ID, table.Info.ID)) { + if !piTRTableFilter.ContainsTable(db.Info.ID, table.Info.ID) { continue } tables[table.Info.ID] = table @@ -929,12 +925,12 @@ type FullBackupStorageConfig struct { type GetIDMapConfig struct { // required LoadSavedIDMap bool - TableFilter filter.Filter // original table filter from user // optional - FullBackupStorage *FullBackupStorageConfig - CipherInfo *backuppb.CipherInfo - PiTRTableFilter *utils.PiTRTableTracker // generated table filter that contain all the table id that needs to restore + FullBackupStorageConfig *FullBackupStorageConfig + CipherInfo *backuppb.CipherInfo + // generated at full restore step that contains all the table ids that need to restore + PiTRTableTracker *utils.PiTRTableTracker } const UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL = "UNSAFE_PITR_LOG_RESTORE_START_BEFORE_ANY_UPSTREAM_USER_DDL" @@ -947,7 +943,7 @@ func (rc *LogClient) generateDBReplacesFromFullBackupStorage( cfg *GetIDMapConfig, ) (map[stream.UpstreamID]*stream.DBReplace, error) { dbReplaces := make(map[stream.UpstreamID]*stream.DBReplace) - if cfg.FullBackupStorage == nil { + if cfg.FullBackupStorageConfig == nil { envVal, ok := os.LookupEnv(UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL) if ok && len(envVal) > 0 { log.Info(fmt.Sprintf("the environment variable %s is active, skip loading the base schemas.", UnsafePITRLogRestoreStartBeforeAnyUpstreamUserDDL)) @@ -955,11 +951,11 @@ func (rc *LogClient) generateDBReplacesFromFullBackupStorage( } return nil, errors.Errorf("miss upstream table information at `start-ts`(%d) but the full backup path is not specified", rc.startTS) } - s, err := storage.New(ctx, cfg.FullBackupStorage.Backend, cfg.FullBackupStorage.Opts) + s, err := storage.New(ctx, cfg.FullBackupStorageConfig.Backend, cfg.FullBackupStorageConfig.Opts) if err != nil { return nil, errors.Trace(err) } - filteredFullBackupTables, err := readFilteredFullBackupTables(ctx, s, cfg.TableFilter, cfg.PiTRTableFilter, cfg.CipherInfo) + filteredFullBackupTables, err := readFilteredFullBackupTables(ctx, s, cfg.PiTRTableTracker, cfg.CipherInfo) if err != nil { return nil, errors.Trace(err) } @@ -1023,7 +1019,7 @@ func (rc *LogClient) GetBaseIDMap( // a new task, but without full snapshot restore, tries to load // schemas map whose `restore-ts`` is the task's `start-ts`. - if len(dbMaps) <= 0 && cfg.FullBackupStorage == nil { + if len(dbMaps) <= 0 && cfg.FullBackupStorageConfig == nil { log.Info("try to load pitr id maps of the previous task", zap.Uint64("start-ts", rc.startTS)) dbMaps, err = rc.loadSchemasMap(ctx, rc.startTS) if err != nil { diff --git a/br/pkg/task/common.go b/br/pkg/task/common.go index 79c9eaef8c5ac..58baffe9cacc5 100644 --- a/br/pkg/task/common.go +++ b/br/pkg/task/common.go @@ -254,9 +254,9 @@ type Config struct { FilterStr []string `json:"filter-strings" toml:"filter-strings"` TableFilter filter.Filter `json:"-" toml:"-"` - // PiTRTableFilter generated from TableFilter during snapshot restore, it has all the db id and table id that needs + // PiTRTableTracker generated from TableFilter during snapshot restore, it has all the db id and table id that needs // to be restored - PiTRTableFilter *utils.PiTRTableTracker `json:"-" toml:"-"` + PiTRTableTracker *utils.PiTRTableTracker `json:"-" toml:"-"` SwitchModeInterval time.Duration `json:"switch-mode-interval" toml:"switch-mode-interval"` // Schemas is a database name set, to check whether the restore database has been backup Schemas map[string]struct{} diff --git a/br/pkg/task/restore.go b/br/pkg/task/restore.go index a8c073fdee838..6a58031455249 100644 --- a/br/pkg/task/restore.go +++ b/br/pkg/task/restore.go @@ -862,7 +862,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s if cfg.logTableHistoryManager != nil { // adjust tables to restore in the snapshot restore phase since it will later be renamed during // log restore and will fall into or out of the filter range. - err := adjustTablesToRestoreAndCreateFilter(cfg.logTableHistoryManager, cfg.RestoreConfig, client, fileMap, tableMap) + err := adjustTablesToRestoreAndCreateTableTracker(cfg.logTableHistoryManager, cfg.RestoreConfig, client, fileMap, tableMap) if err != nil { return errors.Trace(err) } @@ -873,7 +873,7 @@ func runSnapshotRestore(c context.Context, mgr *conn.Mgr, g glue.Glue, cmdName s zap.Int("db", len(dbMap))) // need to update to include all eligible table id from snapshot restore - UpdatePiTRFilter(cfg.RestoreConfig, tableMap) + UpdatePiTRTableTracker(cfg.RestoreConfig, tableMap) } files, tables, dbs := convertMapsToSlices(fileMap, tableMap, dbMap) @@ -1452,7 +1452,7 @@ func filterRestoreFiles( return } -func adjustTablesToRestoreAndCreateFilter( +func adjustTablesToRestoreAndCreateTableTracker( logBackupTableHistory *stream.LogBackupTableHistoryManager, cfg *RestoreConfig, client *snapclient.SnapClient, @@ -1461,14 +1461,14 @@ func adjustTablesToRestoreAndCreateFilter( ) (err error) { snapshotDBMap := client.GetDatabaseMap() - // build filter for pitr restore to use later - piTRTableFilter := utils.NewPiTRTableFilter() + // build tracker for pitr restore to use later + piTRTableTracker := utils.NewPiTRTableTracker() // put all the newly created db that matches the filter during log backup into the pitr filter newlyCreatedDBs := logBackupTableHistory.GetNewlyCreatedDBHistory() for dbId, dbName := range newlyCreatedDBs { if utils.MatchSchema(cfg.TableFilter, dbName) { - piTRTableFilter.AddDB(dbId) + piTRTableTracker.AddDB(dbId) } } @@ -1480,10 +1480,11 @@ func adjustTablesToRestoreAndCreateFilter( end := dbIDAndTableName[1] var dbName string + // check in snapshot if snapDb, exists := snapshotDBMap[end.DbID]; exists { dbName = snapDb.Info.Name.O } else if name, exists := logBackupTableHistory.GetDBNameByID(end.DbID); exists { - // if db id does not exist in the snapshot, meaning it's created during log backup + // check during log backup dbName = name } else { log.Warn("did not find db id in full/log backup, "+ @@ -1502,10 +1503,10 @@ func adjustTablesToRestoreAndCreateFilter( // 2. original has been renamed and current is in the filter range // we need to restore original table if utils.MatchTable(cfg.TableFilter, dbName, end.TableName) { - // put this db/table id into pitr filter as it matches with user's filter + // put this db/table id into pitr tracker as it matches with user's filter // have to update filter here since table might be empty or not in snapshot so nothing will be returned . // but we still need to capture this table id to restore during log restore. - piTRTableFilter.AddTable(end.DbID, tableID) + piTRTableTracker.AddTable(end.DbID, tableID) // check if snapshot contains the original db/table originalDB, exists := snapshotDBMap[start.DbID] @@ -1533,7 +1534,7 @@ func adjustTablesToRestoreAndCreateFilter( // restoring } else if utils.MatchTable(cfg.TableFilter, dbName, start.TableName) { // remove it from the filter, will not remove db even table size becomes 0 - _ = piTRTableFilter.Remove(start.DbID, tableID) + _ = piTRTableTracker.Remove(start.DbID, tableID) // check if snapshot contains the original db/table originalDB, exists := snapshotDBMap[start.DbID] @@ -1556,15 +1557,15 @@ func adjustTablesToRestoreAndCreateFilter( } } } - // store the filter into config - log.Info("pitr table filter", zap.String("map", piTRTableFilter.String())) - cfg.PiTRTableFilter = piTRTableFilter + // store the tracker into config + log.Info("pitr table tracker", zap.String("map", piTRTableTracker.String())) + cfg.PiTRTableTracker = piTRTableTracker return } -func UpdatePiTRFilter(cfg *RestoreConfig, tableMap map[int64]*metautil.Table) { +func UpdatePiTRTableTracker(cfg *RestoreConfig, tableMap map[int64]*metautil.Table) { for _, table := range tableMap { - cfg.PiTRTableFilter.AddTable(table.DB.ID, table.Info.ID) + cfg.PiTRTableTracker.AddTable(table.DB.ID, table.Info.ID) } } diff --git a/br/pkg/task/stream.go b/br/pkg/task/stream.go index 2277b547f6fbe..6f3d5cc77ba41 100644 --- a/br/pkg/task/stream.go +++ b/br/pkg/task/stream.go @@ -2014,7 +2014,7 @@ func buildSchemaReplace( func buildAndSaveIDMapIfNeeded(ctx context.Context, client *logclient.LogClient, cfg *LogRestoreConfig, tableMappingManager *stream.TableMappingManager) error { // get full backup meta storage if needed. - fullBackupStorage, err := parseFullBackupTablesStorage(cfg.RestoreConfig) + fullBackupStorageConfig, err := parseFullBackupTablesStorage(cfg.RestoreConfig) if err != nil { return errors.Trace(err) } @@ -2022,11 +2022,10 @@ func buildAndSaveIDMapIfNeeded(ctx context.Context, client *logclient.LogClient, // get the schemas ID replace information. saved := isCurrentIdMapSaved(cfg.checkpointTaskInfo) dbReplaces, err := client.GetBaseIDMap(ctx, &logclient.GetIDMapConfig{ - LoadSavedIDMap: saved, - TableFilter: cfg.TableFilter, - PiTRTableFilter: cfg.PiTRTableFilter, - FullBackupStorage: fullBackupStorage, - CipherInfo: &cfg.Config.CipherInfo, + LoadSavedIDMap: saved, + PiTRTableTracker: cfg.PiTRTableTracker, + FullBackupStorageConfig: fullBackupStorageConfig, + CipherInfo: &cfg.Config.CipherInfo, }) if err != nil { return errors.Trace(err) @@ -2039,7 +2038,11 @@ func buildAndSaveIDMapIfNeeded(ctx context.Context, client *logclient.LogClient, } } else { tableMappingManager.MergeBaseDBReplace(dbReplaces) - tableMappingManager.FilterDBReplaceMap(cfg.PiTRTableFilter) + if cfg.PiTRTableTracker != nil { + tableMappingManager.FilterDBReplaceMap(cfg.PiTRTableTracker) + } else { + log.Warn("pitr table tracker is nil, base map is not from full backup") + } err = tableMappingManager.ReplaceTemporaryIDs(ctx, client.GenGlobalIDs) if err != nil { return errors.Trace(err) diff --git a/br/pkg/utils/filter.go b/br/pkg/utils/filter.go index bfe278c07556c..0cfd0d9fc2118 100644 --- a/br/pkg/utils/filter.go +++ b/br/pkg/utils/filter.go @@ -27,7 +27,7 @@ type PiTRTableTracker struct { DBIdToTable map[int64]map[int64]struct{} } -func NewPiTRTableFilter() *PiTRTableTracker { +func NewPiTRTableTracker() *PiTRTableTracker { return &PiTRTableTracker{ DBIdToTable: make(map[int64]map[int64]struct{}), } diff --git a/br/pkg/utils/filter_test.go b/br/pkg/utils/filter_test.go index adac7e7d50da4..dd23f9cc161c1 100644 --- a/br/pkg/utils/filter_test.go +++ b/br/pkg/utils/filter_test.go @@ -8,14 +8,14 @@ import ( func TestPiTRTableTracker(t *testing.T) { t.Run("test new tracker", func(t *testing.T) { - tracker := NewPiTRTableFilter() + tracker := NewPiTRTableTracker() require.NotNil(t, tracker) require.NotNil(t, tracker.DBIdToTable) require.Empty(t, tracker.DBIdToTable) }) t.Run("test update and contains table", func(t *testing.T) { - tracker := NewPiTRTableFilter() + tracker := NewPiTRTableTracker() tracker.AddDB(1) tracker.AddTable(1, 100) @@ -38,7 +38,7 @@ func TestPiTRTableTracker(t *testing.T) { }) t.Run("test remove table", func(t *testing.T) { - tracker := NewPiTRTableFilter() + tracker := NewPiTRTableTracker() tracker.AddTable(1, 100) tracker.AddTable(1, 101) diff --git a/br/tests/config/tikv.toml b/br/tests/config/tikv.toml index 22126549ab848..07eba85bc268d 100644 --- a/br/tests/config/tikv.toml +++ b/br/tests/config/tikv.toml @@ -37,5 +37,5 @@ path = "/tmp/backup_restore_test/master-key-file" [log-backup] max-flush-interval = "50s" [gc] -ratio-threshold = 1.1 +ratio-threshold = -1.0