diff --git a/cmd/machine-controller-manager-cli/main.go b/cmd/machine-controller-manager-cli/main.go index eee458ac9..ac1c32286 100644 --- a/cmd/machine-controller-manager-cli/main.go +++ b/cmd/machine-controller-manager-cli/main.go @@ -4,8 +4,8 @@ import ( "bytes" "flag" "fmt" - "io/ioutil" "log" + "os" "github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1" "github.com/gardener/machine-controller-manager/pkg/driver" @@ -119,7 +119,7 @@ func main() { // Read function decodes the yaml file passed to it func Read(fileName string, decodedObj interface{}) error { - m, err := ioutil.ReadFile(fileName) + m, err := os.ReadFile(fileName) if err != nil { log.Fatalf("Could not read %s: %s", fileName, err) } diff --git a/cmd/machine-controller-manager/app/controllermanager.go b/cmd/machine-controller-manager/app/controllermanager.go index bd9995638..c0e3db6c6 100644 --- a/cmd/machine-controller-manager/app/controllermanager.go +++ b/cmd/machine-controller-manager/app/controllermanager.go @@ -30,6 +30,7 @@ import ( "os" goruntime "runtime" "strconv" + "sync" "time" machinescheme "github.com/gardener/machine-controller-manager/pkg/client/clientset/versioned/scheme" @@ -73,7 +74,7 @@ var ( ) // Run runs the MCMServer. This should never exit. -func Run(s *options.MCMServer) error { +func Run(ctx context.Context, s *options.MCMServer) error { // To help debugging, immediately log version klog.V(4).Infof("Version: %+v", version.Get()) if err := s.Validate(); err != nil { @@ -126,8 +127,9 @@ func Run(s *options.MCMServer) error { recorder := createRecorder(kubeClientControl) - run := func(ctx context.Context) { - var stop <-chan struct{} + waitGroup := sync.WaitGroup{} + waitGroup.Add(1) + startControllers := func(ctx context.Context) { // Control plane client used to interact with machine APIs controlMachineClientBuilder := machinecontroller.SimpleClientBuilder{ ClientConfig: controlkubeconfig, @@ -141,7 +143,8 @@ func Run(s *options.MCMServer) error { ClientConfig: targetkubeconfig, } - err := StartControllers( + if err := StartControllers( + ctx, s, controlkubeconfig, targetkubeconfig, @@ -149,17 +152,14 @@ func Run(s *options.MCMServer) error { controlCoreClientBuilder, targetCoreClientBuilder, recorder, - stop, - ) - - klog.Fatalf("error running controllers: %v", err) - panic("unreachable") - + ); err != nil { + klog.Fatalf("failed to start controllers: %v", err) + } + waitGroup.Done() } if !s.LeaderElection.LeaderElect { - run(nil) - panic("unreachable") + startControllers(ctx) } id, err := os.Hostname() @@ -182,32 +182,34 @@ func Run(s *options.MCMServer) error { klog.Fatalf("error creating lock: %v", err) } - ctx := context.TODO() leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: s.LeaderElection.LeaseDuration.Duration, RenewDeadline: s.LeaderElection.RenewDeadline.Duration, RetryPeriod: s.LeaderElection.RetryPeriod.Duration, Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: run, + OnStartedLeading: startControllers, OnStoppedLeading: func() { - klog.Fatalf("leaderelection lost") + klog.Errorf("leaderelection lost") + waitGroup.Wait() }, }, }) - panic("unreachable") + + return nil } // StartControllers starts all the controllers which are a part of machine-controller-manager -func StartControllers(s *options.MCMServer, +func StartControllers( + ctx context.Context, + s *options.MCMServer, controlCoreKubeconfig *rest.Config, targetCoreKubeconfig *rest.Config, controlMachineClientBuilder machinecontroller.ClientBuilder, 
controlCoreClientBuilder corecontroller.ClientBuilder, targetCoreClientBuilder corecontroller.ClientBuilder, recorder record.EventRecorder, - stop <-chan struct{}) error { - +) error { klog.V(5).Info("Getting available resources") availableResources, err := getAvailableResources(controlCoreClientBuilder) if err != nil { @@ -231,18 +233,16 @@ func StartControllers(s *options.MCMServer, if availableResources[machineGVR] || availableResources[machineSetGVR] || availableResources[machineDeploymentGVR] { klog.V(5).Infof("Creating shared informers; resync interval: %v", s.MinResyncPeriod) - controlMachineInformerFactory := machineinformers.NewFilteredSharedInformerFactory( + controlMachineInformerFactory := machineinformers.NewSharedInformerFactoryWithOptions( controlMachineClientBuilder.ClientOrDie("control-machine-shared-informers"), s.MinResyncPeriod.Duration, - s.Namespace, - nil, + machineinformers.WithNamespace(s.Namespace), ) - controlCoreInformerFactory := coreinformers.NewFilteredSharedInformerFactory( + controlCoreInformerFactory := coreinformers.NewSharedInformerFactoryWithOptions( controlCoreClientBuilder.ClientOrDie("control-core-shared-informers"), s.MinResyncPeriod.Duration, - s.Namespace, - nil, + coreinformers.WithNamespace(s.Namespace), ) targetCoreInformerFactory := coreinformers.NewSharedInformerFactory( @@ -284,22 +284,28 @@ func StartControllers(s *options.MCMServer, } klog.V(1).Info("Starting shared informers") - controlMachineInformerFactory.Start(stop) - controlCoreInformerFactory.Start(stop) - targetCoreInformerFactory.Start(stop) + controlMachineInformerFactory.Start(ctx.Done()) + controlCoreInformerFactory.Start(ctx.Done()) + targetCoreInformerFactory.Start(ctx.Done()) klog.V(5).Info("Running controller") - go mcmcontroller.Run(int(s.ConcurrentNodeSyncs), stop) - + var waitGroup sync.WaitGroup + waitGroup.Add(1) + go func() { + mcmcontroller.Run(ctx, int(s.ConcurrentNodeSyncs)) + waitGroup.Done() + }() + waitGroup.Wait() } else { return fmt.Errorf("unable to start machine controller: API GroupVersion %q or %q or %q is not available; \nFound: %#v", machineGVR, machineSetGVR, machineDeploymentGVR, availableResources) } - select {} + return nil } -// TODO: In general, any controller checking this needs to be dynamic so -// users don't have to restart their controller manager if they change the apiserver. +// TODO: In general, any controller checking this needs to be dynamic so users don't have to +// restart their controller manager if they change the apiserver. +// // Until we get there, the structure here needs to be exposed for the construction of a proper ControllerContext. 
func getAvailableResources(clientBuilder corecontroller.ClientBuilder) (map[schema.GroupVersionResource]bool, error) { var discoveryClient discovery.DiscoveryInterface @@ -330,16 +336,16 @@ func getAvailableResources(clientBuilder corecontroller.ClientBuilder) (map[sche return nil, fmt.Errorf("failed to get api versions from server: %v: %v", healthzContent, err) } - resourceMap, err := discoveryClient.ServerResources() + _, resources, err := discoveryClient.ServerGroupsAndResources() if err != nil { utilruntime.HandleError(fmt.Errorf("unable to get all supported resources from server: %v", err)) } - if len(resourceMap) == 0 { + if len(resources) == 0 { return nil, fmt.Errorf("unable to get any supported resources from server") } allResources := map[schema.GroupVersionResource]bool{} - for _, apiResourceList := range resourceMap { + for _, apiResourceList := range resources { version, err := schema.ParseGroupVersion(apiResourceList.GroupVersion) if err != nil { return nil, err diff --git a/cmd/machine-controller-manager/controller_manager.go b/cmd/machine-controller-manager/controller_manager.go index 1d41f5a11..0b6524506 100644 --- a/cmd/machine-controller-manager/controller_manager.go +++ b/cmd/machine-controller-manager/controller_manager.go @@ -23,6 +23,7 @@ package main import ( "fmt" + "github.com/gardener/machine-controller-manager/pkg/util/signals" "os" "github.com/gardener/machine-controller-manager/cmd/machine-controller-manager/app" @@ -38,7 +39,6 @@ import ( ) func main() { - s := options.NewMCMServer() s.AddFlags(pflag.CommandLine) @@ -46,11 +46,8 @@ func main() { logs.InitLogs() defer logs.FlushLogs() - // verflag.PrintAndExitIfRequested() - - if err := app.Run(s); err != nil { + if err := app.Run(signals.SetupSignalHandler(), s); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } - } diff --git a/go.mod b/go.mod index aaa754744..e70f192e4 100644 --- a/go.mod +++ b/go.mod @@ -102,7 +102,7 @@ require ( golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect golang.org/x/tools v0.1.5 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect - google.golang.org/appengine v1.6.6 // indirect + google.golang.org/appengine v1.6.7 // indirect google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c // indirect google.golang.org/grpc v1.38.0 // indirect google.golang.org/protobuf v1.26.0 // indirect diff --git a/go.sum b/go.sum index 41398585a..6f0f42d2e 100644 --- a/go.sum +++ b/go.sum @@ -814,8 +814,9 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.1/go.mod h1:i06prIuMbXzDqacNJfV5OdTW448YApPu5ww/cMBSeb0= google.golang.org/appengine v1.6.5/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= -google.golang.org/appengine v1.6.6 h1:lMO5rYAqUxkmaj76jAkRUvt5JZgFymx/+Q5Mzfivuhc= google.golang.org/appengine v1.6.6/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= +google.golang.org/appengine v1.6.7 h1:FZR1q0exgwxzPzp/aF+VccGrSfxfPpkBqjIIEq3ru6c= +google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190307195333-5fe7a883aa19/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= 
diff --git a/pkg/apis/machine/register.go b/pkg/apis/machine/register.go index 136a0ec43..dfb329a48 100644 --- a/pkg/apis/machine/register.go +++ b/pkg/apis/machine/register.go @@ -43,8 +43,7 @@ var ( // the code-generation can find it. SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes) // AddToScheme is exposed for API installation - AddToScheme = SchemeBuilder.AddToScheme - localSchemeBuilder = &SchemeBuilder + AddToScheme = SchemeBuilder.AddToScheme ) func addKnownTypes(scheme *runtime.Scheme) error { diff --git a/pkg/apis/machine/v1alpha1/defaults.go b/pkg/apis/machine/v1alpha1/defaults.go deleted file mode 100644 index 7f5d7eca1..000000000 --- a/pkg/apis/machine/v1alpha1/defaults.go +++ /dev/null @@ -1,25 +0,0 @@ -/* -Copyright (c) 2017 SAP SE or an SAP affiliate company. All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package v1alpha1 - -import ( - "k8s.io/apimachinery/pkg/runtime" -) - -func addDefaultingFuncs(scheme *runtime.Scheme) error { - return RegisterDefaults(scheme) -} diff --git a/pkg/controller/alicloudmachineclass.go b/pkg/controller/alicloudmachineclass.go index c01d4b97d..2d0650f0e 100644 --- a/pkg/controller/alicloudmachineclass.go +++ b/pkg/controller/alicloudmachineclass.go @@ -130,8 +130,7 @@ func (c *controller) alicloudMachineClassDelete(obj interface{}) { // reconcileClusterAlicloudMachineClassKey reconciles an AlicloudMachineClass due to controller resync // or an event on the alicloudMachineClass. -func (c *controller) reconcileClusterAlicloudMachineClassKey(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterAlicloudMachineClassKey(ctx context.Context, key string) error { _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err diff --git a/pkg/controller/awsmachineclass.go b/pkg/controller/awsmachineclass.go index bc0b4f092..c39d842c9 100644 --- a/pkg/controller/awsmachineclass.go +++ b/pkg/controller/awsmachineclass.go @@ -131,8 +131,7 @@ func (c *controller) awsMachineClassDelete(obj interface{}) { // reconcileClusterAWSMachineClassKey reconciles an AWSMachineClass due to controller resync // or an event on the awsMachineClass. -func (c *controller) reconcileClusterAWSMachineClassKey(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterAWSMachineClassKey(ctx context.Context, key string) error { _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err diff --git a/pkg/controller/awsmachineclass_test.go b/pkg/controller/awsmachineclass_test.go index ec30fdd75..2ac997c4d 100644 --- a/pkg/controller/awsmachineclass_test.go +++ b/pkg/controller/awsmachineclass_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/controller/azuremachineclass.go b/pkg/controller/azuremachineclass.go index 7210e22cc..2c78505da 100644 --- a/pkg/controller/azuremachineclass.go +++ b/pkg/controller/azuremachineclass.go @@ -131,8 +131,7 @@ func (c *controller) azureMachineClassDelete(obj interface{}) { // reconcileClusterAzureMachineClassKey reconciles an AzureMachineClass due to controller resync // or an event on the azureMachineClass. -func (c *controller) reconcileClusterAzureMachineClassKey(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterAzureMachineClassKey(ctx context.Context, key string) error { _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 116a03a2a..e12848f67 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -18,6 +18,7 @@ limitations under the License. package controller import ( + "context" "fmt" "sync" "time" @@ -414,7 +415,7 @@ type Controller interface { // Run runs the controller until the given stop channel can be read from. // workers specifies the number of goroutines, per resource, processing work // from the resource workqueues - Run(workers int, stopCh <-chan struct{}) + Run(ctx context.Context, workers int) } // controller is a concrete Controller. @@ -479,29 +480,16 @@ type controller struct { machineDeploymentSynced cache.InformerSynced } -func (c *controller) Run(workers int, stopCh <-chan struct{}) { +func (c *controller) Run(ctx context.Context, workers int) { var ( waitGroup sync.WaitGroup ) defer runtimeutil.HandleCrash() - defer c.nodeQueue.ShutDown() - defer c.secretQueue.ShutDown() - defer c.openStackMachineClassQueue.ShutDown() - defer c.awsMachineClassQueue.ShutDown() - defer c.azureMachineClassQueue.ShutDown() - defer c.gcpMachineClassQueue.ShutDown() - defer c.alicloudMachineClassQueue.ShutDown() - defer c.packetMachineClassQueue.ShutDown() - defer c.machineQueue.ShutDown() - defer c.machineSetQueue.ShutDown() - defer c.machineDeploymentQueue.ShutDown() - defer c.machineSafetyOrphanVMsQueue.ShutDown() - defer c.machineSafetyOvershootingQueue.ShutDown() - defer c.machineSafetyAPIServerQueue.ShutDown() - - if !cache.WaitForCacheSync(stopCh, c.secretSynced, c.nodeSynced, c.openStackMachineClassSynced, c.awsMachineClassSynced, c.azureMachineClassSynced, c.gcpMachineClassSynced, c.alicloudMachineClassSynced, c.packetMachineClassSynced, c.machineSynced, c.machineSetSynced, c.machineDeploymentSynced) { + defer c.shutdownQueues() + + if !cache.WaitForCacheSync(ctx.Done(), c.secretSynced, c.nodeSynced, c.openStackMachineClassSynced, c.awsMachineClassSynced, c.azureMachineClassSynced, c.gcpMachineClassSynced, c.alicloudMachineClassSynced, c.packetMachineClassSynced, c.machineSynced, c.machineSetSynced, c.machineDeploymentSynced) { runtimeutil.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) return } @@ -515,36 +503,54 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) { prometheus.MustRegister(c) for i := 0; i < workers; i++ { - createWorker(c.openStackMachineClassQueue, "ClusterOpenStackMachineClass", maxRetries, true, c.reconcileClusterOpenStackMachineClassKey, stopCh, &waitGroup) - 
createWorker(c.awsMachineClassQueue, "ClusterAWSMachineClass", maxRetries, true, c.reconcileClusterAWSMachineClassKey, stopCh, &waitGroup) - createWorker(c.azureMachineClassQueue, "ClusterAzureMachineClass", maxRetries, true, c.reconcileClusterAzureMachineClassKey, stopCh, &waitGroup) - createWorker(c.gcpMachineClassQueue, "ClusterGCPMachineClass", maxRetries, true, c.reconcileClusterGCPMachineClassKey, stopCh, &waitGroup) - createWorker(c.alicloudMachineClassQueue, "ClusterAlicloudMachineClass", maxRetries, true, c.reconcileClusterAlicloudMachineClassKey, stopCh, &waitGroup) - createWorker(c.packetMachineClassQueue, "ClusterPacketMachineClass", maxRetries, true, c.reconcileClusterPacketMachineClassKey, stopCh, &waitGroup) - createWorker(c.secretQueue, "ClusterSecret", maxRetries, true, c.reconcileClusterSecretKey, stopCh, &waitGroup) - createWorker(c.nodeQueue, "ClusterNode", maxRetries, true, c.reconcileClusterNodeKey, stopCh, &waitGroup) - createWorker(c.machineQueue, "ClusterMachine", maxRetries, true, c.reconcileClusterMachineKey, stopCh, &waitGroup) - createWorker(c.machineSetQueue, "ClusterMachineSet", maxRetries, true, c.reconcileClusterMachineSet, stopCh, &waitGroup) - createWorker(c.machineDeploymentQueue, "ClusterMachineDeployment", maxRetries, true, c.reconcileClusterMachineDeployment, stopCh, &waitGroup) - createWorker(c.machineSafetyOrphanVMsQueue, "ClusterMachineSafetyOrphanVMs", maxRetries, true, c.reconcileClusterMachineSafetyOrphanVMs, stopCh, &waitGroup) - createWorker(c.machineSafetyOvershootingQueue, "ClusterMachineSafetyOvershooting", maxRetries, true, c.reconcileClusterMachineSafetyOvershooting, stopCh, &waitGroup) - createWorker(c.machineSafetyAPIServerQueue, "ClusterMachineAPIServer", maxRetries, true, c.reconcileClusterMachineSafetyAPIServer, stopCh, &waitGroup) + createWorker(ctx, c.openStackMachineClassQueue, "ClusterOpenStackMachineClass", maxRetries, true, c.reconcileClusterOpenStackMachineClassKey, &waitGroup) + createWorker(ctx, c.awsMachineClassQueue, "ClusterAWSMachineClass", maxRetries, true, c.reconcileClusterAWSMachineClassKey, &waitGroup) + createWorker(ctx, c.azureMachineClassQueue, "ClusterAzureMachineClass", maxRetries, true, c.reconcileClusterAzureMachineClassKey, &waitGroup) + createWorker(ctx, c.gcpMachineClassQueue, "ClusterGCPMachineClass", maxRetries, true, c.reconcileClusterGCPMachineClassKey, &waitGroup) + createWorker(ctx, c.alicloudMachineClassQueue, "ClusterAlicloudMachineClass", maxRetries, true, c.reconcileClusterAlicloudMachineClassKey, &waitGroup) + createWorker(ctx, c.packetMachineClassQueue, "ClusterPacketMachineClass", maxRetries, true, c.reconcileClusterPacketMachineClassKey, &waitGroup) + createWorker(ctx, c.secretQueue, "ClusterSecret", maxRetries, true, c.reconcileClusterSecretKey, &waitGroup) + createWorker(ctx, c.nodeQueue, "ClusterNode", maxRetries, true, c.reconcileClusterNodeKey, &waitGroup) + createWorker(ctx, c.machineQueue, "ClusterMachine", maxRetries, true, c.reconcileClusterMachineKey, &waitGroup) + createWorker(ctx, c.machineSetQueue, "ClusterMachineSet", maxRetries, true, c.reconcileClusterMachineSet, &waitGroup) + createWorker(ctx, c.machineDeploymentQueue, "ClusterMachineDeployment", maxRetries, true, c.reconcileClusterMachineDeployment, &waitGroup) + createWorker(ctx, c.machineSafetyOrphanVMsQueue, "ClusterMachineSafetyOrphanVMs", maxRetries, true, c.reconcileClusterMachineSafetyOrphanVMs, &waitGroup) + createWorker(ctx, c.machineSafetyOvershootingQueue, "ClusterMachineSafetyOvershooting", maxRetries, true, 
c.reconcileClusterMachineSafetyOvershooting, &waitGroup) + createWorker(ctx, c.machineSafetyAPIServerQueue, "ClusterMachineAPIServer", maxRetries, true, c.reconcileClusterMachineSafetyAPIServer, &waitGroup) } - <-stopCh + c.shutdownQueues() + waitGroup.Wait() + klog.V(1).Info("Shutting down Machine Controller Manager ") handlers.UpdateHealth(false) +} - waitGroup.Wait() +func (c *controller) shutdownQueues() { + c.nodeQueue.ShutDown() + c.secretQueue.ShutDown() + c.openStackMachineClassQueue.ShutDown() + c.awsMachineClassQueue.ShutDown() + c.azureMachineClassQueue.ShutDown() + c.gcpMachineClassQueue.ShutDown() + c.alicloudMachineClassQueue.ShutDown() + c.packetMachineClassQueue.ShutDown() + c.machineQueue.ShutDown() + c.machineSetQueue.ShutDown() + c.machineDeploymentQueue.ShutDown() + c.machineSafetyOrphanVMsQueue.ShutDown() + c.machineSafetyOvershootingQueue.ShutDown() + c.machineSafetyAPIServerQueue.ShutDown() } // createWorker creates and runs a worker thread that just processes items in the // specified queue. The worker will run until stopCh is closed. The worker will be // added to the wait group when started and marked done when finished. -func createWorker(queue workqueue.RateLimitingInterface, resourceType string, maxRetries int, forgetAfterSuccess bool, reconciler func(key string) error, stopCh <-chan struct{}, waitGroup *sync.WaitGroup) { +func createWorker(ctx context.Context, queue workqueue.RateLimitingInterface, resourceType string, maxRetries int, forgetAfterSuccess bool, reconciler func(ctx context.Context, key string) error, waitGroup *sync.WaitGroup) { waitGroup.Add(1) go func() { - wait.Until(worker(queue, resourceType, maxRetries, forgetAfterSuccess, reconciler), time.Second, stopCh) + wait.UntilWithContext(ctx, worker(queue, resourceType, maxRetries, forgetAfterSuccess, reconciler), time.Second) + klog.V(5).Infof("Stopped worker for type: %s", resourceType) waitGroup.Done() }() } @@ -554,8 +560,8 @@ func createWorker(queue workqueue.RateLimitingInterface, resourceType string, ma // It enforces that the reconciler is never invoked concurrently with the same key. // If forgetAfterSuccess is true, it will cause the queue to forget the item should reconciliation // have no error. -func worker(queue workqueue.RateLimitingInterface, resourceType string, maxRetries int, forgetAfterSuccess bool, reconciler func(key string) error) func() { - return func() { +func worker(queue workqueue.RateLimitingInterface, resourceType string, maxRetries int, forgetAfterSuccess bool, reconciler func(ctx context.Context, key string) error) func(context.Context) { + return func(ctx context.Context) { exit := false for !exit { exit = func() bool { @@ -565,7 +571,7 @@ func worker(queue workqueue.RateLimitingInterface, resourceType string, maxRetri } defer queue.Done(key) - err := reconciler(key.(string)) + err := reconciler(ctx, key.(string)) if err == nil { if forgetAfterSuccess { queue.Forget(key) diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go index 1de6a11c8..950816a02 100644 --- a/pkg/controller/controller_ref_manager.go +++ b/pkg/controller/controller_ref_manager.go @@ -60,8 +60,8 @@ func (m *BaseControllerRefManager) CanAdopt() error { // ClaimObject tries to take ownership of an object for this controller. // // It will reconcile the following: -// * Adopt orphans if the match function returns true. -// * Release owned objects if the match function returns false. +// - Adopt orphans if the match function returns true. 
+// - Release owned objects if the match function returns false. // // A non-nil error is returned if some form of reconciliation was attempted and // failed. Usually, controllers should try again later in case reconciliation @@ -146,8 +146,9 @@ type MachineControllerRefManager struct { // If CanAdopt() returns a non-nil error, all adoptions will fail. // // NOTE: Once CanAdopt() is called, it will not be called again by the same -// MachineControllerRefManager machine. Create a new machine if it makes -// sense to check CanAdopt() again (e.g. in a different sync pass). +// +// MachineControllerRefManager machine. Create a new machine if it makes +// sense to check CanAdopt() again (e.g. in a different sync pass). func NewMachineControllerRefManager( machineControl MachineControlInterface, controller metav1.Object, @@ -169,8 +170,8 @@ func NewMachineControllerRefManager( // ClaimMachines tries to take ownership of a list of Machines. // // It will reconcile the following: -// * Adopt orphans if the selector matches. -// * Release owned objects if the selector no longer matches. +// - Adopt orphans if the selector matches. +// - Release owned objects if the selector no longer matches. // // Optional: If one or more filters are specified, a Machine will only be claimed if // all filters return true. @@ -289,8 +290,9 @@ type MachineSetControllerRefManager struct { // If CanAdopt() returns a non-nil error, all adoptions will fail. // // NOTE: Once CanAdopt() is called, it will not be called again by the same -// MachineSetControllerRefManager machine. Create a new machine if it -// makes sense to check CanAdopt() again (e.g. in a different sync pass). +// +// MachineSetControllerRefManager machine. Create a new machine if it +// makes sense to check CanAdopt() again (e.g. in a different sync pass). func NewMachineSetControllerRefManager( machineSetControl MachineSetControlInterface, controller metav1.Object, @@ -312,8 +314,8 @@ func NewMachineSetControllerRefManager( // ClaimMachineSets tries to take ownership of a list of MachineSets. // // It will reconcile the following: -// * Adopt orphans if the selector matches. -// * Release owned objects if the selector no longer matches. +// - Adopt orphans if the selector matches. +// - Release owned objects if the selector no longer matches. // // A non-nil error is returned if some form of reconciliation was attempted and // failed. 
Usually, controllers should try again later in case reconciliation diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index c9de9f789..fe58158a3 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -466,10 +466,10 @@ func validateControllerRef(controllerRef *metav1.OwnerReference) error { if len(controllerRef.Kind) == 0 { return fmt.Errorf("controllerRef has empty Kind") } - if controllerRef.Controller == nil || *controllerRef.Controller != true { + if controllerRef.Controller == nil || !*controllerRef.Controller { return fmt.Errorf("controllerRef.Controller is not set to true") } - if controllerRef.BlockOwnerDeletion == nil || *controllerRef.BlockOwnerDeletion != true { + if controllerRef.BlockOwnerDeletion == nil || !*controllerRef.BlockOwnerDeletion { return fmt.Errorf("controllerRef.BlockOwnerDeletion is not set") } return nil @@ -814,15 +814,11 @@ func IsMachineActive(p *v1alpha1.Machine) bool { // IsMachineFailed checks if machine has failed func IsMachineFailed(p *v1alpha1.Machine) bool { - if p.Status.CurrentStatus.Phase == v1alpha1.MachineFailed { - return true - } - - return false + return p.Status.CurrentStatus.Phase == v1alpha1.MachineFailed } // MachineKey is the function used to get the machine name from machine object -//ToCheck : as machine-namespace does not matter +// ToCheck : as machine-namespace does not matter func MachineKey(machine *v1alpha1.Machine) string { return fmt.Sprintf("%v", machine.Name) } @@ -1019,6 +1015,10 @@ func RemoveAnnotationsOffNode(ctx context.Context, c clientset.Interface, nodeNa // Remove the annotations from the node. newNode, updated, err = annotationsutils.RemoveAnnotation(oldNodeCopy, annotations) + if err != nil { + klog.Errorf("Failed to remove annotations from node Node %s. Err: %v", nodeName, err) + return err + } if !updated { return nil @@ -1030,7 +1030,7 @@ func RemoveAnnotationsOffNode(ctx context.Context, c clientset.Interface, nodeNa // GetAnnotationsFromNode returns all the annotations of the provided node. func GetAnnotationsFromNode(ctx context.Context, c clientset.Interface, nodeName string) (map[string]string, error) { - // Short circuit if annotation doesnt exist for limiting API calls. + // Short circuit if annotation doesn't exist for limiting API calls. if nodeName == "" { return nil, nil } diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index b74ab8a2a..5ffeec18c 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/controller/deployment.go b/pkg/controller/deployment.go index fba7718dc..12cb89299 100644 --- a/pkg/controller/deployment.go +++ b/pkg/controller/deployment.go @@ -282,7 +282,7 @@ func (dc *controller) enqueueRateLimited(deployment *v1alpha1.MachineDeployment) dc.machineDeploymentQueue.AddRateLimited(key) } -// enqueueMachineDeploymentAfter will enqueue a deployment after the provided amount of time. +// enqueueMachineDeploymentAfter will enqueue a deployment after the provided amount of time. 
func (dc *controller) enqueueMachineDeploymentAfter(deployment *v1alpha1.MachineDeployment, after time.Duration) { key, err := KeyFunc(deployment) if err != nil { @@ -427,8 +427,7 @@ func (dc *controller) getMachineMapForMachineDeployment(d *v1alpha1.MachineDeplo // reconcileClusterMachineDeployment will sync the deployment with the given key. // This function is not meant to be invoked concurrently with the same key. -func (dc *controller) reconcileClusterMachineDeployment(key string) error { - ctx := context.Background() +func (dc *controller) reconcileClusterMachineDeployment(ctx context.Context, key string) error { startTime := time.Now() klog.V(4).Infof("Started syncing machine deployment %q (%v)", key, startTime) defer func() { diff --git a/pkg/controller/deployment_machineset_util.go b/pkg/controller/deployment_machineset_util.go index ef09643c3..d75daaedd 100644 --- a/pkg/controller/deployment_machineset_util.go +++ b/pkg/controller/deployment_machineset_util.go @@ -217,11 +217,6 @@ func isMachineAvailable(machine *v1alpha1.Machine) bool { } func isMachineReady(machine *v1alpha1.Machine) bool { - // TODO add more conditions - if machine.Status.CurrentStatus.Phase == v1alpha1.MachineRunning { - return true - } - - return false + return machine.Status.CurrentStatus.Phase == v1alpha1.MachineRunning } diff --git a/pkg/controller/deployment_rollback_test.go b/pkg/controller/deployment_rollback_test.go index 90a7a3c6c..3dbb02ee9 100644 --- a/pkg/controller/deployment_rollback_test.go +++ b/pkg/controller/deployment_rollback_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/controller/deployment_rolling_test.go b/pkg/controller/deployment_rolling_test.go index a3b87244c..28ef9a11b 100644 --- a/pkg/controller/deployment_rolling_test.go +++ b/pkg/controller/deployment_rolling_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/controller/deployment_sync.go b/pkg/controller/deployment_sync.go index 0a11ba9e2..ab8fda2e6 100644 --- a/pkg/controller/deployment_sync.go +++ b/pkg/controller/deployment_sync.go @@ -105,7 +105,7 @@ func (dc *controller) checkPausedConditions(ctx context.Context, d *v1alpha1.Mac } var err error - d, err = dc.controlMachineClient.MachineDeployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) + _, err = dc.controlMachineClient.MachineDeployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) return err } @@ -114,10 +114,10 @@ func (dc *controller) checkPausedConditions(ctx context.Context, d *v1alpha1.Mac // rsList should come from getReplicaSetsForDeployment(d). // machineMap should come from getmachineMapForDeployment(d, rsList). // -// 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV). -// 2. 
Get new RS this deployment targets (whose machine template matches deployment's), and update new RS's revision number to (maxOldV + 1), -// only if its revision number is smaller than (maxOldV + 1). If this step failed, we'll update it in the next deployment sync loop. -// 3. Copy new RS's revision number to deployment (update deployment's revision). If this step failed, we'll update it in the next deployment sync loop. +// 1. Get all old RSes this deployment targets, and calculate the max revision number among them (maxOldV). +// 2. Get new RS this deployment targets (whose machine template matches deployment's), and update new RS's revision number to (maxOldV + 1), +// only if its revision number is smaller than (maxOldV + 1). If this step failed, we'll update it in the next deployment sync loop. +// 3. Copy new RS's revision number to deployment (update deployment's revision). If this step failed, we'll update it in the next deployment sync loop. // // Note that currently the deployment controller is using caches to avoid querying the server for reads. // This may lead to stale reads of machine sets, thus incorrect deployment status. @@ -284,7 +284,7 @@ func (dc *controller) getNewMachineSet(ctx context.Context, d *v1alpha1.MachineD } dCopy := d.DeepCopy() dCopy.Status = newStatus - if d, err = dc.controlMachineClient.MachineDeployments(dCopy.Namespace).UpdateStatus(ctx, dCopy, metav1.UpdateOptions{}); err != nil { + if _, err = dc.controlMachineClient.MachineDeployments(dCopy.Namespace).UpdateStatus(ctx, dCopy, metav1.UpdateOptions{}); err != nil { return nil, err } } diff --git a/pkg/controller/deployment_sync_test.go b/pkg/controller/deployment_sync_test.go index 49f3a598c..eb9402d09 100644 --- a/pkg/controller/deployment_sync_test.go +++ b/pkg/controller/deployment_sync_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/controller/deployment_test.go b/pkg/controller/deployment_test.go index 5a21de284..e897dce05 100644 --- a/pkg/controller/deployment_test.go +++ b/pkg/controller/deployment_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -1582,7 +1582,7 @@ var _ = Describe("machineDeployment", func() { defer trackers.Stop() waitForCacheSync(stop, c) Key := testNamespace + "/" + testMachineDeployment.Name - c.reconcileClusterMachineDeployment(Key) + c.reconcileClusterMachineDeployment(context.TODO(), Key) waitForCacheSync(stop, c) actualMachineDeployment, _ := c.controlMachineClient.MachineDeployments(testNamespace).Get(context.Background(), testMachineDeployment.Name, metav1.GetOptions{}) diff --git a/pkg/controller/deployment_util.go b/pkg/controller/deployment_util.go index 53a0bb56e..f9f7bdfda 100644 --- a/pkg/controller/deployment_util.go +++ b/pkg/controller/deployment_util.go @@ -460,7 +460,8 @@ var annotationsToSkip = map[string]bool{ // skipCopyAnnotation returns true if we should skip copying the annotation with the given annotation key // TODO: How to decide which annotations should / should not be copied? -// See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615 +// +// See https://github.com/kubernetes/kubernetes/pull/20035#issuecomment-179558615 func skipCopyAnnotation(key string) bool { return annotationsToSkip[key] } diff --git a/pkg/controller/deployment_util_test.go b/pkg/controller/deployment_util_test.go index 9d5c5637b..f95c582a8 100644 --- a/pkg/controller/deployment_util_test.go +++ b/pkg/controller/deployment_util_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/controller/drain.go b/pkg/controller/drain.go index f09b54b46..983a543aa 100644 --- a/pkg/controller/drain.go +++ b/pkg/controller/drain.go @@ -486,7 +486,6 @@ func (o *DrainOptions) evictPodsWithoutPv(ctx context.Context, attemptEvict bool for _, pod := range pods { go o.evictPodWithoutPVInternal(ctx, attemptEvict, pod, policyGroupVersion, getPodFn, returnCh) } - return } func sortPodsByPriority(pods []*corev1.Pod) { @@ -550,7 +549,7 @@ func filterSharedPVs(pvMap map[string][]string) { for pod, vols := range pvMap { volList := []string{} for _, vol := range vols { - if sharedVol[vol] == false { + if !sharedVol[vol] { volList = append(volList, vol) } } @@ -613,8 +612,6 @@ func (o *DrainOptions) evictPodsWithPv(ctx context.Context, attemptEvict bool, p returnCh <- fmt.Errorf("Error deleting pod %s/%s from node %q", pod.Namespace, pod.Name, pod.Spec.NodeName) } } - - return } func (o *DrainOptions) evictPodsWithPVInternal(ctx context.Context, attemptEvict bool, pods []*corev1.Pod, volMap map[string][]string, diff --git a/pkg/controller/drain_test.go b/pkg/controller/drain_test.go index 9693008b7..3842dfd62 100644 --- a/pkg/controller/drain_test.go +++ b/pkg/controller/drain_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -257,7 +257,7 @@ var _ = Describe("drain", func() { go func() { defer wg.Done() runPodDrainHandlers(pod) - fmt.Fprintf(GinkgoWriter, "Drained pod %s/%s in %s\n", pod.Namespace, pod.Name, time.Now().Sub(start).String()) + fmt.Fprintf(GinkgoWriter, "Drained pod %s/%s in %s\n", pod.Namespace, pod.Name, time.Since(start).String()) }() nEvictions++ @@ -289,7 +289,7 @@ var _ = Describe("drain", func() { go func() { defer wg.Done() runPodDrainHandlers(pod) - fmt.Fprintf(GinkgoWriter, "Drained pod %s/%s in %s\n", pod.Namespace, pod.Name, time.Now().Sub(start).String()) + fmt.Fprintf(GinkgoWriter, "Drained pod %s/%s in %s\n", pod.Namespace, pod.Name, time.Since(start).String()) }() default: err = fmt.Errorf("Expected type k8stesting.GetAction but got %T", action) diff --git a/pkg/controller/gcpmachineclass.go b/pkg/controller/gcpmachineclass.go index d9336afce..303995b5a 100644 --- a/pkg/controller/gcpmachineclass.go +++ b/pkg/controller/gcpmachineclass.go @@ -131,8 +131,7 @@ func (c *controller) gcpMachineClassDelete(obj interface{}) { // reconcileClusterGCPMachineClassKey reconciles an GCPMachineClass due to controller resync // or an event on the gcpMachineClass. -func (c *controller) reconcileClusterGCPMachineClassKey(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterGCPMachineClassKey(ctx context.Context, key string) error { _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err diff --git a/pkg/controller/gcpmachineclass_test.go b/pkg/controller/gcpmachineclass_test.go index 4e2d697d8..a30634d76 100644 --- a/pkg/controller/gcpmachineclass_test.go +++ b/pkg/controller/gcpmachineclass_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/controller/machine.go b/pkg/controller/machine.go index 9b1977dbb..7668325c8 100644 --- a/pkg/controller/machine.go +++ b/pkg/controller/machine.go @@ -58,8 +58,8 @@ const ( ) /* - SECTION - Machine controller - Machine add, update, delete watches +SECTION +Machine controller - Machine add, update, delete watches */ func (c *controller) addMachine(obj interface{}) { klog.V(4).Infof("Adding machine object") @@ -113,8 +113,7 @@ func (c *controller) enqueueMachineAfter(obj interface{}, after time.Duration) { } } -func (c *controller) reconcileClusterMachineKey(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterMachineKey(ctx context.Context, key string) error { _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err @@ -142,7 +141,7 @@ func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alp }() if c.safetyOptions.MachineControllerFrozen && machine.DeletionTimestamp == nil { - message := "Machine controller has frozen. Retrying reconcile after 10 minutes" + message := "machine controller has frozen. 
Retrying reconcile after 10 minutes" klog.V(3).Info(message) return errors.New(message) } @@ -157,9 +156,9 @@ func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alp if err != nil { return err } - validationerr := validation.ValidateMachine(internalMachine) - if validationerr.ToAggregate() != nil && len(validationerr.ToAggregate().Errors()) > 0 { - klog.Errorf("Validation of Machine failed %s", validationerr.ToAggregate().Error()) + validationErr := validation.ValidateMachine(internalMachine) + if validationErr.ToAggregate() != nil && len(validationErr.ToAggregate().Errors()) > 0 { + klog.Errorf("Validation of Machine failed %s", validationErr.ToAggregate().Error()) return nil } @@ -170,8 +169,8 @@ func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alp return nil } - driver := driver.NewDriver(machine.Spec.ProviderID, secretData, machine.Spec.Class.Kind, MachineClass, machine.Name) - actualProviderID, err := driver.GetExisting() + d := driver.NewDriver(machine.Spec.ProviderID, secretData, machine.Spec.Class.Kind, MachineClass, machine.Name) + actualProviderID, err := d.GetExisting() if err != nil { return err } else if actualProviderID == "fake" { @@ -208,7 +207,7 @@ func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alp if machine.DeletionTimestamp != nil { // Processing of delete event - if err := c.machineDelete(ctx, machine, driver); err != nil { + if err := c.machineDelete(ctx, machine, d); err != nil { c.enqueueMachineAfter(machine, MachineEnqueueRetryPeriod) return nil } @@ -222,7 +221,7 @@ func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alp if machine.Status.CurrentStatus.Phase == v1alpha1.MachineFailed { return nil } else if actualProviderID == "" { - if err := c.machineCreate(ctx, machine, driver); err != nil { + if err := c.machineCreate(ctx, machine, d); err != nil { c.enqueueMachineAfter(machine, MachineEnqueueRetryPeriod) return nil } @@ -237,8 +236,8 @@ func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alp } /* - SECTION - Machine controller - nodeToMachine +SECTION +Machine controller - nodeToMachine */ func (c *controller) addNodeToMachine(obj interface{}) { node := obj.(*corev1.Node) @@ -305,7 +304,7 @@ func (c *controller) getMachineFromNode(nodeName string) (*v1alpha1.Machine, err machines, _ := c.machineLister.List(selector) if len(machines) > 1 { - return nil, errors.New("Multiple machines matching node") + return nil, errors.New("multiple machines matching node") } else if len(machines) < 1 { return nil, nil } @@ -334,7 +333,7 @@ func (c *controller) updateMachineState(ctx context.Context, machine *v1alpha1.M nodeName = node.Name clone := machine.DeepCopy() clone.Status.Node = nodeName - clone, err = c.controlMachineClient.Machines(clone.Namespace).UpdateStatus(ctx, clone, metav1.UpdateOptions{}) + _, err = c.controlMachineClient.Machines(clone.Namespace).UpdateStatus(ctx, clone, metav1.UpdateOptions{}) if err != nil { klog.Errorf("Could not update status of the machine-object %s due to error %v", machine.Name, err) return machine, err @@ -342,10 +341,10 @@ func (c *controller) updateMachineState(ctx context.Context, machine *v1alpha1.M break } } - // Couldnt adopt any node-object. + // Couldn't adopt any node-object. 
if nodeName == "" { // There are no objects mapped to this machine object - // Hence node status need not be propogated to machine object + // Hence node status need not be propagated to machine object return machine, nil } } @@ -519,7 +518,7 @@ func (c *controller) machineCreate(ctx context.Context, machine *v1alpha1.Machin } // Return with error - return fmt.Errorf("Couldn't find machine object, hence deleted orphan VM") + return fmt.Errorf("couldn't find machine object, hence deleted orphan VM") } klog.Warningf("Machine GET failed for %q. Retrying, error: %s", machineName, err) @@ -660,7 +659,7 @@ func (c *controller) machineDelete(ctx context.Context, machine *v1alpha1.Machin timeOutOccurred = utiltime.HasTimeOutOccurred(*machine.DeletionTimestamp, timeOutDuration) if forceDeleteLabelPresent || timeOutOccurred { - // To perform forceful machine drain/delete either one of the below conditions must be satified + // To perform forceful machine drain/delete either one of the below conditions must be satisfied // 1. force-deletion: "True" label must be present // 2. Deletion operation is more than drain-timeout minutes old forceDeleteMachine = true @@ -699,7 +698,7 @@ func (c *controller) machineDelete(ctx context.Context, machine *v1alpha1.Machin } if machineID != "" && nodeName != "" { - // Begin drain logic only when the nodeName & providerID exist's for the machine + // Begin drain logic only when the nodeName & providerID exists for the machine buf := bytes.NewBuffer([]byte{}) errBuf := bytes.NewBuffer([]byte{}) @@ -792,7 +791,7 @@ func (c *controller) machineDelete(ctx context.Context, machine *v1alpha1.Machin // Delete node object err = c.targetCoreClient.CoreV1().Nodes().Delete(ctx, nodeName, metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { - // If its an error, and anyother error than object not found + // If it's an error, and another error than object not found message := fmt.Sprintf("Deletion of Node Object %q failed due to error: %s", nodeName, err) lastOperation := v1alpha1.LastOperation{ Description: message, @@ -812,7 +811,7 @@ func (c *controller) machineDelete(ctx context.Context, machine *v1alpha1.Machin // Delete machine object err = c.controlMachineClient.Machines(machine.Namespace).Delete(ctx, machine.Name, metav1.DeleteOptions{}) if err != nil && !apierrors.IsNotFound(err) { - // If its an error, and anyother error than object not found + // If it's an error, and another error than object not found klog.Errorf("Deletion of Machine Object %q failed due to error: %s", machine.Name, err) return err } @@ -919,7 +918,7 @@ func (c *controller) updateMachineConditions(ctx context.Context, machine *v1alp objectRequiresUpdate = true } else if c.isHealthy(clone) && clone.Status.CurrentStatus.Phase != v1alpha1.MachineRunning { - // If machine is healhy and current machinePhase is not running. + // If machine is healthy and current machinePhase is not running. // indicates that the machine is not healthy and status needs to be updated. if clone.Status.LastOperation.Type == v1alpha1.MachineOperationCreate && @@ -1011,8 +1010,8 @@ func (c *controller) deleteMachineFinalizers(ctx context.Context, machine *v1alp } /* - SECTION - Helper Functions +SECTION +Helper Functions */ func (c *controller) isHealthy(machine *v1alpha1.Machine) bool { numOfConditions := len(machine.Status.Conditions) @@ -1084,7 +1083,7 @@ func (c *controller) checkMachineTimeout(ctx context.Context, machine *v1alpha1. 
timeOutDuration, ) } else { - // Timeour occurred due to machine being unhealthy for too long + // Timeout occurred due to machine being unhealthy for too long description = fmt.Sprintf( "Machine %s is not healthy since %s minutes. Changing status to failed. Node Conditions: %+v", machine.Name, diff --git a/pkg/controller/machine_safety.go b/pkg/controller/machine_safety.go index d1852d4cb..7a9e923ae 100644 --- a/pkg/controller/machine_safety.go +++ b/pkg/controller/machine_safety.go @@ -47,8 +47,7 @@ const ( ) // reconcileClusterMachineSafetyOrphanVMs checks for any orphan VMs and deletes them -func (c *controller) reconcileClusterMachineSafetyOrphanVMs(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterMachineSafetyOrphanVMs(ctx context.Context, key string) error { reSyncAfter := c.safetyOptions.MachineSafetyOrphanVMsPeriod.Duration defer c.machineSafetyOrphanVMsQueue.AddAfter("", reSyncAfter) @@ -62,48 +61,39 @@ func (c *controller) reconcileClusterMachineSafetyOrphanVMs(key string) error { // reconcileClusterMachineSafetyOvershooting checks all machineSet/machineDeployment // if the number of machine objects backing them is way beyond its desired replicas -func (c *controller) reconcileClusterMachineSafetyOvershooting(key string) error { - ctx := context.Background() - stopCh := make(chan struct{}) - defer close(stopCh) - +func (c *controller) reconcileClusterMachineSafetyOvershooting(ctx context.Context, key string) error { reSyncAfter := c.safetyOptions.MachineSafetyOvershootingPeriod.Duration defer c.machineSafetyOvershootingQueue.AddAfter("", reSyncAfter) klog.V(4).Infof("reconcileClusterMachineSafetyOvershooting: Start") defer klog.V(4).Infof("reconcileClusterMachineSafetyOvershooting: End, reSync-Period: %v", reSyncAfter) - err := c.checkAndFreezeORUnfreezeMachineSets(ctx) - if err != nil { + if err := c.checkAndFreezeORUnfreezeMachineSets(ctx); err != nil { klog.Errorf("SafetyController: %v", err) } - cache.WaitForCacheSync(stopCh, c.machineSetSynced, c.machineDeploymentSynced) + cache.WaitForCacheSync(ctx.Done(), c.machineSetSynced, c.machineDeploymentSynced) - err = c.syncMachineDeploymentFreezeState(ctx) - if err != nil { + if err := c.syncMachineDeploymentFreezeState(ctx); err != nil { klog.Errorf("SafetyController: %v", err) } - cache.WaitForCacheSync(stopCh, c.machineDeploymentSynced) + cache.WaitForCacheSync(ctx.Done(), c.machineDeploymentSynced) - err = c.unfreezeMachineDeploymentsWithUnfreezeAnnotation(ctx) - if err != nil { + if err := c.unfreezeMachineDeploymentsWithUnfreezeAnnotation(ctx); err != nil { klog.Errorf("SafetyController: %v", err) } - cache.WaitForCacheSync(stopCh, c.machineSetSynced) + cache.WaitForCacheSync(ctx.Done(), c.machineSetSynced) - err = c.unfreezeMachineSetsWithUnfreezeAnnotation(ctx) - if err != nil { + if err := c.unfreezeMachineSetsWithUnfreezeAnnotation(ctx); err != nil { klog.Errorf("SafetyController: %v", err) } - return err + return nil } // reconcileClusterMachineSafetyAPIServer checks control and target clusters // and checks if their APIServer's are reachable // If they are not reachable, they set a machineControllerFreeze flag -func (c *controller) reconcileClusterMachineSafetyAPIServer(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterMachineSafetyAPIServer(ctx context.Context, key string) error { statusCheckTimeout := c.safetyOptions.MachineSafetyAPIServerStatusCheckTimeout.Duration statusCheckPeriod := 
c.safetyOptions.MachineSafetyAPIServerStatusCheckPeriod.Duration @@ -163,7 +153,7 @@ func (c *controller) reconcileClusterMachineSafetyAPIServer(key string) error { // If timeout has not started c.safetyOptions.APIserverInactiveStartTime = time.Now() } - if time.Now().Sub(c.safetyOptions.APIserverInactiveStartTime) > statusCheckTimeout { + if time.Since(c.safetyOptions.APIserverInactiveStartTime) > statusCheckTimeout { // If APIServer has been down for more than statusCheckTimeout c.safetyOptions.MachineControllerFrozen = true klog.V(2).Infof("SafetyController: Freezing Machine Controller") @@ -333,7 +323,6 @@ func (c *controller) checkAndFreezeORUnfreezeMachineSets(ctx context.Context) er } for _, machineSet := range machineSets { - filteredMachines, err := c.machineLister.List(labels.Everything()) if err != nil { klog.Error("SafetyController: Error while trying to LIST machines - ", err) @@ -567,10 +556,7 @@ func (c *controller) checkMachineClass( // Making sure that its not a VM just being created, machine object not yet updated at API server if len(listOfVMs) > 1 { - stopCh := make(chan struct{}) - defer close(stopCh) - - if !cache.WaitForCacheSync(stopCh, c.machineSynced) { + if !cache.WaitForCacheSync(ctx.Done(), c.machineSynced) { klog.Errorf("SafetyController: Timed out waiting for caches to sync. Error: %s", err) return } @@ -660,7 +646,6 @@ func (c *controller) deleteOrphanVM(vm driver.VMs, secretData map[string][]byte, // freezeMachineSetAndDeployment freezes machineSet and machineDeployment (who is the owner of the machineSet) func (c *controller) freezeMachineSetAndDeployment(ctx context.Context, machineSet *v1alpha1.MachineSet, reason string, message string) error { - klog.V(2).Infof("SafetyController: Freezing MachineSet %q due to %q", machineSet.Name, reason) // Get the latest version of the machineSet so that we can avoid conflicts diff --git a/pkg/controller/machine_safety_test.go b/pkg/controller/machine_safety_test.go index b4437eae6..abf1060bd 100644 --- a/pkg/controller/machine_safety_test.go +++ b/pkg/controller/machine_safety_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -221,8 +221,6 @@ var _ = Describe("#machine_safety", func() { machineDeploymentLabels map[string]string machineDeploymentConditions []v1alpha1.MachineDeploymentCondition } - type action struct { - } type expect struct { machineSetAnnotations map[string]string machineSetLabels map[string]string @@ -233,7 +231,6 @@ var _ = Describe("#machine_safety", func() { } type data struct { setup setup - action action expect expect } @@ -321,7 +318,7 @@ var _ = Describe("#machine_safety", func() { Expect(err).To(BeNil()) Expect(len(machines)).To(Equal(data.setup.machineReplicas)) - c.reconcileClusterMachineSafetyOvershooting("") + c.reconcileClusterMachineSafetyOvershooting(context.TODO(), "") ms, err := c.controlMachineClient.MachineSets(testNamespace).List(context.TODO(), metav1.ListOptions{}) Expect(err).To(BeNil()) @@ -807,14 +804,14 @@ var _ = Describe("#machine_safety", func() { c.safetyOptions.APIserverInactiveStartTime = apiServerInactiveStartTime c.safetyOptions.MachineControllerFrozen = preMachineControllerIsFrozen if !controlAPIServerIsUp { - trackers.ControlMachine.SetError("APIServer is Not Reachable") - trackers.ControlCore.SetError("APIServer is Not Reachable") + Expect(trackers.ControlMachine.SetError("APIServer is Not Reachable")).NotTo(HaveOccurred()) + Expect(trackers.ControlCore.SetError("APIServer is Not Reachable")).NotTo(HaveOccurred()) } if !targetAPIServerIsUp { - trackers.TargetCore.SetError("APIServer is Not Reachable") + Expect(trackers.TargetCore.SetError("APIServer is Not Reachable")).NotTo(HaveOccurred()) } - c.reconcileClusterMachineSafetyAPIServer("") + c.reconcileClusterMachineSafetyAPIServer(context.TODO(), "") Expect(c.safetyOptions.MachineControllerFrozen).Should(Equal(postMachineControllerFrozen)) }, diff --git a/pkg/controller/machine_test.go b/pkg/controller/machine_test.go index a3761519b..2ded098ac 100644 --- a/pkg/controller/machine_test.go +++ b/pkg/controller/machine_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -1634,7 +1634,6 @@ var _ = Describe("machine", func() { } type expect struct { machine *machinev1.Machine - err bool } type data struct { setup setup @@ -1670,7 +1669,7 @@ var _ = Describe("machine", func() { action := data.action machine, err := controller.controlMachineClient.Machines(objMeta.Namespace).Get(context.TODO(), action.machine, metav1.GetOptions{}) - //Expect(err).ToNot(HaveOccurred()) + Expect(err).ToNot(HaveOccurred()) controller.checkMachineTimeout(context.TODO(), machine) @@ -1852,7 +1851,6 @@ var _ = Describe("machine", func() { } type expect struct { machine *machinev1.Machine - err bool } type data struct { setup setup diff --git a/pkg/controller/machine_util.go b/pkg/controller/machine_util.go index 0907eca0e..85d5b093c 100644 --- a/pkg/controller/machine_util.go +++ b/pkg/controller/machine_util.go @@ -58,7 +58,8 @@ var ( ) // TODO: use client library instead when it starts to support update retries -// see https://github.com/kubernetes/kubernetes/issues/21479 +// +// see https://github.com/kubernetes/kubernetes/issues/21479 type updateMachineFunc func(machine *v1alpha1.Machine) error // UpdateMachineWithRetries updates a machine with given applyUpdate function. Note that machine not found error is ignored. diff --git a/pkg/controller/machine_util_test.go b/pkg/controller/machine_util_test.go index 5b648770e..504cfc3df 100644 --- a/pkg/controller/machine_util_test.go +++ b/pkg/controller/machine_util_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/controller/machineset.go b/pkg/controller/machineset.go index 499095be2..eff0eb2cc 100644 --- a/pkg/controller/machineset.go +++ b/pkg/controller/machineset.go @@ -431,10 +431,7 @@ func (c *controller) manageReplicas(ctx context.Context, allMachines []*v1alpha1 // syncMachineSet will sync the MachineSet with the given key if it has had its expectations fulfilled, // meaning it did not expect to see any more of its machines created or deleted. This function is not meant to be // invoked concurrently with the same key. -func (c *controller) reconcileClusterMachineSet(key string) error { - - ctx := context.Background() - +func (c *controller) reconcileClusterMachineSet(ctx context.Context, key string) error { startTime := time.Now() klog.V(4).Infof("Start syncing machine set %q", key) defer func() { diff --git a/pkg/controller/machineset_test.go b/pkg/controller/machineset_test.go index 0c328d6a0..2ffea5e3c 100644 --- a/pkg/controller/machineset_test.go +++ b/pkg/controller/machineset_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -971,7 +971,7 @@ var _ = Describe("machineset", func() { Expect(len(machines.Items)).To(Equal(int(0))) Key := testNamespace + "/" + testMachineSet.Name - Err := c.reconcileClusterMachineSet(Key) + Err := c.reconcileClusterMachineSet(context.TODO(), Key) waitForCacheSync(stop, c) machines, _ = c.controlMachineClient.Machines(testNamespace).List(context.TODO(), metav1.ListOptions{}) @@ -993,7 +993,7 @@ var _ = Describe("machineset", func() { Expect(len(machines.Items)).To(Equal(int(0))) Key := testNamespace + "/" + testMachineSet.Name - Err := c.reconcileClusterMachineSet(Key) + Err := c.reconcileClusterMachineSet(context.TODO(), Key) waitForCacheSync(stop, c) Expect(Err).Should(BeNil()) @@ -1015,7 +1015,7 @@ var _ = Describe("machineset", func() { Expect(len(machines.Items)).To(Equal(int(0))) Key := testNamespace + "/" + testMachineSet.Name - Err := c.reconcileClusterMachineSet(Key) + Err := c.reconcileClusterMachineSet(context.TODO(), Key) waitForCacheSync(stop, c) Expect(Err).Should(BeNil()) @@ -1038,7 +1038,7 @@ var _ = Describe("machineset", func() { Expect(len(machines.Items)).To(Equal(int(0))) Key := testNamespace + "/" + testMachineSet.Name - Err := c.reconcileClusterMachineSet(Key) + Err := c.reconcileClusterMachineSet(context.TODO(), Key) waitForCacheSync(stop, c) machines, _ = c.controlMachineClient.Machines(testNamespace).List(context.Background(), metav1.ListOptions{}) diff --git a/pkg/controller/machineset_util.go b/pkg/controller/machineset_util.go index 906209f8b..645faeb94 100644 --- a/pkg/controller/machineset_util.go +++ b/pkg/controller/machineset_util.go @@ -40,7 +40,8 @@ import ( ) // TODO: use client library instead when it starts to support update retries -// see https://github.com/kubernetes/kubernetes/issues/21479 +// +// see https://github.com/kubernetes/kubernetes/issues/21479 type updateISFunc func(is *v1alpha1.MachineSet) error // UpdateISWithRetries updates a RS with given applyUpdate function. Note that RS not found error is ignored. diff --git a/pkg/controller/node.go b/pkg/controller/node.go index 77fd14b8e..3cbcfcc1c 100644 --- a/pkg/controller/node.go +++ b/pkg/controller/node.go @@ -18,6 +18,7 @@ limitations under the License. package controller import ( + "context" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -48,7 +49,7 @@ func (c *controller) nodeDelete(obj interface{}) { } // Not being used at the moment, saving it for a future use case. -func (c *controller) reconcileClusterNodeKey(key string) error { +func (c *controller) reconcileClusterNodeKey(ctx context.Context, key string) error { node, err := c.nodeLister.Get(key) if apierrors.IsNotFound(err) { return nil diff --git a/pkg/controller/openstackmachineclass.go b/pkg/controller/openstackmachineclass.go index 3f2ed5bf7..0b88f6670 100644 --- a/pkg/controller/openstackmachineclass.go +++ b/pkg/controller/openstackmachineclass.go @@ -131,8 +131,7 @@ func (c *controller) openStackMachineClassDelete(obj interface{}) { // reconcileClusterOpenStackMachineClassKey reconciles an OpenStackMachineClass due to controller resync // or an event on the openStackMachineClass. 
-func (c *controller) reconcileClusterOpenStackMachineClassKey(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterOpenStackMachineClassKey(ctx context.Context, key string) error { _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err diff --git a/pkg/controller/packetcloudmachineclass.go b/pkg/controller/packetcloudmachineclass.go index 77a8b90c4..af56f6718 100644 --- a/pkg/controller/packetcloudmachineclass.go +++ b/pkg/controller/packetcloudmachineclass.go @@ -131,8 +131,7 @@ func (c *controller) packetMachineClassDelete(obj interface{}) { // reconcileClusterPacketMachineClassKey reconciles a PacketMachineClass due to controller resync // or an event on the packetMachineClass. -func (c *controller) reconcileClusterPacketMachineClassKey(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterPacketMachineClassKey(ctx context.Context, key string) error { _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err diff --git a/pkg/controller/secret.go b/pkg/controller/secret.go index 91924cd88..b554853ed 100644 --- a/pkg/controller/secret.go +++ b/pkg/controller/secret.go @@ -34,8 +34,7 @@ import ( // reconcileClusterSecretKey reconciles an secret due to controller resync // or an event on the secret -func (c *controller) reconcileClusterSecretKey(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterSecretKey(ctx context.Context, key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err diff --git a/pkg/controller/secret_test.go b/pkg/controller/secret_test.go index 11d514db6..55ca8d1bb 100644 --- a/pkg/controller/secret_test.go +++ b/pkg/controller/secret_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. 
You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/driver/driver_alicloud.go b/pkg/driver/driver_alicloud.go index a0b5ff3ff..122d75e2c 100644 --- a/pkg/driver/driver_alicloud.go +++ b/pkg/driver/driver_alicloud.go @@ -326,12 +326,12 @@ func (c *AlicloudDriver) GetVolNames(specs []corev1.PersistentVolumeSpec) ([]str return names, nil } -//GetUserData return the used data whit which the VM will be booted +// GetUserData return the used data whit which the VM will be booted func (c *AlicloudDriver) GetUserData() string { return c.UserData } -//SetUserData set the used data whit which the VM will be booted +// SetUserData set the used data whit which the VM will be booted func (c *AlicloudDriver) SetUserData(userData string) { c.UserData = userData } diff --git a/pkg/driver/driver_aws.go b/pkg/driver/driver_aws.go index 9fc607c22..f51584d46 100644 --- a/pkg/driver/driver_aws.go +++ b/pkg/driver/driver_aws.go @@ -345,7 +345,7 @@ func (d *AWSDriver) Delete(machineID string) error { return nil } - err = errors.New("Machine already terminated") + err = errors.New("machine already terminated") } klog.Errorf("Could not terminate machine: %s", err.Error()) @@ -501,12 +501,12 @@ func (d *AWSDriver) GetVolNames(specs []corev1.PersistentVolumeSpec) ([]string, return names, nil } -//GetUserData return the used data whit which the VM will be booted +// GetUserData return the used data whit which the VM will be booted func (d *AWSDriver) GetUserData() string { return d.UserData } -//SetUserData set the used data whit which the VM will be booted +// SetUserData set the used data whit which the VM will be booted func (d *AWSDriver) SetUserData(userData string) { d.UserData = userData } @@ -516,11 +516,12 @@ var awsVolumeRegMatch = regexp.MustCompile("^vol-[^/]*$") // kubernetesVolumeIDToEBSVolumeID translates Kubernetes volume ID to EBS volume ID // KubernetsVolumeID forms: -// * aws:/// -// * aws:/// -// * +// - aws:/// +// - aws:/// +// - +// // EBS Volume ID form: -// * vol- +// - vol- func kubernetesVolumeIDToEBSVolumeID(kubernetesID string) (string, error) { // name looks like aws://availability-zone/awsVolumeId diff --git a/pkg/driver/driver_azure.go b/pkg/driver/driver_azure.go index e6acd6105..c2a346f2f 100644 --- a/pkg/driver/driver_azure.go +++ b/pkg/driver/driver_azure.go @@ -384,12 +384,12 @@ func (d *AzureDriver) GetVMs(machineID string) (result VMs, err error) { return } -//GetUserData return the user data with which the VM will be booted +// GetUserData return the user data with which the VM will be booted func (d *AzureDriver) GetUserData() string { return d.UserData } -//SetUserData set the used data whit which the VM will be booted +// SetUserData set the used data whit which the VM will be booted func (d *AzureDriver) SetUserData(userData string) { d.UserData = userData } @@ -614,7 +614,7 @@ func (d *AzureDriver) createVMNicDisk() (*compute.VirtualMachine, error) { return nil, onARMAPIErrorFail(prometheusServiceVM, err, "MarketplaceAgreementsClient.Get failed for %s", d.AzureMachineClass.Name) } - if agreement.Accepted == nil || *agreement.Accepted == false { + if agreement.Accepted == nil || !*agreement.Accepted { // Need to accept the terms at least once for the subscription klog.V(2).Info("Accepting terms for subscription to make use of the plan") @@ -958,11 
+958,9 @@ func (clients *azureDriverClients) deleteVMNicDisks(ctx context.Context, resourc deleters := []func() error{nicDeleter, diskDeleter} - if dataDiskNames != nil { - for _, dataDiskName := range dataDiskNames { - dataDiskDeleter := clients.getDeleterForDisk(ctx, resourceGroupName, dataDiskName) - deleters = append(deleters, dataDiskDeleter) - } + for _, dataDiskName := range dataDiskNames { + dataDiskDeleter := clients.getDeleterForDisk(ctx, resourceGroupName, dataDiskName) + deleters = append(deleters, dataDiskDeleter) } return runInParallel(deleters) @@ -1156,7 +1154,7 @@ func notFound(err error) bool { func retrieveRequestID(err error) (bool, string, *autorest.DetailedError) { switch err.(type) { case autorest.DetailedError: - detailedErr := autorest.DetailedError(err.(autorest.DetailedError)) + detailedErr := err.(autorest.DetailedError) if detailedErr.Response != nil { requestID := strings.Join(detailedErr.Response.Header["X-Ms-Request-Id"], "") return true, requestID, &detailedErr diff --git a/pkg/driver/driver_fake.go b/pkg/driver/driver_fake.go index 61710c145..5c2099a52 100644 --- a/pkg/driver/driver_fake.go +++ b/pkg/driver/driver_fake.go @@ -88,12 +88,12 @@ func (d *FakeDriver) GetVolNames(specs []corev1.PersistentVolumeSpec) ([]string, return volNames, nil } -//GetUserData return the used data whit which the VM will be booted +// GetUserData return the used data whit which the VM will be booted func (d *FakeDriver) GetUserData() string { return "" } -//SetUserData set the used data whit which the VM will be booted +// SetUserData set the used data whit which the VM will be booted func (d *FakeDriver) SetUserData(userData string) { return } diff --git a/pkg/driver/driver_gcp.go b/pkg/driver/driver_gcp.go index f91cd901d..41aef5975 100644 --- a/pkg/driver/driver_gcp.go +++ b/pkg/driver/driver_gcp.go @@ -92,7 +92,7 @@ func (d *GCPDriver) Create() (string, string, error) { for _, disk := range d.GCPMachineClass.Spec.Disks { var attachedDisk compute.AttachedDisk autoDelete := false - if disk.AutoDelete == nil || *disk.AutoDelete == true { + if disk.AutoDelete == nil || *disk.AutoDelete { autoDelete = true } if disk.Type == "SCRATCH" { @@ -137,7 +137,7 @@ func (d *GCPDriver) Create() (string, string, error) { for _, nic := range d.GCPMachineClass.Spec.NetworkInterfaces { computeNIC := &compute.NetworkInterface{} - if nic.DisableExternalIP == false { + if !nic.DisableExternalIP { // When DisableExternalIP is false, implies Attach an external IP to VM computeNIC.AccessConfigs = []*compute.AccessConfig{{}} } @@ -386,12 +386,12 @@ func (d *GCPDriver) GetVolNames(specs []corev1.PersistentVolumeSpec) ([]string, return names, nil } -//GetUserData return the used data whit which the VM will be booted +// GetUserData return the used data whit which the VM will be booted func (d *GCPDriver) GetUserData() string { return d.UserData } -//SetUserData set the used data whit which the VM will be booted +// SetUserData set the used data whit which the VM will be booted func (d *GCPDriver) SetUserData(userData string) { d.UserData = userData } diff --git a/pkg/driver/driver_openstack.go b/pkg/driver/driver_openstack.go index ff2d28e9b..79c91d55a 100644 --- a/pkg/driver/driver_openstack.go +++ b/pkg/driver/driver_openstack.go @@ -613,12 +613,12 @@ func (d *OpenStackDriver) GetVolNames(specs []corev1.PersistentVolumeSpec) ([]st return names, nil } -//GetUserData return the used data whit which the VM will be booted +// GetUserData return the used data whit which the VM will be booted func (d 
*OpenStackDriver) GetUserData() string { return d.UserData } -//SetUserData set the used data whit which the VM will be booted +// SetUserData set the used data whit which the VM will be booted func (d *OpenStackDriver) SetUserData(userData string) { d.UserData = userData } diff --git a/pkg/driver/driver_packet.go b/pkg/driver/driver_packet.go index 83cdb5d1f..41d66728f 100644 --- a/pkg/driver/driver_packet.go +++ b/pkg/driver/driver_packet.go @@ -179,12 +179,12 @@ func (d *PacketDriver) GetVolNames(specs []corev1.PersistentVolumeSpec) ([]strin return names, fmt.Errorf("Not implemented yet") } -//GetUserData return the used data whit which the VM will be booted +// GetUserData return the used data whit which the VM will be booted func (d *PacketDriver) GetUserData() string { return d.UserData } -//SetUserData set the used data whit which the VM will be booted +// SetUserData set the used data whit which the VM will be booted func (d *PacketDriver) SetUserData(userData string) { d.UserData = userData } diff --git a/pkg/test/integration/common/framework.go b/pkg/test/integration/common/framework.go index 6394abf87..515366c05 100644 --- a/pkg/test/integration/common/framework.go +++ b/pkg/test/integration/common/framework.go @@ -5,7 +5,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "log" "os" "os/exec" @@ -81,12 +80,12 @@ var ( isTagsStrings = os.Getenv("TAGS_ARE_STRINGS") ) -//ProviderSpecPatch struct holds tags for provider, which we want to patch the machineclass with +// ProviderSpecPatch struct holds tags for provider, which we want to patch the machineclass with type ProviderSpecPatch struct { Tags []string `json:"tags"` } -//MachineClassPatch struct holds values of patch for machine class for provider GCP +// MachineClassPatch struct holds values of patch for machine class for provider GCP type MachineClassPatch struct { ProviderSpec ProviderSpecPatch `json:"providerSpec"` } @@ -485,7 +484,7 @@ func (c *IntegrationTestFramework) updatePatchFile() { data, _ := json.MarshalIndent(patchMachineClassData, "", " ") //writing to machine-class-patch.json - _ = ioutil.WriteFile(filepath.Join("..", "..", "..", + _ = os.WriteFile(filepath.Join("..", "..", "..", ".ci", "controllers-test", "machine-class-patch.json"), data, 0644) @@ -672,15 +671,22 @@ func rotateLogFile(fileName string) (*os.File, error) { if _, err := os.Stat(fileName); err == nil { // !strings.Contains(err.Error(), "no such file or directory") { for i := 9; i > 0; i-- { - os.Rename(fmt.Sprintf("%s.%d", fileName, i), fmt.Sprintf("%s.%d", fileName, i+1)) + oldName := fmt.Sprintf("%s.%d", fileName, i) + newName := fmt.Sprintf("%s.%d", fileName, i+1) + if err := os.Rename(oldName, newName); err != nil && !os.IsNotExist(err) { // older rotated logs may not exist yet + return nil, fmt.Errorf("failed to rename %s to %s: %w", oldName, newName, err) + } + } + newName := fmt.Sprintf("%s.%d", fileName, 1) + if err := os.Rename(fileName, newName); err != nil { + return nil, fmt.Errorf("failed to rename %s to %s: %w", fileName, newName, err) } - os.Rename(fileName, fmt.Sprintf("%s.%d", fileName, 1)) } return os.Create(fileName) } -//runControllersLocally run the machine controller and machine controller manager binary locally +// runControllersLocally run the machine controller and machine controller manager binary locally func (c *IntegrationTestFramework) runControllersLocally() { ginkgo.By("Starting Machine Controller ") args := strings.Fields( @@ -1036,7 +1042,7 @@ func (c *IntegrationTestFramework) ControllerTests() { }, ).Stream(ctx) gomega.Expect(err).NotTo(gomega.HaveOccurred()) -
io.Copy(mcOutputFile, readCloser) + _, err = io.Copy(mcOutputFile, readCloser) gomega.Expect(err).NotTo(gomega.HaveOccurred()) } else { readCloser, err := c.ControlCluster.Clientset.CoreV1(). @@ -1045,7 +1051,7 @@ func (c *IntegrationTestFramework) ControllerTests() { Container: containers[i].Name, }).Stream(ctx) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - io.Copy(mcmOutputFile, readCloser) + _, err = io.Copy(mcmOutputFile, readCloser) gomega.Expect(err).NotTo(gomega.HaveOccurred()) } } @@ -1054,14 +1060,14 @@ func (c *IntegrationTestFramework) ControllerTests() { ginkgo.By("Searching for Froze in mcm log file") frozeRegexp, _ := regexp.Compile(` Froze MachineSet`) gomega.Eventually(func() bool { - data, _ := ioutil.ReadFile(mcmLogFile) + data, _ := os.ReadFile(mcmLogFile) return frozeRegexp.Match(data) }, c.timeout, c.pollingInterval).Should(gomega.BeTrue()) ginkgo.By("Searching Unfroze in mcm log file") unfrozeRegexp, _ := regexp.Compile(` Unfroze MachineSet`) gomega.Eventually(func() bool { - data, _ := ioutil.ReadFile(mcmLogFile) + data, _ := os.ReadFile(mcmLogFile) return unfrozeRegexp.Match(data) }, c.timeout, c.pollingInterval).Should(gomega.BeTrue()) }) @@ -1174,7 +1180,7 @@ func (c *IntegrationTestFramework) ControllerTests() { }) } -//Cleanup performs rollback of original resources and removes any machines created by the test +// Cleanup performs rollback of original resources and removes any machines created by the test func (c *IntegrationTestFramework) Cleanup() { ctx := context.Background() diff --git a/pkg/test/integration/common/helpers/cluster.go b/pkg/test/integration/common/helpers/cluster.go index 4b34411dc..d0b8f8059 100644 --- a/pkg/test/integration/common/helpers/cluster.go +++ b/pkg/test/integration/common/helpers/cluster.go @@ -13,7 +13,7 @@ import ( "k8s.io/client-go/tools/clientcmd" ) -//Cluster type to hold cluster specific details +// Cluster type to hold cluster specific details type Cluster struct { restConfig *rest.Config Clientset *kubernetes.Clientset @@ -90,7 +90,7 @@ func (c *Cluster) IsSeed(target *Cluster) bool { return false } -//ClusterName retrieves cluster name from the kubeconfig +// ClusterName retrieves cluster name from the kubeconfig func (c *Cluster) ClusterName() (string, error) { var clusterName string config, err := clientcmd.LoadFromFile(c.KubeConfigFilePath) diff --git a/pkg/test/integration/common/helpers/handling_files.go b/pkg/test/integration/common/helpers/handling_files.go index ee1c25e84..00175f5e1 100644 --- a/pkg/test/integration/common/helpers/handling_files.go +++ b/pkg/test/integration/common/helpers/handling_files.go @@ -2,8 +2,6 @@ package helpers import ( "context" - "fmt" - "io/ioutil" "log" "os" "path/filepath" @@ -26,7 +24,7 @@ import ( // ParseK8sYaml reads a yaml file and parses it based on the scheme func ParseK8sYaml(filepath string) ([]runtime.Object, []*schema.GroupVersionKind, error) { - fileR, err := ioutil.ReadFile(filepath) + fileR, err := os.ReadFile(filepath) if err != nil { return nil, nil, err } @@ -50,7 +48,7 @@ func ParseK8sYaml(filepath string) ([]runtime.Object, []*schema.GroupVersionKind decode := apiextensionsscheme.Codecs.UniversalDeserializer().Decode obj, groupVersionKind, err := decode([]byte(f), nil, nil) if err != nil { - log.Println(fmt.Sprintf("Error while decoding YAML object. Err was: %s", err)) + log.Printf("Error while decoding YAML object. 
Err was: %s", err) retErr = err continue } @@ -66,7 +64,7 @@ func ParseK8sYaml(filepath string) ([]runtime.Object, []*schema.GroupVersionKind decode := scheme.Codecs.UniversalDeserializer().Decode obj, groupVersionKind, err := decode([]byte(f), nil, nil) if err != nil { - log.Println(fmt.Sprintf("Error while decoding YAML object. Err was: %s", err)) + log.Printf("Error while decoding YAML object. Err was: %s", err) retErr = err continue } @@ -77,7 +75,7 @@ func ParseK8sYaml(filepath string) ([]runtime.Object, []*schema.GroupVersionKind decode := mcmscheme.Codecs.UniversalDeserializer().Decode obj, groupVersionKind, err := decode([]byte(f), nil, nil) if err != nil { - log.Println(fmt.Sprintf("Error while decoding YAML object. Err was: %s", err)) + log.Printf("Error while decoding YAML object. Err was: %s", err) retErr = err continue } diff --git a/pkg/test/integration/common/helpers/nodes.go b/pkg/test/integration/common/helpers/nodes.go index a4f2928af..10345118f 100644 --- a/pkg/test/integration/common/helpers/nodes.go +++ b/pkg/test/integration/common/helpers/nodes.go @@ -7,18 +7,18 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -//ProbeNodes tries to probe for nodes. +// ProbeNodes tries to probe for nodes. func (c *Cluster) ProbeNodes() error { _, err := c.Clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) return err } -//getNodes tries to retrieve the list of node objects in the cluster. +// getNodes tries to retrieve the list of node objects in the cluster. func (c *Cluster) getNodes() (*v1.NodeList, error) { return c.Clientset.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) } -//GetNumberOfReadyNodes tries to retrieve the list of node objects in the cluster. +// GetNumberOfReadyNodes tries to retrieve the list of node objects in the cluster. func (c *Cluster) GetNumberOfReadyNodes() int16 { nodes, _ := c.getNodes() count := 0 @@ -32,7 +32,7 @@ func (c *Cluster) GetNumberOfReadyNodes() int16 { return int16(count) } -//GetNumberOfNodes tries to retrieve the list of node objects in the cluster. +// GetNumberOfNodes tries to retrieve the list of node objects in the cluster. func (c *Cluster) GetNumberOfNodes() int16 { nodes, _ := c.getNodes() return int16(len(nodes.Items)) diff --git a/pkg/test/integration/common/helpers/resources_tracker_interface.go b/pkg/test/integration/common/helpers/resources_tracker_interface.go index 5b588d487..d8722f04a 100644 --- a/pkg/test/integration/common/helpers/resources_tracker_interface.go +++ b/pkg/test/integration/common/helpers/resources_tracker_interface.go @@ -4,9 +4,9 @@ import ( v1alpha1 "github.com/gardener/machine-controller-manager/pkg/apis/machine/v1alpha1" ) -//ResourcesTrackerInterface provides an interface to check for orphan resources. -//The implementation should handle probing for resources while contructing or calling New method -//And reporting orphan resources whenever IsOrphanedResourcesAvailable is invoked +// ResourcesTrackerInterface provides an interface to check for orphan resources. 
+// The implementation should handle probing for resources while contructing or calling New method +// And reporting orphan resources whenever IsOrphanedResourcesAvailable is invoked type ResourcesTrackerInterface interface { IsOrphanedResourcesAvailable() bool InitializeResourcesTracker( diff --git a/pkg/util/annotations/annotations_test.go b/pkg/util/annotations/annotations_test.go index 1a3bd1199..635aaa742 100644 --- a/pkg/util/annotations/annotations_test.go +++ b/pkg/util/annotations/annotations_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/util/backoff/backoff_suite_test.go b/pkg/util/backoff/backoff_suite_test.go index eb7b980a3..abe390329 100644 --- a/pkg/util/backoff/backoff_suite_test.go +++ b/pkg/util/backoff/backoff_suite_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/util/backoff/wait_test.go b/pkg/util/backoff/wait_test.go index 1c2f07f70..a380f02ea 100644 --- a/pkg/util/backoff/wait_test.go +++ b/pkg/util/backoff/wait_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/util/configz/configz_test.go b/pkg/util/configz/configz_test.go index 3da690bcf..69294db17 100644 --- a/pkg/util/configz/configz_test.go +++ b/pkg/util/configz/configz_test.go @@ -21,7 +21,7 @@ https://github.com/kubernetes/kubernetes/blob/release-1.8/pkg/util/configz/confi package configz import ( - "io/ioutil" + "io" "net/http" "net/http/httptest" "testing" @@ -43,7 +43,7 @@ func TestConfigz(t *testing.T) { t.Fatalf("err: %v", err) } - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { t.Fatalf("err: %v", err) } @@ -57,7 +57,7 @@ func TestConfigz(t *testing.T) { t.Fatalf("err: %v", err) } - body, err = ioutil.ReadAll(resp.Body) + body, err = io.ReadAll(resp.Body) if err != nil { t.Fatalf("err: %v", err) } @@ -71,7 +71,7 @@ func TestConfigz(t *testing.T) { t.Fatalf("err: %v", err) } - body, err = ioutil.ReadAll(resp.Body) + body, err = io.ReadAll(resp.Body) if err != nil { t.Fatalf("err: %v", err) } diff --git a/pkg/util/permits/permits.go b/pkg/util/permits/permits.go index d1b353a9e..f45e7ae01 100644 --- a/pkg/util/permits/permits.go +++ b/pkg/util/permits/permits.go @@ -19,6 +19,7 @@ limitations under the License. 
package permits import ( + "context" "sync" "time" @@ -40,8 +41,8 @@ type permit struct { } // NewPermitGiver returns a new PermitGiver -func NewPermitGiver(stalePermitKeyTimeout time.Duration, janitorFrequency time.Duration) PermitGiver { - stopC := make(chan struct{}) +func NewPermitGiver(ctx context.Context, stalePermitKeyTimeout time.Duration, janitorFrequency time.Duration) PermitGiver { + stopC := make(chan bool) pg := permitGiver{ keyPermitsMap: sync.Map{}, stopC: stopC, @@ -52,6 +53,10 @@ func NewPermitGiver(stalePermitKeyTimeout time.Duration, janitorFrequency time.D for { select { case <-stopC: + klog.Info("Janitor stopped") + return + case <-ctx.Done(): + klog.Info("Janitor cancelled") return case <-ticker.C: pg.cleanupStalePermitEntries(stalePermitKeyTimeout) @@ -63,7 +68,7 @@ func NewPermitGiver(stalePermitKeyTimeout time.Duration, janitorFrequency time.D type permitGiver struct { keyPermitsMap sync.Map - stopC chan struct{} + stopC chan bool } func (pg *permitGiver) RegisterPermits(key string, numPermits int) { @@ -179,7 +184,9 @@ func (pg *permitGiver) cleanupStalePermitEntries(stalePermitKeyTimeout time.Dura } func (pg *permitGiver) Close() { - close(pg.stopC) + if !pg.isClosed() { + close(pg.stopC) + } // close all permit channels pg.keyPermitsMap.Range(func(key, value interface{}) bool { p := value.(permit) diff --git a/pkg/util/permits/permits_suite_test.go b/pkg/util/permits/permits_suite_test.go index ac27bc57f..3a53facc3 100644 --- a/pkg/util/permits/permits_suite_test.go +++ b/pkg/util/permits/permits_suite_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/util/permits/permits_test.go b/pkg/util/permits/permits_test.go index e939cb6c4..958f0138c 100644 --- a/pkg/util/permits/permits_test.go +++ b/pkg/util/permits/permits_test.go @@ -17,6 +17,7 @@ limitations under the License. package permits import ( + "context" "time" . 
"github.com/onsi/ginkgo" @@ -34,7 +35,7 @@ var _ = Describe("permit", func() { Describe("#RegisterPermits", func() { BeforeEach(func() { - pg = NewPermitGiver(5*time.Second, 1*time.Second).(*permitGiver) + pg = NewPermitGiver(context.TODO(), 5*time.Second, 1*time.Second).(*permitGiver) }) AfterEach(func() { @@ -70,7 +71,7 @@ var _ = Describe("permit", func() { Describe("#DeletePermits", func() { BeforeEach(func() { - pg = NewPermitGiver(5*time.Second, 1*time.Second).(*permitGiver) + pg = NewPermitGiver(context.TODO(), 5*time.Second, 1*time.Second).(*permitGiver) pg.RegisterPermits(key1, 1) }) AfterEach(func() { @@ -98,7 +99,7 @@ var _ = Describe("permit", func() { Describe("#TryPermit", func() { BeforeEach(func() { - pg = NewPermitGiver(5*time.Second, 1*time.Second).(*permitGiver) + pg = NewPermitGiver(context.TODO(), 5*time.Second, 1*time.Second).(*permitGiver) pg.RegisterPermits(key1, 1) }) It("should return false if permitGiver is closed", func() { @@ -117,7 +118,7 @@ var _ = Describe("permit", func() { Describe("#Release Permit", func() { BeforeEach(func() { - pg = NewPermitGiver(5*time.Second, 1*time.Second).(*permitGiver) + pg = NewPermitGiver(context.TODO(), 5*time.Second, 1*time.Second).(*permitGiver) pg.RegisterPermits(key1, 1) pg.TryPermit(key1, 5*time.Second) }) @@ -139,7 +140,7 @@ var _ = Describe("permit", func() { Describe("#isClose", func() { BeforeEach(func() { - pg = NewPermitGiver(5*time.Second, 1*time.Second).(*permitGiver) + pg = NewPermitGiver(context.TODO(), 5*time.Second, 1*time.Second).(*permitGiver) }) It("return true if closed", func() { pg.Close() @@ -153,7 +154,7 @@ var _ = Describe("permit", func() { Describe("#Close", func() { BeforeEach(func() { - pg = NewPermitGiver(5*time.Second, 1*time.Second).(*permitGiver) + pg = NewPermitGiver(context.TODO(), 5*time.Second, 1*time.Second).(*permitGiver) pg.RegisterPermits(key1, 1) }) It("closed the PermitGiver", func() { @@ -165,7 +166,7 @@ var _ = Describe("permit", func() { Describe("#NewPermitGiver", func() { BeforeEach(func() { - pg = NewPermitGiver(5*time.Second, 1*time.Second).(*permitGiver) + pg = NewPermitGiver(context.TODO(), 5*time.Second, 1*time.Second).(*permitGiver) pg.RegisterPermits(key1, 1) }) AfterEach(func() { diff --git a/pkg/util/provider/app/app.go b/pkg/util/provider/app/app.go index e87bd35c8..746e9ccb6 100644 --- a/pkg/util/provider/app/app.go +++ b/pkg/util/provider/app/app.go @@ -30,6 +30,7 @@ import ( "os" goruntime "runtime" "strconv" + "sync" "time" "github.com/Masterminds/semver" @@ -74,7 +75,7 @@ var ( ) // Run runs the MCServer. This should never exit. 
-func Run(s *options.MCServer, driver driver.Driver) error { +func Run(ctx context.Context, s *options.MCServer, driver driver.Driver) error { // To help debugging, immediately log version klog.V(4).Infof("Version: %+v", version.Get()) if err := s.Validate(); err != nil { @@ -121,12 +122,13 @@ func Run(s *options.MCServer, driver driver.Driver) error { leaderElectionClient := kubernetes.NewForConfigOrDie(rest.AddUserAgent(controlkubeconfig, "machine-leader-election")) klog.V(4).Info("Starting http server and mux") - go startHTTP(s) + go startHTTP(ctx, s) recorder := createRecorder(kubeClientControl) - run := func(ctx context.Context) { - var stop <-chan struct{} + waitGroup := sync.WaitGroup{} + waitGroup.Add(1) + startControllers := func(ctx context.Context) { // Control plane client used to interact with machine APIs controlMachineClientBuilder := machineclientbuilder.SimpleClientBuilder{ ClientConfig: controlkubeconfig, @@ -140,7 +142,8 @@ func Run(s *options.MCServer, driver driver.Driver) error { ClientConfig: targetkubeconfig, } - err := StartControllers( + if err := StartControllers( + ctx, s, controlkubeconfig, targetkubeconfig, @@ -149,17 +152,14 @@ func Run(s *options.MCServer, driver driver.Driver) error { targetCoreClientBuilder, driver, recorder, - stop, - ) - - klog.Fatalf("error running controllers: %v", err) - panic("unreachable") - + ); err != nil { + klog.Fatalf("failed to start controllers: %v", err) + } + waitGroup.Done() } if !s.LeaderElection.LeaderElect { - run(nil) - panic("unreachable") + startControllers(ctx) } id, err := os.Hostname() @@ -182,24 +182,27 @@ func Run(s *options.MCServer, driver driver.Driver) error { klog.Fatalf("error creating lock: %v", err) } - ctx := context.TODO() leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ Lock: rl, LeaseDuration: s.LeaderElection.LeaseDuration.Duration, RenewDeadline: s.LeaderElection.RenewDeadline.Duration, RetryPeriod: s.LeaderElection.RetryPeriod.Duration, Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: run, + OnStartedLeading: startControllers, OnStoppedLeading: func() { - klog.Fatalf("leaderelection lost") + klog.Errorf("leaderelection lost") + waitGroup.Wait() }, }, }) - panic("unreachable") + + return nil } // StartControllers starts all the controllers which are a part of machine-controller -func StartControllers(s *options.MCServer, +func StartControllers( + ctx context.Context, + s *options.MCServer, controlCoreKubeconfig *rest.Config, targetCoreKubeconfig *rest.Config, controlMachineClientBuilder machineclientbuilder.ClientBuilder, @@ -207,7 +210,7 @@ func StartControllers(s *options.MCServer, targetCoreClientBuilder coreclientbuilder.ClientBuilder, driver driver.Driver, recorder record.EventRecorder, - stop <-chan struct{}) error { +) error { klog.V(5).Info("Getting available resources") availableResources, err := getAvailableResources(controlCoreClientBuilder) @@ -241,18 +244,16 @@ func StartControllers(s *options.MCServer, if availableResources[machineGVR] { klog.V(5).Infof("Creating shared informers; resync interval: %v", s.MinResyncPeriod) - controlMachineInformerFactory := machineinformers.NewFilteredSharedInformerFactory( + controlMachineInformerFactory := machineinformers.NewSharedInformerFactoryWithOptions( controlMachineClientBuilder.ClientOrDie("control-machine-shared-informers"), s.MinResyncPeriod.Duration, - s.Namespace, - nil, + machineinformers.WithNamespace(s.Namespace), ) - controlCoreInformerFactory := coreinformers.NewFilteredSharedInformerFactory( + 
controlCoreInformerFactory := coreinformers.NewSharedInformerFactoryWithOptions( controlCoreClientBuilder.ClientOrDie("control-core-shared-informers"), s.MinResyncPeriod.Duration, - s.Namespace, - nil, + coreinformers.WithNamespace(s.Namespace), ) targetCoreInformerFactory := coreinformers.NewSharedInformerFactory( @@ -276,6 +277,7 @@ func StartControllers(s *options.MCServer, klog.V(5).Infof("Creating controllers...") machineController, err := machinecontroller.NewController( + ctx, s.Namespace, controlMachineClient, controlCoreClient, @@ -301,18 +303,23 @@ func StartControllers(s *options.MCServer, } klog.V(1).Info("Starting shared informers") - controlMachineInformerFactory.Start(stop) - controlCoreInformerFactory.Start(stop) - targetCoreInformerFactory.Start(stop) + controlMachineInformerFactory.Start(ctx.Done()) + controlCoreInformerFactory.Start(ctx.Done()) + targetCoreInformerFactory.Start(ctx.Done()) klog.V(5).Info("Running controller") - go machineController.Run(int(s.ConcurrentNodeSyncs), stop) - + var waitGroup sync.WaitGroup + waitGroup.Add(1) + go func() { + machineController.Run(ctx, int(s.ConcurrentNodeSyncs)) + waitGroup.Done() + }() + waitGroup.Wait() } else { return fmt.Errorf("unable to start machine controller: API GroupVersion %q is not available; \nFound: %#v", machineGVR, availableResources) } - select {} + return nil } // TODO: In general, any controller checking this needs to be dynamic so @@ -347,22 +354,22 @@ func getAvailableResources(clientBuilder coreclientbuilder.ClientBuilder) (map[s return nil, fmt.Errorf("failed to get api versions from server: %v: %v", healthzContent, err) } - resourceMap, err := discoveryClient.ServerResources() + _, resources, err := discoveryClient.ServerGroupsAndResources() if err != nil { utilruntime.HandleError(fmt.Errorf("unable to get all supported resources from server: %v", err)) } - if len(resourceMap) == 0 { + if len(resources) == 0 { return nil, fmt.Errorf("unable to get any supported resources from server") } allResources := map[schema.GroupVersionResource]bool{} - for _, apiResourceList := range resourceMap { - version, err := schema.ParseGroupVersion(apiResourceList.GroupVersion) + for _, apiResourceList := range resources { + gv, err := schema.ParseGroupVersion(apiResourceList.GroupVersion) if err != nil { return nil, err } for _, apiResource := range apiResourceList.APIResources { - allResources[version.WithResource(apiResource.Name)] = true + allResources[gv.WithResource(apiResource.Name)] = true } } @@ -377,7 +384,7 @@ func createRecorder(kubeClient *kubernetes.Clientset) record.EventRecorder { return eventBroadcaster.NewRecorder(kubescheme.Scheme, v1.EventSource{Component: controllerManagerAgentName}) } -func startHTTP(s *options.MCServer) { +func startHTTP(ctx context.Context, s *options.MCServer) { mux := http.NewServeMux() if s.EnableProfiling { mux.HandleFunc("/debug/pprof/", pprof.Index) @@ -403,5 +410,13 @@ func startHTTP(s *options.MCServer) { Addr: net.JoinHostPort(s.Address, strconv.Itoa(int(s.Port))), Handler: mux, } - klog.Fatal(server.ListenAndServe()) + + // Shut the server down gracefully once the context is cancelled. + go func() { + <-ctx.Done() + _ = server.Shutdown(context.Background()) + }() + if err := server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + klog.Fatalf("failed to serve HTTP: %v", err) + } } diff --git a/pkg/util/provider/drain/drain.go b/pkg/util/provider/drain/drain.go index 02e6315dc..9f42426dd 100644 --- a/pkg/util/provider/drain/drain.go +++ b/pkg/util/provider/drain/drain.go @@ -568,7 +568,7 @@ func filterSharedPVs(pvMap map[string][]string) { for pod, vols := range pvMap { volList := []string{}
for _, vol := range vols { - if sharedVol[vol] == false { + if !sharedVol[vol] { volList = append(volList, vol) } } diff --git a/pkg/util/provider/drain/volume_attachment.go b/pkg/util/provider/drain/volume_attachment.go index f3dcc24ec..e6e6ed225 100644 --- a/pkg/util/provider/drain/volume_attachment.go +++ b/pkg/util/provider/drain/volume_attachment.go @@ -49,16 +49,23 @@ func (v *VolumeAttachmentHandler) dispatch(obj interface{}) { volumeAttachment := obj.(*storagev1.VolumeAttachment) if volumeAttachment == nil { klog.Errorf("Couldn't convert to volumeAttachment from object %v", obj) + // return here to avoid nil pointer dereference + return } - klog.V(4).Infof("Dispatching request for PV %s", *volumeAttachment.Spec.Source.PersistentVolumeName) - defer klog.V(4).Infof("Done dispatching request for PV %s", *volumeAttachment.Spec.Source.PersistentVolumeName) + // PersistentVolumeName is optional, so dereference it only when it is set. + pvName := "" + if volumeAttachment.Spec.Source.PersistentVolumeName != nil { + pvName = *volumeAttachment.Spec.Source.PersistentVolumeName + } + klog.V(4).Infof("Dispatching request for PV %s", pvName) + defer klog.V(4).Infof("Done dispatching request for PV %s", pvName) v.Lock() defer v.Unlock() for i, worker := range v.workers { - klog.V(4).Infof("Dispatching request for PV %s to worker %d/%v", *volumeAttachment.Spec.Source.PersistentVolumeName, i, worker) + klog.V(4).Infof("Dispatching request for PV %s to worker %d/%v", pvName, i, worker) select { case worker <- volumeAttachment: diff --git a/pkg/util/provider/driver/fake.go b/pkg/util/provider/driver/fake.go index 83b123434..f341e41cc 100644 --- a/pkg/util/provider/driver/fake.go +++ b/pkg/util/provider/driver/fake.go @@ -19,8 +19,6 @@ package driver import ( "context" - "fmt" - "github.com/gardener/machine-controller-manager/pkg/util/provider/machinecodes/codes" "github.com/gardener/machine-controller-manager/pkg/util/provider/machinecodes/status" ) @@ -87,7 +85,7 @@ func (d *FakeDriver) DeleteMachine(ctx context.Context, deleteMachineRequest *De func (d *FakeDriver) GetMachineStatus(ctx context.Context, getMachineStatusRequest *GetMachineStatusRequest) (*GetMachineStatusResponse, error) { switch { case !d.VMExists: - errMessage := fmt.Sprintf("Fake plugin is returning no VM instances backing this machine object") + errMessage := "Fake plugin is returning no VM instances backing this machine object" return nil, status.Error(codes.NotFound, errMessage) case d.Err != nil: return nil, d.Err diff --git a/pkg/util/provider/machinecontroller/controller.go b/pkg/util/provider/machinecontroller/controller.go index ef7182b9f..1adb94434 100644 --- a/pkg/util/provider/machinecontroller/controller.go +++ b/pkg/util/provider/machinecontroller/controller.go @@ -18,6 +18,7 @@ limitations under the License. package controller import ( + "context" "fmt" "sync" "time" @@ -74,27 +75,7 @@ const ( ) // NewController returns a new Node controller.
-func NewController( - namespace string, - controlMachineClient machineapi.MachineV1alpha1Interface, - controlCoreClient kubernetes.Interface, - targetCoreClient kubernetes.Interface, - driver driver.Driver, - pvcInformer coreinformers.PersistentVolumeClaimInformer, - pvInformer coreinformers.PersistentVolumeInformer, - secretInformer coreinformers.SecretInformer, - nodeInformer coreinformers.NodeInformer, - pdbV1beta1Informer policyv1beta1informers.PodDisruptionBudgetInformer, - pdbV1Informer policyv1informers.PodDisruptionBudgetInformer, - volumeAttachmentInformer storageinformers.VolumeAttachmentInformer, - machineClassInformer machineinformers.MachineClassInformer, - machineInformer machineinformers.MachineInformer, - recorder record.EventRecorder, - safetyOptions options.SafetyOptions, - nodeConditions string, - bootstrapTokenAuthExtraGroups string, - targetKubernetesVersion *semver.Version, -) (Controller, error) { +func NewController(ctx context.Context, namespace string, controlMachineClient machineapi.MachineV1alpha1Interface, controlCoreClient kubernetes.Interface, targetCoreClient kubernetes.Interface, driver driver.Driver, pvcInformer coreinformers.PersistentVolumeClaimInformer, pvInformer coreinformers.PersistentVolumeInformer, secretInformer coreinformers.SecretInformer, nodeInformer coreinformers.NodeInformer, pdbV1beta1Informer policyv1beta1informers.PodDisruptionBudgetInformer, pdbV1Informer policyv1informers.PodDisruptionBudgetInformer, volumeAttachmentInformer storageinformers.VolumeAttachmentInformer, machineClassInformer machineinformers.MachineClassInformer, machineInformer machineinformers.MachineInformer, recorder record.EventRecorder, safetyOptions options.SafetyOptions, nodeConditions string, bootstrapTokenAuthExtraGroups string, targetKubernetesVersion *semver.Version) (Controller, error) { const ( // volumeAttachmentGroupName group name volumeAttachmentGroupName = "storage.k8s.io" @@ -126,7 +107,7 @@ func NewController( driver: driver, bootstrapTokenAuthExtraGroups: bootstrapTokenAuthExtraGroups, volumeAttachmentHandler: nil, - permitGiver: permits.NewPermitGiver(permitGiverStaleEntryTimeout, janitorFreq), + permitGiver: permits.NewPermitGiver(ctx, permitGiverStaleEntryTimeout, janitorFreq), targetKubernetesVersion: targetKubernetesVersion, } @@ -143,6 +124,7 @@ func NewController( eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(klog.Infof) eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: typedcorev1.New(controlCoreClient.CoreV1().RESTClient()).Events(namespace)}) + defer eventBroadcaster.Shutdown() // Controller listers controller.pvcLister = pvcInformer.Lister() @@ -245,7 +227,7 @@ type Controller interface { // Run runs the controller until the given stop channel can be read from. // workers specifies the number of goroutines, per resource, processing work // from the resource workqueues - Run(workers int, stopCh <-chan struct{}) + Run(ctx context.Context, workers int) } // controller is a concrete Controller. 
@@ -299,28 +281,20 @@ type controller struct { machineSynced cache.InformerSynced } -func (c *controller) Run(workers int, stopCh <-chan struct{}) { - +func (c *controller) Run(ctx context.Context, workers int) { var ( waitGroup sync.WaitGroup ) - defer runtimeutil.HandleCrash() - defer c.permitGiver.Close() - defer c.nodeQueue.ShutDown() - defer c.secretQueue.ShutDown() - defer c.machineClassQueue.ShutDown() - defer c.machineQueue.ShutDown() - defer c.machineSafetyOrphanVMsQueue.ShutDown() - defer c.machineSafetyAPIServerQueue.ShutDown() + defer c.shutDown() if k8sutils.ConstraintK8sGreaterEqual121.Check(c.targetKubernetesVersion) { - if !cache.WaitForCacheSync(stopCh, c.secretSynced, c.pvcSynced, c.pvSynced, c.pdbV1Synced, c.volumeAttachementSynced, c.nodeSynced, c.machineClassSynced, c.machineSynced) { + if !cache.WaitForCacheSync(ctx.Done(), c.secretSynced, c.pvcSynced, c.pvSynced, c.pdbV1Synced, c.volumeAttachementSynced, c.nodeSynced, c.machineClassSynced, c.machineSynced) { runtimeutil.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) return } } else { - if !cache.WaitForCacheSync(stopCh, c.secretSynced, c.pvcSynced, c.pvSynced, c.pdbV1beta1Synced, c.volumeAttachementSynced, c.nodeSynced, c.machineClassSynced, c.machineSynced) { + if !cache.WaitForCacheSync(ctx.Done(), c.secretSynced, c.pvcSynced, c.pvSynced, c.pdbV1beta1Synced, c.volumeAttachementSynced, c.nodeSynced, c.machineClassSynced, c.machineSynced) { runtimeutil.HandleError(fmt.Errorf("Timed out waiting for caches to sync")) return } @@ -335,28 +309,38 @@ func (c *controller) Run(workers int, stopCh <-chan struct{}) { prometheus.MustRegister(c) for i := 0; i < workers; i++ { - createWorker(c.secretQueue, "ClusterSecret", maxRetries, true, c.reconcileClusterSecretKey, stopCh, &waitGroup) - createWorker(c.machineClassQueue, "ClusterMachineClass", maxRetries, true, c.reconcileClusterMachineClassKey, stopCh, &waitGroup) - createWorker(c.nodeQueue, "ClusterNode", maxRetries, true, c.reconcileClusterNodeKey, stopCh, &waitGroup) - createWorker(c.machineQueue, "ClusterMachine", maxRetries, true, c.reconcileClusterMachineKey, stopCh, &waitGroup) - createWorker(c.machineSafetyOrphanVMsQueue, "ClusterMachineSafetyOrphanVMs", maxRetries, true, c.reconcileClusterMachineSafetyOrphanVMs, stopCh, &waitGroup) - createWorker(c.machineSafetyAPIServerQueue, "ClusterMachineAPIServer", maxRetries, true, c.reconcileClusterMachineSafetyAPIServer, stopCh, &waitGroup) + createWorker(ctx, c.secretQueue, "ClusterSecret", maxRetries, true, c.reconcileClusterSecretKey, &waitGroup) + createWorker(ctx, c.machineClassQueue, "ClusterMachineClass", maxRetries, true, c.reconcileClusterMachineClassKey, &waitGroup) + createWorker(ctx, c.nodeQueue, "ClusterNode", maxRetries, true, c.reconcileClusterNodeKey, &waitGroup) + createWorker(ctx, c.machineQueue, "ClusterMachine", maxRetries, true, c.reconcileClusterMachineKey, &waitGroup) + createWorker(ctx, c.machineSafetyOrphanVMsQueue, "ClusterMachineSafetyOrphanVMs", maxRetries, true, c.reconcileClusterMachineSafetyOrphanVMs, &waitGroup) + createWorker(ctx, c.machineSafetyAPIServerQueue, "ClusterMachineAPIServer", maxRetries, true, c.reconcileClusterMachineSafetyAPIServer, &waitGroup) } - <-stopCh + waitGroup.Wait() + c.shutDown() + klog.V(1).Info("Shutting down Machine Controller Manager ") handlers.UpdateHealth(false) +} - waitGroup.Wait() +func (c *controller) shutDown() { + c.permitGiver.Close() + c.nodeQueue.ShutDown() + c.secretQueue.ShutDown() + c.machineClassQueue.ShutDown() + 
c.machineQueue.ShutDown() + c.machineSafetyOrphanVMsQueue.ShutDown() + c.machineSafetyAPIServerQueue.ShutDown() } // createWorker creates and runs a worker thread that just processes items in the // specified queue. The worker will run until stopCh is closed. The worker will be // added to the wait group when started and marked done when finished. -func createWorker(queue workqueue.RateLimitingInterface, resourceType string, maxRetries int, forgetAfterSuccess bool, reconciler func(key string) error, stopCh <-chan struct{}, waitGroup *sync.WaitGroup) { +func createWorker(ctx context.Context, queue workqueue.RateLimitingInterface, resourceType string, maxRetries int, forgetAfterSuccess bool, reconciler func(ctx context.Context, key string) error, waitGroup *sync.WaitGroup) { waitGroup.Add(1) go func() { - wait.Until(worker(queue, resourceType, maxRetries, forgetAfterSuccess, reconciler), time.Second, stopCh) + wait.UntilWithContext(ctx, worker(queue, resourceType, maxRetries, forgetAfterSuccess, reconciler), time.Second) waitGroup.Done() }() } @@ -366,8 +350,8 @@ func createWorker(queue workqueue.RateLimitingInterface, resourceType string, ma // It enforces that the reconciler is never invoked concurrently with the same key. // If forgetAfterSuccess is true, it will cause the queue to forget the item should reconciliation // have no error. -func worker(queue workqueue.RateLimitingInterface, resourceType string, maxRetries int, forgetAfterSuccess bool, reconciler func(key string) error) func() { - return func() { +func worker(queue workqueue.RateLimitingInterface, resourceType string, maxRetries int, forgetAfterSuccess bool, reconciler func(ctx context.Context, key string) error) func(ctx context.Context) { + return func(ctx context.Context) { exit := false for !exit { exit = func() bool { @@ -377,7 +361,7 @@ func worker(queue workqueue.RateLimitingInterface, resourceType string, maxRetri } defer queue.Done(key) - err := reconciler(key.(string)) + err := reconciler(ctx, key.(string)) if err == nil { if forgetAfterSuccess { queue.Forget(key) diff --git a/pkg/util/provider/machinecontroller/machine.go b/pkg/util/provider/machinecontroller/machine.go index ed6e29ec2..b95845168 100644 --- a/pkg/util/provider/machinecontroller/machine.go +++ b/pkg/util/provider/machinecontroller/machine.go @@ -42,8 +42,8 @@ import ( ) /* - SECTION - Machine controller - Machine add, update, delete watches +SECTION +Machine controller - Machine add, update, delete watches */ func (c *controller) addMachine(obj interface{}) { klog.V(5).Infof("Adding machine object") @@ -85,9 +85,7 @@ func (c *controller) enqueueMachineAfter(obj interface{}, after time.Duration) { } } -func (c *controller) reconcileClusterMachineKey(key string) error { - ctx := context.Background() - +func (c *controller) reconcileClusterMachineKey(ctx context.Context, key string) error { _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err @@ -188,8 +186,8 @@ func (c *controller) reconcileClusterMachine(ctx context.Context, machine *v1alp } /* - SECTION - Machine controller - nodeToMachine +SECTION +Machine controller - nodeToMachine */ var ( errMultipleMachineMatch = errors.New("Multiple machines matching node") diff --git a/pkg/util/provider/machinecontroller/machine_safety.go b/pkg/util/provider/machinecontroller/machine_safety.go index 1d02a822d..8ac54aa2b 100644 --- a/pkg/util/provider/machinecontroller/machine_safety.go +++ b/pkg/util/provider/machinecontroller/machine_safety.go @@ -47,8 +47,7 @@ const ( ) // 
reconcileClusterMachineSafetyOrphanVMs checks for any orphan VMs and deletes them -func (c *controller) reconcileClusterMachineSafetyOrphanVMs(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterMachineSafetyOrphanVMs(ctx context.Context, key string) error { reSyncAfter := c.safetyOptions.MachineSafetyOrphanVMsPeriod.Duration defer c.machineSafetyOrphanVMsQueue.AddAfter("", reSyncAfter) @@ -73,8 +72,7 @@ func (c *controller) reconcileClusterMachineSafetyOrphanVMs(key string) error { // reconcileClusterMachineSafetyAPIServer checks control and target clusters // and checks if their APIServer's are reachable // If they are not reachable, they set a machineControllerFreeze flag -func (c *controller) reconcileClusterMachineSafetyAPIServer(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterMachineSafetyAPIServer(ctx context.Context, key string) error { statusCheckTimeout := c.safetyOptions.MachineSafetyAPIServerStatusCheckTimeout.Duration statusCheckPeriod := c.safetyOptions.MachineSafetyAPIServerStatusCheckPeriod.Duration diff --git a/pkg/util/provider/machinecontroller/machine_safety_test.go b/pkg/util/provider/machinecontroller/machine_safety_test.go index eccad9197..2a3313b40 100644 --- a/pkg/util/provider/machinecontroller/machine_safety_test.go +++ b/pkg/util/provider/machinecontroller/machine_safety_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -193,7 +193,7 @@ var _ = Describe("safety_logic", func() { trackers.TargetCore.SetError("APIServer is Not Reachable") } - c.reconcileClusterMachineSafetyAPIServer("") + c.reconcileClusterMachineSafetyAPIServer(context.TODO(), "") Expect(c.safetyOptions.MachineControllerFrozen).Should(Equal(postMachineControllerFrozen)) }, diff --git a/pkg/util/provider/machinecontroller/machine_test.go b/pkg/util/provider/machinecontroller/machine_test.go index 782fbc5a1..10b9943a2 100644 --- a/pkg/util/provider/machinecontroller/machine_test.go +++ b/pkg/util/provider/machinecontroller/machine_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -1515,7 +1515,7 @@ var _ = Describe("machine", func() { LastUpdateTime: metav1.Now(), }, LastOperation: v1alpha1.LastOperation{ - Description: fmt.Sprintf("Skipping drain as nodeName is not a valid one for machine. Initiate VM deletion"), + Description: "Skipping drain as nodeName is not a valid one for machine. 
Initiate VM deletion", State: v1alpha1.MachineStateProcessing, Type: v1alpha1.MachineOperationDelete, LastUpdateTime: metav1.Now(), diff --git a/pkg/util/provider/machinecontroller/machine_util.go b/pkg/util/provider/machinecontroller/machine_util.go index 92fa47498..3c3cf08b6 100644 --- a/pkg/util/provider/machinecontroller/machine_util.go +++ b/pkg/util/provider/machinecontroller/machine_util.go @@ -66,7 +66,7 @@ const ( ) // TODO: use client library instead when it starts to support update retries -// see https://github.com/kubernetes/kubernetes/issues/21479 +// see https://github.com/kubernetes/kubernetes/issues/21479 type updateMachineFunc func(machine *v1alpha1.Machine) error /* @@ -816,8 +816,8 @@ func (c *controller) deleteMachineFinalizers(ctx context.Context, machine *v1alp } /* - SECTION - Helper Functions +SECTION +Helper Functions */ func (c *controller) isHealthy(machine *v1alpha1.Machine) bool { numOfConditions := len(machine.Status.Conditions) diff --git a/pkg/util/provider/machinecontroller/machine_util_test.go b/pkg/util/provider/machinecontroller/machine_util_test.go index f8e4af29a..9ae385f7a 100644 --- a/pkg/util/provider/machinecontroller/machine_util_test.go +++ b/pkg/util/provider/machinecontroller/machine_util_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -2152,7 +2152,7 @@ var _ = Describe("machine_util", func() { c, trackers = createController(stop, testNamespace, controlMachineObjects, nil, targetCoreObjects, nil) defer trackers.Stop() - c.permitGiver = permits.NewPermitGiver(5*time.Second, 1*time.Second) + c.permitGiver = permits.NewPermitGiver(context.TODO(), 5*time.Second, 1*time.Second) defer c.permitGiver.Close() waitForCacheSync(stop, c) @@ -2342,7 +2342,7 @@ var _ = Describe("machine_util", func() { c, trackers = createController(stop, testNamespace, controlMachineObjects, nil, targetCoreObjects, nil) defer trackers.Stop() - c.permitGiver = permits.NewPermitGiver(5*time.Second, 1*time.Second) + c.permitGiver = permits.NewPermitGiver(context.TODO(), 5*time.Second, 1*time.Second) defer c.permitGiver.Close() waitForCacheSync(stop, c) diff --git a/pkg/util/provider/machinecontroller/machineclass.go b/pkg/util/provider/machinecontroller/machineclass.go index 6ba849975..8b0961aa3 100644 --- a/pkg/util/provider/machinecontroller/machineclass.go +++ b/pkg/util/provider/machinecontroller/machineclass.go @@ -107,8 +107,7 @@ func (c *controller) machineClassDelete(obj interface{}) { // reconcileClusterMachineClassKey reconciles an machineClass due to controller resync // or an event on the machineClass. 
-func (c *controller) reconcileClusterMachineClassKey(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterMachineClassKey(ctx context.Context, key string) error { _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err diff --git a/pkg/util/provider/machinecontroller/machineclass_test.go b/pkg/util/provider/machinecontroller/machineclass_test.go index 4eb2fa6e3..3f2d64099 100644 --- a/pkg/util/provider/machinecontroller/machineclass_test.go +++ b/pkg/util/provider/machinecontroller/machineclass_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/util/provider/machinecontroller/metrics.go b/pkg/util/provider/machinecontroller/metrics.go index 7dfcf29be..71304bc65 100644 --- a/pkg/util/provider/machinecontroller/metrics.go +++ b/pkg/util/provider/machinecontroller/metrics.go @@ -238,7 +238,7 @@ func (c *controller) CollectMachineSetMetrics(ch chan<- prometheus.Metric) { } */ -// CollectMachines is method to collect Machine related metrics. +// CollectMachineMetrics is method to collect Machine related metrics. func (c *controller) CollectMachineMetrics(ch chan<- prometheus.Metric) { // Collect the count of machines managed by the mcm. machineList, err := c.machineLister.Machines(c.namespace).List(labels.Everything()) diff --git a/pkg/util/provider/machinecontroller/migrate_machineclass_test.go b/pkg/util/provider/machinecontroller/migrate_machineclass_test.go index 85da8653a..b15f8e095 100644 --- a/pkg/util/provider/machinecontroller/migrate_machineclass_test.go +++ b/pkg/util/provider/machinecontroller/migrate_machineclass_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/util/provider/machinecontroller/node.go b/pkg/util/provider/machinecontroller/node.go index 77fd14b8e..3cbcfcc1c 100644 --- a/pkg/util/provider/machinecontroller/node.go +++ b/pkg/util/provider/machinecontroller/node.go @@ -18,6 +18,7 @@ limitations under the License. package controller import ( + "context" v1 "k8s.io/api/core/v1" "k8s.io/klog/v2" @@ -48,7 +49,7 @@ func (c *controller) nodeDelete(obj interface{}) { } // Not being used at the moment, saving it for a future use case. 
-func (c *controller) reconcileClusterNodeKey(key string) error { +func (c *controller) reconcileClusterNodeKey(ctx context.Context, key string) error { node, err := c.nodeLister.Get(key) if apierrors.IsNotFound(err) { return nil diff --git a/pkg/util/provider/machinecontroller/secret.go b/pkg/util/provider/machinecontroller/secret.go index b620ae0a4..e08f7c439 100644 --- a/pkg/util/provider/machinecontroller/secret.go +++ b/pkg/util/provider/machinecontroller/secret.go @@ -34,8 +34,7 @@ import ( // reconcileClusterSecretKey reconciles an secret due to controller resync // or an event on the secret -func (c *controller) reconcileClusterSecretKey(key string) error { - ctx := context.Background() +func (c *controller) reconcileClusterSecretKey(ctx context.Context, key string) error { namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err diff --git a/pkg/util/provider/machinecontroller/secret_test.go b/pkg/util/provider/machinecontroller/secret_test.go index 852ee5f5d..9861adabd 100644 --- a/pkg/util/provider/machinecontroller/secret_test.go +++ b/pkg/util/provider/machinecontroller/secret_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, diff --git a/pkg/util/signals/signals.go b/pkg/util/signals/signals.go new file mode 100644 index 000000000..ddfc7e01a --- /dev/null +++ b/pkg/util/signals/signals.go @@ -0,0 +1,33 @@ +package signals + +import ( + "context" + "os" + "os/signal" + "syscall" +) + +var ( + shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM} + onlyOneSignalHandler = make(chan struct{}) +) + +// SetupSignalHandler registers for SIGTERM and SIGINT. A context is returned +// which is cancelled on one of these signals. If a second signal is caught, the program +// is terminated with exit code 1. +func SetupSignalHandler() context.Context { + close(onlyOneSignalHandler) // panics when called twice + + ctx, cancel := context.WithCancel(context.Background()) + + c := make(chan os.Signal, 2) + signal.Notify(c, shutdownSignals...) + go func() { + <-c + cancel() + <-c + os.Exit(1) // second signal. Exit directly. 
+ }() + + return ctx +} diff --git a/pkg/util/taints/taints.go b/pkg/util/taints/taints.go index 627a9661a..ee553fc36 100644 --- a/pkg/util/taints/taints.go +++ b/pkg/util/taints/taints.go @@ -121,7 +121,7 @@ func ParseTaints(spec []string) ([]v1.Taint, []v1.Taint, error) { uniqueTaints := map[v1.TaintEffect]sets.String{} for _, taintSpec := range spec { - if strings.Index(taintSpec, "=") != -1 && strings.Index(taintSpec, ":") != -1 { + if strings.Contains(taintSpec, "=") && strings.Contains(taintSpec, ":") { newTaint, err := parseTaint(taintSpec) if err != nil { return nil, nil, err @@ -140,7 +140,7 @@ func ParseTaints(spec []string) ([]v1.Taint, []v1.Taint, error) { } else if strings.HasSuffix(taintSpec, "-") { taintKey := taintSpec[:len(taintSpec)-1] var effect v1.TaintEffect - if strings.Index(taintKey, ":") != -1 { + if strings.Contains(taintKey, ":") { parts := strings.Split(taintKey, ":") taintKey = parts[0] effect = v1.TaintEffect(parts[1]) diff --git a/pkg/util/time/time_test.go b/pkg/util/time/time_test.go index d64a69a72..fcd740711 100644 --- a/pkg/util/time/time_test.go +++ b/pkg/util/time/time_test.go @@ -5,7 +5,7 @@ Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, @@ -30,14 +30,11 @@ var _ = Describe("time", func() { timeStamp metav1.Time period time.Duration } - type action struct { - } type expect struct { timeOutOccurred bool } type data struct { setup setup - action action expect expect } DescribeTable("##TimeOut scenarios", diff --git a/vendor/modules.txt b/vendor/modules.txt index d0c368c53..e69c34185 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -554,7 +554,7 @@ google.golang.org/api/option/internaloption google.golang.org/api/transport/cert google.golang.org/api/transport/http google.golang.org/api/transport/http/internal/propagation -# google.golang.org/appengine v1.6.6 +# google.golang.org/appengine v1.6.7 ## explicit; go 1.11 google.golang.org/appengine google.golang.org/appengine/internal
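For readers tracing the stop-channel to context migration above: createWorker now drives the queue-processing function with wait.UntilWithContext, so cancelling a single root context both stops the polling loop and reaches every reconciler invocation. Below is a minimal, self-contained sketch of that pattern; it is not part of this change, demoReconcile and runWorker are hypothetical names, and the real worker's maxRetries/forgetAfterSuccess handling is omitted for brevity.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

// demoReconcile stands in for the reconcilers above; it only logs the key.
func demoReconcile(_ context.Context, key string) error {
	fmt.Println("reconciled", key)
	return nil
}

// runWorker mirrors the createWorker/worker shape: the loop ends when ctx is
// cancelled, and every reconcile call receives that same ctx.
func runWorker(ctx context.Context, queue workqueue.RateLimitingInterface, wg *sync.WaitGroup) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		wait.UntilWithContext(ctx, func(ctx context.Context) {
			for {
				key, shutdown := queue.Get()
				if shutdown {
					return
				}
				func() {
					defer queue.Done(key)
					if err := demoReconcile(ctx, key.(string)); err != nil {
						queue.AddRateLimited(key) // retry later on error
						return
					}
					queue.Forget(key) // clear rate-limiting history on success
				}()
			}
		}, time.Second)
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	var wg sync.WaitGroup

	runWorker(ctx, queue, &wg)
	queue.Add("default/machine-1")

	time.Sleep(2 * time.Second)
	cancel()         // stops the wait.UntilWithContext loop
	queue.ShutDown() // unblocks queue.Get so the goroutine can return
	wg.Wait()
}

Compared with the old stop-channel variant, the root context becomes the single shutdown signal shared by the polling loop, the shared informers (via ctx.Done()), and any API calls the reconcilers make with that context.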
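The new pkg/util/signals package is what produces that root context at process start-up: the first SIGINT/SIGTERM cancels it, and a second signal exits the process immediately. A rough usage sketch follows, assuming an entry point that accepts a context; the run function here is a hypothetical stand-in for the real option parsing and controller start-up, which this sketch does not reproduce.

package main

import (
	"context"
	"log"

	"github.com/gardener/machine-controller-manager/pkg/util/signals"
)

// run is a hypothetical stand-in for the real entry point that receives the
// root context; it simply blocks until the context is cancelled.
func run(ctx context.Context) error {
	log.Println("controllers running; send SIGINT/SIGTERM to stop")
	<-ctx.Done()
	log.Println("shutdown signal received, returning")
	return nil
}

func main() {
	// Cancelled on the first SIGINT/SIGTERM; a second signal makes
	// SetupSignalHandler call os.Exit(1) directly.
	ctx := signals.SetupSignalHandler()

	if err := run(ctx); err != nil {
		log.Fatalf("exited with error: %v", err)
	}
}

Because SetupSignalHandler closes onlyOneSignalHandler, it may only be called once per process; a second call panics by design.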