-
Notifications
You must be signed in to change notification settings - Fork 25
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix Race conditions in Targeted Deletion of machines by CA #341
base: machine-controller-manager-provider
Are you sure you want to change the base?
Changes from 4 commits
43cffaf
0d939b0
98f20d3
56d80ac
f3774f4
9063248
68d2046
b2e1c3c
c515f39
063f07a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,8 +24,13 @@ package mcm | |
import ( | ||
"context" | ||
"fmt" | ||
"github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1" | ||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
"k8s.io/apimachinery/pkg/util/sets" | ||
"slices" | ||
"strconv" | ||
"strings" | ||
"sync" | ||
"time" | ||
|
||
apiv1 "k8s.io/api/core/v1" | ||
|
@@ -67,15 +72,14 @@ const ( | |
// MCMCloudProvider implements the cloud provider interface for machine-controller-manager | ||
// Reference: https://github.com/gardener/machine-controller-manager | ||
type mcmCloudProvider struct { | ||
mcmManager *McmManager | ||
machinedeployments map[types.NamespacedName]*MachineDeployment | ||
resourceLimiter *cloudprovider.ResourceLimiter | ||
mcmManager *McmManager | ||
resourceLimiter *cloudprovider.ResourceLimiter | ||
} | ||
|
||
// BuildMcmCloudProvider builds CloudProvider implementation for machine-controller-manager. | ||
func BuildMcmCloudProvider(mcmManager *McmManager, resourceLimiter *cloudprovider.ResourceLimiter) (cloudprovider.CloudProvider, error) { | ||
if mcmManager.discoveryOpts.StaticDiscoverySpecified() { | ||
return buildStaticallyDiscoveringProvider(mcmManager, mcmManager.discoveryOpts.NodeGroupSpecs, resourceLimiter) | ||
return buildStaticallyDiscoveringProvider(mcmManager, resourceLimiter) | ||
} | ||
return nil, fmt.Errorf("Failed to build an mcm cloud provider: Either node group specs or node group auto discovery spec must be specified") | ||
} | ||
|
@@ -96,16 +100,10 @@ func BuildMCM(opts config.AutoscalingOptions, do cloudprovider.NodeGroupDiscover | |
return provider | ||
} | ||
|
||
func buildStaticallyDiscoveringProvider(mcmManager *McmManager, specs []string, resourceLimiter *cloudprovider.ResourceLimiter) (*mcmCloudProvider, error) { | ||
func buildStaticallyDiscoveringProvider(mcmManager *McmManager, resourceLimiter *cloudprovider.ResourceLimiter) (*mcmCloudProvider, error) { | ||
mcm := &mcmCloudProvider{ | ||
mcmManager: mcmManager, | ||
machinedeployments: make(map[types.NamespacedName]*MachineDeployment), | ||
resourceLimiter: resourceLimiter, | ||
} | ||
for _, spec := range specs { | ||
if err := mcm.addNodeGroup(spec); err != nil { | ||
return nil, err | ||
} | ||
mcmManager: mcmManager, | ||
resourceLimiter: resourceLimiter, | ||
} | ||
return mcm, nil | ||
} | ||
|
@@ -116,31 +114,14 @@ func (mcm *mcmCloudProvider) Cleanup() error { | |
return nil | ||
} | ||
|
||
// addNodeGroup adds node group defined in string spec. Format: | ||
// minNodes:maxNodes:namespace.machineDeploymentName | ||
func (mcm *mcmCloudProvider) addNodeGroup(spec string) error { | ||
machinedeployment, err := buildMachineDeploymentFromSpec(spec, mcm.mcmManager) | ||
if err != nil { | ||
return err | ||
} | ||
mcm.addMachineDeployment(machinedeployment) | ||
return nil | ||
} | ||
|
||
func (mcm *mcmCloudProvider) addMachineDeployment(machinedeployment *MachineDeployment) { | ||
key := types.NamespacedName{Namespace: machinedeployment.Namespace, Name: machinedeployment.Name} | ||
mcm.machinedeployments[key] = machinedeployment | ||
return | ||
} | ||
|
||
func (mcm *mcmCloudProvider) Name() string { | ||
return "machine-controller-manager" | ||
} | ||
|
||
// NodeGroups returns all node groups configured for this cloud provider. | ||
func (mcm *mcmCloudProvider) NodeGroups() []cloudprovider.NodeGroup { | ||
result := make([]cloudprovider.NodeGroup, 0, len(mcm.machinedeployments)) | ||
for _, machinedeployment := range mcm.machinedeployments { | ||
result := make([]cloudprovider.NodeGroup, 0, len(mcm.mcmManager.machineDeployments)) | ||
for _, machinedeployment := range mcm.mcmManager.machineDeployments { | ||
if machinedeployment.maxSize == 0 { | ||
continue | ||
} | ||
|
@@ -172,7 +153,7 @@ func (mcm *mcmCloudProvider) NodeGroupForNode(node *apiv1.Node) (cloudprovider.N | |
} | ||
|
||
key := types.NamespacedName{Namespace: md.Namespace, Name: md.Name} | ||
_, isManaged := mcm.machinedeployments[key] | ||
_, isManaged := mcm.mcmManager.machineDeployments[key] | ||
if !isManaged { | ||
klog.V(4).Infof("Skipped node %v, it's not managed by this controller", node.Spec.ProviderID) | ||
return nil, nil | ||
|
@@ -293,8 +274,9 @@ type MachineDeployment struct { | |
|
||
mcmManager *McmManager | ||
|
||
minSize int | ||
maxSize int | ||
scalingMutex sync.Mutex | ||
minSize int | ||
maxSize int | ||
} | ||
|
||
// MaxSize returns maximum size of the node group. | ||
|
@@ -343,6 +325,8 @@ func (machinedeployment *MachineDeployment) IncreaseSize(delta int) error { | |
if delta <= 0 { | ||
return fmt.Errorf("size increase must be positive") | ||
} | ||
machinedeployment.scalingMutex.Lock() | ||
defer machinedeployment.scalingMutex.Unlock() | ||
size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment) | ||
if err != nil { | ||
return err | ||
|
@@ -366,6 +350,8 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error | |
if delta >= 0 { | ||
return fmt.Errorf("size decrease size must be negative") | ||
} | ||
machinedeployment.scalingMutex.Lock() | ||
defer machinedeployment.scalingMutex.Unlock() | ||
size, err := machinedeployment.mcmManager.GetMachineDeploymentSize(machinedeployment) | ||
if err != nil { | ||
return err | ||
|
@@ -380,6 +366,54 @@ func (machinedeployment *MachineDeployment) DecreaseTargetSize(delta int) error | |
}, "MachineDeployment", "update", machinedeployment.Name) | ||
} | ||
|
||
// Refresh resets the priority annotation for the machines that are not present in machines-marked-by-ca-for-deletion annotation on the machineDeployment | ||
func (machineDeployment *MachineDeployment) Refresh() error { | ||
machineDeployment.scalingMutex.Lock() | ||
defer machineDeployment.scalingMutex.Unlock() | ||
mcd, err := machineDeployment.mcmManager.machineDeploymentLister.MachineDeployments(machineDeployment.Namespace).Get(machineDeployment.Name) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is repeated nearly a dozen times everywhere including common error handling: |
||
if err != nil { | ||
return fmt.Errorf("failed to get machine deployment %s: %v", machineDeployment.Name, err) | ||
} | ||
// ignore the machine deployment if it is in rolling update | ||
if !isRollingUpdateFinished(mcd) { | ||
klog.Infof("machine deployment %s is under rolling update, skipping", machineDeployment.Name) | ||
return nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shouldn't we return an There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should remove this check. Even if a machineDeployment is under rolling update, we should allow the annotation update if needed. wdyt? |
||
} | ||
markedMachines := sets.New(strings.Split(mcd.Annotations[machinesMarkedByCAForDeletion], ",")...) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please move this to a small function |
||
machines, err := machineDeployment.mcmManager.getMachinesForMachineDeployment(machineDeployment.Name) | ||
if err != nil { | ||
klog.Errorf("[Refresh] failed to get machines for machine deployment %s, hence skipping it. Err: %v", machineDeployment.Name, err.Error()) | ||
return err | ||
} | ||
var incorrectlyMarkedMachines []*Ref | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe we can omit the |
||
for _, machine := range machines { | ||
// no need to reset priority for machines already in termination or failed phase | ||
if machine.Status.CurrentStatus.Phase == v1alpha1.MachineTerminating || machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we already have a utility method |
||
continue | ||
} | ||
if annotValue, ok := machine.Annotations[machinePriorityAnnotation]; ok && annotValue == priorityValueForCandidateMachines && !markedMachines.Has(machine.Name) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Out of curiosity, Why is this called There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can rename it to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we should also add a comment here to explain what we are doing so next guy making a patch doesn't scratch his head. |
||
incorrectlyMarkedMachines = append(incorrectlyMarkedMachines, &Ref{Name: machine.Name, Namespace: machine.Namespace}) | ||
} | ||
} | ||
var updatedMarkedMachines []string | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
for machineName := range markedMachines { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor: Technically, you can create There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. AFAIK There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes but it is not part of the apimachinery version we currently use. Anyways, we do not need sets, I have replaced them with slices. |
||
if slices.ContainsFunc(machines, func(mc *v1alpha1.Machine) bool { | ||
return mc.Name == machineName | ||
}) { | ||
updatedMarkedMachines = append(updatedMarkedMachines, machineName) | ||
} | ||
} | ||
clone := mcd.DeepCopy() | ||
clone.Annotations[machinesMarkedByCAForDeletion] = strings.Join(updatedMarkedMachines, ",") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This |
||
ctx, cancelFn := context.WithTimeout(context.Background(), machineDeployment.mcmManager.maxRetryTimeout) | ||
defer cancelFn() | ||
_, err = machineDeployment.mcmManager.machineClient.MachineDeployments(machineDeployment.Namespace).Update(ctx, clone, metav1.UpdateOptions{}) | ||
if err != nil { | ||
return err | ||
} | ||
return machineDeployment.mcmManager.resetPriorityForMachines(incorrectlyMarkedMachines) | ||
} | ||
|
||
// Belongs returns true if the given node belongs to the NodeGroup. | ||
// TODO: Implement this to iterate over machines under machinedeployment, and return true if node exists in list. | ||
func (machinedeployment *MachineDeployment) Belongs(node *apiv1.Node) (bool, error) { | ||
|
@@ -541,9 +575,10 @@ func buildMachineDeploymentFromSpec(value string, mcmManager *McmManager) (*Mach | |
|
||
func buildMachineDeployment(mcmManager *McmManager, minSize int, maxSize int, namespace string, name string) *MachineDeployment { | ||
return &MachineDeployment{ | ||
mcmManager: mcmManager, | ||
minSize: minSize, | ||
maxSize: maxSize, | ||
mcmManager: mcmManager, | ||
minSize: minSize, | ||
maxSize: maxSize, | ||
scalingMutex: sync.Mutex{}, | ||
Ref: Ref{ | ||
Name: name, | ||
Namespace: namespace, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why should a scaling mutex be used in `Refresh`? A scaling mutex, by its very name, signifies a mutex meant to be taken for any scaling event, and not when it is trying to read/get machine deployments.
Also, can this result in the next reconcile of `RunOnce` being stuck because it cannot go beyond the `CloudProvider.Refresh` call?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need a mutex when running scaling operations or modifying attributes that directly affect the scaling operation, like the newly introduced annotation. There is no doubt about this, as we HAVE to prevent concurrent scale-ups and scale-downs: our MCM scaling is async and multiple goroutines can perform scale-downs, leading to non-deterministic behaviour.
You have a point that `RunOnce` should return an error and not block if the mutex is already acquired. Will change the code to check if the mutex is acquired and, if so, return an informative error. Will also test this change.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Modified to take a `TryLock` in `NodeGroupImpl.Refresh()`. This is also a limited fix, since it will block CA from changing other NodeGroups but won't block goroutines. I don't see any better way with the current CA architecture. One possible improvement is to introduce a mutex with a timeout. If you find a better way, please let me know.