diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go index e6ff41de6279..6ea6a3660787 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go @@ -24,8 +24,12 @@ package mcm import ( "context" "fmt" + "github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "slices" "strconv" "strings" + "sync" "time" apiv1 "k8s.io/api/core/v1" @@ -34,7 +38,6 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" "k8s.io/autoscaler/cluster-autoscaler/config" - "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "k8s.io/klog/v2" schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework" @@ -67,15 +70,14 @@ const ( // MCMCloudProvider implements the cloud provider interface for machine-controller-manager // Reference: https://github.com/gardener/machine-controller-manager type mcmCloudProvider struct { - mcmManager *McmManager - machinedeployments map[types.NamespacedName]*MachineDeployment - resourceLimiter *cloudprovider.ResourceLimiter + mcmManager *McmManager + resourceLimiter *cloudprovider.ResourceLimiter } // BuildMcmCloudProvider builds CloudProvider implementation for machine-controller-manager. 
func BuildMcmCloudProvider(mcmManager *McmManager, resourceLimiter *cloudprovider.ResourceLimiter) (cloudprovider.CloudProvider, error) { if mcmManager.discoveryOpts.StaticDiscoverySpecified() { - return buildStaticallyDiscoveringProvider(mcmManager, mcmManager.discoveryOpts.NodeGroupSpecs, resourceLimiter) + return buildStaticallyDiscoveringProvider(mcmManager, resourceLimiter) } return nil, fmt.Errorf("Failed to build an mcm cloud provider: Either node group specs or node group auto discovery spec must be specified") } @@ -96,51 +98,28 @@ func BuildMCM(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover return provider } -func buildStaticallyDiscoveringProvider(mcmManager *McmManager, specs []string, resourceLimiter *cloudprovider.ResourceLimiter) (*mcmCloudProvider, error) { +func buildStaticallyDiscoveringProvider(mcmManager *McmManager, resourceLimiter *cloudprovider.ResourceLimiter) (*mcmCloudProvider, error) { mcm := &mcmCloudProvider{ - mcmManager: mcmManager, - machinedeployments: make(map[types.NamespacedName]*MachineDeployment), - resourceLimiter: resourceLimiter, - } - for _, spec := range specs { - if err := mcm.addNodeGroup(spec); err != nil { - return nil, err - } + mcmManager: mcmManager, + resourceLimiter: resourceLimiter, } return mcm, nil } -// Cleanup stops the go routine that is handling the current view of the MachineDeployment in the form of a cache +// Cleanup stops the go routine that is handling the current view of the NodeGroupImpl in the form of a cache func (mcm *mcmCloudProvider) Cleanup() error { mcm.mcmManager.Cleanup() return nil } -// addNodeGroup adds node group defined in string spec. 
Format: -// minNodes:maxNodes:namespace.machineDeploymentName -func (mcm *mcmCloudProvider) addNodeGroup(spec string) error { - machinedeployment, err := buildMachineDeploymentFromSpec(spec, mcm.mcmManager) - if err != nil { - return err - } - mcm.addMachineDeployment(machinedeployment) - return nil -} - -func (mcm *mcmCloudProvider) addMachineDeployment(machinedeployment *MachineDeployment) { - key := types.NamespacedName{Namespace: machinedeployment.Namespace, Name: machinedeployment.Name} - mcm.machinedeployments[key] = machinedeployment - return -} - func (mcm *mcmCloudProvider) Name() string { return "machine-controller-manager" } // NodeGroups returns all node groups configured for this cloud provider. func (mcm *mcmCloudProvider) NodeGroups() []cloudprovider.NodeGroup { - result := make([]cloudprovider.NodeGroup, 0, len(mcm.machinedeployments)) - for _, machinedeployment := range mcm.machinedeployments { + result := make([]cloudprovider.NodeGroup, 0, len(mcm.mcmManager.nodeGroups)) + for _, machinedeployment := range mcm.mcmManager.nodeGroups { if machinedeployment.maxSize == 0 { continue } @@ -166,13 +145,13 @@ func (mcm *mcmCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.N return nil, nil } - md, err := mcm.mcmManager.GetMachineDeploymentForMachine(ref) + md, err := mcm.mcmManager.GetNodeGroupImpl(ref) if err != nil { return nil, err } key := types.NamespacedName{Namespace: md.Namespace, Name: md.Name} - _, isManaged := mcm.machinedeployments[key] + _, isManaged := mcm.mcmManager.nodeGroups[key] if !isManaged { klog.V(4).Infof("Skipped node %v, it's not managed by this controller", node.Spec.ProviderID) return nil, nil @@ -248,14 +227,8 @@ func (mcm *mcmCloudProvider) GetNodeGpuConfig(*apiv1.Node) *cloudprovider.GpuCon return nil } -// Ref contains a reference to the name of the machine-deployment. -type Ref struct { - Name string - Namespace string -} - // ReferenceFromProviderID extracts the Ref from providerId. 
It returns corresponding machine-name to providerid. -func ReferenceFromProviderID(m *McmManager, id string) (*Ref, error) { +func ReferenceFromProviderID(m *McmManager, id string) (*types.NamespacedName, error) { machines, err := m.machineLister.Machines(m.namespace).List(labels.Everything()) if err != nil { return nil, fmt.Errorf("Could not list machines due to error: %s", err) @@ -281,79 +254,82 @@ func ReferenceFromProviderID(m *McmManager, id string) (*Ref, error) { klog.V(4).Infof("No machine found for node ID %q", id) return nil, nil } - return &Ref{ + return &types.NamespacedName{ Name: Name, Namespace: Namespace, }, nil } -// MachineDeployment implements NodeGroup interface. -type MachineDeployment struct { - Ref +// NodeGroupImpl implements NodeGroup interface. +type NodeGroupImpl struct { + types.NamespacedName mcmManager *McmManager - minSize int - maxSize int + scalingMutex sync.Mutex + minSize int + maxSize int } // MaxSize returns maximum size of the node group. -func (machinedeployment *MachineDeployment) MaxSize() int { - return machinedeployment.maxSize +func (ngImpl *NodeGroupImpl) MaxSize() int { + return ngImpl.maxSize } // MinSize returns minimum size of the node group. -func (machinedeployment *MachineDeployment) MinSize() int { - return machinedeployment.minSize +func (ngImpl *NodeGroupImpl) MinSize() int { + return ngImpl.minSize } // TargetSize returns the current TARGET size of the node group. It is possible that the // number is different from the number of nodes registered in Kubernetes. -func (machinedeployment *MachineDeployment) TargetSize() (int, error) { - size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment) +func (ngImpl *NodeGroupImpl) TargetSize() (int, error) { + size, err := ngImpl.mcmManager.GetMachineDeploymentSize(ngImpl.Name) return int(size), err } // Exist checks if the node group really exists on the cloud provider side. Allows to tell the // theoretical node group from the real one. 
// TODO: Implement this to check if machine-deployment really exists. -func (machinedeployment *MachineDeployment) Exist() bool { +func (ngImpl *NodeGroupImpl) Exist() bool { return true } // Create creates the node group on the cloud provider side. -func (machinedeployment *MachineDeployment) Create() (cloudprovider.NodeGroup, error) { +func (ngImpl *NodeGroupImpl) Create() (cloudprovider.NodeGroup, error) { return nil, cloudprovider.ErrAlreadyExist } // Autoprovisioned returns true if the node group is autoprovisioned. -func (machinedeployment *MachineDeployment) Autoprovisioned() bool { +func (ngImpl *NodeGroupImpl) Autoprovisioned() bool { return false } // Delete deletes the node group on the cloud provider side. // This will be executed only for autoprovisioned node groups, once their size drops to 0. -func (machinedeployment *MachineDeployment) Delete() error { +func (ngImpl *NodeGroupImpl) Delete() error { return cloudprovider.ErrNotImplemented } // IncreaseSize of the Machinedeployment. 
-func (machinedeployment *MachineDeployment) IncreaseSize(delta int) error { - klog.V(0).Infof("Received request to increase size of machine deployment %s by %d", machinedeployment.Name, delta) +func (ngImpl *NodeGroupImpl) IncreaseSize(delta int) error { + klog.V(0).Infof("Received request to increase size of machine deployment %s by %d", ngImpl.Name, delta) if delta <= 0 { return fmt.Errorf("size increase must be positive") } - size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment) + ngImpl.scalingMutex.Lock() + defer ngImpl.scalingMutex.Unlock() + size, err := ngImpl.mcmManager.GetMachineDeploymentSize(ngImpl.Name) if err != nil { return err } targetSize := int(size) + delta - if targetSize > machinedeployment.MaxSize() { - return fmt.Errorf("size increase too large - desired:%d max:%d", targetSize, machinedeployment.MaxSize()) + if targetSize > ngImpl.MaxSize() { + return fmt.Errorf("size increase too large - desired:%d max:%d", targetSize, ngImpl.MaxSize()) } - return machinedeployment.mcmManager.retry(func(ctx context.Context) (bool, error) { - return machinedeployment.mcmManager.SetMachineDeploymentSize(ctx, machinedeployment, int64(targetSize)) - }, "MachineDeployment", "update", machinedeployment.Name) + return ngImpl.mcmManager.retry(func(ctx context.Context) (bool, error) { + return ngImpl.mcmManager.SetMachineDeploymentSize(ctx, ngImpl, int64(targetSize)) + }, "MachineDeployment", "update", ngImpl.Name) } // DecreaseTargetSize decreases the target size of the node group. This function @@ -361,40 +337,98 @@ func (machinedeployment *MachineDeployment) IncreaseSize(delta int) error { // request for new nodes that have not been yet fulfilled. Delta should be negative. // It is assumed that cloud provider will not delete the existing nodes if the size // when there is an option to just decrease the target. 
-func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error { - klog.V(0).Infof("Received request to decrease target size of machine deployment %s by %d", machinedeployment.Name, delta) +func (ngImpl *NodeGroupImpl) DecreaseTargetSize(delta int) error { + klog.V(0).Infof("Received request to decrease target size of machine deployment %s by %d", ngImpl.Name, delta) if delta >= 0 { return fmt.Errorf("size decrease size must be negative") } - size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment) + ngImpl.scalingMutex.Lock() + defer ngImpl.scalingMutex.Unlock() + size, err := ngImpl.mcmManager.GetMachineDeploymentSize(ngImpl.Name) if err != nil { return err } decreaseAmount := int(size) + delta - if decreaseAmount < machinedeployment.minSize { - klog.Warningf("Cannot go below min size= %d for machineDeployment %s, requested target size= %d . Setting target size to min size", machinedeployment.minSize, machinedeployment.Name, size+int64(delta)) - decreaseAmount = machinedeployment.minSize + if decreaseAmount < ngImpl.minSize { + klog.Warningf("Cannot go below min size= %d for ngImpl %s, requested target size= %d . 
Setting target size to min size", ngImpl.minSize, ngImpl.Name, size+int64(delta)) + decreaseAmount = ngImpl.minSize } - return machinedeployment.mcmManager.retry(func(ctx context.Context) (bool, error) { - return machinedeployment.mcmManager.SetMachineDeploymentSize(ctx, machinedeployment, int64(decreaseAmount)) - }, "MachineDeployment", "update", machinedeployment.Name) + return ngImpl.mcmManager.retry(func(ctx context.Context) (bool, error) { + return ngImpl.mcmManager.SetMachineDeploymentSize(ctx, ngImpl, int64(decreaseAmount)) + }, "MachineDeployment", "update", ngImpl.Name) +} + +// Refresh resets the priority annotation for the machines that are not present in machines-marked-by-ca-for-deletion annotation on the machineDeployment +func (ngImpl *NodeGroupImpl) Refresh() error { + if !ngImpl.scalingMutex.TryLock() { + return fmt.Errorf("cannot Refresh() since scalingMutex currently acquired for %q", ngImpl.Name) + } + defer ngImpl.scalingMutex.Unlock() + mcd, err := ngImpl.mcmManager.GetMachineDeploymentObject(ngImpl.Name) + if err != nil { + return err + } + markedMachineNames := getMachineNamesMarkedByCAForDeletion(mcd) + machines, err := ngImpl.mcmManager.getMachinesForMachineDeployment(ngImpl.Name) + if err != nil { + klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", ngImpl.Name, err.Error()) + return err + } + // update the machines-marked-by-ca-for-deletion annotation with the machines that are still marked for deletion by CA. + // This is done to ensure that the machines that are no longer present are removed from the annotation. 
+ var updatedMarkedMachineNames []string + for _, machineName := range markedMachineNames { + if slices.ContainsFunc(machines, func(mc *v1alpha1.Machine) bool { + return mc.Name == machineName + }) { + updatedMarkedMachineNames = append(updatedMarkedMachineNames, machineName) + } + } + clone := mcd.DeepCopy() + if clone.Annotations == nil { + clone.Annotations = map[string]string{} + } + updatedMachinesMarkedByCAForDeletionAnnotationVal := createMachinesMarkedForDeletionAnnotationValue(updatedMarkedMachineNames) + if clone.Annotations[machinesMarkedByCAForDeletionAnnotation] != updatedMachinesMarkedByCAForDeletionAnnotationVal { + clone.Annotations[machinesMarkedByCAForDeletionAnnotation] = updatedMachinesMarkedByCAForDeletionAnnotationVal + ctx, cancelFn := context.WithTimeout(context.Background(), ngImpl.mcmManager.maxRetryTimeout) + defer cancelFn() + _, err = ngImpl.mcmManager.machineClient.MachineDeployments(ngImpl.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) + if err != nil { + return err + } + } + // reset the priority for the machines that are not present in machines-marked-by-ca-for-deletion annotation + var incorrectlyMarkedMachines []string + for _, machine := range machines { + // no need to reset priority for machines already in termination or failed phase + if isMachineFailedOrTerminating(machine) { + continue + } + // check if the machine is marked for deletion by CA but not present in machines-marked-by-ca-for-deletion annotation. This means that CA was not able to reduce the replicas + // corresponding to this machine and hence the machine should not be marked for deletion. 
+ if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForDeletionCandidateMachines && !slices.Contains(markedMachineNames, machine.Name) { + incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, machine.Name) + } + } + return ngImpl.mcmManager.resetPriorityForMachines(incorrectlyMarkedMachines) } // Belongs returns true if the given node belongs to the NodeGroup. // TODO: Implement this to iterate over machines under machinedeployment, and return true if node exists in list. -func (machinedeployment *MachineDeployment) Belongs(node *apiv1.Node) (bool, error) { - ref, err := ReferenceFromProviderID(machinedeployment.mcmManager, node.Spec.ProviderID) +func (ngImpl *NodeGroupImpl) Belongs(node *apiv1.Node) (bool, error) { + ref, err := ReferenceFromProviderID(ngImpl.mcmManager, node.Spec.ProviderID) if err != nil { return false, err } - targetMd, err := machinedeployment.mcmManager.GetMachineDeploymentForMachine(ref) + targetMd, err := ngImpl.mcmManager.GetNodeGroupImpl(ref) if err != nil { return false, err } if targetMd == nil { return false, fmt.Errorf("%s doesn't belong to a known MachinDeployment", node.Name) } - if targetMd.Id() != machinedeployment.Id() { + if targetMd.Id() != ngImpl.Id() { return false, nil } return true, nil @@ -402,31 +436,31 @@ func (machinedeployment *MachineDeployment) Belongs(node *apiv1.Node) (bool, err // DeleteNodes deletes the nodes from the group. It is expected that this method will not be called // for nodes which are not part of ANY machine deployment. 
-func (machinedeployment *MachineDeployment) DeleteNodes(nodes []*apiv1.Node) error { +func (ngImpl *NodeGroupImpl) DeleteNodes(nodes []*apiv1.Node) error { nodeNames := getNodeNames(nodes) klog.V(0).Infof("Received request to delete nodes:- %v", nodeNames) - size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment) + size, err := ngImpl.mcmManager.GetMachineDeploymentSize(ngImpl.Name) if err != nil { return err } - if int(size) <= machinedeployment.MinSize() { + if int(size) <= ngImpl.MinSize() { return fmt.Errorf("min size reached, nodes will not be deleted") } - machines := make([]*Ref, 0, len(nodes)) + machines := make([]*types.NamespacedName, 0, len(nodes)) for _, node := range nodes { - belongs, err := machinedeployment.Belongs(node) + belongs, err := ngImpl.Belongs(node) if err != nil { return err } else if !belongs { - return fmt.Errorf("%s belongs to a different machinedeployment than %s", node.Name, machinedeployment.Id()) + return fmt.Errorf("%s belongs to a different machinedeployment than %s", node.Name, ngImpl.Id()) } - ref, err := ReferenceFromProviderID(machinedeployment.mcmManager, node.Spec.ProviderID) + ref, err := ReferenceFromProviderID(ngImpl.mcmManager, node.Spec.ProviderID) if err != nil { return fmt.Errorf("couldn't find the machine-name from provider-id %s", node.Spec.ProviderID) } machines = append(machines, ref) } - return machinedeployment.mcmManager.DeleteMachines(machines) + return ngImpl.mcmManager.DeleteMachines(machines) } func getNodeNames(nodes []*apiv1.Node) interface{} { @@ -438,20 +472,20 @@ func getNodeNames(nodes []*apiv1.Node) interface{} { } // Id returns machinedeployment id. -func (machinedeployment *MachineDeployment) Id() string { - return machinedeployment.Name +func (ngImpl *NodeGroupImpl) Id() string { + return ngImpl.Name } // Debug returns a debug string for the Asg. 
-func (machinedeployment *MachineDeployment) Debug() string { - return fmt.Sprintf("%s (%d:%d)", machinedeployment.Id(), machinedeployment.MinSize(), machinedeployment.MaxSize()) +func (ngImpl *NodeGroupImpl) Debug() string { + return fmt.Sprintf("%s (%d:%d)", ngImpl.Id(), ngImpl.MinSize(), ngImpl.MaxSize()) } // Nodes returns a list of all nodes that belong to this node group. -func (machinedeployment *MachineDeployment) Nodes() ([]cloudprovider.Instance, error) { - instances, err := machinedeployment.mcmManager.GetInstancesForMachineDeployment(machinedeployment) +func (ngImpl *NodeGroupImpl) Nodes() ([]cloudprovider.Instance, error) { + instances, err := ngImpl.mcmManager.GetInstancesForMachineDeployment(ngImpl.Name) if err != nil { - return nil, fmt.Errorf("failed to get the cloudprovider.Instance for machines backed by the machinedeployment %q, error: %v", machinedeployment.Name, err) + return nil, fmt.Errorf("failed to get the cloudprovider.Instance for machines backed by the machinedeployment %q, error: %v", ngImpl.Name, err) } erroneousInstanceInfos := make([]string, 0, len(instances)) for _, instance := range instances { @@ -468,9 +502,9 @@ func (machinedeployment *MachineDeployment) Nodes() ([]cloudprovider.Instance, e // GetOptions returns NodeGroupAutoscalingOptions that should be used for this particular // NodeGroup. Returning a nil will result in using default options. // Implementation optional. 
-func (machinedeployment *MachineDeployment) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) { +func (ngImpl *NodeGroupImpl) GetOptions(defaults config.NodeGroupAutoscalingOptions) (*config.NodeGroupAutoscalingOptions, error) { options := defaults - mcdAnnotations, err := machinedeployment.mcmManager.GetMachineDeploymentAnnotations(machinedeployment.Name) + mcdAnnotations, err := ngImpl.mcmManager.GetMachineDeploymentAnnotations(ngImpl.Name) if err != nil { return nil, err } @@ -504,49 +538,32 @@ func (machinedeployment *MachineDeployment) GetOptions(defaults config.NodeGroup } // TemplateNodeInfo returns a node template for this node group. -func (machinedeployment *MachineDeployment) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) { +func (ngImpl *NodeGroupImpl) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) { - nodeTemplate, err := machinedeployment.mcmManager.GetMachineDeploymentNodeTemplate(machinedeployment) + nodeTemplate, err := ngImpl.mcmManager.GetMachineDeploymentNodeTemplate(ngImpl.Name) if err != nil { return nil, err } - node, err := machinedeployment.mcmManager.buildNodeFromTemplate(machinedeployment.Name, nodeTemplate) + node, err := ngImpl.mcmManager.buildNodeFromTemplate(ngImpl.Name, nodeTemplate) if err != nil { return nil, err } - nodeInfo := schedulerframework.NewNodeInfo(cloudprovider.BuildKubeProxy(machinedeployment.Name)) + nodeInfo := schedulerframework.NewNodeInfo(cloudprovider.BuildKubeProxy(ngImpl.Name)) nodeInfo.SetNode(node) return nodeInfo, nil } // AtomicIncreaseSize is not implemented. 
-func (machinedeployment *MachineDeployment) AtomicIncreaseSize(delta int) error { +func (ngImpl *NodeGroupImpl) AtomicIncreaseSize(delta int) error { return cloudprovider.ErrNotImplemented } -func buildMachineDeploymentFromSpec(value string, mcmManager *McmManager) (*MachineDeployment, error) { - spec, err := dynamic.SpecFromString(value, true) - - if err != nil { - return nil, fmt.Errorf("failed to parse node group spec: %v", err) - } - s := strings.Split(spec.Name, ".") - Namespace, Name := s[0], s[1] - - machinedeployment := buildMachineDeployment(mcmManager, spec.MinSize, spec.MaxSize, Namespace, Name) - return machinedeployment, nil -} - -func buildMachineDeployment(mcmManager *McmManager, minSize int, maxSize int, namespace string, name string) *MachineDeployment { - return &MachineDeployment{ - mcmManager: mcmManager, - minSize: minSize, - maxSize: maxSize, - Ref: Ref{ - Name: name, - Namespace: namespace, - }, +// getMachineNamesMarkedByCAForDeletion returns the set of machine names marked by CA for deletion. 
+func getMachineNamesMarkedByCAForDeletion(mcd *v1alpha1.MachineDeployment) []string { + if mcd.Annotations == nil || mcd.Annotations[machinesMarkedByCAForDeletionAnnotation] == "" { + return make([]string, 0) } + return strings.Split(mcd.Annotations[machinesMarkedByCAForDeletionAnnotation], ",") } diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go index 6752c2497852..3467b48afcac 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go @@ -9,6 +9,7 @@ import ( "errors" "fmt" "math" + "slices" "strings" "testing" "time" @@ -85,10 +86,11 @@ func TestDeleteNodes(t *testing.T) { node *corev1.Node } type expect struct { - machines []*v1alpha1.Machine - mdName string - mdReplicas int32 - err error + prio1Machines []*v1alpha1.Machine + mdName string + mdReplicas int32 + machinesMarkedByCAAnnotationValue string + err error } type data struct { name string @@ -100,42 +102,44 @@ func TestDeleteNodes(t *testing.T) { { "should scale down machine deployment to remove a node", setup{ - nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), nodeGroups: []string{nodeGroup1}, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - mdName: "machinedeployment-1", - mdReplicas: 1, - err: nil, + prio1Machines: newMachines(1, "fakeID", nil, "machinedeployment-1", 
"machineset-1", []string{"1"}), + mdName: "machinedeployment-1", + machinesMarkedByCAAnnotationValue: createMachinesMarkedForDeletionAnnotationValue(generateNames("machine", 1)), + mdReplicas: 1, + err: nil, }, }, { "should scale down machine deployment to remove a placeholder node", setup{ nodes: nil, - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), nodeGroups: []string{nodeGroup2}, }, - action{node: newNode("node-1", "requested://machine-1", true)}, + action{node: newNode("node-1", "requested://machine-1")}, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - mdName: "machinedeployment-1", - mdReplicas: 0, - err: nil, + prio1Machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), + machinesMarkedByCAAnnotationValue: createMachinesMarkedForDeletionAnnotationValue(generateNames("machine", 1)), + mdName: "machinedeployment-1", + mdReplicas: 0, + err: nil, }, }, { "should not scale down a machine deployment when it is under rolling update", setup{ - nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(2, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, &v1alpha1.MachineDeploymentStatus{ Conditions: []v1alpha1.MachineDeploymentCondition{ @@ -144,19 +148,19 @@ func TestDeleteNodes(t *testing.T) { }, nil, nil), nodeGroups: []string{nodeGroup1}, }, - action{node: newNodes(1, "fakeID", 
[]bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: nil, - mdName: "machinedeployment-1", - mdReplicas: 2, - err: fmt.Errorf("MachineDeployment machinedeployment-1 is under rolling update , cannot reduce replica count"), + prio1Machines: nil, + mdName: "machinedeployment-1", + mdReplicas: 2, + err: fmt.Errorf("MachineDeployment machinedeployment-1 is under rolling update , cannot reduce replica count"), }, }, { "should not scale down when machine deployment update call times out and should reset priority of the corresponding machine", setup{ - nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), nodeGroups: []string{nodeGroup1}, @@ -166,9 +170,8 @@ func TestDeleteNodes(t *testing.T) { }, }, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}), mdName: "machinedeployment-1", mdReplicas: 2, err: errors.Join(nil, fmt.Errorf("unable to scale in machine deployment machinedeployment-1, Error: %w", errors.New(mdUpdateErrorMsg))), @@ -177,8 +180,8 @@ func TestDeleteNodes(t *testing.T) { { "should scale down when machine deployment update call fails but passes within the timeout period", setup{ - nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: 
newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), nodeGroups: []string{nodeGroup1}, @@ -188,26 +191,26 @@ func TestDeleteNodes(t *testing.T) { }, }, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - mdName: "machinedeployment-1", - mdReplicas: 1, - err: nil, + prio1Machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), + machinesMarkedByCAAnnotationValue: createMachinesMarkedForDeletionAnnotationValue(generateNames("machine", 1)), + mdName: "machinedeployment-1", + mdReplicas: 1, + err: nil, }, }, { "should not scale down a machine deployment when the corresponding machine is already in terminating state", setup{ - nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{true, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", &v1alpha1.MachineStatus{CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineTerminating}}, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), nodeGroups: []string{nodeGroup1}, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{true}), mdName: "machinedeployment-1", mdReplicas: 2, err: nil, @@ -216,15 +219,14 @@ func TestDeleteNodes(t *testing.T) { { "should not scale down a machine deployment when the corresponding machine is already in failed state", setup{ - nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", 
&v1alpha1.MachineStatus{CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed}}, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", &v1alpha1.MachineStatus{CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed}}, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), nodeGroups: []string{nodeGroup1}, }, - action{node: newNodes(1, "fakeID", []bool{false})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: newMachines(2, "fakeID", &v1alpha1.MachineStatus{CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed}}, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), mdName: "machinedeployment-1", mdReplicas: 2, err: nil, @@ -233,25 +235,25 @@ func TestDeleteNodes(t *testing.T) { { "should not scale down a machine deployment below the minimum", setup{ - nodes: newNodes(1, "fakeID", []bool{true}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}), + nodes: newNodes(1, "fakeID"), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), nodeGroups: []string{nodeGroup1}, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: nil, - mdName: "machinedeployment-1", - mdReplicas: 1, - err: fmt.Errorf("min size reached, nodes will not be deleted"), + prio1Machines: nil, + mdName: "machinedeployment-1", + mdReplicas: 1, + err: fmt.Errorf("min size reached, nodes will not be deleted"), }, }, { "no scale down of machine deployment if priority of the targeted machine cannot be updated to 1", setup{ 
- nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(1, "machinedeployment-1"), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), nodeGroups: []string{nodeGroup1}, @@ -261,29 +263,29 @@ func TestDeleteNodes(t *testing.T) { }, }, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: nil, - mdName: "machinedeployment-1", - mdReplicas: 2, - err: fmt.Errorf("could not prioritize machine machine-1 for deletion, aborting scale in of machine deployment, Error: %s", mcUpdateErrorMsg), + prio1Machines: nil, + mdName: "machinedeployment-1", + mdReplicas: 2, + err: fmt.Errorf("could not prioritize machine machine-1 for deletion, aborting scale in of machine deployment, Error: %s", mcUpdateErrorMsg), }, }, { "should not scale down machine deployment if the node belongs to another machine deployment", setup{ - nodes: newNodes(2, "fakeID", []bool{true, false}), - machines: newMachines(2, "fakeID", nil, "machinedeployment-2", "machineset-1", []string{"3", "3"}, []bool{false, false}), + nodes: newNodes(2, "fakeID"), + machines: newMachines(2, "fakeID", nil, "machinedeployment-2", "machineset-1", []string{"3", "3"}), machineSets: newMachineSets(1, "machinedeployment-2"), machineDeployments: newMachineDeployments(2, 2, nil, nil, nil), nodeGroups: []string{nodeGroup2, nodeGroup3}, }, - action{node: newNodes(1, "fakeID", []bool{true})[0]}, + action{node: newNodes(1, "fakeID")[0]}, expect{ - machines: nil, - mdName: "machinedeployment-2", - mdReplicas: 2, - err: fmt.Errorf("node-1 belongs to a different machinedeployment than machinedeployment-1"), + prio1Machines: nil, + mdName: "machinedeployment-2", + mdReplicas: 2, + err: 
fmt.Errorf("node-1 belongs to a different machinedeployment than machinedeployment-1"), }, }, } @@ -296,7 +298,7 @@ func TestDeleteNodes(t *testing.T) { stop := make(chan struct{}) defer close(stop) controlMachineObjects, targetCoreObjects, _ := setupEnv(&entry.setup) - m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, nil, controlMachineObjects, targetCoreObjects, nil) + m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, entry.setup.nodeGroups, controlMachineObjects, targetCoreObjects, nil) defer trackers.Stop() waitForCacheSync(t, stop, hasSyncedCacheFns) @@ -307,7 +309,7 @@ func TestDeleteNodes(t *testing.T) { trackers.ControlMachine.SetFailAtFakeResourceActions(entry.setup.controlMachineFakeResourceActions) } - md, err := buildMachineDeploymentFromSpec(entry.setup.nodeGroups[0], m) + md, err := buildNodeGroupImplFromSpec(entry.setup.nodeGroups[0], m) g.Expect(err).To(BeNil()) err = md.DeleteNodes([]*corev1.Node{entry.action.node}) @@ -321,6 +323,7 @@ func TestDeleteNodes(t *testing.T) { machineDeployment, err := m.machineClient.MachineDeployments(m.namespace).Get(context.TODO(), entry.expect.mdName, metav1.GetOptions{}) g.Expect(err).ToNot(HaveOccurred()) g.Expect(machineDeployment.Spec.Replicas).To(BeNumerically("==", entry.expect.mdReplicas)) + g.Expect(machineDeployment.Annotations[machinesMarkedByCAForDeletionAnnotation]).To(Equal(entry.expect.machinesMarkedByCAAnnotationValue)) machines, err := m.machineClient.Machines(m.namespace).List(context.TODO(), metav1.ListOptions{ LabelSelector: metav1.FormatLabelSelector(&metav1.LabelSelector{ @@ -329,26 +332,52 @@ func TestDeleteNodes(t *testing.T) { }) for _, machine := range machines.Items { - flag := false - for _, entryMachineItem := range entry.expect.machines { - if entryMachineItem.Name == machine.Name { - g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal(entryMachineItem.Annotations[machinePriorityAnnotation])) - flag = true - break - } - } 
- if !flag { - g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal("3")) + if slices.ContainsFunc(entry.expect.prio1Machines, func(m *v1alpha1.Machine) bool { + return machine.Name == m.Name + }) { + g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal(priorityValueForDeletionCandidateMachines)) + } else { + g.Expect(machine.Annotations[machinePriorityAnnotation]).To(Equal(defaultPriorityValue)) } } }) } } +func TestIdempotencyOfDeleteNodes(t *testing.T) { + setupObj := setup{ + nodes: newNodes(3, "fakeID"), + machines: newMachines(3, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3", "3", "3"}), + machineSets: newMachineSets(1, "machinedeployment-1"), + machineDeployments: newMachineDeployments(1, 3, nil, nil, nil), + nodeGroups: []string{nodeGroup1}, + } + g := NewWithT(t) + stop := make(chan struct{}) + defer close(stop) + controlMachineObjects, targetCoreObjects, _ := setupEnv(&setupObj) + m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, setupObj.nodeGroups, controlMachineObjects, targetCoreObjects, nil) + defer trackers.Stop() + waitForCacheSync(t, stop, hasSyncedCacheFns) + md, err := buildNodeGroupImplFromSpec(setupObj.nodeGroups[0], m) + g.Expect(err).To(BeNil()) + + err = md.DeleteNodes(newNodes(1, "fakeID")) + g.Expect(err).To(BeNil()) + err = md.DeleteNodes(newNodes(1, "fakeID")) + g.Expect(err).To(BeNil()) + + machineDeployment, err := m.machineClient.MachineDeployments(m.namespace).Get(context.TODO(), setupObj.machineDeployments[0].Name, metav1.GetOptions{}) + g.Expect(err).ToNot(HaveOccurred()) + g.Expect(machineDeployment.Spec.Replicas).To(BeNumerically("==", 2)) + g.Expect(machineDeployment.Annotations[machinesMarkedByCAForDeletionAnnotation]).To(Equal(createMachinesMarkedForDeletionAnnotationValue(generateNames("machine", 1)))) +} + func TestRefresh(t *testing.T) { type expect struct { - machines []*v1alpha1.Machine - err error + prio3Machines []string + 
machinesMarkedByCAForDeletionAnnotationValue string + err error } type data struct { name string @@ -359,8 +388,8 @@ func TestRefresh(t *testing.T) { { "should return an error if MCM has zero available replicas", setup{ - nodes: newNodes(1, "fakeID", []bool{false}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), + nodes: newNodes(1, "fakeID"), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), nodeGroups: []string{nodeGroup2}, mcmDeployment: newMCMDeployment(0), @@ -372,8 +401,8 @@ func TestRefresh(t *testing.T) { { "should return an error if MCM deployment is not found", setup{ - nodes: newNodes(1, "fakeID", []bool{false}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), + nodes: newNodes(1, "fakeID"), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), nodeGroups: []string{nodeGroup2}, }, @@ -382,88 +411,38 @@ func TestRefresh(t *testing.T) { }, }, { - "should reset priority of a machine to 3 if machine deployment is not scaled in", + "should reset priority of a machine if it is not present in machines-marked-by-ca-for-deletion annotation on machine deployment", setup{ - nodes: newNodes(1, "fakeID", []bool{false}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), + nodes: newNodes(1, "fakeID"), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), nodeGroups: []string{nodeGroup2}, mcmDeployment: newMCMDeployment(1), }, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}), - err: nil, + 
prio3Machines: generateNames("machine", 1), + err: nil, }, }, { - "should reset priority of a machine to 3 if machine deployment is not scaled in even if ToBeDeletedTaint is present on the corresponding node", + "should update the machines-marked-by-ca-for-deletion annotation and remove non-existing machines", setup{ - nodes: newNodes(1, "fakeID", []bool{true}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), + nodes: newNodes(1, "fakeID"), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), + machineDeployments: newMachineDeployments(1, 0, nil, map[string]string{machinesMarkedByCAForDeletionAnnotation: "machine-1,machine-2"}, nil), nodeGroups: []string{nodeGroup2}, mcmDeployment: newMCMDeployment(1), }, expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}), - err: nil, - }, - }, - { - "should NOT skip paused machine deployment", - setup{ - nodes: newNodes(1, "fakeID", []bool{false}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - machineDeployments: newMachineDeployments(1, 1, &v1alpha1.MachineDeploymentStatus{ - Conditions: []v1alpha1.MachineDeploymentCondition{ - {Type: v1alpha1.MachineDeploymentProgressing, Status: v1alpha1.ConditionUnknown, Reason: machineDeploymentPausedReason}, - }, - }, nil, nil), - nodeGroups: []string{nodeGroup2}, - mcmDeployment: newMCMDeployment(1), - }, - expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}), - err: nil, - }, - }, - { - "should ignore terminating/failed machines in checking if number of annotated machines is more than desired", - setup{ - nodes: newNodes(1, "fakeID", []bool{true}), - machines: newMachines(1, "fakeID", &v1alpha1.MachineStatus{ - 
CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed}, - }, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), - nodeGroups: []string{nodeGroup2}, - mcmDeployment: newMCMDeployment(1), - }, - expect{ - machines: newMachines(1, "fakeID", &v1alpha1.MachineStatus{ - CurrentStatus: v1alpha1.CurrentStatus{Phase: v1alpha1.MachineFailed}, - }, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), + machinesMarkedByCAForDeletionAnnotationValue: createMachinesMarkedForDeletionAnnotationValue(generateNames("machine", 1)), err: nil, }, }, - { - "should not reset priority of a machine to 3 if machine deployment is scaled in", - setup{ - nodes: newNodes(1, "fakeID", []bool{true}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - machineDeployments: newMachineDeployments(1, 0, nil, nil, nil), - nodeGroups: []string{nodeGroup2}, - mcmDeployment: newMCMDeployment(1), - }, - expect{ - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), - err: nil, - }, - }, { "priority reset of machine fails", setup{ - nodes: newNodes(1, "fakeID", []bool{false}), - machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}), + nodes: newNodes(1, "fakeID"), + machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}), machineDeployments: newMachineDeployments(1, 1, nil, nil, nil), controlMachineFakeResourceActions: &customfake.ResourceActions{ Machine: customfake.Actions{ @@ -474,8 +453,7 @@ func TestRefresh(t *testing.T) { mcmDeployment: newMCMDeployment(1), }, expect{ - machines: []*v1alpha1.Machine{newMachine("machine-1", "fakeID-1", nil, "machinedeployment-1", "machineset-1", "1", false, true)}, - err: errors.Join(nil, errors.Join(fmt.Errorf("could not reset priority annotation 
on machine machine-1, Error: %v", mcUpdateErrorMsg))), + err: errors.Join(nil, errors.Join(fmt.Errorf("could not reset priority annotation on machine machine-1, Error: %v", mcUpdateErrorMsg))), }, }, } @@ -505,10 +483,14 @@ func TestRefresh(t *testing.T) { } else { g.Expect(err).To(BeNil()) } - for _, mc := range entry.expect.machines { - machine, err := m.machineClient.Machines(m.namespace).Get(context.TODO(), mc.Name, metav1.GetOptions{}) - g.Expect(err).To(BeNil()) - g.Expect(mc.Annotations[machinePriorityAnnotation]).To(Equal(machine.Annotations[machinePriorityAnnotation])) + machines, err := m.machineClient.Machines(m.namespace).List(context.TODO(), metav1.ListOptions{}) + g.Expect(err).To(BeNil()) + for _, mc := range machines.Items { + if slices.Contains(entry.expect.prio3Machines, mc.Name) { + g.Expect(mc.Annotations[machinePriorityAnnotation]).To(Equal(defaultPriorityValue)) + } else { + g.Expect(mc.Annotations[machinePriorityAnnotation]).To(Equal(priorityValueForDeletionCandidateMachines)) + } } }) } @@ -554,14 +536,14 @@ func TestNodes(t *testing.T) { { "Correct instances should be returned for machine objects under the machinedeployment", setup{ - nodes: []*corev1.Node{newNode("node-1", "fakeID-1", false)}, + nodes: []*corev1.Node{newNode("node-1", "fakeID-1")}, machines: func() []*v1alpha1.Machine { allMachines := make([]*v1alpha1.Machine, 0, 5) - allMachines = append(allMachines, newMachine("machine-with-registered-node", "fakeID-1", nil, "machinedeployment-1", "", "", false, true)) - allMachines = append(allMachines, newMachine("machine-with-vm-but-no-node", "fakeID-2", nil, "machinedeployment-1", "", "", false, false)) - allMachines = append(allMachines, newMachine("machine-with-vm-creating", "", nil, "machinedeployment-1", "", "", false, false)) - allMachines = append(allMachines, newMachine("machine-with-vm-create-error-out-of-quota", "", &v1alpha1.MachineStatus{LastOperation: v1alpha1.LastOperation{Type: v1alpha1.MachineOperationCreate, State: 
v1alpha1.MachineStateFailed, ErrorCode: machinecodes.ResourceExhausted.String(), Description: outOfQuotaMachineStatusErrorDescription}}, "machinedeployment-1", "", "", false, false)) - allMachines = append(allMachines, newMachine("machine-with-vm-create-error-invalid-credentials", "", &v1alpha1.MachineStatus{LastOperation: v1alpha1.LastOperation{Type: v1alpha1.MachineOperationCreate, State: v1alpha1.MachineStateFailed, ErrorCode: machinecodes.Internal.String(), Description: invalidCredentialsMachineStatusErrorDescription}}, "machinedeployment-1", "", "", false, false)) + allMachines = append(allMachines, newMachine("machine-with-registered-node", "fakeID-1", nil, "machinedeployment-1", "", "", true)) + allMachines = append(allMachines, newMachine("machine-with-vm-but-no-node", "fakeID-2", nil, "machinedeployment-1", "", "", false)) + allMachines = append(allMachines, newMachine("machine-with-vm-creating", "", nil, "machinedeployment-1", "", "", false)) + allMachines = append(allMachines, newMachine("machine-with-vm-create-error-out-of-quota", "", &v1alpha1.MachineStatus{LastOperation: v1alpha1.LastOperation{Type: v1alpha1.MachineOperationCreate, State: v1alpha1.MachineStateFailed, ErrorCode: machinecodes.ResourceExhausted.String(), Description: outOfQuotaMachineStatusErrorDescription}}, "machinedeployment-1", "", "", false)) + allMachines = append(allMachines, newMachine("machine-with-vm-create-error-invalid-credentials", "", &v1alpha1.MachineStatus{LastOperation: v1alpha1.LastOperation{Type: v1alpha1.MachineOperationCreate, State: v1alpha1.MachineStateFailed, ErrorCode: machinecodes.Internal.String(), Description: invalidCredentialsMachineStatusErrorDescription}}, "machinedeployment-1", "", "", false)) return allMachines }(), machineDeployments: newMachineDeployments(1, 2, nil, nil, nil), @@ -599,7 +581,7 @@ func TestNodes(t *testing.T) { trackers.ControlMachine.SetFailAtFakeResourceActions(entry.setup.controlMachineFakeResourceActions) } - md, err := 
buildMachineDeploymentFromSpec(entry.setup.nodeGroups[0], m) + md, err := buildNodeGroupImplFromSpec(entry.setup.nodeGroups[0], m) g.Expect(err).To(BeNil()) returnedInstances, err := md.Nodes() @@ -751,7 +733,7 @@ func TestGetOptions(t *testing.T) { defer trackers.Stop() waitForCacheSync(t, stop, hasSyncedCacheFns) - md, err := buildMachineDeploymentFromSpec(entry.setup.nodeGroups[0], m) + md, err := buildNodeGroupImplFromSpec(entry.setup.nodeGroups[0], m) g.Expect(err).To(BeNil()) options, err := md.GetOptions(ngAutoScalingOpDefaults) diff --git a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go index 399862c9e383..e0b807c605ac 100644 --- a/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go +++ b/cluster-autoscaler/cloudprovider/mcm/mcm_manager.go @@ -27,6 +27,8 @@ import ( "errors" "flag" "fmt" + "k8s.io/apimachinery/pkg/types" + "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" v1appslister "k8s.io/client-go/listers/apps/v1" "k8s.io/utils/pointer" "maps" @@ -36,6 +38,7 @@ import ( "slices" "strconv" "strings" + "sync" "time" awsapis "github.com/gardener/machine-controller-manager-provider-aws/pkg/aws/apis" @@ -56,7 +59,6 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/autoscaler/cluster-autoscaler/cloudprovider" - "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupset" "k8s.io/autoscaler/cluster-autoscaler/utils/gpu" "k8s.io/client-go/discovery" @@ -77,9 +79,9 @@ const ( defaultResetAnnotationTimeout = 10 * time.Second // defaultPriorityValue is the default value for the priority annotation used by CA. It is set to 3 because MCM defaults the priority of machine it creates to 3. defaultPriorityValue = "3" - // priorityValueForCandidateMachines is the priority annotation value set on machines that the CA wants to be deleted. Its value is set to 1. 
- priorityValueForCandidateMachines = "1" - minResyncPeriodDefault = 1 * time.Hour + // priorityValueForDeletionCandidateMachines is the priority annotation value set on machines that the CA wants to be deleted. Its value is set to 1. + priorityValueForDeletionCandidateMachines = "1" + minResyncPeriodDefault = 1 * time.Hour // machinePriorityAnnotation is the annotation to set machine priority while deletion machinePriorityAnnotation = "machinepriority.machine.sapcloud.io" // kindMachineClass is the kind for generic machine class used by the OOT providers @@ -98,6 +100,9 @@ const ( machineDeploymentPausedReason = "DeploymentPaused" // machineDeploymentNameLabel key for Machine Deployment name in machine labels machineDeploymentNameLabel = "name" + // machinesMarkedByCAForDeletionAnnotation is the annotation set by CA on machine deployment. Its value denotes the machines that + // CA marked for deletion by updating the priority annotation to 1 and scaling down the machine deployment. + machinesMarkedByCAForDeletionAnnotation = "cluster-autoscaler.kubernetes.io/machines-marked-by-ca-for-deletion" ) var ( @@ -124,6 +129,7 @@ type McmManager struct { namespace string interrupt chan struct{} discoveryOpts cloudprovider.NodeGroupDiscoveryOptions + nodeGroups map[types.NamespacedName]*NodeGroupImpl deploymentLister v1appslister.DeploymentLister machineClient machineapi.MachineV1alpha1Interface machineDeploymentLister machinelisters.MachineDeploymentLister @@ -249,6 +255,7 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti m := &McmManager{ namespace: namespace, interrupt: make(chan struct{}), + nodeGroups: make(map[types.NamespacedName]*NodeGroupImpl), deploymentLister: deploymentLister, machineClient: controlMachineClient, machineClassLister: machineClassLister, @@ -260,7 +267,10 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti maxRetryTimeout: maxRetryTimeout, retryInterval: retryInterval, } - + err = 
m.generateMachineDeploymentMap() + if err != nil { + return nil, err + } targetCoreInformerFactory.Start(m.interrupt) controlMachineInformerFactory.Start(m.interrupt) appsInformerFactory.Start(m.interrupt) @@ -283,6 +293,27 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti return nil, fmt.Errorf("Unable to start cloud provider MCM for cluster autoscaler: API GroupVersion %q or %q or %q is not available; \nFound: %#v", machineGVR, machineSetGVR, machineDeploymentGVR, availableResources) } +func (m *McmManager) generateMachineDeploymentMap() error { + for _, spec := range m.discoveryOpts.NodeGroupSpecs { + if err := m.addNodeGroup(spec); err != nil { + return err + } + } + return nil +} + +// addNodeGroup adds node group defined in string spec. Format: +// minNodes:maxNodes:namespace.machineDeploymentName +func (m *McmManager) addNodeGroup(spec string) error { + nodeGroup, err := buildNodeGroupImplFromSpec(spec, m) + if err != nil { + return err + } + key := types.NamespacedName{Namespace: nodeGroup.Namespace, Name: nodeGroup.Name} + m.nodeGroups[key] = nodeGroup + return nil +} + // TODO: In general, any controller checking this needs to be dynamic so // users don't have to restart their controller manager if they change the apiserver. // Until we get there, the structure here needs to be exposed for the construction of a proper ControllerContext. @@ -342,11 +373,11 @@ func CreateMcmManager(discoveryOpts cloudprovider.NodeGroupDiscoveryOptions) (*M return createMCMManagerInternal(discoveryOpts, defaultRetryInterval, defaultMaxRetryTimeout) } -// GetMachineDeploymentForMachine returns the MachineDeployment for the Machine object. -func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeployment, error) { +// GetNodeGroupImpl returns the NodeGroupImpl for the given fully-qualified machine name. 
+func (m *McmManager) GetNodeGroupImpl(machine *types.NamespacedName) (*NodeGroupImpl, error) { if machine.Name == "" { // Considering the possibility when Machine has been deleted but due to cached Node object it appears here. - return nil, fmt.Errorf("Node does not Exists") + return nil, fmt.Errorf("node does not exist") } machineObject, err := m.machineLister.Machines(m.namespace).Get(machine.Name) @@ -377,84 +408,21 @@ func (m *McmManager) GetMachineDeploymentForMachine(machine *Ref) (*MachineDeplo return nil, fmt.Errorf("unable to find parent MachineDeployment of given MachineSet object %s %v", machineSetName, err) } - mcmRef := Ref{ - Name: machineDeploymentName, - Namespace: m.namespace, + machineDeployment, ok := m.nodeGroups[types.NamespacedName{Namespace: m.namespace, Name: machineDeploymentName}] + if !ok { + return nil, fmt.Errorf("machineDeployment %s not found in the list of machine deployments", machineDeploymentName) } - - discoveryOpts := m.discoveryOpts - specs := discoveryOpts.NodeGroupSpecs - var min, max int - for _, spec := range specs { - s, err := dynamic.SpecFromString(spec, true) - if err != nil { - return nil, fmt.Errorf("Error occurred while parsing the spec") - } - - str := strings.Split(s.Name, ".") - _, Name := str[0], str[1] - - if Name == machineDeploymentName { - min = s.MinSize - max = s.MaxSize - break - } - } - - return &MachineDeployment{ - mcmRef, - m, - min, - max, - }, nil + return machineDeployment, nil } // Refresh method, for each machine deployment, will reset the priority of the machines if the number of annotated machines is more than desired. // It will select the machines to reset the priority based on the descending order of creation timestamp.
func (m *McmManager) Refresh() error { - machineDeployments, err := m.machineDeploymentLister.MachineDeployments(m.namespace).List(labels.Everything()) - if err != nil { - klog.Errorf("[Refresh] unable to list machine deployments") - return err + var collectiveError []error + for _, nodeGroup := range m.nodeGroups { + collectiveError = append(collectiveError, nodeGroup.Refresh()) } - var collectiveError error - for _, machineDeployment := range machineDeployments { - // ignore the machine deployment if it is in rolling update - if !isRollingUpdateFinished(machineDeployment) { - klog.Infof("[Refresh] machine deployment %s is under rolling update, skipping", machineDeployment.Name) - continue - } - replicas := machineDeployment.Spec.Replicas - // check if number of annotated machine objects is more than desired and correspondingly reset the priority annotation value if needed. - machines, err := m.getMachinesForMachineDeployment(machineDeployment.Name) - if err != nil { - klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. 
Err: %v", machineDeployment.Name, err.Error()) - collectiveError = errors.Join(collectiveError, err) - continue - } - var machinesMarkedForDeletion []*v1alpha1.Machine - for _, machine := range machines { - // no need to reset priority for machines already in termination or failed phase - if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed { - continue - } - if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines { - machinesMarkedForDeletion = append(machinesMarkedForDeletion, machine) - } - } - if int(replicas) > len(machines)-len(machinesMarkedForDeletion) { - slices.SortStableFunc(machinesMarkedForDeletion, func(m1, m2 *v1alpha1.Machine) int { - return -m1.CreationTimestamp.Compare(m2.CreationTimestamp.Time) - }) - diff := int(replicas) - len(machines) + len(machinesMarkedForDeletion) - targetRefs := make([]*Ref, 0, diff) - for i := 0; i < min(diff, len(machinesMarkedForDeletion)); i++ { - targetRefs = append(targetRefs, &Ref{Name: machinesMarkedForDeletion[i].Name, Namespace: machinesMarkedForDeletion[i].Namespace}) - } - collectiveError = errors.Join(collectiveError, m.resetPriorityForMachines(targetRefs)) - } - } - return collectiveError + return errors.Join(collectiveError...) } // Cleanup does nothing at the moment. @@ -463,20 +431,19 @@ func (m *McmManager) Cleanup() { return } -// GetMachineDeploymentSize returns the replicas field of the MachineDeployment -func (m *McmManager) GetMachineDeploymentSize(machinedeployment *MachineDeployment) (int64, error) { - md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name) +// GetMachineDeploymentSize returns the replicas field of the MachineDeployment corresponding to the given node group. 
+func (m *McmManager) GetMachineDeploymentSize(nodeGroupName string) (int64, error) { + md, err := m.GetMachineDeploymentObject(nodeGroupName) if err != nil { - return 0, fmt.Errorf("Unable to fetch MachineDeployment object %s %v", machinedeployment.Name, err) + return 0, err } return int64(md.Spec.Replicas), nil } -// SetMachineDeploymentSize sets the desired size for the Machinedeployment. -func (m *McmManager) SetMachineDeploymentSize(ctx context.Context, machinedeployment *MachineDeployment, size int64) (bool, error) { - md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name) +// SetMachineDeploymentSize sets the desired size for the backing MachineDeployment of the given nodeGroup. +func (m *McmManager) SetMachineDeploymentSize(ctx context.Context, nodeGroup *NodeGroupImpl, size int64) (bool, error) { + md, err := m.GetMachineDeploymentObject(nodeGroup.Name) if err != nil { - klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %v", machinedeployment.Name, err) return true, err } // don't scale down during rolling update, as that could remove ready node with workload @@ -486,51 +453,59 @@ func (m *McmManager) SetMachineDeploymentSize(ctx context.Context, machinedeploy clone := md.DeepCopy() clone.Spec.Replicas = int32(size) - _, err = m.machineClient.MachineDeployments(machinedeployment.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) + _, err = m.machineClient.MachineDeployments(nodeGroup.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) return true, err } -// DeleteMachines annotates the target machines and also reduces the desired replicas of the MachineDeployment. -func (m *McmManager) DeleteMachines(targetMachineRefs []*Ref) error { +// DeleteMachines annotates the target machines and also reduces the desired replicas of the corresponding MachineDeployment. 
+func (m *McmManager) DeleteMachines(targetMachineRefs []*types.NamespacedName) error { if len(targetMachineRefs) == 0 { return nil } - commonMachineDeployment, err := m.GetMachineDeploymentForMachine(targetMachineRefs[0]) + commonNodeGroup, err := m.GetNodeGroupImpl(targetMachineRefs[0]) if err != nil { return err } + // acquire the mutex + commonNodeGroup.scalingMutex.Lock() + defer commonNodeGroup.scalingMutex.Unlock() // get the machine deployment and return if rolling update is not finished - md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(commonMachineDeployment.Name) + md, err := m.GetMachineDeploymentObject(commonNodeGroup.Name) if err != nil { - klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %v", commonMachineDeployment.Name, err) return err } if !isRollingUpdateFinished(md) { - return fmt.Errorf("MachineDeployment %s is under rolling update , cannot reduce replica count", commonMachineDeployment.Name) + return fmt.Errorf("MachineDeployment %s is under rolling update, cannot reduce replica count", commonNodeGroup.Name) } + machineNamesMarkedByCA := getMachineNamesMarkedByCAForDeletion(md) // update priorities of machines to be deleted except the ones already in termination to 1 - scaleDownAmount, err := m.prioritizeMachinesForDeletion(targetMachineRefs) + machineNamesWithPrio1, err := m.prioritizeMachinesForDeletion(targetMachineRefs) if err != nil { return err } + machineNamesMarkedByCA = append(machineNamesMarkedByCA, machineNamesWithPrio1...)
// Trying to update the machineDeployment till the deadline err = m.retry(func(ctx context.Context) (bool, error) { - return m.scaleDownMachineDeployment(ctx, commonMachineDeployment.Name, scaleDownAmount) - }, "MachineDeployment", "update", commonMachineDeployment.Name) + return m.scaleDownAndAnnotateMachineDeployment(ctx, commonNodeGroup.Name, len(machineNamesWithPrio1), createMachinesMarkedForDeletionAnnotationValue(machineNamesMarkedByCA)) + }, "MachineDeployment", "update", commonNodeGroup.Name) if err != nil { - klog.Errorf("unable to scale in machine deployment %s, will reset priority of target machines, Error: %v", commonMachineDeployment.Name, err) - return errors.Join(err, m.resetPriorityForMachines(targetMachineRefs)) + klog.Errorf("unable to scale in machine deployment %s, will reset priority of target machines, Error: %v", commonNodeGroup.Name, err) + return errors.Join(err, m.resetPriorityForMachines(machineNamesWithPrio1)) } - return nil + return err } // resetPriorityForMachines resets the priority of machines passed in the argument to defaultPriorityValue -func (m *McmManager) resetPriorityForMachines(mcRefs []*Ref) error { - var collectiveError error - for _, mcRef := range mcRefs { - machine, err := m.machineLister.Machines(m.namespace).Get(mcRef.Name) +func (m *McmManager) resetPriorityForMachines(mcNames []string) error { + var collectiveError []error + for _, mcName := range mcNames { + machine, err := m.machineLister.Machines(m.namespace).Get(mcName) + if kube_errors.IsNotFound(err) { + klog.Warningf("Machine %s not found, skipping resetting priority annotation", mcName) + continue + } if err != nil { - collectiveError = errors.Join(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcRef, err)) + collectiveError = append(collectiveError, fmt.Errorf("unable to get Machine object %s, Error: %v", mcName, err)) continue } ctx, cancelFn := context.WithDeadline(context.Background(), 
time.Now().Add(defaultResetAnnotationTimeout)) @@ -544,16 +519,18 @@ func (m *McmManager) resetPriorityForMachines(mcRefs []*Ref) error { return nil }() if err != nil { - collectiveError = errors.Join(collectiveError, fmt.Errorf("could not reset priority annotation on machine %s, Error: %v", machine.Name, err)) + collectiveError = append(collectiveError, fmt.Errorf("could not reset priority annotation on machine %s, Error: %v", machine.Name, err)) continue } } - return collectiveError + return errors.Join(collectiveError...) } // prioritizeMachinesForDeletion prioritizes the targeted machines by updating their priority annotation to 1 -func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) (int, error) { +func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*types.NamespacedName) ([]string, error) { var expectedToTerminateMachineNodePairs = make(map[string]string) + var prio1MarkedMachineNames []string + for _, machineRef := range targetMachineRefs { // Trying to update the priority of machineRef till m.maxRetryTimeout if err := m.retry(func(ctx context.Context) (bool, error) { @@ -569,15 +546,20 @@ func (m *McmManager) prioritizeMachinesForDeletion(targetMachineRefs []*Ref) (in if isMachineFailedOrTerminating(mc) { return false, nil } + if mc.Annotations[machinePriorityAnnotation] == priorityValueForDeletionCandidateMachines { + klog.Infof("Machine %q priority is already set to 1, hence skipping the update", mc.Name) + return false, nil + } + prio1MarkedMachineNames = append(prio1MarkedMachineNames, machineRef.Name) expectedToTerminateMachineNodePairs[mc.Name] = mc.Labels["node"] - return m.updateAnnotationOnMachine(ctx, mc.Name, machinePriorityAnnotation, priorityValueForCandidateMachines) + return m.updateAnnotationOnMachine(ctx, mc.Name, machinePriorityAnnotation, priorityValueForDeletionCandidateMachines) }, "Machine", "update", machineRef.Name); err != nil { klog.Errorf("could not prioritize machine %s for deletion, 
aborting scale in of machine deployment, Error: %v", machineRef.Name, err) - return 0, fmt.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err) + return nil, fmt.Errorf("could not prioritize machine %s for deletion, aborting scale in of machine deployment, Error: %v", machineRef.Name, err) } } klog.V(2).Infof("Expected to remove following {machineRef: corresponding node} pairs %s", expectedToTerminateMachineNodePairs) - return len(expectedToTerminateMachineNodePairs), nil + return prio1MarkedMachineNames, nil } // updateAnnotationOnMachine returns error only when updating the annotations on machine has been failing consequently and deadline is crossed @@ -592,16 +574,10 @@ func (m *McmManager) updateAnnotationOnMachine(ctx context.Context, mcName strin return true, err } clone := machine.DeepCopy() - if clone.Annotations != nil { - if clone.Annotations[key] == val { - klog.Infof("Machine %q priority is already set to 1, hence skipping the update", machine.Name) - return false, nil - } - clone.Annotations[key] = val - } else { + if clone.Annotations == nil { clone.Annotations = make(map[string]string) - clone.Annotations[key] = val } + clone.Annotations[key] = val _, err = m.machineClient.Machines(machine.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) if err == nil { klog.Infof("Machine %s marked with priority %s successfully", mcName, val) @@ -609,11 +585,12 @@ func (m *McmManager) updateAnnotationOnMachine(ctx context.Context, mcName strin return true, err } -// scaleDownMachineDeployment scales down the machine deployment by the provided scaleDownAmount and returns the updated spec.Replicas after scale down. 
-func (m *McmManager) scaleDownMachineDeployment(ctx context.Context, mdName string, scaleDownAmount int) (bool, error) { - md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName) +// scaleDownAndAnnotateMachineDeployment scales down the machine deployment by the provided scaleDownAmount, returning a boolean indicating whether the update should be retried, along with any error. +// It also updates the machines-marked-by-ca-for-deletion annotation on the machine deployment with the list of existing machines marked for deletion. +// NOTE: Callers are expected to take the NodeGroup scalingMutex before invoking this method. +func (m *McmManager) scaleDownAndAnnotateMachineDeployment(ctx context.Context, mdName string, scaleDownAmount int, markedMachines string) (bool, error) { + md, err := m.GetMachineDeploymentObject(mdName) if err != nil { - klog.Errorf("Unable to fetch MachineDeployment object %s, Error: %v", mdName, err) return true, err } mdclone := md.DeepCopy() @@ -626,6 +603,10 @@ func (m *McmManager) scaleDownMachineDeployment(ctx context.Context, mdName stri return false, fmt.Errorf("cannot delete machines in machine deployment %s, expected decrease in replicas %d is more than current replicas %d", mdName, scaleDownAmount, mdclone.Spec.Replicas) } mdclone.Spec.Replicas = expectedReplicas + if mdclone.Annotations == nil { + mdclone.Annotations = make(map[string]string) + } + mdclone.Annotations[machinesMarkedByCAForDeletionAnnotation] = markedMachines _, err = m.machineClient.MachineDeployments(mdclone.Namespace).Update(ctx, mdclone, metav1.UpdateOptions{}) if err != nil { return true, fmt.Errorf("unable to scale in machine deployment %s, Error: %w", mdName, err) @@ -659,10 +640,10 @@ func (m *McmManager) retry(fn func(ctx context.Context) (bool, error), resourceT } } -// GetInstancesForMachineDeployment returns list of cloudprovider.Instance for machines which belongs to the MachineDeployment. 
-func (m *McmManager) GetInstancesForMachineDeployment(machinedeployment *MachineDeployment) ([]cloudprovider.Instance, error) { +// GetInstancesForMachineDeployment returns list of cloudprovider.Instance for machines with the given nodeGroupName. +func (m *McmManager) GetInstancesForMachineDeployment(nodeGroupName string) ([]cloudprovider.Instance, error) { var ( - list = []string{machinedeployment.Name} + list = []string{nodeGroupName} selector = labels.NewSelector() req, _ = labels.NewRequirement("name", selection.Equals, list) ) @@ -670,7 +651,7 @@ func (m *McmManager) GetInstancesForMachineDeployment(machinedeployment *Machine selector = selector.Add(*req) machineList, err := m.machineLister.Machines(m.namespace).List(selector) if err != nil { - return nil, fmt.Errorf("unable to fetch list of Machine objects %v for machinedeployment %q", err, machinedeployment.Name) + return nil, fmt.Errorf("unable to fetch list of Machine objects %v for MachineDeployment %q", err, nodeGroupName) } nodeList, err := m.nodeLister.List(labels.Everything()) @@ -756,21 +737,19 @@ func validateNodeTemplate(nodeTemplateAttributes *v1alpha1.NodeTemplate) error { // GetMachineDeploymentAnnotations returns the annotations present on the machine deployment for the provided machine deployment name func (m *McmManager) GetMachineDeploymentAnnotations(machineDeploymentName string) (map[string]string, error) { - md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machineDeploymentName) + md, err := m.GetMachineDeploymentObject(machineDeploymentName) if err != nil { - return nil, fmt.Errorf("unable to fetch MachineDeployment object %s, Error: %v", machineDeploymentName, err) + return nil, err } - return md.Annotations, nil } // GetMachineDeploymentNodeTemplate returns the NodeTemplate of a node belonging to the same worker pool as the machinedeployment // If no node present then it forms the nodeTemplate using the one present in machineClass -func (m *McmManager) 
GetMachineDeploymentNodeTemplate(machinedeployment *MachineDeployment) (*nodeTemplate, error) { - - md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(machinedeployment.Name) +func (m *McmManager) GetMachineDeploymentNodeTemplate(nodeGroupName string) (*nodeTemplate, error) { + md, err := m.GetMachineDeploymentObject(nodeGroupName) if err != nil { - return nil, fmt.Errorf("unable to fetch MachineDeployment object %s, Error: %v", machinedeployment.Name, err) + return nil, err } var ( @@ -846,12 +825,12 @@ func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *Machine var providerSpec *awsapis.AWSProviderSpec err = json.Unmarshal(mc.ProviderSpec.Raw, &providerSpec) if err != nil { - return nil, fmt.Errorf("Unable to convert from %s to %s for %s, Error: %v", kindMachineClass, providerAWS, machinedeployment.Name, err) + return nil, fmt.Errorf("unable to convert from %s to %s for %s, Error: %v", kindMachineClass, providerAWS, nodeGroupName, err) } awsInstance, exists := AWSInstanceTypes[providerSpec.MachineType] if !exists { - return nil, fmt.Errorf("Unable to fetch details for VM type %s", providerSpec.MachineType) + return nil, fmt.Errorf("unable to fetch details for VM type %s", providerSpec.MachineType) } instance = instanceType{ InstanceType: awsInstance.InstanceType, @@ -868,11 +847,11 @@ func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *Machine var providerSpec *azureapis.AzureProviderSpec err = json.Unmarshal(mc.ProviderSpec.Raw, &providerSpec) if err != nil { - return nil, fmt.Errorf("Unable to convert from %s to %s for %s, Error: %v", kindMachineClass, providerAzure, machinedeployment.Name, err) + return nil, fmt.Errorf("unable to convert from %s to %s for %s, Error: %v", kindMachineClass, providerAzure, nodeGroupName, err) } azureInstance, exists := AzureInstanceTypes[providerSpec.Properties.HardwareProfile.VMSize] if !exists { - return nil, fmt.Errorf("Unable to fetch details for VM type %s", 
providerSpec.Properties.HardwareProfile.VMSize) + return nil, fmt.Errorf("unable to fetch details for VM type %s", providerSpec.Properties.HardwareProfile.VMSize) } instance = instanceType{ InstanceType: azureInstance.InstanceType, @@ -916,6 +895,16 @@ func (m *McmManager) GetMachineDeploymentNodeTemplate(machinedeployment *Machine return nodeTmpl, nil } +// GetMachineDeploymentObject returns the MachineDeployment object for the provided machine deployment name +func (m *McmManager) GetMachineDeploymentObject(mdName string) (*v1alpha1.MachineDeployment, error) { + md, err := m.machineDeploymentLister.MachineDeployments(m.namespace).Get(mdName) + if err != nil { + klog.Errorf("unable to fetch MachineDeployment object %s, Error: %v", mdName, err) + return nil, fmt.Errorf("unable to fetch MachineDeployment object %s, Error: %v", mdName, err) + } + return md, nil +} + func isRollingUpdateFinished(md *v1alpha1.MachineDeployment) bool { for _, cond := range md.Status.Conditions { switch { @@ -1058,6 +1047,30 @@ func buildGenericLabels(template *nodeTemplate, nodeName string) map[string]stri return result } +func buildNodeGroupImplFromSpec(value string, mcmManager *McmManager) (*NodeGroupImpl, error) { + spec, err := dynamic.SpecFromString(value, true) + if err != nil { + return nil, fmt.Errorf("failed to parse node group spec: %v", err) + } + s := strings.Split(spec.Name, ".") + Namespace, Name := s[0], s[1] + nodeGroup := buildNodeGroupImpl(mcmManager, spec.MinSize, spec.MaxSize, Namespace, Name) + return nodeGroup, nil +} + +func buildNodeGroupImpl(mcmManager *McmManager, minSize int, maxSize int, namespace string, name string) *NodeGroupImpl { + return &NodeGroupImpl{ + mcmManager: mcmManager, + minSize: minSize, + maxSize: maxSize, + scalingMutex: sync.Mutex{}, + NamespacedName: types.NamespacedName{ + Name: name, + Namespace: namespace, + }, + } +} + // isMachineFailedOrTerminating returns true if machine is already being terminated or considered for termination by 
autoscaler. func isMachineFailedOrTerminating(machine *v1alpha1.Machine) bool { if !machine.GetDeletionTimestamp().IsZero() || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed { @@ -1075,3 +1088,7 @@ func filterExtendedResources(allResources v1.ResourceList) (extendedResources v1 }) return } + +func createMachinesMarkedForDeletionAnnotationValue(machineNames []string) string { + return strings.Join(machineNames, ",") +} diff --git a/cluster-autoscaler/cloudprovider/mcm/test_utils.go b/cluster-autoscaler/cloudprovider/mcm/test_utils.go index 3c3f55e4c696..65c78c553e50 100644 --- a/cluster-autoscaler/cloudprovider/mcm/test_utils.go +++ b/cluster-autoscaler/cloudprovider/mcm/test_utils.go @@ -7,6 +7,7 @@ package mcm import ( "fmt" appsv1 "k8s.io/api/apps/v1" + types "k8s.io/apimachinery/pkg/types" "k8s.io/utils/pointer" "testing" "time" @@ -24,7 +25,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" customfake "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/mcm/fakeclient" - deletetaint "k8s.io/autoscaler/cluster-autoscaler/utils/taints" appsv1informers "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers" ) @@ -42,11 +42,11 @@ func newMachineDeployments( labels map[string]string, ) []*v1alpha1.MachineDeployment { machineDeployments := make([]*v1alpha1.MachineDeployment, machineDeploymentCount) - for i := range machineDeployments { + for i := 0; i < machineDeploymentCount; i++ { machineDeployment := &v1alpha1.MachineDeployment{ TypeMeta: metav1.TypeMeta{ APIVersion: "machine.sapcloud.io", - Kind: "MachineDeployment", + Kind: "MachineDeployment", }, ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf("machinedeployment-%d", i+1), @@ -74,7 +74,7 @@ func newMachineSets( ) []*v1alpha1.MachineSet { machineSets := make([]*v1alpha1.MachineSet, machineSetCount) - for i := range machineSets { + for i := 0; i < machineSetCount; i++ { ms := &v1alpha1.MachineSet{ TypeMeta: metav1.TypeMeta{ APIVersion: 
"machine.sapcloud.io", @@ -97,10 +97,9 @@ func newMachine( statusTemplate *v1alpha1.MachineStatus, mdName, msName string, priorityAnnotationValue string, - setDeletionTimeStamp, setNodeLabel bool, ) *v1alpha1.Machine { - m := newMachines(1, providerId, statusTemplate, mdName, msName, []string{priorityAnnotationValue}, []bool{setDeletionTimeStamp})[0] + m := newMachines(1, providerId, statusTemplate, mdName, msName, []string{priorityAnnotationValue})[0] m.Name = name m.Spec.ProviderID = providerId if !setNodeLabel { @@ -109,26 +108,34 @@ func newMachine( return m } +func generateNames(prefix string, count int) []string { + names := make([]string, count) + for i := 0; i < count; i++ { + names[i] = fmt.Sprintf("%s-%d", prefix, i+1) + } + return names +} + func newMachines( machineCount int, providerIdGenerateName string, statusTemplate *v1alpha1.MachineStatus, mdName, msName string, priorityAnnotationValues []string, - setDeletionTimeStamp []bool, ) []*v1alpha1.Machine { machines := make([]*v1alpha1.Machine, machineCount) - + machineNames := generateNames("machine", machineCount) + nodeNames := generateNames("node", machineCount) currentTime := metav1.Now() - for i := range machines { + for i := 0; i < machineCount; i++ { m := &v1alpha1.Machine{ TypeMeta: metav1.TypeMeta{ APIVersion: "machine.sapcloud.io", Kind: "Machine", }, ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("machine-%d", i+1), + Name: machineNames[i], Namespace: testNamespace, OwnerReferences: []metav1.OwnerReference{ {Name: msName}, @@ -143,12 +150,12 @@ func newMachines( m.Spec = v1alpha1.MachineSpec{ProviderID: fmt.Sprintf("%s/i%d", providerIdGenerateName, i+1)} } - m.Labels["node"] = fmt.Sprintf("node-%d", i+1) - if setDeletionTimeStamp[i] { - m.ObjectMeta.DeletionTimestamp = ¤tTime - } + m.Labels["node"] = nodeNames[i] if statusTemplate != nil { m.Status = *newMachineStatus(statusTemplate) + if m.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating { + m.DeletionTimestamp = ¤tTime + } } 
machines[i] = m } @@ -158,9 +165,8 @@ func newMachines( func newNode( nodeName, providerId string, - addToBeDeletedTaint bool, ) *corev1.Node { - node := newNodes(1, providerId, []bool{addToBeDeletedTaint})[0] + node := newNodes(1, providerId)[0] clone := node.DeepCopy() clone.Name = nodeName clone.Spec.ProviderID = providerId @@ -170,30 +176,20 @@ func newNode( func newNodes( nodeCount int, providerIdGenerateName string, - addToBeDeletedTaint []bool, ) []*corev1.Node { - nodes := make([]*corev1.Node, nodeCount) - for i := range nodes { - var taints []corev1.Taint - if addToBeDeletedTaint[i] { - taints = append(taints, corev1.Taint{ - Key: deletetaint.ToBeDeletedTaint, - Value: testTaintValue, - Effect: corev1.TaintEffectNoSchedule, - }) - } + nodeNames := generateNames("node", nodeCount) + for i := 0; i < nodeCount; i++ { node := &corev1.Node{ TypeMeta: metav1.TypeMeta{ APIVersion: "appsv1", Kind: "Node", }, ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("node-%d", i+1), + Name: nodeNames[i], }, Spec: corev1.NodeSpec{ ProviderID: fmt.Sprintf("%s/i%d", providerIdGenerateName, i+1), - Taints: taints, }, } @@ -287,6 +283,7 @@ func createMcmManager( discoveryOpts: cloudprovider.NodeGroupDiscoveryOptions{ NodeGroupSpecs: nodeGroups, }, + nodeGroups: make(map[types.NamespacedName]*NodeGroupImpl), deploymentLister: appsControlSharedInformers.Deployments().Lister(), machineClient: fakeTypedMachineClient, machineDeploymentLister: machineDeployments.Lister(), @@ -297,7 +294,7 @@ func createMcmManager( maxRetryTimeout: 5 * time.Second, retryInterval: 1 * time.Second, } - + g.Expect(mcmManager.generateMachineDeploymentMap()).To(gomega.Succeed()) hasSyncedCachesFns := []cache.InformerSynced{ nodes.Informer().HasSynced, machines.Informer().HasSynced,