Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suspend CA activities if MCM is offline #256

Merged
36 changes: 34 additions & 2 deletions cluster-autoscaler/cloudprovider/mcm/fakeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,15 +415,16 @@ func NewMachineClientSet(objects ...runtime.Object) (*fakeuntyped.Clientset, *Fa

// FakeObjectTrackers is a struct containing all the controller fake object trackers
type FakeObjectTrackers struct {
ControlMachine, TargetCore *FakeObjectTracker
ControlMachine, TargetCore, ControlApps *FakeObjectTracker
}

// NewFakeObjectTrackers initializes the fake object trackers
func NewFakeObjectTrackers(controlMachine, targetCore *FakeObjectTracker) *FakeObjectTrackers {
func NewFakeObjectTrackers(controlMachine, targetCore, controlApps *FakeObjectTracker) *FakeObjectTrackers {

fakeObjectTrackers := &FakeObjectTrackers{
ControlMachine: controlMachine,
TargetCore: targetCore,
ControlApps: controlApps,
}

return fakeObjectTrackers
Expand Down Expand Up @@ -483,6 +484,37 @@ func NewCoreClientSet(objects ...runtime.Object) (*Clientset, *FakeObjectTracker
return cs, o
}

// NewAppsV1ClientSet returns a clientset that will respond with the provided objects.
// It's backed by a very simple object tracker that processes creates, updates and deletions as-is,
// without applying any validations and/or defaults. It shouldn't be considered a replacement
// for a real clientset and is mostly useful in simple unit tests.
func NewAppsV1ClientSet(objects ...runtime.Object) (*Clientset, *FakeObjectTracker) {
sssash18 marked this conversation as resolved.
Show resolved Hide resolved

var scheme = runtime.NewScheme()
var codecs = serializer.NewCodecFactory(scheme)

metav1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1", Group: "apps"})
_ = k8sfake.AddToScheme(scheme)

o := &FakeObjectTracker{
FakeWatcher: watch.NewFake(),
delegatee: k8stesting.NewObjectTracker(scheme, codecs.UniversalDecoder()),
}

for _, obj := range objects {
if err := o.Add(obj); err != nil {
panic(err)
}
}

cs := &Clientset{Clientset: &k8sfake.Clientset{}}
cs.FakeDiscovery = &fakediscovery.FakeDiscovery{Fake: &cs.Fake}
cs.Fake.AddReactor("*", "*", k8stesting.ObjectReaction(o))
cs.Fake.AddWatchReactor("*", o.watchReactionFunc)

return cs, o
}

// Clientset extends k8sfake.Clientset to override the Policy implementation.
// This is because the default Policy fake implementation does not propagate the
// eviction name.
Expand Down
21 changes: 16 additions & 5 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,17 @@ package mcm
import (
"context"
"fmt"
"strings"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"strings"
)

const (
Expand Down Expand Up @@ -195,6 +193,19 @@ func (mcm *mcmCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimite
// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
func (mcm *mcmCloudProvider) Refresh() error {

namespace := mcm.mcmManager.namespace
deployment, err := mcm.mcmManager.deploymentLister.Deployments(namespace).Get("machine-controller-manager")
if err != nil {
klog.Errorf("failed to get machine-controller-manager deployment: %v", err.Error())
return err
sssash18 marked this conversation as resolved.
Show resolved Hide resolved
}

if !(deployment.Status.AvailableReplicas >= 1) {
klog.Errorf("machine-controller-manager is offline. Cluster autoscaler operations would be suspended.")
return errors.NewAutoscalerError(errors.CloudProviderError, "machine-controller-manager is offline. Cluster autoscaler operations would be suspended.")
sssash18 marked this conversation as resolved.
Show resolved Hide resolved
}

for _, machineDeployment := range mcm.machinedeployments {
err := mcm.mcmManager.resetPriorityForNotToBeDeletedMachines(machineDeployment.Name)
if err != nil {
Expand Down
70 changes: 61 additions & 9 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ import (
"context"
"errors"
"fmt"
v1 "k8s.io/api/apps/v1"
errors2 "k8s.io/apimachinery/pkg/api/errors"
sssash18 marked this conversation as resolved.
Show resolved Hide resolved
caerror "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"math"
"strings"
"testing"
Expand Down Expand Up @@ -51,13 +54,14 @@ type setup struct {
machines []*v1alpha1.Machine
machineSets []*v1alpha1.MachineSet
machineDeployments []*v1alpha1.MachineDeployment
deployments []*v1.Deployment
sssash18 marked this conversation as resolved.
Show resolved Hide resolved
machineClasses []*v1alpha1.MachineClass
nodeGroups []string
targetCoreFakeResourceActions *customfake.ResourceActions
controlMachineFakeResourceActions *customfake.ResourceActions
}

func setupEnv(setup *setup) ([]runtime.Object, []runtime.Object) {
func setupEnv(setup *setup) ([]runtime.Object, []runtime.Object, []runtime.Object) {
var controlMachineObjects []runtime.Object
for _, o := range setup.machines {
controlMachineObjects = append(controlMachineObjects, o)
Expand All @@ -78,7 +82,12 @@ func setupEnv(setup *setup) ([]runtime.Object, []runtime.Object) {
targetCoreObjects = append(targetCoreObjects, o)
}

return controlMachineObjects, targetCoreObjects
var appsControlObjects []runtime.Object

for _, o := range setup.deployments {
appsControlObjects = append(appsControlObjects, o)
}
return controlMachineObjects, targetCoreObjects, appsControlObjects
}

func TestDeleteNodes(t *testing.T) {
Expand Down Expand Up @@ -279,8 +288,8 @@ func TestDeleteNodes(t *testing.T) {
g := NewWithT(t)
stop := make(chan struct{})
defer close(stop)
controlMachineObjects, targetCoreObjects := setupEnv(&entry.setup)
m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, nil, controlMachineObjects, targetCoreObjects)
controlMachineObjects, targetCoreObjects, _ := setupEnv(&entry.setup)
m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, nil, controlMachineObjects, targetCoreObjects, nil)
defer trackers.Stop()
waitForCacheSync(t, stop, hasSyncedCacheFns)

Expand Down Expand Up @@ -341,12 +350,53 @@ func TestRefresh(t *testing.T) {
}
table := []data{
{

himanshu-kun marked this conversation as resolved.
Show resolved Hide resolved
"should set available replicas of mcm as zero",
sssash18 marked this conversation as resolved.
Show resolved Hide resolved
setup{
nodes: newNodes(1, "fakeID", []bool{false}),
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
machineDeployments: newMachineDeployments(1, 1, nil, nil, nil),
nodeGroups: []string{nodeGroup2},
deployments: newDeployments(0),
},
expect{
err: caerror.NewAutoscalerError(caerror.CloudProviderError, "machine-controller-manager is offline. Cluster autoscaler operations would be suspended."),
},
},
{

"should delete the deployment of mcm",
setup{
nodes: newNodes(1, "fakeID", []bool{false}),
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
machineDeployments: newMachineDeployments(1, 1, nil, nil, nil),
nodeGroups: []string{nodeGroup2},
},
expect{
err: &errors2.StatusError{
ErrStatus: metav1.Status{
Status: "Failure",
Message: "deployment.apps \"machine-controller-manager\" not found",
Reason: "NotFound",
Details: &metav1.StatusDetails{
Name: "machine-controller-manager",
Group: "apps",
Kind: "deployment",
},
Code: 404,
},
},
},
},
{

"should reset priority of a machine with node without ToBeDeletedTaint to 3",
setup{
nodes: newNodes(1, "fakeID", []bool{false}),
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
machineDeployments: newMachineDeployments(1, 1, nil, nil, nil),
nodeGroups: []string{nodeGroup2},
deployments: newDeployments(1),
},
expect{
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}),
Expand All @@ -360,6 +410,7 @@ func TestRefresh(t *testing.T) {
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
machineDeployments: newMachineDeployments(1, 1, nil, nil, nil),
nodeGroups: []string{nodeGroup2},
deployments: newDeployments(1),
},
expect{
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
Expand All @@ -377,7 +428,8 @@ func TestRefresh(t *testing.T) {
Update: customfake.CreateFakeResponse(math.MaxInt32, mcUpdateErrorMsg, 0),
},
},
nodeGroups: []string{nodeGroup2},
nodeGroups: []string{nodeGroup2},
deployments: newDeployments(1),
},
expect{
machines: []*v1alpha1.Machine{newMachine("machine-1", "fakeID-1", nil, "machinedeployment-1", "machineset-1", "1", false, true)},
Expand All @@ -392,8 +444,8 @@ func TestRefresh(t *testing.T) {
g := NewWithT(t)
stop := make(chan struct{})
defer close(stop)
controlMachineObjects, targetCoreObjects := setupEnv(&entry.setup)
m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, entry.setup.nodeGroups, controlMachineObjects, targetCoreObjects)
controlMachineObjects, targetCoreObjects, appsControlObjects := setupEnv(&entry.setup)
m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, entry.setup.nodeGroups, controlMachineObjects, targetCoreObjects, appsControlObjects)
defer trackers.Stop()
waitForCacheSync(t, stop, hasSyncedCacheFns)

Expand Down Expand Up @@ -493,8 +545,8 @@ func TestNodes(t *testing.T) {
g := NewWithT(t)
stop := make(chan struct{})
defer close(stop)
controlMachineObjects, targetCoreObjects := setupEnv(&entry.setup)
m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, nil, controlMachineObjects, targetCoreObjects)
controlMachineObjects, targetCoreObjects, _ := setupEnv(&entry.setup)
m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, nil, controlMachineObjects, targetCoreObjects, nil)
defer trackers.Stop()
waitForCacheSync(t, stop, hasSyncedCacheFns)

Expand Down
14 changes: 14 additions & 0 deletions cluster-autoscaler/cloudprovider/mcm/mcm_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ import (
"errors"
"flag"
"fmt"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/fields"
v1appslister "k8s.io/client-go/listers/apps/v1"
"math/rand"
"net/http"
"os"
Expand Down Expand Up @@ -115,6 +118,7 @@ type McmManager struct {
namespace string
interrupt chan struct{}
discoveryOpts cloudprovider.NodeGroupDiscoveryOptions
deploymentLister v1appslister.DeploymentLister
machineClient machineapi.MachineV1alpha1Interface
machineDeploymentLister machinelisters.MachineDeploymentLister
machineSetLister machinelisters.MachineSetLister
Expand Down Expand Up @@ -167,6 +171,15 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti
controlCoreClientBuilder := CoreControllerClientBuilder{
ClientConfig: controlKubeconfig,
}

deployKubeClient := controlCoreClientBuilder.ClientOrDie("deploykubeclient")
selector := fields.Everything()
deploymentListWatch := cache.NewListWatchFromClient(deployKubeClient.AppsV1().RESTClient(), "deployments", namespace, selector)
store, reflector := cache.NewNamespaceKeyedIndexerAndReflector(deploymentListWatch, &appsv1.Deployment{}, time.Hour)
deploymentLister := v1appslister.NewDeploymentLister(store)
stopCh := make(chan struct{})
go reflector.Run(stopCh)

availableResources, err := getAvailableResources(controlCoreClientBuilder)
if err != nil {
return nil, err
Expand Down Expand Up @@ -232,6 +245,7 @@ func createMCMManagerInternal(discoveryOpts cloudprovider.NodeGroupDiscoveryOpti
m := &McmManager{
namespace: namespace,
interrupt: make(chan struct{}),
deploymentLister: deploymentLister,
machineClient: controlMachineClient,
machineClassLister: machineClassLister,
machineLister: machineSharedInformers.Machines().Lister(),
Expand Down
36 changes: 34 additions & 2 deletions cluster-autoscaler/cloudprovider/mcm/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package mcm

import (
"fmt"
v1 "k8s.io/api/apps/v1"
sssash18 marked this conversation as resolved.
Show resolved Hide resolved
"k8s.io/utils/pointer"
"testing"
"time"

Expand All @@ -35,6 +37,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
customfake "k8s.io/autoscaler/cluster-autoscaler/cloudprovider/mcm/fakeclient"
deletetaint "k8s.io/autoscaler/cluster-autoscaler/utils/taints"
appsv1informers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers"
)

Expand Down Expand Up @@ -220,24 +223,42 @@ func newMachineStatus(statusTemplate *v1alpha1.MachineStatus) *v1alpha1.MachineS
return statusTemplate.DeepCopy()
}

func newDeployments(availableReplicas int32) []*v1.Deployment {
sssash18 marked this conversation as resolved.
Show resolved Hide resolved
return []*v1.Deployment{
{
ObjectMeta: metav1.ObjectMeta{
Name: "machine-controller-manager",
Namespace: testNamespace,
},
Spec: v1.DeploymentSpec{
Replicas: pointer.Int32(1),
},
Status: v1.DeploymentStatus{
AvailableReplicas: availableReplicas,
},
},
}
}

func createMcmManager(
t *testing.T,
stop <-chan struct{},
namespace string,
nodeGroups []string, controlMachineObjects, targetCoreObjects []runtime.Object,
nodeGroups []string, controlMachineObjects, targetCoreObjects, controlAppsObjects []runtime.Object,
) (*McmManager, *customfake.FakeObjectTrackers, []cache.InformerSynced) {
g := gomega.NewWithT(t)
fakeControlMachineClient, controlMachineObjectTracker := customfake.NewMachineClientSet(controlMachineObjects...)
fakeTypedMachineClient := &faketyped.FakeMachineV1alpha1{
Fake: &fakeControlMachineClient.Fake,
}
fakeTargetCoreClient, targetCoreObjectTracker := customfake.NewCoreClientSet(targetCoreObjects...)
fakeControlAppsClient, controlAppsObjectTracker := customfake.NewAppsV1ClientSet(controlAppsObjects...)
fakeObjectTrackers := customfake.NewFakeObjectTrackers(
controlMachineObjectTracker,
targetCoreObjectTracker,
controlAppsObjectTracker,
)
fakeObjectTrackers.Start()

coreTargetInformerFactory := coreinformers.NewFilteredSharedInformerFactory(
fakeTargetCoreClient,
100*time.Millisecond,
Expand All @@ -248,6 +269,15 @@ func createMcmManager(
coreTargetSharedInformers := coreTargetInformerFactory.Core().V1()
nodes := coreTargetSharedInformers.Nodes()

appsControlInformerFactory := appsv1informers.NewFilteredSharedInformerFactory(
fakeControlAppsClient,
100*time.Millisecond,
namespace,
nil,
)
defer appsControlInformerFactory.Start(stop)
appsControlSharedInformers := appsControlInformerFactory.Apps().V1()

controlMachineInformerFactory := machineinformers.NewFilteredSharedInformerFactory(
fakeControlMachineClient,
100*time.Millisecond,
Expand All @@ -272,6 +302,7 @@ func createMcmManager(
discoveryOpts: cloudprovider.NodeGroupDiscoveryOptions{
NodeGroupSpecs: nodeGroups,
},
deploymentLister: appsControlSharedInformers.Deployments().Lister(),
machineClient: fakeTypedMachineClient,
machineDeploymentLister: machineDeployments.Lister(),
machineSetLister: machineSets.Lister(),
Expand All @@ -288,6 +319,7 @@ func createMcmManager(
machineSets.Informer().HasSynced,
machineDeployments.Informer().HasSynced,
machineClasses.Informer().HasSynced,
appsControlSharedInformers.Deployments().Informer().HasSynced,
}

return &mcmManager, fakeObjectTrackers, hasSyncedCachesFns
Expand Down
Loading