Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

log backup: use global checkpoint ts as source of truth (#58135) #58265

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 34,
shard_count = 35,
deps = [
":streamhelper",
"//br/pkg/errors",
Expand Down
19 changes: 14 additions & 5 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,8 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
if err != nil {
log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
return err
// ignore the error, just log it
log.Warn("failed to get global checkpoint, skipping.", logutil.ShortError(err))
}
if globalCheckpointTs < c.task.StartTs {
globalCheckpointTs = c.task.StartTs
Expand Down Expand Up @@ -567,13 +567,21 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro
if c.cfg.CheckPointLagLimit <= 0 {
return false, nil
}
globalTs, err := c.env.GetGlobalCheckpointForTask(ctx, c.task.Name)
if err != nil {
return false, err
}
if globalTs < c.task.StartTs {
// unreachable.
return false, nil
}

now, err := c.env.FetchCurrentTS(ctx)
if err != nil {
return false, err
}

lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(globalTs))
if lagDuration > c.cfg.CheckPointLagLimit {
log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
zap.Stringer("lag", lagDuration))
Expand All @@ -591,7 +599,8 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
}
isLagged, err := c.isCheckpointLagged(ctx)
if err != nil {
return errors.Annotate(err, "failed to check timestamp")
// ignore the error, just log it
log.Warn("failed to check timestamp", logutil.ShortError(err))
}
if isLagged {
err := c.env.PauseTask(ctx, c.task.Name)
Expand Down Expand Up @@ -656,7 +665,7 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil || c.isPaused.Load() {
log.Debug("No tasks yet, skipping advancing.")
log.Info("No tasks yet, skipping advancing.")
return nil
}

Expand Down
149 changes: 136 additions & 13 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -518,6 +517,85 @@ func TestEnableCheckPointLimit(t *testing.T) {
}
}

func TestOwnerChangeCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

env := newTestEnv(c, t)
rngs := env.ranges
if len(rngs) == 0 {
rngs = []kv.KeyRange{{}}
}
env.task = streamhelper.TaskEvent{
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
Name: "whole",
StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)),
},
Ranges: rngs,
}

adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
ctx1, cancel1 := context.WithCancel(context.Background())
adv.OnStart(ctx1)
adv.OnBecomeOwner(ctx1)
log.Info("advancer1 become owner")
require.NoError(t, adv.OnTick(ctx1))

// another advancer but never advance checkpoint before
adv2 := streamhelper.NewCheckpointAdvancer(env)
adv2.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
ctx2, cancel2 := context.WithCancel(context.Background())
adv2.OnStart(ctx2)

for i := 0; i < 5; i++ {
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv.OnTick(ctx1))
}
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx1), "lagged too large")

// resume task to make next tick normally
c.advanceCheckpointBy(2 * time.Minute)
env.ResumeTask(ctx)

// stop advancer1, and advancer2 should take over
cancel1()
log.Info("advancer1 owner canceled, and advancer2 become owner")
adv2.OnBecomeOwner(ctx2)
require.NoError(t, adv2.OnTick(ctx2))

// advancer2 should take over and tick normally
for i := 0; i < 10; i++ {
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv2.OnTick(ctx2))
}
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv2.OnTick(ctx2), "lagged too large")
// stop advancer2, and advancer1 should take over
c.advanceCheckpointBy(2 * time.Minute)
env.ResumeTask(ctx)
cancel2()
log.Info("advancer2 owner canceled, and advancer1 become owner")

adv.OnBecomeOwner(ctx)
// advancer1 should take over and tick normally when come back
require.NoError(t, adv.OnTick(ctx))
}

func TestCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
Expand Down Expand Up @@ -548,8 +626,10 @@ func TestCheckPointLagged(t *testing.T) {
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(2 * time.Minute)
// if global ts is not advanced, the checkpoint will not be lagged
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
c.advanceClusterTimeBy(3 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// after some times, the isPaused will be set and ticks are skipped
require.Eventually(t, func() bool {
Expand All @@ -573,8 +653,10 @@ func TestCheckPointResume(t *testing.T) {
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
// if global ts is not advanced, the checkpoint will not be lagged
c.advanceCheckpointBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
Expand Down Expand Up @@ -604,18 +686,48 @@ func TestUnregisterAfterPause(t *testing.T) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)

// wait for the task to be added
require.Eventually(t, func() bool {
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

// task is should be paused when global checkpoint is laggeod
// even the global checkpoint is equal to task start ts(not advanced all the time)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
time.Sleep(1 * time.Second)
c.advanceClusterTimeBy(1 * time.Minute)
require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
env.unregisterTask()
env.putTask()

// wait for the task to be added
require.Eventually(t, func() bool {
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")

env.unregisterTask()
// wait for the task to be deleted
require.Eventually(t, func() bool {
return !adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

// reset
c.advanceClusterTimeBy(-1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
c.advanceClusterTimeBy(1 * time.Minute)
env.unregisterTask()
env.putTask()
// wait for the task to be add
require.Eventually(t, func() bool {
err := adv.OnTick(ctx)
return err != nil && strings.Contains(err.Error(), "check point lagged too large")
}, 5*time.Second, 300*time.Millisecond)
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
}

// If the start ts is *NOT* lagged, even both the cluster and pd are lagged, the task should run normally.
Expand Down Expand Up @@ -727,13 +839,18 @@ func TestAddTaskWithLongRunTask2(t *testing.T) {
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(3 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(2 * time.Minute)
env.mockPDConnectionError()
adv.StartTaskListener(ctx)
// Try update checkpoint
require.NoError(t, adv.OnTick(ctx))
// if cannot connect to pd, the checkpoint will be rolled back
// because at this point. the global ts is 2 minutes
// and the local checkpoint ts is 1 minute
require.Error(t, adv.OnTick(ctx), "checkpoint rollback")

// only when local checkpoint > global ts, the next tick will be normal
c.advanceCheckpointBy(12 * time.Minute)
// Verify no err raised
require.NoError(t, adv.OnTick(ctx))
}
Expand Down Expand Up @@ -767,11 +884,17 @@ func TestAddTaskWithLongRunTask3(t *testing.T) {
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
c.advanceClusterTimeBy(3 * time.Minute)
// advance cluster time to 4 minutes, and checkpoint to 1 minutes
// if start ts equals to checkpoint, the task will not be paused
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))

c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(1 * time.Minute)
env.mockPDConnectionError()
adv.StartTaskListener(ctx)
// Try update checkpoint
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// Verify no err raised after paused
Expand Down
12 changes: 10 additions & 2 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,12 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string,
defer t.mu.Unlock()

if checkpoint < t.checkpoint {
t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
log.Error("checkpoint rolling back",
zap.Uint64("from", t.checkpoint),
zap.Uint64("to", checkpoint),
zap.Stack("stack"))
// t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
return errors.New("checkpoint rolling back")
}
t.checkpoint = checkpoint
return nil
Expand Down Expand Up @@ -747,6 +752,8 @@ func (t *testEnv) advanceCheckpointBy(duration time.Duration) {
t.mu.Lock()
defer t.mu.Unlock()

log.Info("advance checkpoint", zap.Duration("duration", duration), zap.Uint64("from", t.checkpoint))

t.checkpoint = oracle.GoTimeToTS(oracle.GetTimeFromTS(t.checkpoint).Add(duration))
}

Expand All @@ -766,7 +773,8 @@ func (t *testEnv) putTask() {
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
Name: "whole",
Name: "whole",
StartTs: 5,
},
Ranges: rngs,
}
Expand Down
Loading