Implement parallel ItemBlock processing via backup_controller goroutines
Signed-off-by: Scott Seago <[email protected]>
sseago committed Feb 6, 2025
1 parent 223e1fc commit 4c6dfaa
Showing 13 changed files with 377 additions and 74 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/8659-sseago
@@ -0,0 +1 @@
Implement parallel ItemBlock processing via backup_controller goroutines
16 changes: 8 additions & 8 deletions design/backup-performance-improvements.md
@@ -191,25 +191,25 @@ type ItemBlockWorkerPool struct {
}

type ItemBlockInput struct {
itemBlock ItemBlock
itemBlock *BackupItemBlock
returnChan chan ItemBlockReturn
}

type ItemBlockReturn struct {
itemBlock ItemBlock
itemBlock *BackupItemBlock
resources []schema.GroupResource
err error
}

func (p *ItemBlockWorkerPool) getInputChannel() chan ItemBlockInput
func RunItemBlockWorkers(context context.Context, workers int)
func processItemBlocksWorker(context context.Context, itemBlockChannel chan ItemBlockInput, logger logrus.FieldLogger, wg *sync.WaitGroup)
func StartItemBlockWorkerPool(context context.Context, workers int, logger logrus.FieldLogger) ItemBlockWorkerPool
func processItemBlockWorker(context context.Context, itemBlockChannel chan ItemBlockInput, logger logrus.FieldLogger, wg *sync.WaitGroup)
```

The worker pool will be started by calling `RunItemBlockWorkers` in `backupReconciler.SetupWithManager`, passing in the worker count and reconciler context.
`SetupWithManager` will also add the input channel to the `itemBackupper` so that it will be available during backup processing.
The func `RunItemBlockWorkers` will create the `ItemBlockWorkerPool` with a shared buffered input channel (fixed buffer size) and start `workers` goroutines, each of which will call `processItemBlocksWorker`.
The `processItemBlocksWorker` func (run by the worker goroutines) will read from `itemBlockChannel`, call `BackupItemBlock` on the retrieved `ItemBlock`, send the return value on the retrieved `returnChan`, and then process the next block.
The worker pool will be started by calling `StartItemBlockWorkerPool` in `NewBackupReconciler()`, passing in the worker count and reconciler context.
`backupreconciler.prepareBackupRequest` will also add the input channel to the `backupRequest` so that it will be available during backup processing.
The func `StartItemBlockWorkerPool` will create the `ItemBlockWorkerPool` with a shared buffered input channel (fixed buffer size) and start `workers` goroutines, each of which will call `processItemBlockWorker`.
The `processItemBlockWorker` func (run by the worker goroutines) will read from `itemBlockChannel`, call `BackupItemBlock` on the retrieved `ItemBlock`, send the return value on the retrieved `returnChan`, and then process the next block.
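
For illustration, here is a minimal sketch of the pool and worker described above, written against the types declared in this design. The channel buffer size, the pool's field name, and the exact call used to back up each ItemBlock are assumptions, not the final implementation.

```go
package backup

import (
	"context"
	"sync"

	"github.com/sirupsen/logrus"
)

// Sketch only: ItemBlockInput, ItemBlockReturn, and ItemBlockWorkerPool are
// the types declared above; the inputChannel field name and buffer size are
// assumed for the example.
func StartItemBlockWorkerPool(ctx context.Context, workers int, log logrus.FieldLogger) ItemBlockWorkerPool {
	inputChannel := make(chan ItemBlockInput, 100) // shared fixed-size buffer (size assumed)
	wg := &sync.WaitGroup{}
	for i := 0; i < workers; i++ {
		logger := log.WithField("worker", i)
		wg.Add(1)
		go processItemBlockWorker(ctx, inputChannel, logger, wg)
	}
	return ItemBlockWorkerPool{inputChannel: inputChannel}
}

// Each worker loops until the reconciler context is canceled: take an
// ItemBlock off the shared channel, back it up, and report the backed-up
// GroupResources (and any error) on the block's return channel.
func processItemBlockWorker(ctx context.Context, itemBlockChannel chan ItemBlockInput, logger logrus.FieldLogger, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case m := <-itemBlockChannel:
			// The design calls this step "BackupItemBlock"; in this commit the
			// work is done by kubernetesBackupper.backupItemBlock, reached via
			// the itemBackupper stored on the block.
			grList := m.itemBlock.itemBackupper.kubernetesBackupper.backupItemBlock(m.itemBlock)
			m.returnChan <- ItemBlockReturn{itemBlock: m.itemBlock, resources: grList}
		case <-ctx.Done():
			logger.Info("stopping ItemBlock worker")
			return
		}
	}
}
```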

#### Modify ItemBlock processing loop to send ItemBlocks to the worker pool rather than backing them up directly

15 changes: 15 additions & 0 deletions pkg/backup/backed_up_items_map.go
@@ -26,12 +26,14 @@ import (
type backedUpItemsMap struct {
*sync.RWMutex
backedUpItems map[itemKey]struct{}
totalItems map[itemKey]struct{}
}

func NewBackedUpItemsMap() *backedUpItemsMap {
return &backedUpItemsMap{
RWMutex: &sync.RWMutex{},
backedUpItems: make(map[itemKey]struct{}),
totalItems: make(map[itemKey]struct{}),
}
}

@@ -75,6 +77,12 @@ func (m *backedUpItemsMap) Len() int {
return len(m.backedUpItems)
}

func (m *backedUpItemsMap) BackedUpAndTotalLen() (int, int) {
m.RLock()
defer m.RUnlock()
return len(m.backedUpItems), len(m.totalItems)
}

func (m *backedUpItemsMap) Has(key itemKey) bool {
m.RLock()
defer m.RUnlock()
@@ -87,4 +95,11 @@ func (m *backedUpItemsMap) AddItem(key itemKey) {
m.Lock()
defer m.Unlock()
m.backedUpItems[key] = struct{}{}
m.totalItems[key] = struct{}{}
}

func (m *backedUpItemsMap) AddItemToTotal(key itemKey) {
m.Lock()
defer m.Unlock()
m.totalItems[key] = struct{}{}
}
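
Because ItemBlocks now finish on worker goroutines rather than in list order, progress can no longer be estimated from the loop index; that is what the separate `totalItems` set above is for. A rough illustration of how the two counters are meant to be read (the `reportProgress` helper is hypothetical, not part of this commit):

```go
// Illustrative only: every discovered item is registered via AddItemToTotal
// up front, and AddItem marks it backed up once its ItemBlock completes, so
// the pair of counts stays a usable estimate even when worker goroutines
// finish ItemBlocks out of order.
func reportProgress(items *backedUpItemsMap, update chan<- progressUpdate) {
	backedUp, total := items.BackedUpAndTotalLen()
	update <- progressUpdate{
		totalItems:    total,
		itemsBackedUp: backedUp,
	}
}
```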
138 changes: 101 additions & 37 deletions pkg/backup/backup.go
@@ -26,6 +26,7 @@ import (
"io"
"os"
"path/filepath"
"sync"
"time"

"github.com/pkg/errors"
@@ -237,7 +238,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
gzippedData := gzip.NewWriter(backupFile)
defer gzippedData.Close()

tw := tar.NewWriter(gzippedData)
tw := NewTarWriter(tar.NewWriter(gzippedData))
defer tw.Close()

log.Info("Writing backup version file")
@@ -378,6 +379,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
boolptr.IsSetToTrue(backupRequest.Spec.DefaultVolumesToFsBackup),
!backupRequest.ResourceIncludesExcludes.ShouldInclude(kuberesource.PersistentVolumeClaims.String()),
),
kubernetesBackupper: kb,
}

// helper struct to send current progress between the main
@@ -389,7 +391,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(

// the main backup process will send on this channel once
// for every item it processes.
update := make(chan progressUpdate)
update := make(chan progressUpdate, 10)

// the main backup process will send on this channel when
// it's done sending progress updates
@@ -429,6 +431,8 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
}
}()

responseCtx, responseCancel := context.WithCancel(context.Background())

backedUpGroupResources := map[schema.GroupResource]bool{}
// Maps items in the item list from GR+NamespacedName to a slice of pointers to kubernetesResources
// We need the slice value since if the EnableAPIGroupVersions feature flag is set, there may
@@ -441,9 +445,60 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
Name: items[i].name,
}
itemsMap[key] = append(itemsMap[key], items[i])
// add to total items for progress reporting
if items[i].kind != "" {
backupRequest.BackedUpItems.AddItemToTotal(itemKey{
resource: fmt.Sprintf("%s/%s", items[i].preferredGVR.GroupVersion().String(), items[i].kind),
namespace: items[i].namespace,
name: items[i].name,
})
}
}

var itemBlock *BackupItemBlock
itemBlockReturn := make(chan ItemBlockReturn)
wg := &sync.WaitGroup{}
// Handle returns from worker pool processing ItemBlocks
go func() {
for {
select {
case response := <-itemBlockReturn: // process each BackupItemBlock response
func() {
defer wg.Done()
if response.err != nil {
log.WithError(errors.WithStack((response.err))).Error("Got error in BackupItemBlock.")
}

for _, backedUpGR := range response.resources {
backedUpGroupResources[backedUpGR] = true
}
// We could eventually track which itemBlocks have finished
// using response.itemBlock

// updated total is computed as "how many items we've backed up so far,
// plus how many items are processed but not yet backed up plus how many
// we know of that are remaining to be processed"
backedUpItems, totalItems := backupRequest.BackedUpItems.BackedUpAndTotalLen()

// send a progress update
update <- progressUpdate{
totalItems: totalItems,
itemsBackedUp: backedUpItems,
}

if len(response.itemBlock.Items) > 0 {
log.WithFields(map[string]interface{}{
"progress": "",
"kind": response.itemBlock.Items[0].Item.GroupVersionKind().GroupKind().String(),
"namespace": response.itemBlock.Items[0].Item.GetNamespace(),
"name": response.itemBlock.Items[0].Item.GetName(),
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", backedUpItems, totalItems)
}
}()
case <-responseCtx.Done():
return
}
}
}()

for i := range items {
log.WithFields(map[string]interface{}{
@@ -458,7 +513,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
log.Debugf("Not creating new ItemBlock for %s %s/%s because it's already in an ItemBlock", items[i].groupResource.String(), items[i].namespace, items[i].name)
} else {
if itemBlock == nil {
itemBlock = NewBackupItemBlock(log, itemBackupper)
itemBlock = NewBackupItemBlock(log, itemBackupper, ctx)
}
var newBlockItem *unstructured.Unstructured

@@ -489,31 +544,31 @@ func (kb *kubernetesBackupper) BackupWithResolvers(
addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource
if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock {
log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items))
backedUpGRs := kb.backupItemBlock(ctx, *itemBlock)
for _, backedUpGR := range backedUpGRs {
backedUpGroupResources[backedUpGR] = true

wg.Add(1)
backupRequest.ItemBlockChannel <- ItemBlockInput{
itemBlock: itemBlock,
returnChan: itemBlockReturn,
}
itemBlock = nil
}
}

// updated total is computed as "how many items we've backed up so far, plus
// how many items we know of that are remaining"
backedUpItems := backupRequest.BackedUpItems.Len()
totalItems := backedUpItems + (len(items) - (i + 1))

// send a progress update
update <- progressUpdate{
totalItems: totalItems,
itemsBackedUp: backedUpItems,
}
done := make(chan struct{})
go func() {
defer close(done)
wg.Wait()
}()

log.WithFields(map[string]interface{}{
"progress": "",
"resource": items[i].groupResource.String(),
"namespace": items[i].namespace,
"name": items[i].name,
}).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", backedUpItems, totalItems)
// Wait for all the ItemBlocks to be processed
select {
case <-done:
log.Info("done processing ItemBlocks")
case <-responseCtx.Done():
log.Info("ItemBlock processing canceled")

}
// cancel response-processing goroutine
responseCancel()

// no more progress updates will be sent on the 'update' channel
quit <- struct{}{}
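
Taken together, the dispatch and wait logic above is a fan-out/fan-in pattern: one `wg.Add(1)` per ItemBlock sent to the shared worker channel, one `wg.Done()` per response drained from `itemBlockReturn`, and a `done` channel that lets the backup wait on the WaitGroup while still honoring cancellation. A condensed sketch of that shape (an illustrative helper, not code from this commit):

```go
// Condensed fan-out/fan-in: send every block to the shared worker channel,
// drain responses concurrently, and wait for completion or cancellation.
func dispatchAndWait(ctx context.Context, input chan<- ItemBlockInput, blocks []*BackupItemBlock) {
	returnChan := make(chan ItemBlockReturn)
	wg := &sync.WaitGroup{}

	// Drain responses until canceled; one wg.Done() per completed block.
	go func() {
		for {
			select {
			case resp := <-returnChan:
				// In the real loop this is where progress is reported and the
				// backed-up GroupResources from resp.resources are recorded.
				_ = resp
				wg.Done()
			case <-ctx.Done():
				return
			}
		}
	}()

	// Fan out: one wg.Add(1) per block handed to the worker pool.
	for _, block := range blocks {
		wg.Add(1)
		input <- ItemBlockInput{itemBlock: block, returnChan: returnChan}
	}

	// Fan in: wait for every block, but stay responsive to cancellation.
	done := make(chan struct{})
	go func() {
		defer close(done)
		wg.Wait()
	}()
	select {
	case <-done:
	case <-ctx.Done():
	}
}
```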
@@ -661,7 +716,7 @@ func (kb *kubernetesBackupper) executeItemBlockActions(
}
}

func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock BackupItemBlock) []schema.GroupResource {
func (kb *kubernetesBackupper) backupItemBlock(itemBlock *BackupItemBlock) []schema.GroupResource {
// find pods in ItemBlock
// filter pods based on whether they still need to be backed up
// this list will be used to run pre/post hooks
@@ -695,15 +750,15 @@ func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock Ba
itemBlock.Log.Debug("Backing up items in BackupItemBlock")
var grList []schema.GroupResource
for _, item := range itemBlock.Items {
if backedUp := kb.backupItem(itemBlock.Log, item.Gr, itemBlock.itemBackupper, item.Item, item.PreferredGVR, &itemBlock); backedUp {
if backedUp := kb.backupItem(itemBlock.Log, item.Gr, itemBlock.itemBackupper, item.Item, item.PreferredGVR, itemBlock); backedUp {
grList = append(grList, item.Gr)
}
}

if len(postHookPods) > 0 {
itemBlock.Log.Debug("Executing post hooks")
itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1)
go kb.handleItemBlockPostHooks(ctx, itemBlock, postHookPods)
go kb.handleItemBlockPostHooks(itemBlock, postHookPods)
}

return grList
Expand All @@ -722,7 +777,7 @@ func (kb *kubernetesBackupper) getItemKey(item itemblock.ItemBlockItem) (itemKey
return key, nil
}

func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock *BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) {
var successPods []itemblock.ItemBlockItem
var failedPods []itemblock.ItemBlockItem
var errs []error
@@ -739,12 +794,12 @@ func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock
}

// The hooks cannot execute until the PVBs to be processed
func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
func (kb *kubernetesBackupper) handleItemBlockPostHooks(itemBlock *BackupItemBlock, hookPods []itemblock.ItemBlockItem) {
log := itemBlock.Log
defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done()

// the post hooks will not execute until all PVBs of the item block pods are processed
if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil {
if err := kb.waitUntilPVBsProcessed(log, itemBlock, hookPods); err != nil {
log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock")
return
}
@@ -758,7 +813,7 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite
}

// wait all PVBs of the item block pods to be processed
func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error {
func (kb *kubernetesBackupper) waitUntilPVBsProcessed(log logrus.FieldLogger, itemBlock *BackupItemBlock, pods []itemblock.ItemBlockItem) error {
pvbMap := map[*velerov1api.PodVolumeBackup]bool{}
for _, pod := range pods {
namespace, name := pod.Item.GetNamespace(), pod.Item.GetName()
@@ -795,7 +850,7 @@ func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log l
return allProcessed, nil
}

return wait.PollUntilContextCancel(ctx, 5*time.Second, true, checkFunc)
return wait.PollUntilContextCancel(itemBlock.pvbTimeoutCtx, 5*time.Second, true, checkFunc)
}

func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource, itemBlock *BackupItemBlock) bool {
@@ -881,7 +936,7 @@ func (kb *kubernetesBackupper) backupCRD(log logrus.FieldLogger, gr schema.Group
kb.backupItem(log, gvr.GroupResource(), itemBackupper, unstructured, gvr, nil)
}

func (kb *kubernetesBackupper) writeBackupVersion(tw *tar.Writer) error {
func (kb *kubernetesBackupper) writeBackupVersion(tw tarWriter) error {
versionFile := filepath.Join(velerov1api.MetadataDir, "version")
versionString := fmt.Sprintf("%s\n", BackupFormatVersion)

@@ -912,7 +967,7 @@ func (kb *kubernetesBackupper) FinalizeBackup(
) error {
gzw := gzip.NewWriter(outBackupFile)
defer gzw.Close()
tw := tar.NewWriter(gzw)
tw := NewTarWriter(tar.NewWriter(gzw))

defer tw.Close()

gzr, err := gzip.NewReader(inBackupFile)
@@ -966,6 +1021,7 @@ func (kb *kubernetesBackupper) FinalizeBackup(
itemHookHandler: &hook.NoOpItemHookHandler{},
podVolumeSnapshotTracker: podvolume.NewTracker(),
hookTracker: hook.NewHookTracker(),
kubernetesBackupper: kb,

}
updateFiles := make(map[string]FileForArchive)
backedUpGroupResources := map[schema.GroupResource]bool{}
@@ -1051,7 +1107,9 @@ func (kb *kubernetesBackupper) FinalizeBackup(
return nil
}

func buildFinalTarball(tr *tar.Reader, tw *tar.Writer, updateFiles map[string]FileForArchive) error {
func buildFinalTarball(tr *tar.Reader, tw tarWriter, updateFiles map[string]FileForArchive) error {
tw.Lock()
defer tw.Unlock()

for {
header, err := tr.Next()
if err == io.EOF {
@@ -1102,10 +1160,16 @@ func buildFinalTarball(tr *tar.Reader, tw tarWriter, updateFiles map[string]Fi
return nil
}

type tarWriter interface {
io.Closer
Write([]byte) (int, error)
WriteHeader(*tar.Header) error
type tarWriter struct {
*tar.Writer
*sync.Mutex
}

func NewTarWriter(writer *tar.Writer) tarWriter {
return tarWriter{
Writer: writer,
Mutex: &sync.Mutex{},
}
}

// updateVolumeInfos update the VolumeInfos according to the AsyncOperations
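
The `tarWriter` wrapper above exists because worker goroutines can now write entries to the single tar stream concurrently, and `tar.Writer` is not safe for concurrent use; embedding a mutex lets callers serialize each header/body pair. A rough illustration of the intended usage (the `writeEntry` helper is hypothetical, not part of this commit):

```go
// Hypothetical caller: each archive entry is written under the shared mutex
// so header/body pairs from different goroutines cannot interleave in the
// tar stream.
func writeEntry(tw tarWriter, hdr *tar.Header, data []byte) error {
	tw.Lock()
	defer tw.Unlock()
	if err := tw.WriteHeader(hdr); err != nil {
		return err
	}
	_, err := tw.Write(data)
	return err
}
```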