Issue# 306: Added tagging controller #308

Merged
merged 42 commits into from
Mar 30, 2022
Changes from 35 commits
aeaaf45
Add tagging controller configuration
nguyenkndinh Feb 23, 2022
6c7fbb1
add log
nguyenkndinh Feb 23, 2022
4e40c99
rearrange the controllers
nguyenkndinh Feb 23, 2022
793403b
remove debugging log
nguyenkndinh Feb 24, 2022
92d5743
removed route controller
nguyenkndinh Feb 24, 2022
1e8be5a
added a blank test file for the tagging controller
nguyenkndinh Feb 24, 2022
014b607
remove predefined tag
nguyenkndinh Feb 24, 2022
dc0227c
Refactoring based on recommendations
nguyenkndinh Mar 1, 2022
5821607
Sticking to the naming convention
nguyenkndinh Mar 1, 2022
615f200
address more comments on naming
nguyenkndinh Mar 8, 2022
ac31741
Merge branch 'kubernetes:master' into master
nguyenkndinh Mar 9, 2022
f2d58ab
Using ListNode to get nodes entering and leaving the cluster.
nguyenkndinh Mar 15, 2022
cc464d6
refactor
nguyenkndinh Mar 15, 2022
a3f19e5
Add testing and controller config skeletons
nguyenkndinh Mar 19, 2022
1f12385
Added tagging and flags mechanisms
nguyenkndinh Mar 22, 2022
a580ac2
Disabled the tagging controller by default
nguyenkndinh Mar 22, 2022
966e97f
Updated test structure
nguyenkndinh Mar 23, 2022
e458d6f
Merge branch 'kubernetes:master' into master
nguyenkndinh Mar 23, 2022
5546277
Removed logging statements.
nguyenkndinh Mar 23, 2022
374a18e
Making the tests more robust
nguyenkndinh Mar 23, 2022
332ac7e
Renaming the maps in tagging controller
nguyenkndinh Mar 23, 2022
b5375b7
Refactoring names and remove debugging logs
nguyenkndinh Mar 23, 2022
dd2bd14
Add failure test cases for when EC2 return error
nguyenkndinh Mar 23, 2022
0330b61
adding details for --resources
nguyenkndinh Mar 24, 2022
0c9bd00
add in Copyright message
nguyenkndinh Mar 24, 2022
633768d
Using NodeInformer and Workqueue for tagging resources
nguyenkndinh Mar 25, 2022
2511565
Used workqueue for both tag and untag actions
nguyenkndinh Mar 25, 2022
dda006d
Update docs/tagging_controller.md
nguyenkndinh Mar 25, 2022
22067dc
Renamed fields in the tagging controller to be more user friendly
nguyenkndinh Mar 26, 2022
40f3796
Added in a loop to make sure all messages are processed before shutti…
nguyenkndinh Mar 26, 2022
6247587
Added more logging
nguyenkndinh Mar 26, 2022
4466163
Added more testing
nguyenkndinh Mar 26, 2022
fa80a6a
cosmetic change
nguyenkndinh Mar 28, 2022
2f2f639
use array instead of map for supported resources
nguyenkndinh Mar 28, 2022
df3d102
Reworked the workqueue with workitem
nguyenkndinh Mar 28, 2022
19a931d
Addressed comments
nguyenkndinh Mar 29, 2022
a27a550
addressed verify-lint errors
nguyenkndinh Mar 29, 2022
a17f858
addressed comments and verify-lint
nguyenkndinh Mar 29, 2022
4fdd368
address validate-lint error
nguyenkndinh Mar 29, 2022
2e4b7a8
missed a couple more lint errors
nguyenkndinh Mar 29, 2022
5fb201b
Updated doc to be clearer
nguyenkndinh Mar 29, 2022
781d0e2
Add TODOs for e2e testing and non-retryable workitem
nguyenkndinh Mar 30, 2022
1 change: 1 addition & 0 deletions .gitignore
@@ -8,3 +8,4 @@ docs/book/_book/
site/
.vscode/
e2e.test
.idea/
29 changes: 20 additions & 9 deletions cmd/aws-cloud-controller-manager/main.go
@@ -26,22 +26,21 @@ limitations under the License.
package main

import (
"math/rand"
"os"
"time"

"k8s.io/apimachinery/pkg/util/wait"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/cloud-provider-aws/pkg/controllers/tagging"
awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1"
awsv2 "k8s.io/cloud-provider-aws/pkg/providers/v2"
"k8s.io/cloud-provider/app"
"k8s.io/cloud-provider/options"
cliflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/logs"
_ "k8s.io/component-base/metrics/prometheus/clientgo" // for client metric registration
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
"k8s.io/klog/v2"

cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
)
@@ -62,11 +61,23 @@ func main() {
}

controllerInitializers := app.DefaultInitFuncConstructors
taggingControllerWrapper := tagging.TaggingControllerWrapper{}
fss := cliflag.NamedFlagSets{}
taggingControllerWrapper.Options.AddFlags(fss.FlagSet("tagging controller"))

taggingControllerConstructor := app.ControllerInitFuncConstructor{
InitContext: app.ControllerInitContext{
ClientName: tagging.TaggingControllerClientName,
},
Constructor: taggingControllerWrapper.StartTaggingControllerWrapper,
}

controllerInitializers[tagging.TaggingControllerKey] = taggingControllerConstructor
app.ControllersDisabledByDefault.Insert(tagging.TaggingControllerKey)
command := app.NewCloudControllerManagerCommand(opts, cloudInitializer, controllerInitializers, fss, wait.NeverStop)

if err := command.Execute(); err != nil {
klog.Fatalf("unable to execute command: %v", err)
}
}

7 changes: 7 additions & 0 deletions docs/tagging_controller.md
@@ -0,0 +1,7 @@
# The Tagging Controller

The tagging controller is responsible for tagging and untagging node resources when they join and leave the cluster, respectively. It can add and remove tags based on user input. Unlike the existing controllers, the tagging controller works exclusively with AWS. The AWS APIs it uses are `ec2:CreateTags` and `ec2:DeleteTags`.

| Flag | Valid Values | Default | Description |
|------| --- | --- | --- |
| tags | Comma-separated list of key=value | - | A comma-separated list of key-value pairs which will be recorded as nodes' additional tags. For example: "Key1=Val1,Key2=Val2,KeyNoVal1=,KeyNoVal2" |
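
To make the documented value format concrete, here is a minimal sketch of how a string such as `Key1=Val1,Key2=Val2,KeyNoVal1=,KeyNoVal2` could be split into a tag map. `parseTags` is a hypothetical helper written for illustration only; the controller itself registers `--tags` through pflag's `StringToStringVar`, whose parsing rules may differ from this sketch.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTags is a hypothetical helper (not part of this PR). It splits a
// comma-separated list of key=value pairs into a map. A key with no "="
// or with nothing after the "=" maps to the empty string.
func parseTags(s string) (map[string]string, error) {
	tags := make(map[string]string)
	for _, pair := range strings.Split(s, ",") {
		parts := strings.SplitN(pair, "=", 2)
		key := parts[0]
		if key == "" {
			return nil, fmt.Errorf("empty tag key in %q", pair)
		}
		value := ""
		if len(parts) == 2 {
			value = parts[1]
		}
		tags[key] = value
	}
	return tags, nil
}

func main() {
	tags, err := parseTags("Key1=Val1,Key2=Val2,KeyNoVal1=,KeyNoVal2")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	// Four keys are parsed; the two keys without values map to "".
	fmt.Println(len(tags), tags["Key1"], tags["KeyNoVal2"] == "")
}
```

Rejecting an empty key is a choice made for this sketch, not something the document specifies.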
21 changes: 21 additions & 0 deletions pkg/controllers/options/resources.go
@@ -0,0 +1,21 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options

const (
Instance string = "instance"
)

var SupportedResources = []string{
Instance,
}
55 changes: 55 additions & 0 deletions pkg/controllers/options/tagging_controller.go
@@ -0,0 +1,55 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package options

import (
"fmt"
"github.com/spf13/pflag"
)

type TaggingControllerOptions struct {
Tags map[string]string
Resources []string
}

func (o *TaggingControllerOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringToStringVar(&o.Tags, "tags", o.Tags, "Tags to apply to AWS resources in the tagging controller.")
Contributor

Give format of tags in the help string.

Contributor Author

done

Contributor

Is this done? I'm not seeing it in the current diff. EDIT: I see you added it to the error message. I think it's more user friendly to (also) have it in the initial help message; otherwise the user has to guess, and get it wrong, in order to learn the format.

Contributor Author

oh, sorry, I added it to the error string and forgot to add it here.

fs.StringArrayVar(&o.Resources, "resources", o.Resources, "AWS resources name to add/remove tags in the tagging controller.")
}

func (o *TaggingControllerOptions) Validate() error {
if len(o.Tags) == 0 {
return fmt.Errorf("--tags must not be empty and must be a form of key=value")
}

if len(o.Resources) == 0 {
return fmt.Errorf("--resources must not be empty")
}

for _, r := range o.Resources {
found := false

for _, resource := range SupportedResources {
if r == resource {
found = true
Contributor

nit: if you're not storing each value that doesn't match the supported resources, then just return the error from inside the if.

Contributor

nit: Also, for a very slight readability improvement you could use a map or set for supportedResources so that the check is a lookup instead of an inner loop.

Contributor Author

got it, will update.
Regarding a map, I was using one in one of the previous commits but was told this is easier to read. (2f2f639)

Contributor

Ok, I guess your reviewers have a difference of opinion then :) My thinking is that checking whether a value is in the map doesn't require looping over it, but it's not important. Up to you.

Contributor

A set would have solved this perfectly, but with a map we previously had to loop through it to print out the list of valid inputs, which wasn't very readable in my opinion (and just printing the map as is would not have been pretty). Not something I am super concerned about either. :)

}
}

if !found {
return fmt.Errorf("%s is not a supported resource. Current supported resources %v", r, SupportedResources)
}
}

return nil
}
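
The review thread above weighs a slice against a map or set for the supported-resources check. As a rough sketch of the set-based variant, the example below uses a `map[string]struct{}` for the lookup and sorts the keys only when building the error message, which addresses the readability concern about printing a map. The names `supportedResources`, `supportedList`, and `validateResources` are illustrative assumptions, not code from this PR.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"strings"
)

// supportedResources is a set of resource names (hypothetical; the PR
// uses a []string). The empty struct values carry no data.
var supportedResources = map[string]struct{}{
	"instance": {},
}

// supportedList returns the set's keys in sorted order so that error
// messages are stable and readable.
func supportedList() []string {
	names := make([]string, 0, len(supportedResources))
	for name := range supportedResources {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}

// validateResources returns an error for an empty list or for the first
// resource that is not in the set.
func validateResources(resources []string) error {
	if len(resources) == 0 {
		return errors.New("--resources must not be empty")
	}
	for _, r := range resources {
		if _, ok := supportedResources[r]; !ok {
			return fmt.Errorf("%s is not a supported resource. Current supported resources: %s",
				r, strings.Join(supportedList(), ", "))
		}
	}
	return nil
}

func main() {
	fmt.Println(validateResources([]string{"instance"}))
	fmt.Println(validateResources([]string{"volume"}))
}
```

With a single supported resource the difference is purely stylistic, which matches the reviewers' conclusion that either form is acceptable.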
248 changes: 248 additions & 0 deletions pkg/controllers/tagging/tagging_controller.go
@@ -0,0 +1,248 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tagging

import (
"fmt"
v1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
opt "k8s.io/cloud-provider-aws/pkg/controllers/options"
awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1"
"k8s.io/klog/v2"
"time"
)

// workItem contains the node and an action for that node
type workItem struct {
node *v1.Node
action func(node *v1.Node) error
}

// TaggingController is the controller implementation for tagging cluster resources.
// It periodically checks for Node events (creating/deleting) to apply appropriate
// tags to resources.
type TaggingController struct {
nodeInformer coreinformers.NodeInformer
kubeClient clientset.Interface
cloud *awsv1.Cloud
workqueue workqueue.RateLimitingInterface
nodesSynced cache.InformerSynced
// Value controlling TaggingController monitoring period, i.e. how often does TaggingController
// check node list. This value should be lower than nodeMonitorGracePeriod
// set in controller-manager
nodeMonitorPeriod time.Duration

// Representing the user input for tags
tags map[string]string

// Representing the resources to tag
resources []string
}

// NewTaggingController creates a TaggingController object
func NewTaggingController(
nodeInformer coreinformers.NodeInformer,
kubeClient clientset.Interface,
cloud cloudprovider.Interface,
nodeMonitorPeriod time.Duration,
tags map[string]string,
resources []string) (*TaggingController, error) {

awsCloud, ok := cloud.(*awsv1.Cloud)
if !ok {
err := fmt.Errorf("tagging controller does not support %v provider", cloud.ProviderName())
return nil, err
}

tc := &TaggingController{
nodeInformer: nodeInformer,
kubeClient: kubeClient,
cloud: awsCloud,
nodeMonitorPeriod: nodeMonitorPeriod,
tags: tags,
resources: resources,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Tagging"),
nodesSynced: nodeInformer.Informer().HasSynced,
}

// Use shared informer to listen to add/update/delete of nodes. Note that any nodes
// that exist before tagging controller starts will show up in the update method
tc.nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { tc.enqueueNode(obj, tc.tagNodesResources) },
UpdateFunc: func(oldObj, newObj interface{}) { tc.enqueueNode(newObj, tc.tagNodesResources) },
Contributor

I don't think this is going to work. Right now you never remove tags unless the node is deleted, right? How does a user change an existing tag that they previously set?

Contributor Author

This is to tag any nodes that existed before the controller was brought up, similar to the node controller.

Contributor

Yeah, and I just realized you basically don't delete tags unless nodes are deleted? Probably partially because we can't distinguish tags applied by the controller from user-owned tags...

Contributor

So I guess we should document (maybe you already did) that the controller only deletes tags on a node when the node object gets deleted. It doesn't really expect the tags to change. And if the tags do change, we just leave the existing tags alone and apply the new ones?

Contributor Author

Will add it to the controller md

Contributor

@nckturner right now the tags are passed as flags to this controller so we don't really expect them to change. But you have a good point - in case we decide to move these to a config map or make them more configurable, we'll probably need to delete the tags as well.

DeleteFunc: func(obj interface{}) { tc.enqueueNode(obj, tc.untagNodeResources) },
})

return tc, nil
}

// Run will start the controller to tag resources attached to the cluster
// and untag resources detached from the cluster.
func (tc *TaggingController) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer tc.workqueue.ShutDown()

// Wait for the caches to be synced before starting workers
klog.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, tc.nodesSynced); !ok {
klog.Errorf("failed to wait for caches to sync")
return
}

klog.Infof("Starting the tagging controller")
go wait.Until(tc.work, tc.nodeMonitorPeriod, stopCh)

<-stopCh
}

// work is a long-running function that continuously
// calls Process() for each message on the workqueue
func (tc *TaggingController) work() {
for tc.Process() {
}
}

// Process reads the next message from the queue and performs either
// the tag or the untag action on the Node object
func (tc *TaggingController) Process() bool {
obj, shutdown := tc.workqueue.Get()
if shutdown {
return false
}

klog.Infof("Starting to process %v", obj)

err := func(obj interface{}) error {
defer tc.workqueue.Done(obj)

workItem, ok := obj.(*workItem)
if !ok {
tc.workqueue.Forget(obj)
utilruntime.HandleError(fmt.Errorf("expected workItem in workqueue but got %#v", obj))
return nil
}

err := workItem.action(workItem.node)
if err != nil {
// Put the item back on the workqueue to handle any transient errors.
tc.workqueue.AddRateLimited(workItem)
Contributor

Does this happen on non-retryable errors too? Like if the tag is never going to succeed are we going to keep adding it to the workqueue?

Contributor Author

Yes, it does. Currently we do not handle specific errors from EC2; we simply retry everything, on the assumption that no non-retryable errors are expected here. Do you have any recommendations for specific errors that we should look out for?

Contributor

I think we need to return nil from workItem.action for any non-retryable errors... and I'm not sure what possibilities there are... incorrectly formatted tag?

Contributor

We should log the error but we don't want to keep spamming the tag API, if possible.

Contributor Author

okay, I can try looking into https://docs.aws.amazon.com/AWSEC2/latest/APIReference/errors-overview.html#CommonErrors and see what errors are non-retryable in this case.

return fmt.Errorf("error finishing work item '%v': %s, requeuing", workItem, err.Error())
}

tc.workqueue.Forget(obj)
klog.Infof("Finished processing %v", workItem)
return nil
}(obj)

if err != nil {
klog.Errorf("Error occurred while processing %v", obj)
utilruntime.HandleError(err)
}

return true
}

// tagNodesResources tags the resources of the given node.
// If we want to tag more resources, modify this function appropriately
func (tc *TaggingController) tagNodesResources(node *v1.Node) error {
for _, resource := range tc.resources {
switch resource {
case opt.Instance:
err := tc.tagEc2Instance(node)
if err != nil {
return err
}
}
}

return nil
}

// tagEc2Instance applies the provided tags to the EC2 instance
// backing the given node.
func (tc *TaggingController) tagEc2Instance(node *v1.Node) error {
instanceId, err := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()

if err != nil {
klog.Errorf("Error in getting instanceID for node %s, error: %v", node.GetName(), err)
return err
} else {
err := tc.cloud.TagResource(string(instanceId), tc.tags)
Contributor

Are retries and throttling type errors handled somewhere? I see that retriable errors are handled here so as long as we are using that function under the covers then we are ok. Not sure if we are.

Contributor Author

it is handled here by simply requeuing the items back to the queue with rate limiting.


if err != nil {
klog.Errorf("Error in tagging EC2 instance for node %s, error: %v", node.GetName(), err)
return err
}
}

klog.Infof("Successfully tagged %s with %v", instanceId, tc.tags)

return nil
}

// untagNodeResources removes tags from the resources of the given node.
// If we want to untag more resources, modify this function appropriately
func (tc *TaggingController) untagNodeResources(node *v1.Node) error {
for _, resource := range tc.resources {
switch resource {
case opt.Instance:
err := tc.untagEc2Instance(node)
if err != nil {
return err
}
}
}

return nil
}

// untagEc2Instance deletes the provided tags from the EC2 instance
// backing the given node.
func (tc *TaggingController) untagEc2Instance(node *v1.Node) error {
instanceId, err := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
if err != nil {
klog.Errorf("Error in getting instanceID for node %s, error: %v", node.GetName(), err)
return err
}

// Return early on failure so the success log below is reached only
// when the tags were actually removed.
if err := tc.cloud.UntagResource(string(instanceId), tc.tags); err != nil {
klog.Errorf("Error in untagging EC2 instance for node %s, error: %v", node.GetName(), err)
return err
}

klog.Infof("Successfully untagged %s with %v", instanceId, tc.tags)

return nil
}

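// enqueueNode wraps the node and the action to perform on it in a
// workItem and adds that item to the workqueue
func (tc *TaggingController) enqueueNode(obj interface{}, action func(node *v1.Node) error) {
// Delete events may deliver a cache.DeletedFinalStateUnknown tombstone
// instead of the node itself, so unwrap it before asserting the type.
if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok {
obj = tombstone.Obj
}
node, ok := obj.(*v1.Node)
if !ok {
utilruntime.HandleError(fmt.Errorf("expected *v1.Node but got %#v", obj))
return
}
item := &workItem{
node: node,
action: action,
}
tc.workqueue.Add(item)
klog.Infof("Added node %s to the workqueue", node.GetName())
}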