Skip to content

Commit

Permalink
changefeedccl: enable frontier quantization
Browse files Browse the repository at this point in the history
Previously, all checkpoint timestamps were forwarded precisely to the resolved
timestamp. To improve merging of adjacent frontier spans, this patch introduces
a frontier quantization mode. In this mode, resolved timestamps from rangefeeds
are rounded down to the nearest multiple of the quantization granularity.
Rounding down ensures no progress is missed and that we consistently track the
lowest possible resolved timestamp.

Resolves: #125723
Release note: changefeed.frontier.timestamp_granularity (default off)
has been introduced to enable more efficient tracking of resolved timestamps.
  • Loading branch information
wenyihu6 committed Jan 6, 2025
1 parent 4e68a4a commit 64820b0
Show file tree
Hide file tree
Showing 5 changed files with 299 additions and 73 deletions.
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,10 @@ var DefaultLaggingRangesThreshold = 3 * time.Minute
// DefaultLaggingRangesPollingInterval is the default polling rate at which
// lagging ranges are checked and metrics are updated.
var DefaultLaggingRangesPollingInterval = 1 * time.Minute

var Quantize = settings.RegisterDurationSettingWithExplicitUnit(
settings.ApplicationLevel,
"changefeed.frontier.timestamp_granularity",
"the granularity at which changefeed progress are quantized to make tracking more efficient",
0,
)
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ go_test(
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/rangefeed",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
Expand Down
83 changes: 44 additions & 39 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func Run(ctx context.Context, cfg Config) error {
cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp,
cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering,
changefeedbase.Quantize.Get(&cfg.Settings.SV),
cfg.ConsumerID,
cfg.InitialHighWater, cfg.EndTime,
cfg.Codec,
Expand Down Expand Up @@ -246,17 +247,18 @@ func (e schemaChangeDetectedError) Error() string {
}

type kvFeed struct {
spans []roachpb.Span
checkpoint []roachpb.Span
checkpointTimestamp hlc.Timestamp
withDiff bool
withFiltering bool
withInitialBackfill bool
consumerID int64
initialHighWater hlc.Timestamp
endTime hlc.Timestamp
writer kvevent.Writer
codec keys.SQLCodec
spans []roachpb.Span
checkpoint []roachpb.Span
checkpointTimestamp hlc.Timestamp
withFrontierQuantize time.Duration
withDiff bool
withFiltering bool
withInitialBackfill bool
consumerID int64
initialHighWater hlc.Timestamp
endTime hlc.Timestamp
writer kvevent.Writer
codec keys.SQLCodec

onBackfillCallback func() func()
rangeObserver kvcoord.RangeObserver
Expand All @@ -283,6 +285,7 @@ func newKVFeed(
schemaChangeEvents changefeedbase.SchemaChangeEventClass,
schemaChangePolicy changefeedbase.SchemaChangePolicy,
withInitialBackfill, withDiff, withFiltering bool,
withFrontierQuantize time.Duration,
consumerID int64,
initialHighWater hlc.Timestamp,
endTime hlc.Timestamp,
Expand All @@ -296,26 +299,27 @@ func newKVFeed(
knobs TestingKnobs,
) *kvFeed {
return &kvFeed{
writer: writer,
spans: spans,
checkpoint: checkpoint,
checkpointTimestamp: checkpointTimestamp,
withInitialBackfill: withInitialBackfill,
withDiff: withDiff,
withFiltering: withFiltering,
consumerID: consumerID,
initialHighWater: initialHighWater,
endTime: endTime,
schemaChangeEvents: schemaChangeEvents,
schemaChangePolicy: schemaChangePolicy,
codec: codec,
tableFeed: tf,
scanner: sc,
physicalFeed: pff,
bufferFactory: bf,
targets: targets,
timers: ts,
knobs: knobs,
writer: writer,
spans: spans,
checkpoint: checkpoint,
checkpointTimestamp: checkpointTimestamp,
withInitialBackfill: withInitialBackfill,
withDiff: withDiff,
withFiltering: withFiltering,
withFrontierQuantize: withFrontierQuantize,
consumerID: consumerID,
initialHighWater: initialHighWater,
endTime: endTime,
schemaChangeEvents: schemaChangeEvents,
schemaChangePolicy: schemaChangePolicy,
codec: codec,
tableFeed: tf,
scanner: sc,
physicalFeed: pff,
bufferFactory: bf,
targets: targets,
timers: ts,
knobs: knobs,
}
}

Expand Down Expand Up @@ -598,14 +602,15 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro

g := ctxgroup.WithContext(ctx)
physicalCfg := rangeFeedConfig{
Spans: stps,
Frontier: resumeFrontier.Frontier(),
WithDiff: f.withDiff,
WithFiltering: f.withFiltering,
ConsumerID: f.consumerID,
Knobs: f.knobs,
Timers: f.timers,
RangeObserver: f.rangeObserver,
Spans: stps,
Frontier: resumeFrontier.Frontier(),
WithDiff: f.withDiff,
WithFiltering: f.withFiltering,
WithFrontierQuantize: f.withFrontierQuantize,
ConsumerID: f.consumerID,
Knobs: f.knobs,
Timers: f.timers,
RangeObserver: f.rangeObserver,
}

// The following two synchronous calls works as follows:
Expand Down
Loading

0 comments on commit 64820b0

Please sign in to comment.