SKS-1707: Optimize the creation/power on of virtual machines when PlacementGroupFilter detected for placement group (#134)

haijianyang authored Aug 15, 2023
1 parent 8a5f91a commit 6c44f6e
Showing 5 changed files with 365 additions and 106 deletions.
41 changes: 30 additions & 11 deletions controllers/elfmachine_controller.go
@@ -480,13 +480,17 @@ func (r *ElfMachineReconciler) reconcileVM(ctx *context.MachineContext) (*models
 		return nil, false, errors.New("bootstrapData is empty")
 	}
 
-	if ok := isElfClusterMemoryInsufficient(ctx.ElfCluster.Spec.Cluster); ok {
-		if canRetry := canRetryVMOperation(ctx.ElfCluster.Spec.Cluster); !canRetry {
-			ctx.Logger.V(1).Info(fmt.Sprintf("Insufficient memory for ELF cluster %s, skip creating VM", ctx.ElfCluster.Spec.Cluster))
+	if ok, message, err := isELFScheduleVMErrorRecorded(ctx); err != nil {
+		return nil, false, err
+	} else if ok {
+		if canRetry, err := canRetryVMOperation(ctx); err != nil {
+			return nil, false, err
+		} else if !canRetry {
+			ctx.Logger.V(1).Info(fmt.Sprintf("%s, skip creating VM", message))
 			return nil, false, nil
 		}
 
-		ctx.Logger.V(1).Info(fmt.Sprintf("Insufficient memory for ELF cluster %s, try to create VM", ctx.ElfCluster.Spec.Cluster))
+		ctx.Logger.V(1).Info(fmt.Sprintf("%s and the retry silence period passes, will try to create the VM again", message))
 	}
 
 	// Only limit the virtual machines of the worker nodes
@@ -737,14 +741,18 @@ func (r *ElfMachineReconciler) powerOffVM(ctx *context.MachineContext) error {
 }
 
 func (r *ElfMachineReconciler) powerOnVM(ctx *context.MachineContext) error {
-	if ok := isElfClusterMemoryInsufficient(ctx.ElfCluster.Spec.Cluster); ok {
-		if canRetry := canRetryVMOperation(ctx.ElfCluster.Spec.Cluster); !canRetry {
-			ctx.Logger.V(1).Info(fmt.Sprintf("Insufficient memory for ELF cluster %s, skip powering on VM %s", ctx.ElfCluster.Spec.Cluster, ctx.ElfMachine.Status.VMRef))
+	if ok, message, err := isELFScheduleVMErrorRecorded(ctx); err != nil {
+		return err
+	} else if ok {
+		if canRetry, err := canRetryVMOperation(ctx); err != nil {
+			return err
+		} else if !canRetry {
+			ctx.Logger.V(1).Info(fmt.Sprintf("%s, skip powering on VM %s", message, ctx.ElfMachine.Status.VMRef))
 
 			return nil
 		}
 
-		ctx.Logger.V(1).Info(fmt.Sprintf("Insufficient memory for the ELF cluster %s was detected previously, try to power on VM %s to check if the ELF cluster has sufficient memory now", ctx.ElfCluster.Spec.Cluster, ctx.ElfMachine.Status.VMRef))
+		ctx.Logger.V(1).Info(fmt.Sprintf("%s and the retry silence period passes, will try to power on the VM again", message))
 	}
 
 	if ok := acquireTicketForUpdatingVM(ctx.ElfMachine.Name); !ok {
@@ -843,8 +851,16 @@ func (r *ElfMachineReconciler) reconcileVMTask(ctx *context.MachineContext, vm *
 	case service.IsCloneVMTask(task):
 		releaseTicketForCreateVM(ctx.ElfMachine.Name)
 	case service.IsMemoryInsufficientError(errorMessage):
-		setElfClusterMemoryInsufficient(ctx.ElfCluster.Spec.Cluster, true)
-		message := fmt.Sprintf("Insufficient memory detected for ELF cluster %s", ctx.ElfCluster.Spec.Cluster)
+		recordElfClusterMemoryInsufficient(ctx, true)
+		message := fmt.Sprintf("Insufficient memory detected for the ELF cluster %s", ctx.ElfCluster.Spec.Cluster)
 		ctx.Logger.Info(message)
 
 		return true, errors.New(message)
+	case service.IsPlacementGroupError(errorMessage):
+		if err := recordPlacementGroupPolicyNotSatisfied(ctx, true); err != nil {
+			return true, err
+		}
+		message := "The placement group policy can not be satisfied"
+		ctx.Logger.Info(message)
+
+		return true, errors.New(message)
@@ -853,8 +869,11 @@ func (r *ElfMachineReconciler) reconcileVMTask(ctx *context.MachineContext, vm *
 		ctx.Logger.Info("VM task succeeded", "vmRef", vmRef, "taskRef", taskRef, "taskDescription", service.GetTowerString(task.Description))
 
 		if service.IsCloneVMTask(task) || service.IsPowerOnVMTask(task) {
-			setElfClusterMemoryInsufficient(ctx.ElfCluster.Spec.Cluster, false)
 			releaseTicketForCreateVM(ctx.ElfMachine.Name)
+			recordElfClusterMemoryInsufficient(ctx, false)
+			if err := recordPlacementGroupPolicyNotSatisfied(ctx, false); err != nil {
+				return true, err
+			}
 		}
 	default:
 		ctx.Logger.Info("Waiting for VM task done", "vmRef", vmRef, "taskRef", taskRef, "taskStatus", service.GetTowerTaskStatus(task.Status), "taskDescription", service.GetTowerString(task.Description))
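Both hunks above install the same two-step gate in front of VM create and power-on. A condensed sketch of that control flow in plain Go (the two function parameters stand in for isELFScheduleVMErrorRecorded and canRetryVMOperation; everything in this snippet is illustrative, not the provider's actual API):

package main

import "fmt"

// shouldProceed condenses the gate shared by reconcileVM and powerOnVM:
// operate normally when no scheduling error is on record; when one is
// recorded, operate only after the retry silence period has passed.
func shouldProceed(
	errRecorded func() (bool, string, error), // stand-in for isELFScheduleVMErrorRecorded
	canRetry func() (bool, error), // stand-in for canRetryVMOperation
) (bool, error) {
	ok, message, err := errRecorded()
	if err != nil {
		return false, err
	}
	if !ok {
		return true, nil // nothing recorded: create/power on as usual
	}

	retry, err := canRetry()
	if err != nil {
		return false, err
	}
	if !retry {
		fmt.Printf("%s, skip operation\n", message)
		return false, nil // still inside the silence window: skip this reconcile
	}

	fmt.Printf("%s and the retry silence period passes, will try again\n", message)
	return true, nil
}

func main() {
	// Simulate a recorded insufficient-memory error whose silence window
	// has not yet passed: the operation is skipped without surfacing an error.
	proceed, err := shouldProceed(
		func() (bool, string, error) {
			return true, "Insufficient memory detected for the ELF cluster cluster-1", nil
		},
		func() (bool, error) { return false, nil },
	)
	fmt.Println(proceed, err) // false <nil>
}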
97 changes: 94 additions & 3 deletions controllers/elfmachine_controller_test.go
@@ -267,27 +267,44 @@ var _ = Describe("ElfMachineReconciler", func() {
 		})
 
 		It("should create a new VM if none exists", func() {
+			resetClusterResourceMap()
 			vm := fake.NewTowerVM()
 			vm.Name = &elfMachine.Name
+			elfCluster.Spec.Cluster = clusterKey
 			task := fake.NewTowerTask()
 			withTaskVM := fake.NewWithTaskVM(vm, task)
 			ctrlContext := newCtrlContexts(elfCluster, cluster, elfMachine, machine, secret, md)
 			fake.InitOwnerReferences(ctrlContext, elfCluster, cluster, elfMachine, machine)
 
+			machineContext := newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService)
+			machineContext.VMService = mockVMService
+			recordIsUnmet(machineContext, clusterKey, true)
+			reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
+			elfMachineKey := capiutil.ObjectKey(elfMachine)
+			result, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfMachineKey})
+			Expect(result.RequeueAfter).NotTo(BeZero())
+			Expect(err).NotTo(HaveOccurred())
+			Expect(logBuffer.String()).To(ContainSubstring("Insufficient memory detected for the ELF cluster"))
+
+			logBuffer = new(bytes.Buffer)
+			klog.SetOutput(logBuffer)
 			mockVMService.EXPECT().Clone(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(withTaskVM, nil)
 			mockVMService.EXPECT().Get(*vm.ID).Return(vm, nil)
 			mockVMService.EXPECT().GetTask(*task.ID).Return(task, nil)
+			mockVMService.EXPECT().GetVMPlacementGroup(gomock.Any()).Return(placementGroup, nil)
+			expireELFScheduleVMError(machineContext, clusterKey)
 
-			reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
-			elfMachineKey := capiutil.ObjectKey(elfMachine)
-			result, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfMachineKey})
+			reconciler = &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
+			result, err = reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: elfMachineKey})
 			Expect(result.RequeueAfter).NotTo(BeZero())
 			Expect(err).ShouldNot(HaveOccurred())
+			Expect(logBuffer.String()).To(ContainSubstring("and the retry silence period passes, will try to create the VM again"))
 			Expect(logBuffer.String()).To(ContainSubstring("Waiting for VM task done"))
 			elfMachine = &infrav1.ElfMachine{}
 			Expect(reconciler.Client.Get(reconciler, elfMachineKey, elfMachine)).To(Succeed())
 			Expect(elfMachine.Status.VMRef).To(Equal(*vm.ID))
 			Expect(elfMachine.Status.TaskRef).To(Equal(*task.ID))
+			resetClusterResourceMap()
 		})
 
 		It("should recover from lost task", func() {
@@ -789,6 +806,33 @@ var _ = Describe("ElfMachineReconciler", func() {
 			Expect(err).NotTo(HaveOccurred())
 			expectConditions(elfMachine, []conditionAssertion{{infrav1.VMProvisionedCondition, corev1.ConditionFalse, clusterv1.ConditionSeverityInfo, infrav1.PowerOffReason}})
 		})
+
+		Context("powerOnVM", func() {
+			It("should", func() {
+				resetClusterResourceMap()
+				vm := fake.NewTowerVM()
+				elfMachine.Status.VMRef = *vm.LocalID
+				elfCluster.Spec.Cluster = clusterKey
+				ctrlContext := newCtrlContexts(elfCluster, cluster, elfMachine, machine, secret, md)
+				fake.InitOwnerReferences(ctrlContext, elfCluster, cluster, elfMachine, machine)
+				machineContext := newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService)
+				machineContext.VMService = mockVMService
+				recordIsUnmet(machineContext, clusterKey, true)
+				reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
+				err := reconciler.powerOnVM(machineContext)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(logBuffer.String()).To(ContainSubstring("Insufficient memory detected for the ELF cluster"))
+
+				task := fake.NewTowerTask()
+				mockVMService.EXPECT().PowerOn(elfMachine.Status.VMRef).Return(task, nil)
+				expireELFScheduleVMError(machineContext, clusterKey)
+				err = reconciler.powerOnVM(machineContext)
+				Expect(err).NotTo(HaveOccurred())
+				Expect(logBuffer.String()).To(ContainSubstring("and the retry silence period passes, will try to power on the VM again"))
+				expectConditions(elfMachine, []conditionAssertion{{infrav1.VMProvisionedCondition, corev1.ConditionFalse, clusterv1.ConditionSeverityInfo, infrav1.PoweringOnReason}})
+				resetClusterResourceMap()
+			})
+		})
 	})
 
 	Context("Reconcile Join Placement Group", func() {
@@ -2646,6 +2690,53 @@ var _ = Describe("ElfMachineReconciler", func() {
 			Expect(strings.Contains(err.Error(), "failed to get task")).To(BeTrue())
 			Expect(elfMachine.Status.TaskRef).To(Equal(*task.ID))
 		})
+
+		It("should handle failed/succeeded task", func() {
+			resetClusterResourceMap()
+			task := fake.NewTowerTask()
+			task.Status = models.NewTaskStatus(models.TaskStatusFAILED)
+			task.ErrorMessage = service.TowerString(service.MemoryInsufficientError)
+			elfMachine.Status.TaskRef = *task.ID
+			ctrlContext := newCtrlContexts(elfCluster, cluster, elfMachine, machine, secret, md)
+			fake.InitOwnerReferences(ctrlContext, elfCluster, cluster, elfMachine, machine)
+			machineContext := newMachineContext(ctrlContext, elfCluster, cluster, elfMachine, machine, mockVMService)
+			machineContext.VMService = mockVMService
+			mockVMService.EXPECT().GetTask(elfMachine.Status.TaskRef).AnyTimes().Return(task, nil)
+
+			reconciler := &ElfMachineReconciler{ControllerContext: ctrlContext, NewVMService: mockNewVMService}
+			ok, err := reconciler.reconcileVMTask(machineContext, nil)
+			Expect(ok).Should(BeTrue())
+			Expect(err.Error()).To(ContainSubstring("Insufficient memory detected for the ELF cluster"))
+			Expect(elfMachine.Status.TaskRef).To(Equal(""))
+			Expect(logBuffer.String()).To(ContainSubstring("VM task failed"))
+
+			logBuffer = new(bytes.Buffer)
+			klog.SetOutput(logBuffer)
+			task.ErrorMessage = service.TowerString(service.PlacementGroupMustError)
+			elfMachine.Status.TaskRef = *task.ID
+			ok, err = reconciler.reconcileVMTask(machineContext, nil)
+			Expect(ok).Should(BeTrue())
+			Expect(err.Error()).To(ContainSubstring("The placement group policy can not be satisfied"))
+			Expect(logBuffer.String()).To(ContainSubstring("VM task failed"))
+
+			ok, msg, err := isELFScheduleVMErrorRecorded(machineContext)
+			Expect(ok).To(BeTrue())
+			Expect(msg).To(ContainSubstring("Insufficient memory detected for the ELF cluster"))
+			Expect(err).ShouldNot(HaveOccurred())
+
+			task.Status = models.NewTaskStatus(models.TaskStatusSUCCESSED)
+			task.Description = service.TowerString("Start VM")
+			elfMachine.Status.TaskRef = *task.ID
+			ok, err = reconciler.reconcileVMTask(machineContext, nil)
+			Expect(ok).Should(BeTrue())
+			Expect(err).ShouldNot(HaveOccurred())
+			Expect(logBuffer.String()).To(ContainSubstring("VM task succeeded"))
+
+			ok, msg, err = isELFScheduleVMErrorRecorded(machineContext)
+			Expect(ok).To(BeFalse())
+			Expect(msg).To(Equal(""))
+			Expect(err).ShouldNot(HaveOccurred())
+		})
 	})
 
 	Context("Reconcile Node", func() {
111 changes: 80 additions & 31 deletions controllers/tower_cache.go
@@ -17,82 +17,131 @@ limitations under the License.
 package controllers
 
 import (
+	"fmt"
 	"sync"
 	"time"
 
+	"github.com/smartxworks/cluster-api-provider-elf/pkg/context"
+	towerresources "github.com/smartxworks/cluster-api-provider-elf/pkg/resources"
 )
 
 const (
 	silenceTime = time.Minute * 5
 )
 
-var clusterStatusMap = make(map[string]*clusterStatus)
+var clusterResourceMap = make(map[string]*clusterResource)
 var lock sync.RWMutex
 
-type clusterStatus struct {
-	Resources resources
-}
-
-type resources struct {
-	IsMemoryInsufficient bool
-	// LastDetected records the last memory detection time
+type clusterResource struct {
+	// IsUnmet indicates whether the resource does not meet the requirement.
+	// For example, true can indicate insufficient memory and not satisfy placement group policy.
+	IsUnmet bool
+	// LastDetected records the last resource detection time
 	LastDetected time.Time
-	// LastRetried records the time of the last attempt to detect memory
+	// LastRetried records the time of the last attempt to detect resource
	LastRetried time.Time
 }
 
-// isElfClusterMemoryInsufficient returns whether the ELF cluster has insufficient memory.
-func isElfClusterMemoryInsufficient(clusterID string) bool {
+// isELFScheduleVMErrorRecorded returns whether the ELF cluster has failed scheduling virtual machine errors.
+//
+// Includes these scenarios:
+// 1. ELF cluster has insufficient memory.
+// 2. Placement group not satisfy policy.
+func isELFScheduleVMErrorRecorded(ctx *context.MachineContext) (bool, string, error) {
 	lock.RLock()
 	defer lock.RUnlock()
 
-	if status, ok := clusterStatusMap[clusterID]; ok {
-		return status.Resources.IsMemoryInsufficient
+	if resource, ok := clusterResourceMap[getMemoryKey(ctx.ElfCluster.Spec.Cluster)]; ok && resource.IsUnmet {
+		return true, fmt.Sprintf("Insufficient memory detected for the ELF cluster %s", ctx.ElfCluster.Spec.Cluster), nil
 	}
 
-	return false
+	placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctx.Client, ctx.Machine, ctx.Cluster)
+	if err != nil {
+		return false, "", err
+	}
+
+	if resource, ok := clusterResourceMap[getPlacementGroupKey(placementGroupName)]; ok && resource.IsUnmet {
+		return true, fmt.Sprintf("Not satisfy policy detected for the placement group %s", placementGroupName), nil
+	}
+
+	return false, "", nil
 }
 
-// setElfClusterMemoryInsufficient sets whether the memory is insufficient.
-func setElfClusterMemoryInsufficient(clusterID string, isInsufficient bool) {
+// recordElfClusterMemoryInsufficient records whether the memory is insufficient.
+func recordElfClusterMemoryInsufficient(ctx *context.MachineContext, isInsufficient bool) {
 	lock.Lock()
 	defer lock.Unlock()
 
-	now := time.Now()
-	resources := resources{
-		IsMemoryInsufficient: isInsufficient,
-		LastDetected:         now,
-		LastRetried:          now,
-	}
-
-	if status, ok := clusterStatusMap[clusterID]; ok {
-		status.Resources = resources
-	} else {
-		clusterStatusMap[clusterID] = &clusterStatus{Resources: resources}
-	}
+	clusterResourceMap[getMemoryKey(ctx.ElfCluster.Spec.Cluster)] = newClusterResource(isInsufficient)
+}
+
+// recordPlacementGroupPolicyNotSatisfied records whether the placement group not satisfy policy.
+func recordPlacementGroupPolicyNotSatisfied(ctx *context.MachineContext, isNotSatisfiedPolicy bool) error {
+	lock.Lock()
+	defer lock.Unlock()
+
+	placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctx.Client, ctx.Machine, ctx.Cluster)
+	if err != nil {
+		return err
+	}
+
+	clusterResourceMap[getPlacementGroupKey(placementGroupName)] = newClusterResource(isNotSatisfiedPolicy)
+
+	return nil
+}
+
+func newClusterResource(isUnmet bool) *clusterResource {
+	now := time.Now()
+	return &clusterResource{
+		IsUnmet:      isUnmet,
+		LastDetected: now,
+		LastRetried:  now,
+	}
 }
 
 // canRetryVMOperation returns whether virtual machine operations(Create/PowerOn)
 // can be performed.
-func canRetryVMOperation(clusterID string) bool {
+func canRetryVMOperation(ctx *context.MachineContext) (bool, error) {
 	lock.Lock()
 	defer lock.Unlock()
 
-	if status, ok := clusterStatusMap[clusterID]; ok {
-		if !status.Resources.IsMemoryInsufficient {
+	if ok := canRetry(getMemoryKey(ctx.ElfCluster.Spec.Cluster)); ok {
+		return true, nil
+	}
+
+	placementGroupName, err := towerresources.GetVMPlacementGroupName(ctx, ctx.Client, ctx.Machine, ctx.Cluster)
+	if err != nil {
+		return false, err
+	}
+
+	return canRetry(getPlacementGroupKey(placementGroupName)), nil
+}
+
+func canRetry(key string) bool {
+	if resource, ok := clusterResourceMap[key]; ok {
+		if !resource.IsUnmet {
 			return false
 		}
 
-		if time.Now().Before(status.Resources.LastDetected.Add(silenceTime)) {
+		if time.Now().Before(resource.LastDetected.Add(silenceTime)) {
 			return false
 		} else {
-			if time.Now().Before(status.Resources.LastRetried.Add(silenceTime)) {
+			if time.Now().Before(resource.LastRetried.Add(silenceTime)) {
 				return false
 			} else {
-				status.Resources.LastRetried = time.Now()
+				resource.LastRetried = time.Now()
 				return true
 			}
 		}
 	}
 
 	return false
 }
+
+func getMemoryKey(clusterID string) string {
+	return fmt.Sprintf("%s-memory", clusterID)
+}
+
+func getPlacementGroupKey(placementGroup string) string {
+	return fmt.Sprintf("%s-placement-group", placementGroup)
+}
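The retry gate in canRetry above is the heart of the change: a recorded scheduling failure silences create/power-on attempts for silenceTime (five minutes) from the last detection, after which at most one attempt per silenceTime window is let through. A minimal, self-contained sketch of the same decision tree, with the locking and Tower/CAPI types stripped out (all names below are illustrative, not part of the provider):

package main

import (
	"fmt"
	"time"
)

const silenceTime = 5 * time.Minute

// resource mirrors the clusterResource bookkeeping in the diff above,
// reduced to the three fields the timing rules need.
type resource struct {
	IsUnmet      bool
	LastDetected time.Time
	LastRetried  time.Time
}

// canRetry reproduces the decision tree from controllers/tower_cache.go:
// nothing recorded (or the resource is met) -> false; a detection newer
// than silenceTime -> false; otherwise allow one retry per silenceTime
// window, tracked by refreshing LastRetried.
func canRetry(r *resource, now time.Time) bool {
	if r == nil || !r.IsUnmet {
		return false
	}
	if now.Before(r.LastDetected.Add(silenceTime)) {
		return false
	}
	if now.Before(r.LastRetried.Add(silenceTime)) {
		return false
	}
	r.LastRetried = now
	return true
}

func main() {
	t0 := time.Now()
	// A failure was just detected and recorded.
	r := &resource{IsUnmet: true, LastDetected: t0, LastRetried: t0}

	fmt.Println(canRetry(r, t0.Add(1*time.Minute)))  // false: inside the silence window
	fmt.Println(canRetry(r, t0.Add(6*time.Minute)))  // true: window passed, one retry allowed
	fmt.Println(canRetry(r, t0.Add(7*time.Minute)))  // false: LastRetried was just refreshed
	fmt.Println(canRetry(r, t0.Add(12*time.Minute))) // true: next retry window
}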