From 64820b0519bf3d635197a96307faf34d2952396a Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Mon, 6 Jan 2025 10:50:47 -0500 Subject: [PATCH] changefeedccl: enable frontier quantization Previously, all checkpoint timestamps were forwarded precisely to the resolved timestamp. To improve merging of adjacent frontier spans, this patch introduces a frontier quantization mode. In this mode, resolved timestamps from rangefeeds are rounded down to the nearest multiple of the quantization granularity. Rounding down ensures no progress is missed and that we consistently track the lowest possible resolved timestamp. Resolves: #125723 Release note: changefeed.frontier.timestamp_granularity (default off) has been introduced to enable more efficient tracking of resolved timestamps. --- .../changefeedccl/changefeedbase/settings.go | 7 + pkg/ccl/changefeedccl/kvfeed/BUILD.bazel | 1 + pkg/ccl/changefeedccl/kvfeed/kv_feed.go | 83 +++--- pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go | 246 ++++++++++++++++-- .../changefeedccl/kvfeed/physical_kv_feed.go | 35 ++- 5 files changed, 299 insertions(+), 73 deletions(-) diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 221ef13103f9..0d5284ddf24f 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -335,3 +335,10 @@ var DefaultLaggingRangesThreshold = 3 * time.Minute // DefaultLaggingRangesPollingInterval is the default polling rate at which // lagging ranges are checked and metrics are updated. var DefaultLaggingRangesPollingInterval = 1 * time.Minute + +var Quantize = settings.RegisterDurationSettingWithExplicitUnit( + settings.ApplicationLevel, + "changefeed.frontier.timestamp_granularity", + "the granularity at which changefeed progress are quantized to make tracking more efficient", + 0, +) diff --git a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel index b6d5716efe56..d472e929d5e1 100644 --- a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel @@ -63,6 +63,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvpb", + "//pkg/kv/kvserver/rangefeed", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 26efd7b0f7e0..af3c295cc8a4 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -126,6 +126,7 @@ func Run(ctx context.Context, cfg Config) error { cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp, cfg.SchemaChangeEvents, cfg.SchemaChangePolicy, cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering, + changefeedbase.Quantize.Get(&cfg.Settings.SV), cfg.ConsumerID, cfg.InitialHighWater, cfg.EndTime, cfg.Codec, @@ -246,17 +247,18 @@ func (e schemaChangeDetectedError) Error() string { } type kvFeed struct { - spans []roachpb.Span - checkpoint []roachpb.Span - checkpointTimestamp hlc.Timestamp - withDiff bool - withFiltering bool - withInitialBackfill bool - consumerID int64 - initialHighWater hlc.Timestamp - endTime hlc.Timestamp - writer kvevent.Writer - codec keys.SQLCodec + spans []roachpb.Span + checkpoint []roachpb.Span + checkpointTimestamp hlc.Timestamp + withFrontierQuantize time.Duration + withDiff bool + withFiltering bool + withInitialBackfill bool + consumerID int64 + initialHighWater hlc.Timestamp + endTime hlc.Timestamp + writer kvevent.Writer + codec keys.SQLCodec onBackfillCallback func() func() rangeObserver kvcoord.RangeObserver @@ -283,6 +285,7 @@ func newKVFeed( schemaChangeEvents changefeedbase.SchemaChangeEventClass, schemaChangePolicy changefeedbase.SchemaChangePolicy, withInitialBackfill, withDiff, withFiltering bool, + withFrontierQuantize time.Duration, consumerID int64, initialHighWater hlc.Timestamp, endTime hlc.Timestamp, @@ -296,26 +299,27 @@ func newKVFeed( knobs TestingKnobs, ) *kvFeed { return &kvFeed{ - writer: writer, - spans: spans, - checkpoint: checkpoint, - checkpointTimestamp: checkpointTimestamp, - withInitialBackfill: withInitialBackfill, - withDiff: withDiff, - withFiltering: withFiltering, - consumerID: consumerID, - initialHighWater: initialHighWater, - endTime: endTime, - schemaChangeEvents: schemaChangeEvents, - schemaChangePolicy: schemaChangePolicy, - codec: codec, - tableFeed: tf, - scanner: sc, - physicalFeed: pff, - bufferFactory: bf, - targets: targets, - timers: ts, - knobs: knobs, + writer: writer, + spans: spans, + checkpoint: checkpoint, + checkpointTimestamp: checkpointTimestamp, + withInitialBackfill: withInitialBackfill, + withDiff: withDiff, + withFiltering: withFiltering, + withFrontierQuantize: withFrontierQuantize, + consumerID: consumerID, + initialHighWater: initialHighWater, + endTime: endTime, + schemaChangeEvents: schemaChangeEvents, + schemaChangePolicy: schemaChangePolicy, + codec: codec, + tableFeed: tf, + scanner: sc, + physicalFeed: pff, + bufferFactory: bf, + targets: targets, + timers: ts, + knobs: knobs, } } @@ -598,14 +602,15 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro g := ctxgroup.WithContext(ctx) physicalCfg := rangeFeedConfig{ - Spans: stps, - Frontier: resumeFrontier.Frontier(), - WithDiff: f.withDiff, - WithFiltering: f.withFiltering, - ConsumerID: f.consumerID, - Knobs: f.knobs, - Timers: f.timers, - RangeObserver: f.rangeObserver, + Spans: stps, + Frontier: resumeFrontier.Frontier(), + WithDiff: f.withDiff, + WithFiltering: f.withFiltering, + WithFrontierQuantize: f.withFrontierQuantize, + ConsumerID: f.consumerID, + Knobs: f.knobs, + Timers: f.timers, + RangeObserver: f.rangeObserver, } // The following two synchronous calls works as follows: diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go index a40bb6bd7b4d..835bf9308ab6 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + kvserverrangefeed "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -34,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" @@ -95,22 +97,24 @@ func TestKVFeed(t *testing.T) { } type testCase struct { - name string - needsInitialScan bool - withDiff bool - schemaChangeEvents changefeedbase.SchemaChangeEventClass - schemaChangePolicy changefeedbase.SchemaChangePolicy - initialHighWater hlc.Timestamp - endTime hlc.Timestamp - spans []roachpb.Span - checkpoint []roachpb.Span - events []kvpb.RangeFeedEvent + name string + needsInitialScan bool + withDiff bool + withFrontierQuantize time.Duration + schemaChangeEvents changefeedbase.SchemaChangeEventClass + schemaChangePolicy changefeedbase.SchemaChangePolicy + initialHighWater hlc.Timestamp + endTime hlc.Timestamp + spans []roachpb.Span + checkpoint []roachpb.Span + events []kvpb.RangeFeedEvent descs []catalog.TableDescriptor - expScans []hlc.Timestamp - expEvents int - expErrRE string + expScans []hlc.Timestamp + expEvents []kvpb.RangeFeedEvent + expEventsCount int + expErrRE string } st := cluster.MakeTestingClusterSettings() runTest := func(t *testing.T, tc testCase) { @@ -143,7 +147,7 @@ func TestKVFeed(t *testing.T) { st := timers.New(time.Minute).GetOrCreateScopedTimers("") f := newKVFeed(buf, tc.spans, tc.checkpoint, hlc.Timestamp{}, tc.schemaChangeEvents, tc.schemaChangePolicy, - tc.needsInitialScan, tc.withDiff, true, /* withFiltering */ + tc.needsInitialScan, tc.withDiff, true /* withFiltering */, tc.withFrontierQuantize, 0, /* consumerID */ tc.initialHighWater, tc.endTime, codec, @@ -173,8 +177,11 @@ func TestKVFeed(t *testing.T) { // Assert that number of events emitted from the kvfeed matches what we // specified in the testcase. testG.GoCtx(func(ctx context.Context) error { - for events := 0; events < tc.expEvents; events++ { - _, err := buf.Get(ctx) + for eventIdx := 0; eventIdx < tc.expEventsCount; eventIdx++ { + e, err := buf.Get(ctx) + if tc.expEvents != nil { + assert.Equal(t, tc.expEvents[eventIdx], *e.Raw()) + } assert.NoError(t, err) } return nil @@ -229,7 +236,7 @@ func TestKVFeed(t *testing.T) { expScans: []hlc.Timestamp{ ts(2), }, - expEvents: 1, + expEventsCount: 1, }, { name: "no events - full checkpoint", @@ -246,8 +253,8 @@ func TestKVFeed(t *testing.T) { events: []kvpb.RangeFeedEvent{ kvEvent(codec, 42, "a", "b", ts(3)), }, - expScans: []hlc.Timestamp{}, - expEvents: 1, + expScans: []hlc.Timestamp{}, + expEventsCount: 1, }, { name: "no events - partial backfill", @@ -268,7 +275,7 @@ func TestKVFeed(t *testing.T) { expScans: []hlc.Timestamp{ ts(2), }, - expEvents: 2, + expEventsCount: 2, }, { name: "one table event - backfill", @@ -294,7 +301,7 @@ func TestKVFeed(t *testing.T) { makeTableDesc(42, 1, ts(1), 2, 1), addColumnDropBackfillMutation(makeTableDesc(42, 2, ts(3), 1, 1)), }, - expEvents: 5, + expEventsCount: 5, }, { name: "one table event - skip", @@ -318,7 +325,7 @@ func TestKVFeed(t *testing.T) { makeTableDesc(42, 1, ts(1), 2, 1), addColumnDropBackfillMutation(makeTableDesc(42, 2, ts(3), 1, 1)), }, - expEvents: 4, + expEventsCount: 4, }, { name: "one table event - stop", @@ -343,8 +350,56 @@ func TestKVFeed(t *testing.T) { makeTableDesc(42, 1, ts(1), 2, 1), addColumnDropBackfillMutation(makeTableDesc(42, 2, ts(4), 1, 1)), }, - expEvents: 2, - expErrRE: "schema change ...", + expEventsCount: 2, + expErrRE: "schema change ...", + }, + { + name: "checkpoint events - with quantize", + schemaChangeEvents: changefeedbase.OptSchemaChangeEventClassDefault, + schemaChangePolicy: changefeedbase.OptSchemaChangePolicyBackfill, + needsInitialScan: false, + withFrontierQuantize: time.Duration(10), + initialHighWater: hlc.Timestamp{WallTime: 10}, + spans: []roachpb.Span{ + tableSpan(codec, 42), + }, + events: []kvpb.RangeFeedEvent{ + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 20}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 20, Logical: 1}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 27, Logical: 1}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 43, Logical: 3}), + }, + expEvents: []kvpb.RangeFeedEvent{ + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 20}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 20, Logical: 0}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 20, Logical: 0}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 40, Logical: 0}), + }, + expEventsCount: 4, + }, + { + name: "checkpoint events - without quantize", + schemaChangeEvents: changefeedbase.OptSchemaChangeEventClassDefault, + schemaChangePolicy: changefeedbase.OptSchemaChangePolicyBackfill, + needsInitialScan: false, + withFrontierQuantize: time.Duration(0), + initialHighWater: hlc.Timestamp{WallTime: 10}, + spans: []roachpb.Span{ + tableSpan(codec, 42), + }, + events: []kvpb.RangeFeedEvent{ + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 20}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 20, Logical: 1}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 27, Logical: 1}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 43, Logical: 3}), + }, + expEvents: []kvpb.RangeFeedEvent{ + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 20}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 20, Logical: 1}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 27, Logical: 1}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: 43, Logical: 3}), + }, + expEventsCount: 4, }, } { t.Run(tc.name, func(t *testing.T) { @@ -645,3 +700,146 @@ func TestCopyFromSourceToDestUntilTableEvent(t *testing.T) { }) } } + +// TestFrontierQuantization tests that the frontier quantization works as +// expected. It should quantize timestamps and merge adjacent spans with same +// timestamp together. +func TestFrontierQuantization(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + makeCheckpointEvent := func(key []byte, endKey []byte, ts int, logical int) *kvpb.RangeFeedEvent { + return &kvpb.RangeFeedEvent{ + Checkpoint: &kvpb.RangeFeedCheckpoint{ + Span: roachpb.Span{Key: key, EndKey: endKey}, + ResolvedTS: hlc.Timestamp{WallTime: int64(ts), Logical: 0}, + }, + } + } + + for name, tc := range map[string]struct { + spans []roachpb.Span + withFrontierQuantization bool + events []*kvpb.RangeFeedEvent + expectedFrontierEntries int + expectedFrontier hlc.Timestamp + }{ + "overlapping spans with close ts with quantization": { + withFrontierQuantization: true, + spans: []roachpb.Span{{Key: []byte("a"), EndKey: []byte("d")}}, + events: []*kvpb.RangeFeedEvent{ + makeCheckpointEvent([]byte("a"), []byte("b"), 12, 2), + makeCheckpointEvent([]byte("b"), []byte("c"), 13, 1), + makeCheckpointEvent([]byte("c"), []byte("d"), 14, 0), + }, + expectedFrontierEntries: 1, + expectedFrontier: hlc.Timestamp{WallTime: int64(10)}, + }, + "overlapping spans with close ts without quantization": { + withFrontierQuantization: false, + spans: []roachpb.Span{{Key: []byte("a"), EndKey: []byte("d")}}, + events: []*kvpb.RangeFeedEvent{ + makeCheckpointEvent([]byte("a"), []byte("b"), 12, 0), + makeCheckpointEvent([]byte("b"), []byte("c"), 13, 0), + makeCheckpointEvent([]byte("c"), []byte("d"), 14, 0), + }, + expectedFrontierEntries: 3, + expectedFrontier: hlc.Timestamp{WallTime: int64(12)}, + }, + "non-overlapping spans with close ts with quantization": { + withFrontierQuantization: true, + spans: []roachpb.Span{{Key: []byte("a"), EndKey: []byte("z")}}, + events: []*kvpb.RangeFeedEvent{ + makeCheckpointEvent([]byte("a"), []byte("b"), 12, 3), + makeCheckpointEvent([]byte("b"), []byte("c"), 13, 2), + makeCheckpointEvent([]byte("c"), []byte("d"), 14, 1), + makeCheckpointEvent([]byte("k"), []byte("m"), 12, 0), + }, + expectedFrontierEntries: 4, + expectedFrontier: hlc.Timestamp{WallTime: int64(0)}, + }, + "non-overlapping spans with close ts without quantization": { + withFrontierQuantization: false, + spans: []roachpb.Span{{Key: []byte("a"), EndKey: []byte("z")}}, + events: []*kvpb.RangeFeedEvent{ + makeCheckpointEvent([]byte("a"), []byte("b"), 12, 3), + makeCheckpointEvent([]byte("b"), []byte("c"), 13, 2), + makeCheckpointEvent([]byte("c"), []byte("d"), 14, 1), + makeCheckpointEvent([]byte("k"), []byte("m"), 12, 0), + }, + expectedFrontierEntries: 6, + expectedFrontier: hlc.Timestamp{WallTime: int64(0)}, + }, + } { + t.Run(name, func(t *testing.T) { + frontier, err := span.MakeFrontier(tc.spans...) + require.NoError(t, err) + const quantize = time.Duration(10) + for _, e := range tc.events { + if tc.withFrontierQuantization { + e.Checkpoint.ResolvedTS = quantizeTS(e.Checkpoint.ResolvedTS, quantize) + } + _, err := frontier.Forward(e.Checkpoint.Span, e.Checkpoint.ResolvedTS) + require.NoError(t, err) + } + frontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) span.OpResult { + t.Logf("span: %v, ts: %v\n", sp, ts) + return false + }) + require.Equal(t, tc.expectedFrontierEntries, frontier.Len()) + require.Equal(t, tc.expectedFrontier, frontier.Frontier()) + }) + } +} + +// TestFrontierQuantizationRand makes two frontiers with the same set of spans +// and one with quantized ts and the other without. The test makes sure that the +// highwater tracked by the frontier with quantized ts is <= the other, and the +// number of spans being tracked is >= the other. +func TestFrontierQuantizationRand(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + makeCheckpointEvent := func(key []byte, endKey []byte, ts int, logical int32) *kvpb.RangeFeedEvent { + return &kvpb.RangeFeedEvent{ + Checkpoint: &kvpb.RangeFeedCheckpoint{ + Span: roachpb.Span{Key: key, EndKey: endKey}, + ResolvedTS: hlc.Timestamp{WallTime: int64(ts), Logical: logical}, + }, + } + } + + const numOfSpans = 100 + events := make([]*kvpb.RangeFeedEvent, numOfSpans) + rng, _ := randutil.NewTestRand() + for i, s := range kvserverrangefeed.GenerateRandomizedSpans(rng, numOfSpans) { + ts := kvserverrangefeed.GenerateRandomizedTs(rng, time.Minute.Nanoseconds()) + events[i] = makeCheckpointEvent(s.Key, s.EndKey, int(ts.WallTime), int32(rng.Intn(100))) + } + + const quantize = time.Duration(10) + quantizedFrontier, err := span.MakeFrontier([]roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}...) + require.NoError(t, err) + for _, e := range events { + quantizedTs := quantizeTS(e.Checkpoint.ResolvedTS, quantize) + _, err := quantizedFrontier.Forward(e.Checkpoint.Span, quantizedTs) + require.NoError(t, err) + } + quantizedEntries := quantizedFrontier.Len() + quantizedHW := quantizedFrontier.Frontier() + + frontier, err := span.MakeFrontier([]roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}...) + require.NoError(t, err) + for _, e := range events { + _, err := frontier.Forward(e.Checkpoint.Span, e.Checkpoint.ResolvedTS) + require.NoError(t, err) + } + entries := frontier.Len() + hw := frontier.Frontier() + require.LessOrEqual(t, quantizedEntries, entries) + if quantizedHW.WallTime == hw.WallTime { + require.LessOrEqual(t, quantizedHW.Logical, hw.Logical) + } else { + require.LessOrEqual(t, quantizedHW.WallTime, hw.WallTime) + } +} diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index e2402b9eef1d..a2f24f6aa6e4 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -7,6 +7,7 @@ package kvfeed import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers" @@ -27,14 +28,15 @@ type physicalFeedFactory interface { // rangeFeedConfig contains configuration options for creating a rangefeed. // It provides an abstraction over the actual rangefeed API. type rangeFeedConfig struct { - Frontier hlc.Timestamp - Spans []kvcoord.SpanTimePair - WithDiff bool - WithFiltering bool - ConsumerID int64 - RangeObserver kvcoord.RangeObserver - Knobs TestingKnobs - Timers *timers.ScopedTimers + Frontier hlc.Timestamp + Spans []kvcoord.SpanTimePair + WithDiff bool + WithFiltering bool + WithFrontierQuantize time.Duration + ConsumerID int64 + RangeObserver kvcoord.RangeObserver + Knobs TestingKnobs + Timers *timers.ScopedTimers } // rangefeedFactory is a function that creates and runs a rangefeed. @@ -107,6 +109,15 @@ func (p rangefeedFactory) Run(ctx context.Context, sink kvevent.Writer, cfg rang return g.Wait() } +// quantizeTS returns a new timestamp with the walltime rounded down to the +// nearest multiple of the quantization granularity. +func quantizeTS(ts hlc.Timestamp, granularity time.Duration) hlc.Timestamp { + return hlc.Timestamp{ + WallTime: ts.WallTime - ts.WallTime%int64(granularity), + Logical: 0, + } +} + // addEventsToBuffer consumes rangefeed events from `p.eventCh`, transforms // them to kvevent.Event's, and pushes them into `p.memBuf`. func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { @@ -128,7 +139,11 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { } stop() case *kvpb.RangeFeedCheckpoint: - if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) { + ev := e.RangeFeedEvent + if quantize := p.cfg.WithFrontierQuantize; quantize != 0 { + ev.Checkpoint.ResolvedTS = quantizeTS(ev.Checkpoint.ResolvedTS, quantize) + } + if resolvedTs := ev.Checkpoint.ResolvedTS; resolvedTs.IsEmpty() && resolvedTs.Less(p.cfg.Frontier) { // RangeFeed happily forwards any closed timestamps it receives as // soon as there are no outstanding intents under them. // Changefeeds don't care about these at all, so throw them out. @@ -139,7 +154,7 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { } stop := p.st.RangefeedBufferCheckpoint.Start() if err := p.memBuf.Add( - ctx, kvevent.MakeResolvedEvent(e.RangeFeedEvent, jobspb.ResolvedSpan_NONE), + ctx, kvevent.MakeResolvedEvent(ev, jobspb.ResolvedSpan_NONE), ); err != nil { return err }