Implement parallel ItemBlock processing via backup_controller goroutines #8659
Conversation
Currently in draft, as I have not yet tested the changes in a cluster env.
Force-pushed from fa680a9 to 5cee460
Codecov Report
Attention: Patch coverage is …

Additional details and impacted files:

@@            Coverage Diff             @@
##             main    #8659      +/-   ##
==========================================
+ Coverage   59.39%   59.48%   +0.08%
==========================================
  Files         370      371        +1
  Lines       39989    40107      +118
==========================================
+ Hits        23752    23857      +105
- Misses      14745    14756       +11
- Partials     1492     1494        +2

☔ View full report in Codecov by Sentry.
I think I have just one concern; otherwise nothing stands out!
Some typos; other than that, no objections atm.
Note: I am not as familiar with select-case channel statements.
Force-pushed from 5cee460 to d08bb75
Updated in response to PR comments. Will test in cluster tomorrow.
Force-pushed from dad8770 to 097941e
I've tested this in an AWS cluster: two namespaces, 4 pods, 2 volumes, and additional related resources. Using vmware-tanzu/velero-plugin-example#75 to force each item backup to take 5 seconds allowed me to verify parallel processing. Starting with 1 configured worker and scaling up to 16, each increase dropped the total backup time, and I was able to verify from the logs that the 5-second BIA actions were running in parallel in the expected numbers. A summary of backup times/results is below. Note that the differing item count on subsequent backups was mainly due to the inclusion of events; if those had been excluded, the time drop from one backup to the next would be more significant. Also note that with a regular real-world backup of this size, the time differences would be much smaller, since I've introduced an artificial per-item delay here. The real-world use case where parallel backup will be most pronounced is a backup with a large number of small PVCs using CSI snapshots or the data mover.
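For reference, the artificial delay mentioned above came from a plugin-side BackupItemAction. Below is a rough sketch of what such a delay action can look like, assuming the v1 BackupItemAction plugin interface; the package name, type name, resource selector, and fixed 5-second sleep are illustrative and not taken from the linked example PR.

    package plugin

    import (
        "time"

        velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
        "github.com/vmware-tanzu/velero/pkg/plugin/velero"
        "k8s.io/apimachinery/pkg/runtime"
    )

    // SlowItemAction simulates a slow per-item backup action so that parallel
    // ItemBlock processing becomes visible in total backup times.
    type SlowItemAction struct{}

    // AppliesTo selects which resources the action runs against (pods here).
    func (a *SlowItemAction) AppliesTo() (velero.ResourceSelector, error) {
        return velero.ResourceSelector{IncludedResources: []string{"pods"}}, nil
    }

    // Execute sleeps for five seconds and returns the item unchanged, with no
    // additional related items to back up.
    func (a *SlowItemAction) Execute(item runtime.Unstructured, backup *velerov1.Backup) (runtime.Unstructured, []velero.ResourceIdentifier, error) {
        time.Sleep(5 * time.Second)
        return item, nil, nil
    }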
Force-pushed from e1ab7c0 to fd2ee86
Rebased after #8664 merged.
@@ -391,7 +393,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(

     // the main backup process will send on this channel once
     // for every item it processes.
-    update := make(chan progressUpdate)
+    update := make(chan progressUpdate, 10)
Once we set the limit on inputChannel, is it still necessary to have a limit here? If not, let's not set it, so as to reduce the effort of future enhancement, performance tuning, and troubleshooting.
This isn't a limit -- this is the buffer size. Without it, the channel is unbuffered. It's not something that we expect to have any need to tweak via performance tuning, though. Having a buffer allows the goroutine that sends messages to the channel to return without blocking if the receiver doesn't immediately pick it up. In general, we want a non-zero buffer size, but it doesn't need to be large. Whether it's 5 or 10 or something else is probably not important, but we do want a buffer. The input channel is a completely separate channel with its own buffer.
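As a standalone illustration (not code from this PR), here is the difference the buffer makes: a send on an unbuffered channel blocks until a receiver is ready, while a send on a buffered channel returns immediately as long as there is free space in the buffer.

    package main

    import "fmt"

    func main() {
        // Unbuffered: a send blocks until another goroutine receives,
        // so sender and receiver rendezvous on every message.
        unbuffered := make(chan int)
        done := make(chan struct{})
        go func() {
            fmt.Println("received", <-unbuffered)
            close(done)
        }()
        unbuffered <- 1 // blocks until the goroutine above receives
        <-done

        // Buffered: sends succeed immediately while the buffer has room,
        // so the producer isn't stalled by a momentarily busy consumer.
        buffered := make(chan int, 10)
        for i := 0; i < 10; i++ {
            buffered <- i // no receiver yet; does not block
        }
        fmt.Println("queued", len(buffered), "messages without blocking")
    }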
pkg/backup/backup.go (Outdated)
    }

    if len(response.itemBlock.Items) > 0 {
        log.WithFields(map[string]interface{}{
Do we need to output the progress as an Info log? It will flood the log for a large backup.
This is the way progress logging has always worked; the current PR just moves it to a different location. It doesn't currently flood the logs, since there are usually several other log lines for each progress log. I think we can change the log frequency in a separate PR, since this PR shouldn't change log frequency relative to Velero 1.15 or earlier.
"name": items[i].name, | ||
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", backedUpItems, totalItems) | ||
// Wait for all the ItemBlocks to be processed | ||
select { |
Looks like we don't have a way to break the ItemBlock processing halfway, so code branches like case <-responseCtx.Done() will never be reached, right? So is this responseCtx for a future enhancement, e.g., backup cancel?
I don't think this particular Done() will be hit -- although when we do implement backup cancel, we may end up using it. @shawn-hurley This is similar to the analyzer engine code we had looked at, which did have this ctx.Done() there. Do you think it makes sense to leave it, or is it better to remove it?
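For readers less familiar with the pattern, here is a generic sketch (illustrative names, not the PR's actual types) of a receive loop that selects on both a response channel and ctx.Done(); today nothing cancels the context, but a future backup-cancel implementation could use it to abandon the wait.

    package backup

    import "context"

    // collectResponses drains the responses channel until count results have
    // arrived or the context is cancelled.
    func collectResponses(ctx context.Context, responses <-chan string, count int) ([]string, error) {
        results := make([]string, 0, count)
        for len(results) < count {
            select {
            case r := <-responses:
                results = append(results, r)
            case <-ctx.Done():
                // Only reachable if something cancels the context,
                // e.g. a future backup-cancel feature.
                return results, ctx.Err()
            }
        }
        return results, nil
    }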
    func (p *ItemBlockWorkerPool) Stop() {
        p.cancelFunc()
        p.logger.Info("ItemBlock worker stopping")
        p.wg.Wait()
Add another log to indicate that the worker pool has stopped, and include the wait time.
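A hedged sketch of what that might look like, reusing the fields from the snippet above; the timing measurement and log wording are illustrative, not part of the PR.

    func (p *ItemBlockWorkerPool) Stop() {
        p.cancelFunc()
        p.logger.Info("ItemBlock worker stopping")

        start := time.Now()
        p.wg.Wait() // block until every worker goroutine has exited

        // Additional log suggested in review: confirm shutdown and report how
        // long the pool took to drain.
        p.logger.WithField("waitTime", time.Since(start)).Info("ItemBlock worker pool stopped")
    }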
    func StartItemBlockWorkerPool(ctx context.Context, workers int, log logrus.FieldLogger) *ItemBlockWorkerPool {
        // Buffer will hold up to 10 ItemBlocks waiting for processing
        inputChannel := make(chan ItemBlockInput, 10)
Let's add a variable for the input limit (10).
I'm not sure we need a second configuration value here. This is just a buffer that sets how many inputs can be queued, without blocking the sender, before a worker takes one. I can add it if we really want two configuration values here, but in most cases users won't really want to change this -- i.e. the same value that works with 1 worker (the default number of workers) is probably fine when they have 10 or 20 workers. So we can add it, but I don't expect tweaking it to change performance much, although setting it to a very small value (1 or 0) might negatively affect performance.
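To make the producer/buffer/worker relationship concrete, here is a simplified, self-contained sketch (illustrative names, not the PR's actual types): the buffer only controls how many queued items a producer can hand off before it blocks, while the worker count controls how many items are processed in parallel.

    package main

    import (
        "context"
        "fmt"
        "sync"
        "time"
    )

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        const workers = 4
        // Buffer of 10: a producer can queue up to 10 items before blocking;
        // it does not change how many items are processed concurrently.
        input := make(chan int, 10)

        var wg sync.WaitGroup
        for w := 0; w < workers; w++ {
            wg.Add(1)
            go func(id int) {
                defer wg.Done()
                for {
                    select {
                    case item, ok := <-input:
                        if !ok {
                            return // channel closed, no more work
                        }
                        time.Sleep(100 * time.Millisecond) // simulate per-item work
                        fmt.Printf("worker %d processed item %d\n", id, item)
                    case <-ctx.Done():
                        return
                    }
                }
            }(w)
        }

        for i := 0; i < 20; i++ {
            input <- i // blocks only when the buffer is full and all workers are busy
        }
        close(input)
        wg.Wait()
    }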
    func StartItemBlockWorkerPool(ctx context.Context, workers int, log logrus.FieldLogger) *ItemBlockWorkerPool {
        // Buffer will hold up to 10 ItemBlocks waiting for processing
        inputChannel := make(chan ItemBlockInput, 10)
Why do we pick 10 as the hardcoded value, instead of the number of CPU cores?
The number of CPU cores isn't relevant here. This is just the buffer -- the number of not-yet-taken input values that can be added to the channel before the adding process blocks. The default number of goroutines processing inputs will be one.
    func StartItemBlockWorkerPool(ctx context.Context, workers int, log logrus.FieldLogger) *ItemBlockWorkerPool {
        // Buffer will hold up to 10 ItemBlocks waiting for processing
        inputChannel := make(chan ItemBlockInput, 10)
Are we going to make this configurable, and when? What is the consideration?
See the above comment. I don't really see a big need to make it configurable. The concern is that users might get confused by having both a worker pool size configuration (how many worker threads; this config is already in place) and a channel buffer size configuration, which really won't affect performance much at all. I can certainly make this configurable, though, if there is a use case for it.
Signed-off-by: Scott Seago <[email protected]>
Force-pushed from fd2ee86 to 91d7301
Thank you for contributing to Velero!
Please add a summary of your change
Parallel ItemBlock processing via backup_controller goroutines as described in phase 2 of https://github.com/vmware-tanzu/velero/blob/main/design/backup-performance-improvements.md
Does your change fix a particular issue?
Fixes #8334
Please indicate you've done the following:
- Created a changelog (make new-changelog) or commented /kind changelog-not-required on this PR.
- Updated the corresponding documentation in site/content/docs/main.