changefeedccl: enable frontier quantization #138328

Merged 1 commit on Jan 10, 2025

Contributor

@wenyihu6 wenyihu6 commented Jan 6, 2025

Previously, all checkpoint timestamps were forwarded precisely to the resolved
timestamp. To improve merging of adjacent frontier spans, this patch introduces
a frontier quantization mode. In this mode, resolved timestamps from rangefeeds
are rounded down to the nearest multiple of the quantization granularity.
Rounding down ensures no progress is missed and that we consistently track the
lowest possible resolved timestamp.
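
For illustration, the rounding described above can be sketched in a few lines of Go. This is a minimal sketch rather than the code in this patch: `quantizeDown` and `mergedEntries` are hypothetical names, timestamps are modeled as plain non-negative nanosecond wall times instead of `hlc.Timestamp`, and a granularity of zero is assumed to mean quantization is disabled.

```go
package main

import "time"

// quantizeDown rounds a wall-clock timestamp (nanoseconds, assumed to be
// non-negative) down to the nearest multiple of granularity. A non-positive
// granularity is treated as "quantization disabled" and returns the input
// unchanged. Hypothetical helper, not the function added by this patch.
func quantizeDown(wallNanos int64, granularity time.Duration) int64 {
	if granularity <= 0 {
		return wallNanos
	}
	return wallNanos - wallNanos%int64(granularity)
}

// mergedEntries returns how many frontier entries remain when adjacent spans
// that carry the same timestamp are merged into a single entry. The slice
// holds one timestamp per span, in key order.
func mergedEntries(spanTimestamps []int64) int {
	entries := 0
	for i, ts := range spanTimestamps {
		if i == 0 || ts != spanTimestamps[i-1] {
			entries++
		}
	}
	return entries
}
```

With a 5s granularity, wall times of 10.1s, 11.2s, and 14.9s all round down to 10s, so three adjacent spans that would otherwise occupy three frontier entries can be tracked as one. Because the result is never larger than the input, the frontier never claims more progress than the rangefeed reported.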

Resolves: #125723
Release note: none

@wenyihu6 wenyihu6 marked this pull request as ready for review January 6, 2025 17:20
@wenyihu6 wenyihu6 requested a review from a team as a code owner January 6, 2025 17:20
@wenyihu6 wenyihu6 requested review from asg0451, rharding6373 and andyyang890 and removed request for a team January 6, 2025 17:20
@wenyihu6 wenyihu6 force-pushed the frontierquantize branch 2 times, most recently from ecc531b to 8eae6cf Compare January 6, 2025 18:07
@@ -128,7 +130,12 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error {
}
stop()
case *kvpb.RangeFeedCheckpoint:
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) {
ev := e.RangeFeedEvent
Contributor

While this is a pretty simple implementation, would you mind investigating how much work it would be to switch to the higher-level rangefeed client implementation that LDR uses? It already includes an implementation of quantization and other things that may prove useful. See https://github.com/asg0451/cockroach/blob/ed5ed37ea91b04fba061084ba73fe9225c018288/pkg/crosscluster/producer/event_stream.go#L147 and/or talk to me about it.

Contributor

If it's too much work or the API is too incompatible, then maybe don't. However, I think it would be beneficial if it's low to moderate effort.

Contributor Author

Can we try merging this PR anyway, since it is straightforward and already up? I will look into the refactoring as follow-up work.

Contributor Author

I put up #138346 and assigned myself.

Contributor

thanks. lmk how it goes

Collaborator

I would also be in favor of using the already-built frontier quantization rangefeed client code if it's not too complicated. Maybe you could try prototyping it and seeing how involved it would be?

Contributor Author

Ack, I'm exploring this option now to see how much work it turns out to be. I would still prefer to drive this PR forward since it is already up, but I'm actively prototyping the refactoring.

Contributor

@asg0451 asg0451 left a comment

looks good but i have some questions about the tests.

require.NoError(t, err)
}
frontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) span.OpResult {
t.Logf("span: %v, ts: %v\n", sp, ts)
Contributor

nit: remove this

Contributor Author

I thought it might be worth leaving in since it was helpful for debugging test failures. WDYT?

Contributor

🤷 if you think it will be helpful and it's not too spammy we can leave it in

@wenyihu6 wenyihu6 force-pushed the frontierquantize branch 2 times, most recently from cd1cba8 to 8bcc47f Compare January 6, 2025 22:16
@wenyihu6 wenyihu6 force-pushed the frontierquantize branch 2 times, most recently from 64820b0 to 3f5a7ef Compare January 6, 2025 22:24
Collaborator

@andyyang890 andyyang890 left a comment

Nice work! Left some comments.

@wenyihu6 wenyihu6 force-pushed the frontierquantize branch 2 times, most recently from 30a63fc to 592c7b6 Compare January 7, 2025 21:27
Collaborator

@andyyang890 andyyang890 left a comment

LGTM modulo one comment

@wenyihu6 wenyihu6 force-pushed the frontierquantize branch 2 times, most recently from 488184b to 69f3484 Compare January 8, 2025 21:37
Contributor Author

TFTRs!

bors r=asg0451,andyyang890

@craig craig bot merged commit 3cc42e6 into cockroachdb:master Jan 10, 2025
22 checks passed
Development

Successfully merging this pull request may close these issues.

changefeedccl: take advantage of quantized frontiers
5 participants