Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Suspend CA activities if MCM is offline #256

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -20,37 +20,26 @@ https://github.com/kubernetes/kubernetes/blob/release-1.8/pkg/controller/client_
package mcm

import (
clientgoclientset "k8s.io/client-go/kubernetes"
clientset "k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"

"k8s.io/klog/v2"
)

// CoreClientBuilder allows you to get clients and configs for core controllers
type CoreClientBuilder interface {
Config(name string) (*restclient.Config, error)
ConfigOrDie(name string) *restclient.Config
Client(name string) (clientset.Interface, error)
ClientOrDie(name string) clientset.Interface
ClientGoClient(name string) (clientgoclientset.Interface, error)
ClientGoClientOrDie(name string) clientgoclientset.Interface
}

// CoreControllerClientBuilder returns a fixed client with different user agents
type CoreControllerClientBuilder struct {
// ClientBuilder returns a fixed client with different user agents
type ClientBuilder struct {
// ClientConfig is a skeleton config to clone and use as the basis for each controller client
ClientConfig *restclient.Config
}

// Config lets you configure the client builder
func (b CoreControllerClientBuilder) Config(name string) (*restclient.Config, error) {
func (b ClientBuilder) Config(name string) (*restclient.Config, error) {
clientConfig := *b.ClientConfig
return restclient.AddUserAgent(&clientConfig, name), nil
}

// ConfigOrDie either configures or die's while configuring
func (b CoreControllerClientBuilder) ConfigOrDie(name string) *restclient.Config {
func (b ClientBuilder) ConfigOrDie(name string) *restclient.Config {
clientConfig, err := b.Config(name)
if err != nil {
klog.Fatal(err)
Expand All @@ -59,7 +48,7 @@ func (b CoreControllerClientBuilder) ConfigOrDie(name string) *restclient.Config
}

// Client builds a new client for clientBuilder
func (b CoreControllerClientBuilder) Client(name string) (clientset.Interface, error) {
func (b ClientBuilder) Client(name string) (clientset.Interface, error) {
clientConfig, err := b.Config(name)
if err != nil {
return nil, err
Expand All @@ -68,28 +57,10 @@ func (b CoreControllerClientBuilder) Client(name string) (clientset.Interface, e
}

// ClientOrDie builds a client or die's
func (b CoreControllerClientBuilder) ClientOrDie(name string) clientset.Interface {
func (b ClientBuilder) ClientOrDie(name string) clientset.Interface {
client, err := b.Client(name)
if err != nil {
klog.Fatal(err)
}
return client
}

// ClientGoClient builds a go client
func (b CoreControllerClientBuilder) ClientGoClient(name string) (clientgoclientset.Interface, error) {
clientConfig, err := b.Config(name)
if err != nil {
return nil, err
}
return clientgoclientset.NewForConfig(clientConfig)
}

// ClientGoClientOrDie builds a go client or die's
func (b CoreControllerClientBuilder) ClientGoClientOrDie(name string) clientgoclientset.Interface {
client, err := b.ClientGoClient(name)
if err != nil {
klog.Fatal(err)
}
return client
}
36 changes: 34 additions & 2 deletions cluster-autoscaler/cloudprovider/mcm/fakeclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,15 +415,16 @@ func NewMachineClientSet(objects ...runtime.Object) (*fakeuntyped.Clientset, *Fa

// FakeObjectTrackers is a struct containing all the controller fake object trackers
type FakeObjectTrackers struct {
ControlMachine, TargetCore *FakeObjectTracker
ControlMachine, TargetCore, ControlApps *FakeObjectTracker
}

// NewFakeObjectTrackers initializes fakeObjectTrackers initializes the fake object trackers
func NewFakeObjectTrackers(controlMachine, targetCore *FakeObjectTracker) *FakeObjectTrackers {
func NewFakeObjectTrackers(controlMachine, targetCore, controlApps *FakeObjectTracker) *FakeObjectTrackers {

fakeObjectTrackers := &FakeObjectTrackers{
ControlMachine: controlMachine,
TargetCore: targetCore,
ControlApps: controlApps,
}

return fakeObjectTrackers
Expand Down Expand Up @@ -483,6 +484,37 @@ func NewCoreClientSet(objects ...runtime.Object) (*Clientset, *FakeObjectTracker
return cs, o
}

// NewAppsClientSet returns a clientset that will respond with the provided objects.
// It's backed by a very simple object tracker that processes creates, updates and deletions as-is,
// without applying any validations and/or defaults. It shouldn't be considered a replacement
// for a real clientset and is mostly useful in simple unit tests.
func NewAppsClientSet(objects ...runtime.Object) (*Clientset, *FakeObjectTracker) {

var scheme = runtime.NewScheme()
var codecs = serializer.NewCodecFactory(scheme)

metav1.AddToGroupVersion(scheme, schema.GroupVersion{Version: "v1", Group: "apps"})
_ = k8sfake.AddToScheme(scheme)

o := &FakeObjectTracker{
FakeWatcher: watch.NewFake(),
delegatee: k8stesting.NewObjectTracker(scheme, codecs.UniversalDecoder()),
}

for _, obj := range objects {
if err := o.Add(obj); err != nil {
panic(err)
}
}

cs := &Clientset{Clientset: &k8sfake.Clientset{}}
cs.FakeDiscovery = &fakediscovery.FakeDiscovery{Fake: &cs.Fake}
cs.Fake.AddReactor("*", "*", k8stesting.ObjectReaction(o))
cs.Fake.AddWatchReactor("*", o.watchReactionFunc)

return cs, o
}

// Clientset extends k8sfake.Clientset to override the Policy implementation.
// This is because the default Policy fake implementation does not propagate the
// eviction name.
Expand Down
28 changes: 23 additions & 5 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,17 @@ package mcm
import (
"context"
"fmt"
"strings"

apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"

"k8s.io/autoscaler/cluster-autoscaler/cloudprovider"
"k8s.io/autoscaler/cluster-autoscaler/config"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"strings"
)

const (
Expand Down Expand Up @@ -192,9 +190,29 @@ func (mcm *mcmCloudProvider) GetResourceLimiter() (*cloudprovider.ResourceLimite
return mcm.resourceLimiter, nil
}

func (mcm *mcmCloudProvider) checkMCMAvailableReplicas() error {
namespace := mcm.mcmManager.namespace
deployment, err := mcm.mcmManager.deploymentLister.Deployments(namespace).Get("machine-controller-manager")
if err != nil {
return fmt.Errorf("failed to get machine-controller-manager deployment: %v", err.Error())
}

if deployment.Status.AvailableReplicas == 0 {
return fmt.Errorf("machine-controller-manager is offline. Cluster autoscaler operations would be suspended.")
}

return nil
}

// Refresh is called before every main loop and can be used to dynamically update cloud provider state.
// In particular the list of node groups returned by NodeGroups can change as a result of CloudProvider.Refresh().
func (mcm *mcmCloudProvider) Refresh() error {

err := mcm.checkMCMAvailableReplicas()
if err != nil {
return err
}

for _, machineDeployment := range mcm.machinedeployments {
err := mcm.mcmManager.resetPriorityForNotToBeDeletedMachines(machineDeployment.Name)
if err != nil {
Expand Down
57 changes: 48 additions & 9 deletions cluster-autoscaler/cloudprovider/mcm/mcm_cloud_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
v1 "k8s.io/api/apps/v1"
"math"
"strings"
"testing"
Expand Down Expand Up @@ -51,13 +52,14 @@ type setup struct {
machines []*v1alpha1.Machine
machineSets []*v1alpha1.MachineSet
machineDeployments []*v1alpha1.MachineDeployment
mcmDeployment *v1.Deployment
machineClasses []*v1alpha1.MachineClass
nodeGroups []string
targetCoreFakeResourceActions *customfake.ResourceActions
controlMachineFakeResourceActions *customfake.ResourceActions
}

func setupEnv(setup *setup) ([]runtime.Object, []runtime.Object) {
func setupEnv(setup *setup) ([]runtime.Object, []runtime.Object, []runtime.Object) {
var controlMachineObjects []runtime.Object
for _, o := range setup.machines {
controlMachineObjects = append(controlMachineObjects, o)
Expand All @@ -78,7 +80,13 @@ func setupEnv(setup *setup) ([]runtime.Object, []runtime.Object) {
targetCoreObjects = append(targetCoreObjects, o)
}

return controlMachineObjects, targetCoreObjects
var appsControlObjects []runtime.Object

if setup.mcmDeployment != nil {
appsControlObjects = append(appsControlObjects, setup.mcmDeployment)
}

return controlMachineObjects, targetCoreObjects, appsControlObjects
}

func TestDeleteNodes(t *testing.T) {
Expand Down Expand Up @@ -279,8 +287,8 @@ func TestDeleteNodes(t *testing.T) {
g := NewWithT(t)
stop := make(chan struct{})
defer close(stop)
controlMachineObjects, targetCoreObjects := setupEnv(&entry.setup)
m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, nil, controlMachineObjects, targetCoreObjects)
controlMachineObjects, targetCoreObjects, _ := setupEnv(&entry.setup)
m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, nil, controlMachineObjects, targetCoreObjects, nil)
defer trackers.Stop()
waitForCacheSync(t, stop, hasSyncedCacheFns)

Expand Down Expand Up @@ -341,12 +349,41 @@ func TestRefresh(t *testing.T) {
}
table := []data{
{

himanshu-kun marked this conversation as resolved.
Show resolved Hide resolved
"should return an error if MCM has zero available replicas",
setup{
nodes: newNodes(1, "fakeID", []bool{false}),
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
machineDeployments: newMachineDeployments(1, 1, nil, nil, nil),
nodeGroups: []string{nodeGroup2},
mcmDeployment: newMCMDeployment(0),
},
expect{
err: fmt.Errorf("machine-controller-manager is offline. Cluster autoscaler operations would be suspended."),
},
},
{

"should return an error if MCM deployment is not found",
setup{
nodes: newNodes(1, "fakeID", []bool{false}),
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
machineDeployments: newMachineDeployments(1, 1, nil, nil, nil),
nodeGroups: []string{nodeGroup2},
},
expect{
err: fmt.Errorf("failed to get machine-controller-manager deployment: deployment.apps \"machine-controller-manager\" not found"),
},
},
{

"should reset priority of a machine with node without ToBeDeletedTaint to 3",
setup{
nodes: newNodes(1, "fakeID", []bool{false}),
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
machineDeployments: newMachineDeployments(1, 1, nil, nil, nil),
nodeGroups: []string{nodeGroup2},
mcmDeployment: newMCMDeployment(1),
},
expect{
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"3"}, []bool{false}),
Expand All @@ -360,6 +397,7 @@ func TestRefresh(t *testing.T) {
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
machineDeployments: newMachineDeployments(1, 1, nil, nil, nil),
nodeGroups: []string{nodeGroup2},
mcmDeployment: newMCMDeployment(1),
},
expect{
machines: newMachines(1, "fakeID", nil, "machinedeployment-1", "machineset-1", []string{"1"}, []bool{false}),
Expand All @@ -377,7 +415,8 @@ func TestRefresh(t *testing.T) {
Update: customfake.CreateFakeResponse(math.MaxInt32, mcUpdateErrorMsg, 0),
},
},
nodeGroups: []string{nodeGroup2},
nodeGroups: []string{nodeGroup2},
mcmDeployment: newMCMDeployment(1),
},
expect{
machines: []*v1alpha1.Machine{newMachine("machine-1", "fakeID-1", nil, "machinedeployment-1", "machineset-1", "1", false, true)},
Expand All @@ -392,8 +431,8 @@ func TestRefresh(t *testing.T) {
g := NewWithT(t)
stop := make(chan struct{})
defer close(stop)
controlMachineObjects, targetCoreObjects := setupEnv(&entry.setup)
m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, entry.setup.nodeGroups, controlMachineObjects, targetCoreObjects)
controlMachineObjects, targetCoreObjects, appsControlObjects := setupEnv(&entry.setup)
m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, entry.setup.nodeGroups, controlMachineObjects, targetCoreObjects, appsControlObjects)
defer trackers.Stop()
waitForCacheSync(t, stop, hasSyncedCacheFns)

Expand Down Expand Up @@ -493,8 +532,8 @@ func TestNodes(t *testing.T) {
g := NewWithT(t)
stop := make(chan struct{})
defer close(stop)
controlMachineObjects, targetCoreObjects := setupEnv(&entry.setup)
m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, nil, controlMachineObjects, targetCoreObjects)
controlMachineObjects, targetCoreObjects, _ := setupEnv(&entry.setup)
m, trackers, hasSyncedCacheFns := createMcmManager(t, stop, testNamespace, nil, controlMachineObjects, targetCoreObjects, nil)
defer trackers.Stop()
waitForCacheSync(t, stop, hasSyncedCacheFns)

Expand Down
Loading
Loading