
Commit

log backup: use global checkpoint ts as source of truth (#58135) (#58259)

close #58031
ti-chi-bot authored Dec 13, 2024
1 parent c44264f commit 9ffb182
Showing 4 changed files with 161 additions and 21 deletions.
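At a glance, the change makes the advancer treat the global checkpoint ts fetched from PD as the source of truth for the lag check, instead of its locally cached last checkpoint, and downgrades fetch failures from tick-aborting errors to warnings. Below is a minimal, self-contained Go sketch of that check; the env interface, tsToTime helper, and fakeEnv are simplified stand-ins for the real streamhelper and oracle APIs, not the actual TiDB code.

package main

import (
	"context"
	"fmt"
	"time"
)

// env is a simplified stand-in for the advancer's environment dependency.
type env interface {
	GetGlobalCheckpointForTask(ctx context.Context, task string) (uint64, error)
	FetchCurrentTS(ctx context.Context) (uint64, error)
}

// tsToTime converts a TSO to wall-clock time, assuming the physical
// milliseconds sit in the high bits of the timestamp (TiDB's TSO layout).
func tsToTime(ts uint64) time.Time { return time.UnixMilli(int64(ts >> 18)) }

// isCheckpointLagged mirrors the post-change logic: lag is measured from the
// global checkpoint ts, and a global ts below the task start ts means "not lagged".
func isCheckpointLagged(ctx context.Context, e env, task string, startTS uint64, limit time.Duration) (bool, error) {
	if limit <= 0 {
		return false, nil
	}
	globalTS, err := e.GetGlobalCheckpointForTask(ctx, task)
	if err != nil {
		return false, err
	}
	if globalTS < startTS {
		return false, nil // the global checkpoint should never trail the task start ts
	}
	now, err := e.FetchCurrentTS(ctx)
	if err != nil {
		return false, err
	}
	return tsToTime(now).Sub(tsToTime(globalTS)) > limit, nil
}

// fakeEnv pins the global checkpoint while the clock keeps moving.
type fakeEnv struct{ checkpoint, now uint64 }

func (f fakeEnv) GetGlobalCheckpointForTask(context.Context, string) (uint64, error) {
	return f.checkpoint, nil
}
func (f fakeEnv) FetchCurrentTS(context.Context) (uint64, error) { return f.now, nil }

func main() {
	ts := func(t time.Time) uint64 { return uint64(t.UnixMilli()) << 18 }
	base := time.Now()
	e := fakeEnv{checkpoint: ts(base), now: ts(base.Add(3 * time.Minute))}
	lagged, err := isCheckpointLagged(context.Background(), e, "whole", ts(base), time.Minute)
	fmt.Println(lagged, err) // true <nil>
}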
2 changes: 1 addition & 1 deletion br/pkg/streamhelper/BUILD.bazel
@@ -69,7 +69,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 33,
shard_count = 34,
deps = [
":streamhelper",
"//br/pkg/errors",
19 changes: 14 additions & 5 deletions br/pkg/streamhelper/advancer.go
@@ -426,8 +426,8 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
if err != nil {
log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
return err
// ignore the error, just log it
log.Warn("failed to get global checkpoint, skipping.", logutil.ShortError(err))
}
if globalCheckpointTs < c.task.StartTs {
globalCheckpointTs = c.task.StartTs
@@ -568,13 +568,21 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, error) {
if c.cfg.CheckPointLagLimit <= 0 {
return false, nil
}
globalTs, err := c.env.GetGlobalCheckpointForTask(ctx, c.task.Name)
if err != nil {
return false, err
}
if globalTs < c.task.StartTs {
// unreachable.
return false, nil
}

now, err := c.env.FetchCurrentTS(ctx)
if err != nil {
return false, err
}

lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(globalTs))
if lagDuration > c.cfg.CheckPointLagLimit {
log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
zap.Stringer("lag", lagDuration))
@@ -592,7 +600,8 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
}
isLagged, err := c.isCheckpointLagged(ctx)
if err != nil {
return errors.Annotate(err, "failed to check timestamp")
// ignore the error, just log it
log.Warn("failed to check timestamp", logutil.ShortError(err))
}
if isLagged {
err := c.env.PauseTask(ctx, c.task.Name)
@@ -657,7 +666,7 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil || c.isPaused.Load() {
log.Debug("No tasks yet, skipping advancing.")
log.Info("No tasks yet, skipping advancing.")
return nil
}

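Taken together, the advancer.go hunks make the lag check advisory rather than fatal: failing to read the global checkpoint or to evaluate the lag only logs a warning and the tick continues, while a confirmed lag pauses the task and surfaces an error. A rough sketch of that control flow, with checkLag and pauseTask as hypothetical stand-ins for the real methods:

package main

import (
	"context"
	"errors"
	"fmt"
)

// importantTickSketch mirrors the error tolerance introduced above: a failed
// lag check is only logged, while a confirmed lag pauses the task and reports
// an error that callers can assert on.
func importantTickSketch(
	ctx context.Context,
	checkLag func(context.Context) (bool, error),
	pauseTask func(context.Context) error,
) error {
	lagged, err := checkLag(ctx)
	if err != nil {
		// ignore the error, just log it
		fmt.Println("warn: failed to check timestamp:", err)
	}
	if lagged {
		if err := pauseTask(ctx); err != nil {
			return err
		}
		return errors.New("check point lagged too large, stop advancing")
	}
	return nil
}

func main() {
	err := importantTickSketch(
		context.Background(),
		func(context.Context) (bool, error) { return true, nil },                // pretend the checkpoint is lagged
		func(context.Context) error { fmt.Println("pausing task"); return nil }, // pretend PauseTask succeeds
	)
	fmt.Println(err) // check point lagged too large, stop advancing
}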
149 changes: 136 additions & 13 deletions br/pkg/streamhelper/advancer_test.go
@@ -6,7 +6,6 @@ import (
"bytes"
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
@@ -518,6 +517,85 @@ func TestEnableCheckPointLimit(t *testing.T) {
}
}

func TestOwnerChangeCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

env := newTestEnv(c, t)
rngs := env.ranges
if len(rngs) == 0 {
rngs = []kv.KeyRange{{}}
}
env.task = streamhelper.TaskEvent{
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
Name: "whole",
StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)),
},
Ranges: rngs,
}

adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
ctx1, cancel1 := context.WithCancel(context.Background())
adv.OnStart(ctx1)
adv.OnBecomeOwner(ctx1)
log.Info("advancer1 become owner")
require.NoError(t, adv.OnTick(ctx1))

// another advancer, but one that has never advanced the checkpoint before
adv2 := streamhelper.NewCheckpointAdvancer(env)
adv2.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
ctx2, cancel2 := context.WithCancel(context.Background())
adv2.OnStart(ctx2)

for i := 0; i < 5; i++ {
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv.OnTick(ctx1))
}
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx1), "lagged too large")

// resume the task so that the next tick runs normally
c.advanceCheckpointBy(2 * time.Minute)
env.ResumeTask(ctx)

// stop advancer1, and advancer2 should take over
cancel1()
log.Info("advancer1 owner canceled, and advancer2 become owner")
adv2.OnBecomeOwner(ctx2)
require.NoError(t, adv2.OnTick(ctx2))

// advancer2 should take over and tick normally
for i := 0; i < 10; i++ {
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv2.OnTick(ctx2))
}
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv2.OnTick(ctx2), "lagged too large")
// stop advancer2, and advancer1 should take over
c.advanceCheckpointBy(2 * time.Minute)
env.ResumeTask(ctx)
cancel2()
log.Info("advancer2 owner canceled, and advancer1 become owner")

adv.OnBecomeOwner(ctx)
// advancer1 should take over and tick normally when it comes back
require.NoError(t, adv.OnTick(ctx))
}

func TestCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
@@ -548,8 +626,10 @@ func TestCheckPointLagged(t *testing.T) {
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(2 * time.Minute)
// if the global ts is not advanced, the checkpoint will not be lagged
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
c.advanceClusterTimeBy(3 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// after some time, isPaused will be set and ticks are skipped
require.Eventually(t, func() bool {
@@ -573,8 +653,10 @@ func TestCheckPointResume(t *testing.T) {
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
// if the global ts is not advanced, the checkpoint will not be lagged
c.advanceCheckpointBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
@@ -604,18 +686,48 @@ func TestUnregisterAfterPause(t *testing.T) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)

// wait for the task to be added
require.Eventually(t, func() bool {
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

// the task should be paused when the global checkpoint is lagged,
// even if the global checkpoint equals the task start ts (i.e. it has never advanced)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
time.Sleep(1 * time.Second)
c.advanceClusterTimeBy(1 * time.Minute)
require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
env.unregisterTask()
env.putTask()

// wait for the task to be added
require.Eventually(t, func() bool {
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")

env.unregisterTask()
// wait for the task to be deleted
require.Eventually(t, func() bool {
return !adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

// reset
c.advanceClusterTimeBy(-1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
c.advanceClusterTimeBy(1 * time.Minute)
env.unregisterTask()
env.putTask()
// wait for the task to be added
require.Eventually(t, func() bool {
err := adv.OnTick(ctx)
return err != nil && strings.Contains(err.Error(), "check point lagged too large")
}, 5*time.Second, 300*time.Millisecond)
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
}

// If the start ts is *NOT* lagged, even if both the cluster and pd are lagged, the task should run normally.
@@ -727,13 +839,18 @@ func TestAddTaskWithLongRunTask2(t *testing.T) {
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(3 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(2 * time.Minute)
env.mockPDConnectionError()
adv.StartTaskListener(ctx)
// Try update checkpoint
require.NoError(t, adv.OnTick(ctx))
// if pd cannot be reached, the checkpoint will be rolled back,
// because at this point the global ts is 2 minutes
// while the local checkpoint ts is 1 minute
require.Error(t, adv.OnTick(ctx), "checkpoint rollback")

// only when the local checkpoint > global ts will the next tick be normal
c.advanceCheckpointBy(12 * time.Minute)
// Verify no err raised
require.NoError(t, adv.OnTick(ctx))
}
@@ -767,11 +884,17 @@ func TestAddTaskWithLongRunTask3(t *testing.T) {
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
c.advanceClusterTimeBy(3 * time.Minute)
// advance the cluster time to 4 minutes, and the checkpoint to 1 minute
// if the start ts equals the checkpoint, the task will not be paused
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))

c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(1 * time.Minute)
env.mockPDConnectionError()
adv.StartTaskListener(ctx)
// Try update checkpoint
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// Verify no err raised after paused
12 changes: 10 additions & 2 deletions br/pkg/streamhelper/basic_lib_for_test.go
@@ -685,7 +685,12 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string,
defer t.mu.Unlock()

if checkpoint < t.checkpoint {
t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
log.Error("checkpoint rolling back",
zap.Uint64("from", t.checkpoint),
zap.Uint64("to", checkpoint),
zap.Stack("stack"))
// t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
return errors.New("checkpoint rolling back")
}
t.checkpoint = checkpoint
return nil
@@ -745,6 +750,8 @@ func (t *testEnv) advanceCheckpointBy(duration time.Duration) {
t.mu.Lock()
defer t.mu.Unlock()

log.Info("advance checkpoint", zap.Duration("duration", duration), zap.Uint64("from", t.checkpoint))

t.checkpoint = oracle.GoTimeToTS(oracle.GetTimeFromTS(t.checkpoint).Add(duration))
}

@@ -764,7 +771,8 @@ func (t *testEnv) putTask() {
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
Name: "whole",
Name: "whole",
StartTs: 5,
},
Ranges: rngs,
}
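The basic_lib_for_test.go hunks let the fake env report a checkpoint rollback as an ordinary error instead of aborting the whole test, which is what the new assertions such as require.Error(t, adv.OnTick(ctx), "checkpoint rollback") rely on. A compact sketch of that guard, under hypothetical names rather than the real testEnv type:

package main

import (
	"errors"
	"fmt"
	"sync"
)

// checkpointStore is a hypothetical stand-in for the test env's checkpoint state.
type checkpointStore struct {
	mu         sync.Mutex
	checkpoint uint64
}

// upload rejects any attempt to move the checkpoint backwards, returning an
// error the caller can assert on instead of failing the test outright.
func (s *checkpointStore) upload(checkpoint uint64) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	if checkpoint < s.checkpoint {
		return errors.New("checkpoint rolling back")
	}
	s.checkpoint = checkpoint
	return nil
}

func main() {
	s := &checkpointStore{checkpoint: 100}
	fmt.Println(s.upload(120)) // <nil>
	fmt.Println(s.upload(90))  // checkpoint rolling back
}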
