From e24504968c72990769fd2e84aa20363c48a47c4f Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 10 Dec 2024 17:00:53 +0800 Subject: [PATCH 1/7] hanlde lastCheckpoint correctly when check the lag --- br/pkg/streamhelper/advancer.go | 7 +++++++ br/pkg/streamhelper/advancer_daemon.go | 1 + 2 files changed, 8 insertions(+) diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 8e8263f63fed0..4df56932669e0 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -568,6 +568,13 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro if c.cfg.CheckPointLagLimit <= 0 { return false, nil } + c.taskMu.Lock() + if c.lastCheckpoint == nil || c.task.StartTs == c.lastCheckpoint.TS { + c.taskMu.Unlock() + // task is not started yet + return false, nil + } + c.taskMu.Unlock() now, err := c.env.FetchCurrentTS(ctx) if err != nil { diff --git a/br/pkg/streamhelper/advancer_daemon.go b/br/pkg/streamhelper/advancer_daemon.go index 889c988886fa4..110841e9fbf5f 100644 --- a/br/pkg/streamhelper/advancer_daemon.go +++ b/br/pkg/streamhelper/advancer_daemon.go @@ -49,6 +49,7 @@ func (c *CheckpointAdvancer) Name() string { func (c *CheckpointAdvancer) OnStop() { metrics.AdvancerOwner.Set(0.0) metrics.LastCheckpoint.Reset() + c.lastCheckpoint.TS = 0 c.stopSubscriber() } From df5932fe55824e62d0ae58fcc19fce2b199c77b1 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 10 Dec 2024 18:11:36 +0800 Subject: [PATCH 2/7] log backup: use global env to calculate lag --- br/pkg/streamhelper/advancer.go | 11 ++-- br/pkg/streamhelper/advancer_daemon.go | 1 - br/pkg/streamhelper/advancer_test.go | 79 ++++++++++++++++++++++++++ 3 files changed, 85 insertions(+), 6 deletions(-) diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 4df56932669e0..21a313659c629 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -568,20 +568,21 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro if c.cfg.CheckPointLagLimit <= 0 { return false, nil } - c.taskMu.Lock() - if c.lastCheckpoint == nil || c.task.StartTs == c.lastCheckpoint.TS { - c.taskMu.Unlock() + globalTs, err := c.env.GetGlobalCheckpointForTask(ctx, c.task.Name) + if err != nil { + return false, err + } + if globalTs <= c.task.StartTs { // task is not started yet return false, nil } - c.taskMu.Unlock() now, err := c.env.FetchCurrentTS(ctx) if err != nil { return false, err } - lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(c.lastCheckpoint.TS)) + lagDuration := oracle.GetTimeFromTS(now).Sub(oracle.GetTimeFromTS(globalTs)) if lagDuration > c.cfg.CheckPointLagLimit { log.Warn("checkpoint lag is too large", zap.String("category", "log backup advancer"), zap.Stringer("lag", lagDuration)) diff --git a/br/pkg/streamhelper/advancer_daemon.go b/br/pkg/streamhelper/advancer_daemon.go index 110841e9fbf5f..889c988886fa4 100644 --- a/br/pkg/streamhelper/advancer_daemon.go +++ b/br/pkg/streamhelper/advancer_daemon.go @@ -49,7 +49,6 @@ func (c *CheckpointAdvancer) Name() string { func (c *CheckpointAdvancer) OnStop() { metrics.AdvancerOwner.Set(0.0) metrics.LastCheckpoint.Reset() - c.lastCheckpoint.TS = 0 c.stopSubscriber() } diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index e4d83c682f789..939598b730415 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -518,6 +518,85 @@ func TestEnableCheckPointLimit(t *testing.T) { } } +func TestOwnerChangeCheckPointLagged(t *testing.T) { + c := createFakeCluster(t, 4, false) + defer func() { + fmt.Println(c) + }() + c.splitAndScatter("01", "02", "022", "023", "033", "04", "043") + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + env := newTestEnv(c, t) + rngs := env.ranges + if len(rngs) == 0 { + rngs = []kv.KeyRange{{}} + } + env.task = streamhelper.TaskEvent{ + Type: streamhelper.EventAdd, + Name: "whole", + Info: &backup.StreamBackupTaskInfo{ + Name: "whole", + StartTs: oracle.GoTimeToTS(oracle.GetTimeFromTS(0).Add(1 * time.Minute)), + }, + Ranges: rngs, + } + + adv := streamhelper.NewCheckpointAdvancer(env) + adv.UpdateConfigWith(func(c *config.Config) { + c.CheckPointLagLimit = 1 * time.Minute + }) + ctx1, cancel1 := context.WithCancel(context.Background()) + adv.OnStart(ctx1) + adv.OnBecomeOwner(ctx1) + log.Info("advancer1 become owner") + require.NoError(t, adv.OnTick(ctx1)) + + // another advancer but never advance checkpoint before + adv2 := streamhelper.NewCheckpointAdvancer(env) + adv2.UpdateConfigWith(func(c *config.Config) { + c.CheckPointLagLimit = 1 * time.Minute + }) + ctx2, cancel2 := context.WithCancel(context.Background()) + adv2.OnStart(ctx2) + + for i := 0; i < 5; i++ { + c.advanceClusterTimeBy(2 * time.Minute) + c.advanceCheckpointBy(2 * time.Minute) + require.NoError(t, adv.OnTick(ctx1)) + } + c.advanceClusterTimeBy(2 * time.Minute) + require.ErrorContains(t, adv.OnTick(ctx1), "lagged too large") + + // resume task to make next tick normally + c.advanceCheckpointBy(2 * time.Minute) + env.ResumeTask(ctx) + + // stop advancer1, and advancer2 should take over + cancel1() + log.Info("advancer1 owner canceled, and advancer2 become owner") + adv2.OnBecomeOwner(ctx2) + require.NoError(t, adv2.OnTick(ctx2)) + + // advancer2 should take over and tick normally + for i := 0; i < 10; i++ { + c.advanceClusterTimeBy(2 * time.Minute) + c.advanceCheckpointBy(2 * time.Minute) + require.NoError(t, adv2.OnTick(ctx2)) + } + c.advanceClusterTimeBy(2 * time.Minute) + require.ErrorContains(t, adv2.OnTick(ctx2), "lagged too large") + // stop advancer2, and advancer1 should take over + c.advanceCheckpointBy(2 * time.Minute) + env.ResumeTask(ctx) + cancel2() + log.Info("advancer2 owner canceled, and advancer1 become owner") + + adv.OnBecomeOwner(ctx) + // advancer1 should take over and tick normally when come back + require.NoError(t, adv.OnTick(ctx)) +} + func TestCheckPointLagged(t *testing.T) { c := createFakeCluster(t, 4, false) defer func() { From f7b32c321bc196a1907d720bd45f34d3d9f21f7c Mon Sep 17 00:00:00 2001 From: 3pointer Date: Tue, 10 Dec 2024 18:47:36 +0800 Subject: [PATCH 3/7] fix build --- br/pkg/streamhelper/BUILD.bazel | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/br/pkg/streamhelper/BUILD.bazel b/br/pkg/streamhelper/BUILD.bazel index ac350fd9774b6..95d7cb0287141 100644 --- a/br/pkg/streamhelper/BUILD.bazel +++ b/br/pkg/streamhelper/BUILD.bazel @@ -69,7 +69,7 @@ go_test( ], flaky = True, race = "on", - shard_count = 34, + shard_count = 35, deps = [ ":streamhelper", "//br/pkg/errors", From 2f6a862793977cf6d97b74d461eada1ff23ca2e4 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 11 Dec 2024 11:43:49 +0800 Subject: [PATCH 4/7] fix broken unit test --- br/pkg/streamhelper/advancer_test.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index 939598b730415..cbe0157995be8 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -627,8 +627,10 @@ func TestCheckPointLagged(t *testing.T) { }) adv.StartTaskListener(ctx) c.advanceClusterTimeBy(2 * time.Minute) + // if global ts is not advanced, the checkpoint will not be lagged + c.advanceCheckpointBy(2 * time.Minute) require.NoError(t, adv.OnTick(ctx)) - c.advanceClusterTimeBy(1 * time.Minute) + c.advanceClusterTimeBy(3 * time.Minute) require.ErrorContains(t, adv.OnTick(ctx), "lagged too large") // after some times, the isPaused will be set and ticks are skipped require.Eventually(t, func() bool { @@ -652,8 +654,10 @@ func TestCheckPointResume(t *testing.T) { }) adv.StartTaskListener(ctx) c.advanceClusterTimeBy(1 * time.Minute) + // if global ts is not advanced, the checkpoint will not be lagged + c.advanceCheckpointBy(1 * time.Minute) require.NoError(t, adv.OnTick(ctx)) - c.advanceClusterTimeBy(1 * time.Minute) + c.advanceClusterTimeBy(2 * time.Minute) require.ErrorContains(t, adv.OnTick(ctx), "lagged too large") require.Eventually(t, func() bool { return assert.NoError(t, adv.OnTick(ctx)) From 26caf9bc0725e6009f12e5d4e2403b9a47051392 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 11 Dec 2024 12:19:06 +0800 Subject: [PATCH 5/7] continue fix test case --- br/pkg/streamhelper/advancer_test.go | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index cbe0157995be8..b9305dcbd7de0 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -6,7 +6,6 @@ import ( "bytes" "context" "fmt" - "strings" "sync" "testing" "time" @@ -695,10 +694,7 @@ func TestUnregisterAfterPause(t *testing.T) { require.NoError(t, adv.OnTick(ctx)) env.unregisterTask() env.putTask() - require.Eventually(t, func() bool { - err := adv.OnTick(ctx) - return err != nil && strings.Contains(err.Error(), "check point lagged too large") - }, 5*time.Second, 300*time.Millisecond) + require.NoError(t, adv.OnTick(ctx)) } // If the start ts is *NOT* lagged, even both the cluster and pd are lagged, the task should run normally. @@ -850,11 +846,17 @@ func TestAddTaskWithLongRunTask3(t *testing.T) { adv.UpdateConfigWith(func(c *config.Config) { c.CheckPointLagLimit = 1 * time.Minute }) - c.advanceClusterTimeBy(3 * time.Minute) + // advance cluster time to 4 minutes, and checkpoint to 1 minutes + // if start ts equals to checkpoint, the task will not be paused + c.advanceClusterTimeBy(4 * time.Minute) c.advanceCheckpointBy(1 * time.Minute) env.advanceCheckpointBy(1 * time.Minute) - env.mockPDConnectionError() adv.StartTaskListener(ctx) + require.NoError(t, adv.OnTick(ctx)) + + // if start ts < checkpoint, the task will be paused + c.advanceCheckpointBy(1 * time.Minute) + env.advanceCheckpointBy(1 * time.Minute) // Try update checkpoint require.ErrorContains(t, adv.OnTick(ctx), "lagged too large") // Verify no err raised after paused From 6c316bd45f71fe7b271c975b5cda30abf42c9b6a Mon Sep 17 00:00:00 2001 From: 3pointer Date: Wed, 11 Dec 2024 13:32:44 +0800 Subject: [PATCH 6/7] update cases --- br/pkg/streamhelper/advancer_test.go | 61 ++++++++++++++++++++++- br/pkg/streamhelper/basic_lib_for_test.go | 3 +- 2 files changed, 62 insertions(+), 2 deletions(-) diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index b9305dcbd7de0..1b8ef3b01cd5c 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -686,15 +686,74 @@ func TestUnregisterAfterPause(t *testing.T) { c.CheckPointLagLimit = 1 * time.Minute }) adv.StartTaskListener(ctx) + + // No matter how many times the task is paused, after put a new one the task should run normally + // First sequence: pause -> unregister -> put c.advanceClusterTimeBy(1 * time.Minute) require.NoError(t, adv.OnTick(ctx)) env.PauseTask(ctx, "whole") - time.Sleep(1 * time.Second) c.advanceClusterTimeBy(1 * time.Minute) require.NoError(t, adv.OnTick(ctx)) env.unregisterTask() env.putTask() require.NoError(t, adv.OnTick(ctx)) + + // Second sequence: put -> pause -> unregister -> put + c.advanceClusterTimeBy(1 * time.Minute) + env.putTask() + env.PauseTask(ctx, "whole") + env.unregisterTask() + env.putTask() + require.NoError(t, adv.OnTick(ctx)) + + // Third sequence: put -> pause -> put -> unregister -> put + c.advanceClusterTimeBy(1 * time.Minute) + env.putTask() + env.PauseTask(ctx, "whole") + env.putTask() + require.NoError(t, adv.OnTick(ctx)) + env.unregisterTask() + env.putTask() + require.NoError(t, adv.OnTick(ctx)) + + // Fourth sequence: unregister -> put -> pause -> put -> unregister -> put + c.advanceClusterTimeBy(1 * time.Minute) + env.unregisterTask() + env.putTask() + env.PauseTask(ctx, "whole") + time.Sleep(1 * time.Second) + env.putTask() + require.NoError(t, adv.OnTick(ctx)) + env.unregisterTask() + env.putTask() + require.NoError(t, adv.OnTick(ctx)) + + // Fifth sequence: multiple rapid operations with put before pause + for i := 0; i < 3; i++ { + c.advanceClusterTimeBy(1 * time.Minute) + env.putTask() + env.PauseTask(ctx, "whole") + env.unregisterTask() + env.putTask() + env.PauseTask(ctx, "whole") + env.putTask() + require.NoError(t, adv.OnTick(ctx)) + } + + // Sixth sequence: rapid alternating put and pause + for i := 0; i < 3; i++ { + c.advanceClusterTimeBy(1 * time.Minute) + env.putTask() + env.PauseTask(ctx, "whole") + env.putTask() + env.PauseTask(ctx, "whole") + env.putTask() + require.NoError(t, adv.OnTick(ctx)) + } + + // Final verification + c.advanceClusterTimeBy(1 * time.Minute) + require.NoError(t, adv.OnTick(ctx)) } // If the start ts is *NOT* lagged, even both the cluster and pd are lagged, the task should run normally. diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 52ce89519a23a..5b5cd35c79021 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -766,7 +766,8 @@ func (t *testEnv) putTask() { Type: streamhelper.EventAdd, Name: "whole", Info: &backup.StreamBackupTaskInfo{ - Name: "whole", + Name: "whole", + StartTs: 5, }, Ranges: rngs, } From 99c99bbc3ff0da7b3a6164cfd3d94ce725e4fab9 Mon Sep 17 00:00:00 2001 From: 3pointer Date: Thu, 12 Dec 2024 17:00:33 +0800 Subject: [PATCH 7/7] do not return error when add task & fix some cases --- br/pkg/streamhelper/advancer.go | 13 +-- br/pkg/streamhelper/advancer_test.go | 97 +++++++++-------------- br/pkg/streamhelper/basic_lib_for_test.go | 9 ++- 3 files changed, 53 insertions(+), 66 deletions(-) diff --git a/br/pkg/streamhelper/advancer.go b/br/pkg/streamhelper/advancer.go index 21a313659c629..dab645ddb5ed1 100644 --- a/br/pkg/streamhelper/advancer.go +++ b/br/pkg/streamhelper/advancer.go @@ -426,8 +426,8 @@ func (c *CheckpointAdvancer) onTaskEvent(ctx context.Context, e TaskEvent) error c.setCheckpoints(spans.Sorted(spans.NewFullWith(e.Ranges, 0))) globalCheckpointTs, err := c.env.GetGlobalCheckpointForTask(ctx, e.Name) if err != nil { - log.Error("failed to get global checkpoint, skipping.", logutil.ShortError(err)) - return err + // ignore the error, just log it + log.Warn("failed to get global checkpoint, skipping.", logutil.ShortError(err)) } if globalCheckpointTs < c.task.StartTs { globalCheckpointTs = c.task.StartTs @@ -572,8 +572,8 @@ func (c *CheckpointAdvancer) isCheckpointLagged(ctx context.Context) (bool, erro if err != nil { return false, err } - if globalTs <= c.task.StartTs { - // task is not started yet + if globalTs < c.task.StartTs { + // unreachable. return false, nil } @@ -600,7 +600,8 @@ func (c *CheckpointAdvancer) importantTick(ctx context.Context) error { } isLagged, err := c.isCheckpointLagged(ctx) if err != nil { - return errors.Annotate(err, "failed to check timestamp") + // ignore the error, just log it + log.Warn("failed to check timestamp", logutil.ShortError(err)) } if isLagged { err := c.env.PauseTask(ctx, c.task.Name) @@ -665,7 +666,7 @@ func (c *CheckpointAdvancer) tick(ctx context.Context) error { c.taskMu.Lock() defer c.taskMu.Unlock() if c.task == nil || c.isPaused.Load() { - log.Debug("No tasks yet, skipping advancing.") + log.Info("No tasks yet, skipping advancing.") return nil } diff --git a/br/pkg/streamhelper/advancer_test.go b/br/pkg/streamhelper/advancer_test.go index 1b8ef3b01cd5c..37aa697e67791 100644 --- a/br/pkg/streamhelper/advancer_test.go +++ b/br/pkg/streamhelper/advancer_test.go @@ -687,73 +687,47 @@ func TestUnregisterAfterPause(t *testing.T) { }) adv.StartTaskListener(ctx) - // No matter how many times the task is paused, after put a new one the task should run normally - // First sequence: pause -> unregister -> put + // wait for the task to be added + require.Eventually(t, func() bool { + return adv.HasTask() + }, 5*time.Second, 100*time.Millisecond) + + // task is should be paused when global checkpoint is laggeod + // even the global checkpoint is equal to task start ts(not advanced all the time) c.advanceClusterTimeBy(1 * time.Minute) require.NoError(t, adv.OnTick(ctx)) env.PauseTask(ctx, "whole") c.advanceClusterTimeBy(1 * time.Minute) - require.NoError(t, adv.OnTick(ctx)) + require.Error(t, adv.OnTick(ctx), "checkpoint is lagged") env.unregisterTask() env.putTask() - require.NoError(t, adv.OnTick(ctx)) - // Second sequence: put -> pause -> unregister -> put - c.advanceClusterTimeBy(1 * time.Minute) - env.putTask() - env.PauseTask(ctx, "whole") - env.unregisterTask() - env.putTask() - require.NoError(t, adv.OnTick(ctx)) + // wait for the task to be added + require.Eventually(t, func() bool { + return adv.HasTask() + }, 5*time.Second, 100*time.Millisecond) - // Third sequence: put -> pause -> put -> unregister -> put - c.advanceClusterTimeBy(1 * time.Minute) - env.putTask() - env.PauseTask(ctx, "whole") - env.putTask() - require.NoError(t, adv.OnTick(ctx)) - env.unregisterTask() - env.putTask() - require.NoError(t, adv.OnTick(ctx)) + require.Error(t, adv.OnTick(ctx), "checkpoint is lagged") - // Fourth sequence: unregister -> put -> pause -> put -> unregister -> put - c.advanceClusterTimeBy(1 * time.Minute) env.unregisterTask() - env.putTask() - env.PauseTask(ctx, "whole") - time.Sleep(1 * time.Second) - env.putTask() + // wait for the task to be deleted + require.Eventually(t, func() bool { + return !adv.HasTask() + }, 5*time.Second, 100*time.Millisecond) + + // reset + c.advanceClusterTimeBy(-1 * time.Minute) require.NoError(t, adv.OnTick(ctx)) + env.PauseTask(ctx, "whole") + c.advanceClusterTimeBy(1 * time.Minute) env.unregisterTask() env.putTask() - require.NoError(t, adv.OnTick(ctx)) - - // Fifth sequence: multiple rapid operations with put before pause - for i := 0; i < 3; i++ { - c.advanceClusterTimeBy(1 * time.Minute) - env.putTask() - env.PauseTask(ctx, "whole") - env.unregisterTask() - env.putTask() - env.PauseTask(ctx, "whole") - env.putTask() - require.NoError(t, adv.OnTick(ctx)) - } - - // Sixth sequence: rapid alternating put and pause - for i := 0; i < 3; i++ { - c.advanceClusterTimeBy(1 * time.Minute) - env.putTask() - env.PauseTask(ctx, "whole") - env.putTask() - env.PauseTask(ctx, "whole") - env.putTask() - require.NoError(t, adv.OnTick(ctx)) - } + // wait for the task to be add + require.Eventually(t, func() bool { + return adv.HasTask() + }, 5*time.Second, 100*time.Millisecond) - // Final verification - c.advanceClusterTimeBy(1 * time.Minute) - require.NoError(t, adv.OnTick(ctx)) + require.Error(t, adv.OnTick(ctx), "checkpoint is lagged") } // If the start ts is *NOT* lagged, even both the cluster and pd are lagged, the task should run normally. @@ -865,13 +839,18 @@ func TestAddTaskWithLongRunTask2(t *testing.T) { adv.UpdateConfigWith(func(c *config.Config) { c.CheckPointLagLimit = 1 * time.Minute }) + adv.StartTaskListener(ctx) c.advanceClusterTimeBy(3 * time.Minute) c.advanceCheckpointBy(1 * time.Minute) env.advanceCheckpointBy(2 * time.Minute) env.mockPDConnectionError() - adv.StartTaskListener(ctx) - // Try update checkpoint - require.NoError(t, adv.OnTick(ctx)) + // if cannot connect to pd, the checkpoint will be rolled back + // because at this point. the global ts is 2 minutes + // and the local checkpoint ts is 1 minute + require.Error(t, adv.OnTick(ctx), "checkpoint rollback") + + // only when local checkpoint > global ts, the next tick will be normal + c.advanceCheckpointBy(12 * time.Minute) // Verify no err raised require.NoError(t, adv.OnTick(ctx)) } @@ -907,13 +886,13 @@ func TestAddTaskWithLongRunTask3(t *testing.T) { }) // advance cluster time to 4 minutes, and checkpoint to 1 minutes // if start ts equals to checkpoint, the task will not be paused - c.advanceClusterTimeBy(4 * time.Minute) + adv.StartTaskListener(ctx) + c.advanceClusterTimeBy(2 * time.Minute) c.advanceCheckpointBy(1 * time.Minute) env.advanceCheckpointBy(1 * time.Minute) - adv.StartTaskListener(ctx) require.NoError(t, adv.OnTick(ctx)) - // if start ts < checkpoint, the task will be paused + c.advanceClusterTimeBy(2 * time.Minute) c.advanceCheckpointBy(1 * time.Minute) env.advanceCheckpointBy(1 * time.Minute) // Try update checkpoint diff --git a/br/pkg/streamhelper/basic_lib_for_test.go b/br/pkg/streamhelper/basic_lib_for_test.go index 5b5cd35c79021..22a66c18e27af 100644 --- a/br/pkg/streamhelper/basic_lib_for_test.go +++ b/br/pkg/streamhelper/basic_lib_for_test.go @@ -687,7 +687,12 @@ func (t *testEnv) UploadV3GlobalCheckpointForTask(ctx context.Context, _ string, defer t.mu.Unlock() if checkpoint < t.checkpoint { - t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint) + log.Error("checkpoint rolling back", + zap.Uint64("from", t.checkpoint), + zap.Uint64("to", checkpoint), + zap.Stack("stack")) + // t.testCtx.Fatalf("checkpoint rolling back (from %d to %d)", t.checkpoint, checkpoint) + return errors.New("checkpoint rolling back") } t.checkpoint = checkpoint return nil @@ -747,6 +752,8 @@ func (t *testEnv) advanceCheckpointBy(duration time.Duration) { t.mu.Lock() defer t.mu.Unlock() + log.Info("advance checkpoint", zap.Duration("duration", duration), zap.Uint64("from", t.checkpoint)) + t.checkpoint = oracle.GoTimeToTS(oracle.GetTimeFromTS(t.checkpoint).Add(duration)) }