Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BR: support restore compacted logs with checkpoint #57532

Merged
merged 12 commits into from
Dec 2, 2024
2 changes: 1 addition & 1 deletion br/pkg/checkpoint/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ go_test(
srcs = ["checkpoint_test.go"],
flaky = True,
race = "on",
shard_count = 8,
shard_count = 9,
deps = [
":checkpoint",
"//br/pkg/gluetidb",
Expand Down
9 changes: 1 addition & 8 deletions br/pkg/checkpoint/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,7 @@ type RangeType struct {
*rtree.Range
}

func (r RangeType) IdentKey() []byte {
return r.StartKey
}

type ValueType interface {
IdentKey() []byte
}
// ValueType is the type constraint for the value carried by a checkpoint
// message. It is an alias-style constraint over any, so it places no
// requirement on the value type.
type ValueType any

type CheckpointMessage[K KeyType, V ValueType] struct {
// start-key of the origin range
Expand Down Expand Up @@ -261,7 +255,6 @@ func (r *CheckpointRunner[K, V]) WaitForFinish(ctx context.Context, flush bool)
// wait the range flusher exit
r.wg.Wait()
// remove the checkpoint lock
r.checkpointStorage.deleteLock(ctx)
r.checkpointStorage.close()
}

Expand Down
112 changes: 84 additions & 28 deletions br/pkg/checkpoint/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestCheckpointMetaForRestore(t *testing.T) {
},
},
}
err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, checkpointMetaForSnapshotRestore)
err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, checkpointMetaForSnapshotRestore)
require.NoError(t, err)
checkpointMetaForSnapshotRestore2, err := checkpoint.LoadCheckpointMetadataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
Expand Down Expand Up @@ -278,9 +278,9 @@ func TestCheckpointRestoreRunner(t *testing.T) {
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)

err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{})
require.NoError(t, err)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 5*time.Second, 3*time.Second)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 5*time.Second, 3*time.Second)
require.NoError(t, err)

data := map[string]struct {
Expand Down Expand Up @@ -310,7 +310,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {
}

for _, d := range data {
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, d.RangeKey)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(1, d.RangeKey))
require.NoError(t, err)
}

Expand All @@ -320,7 +320,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {
checkpointRunner.FlushChecksum(ctx, 4, 4, 4, 4)

for _, d := range data2 {
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, d.RangeKey)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(2, d.RangeKey))
require.NoError(t, err)
}

Expand All @@ -343,7 +343,7 @@ func TestCheckpointRestoreRunner(t *testing.T) {
respCount += 1
}

_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checker)
_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.SnapshotRestoreCheckpointDatabaseName, checker)
require.NoError(t, err)
require.Equal(t, 4, respCount)

Expand All @@ -355,10 +355,10 @@ func TestCheckpointRestoreRunner(t *testing.T) {
require.Equal(t, checksum[i].Crc64xor, uint64(i))
}

err = checkpoint.RemoveCheckpointDataForSnapshotRestore(ctx, s.Mock.Domain, se)
err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.SnapshotRestoreCheckpointDatabaseName)
require.NoError(t, err)

exists := checkpoint.ExistsSnapshotRestoreCheckpoint(ctx, s.Mock.Domain)
exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.SnapshotRestoreCheckpointDatabaseName)
require.False(t, exists)
exists = s.Mock.Domain.InfoSchema().SchemaExists(pmodel.NewCIStr(checkpoint.SnapshotRestoreCheckpointDatabaseName))
require.False(t, exists)
Expand All @@ -371,9 +371,9 @@ func TestCheckpointRunnerRetry(t *testing.T) {
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)

err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{})
require.NoError(t, err)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 100*time.Millisecond, 300*time.Millisecond)
require.NoError(t, err)

err = failpoint.Enable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes", "return(true)")
Expand All @@ -382,32 +382,33 @@ func TestCheckpointRunnerRetry(t *testing.T) {
err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes")
require.NoError(t, err)
}()
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(1, "123"))
require.NoError(t, err)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(2, "456"))
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 2, 2, 2, 2)
time.Sleep(time.Second)
err = failpoint.Disable("github.com/pingcap/tidb/br/pkg/checkpoint/failed-after-checkpoint-flushes")
require.NoError(t, err)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 3, "789")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(3, "789"))
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 3, 3, 3, 3)
require.NoError(t, err)
checkpointRunner.WaitForFinish(ctx, true)
se, err = g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
recordSet := make(map[string]int)
_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
func(tableID int64, rangeKey checkpoint.RestoreValueType) {
recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1
_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
checkpoint.SnapshotRestoreCheckpointDatabaseName,
func(tableID int64, v checkpoint.RestoreValueType) {
recordSet[fmt.Sprintf("%d_%s", tableID, v.RangeKey)] += 1
})
require.NoError(t, err)
require.LessOrEqual(t, 1, recordSet["1_{123}"])
require.LessOrEqual(t, 1, recordSet["2_{456}"])
require.LessOrEqual(t, 1, recordSet["3_{789}"])
require.LessOrEqual(t, 1, recordSet["1_123"])
require.LessOrEqual(t, 1, recordSet["2_456"])
require.LessOrEqual(t, 1, recordSet["3_789"])
items, _, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("%d_%d_%d", items[1].Crc64xor, items[1].TotalBytes, items[1].TotalKvs), "1_1_1")
Expand All @@ -422,14 +423,14 @@ func TestCheckpointRunnerNoRetry(t *testing.T) {
se, err := g.CreateSession(s.Mock.Storage)
require.NoError(t, err)

err = checkpoint.SaveCheckpointMetadataForSnapshotRestore(ctx, se, &checkpoint.CheckpointMetadataForSnapshotRestore{})
err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, &checkpoint.CheckpointMetadataForSnapshotRestore{})
require.NoError(t, err)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, 100*time.Millisecond, 300*time.Millisecond)
checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.SnapshotRestoreCheckpointDatabaseName, 100*time.Millisecond, 300*time.Millisecond)
require.NoError(t, err)

err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 1, "123")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(1, "123"))
require.NoError(t, err)
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, 2, "456")
err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointRangeKeyItem(2, "456"))
require.NoError(t, err)
err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
require.NoError(t, err)
Expand All @@ -440,13 +441,14 @@ func TestCheckpointRunnerNoRetry(t *testing.T) {
se, err = g.CreateSession(s.Mock.Storage)
require.NoError(t, err)
recordSet := make(map[string]int)
_, err = checkpoint.LoadCheckpointDataForSnapshotRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
func(tableID int64, rangeKey checkpoint.RestoreValueType) {
recordSet[fmt.Sprintf("%d_%s", tableID, rangeKey)] += 1
_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(),
checkpoint.SnapshotRestoreCheckpointDatabaseName,
func(tableID int64, v checkpoint.RestoreValueType) {
recordSet[fmt.Sprintf("%d_%s", tableID, v.RangeKey)] += 1
})
require.NoError(t, err)
require.Equal(t, 1, recordSet["1_{123}"])
require.Equal(t, 1, recordSet["2_{456}"])
require.Equal(t, 1, recordSet["1_123"])
require.Equal(t, 1, recordSet["2_456"])
items, _, err := checkpoint.LoadCheckpointChecksumForRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor())
require.NoError(t, err)
require.Equal(t, fmt.Sprintf("%d_%d_%d", items[1].Crc64xor, items[1].TotalBytes, items[1].TotalKvs), "1_1_1")
Expand Down Expand Up @@ -584,3 +586,57 @@ func TestCheckpointRunnerLock(t *testing.T) {

runner.WaitForFinish(ctx, true)
}

// TestCheckpointCompactedRestoreRunner runs the checkpoint restore runner
// against the custom SST restore checkpoint database. It saves (nil) metadata,
// appends three file items for table 1, flushes two checksums, waits for the
// runner to finish, and then verifies that exactly the three appended items
// are loaded back. Finally it removes the checkpoint data and checks that the
// checkpoint database no longer exists.
func TestCheckpointCompactedRestoreRunner(t *testing.T) {
	ctx := context.Background()
	s := utiltest.CreateRestoreSchemaSuite(t)
	g := gluetidb.New()
	se, err := g.CreateSession(s.Mock.Storage)
	require.NoError(t, err)

	err = checkpoint.SaveCheckpointMetadataForSstRestore(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName, nil)
	require.NoError(t, err)
	checkpointRunner, err := checkpoint.StartCheckpointRestoreRunnerForTest(ctx, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName, 500*time.Millisecond, time.Second)
	require.NoError(t, err)

	// The keys "a" and "A" differ only by case — presumably to make sure the
	// stored names are not folded together; confirm against the storage schema.
	data := map[string]struct {
		Name string
	}{
		"a": {Name: "a"},
		"A": {Name: "A"},
		"1": {Name: "1"},
	}

	for _, d := range data {
		err = checkpoint.AppendRangesForRestore(ctx, checkpointRunner, checkpoint.NewCheckpointFileItem(1, d.Name))
		require.NoError(t, err)
	}

	// FlushChecksum returns an error; assert on it so that a failed flush is
	// reported here instead of being silently dropped.
	err = checkpointRunner.FlushChecksum(ctx, 1, 1, 1, 1)
	require.NoError(t, err)
	err = checkpointRunner.FlushChecksum(ctx, 2, 2, 2, 2)
	require.NoError(t, err)

	checkpointRunner.WaitForFinish(ctx, true)

	// Use a fresh session to read back what the runner persisted.
	se, err = g.CreateSession(s.Mock.Storage)
	require.NoError(t, err)
	respCount := 0
	checker := func(tableID int64, resp checkpoint.RestoreValueType) {
		require.NotNil(t, resp)
		d, ok := data[resp.Name]
		require.True(t, ok)
		require.Equal(t, d.Name, resp.Name)
		respCount++
	}

	_, err = checkpoint.LoadCheckpointDataForSstRestore(ctx, se.GetSessionCtx().GetRestrictedSQLExecutor(), checkpoint.CustomSSTRestoreCheckpointDatabaseName, checker)
	require.NoError(t, err)
	// One callback per appended file item.
	require.Equal(t, 3, respCount)

	err = checkpoint.RemoveCheckpointDataForSstRestore(ctx, s.Mock.Domain, se, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
	require.NoError(t, err)

	// After removal neither the checkpoint helper nor the info schema should
	// report the checkpoint database.
	exists := checkpoint.ExistsSstRestoreCheckpoint(ctx, s.Mock.Domain, checkpoint.CustomSSTRestoreCheckpointDatabaseName)
	require.False(t, exists)
	exists = s.Mock.Domain.InfoSchema().SchemaExists(pmodel.NewCIStr(checkpoint.CustomSSTRestoreCheckpointDatabaseName))
	require.False(t, exists)
}
9 changes: 0 additions & 9 deletions br/pkg/checkpoint/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,12 +187,3 @@ func (s *externalCheckpointStorage) updateLock(ctx context.Context) error {

return nil
}

func (s *externalCheckpointStorage) deleteLock(ctx context.Context) {
if s.lockId > 0 {
err := s.storage.DeleteFile(ctx, s.CheckpointLockPath)
if err != nil {
log.Warn("failed to remove the checkpoint lock", zap.Error(err))
}
}
}
13 changes: 1 addition & 12 deletions br/pkg/checkpoint/log_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@ package checkpoint
import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/pingcap/tidb/br/pkg/glue"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/meta/model"
Expand All @@ -39,22 +37,13 @@ type LogRestoreValueType struct {
Foff int
}

func (l LogRestoreValueType) IdentKey() []byte {
return []byte(fmt.Sprint(l.Goff, '.', l.Foff, '.', l.TableID))
}

type LogRestoreValueMarshaled struct {
// group index in the metadata
Goff int `json:"goff"`
// downstream table id -> file indexes in the group
Foffs map[int64][]int `json:"foffs"`
}

func (l LogRestoreValueMarshaled) IdentKey() []byte {
log.Fatal("unimplement!")
return nil
}

// valueMarshalerForLogRestore converts the checkpoint data's format to a more space-efficient format
// input format :
//
Expand Down Expand Up @@ -299,7 +288,7 @@ func TryToGetCheckpointTaskInfo(
return nil, errors.Trace(err)
}
}
hasSnapshotMetadata := ExistsSnapshotRestoreCheckpoint(ctx, dom)
hasSnapshotMetadata := ExistsSstRestoreCheckpoint(ctx, dom, SnapshotRestoreCheckpointDatabaseName)

return &CheckpointTaskInfoForLogRestore{
Metadata: metadata,
Expand Down
Loading
Loading