diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel index ac350fd9774b6..95d7cb0287141 100644 --- a/br/pkg/streamhelper/BUILD.bazel +++ b/br/pkg/streamhelper/BUILD.bazel @@ -69,7 +69,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 34, + shard_count = 35, deps = [ ":streamhelper", "//br/pkg/errors", diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 945860c288a24..d2fd40ac9f1c2 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -425,8 +425,8 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0))) globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name) if err != nil { - log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err)) - return err + // ignore the error, just log it + log.Warn("failed to get global checkpoint, skipping.", logutil.ShortError(err)) } if globalCheckpointTs < c.task.StartTs { globalCheckpointTs = c.task.StartTs @@ -567,13 +567,21 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro if c.cfg.CheckPointLagLimit <= 0 { return false, nil } + globalTs, err := c.env.GetGlobalCheckpointForTask(ctx, c.task.Name) + if err != nil { + return false, err + } + if globalTs < c.task.StartTs { + // unreachable. + return false, nil + } now, err := c.env.FetchCurrentTS(ctx) if err != nil { return false, err } - lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS)) + lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(globalTs)) if lagDuration > c.cfg.CheckPointLagLimit { log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"), zap.Stringer("lag", lagDuration)) @@ -591,7 +599,8 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error { } isLagged, err := c.isCheckpointLagged(ctx) if err != nil { - return errors.Annotate(err, "failed to check timestamp") + // ignore the error, just log it + log.Warn("failed to check timestamp", logutil.ShortError(err)) } if isLagged { err := c.env.PauseTask(ctx, c.task.Name) @@ -656,7 +665,7 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error { c.taskMu.Lock() defer c.taskMu.Unlock() if c.task == nil || c.isPaused.Load() { - log.Debug("No tasks yet, skipping advancing.") + log.Info("No tasks yet, skipping advancing.") return nil } diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index e4d83c682f789..37aa697e67791 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -6,7 +6,6 @@ import ( "bytes" "context" "fmt" - "strings" "sync" "testing" "time" @@ -518,6 +517,85 @@ func TestEnableCheckPointLimit(t *testing.T) { } } +func TestOwnerChangeCheckPointLagged(t *testing.T) { + c := createFakeCluster(t, 4, false) + defer func() { + fmt.Println(c) + }() + c.splitAndScatter("01", "02", "022", "023", "033", "04", "043") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + env := newTestEnv(c, t) + rngs := env.ranges + if len(rngs) == 0 { + rngs = []kv.KeyRange{{}} + } + env.task = streamhelper.TaskEvent{ + Type: streamhelper.EventAdd, + Name: "whole", + Info: &backup.StreamBackupTaskInfo{ + Name: "whole", + StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)), + }, + Ranges: rngs, + } + + adv := streamhelper.NewCheckpointAdvancer(env) + adv.UpdateConfigWith(func(c *config.Config) { + c.CheckPointLagLimit = 1 * time.Minute + }) + ctx1, cancel1 := context.WithCancel(context.Background()) + adv.OnStart(ctx1) + adv.OnBecomeOwner(ctx1) + log.Info("advancer1 become owner") + require.NoError(t, adv.OnTick(ctx1)) + + // another advancer but never advance checkpoint before + adv2 := streamhelper.NewCheckpointAdvancer(env) + adv2.UpdateConfigWith(func(c *config.Config) { + c.CheckPointLagLimit = 1 * time.Minute + }) + ctx2, cancel2 := context.WithCancel(context.Background()) + adv2.OnStart(ctx2) + + for i := 0; i < 5; i++ { + c.advanceClusterTimeBy(2 * time.Minute) + c.advanceCheckpointBy(2 * time.Minute) + require.NoError(t, adv.OnTick(ctx1)) + } + c.advanceClusterTimeBy(2 * time.Minute) + require.ErrorContains(t, adv.OnTick(ctx1), "lagged too large") + + // resume task to make next tick normally + c.advanceCheckpointBy(2 * time.Minute) + env.ResumeTask(ctx) + + // stop advancer1, and advancer2 should take over + cancel1() + log.Info("advancer1 owner canceled, and advancer2 become owner") + adv2.OnBecomeOwner(ctx2) + require.NoError(t, adv2.OnTick(ctx2)) + + // advancer2 should take over and tick normally + for i := 0; i < 10; i++ { + c.advanceClusterTimeBy(2 * time.Minute) + c.advanceCheckpointBy(2 * time.Minute) + require.NoError(t, adv2.OnTick(ctx2)) + } + c.advanceClusterTimeBy(2 * time.Minute) + require.ErrorContains(t, adv2.OnTick(ctx2), "lagged too large") + // stop advancer2, and advancer1 should take over + c.advanceCheckpointBy(2 * time.Minute) + env.ResumeTask(ctx) + cancel2() + log.Info("advancer2 owner canceled, and advancer1 become owner") + + adv.OnBecomeOwner(ctx) + // advancer1 should take over and tick normally when come back + require.NoError(t, adv.OnTick(ctx)) +} + func TestCheckPointLagged(t *testing.T) { c := createFakeCluster(t, 4, false) defer func() { @@ -548,8 +626,10 @@ func TestCheckPointLagged(t *testing.T) { }) adv.StartTaskListener(ctx) c.advanceClusterTimeBy(2 * time.Minute) + // if global ts is not advanced, the checkpoint will not be lagged + c.advanceCheckpointBy(2 * time.Minute) require.NoError(t, adv.OnTick(ctx)) - c.advanceClusterTimeBy(1 * time.Minute) + c.advanceClusterTimeBy(3 * time.Minute) require.ErrorContains(t, adv.OnTick(ctx), "lagged too large") // after some times, the isPaused will be set and ticks are skipped require.Eventually(t, func() bool { @@ -573,8 +653,10 @@ func TestCheckPointResume(t *testing.T) { }) adv.StartTaskListener(ctx) c.advanceClusterTimeBy(1 * time.Minute) + // if global ts is not advanced, the checkpoint will not be lagged + c.advanceCheckpointBy(1 * time.Minute) require.NoError(t, adv.OnTick(ctx)) - c.advanceClusterTimeBy(1 * time.Minute) + c.advanceClusterTimeBy(2 * time.Minute) require.ErrorContains(t, adv.OnTick(ctx), "lagged too large") require.Eventually(t, func() bool { return assert.NoError(t, adv.OnTick(ctx)) @@ -604,18 +686,48 @@ func TestUnregisterAfterPause(t *testing.T) { c.CheckPointLagLimit = 1 * time.Minute }) adv.StartTaskListener(ctx) + + // wait for the task to be added + require.Eventually(t, func() bool { + return adv.HasTask() + }, 5*time.Second, 100*time.Millisecond) + + // task is should be paused when global checkpoint is laggeod + // even the global checkpoint is equal to task start ts(not advanced all the time) c.advanceClusterTimeBy(1 * time.Minute) require.NoError(t, adv.OnTick(ctx)) env.PauseTask(ctx, "whole") - time.Sleep(1 * time.Second) c.advanceClusterTimeBy(1 * time.Minute) + require.Error(t, adv.OnTick(ctx), "checkpoint is lagged") + env.unregisterTask() + env.putTask() + + // wait for the task to be added + require.Eventually(t, func() bool { + return adv.HasTask() + }, 5*time.Second, 100*time.Millisecond) + + require.Error(t, adv.OnTick(ctx), "checkpoint is lagged") + + env.unregisterTask() + // wait for the task to be deleted + require.Eventually(t, func() bool { + return !adv.HasTask() + }, 5*time.Second, 100*time.Millisecond) + + // reset + c.advanceClusterTimeBy(-1 * time.Minute) require.NoError(t, adv.OnTick(ctx)) + env.PauseTask(ctx, "whole") + c.advanceClusterTimeBy(1 * time.Minute) env.unregisterTask() env.putTask() + // wait for the task to be add require.Eventually(t, func() bool { - err := adv.OnTick(ctx) - return err != nil && strings.Contains(err.Error(), "check point lagged too large") - }, 5*time.Second, 300*time.Millisecond) + return adv.HasTask() + }, 5*time.Second, 100*time.Millisecond) + + require.Error(t, adv.OnTick(ctx), "checkpoint is lagged") } // If the start ts is *NOT* lagged, even both the cluster and pd are lagged, the task should run normally. @@ -727,13 +839,18 @@ func TestAddTaskWithLongRunTask2(t *testing.T) { adv.UpdateConfigWith(func(c *config.Config) { c.CheckPointLagLimit = 1 * time.Minute }) + adv.StartTaskListener(ctx) c.advanceClusterTimeBy(3 * time.Minute) c.advanceCheckpointBy(1 * time.Minute) env.advanceCheckpointBy(2 * time.Minute) env.mockPDConnectionError() - adv.StartTaskListener(ctx) - // Try update checkpoint - require.NoError(t, adv.OnTick(ctx)) + // if cannot connect to pd, the checkpoint will be rolled back + // because at this point. the global ts is 2 minutes + // and the local checkpoint ts is 1 minute + require.Error(t, adv.OnTick(ctx), "checkpoint rollback") + + // only when local checkpoint > global ts, the next tick will be normal + c.advanceCheckpointBy(12 * time.Minute) // Verify no err raised require.NoError(t, adv.OnTick(ctx)) } @@ -767,11 +884,17 @@ func TestAddTaskWithLongRunTask3(t *testing.T) { adv.UpdateConfigWith(func(c *config.Config) { c.CheckPointLagLimit = 1 * time.Minute }) - c.advanceClusterTimeBy(3 * time.Minute) + // advance cluster time to 4 minutes, and checkpoint to 1 minutes + // if start ts equals to checkpoint, the task will not be paused + adv.StartTaskListener(ctx) + c.advanceClusterTimeBy(2 * time.Minute) + c.advanceCheckpointBy(1 * time.Minute) + env.advanceCheckpointBy(1 * time.Minute) + require.NoError(t, adv.OnTick(ctx)) + + c.advanceClusterTimeBy(2 * time.Minute) c.advanceCheckpointBy(1 * time.Minute) env.advanceCheckpointBy(1 * time.Minute) - env.mockPDConnectionError() - adv.StartTaskListener(ctx) // Try update checkpoint require.ErrorContains(t, adv.OnTick(ctx), "lagged too large") // Verify no err raised after paused diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 52ce89519a23a..22a66c18e27af 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -687,7 +687,12 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string, defer t.mu.Unlock() if checkpoint < t.checkpoint { - t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint) + log.Error("checkpoint rolling back", + zap.Uint64("from", t.checkpoint), + zap.Uint64("to", checkpoint), + zap.Stack("stack")) + // t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint) + return errors.New("checkpoint rolling back") } t.checkpoint = checkpoint return nil @@ -747,6 +752,8 @@ func (t *testEnv) advanceCheckpointBy(duration time.Duration) { t.mu.Lock() defer t.mu.Unlock() + log.Info("advance checkpoint", zap.Duration("duration", duration), zap.Uint64("from", t.checkpoint)) + t.checkpoint = oracle.GoTimeToTS(oracle.GetTimeFromTS(t.checkpoint).Add(duration)) } @@ -766,7 +773,8 @@ func (t *testEnv) putTask() { Type: streamhelper.EventAdd, Name: "whole", Info: &backup.StreamBackupTaskInfo{ - Name: "whole", + Name: "whole", + StartTs: 5, }, Ranges: rngs, }