Skip to content

Commit

Permalink
This is an automated cherry-pick of pingcap#58135
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
3pointer authored and ti-chi-bot committed Jan 21, 2025
1 parent e9a1f7f commit 81c5723
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 20 deletions.
4 changes: 4 additions & 0 deletions br/pkg/streamhelper/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,11 @@ go_test(
],
flaky = True,
race = "on",
<<<<<<< HEAD
shard_count = 29,
=======
shard_count = 35,
>>>>>>> e3248e76b56 (log backup: use global checkpoint ts as source of truth (#58135))
deps = [
":streamhelper",
"//br/pkg/errors",
Expand Down
19 changes: 14 additions & 5 deletions br/pkg/streamhelper/advancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -431,8 +431,8 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error
c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0)))
globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name)
if err != nil {
log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err))
return err
// ignore the error, just log it
log.Warn("failed to get global checkpoint, skipping.", logutil.ShortError(err))
}
if globalCheckpointTs < c.task.StartTs {
globalCheckpointTs = c.task.StartTs
Expand Down Expand Up @@ -566,13 +566,21 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro
if c.cfg.CheckPointLagLimit <= 0 {
return false, nil
}
globalTs, err := c.env.GetGlobalCheckpointForTask(ctx, c.task.Name)
if err != nil {
return false, err
}
if globalTs < c.task.StartTs {
// unreachable.
return false, nil
}

now, err := c.env.FetchCurrentTS(ctx)
if err != nil {
return false, err
}

lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS))
lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(globalTs))
if lagDuration > c.cfg.CheckPointLagLimit {
log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"),
zap.Stringer("lag", lagDuration))
Expand All @@ -590,7 +598,8 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error {
}
isLagged, err := c.isCheckpointLagged(ctx)
if err != nil {
return errors.Annotate(err, "failed to check timestamp")
// ignore the error, just log it
log.Warn("failed to check timestamp", logutil.ShortError(err))
}
if isLagged {
err := c.env.PauseTask(ctx, c.task.Name)
Expand Down Expand Up @@ -640,7 +649,7 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error {
c.taskMu.Lock()
defer c.taskMu.Unlock()
if c.task == nil || c.isPaused.Load() {
log.Debug("No tasks yet, skipping advancing.")
log.Info("No tasks yet, skipping advancing.")
return nil
}

Expand Down
149 changes: 136 additions & 13 deletions br/pkg/streamhelper/advancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package streamhelper_test
import (
"context"
"fmt"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -488,6 +487,85 @@ func TestEnableCheckPointLimit(t *testing.T) {
}
}

func TestOwnerChangeCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
fmt.Println(c)
}()
c.splitAndScatter("01", "02", "022", "023", "033", "04", "043")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

env := newTestEnv(c, t)
rngs := env.ranges
if len(rngs) == 0 {
rngs = []kv.KeyRange{{}}
}
env.task = streamhelper.TaskEvent{
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
Name: "whole",
StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)),
},
Ranges: rngs,
}

adv := streamhelper.NewCheckpointAdvancer(env)
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
ctx1, cancel1 := context.WithCancel(context.Background())
adv.OnStart(ctx1)
adv.OnBecomeOwner(ctx1)
log.Info("advancer1 become owner")
require.NoError(t, adv.OnTick(ctx1))

// another advancer but never advance checkpoint before
adv2 := streamhelper.NewCheckpointAdvancer(env)
adv2.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
ctx2, cancel2 := context.WithCancel(context.Background())
adv2.OnStart(ctx2)

for i := 0; i < 5; i++ {
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv.OnTick(ctx1))
}
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx1), "lagged too large")

// resume task to make next tick normally
c.advanceCheckpointBy(2 * time.Minute)
env.ResumeTask(ctx)

// stop advancer1, and advancer2 should take over
cancel1()
log.Info("advancer1 owner canceled, and advancer2 become owner")
adv2.OnBecomeOwner(ctx2)
require.NoError(t, adv2.OnTick(ctx2))

// advancer2 should take over and tick normally
for i := 0; i < 10; i++ {
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv2.OnTick(ctx2))
}
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv2.OnTick(ctx2), "lagged too large")
// stop advancer2, and advancer1 should take over
c.advanceCheckpointBy(2 * time.Minute)
env.ResumeTask(ctx)
cancel2()
log.Info("advancer2 owner canceled, and advancer1 become owner")

adv.OnBecomeOwner(ctx)
// advancer1 should take over and tick normally when come back
require.NoError(t, adv.OnTick(ctx))
}

func TestCheckPointLagged(t *testing.T) {
c := createFakeCluster(t, 4, false)
defer func() {
Expand Down Expand Up @@ -518,8 +596,10 @@ func TestCheckPointLagged(t *testing.T) {
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(2 * time.Minute)
// if global ts is not advanced, the checkpoint will not be lagged
c.advanceCheckpointBy(2 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
c.advanceClusterTimeBy(3 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// after some times, the isPaused will be set and ticks are skipped
require.Eventually(t, func() bool {
Expand All @@ -543,8 +623,10 @@ func TestCheckPointResume(t *testing.T) {
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(1 * time.Minute)
// if global ts is not advanced, the checkpoint will not be lagged
c.advanceCheckpointBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
c.advanceClusterTimeBy(1 * time.Minute)
c.advanceClusterTimeBy(2 * time.Minute)
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
require.Eventually(t, func() bool {
return assert.NoError(t, adv.OnTick(ctx))
Expand Down Expand Up @@ -574,18 +656,48 @@ func TestUnregisterAfterPause(t *testing.T) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)

// wait for the task to be added
require.Eventually(t, func() bool {
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

// task is should be paused when global checkpoint is laggeod
// even the global checkpoint is equal to task start ts(not advanced all the time)
c.advanceClusterTimeBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
time.Sleep(1 * time.Second)
c.advanceClusterTimeBy(1 * time.Minute)
require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
env.unregisterTask()
env.putTask()

// wait for the task to be added
require.Eventually(t, func() bool {
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")

env.unregisterTask()
// wait for the task to be deleted
require.Eventually(t, func() bool {
return !adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

// reset
c.advanceClusterTimeBy(-1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))
env.PauseTask(ctx, "whole")
c.advanceClusterTimeBy(1 * time.Minute)
env.unregisterTask()
env.putTask()
// wait for the task to be add
require.Eventually(t, func() bool {
err := adv.OnTick(ctx)
return err != nil && strings.Contains(err.Error(), "check point lagged too large")
}, 5*time.Second, 300*time.Millisecond)
return adv.HasTask()
}, 5*time.Second, 100*time.Millisecond)

require.Error(t, adv.OnTick(ctx), "checkpoint is lagged")
}

// If the start ts is *NOT* lagged, even both the cluster and pd are lagged, the task should run normally.
Expand Down Expand Up @@ -697,13 +809,18 @@ func TestAddTaskWithLongRunTask2(t *testing.T) {
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(3 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(2 * time.Minute)
env.mockPDConnectionError()
adv.StartTaskListener(ctx)
// Try update checkpoint
require.NoError(t, adv.OnTick(ctx))
// if cannot connect to pd, the checkpoint will be rolled back
// because at this point. the global ts is 2 minutes
// and the local checkpoint ts is 1 minute
require.Error(t, adv.OnTick(ctx), "checkpoint rollback")

// only when local checkpoint > global ts, the next tick will be normal
c.advanceCheckpointBy(12 * time.Minute)
// Verify no err raised
require.NoError(t, adv.OnTick(ctx))
}
Expand Down Expand Up @@ -737,11 +854,17 @@ func TestAddTaskWithLongRunTask3(t *testing.T) {
adv.UpdateConfigWith(func(c *config.Config) {
c.CheckPointLagLimit = 1 * time.Minute
})
c.advanceClusterTimeBy(3 * time.Minute)
// advance cluster time to 4 minutes, and checkpoint to 1 minutes
// if start ts equals to checkpoint, the task will not be paused
adv.StartTaskListener(ctx)
c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(1 * time.Minute)
require.NoError(t, adv.OnTick(ctx))

c.advanceClusterTimeBy(2 * time.Minute)
c.advanceCheckpointBy(1 * time.Minute)
env.advanceCheckpointBy(1 * time.Minute)
env.mockPDConnectionError()
adv.StartTaskListener(ctx)
// Try update checkpoint
require.ErrorContains(t, adv.OnTick(ctx), "lagged too large")
// Verify no err raised after paused
Expand Down
12 changes: 10 additions & 2 deletions br/pkg/streamhelper/basic_lib_for_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,12 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string,
defer t.mu.Unlock()

if checkpoint < t.checkpoint {
t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
log.Error("checkpoint rolling back",
zap.Uint64("from", t.checkpoint),
zap.Uint64("to", checkpoint),
zap.Stack("stack"))
// t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint)
return errors.New("checkpoint rolling back")
}
t.checkpoint = checkpoint
return nil
Expand Down Expand Up @@ -739,6 +744,8 @@ func (t *testEnv) advanceCheckpointBy(duration time.Duration) {
t.mu.Lock()
defer t.mu.Unlock()

log.Info("advance checkpoint", zap.Duration("duration", duration), zap.Uint64("from", t.checkpoint))

t.checkpoint = oracle.GoTimeToTS(oracle.GetTimeFromTS(t.checkpoint).Add(duration))
}

Expand All @@ -758,7 +765,8 @@ func (t *testEnv) putTask() {
Type: streamhelper.EventAdd,
Name: "whole",
Info: &backup.StreamBackupTaskInfo{
Name: "whole",
Name: "whole",
StartTs: 5,
},
Ranges: rngs,
}
Expand Down

0 comments on commit 81c5723

Please sign in to comment.