From 4c6dfaa2d4400924e8f856514503486d52edcda7 Mon Sep 17 00:00:00 2001 From: Scott Seago Date: Thu, 16 Jan 2025 18:07:59 -0500 Subject: [PATCH] Implement parallel ItemBlock processing via backup_controller goroutines Signed-off-by: Scott Seago --- changelogs/unreleased/8659-sseago | 1 + design/backup-performance-improvements.md | 16 +-- pkg/backup/backed_up_items_map.go | 15 +++ pkg/backup/backup.go | 138 ++++++++++++++++------ pkg/backup/backup_test.go | 137 ++++++++++++++++----- pkg/backup/item_backupper.go | 3 + pkg/backup/item_block_worker_pool.go | 98 +++++++++++++++ pkg/backup/item_collector.go | 5 + pkg/backup/itemblock.go | 5 +- pkg/backup/request.go | 1 + pkg/controller/backup_controller.go | 3 + pkg/controller/backup_controller_test.go | 16 +++ pkg/podvolume/snapshot_tracker.go | 13 +- 13 files changed, 377 insertions(+), 74 deletions(-) create mode 100644 changelogs/unreleased/8659-sseago create mode 100644 pkg/backup/item_block_worker_pool.go diff --git a/changelogs/unreleased/8659-sseago b/changelogs/unreleased/8659-sseago new file mode 100644 index 0000000000..d00d889bd4 --- /dev/null +++ b/changelogs/unreleased/8659-sseago @@ -0,0 +1 @@ +Implement parallel ItemBlock processing via backup_controller goroutines diff --git a/design/backup-performance-improvements.md b/design/backup-performance-improvements.md index 8a030e0010..f2a3c1d0f8 100644 --- a/design/backup-performance-improvements.md +++ b/design/backup-performance-improvements.md @@ -191,25 +191,25 @@ type ItemBlockWorkerPool struct { } type ItemBlockInput struct { - itemBlock ItemBlock + itemBlock *BackupItemBlock returnChan chan ItemBlockReturn } type ItemBlockReturn struct { - itemBlock ItemBlock + itemBlock *BackupItemBlock resources []schema.GroupResource err error } func (*p ItemBlockWorkerPool) getInputChannel() chan ItemBlockInput -func RunItemBlockWorkers(context context.Context, workers int) -func processItemBlocksWorker(context context.Context, itemBlockChannel chan ItemBlockInput, logger logrus.FieldLogger, wg *sync.WaitGroup) +func StartItemBlockWorkerPool(context context.Context, workers int, logger logrus.FieldLogger) ItemBlockWorkerPool +func processItemBlockWorker(context context.Context, itemBlockChannel chan ItemBlockInput, logger logrus.FieldLogger, wg *sync.WaitGroup) ``` -The worker pool will be started by calling `RunItemBlockWorkers` in `backupReconciler.SetupWithManager`, passing in the worker count and reconciler context. -`SetupWithManager` will also add the input channel to the `itemBackupper` so that it will be available during backup processing. -The func `RunItemBlockWorkers` will create the `ItemBlockWorkerPool` with a shared buffered input channel (fixed buffer size) and start `workers` gororoutines which will each call `processItemBlocksWorker`. -The `processItemBlocksWorker` func (run by the worker goroutines) will read from `itemBlockChannel`, call `BackupItemBlock` on the retrieved `ItemBlock`, and then send the return value to the retrieved `returnChan`, and then process the next block. +The worker pool will be started by calling `StartItemBlockWorkerPool` in `NewBackupReconciler()`, passing in the worker count and reconciler context. +`backupreconciler.prepareBackupRequest` will also add the input channel to the `backupRequest` so that it will be available during backup processing. 
+The func `StartItemBlockWorkerPool` will create the `ItemBlockWorkerPool` with a shared buffered input channel (fixed buffer size) and start `workers` gororoutines which will each call `processItemBlockWorker`. +The `processItemBlockWorker` func (run by the worker goroutines) will read from `itemBlockChannel`, call `BackupItemBlock` on the retrieved `ItemBlock`, and then send the return value to the retrieved `returnChan`, and then process the next block. #### Modify ItemBlock processing loop to send ItemBlocks to the worker pool rather than backing them up directly diff --git a/pkg/backup/backed_up_items_map.go b/pkg/backup/backed_up_items_map.go index 0ad4df6d09..f5764cd8ec 100644 --- a/pkg/backup/backed_up_items_map.go +++ b/pkg/backup/backed_up_items_map.go @@ -26,12 +26,14 @@ import ( type backedUpItemsMap struct { *sync.RWMutex backedUpItems map[itemKey]struct{} + totalItems map[itemKey]struct{} } func NewBackedUpItemsMap() *backedUpItemsMap { return &backedUpItemsMap{ RWMutex: &sync.RWMutex{}, backedUpItems: make(map[itemKey]struct{}), + totalItems: make(map[itemKey]struct{}), } } @@ -75,6 +77,12 @@ func (m *backedUpItemsMap) Len() int { return len(m.backedUpItems) } +func (m *backedUpItemsMap) BackedUpAndTotalLen() (int, int) { + m.RLock() + defer m.RUnlock() + return len(m.backedUpItems), len(m.totalItems) +} + func (m *backedUpItemsMap) Has(key itemKey) bool { m.RLock() defer m.RUnlock() @@ -87,4 +95,11 @@ func (m *backedUpItemsMap) AddItem(key itemKey) { m.Lock() defer m.Unlock() m.backedUpItems[key] = struct{}{} + m.totalItems[key] = struct{}{} +} + +func (m *backedUpItemsMap) AddItemToTotal(key itemKey) { + m.Lock() + defer m.Unlock() + m.totalItems[key] = struct{}{} } diff --git a/pkg/backup/backup.go b/pkg/backup/backup.go index 280164cc40..2d80a39039 100644 --- a/pkg/backup/backup.go +++ b/pkg/backup/backup.go @@ -26,6 +26,7 @@ import ( "io" "os" "path/filepath" + "sync" "time" "github.com/pkg/errors" @@ -237,7 +238,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers( gzippedData := gzip.NewWriter(backupFile) defer gzippedData.Close() - tw := tar.NewWriter(gzippedData) + tw := NewTarWriter(tar.NewWriter(gzippedData)) defer tw.Close() log.Info("Writing backup version file") @@ -378,6 +379,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers( boolptr.IsSetToTrue(backupRequest.Spec.DefaultVolumesToFsBackup), !backupRequest.ResourceIncludesExcludes.ShouldInclude(kuberesource.PersistentVolumeClaims.String()), ), + kubernetesBackupper: kb, } // helper struct to send current progress between the main @@ -389,7 +391,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers( // the main backup process will send on this channel once // for every item it processes. 
- update := make(chan progressUpdate) + update := make(chan progressUpdate, 10) // the main backup process will send on this channel when // it's done sending progress updates @@ -429,6 +431,8 @@ func (kb *kubernetesBackupper) BackupWithResolvers( } }() + responseCtx, responseCancel := context.WithCancel(context.Background()) + backedUpGroupResources := map[schema.GroupResource]bool{} // Maps items in the item list from GR+NamespacedName to a slice of pointers to kubernetesResources // We need the slice value since if the EnableAPIGroupVersions feature flag is set, there may @@ -441,9 +445,60 @@ func (kb *kubernetesBackupper) BackupWithResolvers( Name: items[i].name, } itemsMap[key] = append(itemsMap[key], items[i]) + // add to total items for progress reporting + if items[i].kind != "" { + backupRequest.BackedUpItems.AddItemToTotal(itemKey{ + resource: fmt.Sprintf("%s/%s", items[i].preferredGVR.GroupVersion().String(), items[i].kind), + namespace: items[i].namespace, + name: items[i].name, + }) + } } var itemBlock *BackupItemBlock + itemBlockReturn := make(chan ItemBlockReturn) + wg := &sync.WaitGroup{} + // Handle returns from worker pool processing ItemBlocks + go func() { + for { + select { + case response := <-itemBlockReturn: // process each BackupItemBlock response + func() { + defer wg.Done() + if response.err != nil { + log.WithError(errors.WithStack((response.err))).Error("Got error in BackupItemBlock.") + } + for _, backedUpGR := range response.resources { + backedUpGroupResources[backedUpGR] = true + } + // We could eventually track which itemBlocks have finished + // using response.itemBlock + + // updated total is computed as "how many items we've backed up so far, + // plus how many items are processed but not yet backed up plus how many + // we know of that are remaining to be processed" + backedUpItems, totalItems := backupRequest.BackedUpItems.BackedUpAndTotalLen() + + // send a progress update + update <- progressUpdate{ + totalItems: totalItems, + itemsBackedUp: backedUpItems, + } + + if len(response.itemBlock.Items) > 0 { + log.WithFields(map[string]interface{}{ + "progress": "", + "kind": response.itemBlock.Items[0].Item.GroupVersionKind().GroupKind().String(), + "namespace": response.itemBlock.Items[0].Item.GetNamespace(), + "name": response.itemBlock.Items[0].Item.GetName(), + }).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", backedUpItems, totalItems) + } + }() + case <-responseCtx.Done(): + return + } + } + }() for i := range items { log.WithFields(map[string]interface{}{ @@ -458,7 +513,7 @@ func (kb *kubernetesBackupper) BackupWithResolvers( log.Debugf("Not creating new ItemBlock for %s %s/%s because it's already in an ItemBlock", items[i].groupResource.String(), items[i].namespace, items[i].name) } else { if itemBlock == nil { - itemBlock = NewBackupItemBlock(log, itemBackupper) + itemBlock = NewBackupItemBlock(log, itemBackupper, ctx) } var newBlockItem *unstructured.Unstructured @@ -489,31 +544,31 @@ func (kb *kubernetesBackupper) BackupWithResolvers( addNextToBlock := i < len(items)-1 && items[i].orderedResource && items[i+1].orderedResource && items[i].groupResource == items[i+1].groupResource if itemBlock != nil && len(itemBlock.Items) > 0 && !addNextToBlock { log.Infof("Backing Up Item Block including %s %s/%s (%v items in block)", items[i].groupResource.String(), items[i].namespace, items[i].name, len(itemBlock.Items)) - backedUpGRs := kb.backupItemBlock(ctx, *itemBlock) - for _, backedUpGR := 
range backedUpGRs { - backedUpGroupResources[backedUpGR] = true + + wg.Add(1) + backupRequest.ItemBlockChannel <- ItemBlockInput{ + itemBlock: itemBlock, + returnChan: itemBlockReturn, } itemBlock = nil } + } - // updated total is computed as "how many items we've backed up so far, plus - // how many items we know of that are remaining" - backedUpItems := backupRequest.BackedUpItems.Len() - totalItems := backedUpItems + (len(items) - (i + 1)) - - // send a progress update - update <- progressUpdate{ - totalItems: totalItems, - itemsBackedUp: backedUpItems, - } + done := make(chan struct{}) + go func() { + defer close(done) + wg.Wait() + }() - log.WithFields(map[string]interface{}{ - "progress": "", - "resource": items[i].groupResource.String(), - "namespace": items[i].namespace, - "name": items[i].name, - }).Infof("Backed up %d items out of an estimated total of %d (estimate will change throughout the backup)", backedUpItems, totalItems) + // Wait for all the ItemBlocks to be processed + select { + case <-done: + log.Info("done processing ItemBlocks") + case <-responseCtx.Done(): + log.Info("ItemBlock processing canceled") } + // cancel response-processing goroutine + responseCancel() // no more progress updates will be sent on the 'update' channel quit <- struct{}{} @@ -661,7 +716,7 @@ func (kb *kubernetesBackupper) executeItemBlockActions( } } -func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock BackupItemBlock) []schema.GroupResource { +func (kb *kubernetesBackupper) backupItemBlock(itemBlock *BackupItemBlock) []schema.GroupResource { // find pods in ItemBlock // filter pods based on whether they still need to be backed up // this list will be used to run pre/post hooks @@ -695,7 +750,7 @@ func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock Ba itemBlock.Log.Debug("Backing up items in BackupItemBlock") var grList []schema.GroupResource for _, item := range itemBlock.Items { - if backedUp := kb.backupItem(itemBlock.Log, item.Gr, itemBlock.itemBackupper, item.Item, item.PreferredGVR, &itemBlock); backedUp { + if backedUp := kb.backupItem(itemBlock.Log, item.Gr, itemBlock.itemBackupper, item.Item, item.PreferredGVR, itemBlock); backedUp { grList = append(grList, item.Gr) } } @@ -703,7 +758,7 @@ func (kb *kubernetesBackupper) backupItemBlock(ctx context.Context, itemBlock Ba if len(postHookPods) > 0 { itemBlock.Log.Debug("Executing post hooks") itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Add(1) - go kb.handleItemBlockPostHooks(ctx, itemBlock, postHookPods) + go kb.handleItemBlockPostHooks(itemBlock, postHookPods) } return grList @@ -722,7 +777,7 @@ func (kb *kubernetesBackupper) getItemKey(item itemblock.ItemBlockItem) (itemKey return key, nil } -func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) { +func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock *BackupItemBlock, hookPods []itemblock.ItemBlockItem) ([]itemblock.ItemBlockItem, []itemblock.ItemBlockItem, []error) { var successPods []itemblock.ItemBlockItem var failedPods []itemblock.ItemBlockItem var errs []error @@ -739,12 +794,12 @@ func (kb *kubernetesBackupper) handleItemBlockPreHooks(itemBlock BackupItemBlock } // The hooks cannot execute until the PVBs to be processed -func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, itemBlock BackupItemBlock, hookPods []itemblock.ItemBlockItem) { +func (kb 
*kubernetesBackupper) handleItemBlockPostHooks(itemBlock *BackupItemBlock, hookPods []itemblock.ItemBlockItem) { log := itemBlock.Log defer itemBlock.itemBackupper.hookTracker.AsyncItemBlocks.Done() // the post hooks will not execute until all PVBs of the item block pods are processed - if err := kb.waitUntilPVBsProcessed(ctx, log, itemBlock, hookPods); err != nil { + if err := kb.waitUntilPVBsProcessed(log, itemBlock, hookPods); err != nil { log.WithError(err).Error("failed to wait PVBs processed for the ItemBlock") return } @@ -758,7 +813,7 @@ func (kb *kubernetesBackupper) handleItemBlockPostHooks(ctx context.Context, ite } // wait all PVBs of the item block pods to be processed -func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log logrus.FieldLogger, itemBlock BackupItemBlock, pods []itemblock.ItemBlockItem) error { +func (kb *kubernetesBackupper) waitUntilPVBsProcessed(log logrus.FieldLogger, itemBlock *BackupItemBlock, pods []itemblock.ItemBlockItem) error { pvbMap := map[*velerov1api.PodVolumeBackup]bool{} for _, pod := range pods { namespace, name := pod.Item.GetNamespace(), pod.Item.GetName() @@ -795,7 +850,7 @@ func (kb *kubernetesBackupper) waitUntilPVBsProcessed(ctx context.Context, log l return allProcessed, nil } - return wait.PollUntilContextCancel(ctx, 5*time.Second, true, checkFunc) + return wait.PollUntilContextCancel(itemBlock.pvbTimeoutCtx, 5*time.Second, true, checkFunc) } func (kb *kubernetesBackupper) backupItem(log logrus.FieldLogger, gr schema.GroupResource, itemBackupper *itemBackupper, unstructured *unstructured.Unstructured, preferredGVR schema.GroupVersionResource, itemBlock *BackupItemBlock) bool { @@ -881,7 +936,7 @@ func (kb *kubernetesBackupper) backupCRD(log logrus.FieldLogger, gr schema.Group kb.backupItem(log, gvr.GroupResource(), itemBackupper, unstructured, gvr, nil) } -func (kb *kubernetesBackupper) writeBackupVersion(tw *tar.Writer) error { +func (kb *kubernetesBackupper) writeBackupVersion(tw tarWriter) error { versionFile := filepath.Join(velerov1api.MetadataDir, "version") versionString := fmt.Sprintf("%s\n", BackupFormatVersion) @@ -912,7 +967,7 @@ func (kb *kubernetesBackupper) FinalizeBackup( ) error { gzw := gzip.NewWriter(outBackupFile) defer gzw.Close() - tw := tar.NewWriter(gzw) + tw := NewTarWriter(tar.NewWriter(gzw)) defer tw.Close() gzr, err := gzip.NewReader(inBackupFile) @@ -966,6 +1021,7 @@ func (kb *kubernetesBackupper) FinalizeBackup( itemHookHandler: &hook.NoOpItemHookHandler{}, podVolumeSnapshotTracker: podvolume.NewTracker(), hookTracker: hook.NewHookTracker(), + kubernetesBackupper: kb, } updateFiles := make(map[string]FileForArchive) backedUpGroupResources := map[schema.GroupResource]bool{} @@ -1051,7 +1107,9 @@ func (kb *kubernetesBackupper) FinalizeBackup( return nil } -func buildFinalTarball(tr *tar.Reader, tw *tar.Writer, updateFiles map[string]FileForArchive) error { +func buildFinalTarball(tr *tar.Reader, tw tarWriter, updateFiles map[string]FileForArchive) error { + tw.Lock() + defer tw.Unlock() for { header, err := tr.Next() if err == io.EOF { @@ -1102,10 +1160,16 @@ func buildFinalTarball(tr *tar.Reader, tw *tar.Writer, updateFiles map[string]Fi return nil } -type tarWriter interface { - io.Closer - Write([]byte) (int, error) - WriteHeader(*tar.Header) error +type tarWriter struct { + *tar.Writer + *sync.Mutex +} + +func NewTarWriter(writer *tar.Writer) tarWriter { + return tarWriter{ + Writer: writer, + Mutex: &sync.Mutex{}, + } } // updateVolumeInfos update the VolumeInfos according to the 
AsyncOperations diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go index 425fa827c5..71d7a12c87 100644 --- a/pkg/backup/backup_test.go +++ b/pkg/backup/backup_test.go @@ -72,11 +72,14 @@ func TestBackedUpItemsMatchesTarballContents(t *testing.T) { "v1/PersistentVolume": "persistentvolumes", } - h := newHarness(t) + h := newHarness(t, nil) + defer h.itemBlockPool.Stop() + req := &Request{ Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: h.itemBlockPool.GetInputChannel(), } backupFile := bytes.NewBuffer([]byte{}) @@ -132,11 +135,13 @@ func TestBackedUpItemsMatchesTarballContents(t *testing.T) { // backed up. It validates this by comparing their values to the length of // the request's BackedUpItems field. func TestBackupProgressIsUpdated(t *testing.T) { - h := newHarness(t) + h := newHarness(t, nil) + defer h.itemBlockPool.Stop() req := &Request{ Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: h.itemBlockPool.GetInputChannel(), } backupFile := bytes.NewBuffer([]byte{}) @@ -866,14 +871,17 @@ func TestBackupOldResourceFiltering(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -1044,14 +1052,17 @@ func TestCRDInclusion(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -1140,14 +1151,17 @@ func TestBackupResourceCohabitation(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -1168,13 +1182,15 @@ func TestBackupResourceCohabitation(t *testing.T) { // backed up in each backup. Verification is done by looking at the contents of the backup // tarball. This covers a specific issue that was fixed by https://github.com/vmware-tanzu/velero/pull/485. 
func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) { - h := newHarness(t) + h := newHarness(t, nil) + defer h.itemBlockPool.Stop() // run and verify backup 1 backup1 := &Request{ Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: h.itemBlockPool.GetInputChannel(), } backup1File := bytes.NewBuffer([]byte{}) @@ -1190,6 +1206,7 @@ func TestBackupUsesNewCohabitatingResourcesForEachBackup(t *testing.T) { Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: h.itemBlockPool.GetInputChannel(), } backup2File := bytes.NewBuffer([]byte{}) @@ -1233,14 +1250,17 @@ func TestBackupResourceOrdering(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -1341,6 +1361,9 @@ func (a *recordResourcesAction) WithSkippedCSISnapshotFlag(flag bool) *recordRes // TestBackupItemActionsForSkippedPV runs backups with backup item actions, and // verifies that the data in SkippedPVTracker is updated as expected. func TestBackupItemActionsForSkippedPV(t *testing.T) { + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() + tests := []struct { name string backupReq *Request @@ -1358,6 +1381,7 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) { Backup: defaultBackup().SnapshotVolumes(false).Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, resPolicies: &resourcepolicies.ResourcePolicies{ Version: "v1", @@ -1404,7 +1428,8 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) { }, includedPVs: map[string]struct{}{}, }, - BackedUpItems: NewBackedUpItemsMap(), + BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.PVCs( @@ -1430,7 +1455,7 @@ func TestBackupItemActionsForSkippedPV(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(tt *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) backupFile = bytes.NewBuffer([]byte{}) fakeClient = test.NewFakeControllerRuntimeClient(t, tc.runtimeResources...) 
) @@ -1644,14 +1669,17 @@ func TestBackupActionsRunForCorrectItems(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -1726,14 +1754,17 @@ func TestBackupWithInvalidActions(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -1877,14 +1908,17 @@ func TestBackupActionModifications(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -2134,14 +2168,17 @@ func TestBackupActionAdditionalItems(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -2392,14 +2429,17 @@ func TestItemBlockActionsRunForCorrectItems(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -2474,14 +2514,17 @@ func TestBackupWithInvalidItemBlockActions(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -2727,14 +2770,17 @@ func TestItemBlockActionRelatedItems(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, 
itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -2883,6 +2929,8 @@ func (*fakeVolumeSnapshotter) DeleteSnapshot(snapshotID string) error { // struct in place of real volume snapshotters. func TestBackupWithSnapshots(t *testing.T) { // TODO: add more verification for skippedPVTracker + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() tests := []struct { name string req *Request @@ -2900,6 +2948,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.PVs( @@ -2935,6 +2984,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.PVs( @@ -2971,6 +3021,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.PVs( @@ -3007,6 +3058,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.PVs( @@ -3043,6 +3095,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.PVs( @@ -3077,6 +3130,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.PVs( @@ -3094,6 +3148,7 @@ func TestBackupWithSnapshots(t *testing.T) { Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.PVs( @@ -3114,6 +3169,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.PVs( @@ -3132,6 +3188,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.PVs( @@ -3153,6 +3210,7 @@ func TestBackupWithSnapshots(t *testing.T) { }, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.PVs( @@ -3200,7 +3258,7 @@ func TestBackupWithSnapshots(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) backupFile = bytes.NewBuffer([]byte{}) ) @@ -3271,6 +3329,8 @@ func TestBackupWithAsyncOperations(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer 
itemBlockPool.Stop() tests := []struct { name string req *Request @@ -3284,6 +3344,7 @@ func TestBackupWithAsyncOperations(t *testing.T) { Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.Pods( @@ -3315,6 +3376,7 @@ func TestBackupWithAsyncOperations(t *testing.T) { Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.Pods( @@ -3346,6 +3408,7 @@ func TestBackupWithAsyncOperations(t *testing.T) { Backup: defaultBackup().Result(), SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), }, apiResources: []*test.APIResource{ test.Pods( @@ -3362,7 +3425,7 @@ func TestBackupWithAsyncOperations(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) backupFile = bytes.NewBuffer([]byte{}) ) @@ -3421,14 +3484,17 @@ func TestBackupWithInvalidHooks(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -3892,14 +3958,17 @@ func TestBackupWithHooks(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) podCommandExecutor = new(test.MockPodCommandExecutor) @@ -4123,15 +4192,18 @@ func TestBackupWithPodVolume(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SnapshotLocations: []*velerov1.VolumeSnapshotLocation{tc.vsl}, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -4216,8 +4288,9 @@ func (a *pluggableIBA) Name() string { type harness struct { *test.APIServer - backupper *kubernetesBackupper - log logrus.FieldLogger + backupper *kubernetesBackupper + log logrus.FieldLogger + itemBlockPool ItemBlockWorkerPool } func (h *harness) addItems(t *testing.T, resource *test.APIResource) { @@ -4241,7 +4314,7 @@ func (h *harness) addItems(t *testing.T, resource *test.APIResource) { } } -func newHarness(t *testing.T) *harness { +func newHarness(t *testing.T, itemBlockPool *ItemBlockWorkerPool) *harness { t.Helper() apiServer := test.NewAPIServer(t) @@ -4250,6 +4323,9 @@ func newHarness(t *testing.T) *harness { discoveryHelper, err := 
discovery.NewHelper(apiServer.DiscoveryClient, log) require.NoError(t, err) + if itemBlockPool == nil { + itemBlockPool = StartItemBlockWorkerPool(context.Background(), 1, log) + } return &harness{ APIServer: apiServer, backupper: &kubernetesBackupper{ @@ -4262,7 +4338,8 @@ func newHarness(t *testing.T) *harness { podVolumeBackupperFactory: new(fakePodVolumeBackupperFactory), podVolumeTimeout: 60 * time.Second, }, - log: log, + log: log, + itemBlockPool: *itemBlockPool, } } @@ -5235,14 +5312,17 @@ func TestBackupNewResourceFiltering(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) @@ -5397,14 +5477,17 @@ func TestBackupNamespaces(t *testing.T) { }, } + itemBlockPool := StartItemBlockWorkerPool(context.Background(), 1, logrus.StandardLogger()) + defer itemBlockPool.Stop() for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { var ( - h = newHarness(t) + h = newHarness(t, itemBlockPool) req = &Request{ Backup: tc.backup, SkippedPVTracker: NewSkipPVTracker(), BackedUpItems: NewBackedUpItemsMap(), + ItemBlockChannel: itemBlockPool.GetInputChannel(), } backupFile = bytes.NewBuffer([]byte{}) ) diff --git a/pkg/backup/item_backupper.go b/pkg/backup/item_backupper.go index 9e5caef0ed..4140c4b92c 100644 --- a/pkg/backup/item_backupper.go +++ b/pkg/backup/item_backupper.go @@ -71,6 +71,7 @@ type itemBackupper struct { podVolumeBackupper podvolume.Backupper podVolumeSnapshotTracker *podvolume.Tracker volumeSnapshotterGetter VolumeSnapshotterGetter + kubernetesBackupper *kubernetesBackupper itemHookHandler hook.ItemHookHandler snapshotLocationVolumeSnapshotters map[string]vsv1.VolumeSnapshotter @@ -95,6 +96,8 @@ func (ib *itemBackupper) backupItem(logger logrus.FieldLogger, obj runtime.Unstr if !selectedForBackup || err != nil || len(files) == 0 || finalize { return selectedForBackup, files, err } + ib.tarWriter.Lock() + defer ib.tarWriter.Unlock() for _, file := range files { if err := ib.tarWriter.WriteHeader(file.Header); err != nil { return false, []FileForArchive{}, errors.WithStack(err) diff --git a/pkg/backup/item_block_worker_pool.go b/pkg/backup/item_block_worker_pool.go new file mode 100644 index 0000000000..b2978dd5b8 --- /dev/null +++ b/pkg/backup/item_block_worker_pool.go @@ -0,0 +1,98 @@ +/* +Copyright the Velero Contributors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package backup + +import ( + "context" + "sync" + + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +type ItemBlockWorkerPool struct { + inputChannel chan ItemBlockInput + wg *sync.WaitGroup + logger logrus.FieldLogger + cancelFunc context.CancelFunc +} + +type ItemBlockInput struct { + itemBlock *BackupItemBlock + returnChan chan ItemBlockReturn +} + +type ItemBlockReturn struct { + itemBlock *BackupItemBlock + resources []schema.GroupResource + err error +} + +func (p *ItemBlockWorkerPool) GetInputChannel() chan ItemBlockInput { + return p.inputChannel +} + +func StartItemBlockWorkerPool(ctx context.Context, workers int, log logrus.FieldLogger) *ItemBlockWorkerPool { + // Buffer will hold up to 10 ItemBlocks waiting for processing + inputChannel := make(chan ItemBlockInput, 10) + + ctx, cancelFunc := context.WithCancel(ctx) + wg := &sync.WaitGroup{} + + for i := 0; i < workers; i++ { + logger := log.WithField("worker", i) + wg.Add(1) + go processItemBlockWorker(ctx, inputChannel, logger, wg) + } + + pool := &ItemBlockWorkerPool{ + inputChannel: inputChannel, + cancelFunc: cancelFunc, + logger: log, + wg: wg, + } + return pool +} + +func (p *ItemBlockWorkerPool) Stop() { + p.cancelFunc() + p.logger.Info("ItemBlock worker stopping") + p.wg.Wait() +} + +func processItemBlockWorker(ctx context.Context, + inputChannel chan ItemBlockInput, + logger logrus.FieldLogger, + wg *sync.WaitGroup) { + for { + select { + case m := <-inputChannel: + logger.Infof("processing ItemBlock for backup %v", m.itemBlock.itemBackupper.backupRequest.Name) + grList := m.itemBlock.itemBackupper.kubernetesBackupper.backupItemBlock(m.itemBlock) + logger.Infof("finished processing ItemBlock for backup %v", m.itemBlock.itemBackupper.backupRequest.Name) + m.returnChan <- ItemBlockReturn{ + itemBlock: m.itemBlock, + resources: grList, + err: nil, + } + case <-ctx.Done(): + logger.Info("stopping ItemBlock worker") + wg.Done() + return + } + } +} diff --git a/pkg/backup/item_collector.go b/pkg/backup/item_collector.go index 903b0e991f..2eb9079f50 100644 --- a/pkg/backup/item_collector.go +++ b/pkg/backup/item_collector.go @@ -179,6 +179,8 @@ type kubernetesResource struct { // set to true during backup processing when added to an ItemBlock // or if the item is excluded from backup. inItemBlockOrExcluded bool + // Kind is added to facilitate creating an itemKey for progress tracking + kind string } // getItemsFromResourceIdentifiers get the kubernetesResources @@ -407,6 +409,7 @@ func (r *itemCollector) getResourceItems( namespace: resourceID.Namespace, name: resourceID.Name, path: path, + kind: resource.Kind, }) } @@ -480,6 +483,7 @@ func (r *itemCollector) getResourceItems( namespace: item.GetNamespace(), name: item.GetName(), path: path, + kind: resource.Kind, }) if item.GetNamespace() != "" { @@ -806,6 +810,7 @@ func (r *itemCollector) collectNamespaces( preferredGVR: preferredGVR, name: unstructuredList.Items[index].GetName(), path: path, + kind: resource.Kind, }) } diff --git a/pkg/backup/itemblock.go b/pkg/backup/itemblock.go index dee553f721..f1d8b87118 100644 --- a/pkg/backup/itemblock.go +++ b/pkg/backup/itemblock.go @@ -17,6 +17,7 @@ limitations under the License. 
package backup import ( + "context" "encoding/json" "os" @@ -32,12 +33,14 @@ type BackupItemBlock struct { itemblock.ItemBlock // This is a reference to the shared itemBackupper for the backup itemBackupper *itemBackupper + pvbTimeoutCtx context.Context } -func NewBackupItemBlock(log logrus.FieldLogger, itemBackupper *itemBackupper) *BackupItemBlock { +func NewBackupItemBlock(log logrus.FieldLogger, itemBackupper *itemBackupper, pvbTimeoutCtx context.Context) *BackupItemBlock { return &BackupItemBlock{ ItemBlock: itemblock.ItemBlock{Log: log}, itemBackupper: itemBackupper, + pvbTimeoutCtx: pvbTimeoutCtx, } } diff --git a/pkg/backup/request.go b/pkg/backup/request.go index f89510933d..3ec05ee044 100644 --- a/pkg/backup/request.go +++ b/pkg/backup/request.go @@ -51,6 +51,7 @@ type Request struct { ResPolicies *resourcepolicies.Policies SkippedPVTracker *skipPVTracker VolumesInformation volume.BackupVolumesInformation + ItemBlockChannel chan ItemBlockInput } // BackupVolumesInformation contains the information needs by generating diff --git a/pkg/controller/backup_controller.go b/pkg/controller/backup_controller.go index 61db0b2360..414582c8fa 100644 --- a/pkg/controller/backup_controller.go +++ b/pkg/controller/backup_controller.go @@ -87,6 +87,7 @@ type backupReconciler struct { defaultSnapshotMoveData bool globalCRClient kbclient.Client itemBlockWorkerCount int + workerPool *pkgbackup.ItemBlockWorkerPool } func NewBackupReconciler( @@ -139,6 +140,7 @@ func NewBackupReconciler( defaultSnapshotMoveData: defaultSnapshotMoveData, itemBlockWorkerCount: itemBlockWorkerCount, globalCRClient: globalCRClient, + workerPool: pkgbackup.StartItemBlockWorkerPool(ctx, itemBlockWorkerCount, logger), } b.updateTotalBackupMetric() return b @@ -329,6 +331,7 @@ func (b *backupReconciler) prepareBackupRequest(backup *velerov1api.Backup, logg Backup: backup.DeepCopy(), // don't modify items in the cache SkippedPVTracker: pkgbackup.NewSkipPVTracker(), BackedUpItems: pkgbackup.NewBackedUpItemsMap(), + ItemBlockChannel: b.workerPool.GetInputChannel(), } request.VolumesInformation.Init() diff --git a/pkg/controller/backup_controller_test.go b/pkg/controller/backup_controller_test.go index 44cbb6dc52..ddf2d13297 100644 --- a/pkg/controller/backup_controller_test.go +++ b/pkg/controller/backup_controller_test.go @@ -137,7 +137,9 @@ func TestProcessBackupNonProcessedItems(t *testing.T) { kbClient: velerotest.NewFakeControllerRuntimeClient(t), formatFlag: formatFlag, logger: logger, + workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger), } + defer c.workerPool.Stop() if test.backup != nil { require.NoError(t, c.kbClient.Create(context.Background(), test.backup)) } @@ -226,7 +228,9 @@ func TestProcessBackupValidationFailures(t *testing.T) { clock: &clock.RealClock{}, formatFlag: formatFlag, metrics: metrics.NewServerMetrics(), + workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger), } + defer c.workerPool.Stop() require.NotNil(t, test.backup) require.NoError(t, c.kbClient.Create(context.Background(), test.backup)) @@ -289,7 +293,9 @@ func TestBackupLocationLabel(t *testing.T) { defaultBackupLocation: test.backupLocation.Name, clock: &clock.RealClock{}, formatFlag: formatFlag, + workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger), } + defer c.workerPool.Stop() res := c.prepareBackupRequest(test.backup, logger) assert.NotNil(t, res) @@ -384,7 +390,9 @@ func Test_prepareBackupRequest_BackupStorageLocation(t *testing.T) { defaultBackupTTL: 
defaultBackupTTL.Duration, clock: testclocks.NewFakeClock(now), formatFlag: formatFlag, + workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger), } + defer c.workerPool.Stop() test.backup.Spec.StorageLocation = test.backupLocationNameInBackup @@ -460,7 +468,9 @@ func TestDefaultBackupTTL(t *testing.T) { defaultBackupTTL: defaultBackupTTL.Duration, clock: testclocks.NewFakeClock(now), formatFlag: formatFlag, + workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger), } + defer c.workerPool.Stop() res := c.prepareBackupRequest(test.backup, logger) assert.NotNil(t, res) @@ -560,7 +570,9 @@ func TestDefaultVolumesToResticDeprecation(t *testing.T) { clock: &clock.RealClock{}, formatFlag: formatFlag, defaultVolumesToFsBackup: test.globalVal, + workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger), } + defer c.workerPool.Stop() res := c.prepareBackupRequest(test.backup, logger) assert.NotNil(t, res) @@ -1345,7 +1357,9 @@ func TestProcessBackupCompletions(t *testing.T) { backupper: backupper, formatFlag: formatFlag, globalCRClient: fakeGlobalClient, + workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger), } + defer c.workerPool.Stop() pluginManager.On("GetBackupItemActionsV2").Return(nil, nil) pluginManager.On("GetItemBlockActions").Return(nil, nil) @@ -1539,7 +1553,9 @@ func TestValidateAndGetSnapshotLocations(t *testing.T) { logger: logger, defaultSnapshotLocations: test.defaultLocations, kbClient: velerotest.NewFakeControllerRuntimeClient(t), + workerPool: pkgbackup.StartItemBlockWorkerPool(context.Background(), 1, logger), } + defer c.workerPool.Stop() // set up a Backup object to represent what we expect to be passed to backupper.Backup() backup := test.backup.DeepCopy() diff --git a/pkg/podvolume/snapshot_tracker.go b/pkg/podvolume/snapshot_tracker.go index 9263065782..d567946173 100644 --- a/pkg/podvolume/snapshot_tracker.go +++ b/pkg/podvolume/snapshot_tracker.go @@ -18,6 +18,7 @@ package podvolume import ( "fmt" + "sync" corev1api "k8s.io/api/core/v1" ) @@ -27,6 +28,7 @@ import ( type Tracker struct { pvcs map[string]pvcSnapshotStatus pvcPod map[string]string + *sync.RWMutex } type pvcSnapshotStatus int @@ -42,7 +44,8 @@ func NewTracker() *Tracker { return &Tracker{ pvcs: make(map[string]pvcSnapshotStatus), // key: pvc ns/name, value: pod name - pvcPod: make(map[string]string), + pvcPod: make(map[string]string), + RWMutex: &sync.RWMutex{}, } } @@ -64,6 +67,8 @@ func (t *Tracker) Optout(pod *corev1api.Pod, volumeName string) { // OptedoutByPod returns true if the PVC with the specified namespace and name has been opted out by the pod. 
The // second return value is the name of the pod which has the annotation that opted out the volume/pvc func (t *Tracker) OptedoutByPod(namespace, name string) (bool, string) { + t.RLock() + defer t.RUnlock() status, found := t.pvcs[key(namespace, name)] if !found || status != pvcSnapshotStatusOptedout { @@ -74,6 +79,8 @@ func (t *Tracker) OptedoutByPod(namespace, name string) (bool, string) { // if the volume is a PVC, record the status and the related pod func (t *Tracker) recordStatus(pod *corev1api.Pod, volumeName string, status pvcSnapshotStatus, preReqStatus pvcSnapshotStatus) { + t.Lock() + defer t.Unlock() for _, volume := range pod.Spec.Volumes { if volume.Name == volumeName { if volume.PersistentVolumeClaim != nil { @@ -93,6 +100,8 @@ func (t *Tracker) recordStatus(pod *corev1api.Pod, volumeName string, status pvc // Has returns true if the PVC with the specified namespace and name has been tracked. func (t *Tracker) Has(namespace, name string) bool { + t.RLock() + defer t.RUnlock() status, found := t.pvcs[key(namespace, name)] return found && (status == pvcSnapshotStatusTracked || status == pvcSnapshotStatusTaken) } @@ -100,6 +109,8 @@ func (t *Tracker) Has(namespace, name string) bool { // TakenForPodVolume returns true and the PVC's name if the pod volume with the specified name uses a // PVC and that PVC has been taken by pod volume backup. func (t *Tracker) TakenForPodVolume(pod *corev1api.Pod, volume string) (bool, string) { + t.RLock() + defer t.RUnlock() for _, podVolume := range pod.Spec.Volumes { if podVolume.Name != volume { continue
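The diff above wires the new `ItemBlockWorkerPool` (`pkg/backup/item_block_worker_pool.go`) into the backup flow as a producer/consumer pipeline: the ItemBlock loop in `BackupWithResolvers` sends `ItemBlockInput` values on a shared buffered channel, worker goroutines call `backupItemBlock` and reply on a per-backup return channel, and a response goroutine aggregates backed-up group resources and progress. The sketch below illustrates that pattern in isolation; `workItem`, `workResult`, and `startWorkerPool` are simplified stand-ins for this illustration only, not the actual Velero types or APIs.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// workItem and workResult are simplified stand-ins for ItemBlockInput and
// ItemBlockReturn: each submitted item carries the channel its result
// should be sent back on.
type workItem struct {
	name       string
	returnChan chan workResult
}

type workResult struct {
	name string
	err  error
}

type workerPool struct {
	input  chan workItem
	wg     *sync.WaitGroup
	cancel context.CancelFunc
}

// startWorkerPool mirrors StartItemBlockWorkerPool: a shared buffered input
// channel plus N goroutines that loop over it until the context is canceled.
func startWorkerPool(ctx context.Context, workers int) *workerPool {
	ctx, cancel := context.WithCancel(ctx)
	pool := &workerPool{
		input:  make(chan workItem, 10), // up to 10 blocks queued, as in the patch
		wg:     &sync.WaitGroup{},
		cancel: cancel,
	}
	for i := 0; i < workers; i++ {
		pool.wg.Add(1)
		go func() {
			defer pool.wg.Done()
			for {
				select {
				case item := <-pool.input:
					// The real worker calls kubernetesBackupper.backupItemBlock here.
					item.returnChan <- workResult{name: item.name}
				case <-ctx.Done():
					return
				}
			}
		}()
	}
	return pool
}

// Stop mirrors ItemBlockWorkerPool.Stop: cancel the workers, then wait for
// them to exit.
func (p *workerPool) Stop() {
	p.cancel()
	p.wg.Wait()
}

func main() {
	pool := startWorkerPool(context.Background(), 2)
	defer pool.Stop()

	returns := make(chan workResult)
	pending := &sync.WaitGroup{}

	// Response handler, analogous to the goroutine added in
	// BackupWithResolvers: drain the shared return channel and mark each
	// block done so the producer knows when everything has been processed.
	go func() {
		for r := range returns {
			fmt.Println("processed", r.name, "err:", r.err)
			pending.Done()
		}
	}()

	// Producer side: the ItemBlock loop queues each block rather than
	// backing it up inline.
	for _, name := range []string{"block-1", "block-2"} {
		pending.Add(1)
		pool.input <- workItem{name: name, returnChan: returns}
	}
	pending.Wait() // wait for all submitted blocks to finish
}
```

As in the patch, `Stop` cancels the workers' context and waits on the pool's own `WaitGroup`, while the producer uses a separate `WaitGroup` (one `Add` per submitted block, one `Done` per response) to know when every ItemBlock has been handled before closing out the backup.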