-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
changefeedccl: enable frontier quantization #138328
Conversation
ecc531b
to
8eae6cf
Compare
@@ -128,7 +130,12 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { | |||
} | |||
stop() | |||
case *kvpb.RangeFeedCheckpoint: | |||
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) { | |||
ev := e.RangeFeedEvent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
while this is a pretty simple implementation, would you mind investigating how much work it would be to switch to using the higher-level rangefeed client implementation that LDR uses? it already includes an implementation of quantization and more things that may prove useful. see https://github.com/asg0451/cockroach/blob/ed5ed37ea91b04fba061084ba73fe9225c018288/pkg/crosscluster/producer/event_stream.go#L147 &/ talk to me about it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if it's too much work / too incompatible an api, then maybe dont. however i think it would be beneficial if it's low to moderate effort
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will look into the refactoring. Can we try merging this PR anyways since this is pretty straightforward and already up? I will look into the refactoring as part of the follow up work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I put up #138346 and assigned myself.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks. lmk how it goes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would also be in favor of using the already-built frontier quantization rangefeed client code if it's not too complicated. Maybe you could try prototyping it and seeing how involved it would be?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ack, I'm exploring this option now and see how much work this turns out to be. I would still prefer to drive this PR forward given this is already up, but I'm actively prototyping for this refactoring.
8eae6cf
to
d165a8c
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks good but i have some questions about the tests.
require.NoError(t, err) | ||
} | ||
frontier.Entries(func(sp roachpb.Span, ts hlc.Timestamp) span.OpResult { | ||
t.Logf("span: %v, ts: %v\n", sp, ts) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit rm
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought it might be worth leaving it in since it was helpful for debugging test failures wdyt?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤷 if you think it will be helpful and it's not too spammy we can leave it in
@@ -128,7 +130,12 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { | |||
} | |||
stop() | |||
case *kvpb.RangeFeedCheckpoint: | |||
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) { | |||
ev := e.RangeFeedEvent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks. lmk how it goes
cd1cba8
to
8bcc47f
Compare
64820b0
to
3f5a7ef
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work! Left some comments.
@@ -128,7 +130,12 @@ func (p *rangefeed) addEventsToBuffer(ctx context.Context) error { | |||
} | |||
stop() | |||
case *kvpb.RangeFeedCheckpoint: | |||
if !t.ResolvedTS.IsEmpty() && t.ResolvedTS.Less(p.cfg.Frontier) { | |||
ev := e.RangeFeedEvent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would also be in favor of using the already-built frontier quantization rangefeed client code if it's not too complicated. Maybe you could try prototyping it and seeing how involved it would be?
3f5a7ef
to
bfe3ed5
Compare
30a63fc
to
592c7b6
Compare
592c7b6
to
177c499
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM modulo one comment
488184b
to
69f3484
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @andyyang890, @asg0451, @dt, and @rharding6373)
Previously, all checkpoint timestamps were forwarded precisely to the resolved timestamp. To improve merging of adjacent frontier spans, this patch introduces a frontier quantization mode. In this mode, resolved timestamps from rangefeeds are rounded down to the nearest multiple of the quantization granularity. Rounding down ensures no progress is missed and that we consistently track the lowest possible resolved timestamp. Resolves: cockroachdb#125723 Release note: none
69f3484
to
b3983bf
Compare
TFTRs! bors r=asg0451,andyyang890 |
Build succeeded: |
Previously, all checkpoint timestamps were forwarded precisely to the resolved
timestamp. To improve merging of adjacent frontier spans, this patch introduces
a frontier quantization mode. In this mode, resolved timestamps from rangefeeds
are rounded down to the nearest multiple of the quantization granularity.
Rounding down ensures no progress is missed and that we consistently track the
lowest possible resolved timestamp.
Resolves: #125723
Release note: none