Skip to content

Commit

Permalink
changefeedccl: enable frontier quantization
Browse files Browse the repository at this point in the history
Previously, all checkpoint timestamps were forwarded precisely to the resolved
timestamp. To improve merging of adjacent frontier spans, this patch introduces
a frontier quantization mode. In this mode, resolved timestamps from rangefeeds
are rounded down to the nearest multiple of the quantization granularity.
Rounding down ensures no progress is missed and that we consistently track the
lowest possible resolved timestamp.

Resolves: #125723
Release note: The cluster setting changefeed.resolved_timestamp.granularity
(default off) has been introduced to enable more efficient tracking of
resolved timestamps.
  • Loading branch information
wenyihu6 committed Jan 8, 2025
1 parent 4e68a4a commit 177c499
Show file tree
Hide file tree
Showing 6 changed files with 318 additions and 96 deletions.
47 changes: 24 additions & 23 deletions pkg/ccl/changefeedccl/changefeed_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,29 +512,30 @@ func (ca *changeAggregator) makeKVFeedCfg(
}

return kvfeed.Config{
Writer: buf,
Settings: cfg.Settings,
DB: cfg.DB.KV(),
Codec: cfg.Codec,
Clock: cfg.DB.KV().Clock(),
Spans: spans,
CheckpointSpans: ca.spec.Checkpoint.Spans,
CheckpointTimestamp: ca.spec.Checkpoint.Timestamp,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
MM: memMon,
InitialHighWater: initialHighWater,
EndTime: config.EndTime,
WithDiff: filters.WithDiff,
WithFiltering: filters.WithFiltering,
NeedsInitialScan: needsInitialScan,
SchemaChangeEvents: schemaChange.EventClass,
SchemaChangePolicy: schemaChange.Policy,
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
ScopedTimers: ca.sliMetrics.Timers,
MonitoringCfg: monitoringCfg,
ConsumerID: int64(ca.spec.JobID),
Writer: buf,
Settings: cfg.Settings,
DB: cfg.DB.KV(),
Codec: cfg.Codec,
Clock: cfg.DB.KV().Clock(),
Spans: spans,
CheckpointSpans: ca.spec.Checkpoint.Spans,
CheckpointTimestamp: ca.spec.Checkpoint.Timestamp,
Targets: AllTargets(ca.spec.Feed),
Metrics: &ca.metrics.KVFeedMetrics,
MM: memMon,
InitialHighWater: initialHighWater,
EndTime: config.EndTime,
WithDiff: filters.WithDiff,
WithFiltering: filters.WithFiltering,
WithFrontierQuantize: changefeedbase.Quantize.Get(&cfg.Settings.SV),
NeedsInitialScan: needsInitialScan,
SchemaChangeEvents: schemaChange.EventClass,
SchemaChangePolicy: schemaChange.Policy,
SchemaFeed: sf,
Knobs: ca.knobs.FeedKnobs,
ScopedTimers: ca.sliMetrics.Timers,
MonitoringCfg: monitoringCfg,
ConsumerID: int64(ca.spec.JobID),
}, nil
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,3 +335,10 @@ var DefaultLaggingRangesThreshold = 3 * time.Minute
// DefaultLaggingRangesPollingInterval is the default polling rate at which
// lagging ranges are checked and metrics are updated.
var DefaultLaggingRangesPollingInterval = 1 * time.Minute

// Quantize is the application-level cluster setting
// changefeed.resolved_timestamp.granularity. Its value is read by the change
// aggregator when it builds the kvfeed configuration and is passed along as
// the frontier quantization granularity: when non-zero, resolved timestamps
// from rangefeed checkpoint events are rounded down to the nearest multiple
// of this duration. The default of 0 leaves quantization off.
var Quantize = settings.RegisterDurationSettingWithExplicitUnit(
settings.ApplicationLevel,
"changefeed.resolved_timestamp.granularity",
"the granularity at which changefeed progress are quantized to make tracking more efficient",
// Default value: zero, i.e. no quantization.
0,
)
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/kvfeed/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ go_test(
"//pkg/kv",
"//pkg/kv/kvclient/kvcoord",
"//pkg/kv/kvpb",
"//pkg/kv/kvserver/rangefeed",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
Expand Down
89 changes: 50 additions & 39 deletions pkg/ccl/changefeedccl/kvfeed/kv_feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ type Config struct {
// enables filtering out any transactional writes with that flag set to true.
WithFiltering bool

// WithFrontierQuantize specifies the resolved timestamp quantization
// granularity. If non-zero, resolved timestamps from rangefeed checkpoint
// events will be rounded down to the nearest multiple of the quantization
// granularity.
WithFrontierQuantize time.Duration

// Knobs are kvfeed testing knobs.
Knobs TestingKnobs

Expand Down Expand Up @@ -126,6 +132,7 @@ func Run(ctx context.Context, cfg Config) error {
cfg.Writer, cfg.Spans, cfg.CheckpointSpans, cfg.CheckpointTimestamp,
cfg.SchemaChangeEvents, cfg.SchemaChangePolicy,
cfg.NeedsInitialScan, cfg.WithDiff, cfg.WithFiltering,
cfg.WithFrontierQuantize,
cfg.ConsumerID,
cfg.InitialHighWater, cfg.EndTime,
cfg.Codec,
Expand Down Expand Up @@ -246,17 +253,18 @@ func (e schemaChangeDetectedError) Error() string {
}

type kvFeed struct {
spans []roachpb.Span
checkpoint []roachpb.Span
checkpointTimestamp hlc.Timestamp
withDiff bool
withFiltering bool
withInitialBackfill bool
consumerID int64
initialHighWater hlc.Timestamp
endTime hlc.Timestamp
writer kvevent.Writer
codec keys.SQLCodec
spans []roachpb.Span
checkpoint []roachpb.Span
checkpointTimestamp hlc.Timestamp
withFrontierQuantize time.Duration
withDiff bool
withFiltering bool
withInitialBackfill bool
consumerID int64
initialHighWater hlc.Timestamp
endTime hlc.Timestamp
writer kvevent.Writer
codec keys.SQLCodec

onBackfillCallback func() func()
rangeObserver kvcoord.RangeObserver
Expand All @@ -283,6 +291,7 @@ func newKVFeed(
schemaChangeEvents changefeedbase.SchemaChangeEventClass,
schemaChangePolicy changefeedbase.SchemaChangePolicy,
withInitialBackfill, withDiff, withFiltering bool,
withFrontierQuantize time.Duration,
consumerID int64,
initialHighWater hlc.Timestamp,
endTime hlc.Timestamp,
Expand All @@ -296,26 +305,27 @@ func newKVFeed(
knobs TestingKnobs,
) *kvFeed {
return &kvFeed{
writer: writer,
spans: spans,
checkpoint: checkpoint,
checkpointTimestamp: checkpointTimestamp,
withInitialBackfill: withInitialBackfill,
withDiff: withDiff,
withFiltering: withFiltering,
consumerID: consumerID,
initialHighWater: initialHighWater,
endTime: endTime,
schemaChangeEvents: schemaChangeEvents,
schemaChangePolicy: schemaChangePolicy,
codec: codec,
tableFeed: tf,
scanner: sc,
physicalFeed: pff,
bufferFactory: bf,
targets: targets,
timers: ts,
knobs: knobs,
writer: writer,
spans: spans,
checkpoint: checkpoint,
checkpointTimestamp: checkpointTimestamp,
withInitialBackfill: withInitialBackfill,
withDiff: withDiff,
withFiltering: withFiltering,
withFrontierQuantize: withFrontierQuantize,
consumerID: consumerID,
initialHighWater: initialHighWater,
endTime: endTime,
schemaChangeEvents: schemaChangeEvents,
schemaChangePolicy: schemaChangePolicy,
codec: codec,
tableFeed: tf,
scanner: sc,
physicalFeed: pff,
bufferFactory: bf,
targets: targets,
timers: ts,
knobs: knobs,
}
}

Expand Down Expand Up @@ -598,14 +608,15 @@ func (f *kvFeed) runUntilTableEvent(ctx context.Context, resumeFrontier span.Fro

g := ctxgroup.WithContext(ctx)
physicalCfg := rangeFeedConfig{
Spans: stps,
Frontier: resumeFrontier.Frontier(),
WithDiff: f.withDiff,
WithFiltering: f.withFiltering,
ConsumerID: f.consumerID,
Knobs: f.knobs,
Timers: f.timers,
RangeObserver: f.rangeObserver,
Spans: stps,
Frontier: resumeFrontier.Frontier(),
WithDiff: f.withDiff,
WithFiltering: f.withFiltering,
WithFrontierQuantize: f.withFrontierQuantize,
ConsumerID: f.consumerID,
Knobs: f.knobs,
Timers: f.timers,
RangeObserver: f.rangeObserver,
}

// The following two synchronous calls work as follows:
Expand Down
Loading

0 comments on commit 177c499

Please sign in to comment.