diff --git a/pkg/ccl/changefeedccl/changefeedbase/settings.go b/pkg/ccl/changefeedccl/changefeedbase/settings.go index 221ef13103f9..0d5284ddf24f 100644 --- a/pkg/ccl/changefeedccl/changefeedbase/settings.go +++ b/pkg/ccl/changefeedccl/changefeedbase/settings.go @@ -335,3 +335,10 @@ var DefaultLaggingRangesThreshold = 3 * time.Minute // DefaultLaggingRangesPollingInterval is the default polling rate at which // lagging ranges are checked and metrics are updated. var DefaultLaggingRangesPollingInterval = 1 * time.Minute + +var Quantize = settings.RegisterDurationSettingWithExplicitUnit( + settings.ApplicationLevel, + "changefeed.frontier.timestamp_granularity", + "the granularity at which changefeed progress are quantized to make tracking more efficient", + 0, +) diff --git a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel index b6d5716efe56..d472e929d5e1 100644 --- a/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel +++ b/pkg/ccl/changefeedccl/kvfeed/BUILD.bazel @@ -63,6 +63,7 @@ go_test( "//pkg/kv", "//pkg/kv/kvclient/kvcoord", "//pkg/kv/kvpb", + "//pkg/kv/kvserver/rangefeed", "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go index 26efd7b0f7e0..af3c295cc8a4 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed.go @@ -126,6 +126,7 @@ func Run(ctx context.Context, cfg Config) error { cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp, cfg.SchemaChangeEvents, cfg.SchemaChangePolicy, cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering, + changefeedbase.Quantize.Get(&cfg.Settings.SV), cfg.ConsumerID, cfg.InitialHighWater, cfg.EndTime, cfg.Codec, @@ -246,17 +247,18 @@ func (e schemaChangeDetectedError) Error() string { } type kvFeed struct { - spans []roachpb.Span - checkpoint []roachpb.Span - checkpointTimestamp hlc.Timestamp - withDiff bool - withFiltering bool - withInitialBackfill bool - consumerID int64 - initialHighWater hlc.Timestamp - endTime hlc.Timestamp - writer kvevent.Writer - codec keys.SQLCodec + spans []roachpb.Span + checkpoint []roachpb.Span + checkpointTimestamp hlc.Timestamp + withFrontierQuantize time.Duration + withDiff bool + withFiltering bool + withInitialBackfill bool + consumerID int64 + initialHighWater hlc.Timestamp + endTime hlc.Timestamp + writer kvevent.Writer + codec keys.SQLCodec onBackfillCallback func() func() rangeObserver kvcoord.RangeObserver @@ -283,6 +285,7 @@ func newKVFeed( schemaChangeEvents changefeedbase.SchemaChangeEventClass, schemaChangePolicy changefeedbase.SchemaChangePolicy, withInitialBackfill, withDiff, withFiltering bool, + withFrontierQuantize time.Duration, consumerID int64, initialHighWater hlc.Timestamp, endTime hlc.Timestamp, @@ -296,26 +299,27 @@ func newKVFeed( knobs TestingKnobs, ) *kvFeed { return &kvFeed{ - writer: writer, - spans: spans, - checkpoint: checkpoint, - checkpointTimestamp: checkpointTimestamp, - withInitialBackfill: withInitialBackfill, - withDiff: withDiff, - withFiltering: withFiltering, - consumerID: consumerID, - initialHighWater: initialHighWater, - endTime: endTime, - schemaChangeEvents: schemaChangeEvents, - schemaChangePolicy: schemaChangePolicy, - codec: codec, - tableFeed: tf, - scanner: sc, - physicalFeed: pff, - bufferFactory: bf, - targets: targets, - timers: ts, - knobs: knobs, + writer: writer, + spans: spans, + checkpoint: checkpoint, + checkpointTimestamp: checkpointTimestamp, + withInitialBackfill: withInitialBackfill, + withDiff: withDiff, + withFiltering: withFiltering, + withFrontierQuantize: withFrontierQuantize, + consumerID: consumerID, + initialHighWater: initialHighWater, + endTime: endTime, + schemaChangeEvents: schemaChangeEvents, + schemaChangePolicy: schemaChangePolicy, + codec: codec, + tableFeed: tf, + scanner: sc, + physicalFeed: pff, + bufferFactory: bf, + targets: targets, + timers: ts, + knobs: knobs, } } @@ -598,14 +602,15 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro g := ctxgroup.WithContext(ctx) physicalCfg := rangeFeedConfig{ - Spans: stps, - Frontier: resumeFrontier.Frontier(), - WithDiff: f.withDiff, - WithFiltering: f.withFiltering, - ConsumerID: f.consumerID, - Knobs: f.knobs, - Timers: f.timers, - RangeObserver: f.rangeObserver, + Spans: stps, + Frontier: resumeFrontier.Frontier(), + WithDiff: f.withDiff, + WithFiltering: f.withFiltering, + WithFrontierQuantize: f.withFrontierQuantize, + ConsumerID: f.consumerID, + Knobs: f.knobs, + Timers: f.timers, + RangeObserver: f.rangeObserver, } // The following two synchronous calls works as follows: diff --git a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go index a40bb6bd7b4d..8c61f88b62f9 100644 --- a/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go +++ b/pkg/ccl/changefeedccl/kvfeed/kv_feed_test.go @@ -22,6 +22,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvcoord" "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + kvserverrangefeed "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/catalog" @@ -34,6 +35,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/mon" + "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/span" "github.com/cockroachdb/errors" "github.com/stretchr/testify/assert" @@ -95,22 +97,24 @@ func TestKVFeed(t *testing.T) { } type testCase struct { - name string - needsInitialScan bool - withDiff bool - schemaChangeEvents changefeedbase.SchemaChangeEventClass - schemaChangePolicy changefeedbase.SchemaChangePolicy - initialHighWater hlc.Timestamp - endTime hlc.Timestamp - spans []roachpb.Span - checkpoint []roachpb.Span - events []kvpb.RangeFeedEvent + name string + needsInitialScan bool + withDiff bool + withFrontierQuantize time.Duration + schemaChangeEvents changefeedbase.SchemaChangeEventClass + schemaChangePolicy changefeedbase.SchemaChangePolicy + initialHighWater hlc.Timestamp + endTime hlc.Timestamp + spans []roachpb.Span + checkpoint []roachpb.Span + events []kvpb.RangeFeedEvent descs []catalog.TableDescriptor - expScans []hlc.Timestamp - expEvents int - expErrRE string + expScans []hlc.Timestamp + expEvents []kvpb.RangeFeedEvent + expEventsCount int + expErrRE string } st := cluster.MakeTestingClusterSettings() runTest := func(t *testing.T, tc testCase) { @@ -143,7 +147,7 @@ func TestKVFeed(t *testing.T) { st := timers.New(time.Minute).GetOrCreateScopedTimers("") f := newKVFeed(buf, tc.spans, tc.checkpoint, hlc.Timestamp{}, tc.schemaChangeEvents, tc.schemaChangePolicy, - tc.needsInitialScan, tc.withDiff, true, /* withFiltering */ + tc.needsInitialScan, tc.withDiff, true /* withFiltering */, tc.withFrontierQuantize, 0, /* consumerID */ tc.initialHighWater, tc.endTime, codec, @@ -173,8 +177,11 @@ func TestKVFeed(t *testing.T) { // Assert that number of events emitted from the kvfeed matches what we // specified in the testcase. testG.GoCtx(func(ctx context.Context) error { - for events := 0; events < tc.expEvents; events++ { - _, err := buf.Get(ctx) + for eventIdx := 0; eventIdx < tc.expEventsCount; eventIdx++ { + e, err := buf.Get(ctx) + if tc.expEvents != nil { + assert.Equal(t, tc.expEvents[eventIdx], *e.Raw()) + } assert.NoError(t, err) } return nil @@ -229,7 +236,7 @@ func TestKVFeed(t *testing.T) { expScans: []hlc.Timestamp{ ts(2), }, - expEvents: 1, + expEventsCount: 1, }, { name: "no events - full checkpoint", @@ -246,8 +253,8 @@ func TestKVFeed(t *testing.T) { events: []kvpb.RangeFeedEvent{ kvEvent(codec, 42, "a", "b", ts(3)), }, - expScans: []hlc.Timestamp{}, - expEvents: 1, + expScans: []hlc.Timestamp{}, + expEventsCount: 1, }, { name: "no events - partial backfill", @@ -268,7 +275,7 @@ func TestKVFeed(t *testing.T) { expScans: []hlc.Timestamp{ ts(2), }, - expEvents: 2, + expEventsCount: 2, }, { name: "one table event - backfill", @@ -294,7 +301,7 @@ func TestKVFeed(t *testing.T) { makeTableDesc(42, 1, ts(1), 2, 1), addColumnDropBackfillMutation(makeTableDesc(42, 2, ts(3), 1, 1)), }, - expEvents: 5, + expEventsCount: 5, }, { name: "one table event - skip", @@ -318,7 +325,7 @@ func TestKVFeed(t *testing.T) { makeTableDesc(42, 1, ts(1), 2, 1), addColumnDropBackfillMutation(makeTableDesc(42, 2, ts(3), 1, 1)), }, - expEvents: 4, + expEventsCount: 4, }, { name: "one table event - stop", @@ -343,8 +350,56 @@ func TestKVFeed(t *testing.T) { makeTableDesc(42, 1, ts(1), 2, 1), addColumnDropBackfillMutation(makeTableDesc(42, 2, ts(4), 1, 1)), }, - expEvents: 2, - expErrRE: "schema change ...", + expEventsCount: 2, + expErrRE: "schema change ...", + }, + { + name: "checkpoint events - with quantize", + schemaChangeEvents: changefeedbase.OptSchemaChangeEventClassDefault, + schemaChangePolicy: changefeedbase.OptSchemaChangePolicyBackfill, + needsInitialScan: false, + withFrontierQuantize: time.Duration(10), + initialHighWater: ts(2), + spans: []roachpb.Span{ + tableSpan(codec, 42), + }, + events: []kvpb.RangeFeedEvent{ + checkpointEvent(tableSpan(codec, 42), ts(2)), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: ts(2).WallTime, Logical: 1}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: ts(3).WallTime + 5, Logical: 1}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: ts(4).WallTime, Logical: 3}), + }, + expEvents: []kvpb.RangeFeedEvent{ + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: ts(2).WallTime, Logical: 0}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: ts(2).WallTime, Logical: 0}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: ts(3).WallTime, Logical: 0}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: ts(4).WallTime, Logical: 0}), + }, + expEventsCount: 4, + }, + { + name: "checkpoint events - without quantize", + schemaChangeEvents: changefeedbase.OptSchemaChangeEventClassDefault, + schemaChangePolicy: changefeedbase.OptSchemaChangePolicyBackfill, + needsInitialScan: false, + withFrontierQuantize: time.Duration(0), + initialHighWater: ts(2), + spans: []roachpb.Span{ + tableSpan(codec, 42), + }, + events: []kvpb.RangeFeedEvent{ + checkpointEvent(tableSpan(codec, 42), ts(2)), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: ts(2).WallTime, Logical: 1}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: ts(3).WallTime + 5, Logical: 1}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: ts(4).WallTime, Logical: 3}), + }, + expEvents: []kvpb.RangeFeedEvent{ + checkpointEvent(tableSpan(codec, 42), ts(2)), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: ts(2).WallTime, Logical: 1}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: ts(3).WallTime + 5, Logical: 1}), + checkpointEvent(tableSpan(codec, 42), hlc.Timestamp{WallTime: ts(4).WallTime, Logical: 3}), + }, + expEventsCount: 4, }, } { t.Run(tc.name, func(t *testing.T) { @@ -645,3 +700,151 @@ func TestCopyFromSourceToDestUntilTableEvent(t *testing.T) { }) } } + +func TestFrontierQuantization(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + mockQuantization := func(ts hlc.Timestamp, quantize time.Duration) hlc.Timestamp { + ts.Logical = 0 + ts.WallTime -= ts.WallTime % int64(quantize) + return ts + } + + makeCheckpointEvent := func(key []byte, endKey []byte, ts int) *kvpb.RangeFeedEvent { + return &kvpb.RangeFeedEvent{ + Checkpoint: &kvpb.RangeFeedCheckpoint{ + Span: roachpb.Span{Key: key, EndKey: endKey}, + ResolvedTS: hlc.Timestamp{WallTime: int64(ts)}, + }, + } + } + + for name, tc := range map[string]struct { + spans []roachpb.Span + withFrontierQuantization bool + events []*kvpb.RangeFeedEvent + expectedFrontierEntries int + expectedFrontier hlc.Timestamp + }{ + "overlapping spans with close ts with quantization": { + withFrontierQuantization: true, + spans: []roachpb.Span{{Key: []byte("a"), EndKey: []byte("d")}}, + events: []*kvpb.RangeFeedEvent{ + makeCheckpointEvent([]byte("a"), []byte("b"), 12), + makeCheckpointEvent([]byte("b"), []byte("c"), 13), + makeCheckpointEvent([]byte("c"), []byte("d"), 14), + }, + expectedFrontierEntries: 1, + expectedFrontier: hlc.Timestamp{WallTime: int64(10)}, + }, + "overlapping spans with close ts without quantization": { + withFrontierQuantization: false, + spans: []roachpb.Span{{Key: []byte("a"), EndKey: []byte("d")}}, + events: []*kvpb.RangeFeedEvent{ + makeCheckpointEvent([]byte("a"), []byte("b"), 12), + makeCheckpointEvent([]byte("b"), []byte("c"), 13), + makeCheckpointEvent([]byte("c"), []byte("d"), 14), + }, + expectedFrontierEntries: 3, + expectedFrontier: hlc.Timestamp{WallTime: int64(12)}, + }, + "non-overlapping spans with close ts with quantization": { + withFrontierQuantization: true, + spans: []roachpb.Span{{Key: []byte("a"), EndKey: []byte("z")}}, + events: []*kvpb.RangeFeedEvent{ + makeCheckpointEvent([]byte("a"), []byte("b"), 12), + makeCheckpointEvent([]byte("b"), []byte("c"), 13), + makeCheckpointEvent([]byte("c"), []byte("d"), 14), + makeCheckpointEvent([]byte("k"), []byte("m"), 12), + }, + expectedFrontierEntries: 4, + expectedFrontier: hlc.Timestamp{WallTime: int64(0)}, + }, + "non-overlapping spans with close ts without quantization": { + withFrontierQuantization: false, + spans: []roachpb.Span{{Key: []byte("a"), EndKey: []byte("z")}}, + events: []*kvpb.RangeFeedEvent{ + makeCheckpointEvent([]byte("a"), []byte("b"), 12), + makeCheckpointEvent([]byte("b"), []byte("c"), 13), + makeCheckpointEvent([]byte("c"), []byte("d"), 14), + makeCheckpointEvent([]byte("k"), []byte("m"), 12), + }, + expectedFrontierEntries: 6, + expectedFrontier: hlc.Timestamp{WallTime: int64(0)}, + }, + } { + t.Run(name, func(t *testing.T) { + frontier, err := span.MakeFrontier(tc.spans...) + require.NoError(t, err) + const quantize = time.Duration(10) + for _, e := range tc.events { + if tc.withFrontierQuantization { + e.Checkpoint.ResolvedTS = mockQuantization(e.Checkpoint.ResolvedTS, quantize) + } + _, err := frontier.Forward(e.Checkpoint.Span, e.Checkpoint.ResolvedTS) + require.NoError(t, err) + } + frontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) span.OpResult { + t.Logf("span: %v, ts: %v\n", sp, ts) + return false + }) + require.Equal(t, tc.expectedFrontierEntries, frontier.Len()) + require.Equal(t, tc.expectedFrontier, frontier.Frontier()) + }) + } +} + +func TestFrontierQuantizationRand(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + mockQuantization := func(ts hlc.Timestamp, quantize time.Duration) hlc.Timestamp { + ts.Logical = 0 + ts.WallTime -= ts.WallTime % int64(quantize) + return ts + } + + makeCheckpointEvent := func(key []byte, endKey []byte, ts int, logical int32) *kvpb.RangeFeedEvent { + return &kvpb.RangeFeedEvent{ + Checkpoint: &kvpb.RangeFeedCheckpoint{ + Span: roachpb.Span{Key: key, EndKey: endKey}, + ResolvedTS: hlc.Timestamp{WallTime: int64(ts), Logical: logical}, + }, + } + } + + const numOfSpans = 100 + events := make([]*kvpb.RangeFeedEvent, numOfSpans) + rng, _ := randutil.NewTestRand() + for i, s := range kvserverrangefeed.GenerateRandomizedSpans(rng, numOfSpans) { + ts := kvserverrangefeed.GenerateRandomizedTs(rng, time.Minute.Nanoseconds()) + events[i] = makeCheckpointEvent(s.Key, s.EndKey, int(ts.WallTime), int32(rng.Intn(100))) + } + + const quantize = time.Duration(10) + quantizedFrontier, err := span.MakeFrontier([]roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}...) + require.NoError(t, err) + for _, e := range events { + quantizedTs := mockQuantization(e.Checkpoint.ResolvedTS, quantize) + _, err := quantizedFrontier.Forward(e.Checkpoint.Span, quantizedTs) + require.NoError(t, err) + } + quantizedEntries := quantizedFrontier.Len() + quantizedHW := quantizedFrontier.Frontier() + + frontier, err := span.MakeFrontier([]roachpb.Span{{Key: keys.MinKey, EndKey: keys.MaxKey}}...) + require.NoError(t, err) + for _, e := range events { + _, err := frontier.Forward(e.Checkpoint.Span, e.Checkpoint.ResolvedTS) + require.NoError(t, err) + } + entries := frontier.Len() + hw := frontier.Frontier() + require.LessOrEqual(t, quantizedEntries, entries) + if quantizedHW.WallTime == hw.WallTime { + require.LessOrEqual(t, quantizedHW.Logical, hw.Logical) + } else { + require.LessOrEqual(t, quantizedHW.WallTime, hw.WallTime) + } +} diff --git a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go index e2402b9eef1d..838614efcefe 100644 --- a/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go +++ b/pkg/ccl/changefeedccl/kvfeed/physical_kv_feed.go @@ -7,6 +7,7 @@ package kvfeed import ( "context" + "time" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/timers" @@ -27,14 +28,15 @@ type physicalFeedFactory interface { // rangeFeedConfig contains configuration options for creating a rangefeed. // It provides an abstraction over the actual rangefeed API. type rangeFeedConfig struct { - Frontier hlc.Timestamp - Spans []kvcoord.SpanTimePair - WithDiff bool - WithFiltering bool - ConsumerID int64 - RangeObserver kvcoord.RangeObserver - Knobs TestingKnobs - Timers *timers.ScopedTimers + Frontier hlc.Timestamp + Spans []kvcoord.SpanTimePair + WithDiff bool + WithFiltering bool + WithFrontierQuantize time.Duration + ConsumerID int64 + RangeObserver kvcoord.RangeObserver + Knobs TestingKnobs + Timers *timers.ScopedTimers } // rangefeedFactory is a function that creates and runs a rangefeed. @@ -128,7 +130,12 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { } stop() case *kvpb.RangeFeedCheckpoint: - if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) { + ev := e.RangeFeedEvent + if p.cfg.WithFrontierQuantize != 0 { + ev.Checkpoint.ResolvedTS.Logical = 0 + ev.Checkpoint.ResolvedTS.WallTime -= ev.Checkpoint.ResolvedTS.WallTime % int64(p.cfg.WithFrontierQuantize) + } + if resolvedTs := ev.Checkpoint.ResolvedTS; resolvedTs.IsEmpty() && resolvedTs.Less(p.cfg.Frontier) { // RangeFeed happily forwards any closed timestamps it receives as // soon as there are no outstanding intents under them. // Changefeeds don't care about these at all, so throw them out. @@ -139,7 +146,7 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { } stop := p.st.RangefeedBufferCheckpoint.Start() if err := p.memBuf.Add( - ctx, kvevent.MakeResolvedEvent(e.RangeFeedEvent, jobspb.ResolvedSpan_NONE), + ctx, kvevent.MakeResolvedEvent(ev, jobspb.ResolvedSpan_NONE), ); err != nil { return err }