Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue# 306: Added tagging controller #308

Merged
merged 42 commits into from
Mar 30, 2022
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
aeaaf45
Add tagging controller configuration
nguyenkndinh Feb 23, 2022
6c7fbb1
add log
nguyenkndinh Feb 23, 2022
4e40c99
rearrange the controllers
nguyenkndinh Feb 23, 2022
793403b
remove debugging log
nguyenkndinh Feb 24, 2022
92d5743
removed route controller
nguyenkndinh Feb 24, 2022
1e8be5a
added a blank test file for the tagging controller
nguyenkndinh Feb 24, 2022
014b607
remove predefined tag
nguyenkndinh Feb 24, 2022
dc0227c
Refactoring based on recommendations
nguyenkndinh Mar 1, 2022
5821607
Sticking to the naming convention
nguyenkndinh Mar 1, 2022
615f200
address more comments on naming
nguyenkndinh Mar 8, 2022
ac31741
Merge branch 'kubernetes:master' into master
nguyenkndinh Mar 9, 2022
f2d58ab
Using ListNode to get nodes entering and leaving the cluster.
nguyenkndinh Mar 15, 2022
cc464d6
refactor
nguyenkndinh Mar 15, 2022
a3f19e5
Add testing and controller config skeletons
nguyenkndinh Mar 19, 2022
1f12385
Added tagging and flags mechanisms
nguyenkndinh Mar 22, 2022
a580ac2
Disabled the tagging controller by default
nguyenkndinh Mar 22, 2022
966e97f
Updated test structure
nguyenkndinh Mar 23, 2022
e458d6f
Merge branch 'kubernetes:master' into master
nguyenkndinh Mar 23, 2022
5546277
Removed logging statements.
nguyenkndinh Mar 23, 2022
374a18e
Making the tests more robust
nguyenkndinh Mar 23, 2022
332ac7e
Renaming the maps in tagging controller
nguyenkndinh Mar 23, 2022
b5375b7
Refactoring names and remove debugging logs
nguyenkndinh Mar 23, 2022
dd2bd14
Add failure test cases for when EC2 return error
nguyenkndinh Mar 23, 2022
0330b61
adding details for --resources
nguyenkndinh Mar 24, 2022
0c9bd00
add in Copyright message
nguyenkndinh Mar 24, 2022
633768d
Using NodeInformer and Workqueue for tagging resources
nguyenkndinh Mar 25, 2022
2511565
Used workqueue for both tag and untag actions
nguyenkndinh Mar 25, 2022
dda006d
Update docs/tagging_controller.md
nguyenkndinh Mar 25, 2022
22067dc
Renamed fields in the tagging controller to be more user friendly
nguyenkndinh Mar 26, 2022
40f3796
Added in a loop to make sure all messages are processed before shutti…
nguyenkndinh Mar 26, 2022
6247587
Added more logging
nguyenkndinh Mar 26, 2022
4466163
Added more testing
nguyenkndinh Mar 26, 2022
fa80a6a
cosmetic change
nguyenkndinh Mar 28, 2022
2f2f639
use array instead of map for supported resources
nguyenkndinh Mar 28, 2022
df3d102
Reworked the workqueue with workitem
nguyenkndinh Mar 28, 2022
19a931d
Addressed comments
nguyenkndinh Mar 29, 2022
a27a550
addressed verify-lint errors
nguyenkndinh Mar 29, 2022
a17f858
addressed comments and verify-lint
nguyenkndinh Mar 29, 2022
4fdd368
address validate-lint error
nguyenkndinh Mar 29, 2022
2e4b7a8
missed a couple more lint errors
nguyenkndinh Mar 29, 2022
5fb201b
Updated doc to be clearer
nguyenkndinh Mar 29, 2022
781d0e2
Add TODOs for e2e testing and non-retryable workitem
nguyenkndinh Mar 30, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions cmd/aws-cloud-controller-manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ import (
"time"

"k8s.io/apimachinery/pkg/util/wait"
cloudprovider "k8s.io/cloud-provider"
awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1"
awsv2 "k8s.io/cloud-provider-aws/pkg/providers/v2"
"k8s.io/cloud-provider/app"
"k8s.io/cloud-provider/options"
cliflag "k8s.io/component-base/cli/flag"
Expand All @@ -39,11 +42,9 @@ import (
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
"k8s.io/klog/v2"

cloudprovider "k8s.io/cloud-provider"
awsv1 "k8s.io/cloud-provider-aws/pkg/providers/v1"
awsv2 "k8s.io/cloud-provider-aws/pkg/providers/v2"

cloudcontrollerconfig "k8s.io/cloud-provider/app/config"

awscontrollers "k8s.io/cloud-provider-aws/pkg/controllers"
)

const (
Expand All @@ -61,12 +62,12 @@ func main() {
klog.Fatalf("unable to initialize command options: %v", err)
}

controllerInitializers := app.DefaultInitFuncConstructors
controllerInitializers := awscontrollers.BuildControllerInitializers()
fss := cliflag.NamedFlagSets{}
command := app.NewCloudControllerManagerCommand(opts, cloudInitializer, controllerInitializers, fss, wait.NeverStop)

if err := command.Execute(); err != nil {
os.Exit(1)
klog.Fatalf("unable to execute command: %v", err)
}
}

Expand Down
75 changes: 75 additions & 0 deletions pkg/controllers/aws_controller_manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package controllers

import (
"context"
cloudprovider "k8s.io/cloud-provider"
taggingcontroller "k8s.io/cloud-provider-aws/pkg/controllers/tagging"
"k8s.io/cloud-provider/app"
cloudcontrollerconfig "k8s.io/cloud-provider/app/config"
genericcontrollermanager "k8s.io/controller-manager/app"
"k8s.io/controller-manager/controller"
"k8s.io/klog/v2"
)

const (
TaggingControllerClientName = "tagging-controller"
TaggingControllerKey = "tagging"
)

// BuildControllerInitializers is used to add new controllers built in this package to
// the existing list of controllers from cloud-provider
func BuildControllerInitializers() map[string]app.ControllerInitFuncConstructor {
controllerInitializers := app.DefaultInitFuncConstructors

taggingControllerConstructor := app.ControllerInitFuncConstructor{
InitContext: app.ControllerInitContext{
ClientName: TaggingControllerClientName,
},
Constructor: startTaggingControllerWrapper,
}

controllerInitializers[TaggingControllerKey] = taggingControllerConstructor

// TODO: remove the following line to enable the route controller
delete(controllerInitializers, "route")

return controllerInitializers
}

// StartTaggingControllerWrapper is used to take cloud config as input and start the tagging controller
func startTaggingControllerWrapper(initContext app.ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) app.InitFunc {
return func(ctx context.Context, controllerContext genericcontrollermanager.ControllerContext) (controller.Interface, bool, error) {
return startTaggingController(ctx, initContext, completedConfig, cloud)
}
}

func startTaggingController(ctx context.Context, initContext app.ControllerInitContext, completedConfig *cloudcontrollerconfig.CompletedConfig, cloud cloudprovider.Interface) (controller.Interface, bool, error) {
// Start the TaggingController
taggingcontroller, err := taggingcontroller.NewTaggingController(
completedConfig.SharedInformers.Core().V1().Nodes(),
// cloud node lifecycle controller uses existing cluster role from node-controller
completedConfig.ClientBuilder.ClientOrDie(initContext.ClientName),
cloud,
completedConfig.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration)
if err != nil {
klog.Warningf("failed to start tagging controller: %s", err)
return nil, false, nil
}

go taggingcontroller.Run(ctx)

return nil, true, nil
}
171 changes: 171 additions & 0 deletions pkg/controllers/tagging/tagging_controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tagging

import (
"context"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
coreinformers "k8s.io/client-go/informers/core/v1"
clientset "k8s.io/client-go/kubernetes"
v1lister "k8s.io/client-go/listers/core/v1"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/klog/v2"
"time"
)

// TaggingController is the controller implementation for tagging cluster resources.
// It periodically check for Node events (creating/deleting) to apply appropriate
// tags to resources.
type TaggingController struct {
kubeClient clientset.Interface
nodeLister v1lister.NodeLister

cloud cloudprovider.Interface

// Value controlling TaggingController monitoring period, i.e. how often does TaggingController
// check node list. This value should be lower than nodeMonitorGracePeriod
// set in controller-manager
nodeMonitorPeriod time.Duration

// A map presenting the node and whether it currently exists
taggedNodes map[string]bool

// A map representing the nodes that were ever in the cluster
nodeMap map[string]*v1.Node
}

// NewTaggingController creates a NewTaggingController object
func NewTaggingController(
nodeInformer coreinformers.NodeInformer,
kubeClient clientset.Interface,
cloud cloudprovider.Interface,
nodeMonitorPeriod time.Duration) (*TaggingController, error) {

tc := &TaggingController{
kubeClient: kubeClient,
nodeLister: nodeInformer.Lister(),
cloud: cloud,
nodeMonitorPeriod: nodeMonitorPeriod,
taggedNodes: make(map[string]bool),
nodeMap: make(map[string]*v1.Node),
}

return tc, nil
}

// Run will start the controller to tag resources attached to a cluster
// and untag resources detached from a cluster.
func (tc *TaggingController) Run(ctx context.Context) {
defer utilruntime.HandleCrash()

wait.UntilWithContext(ctx, tc.monitorNodes, tc.nodeMonitorPeriod)
}

func (tc *TaggingController) monitorNodes(ctx context.Context) {
nodes, err := tc.nodeLister.List(labels.Everything())
if err != nil {
klog.Errorf("error listing nodes from cache: %s", err)
return
}

for k := range tc.taggedNodes {
tc.taggedNodes[k] = false
}

var nodesToTag []*v1.Node
for _, node := range nodes {
if _, ok := tc.taggedNodes[node.GetName()]; !ok {
nodesToTag = append(nodesToTag, node)
}

tc.nodeMap[node.GetName()] = node
tc.taggedNodes[node.GetName()] = true
}
tc.tagNodesResources(nodesToTag)

var nodesToUntag []*v1.Node
for nodeName, existed := range tc.taggedNodes {
if existed == false {
nodesToUntag = append(nodesToUntag, tc.nodeMap[nodeName])
}
}
tc.untagNodeResources(nodesToUntag)

tc.syncDeletedNodesToTaggedNodes()
}

// tagNodesResources tag node resources from a list of node
// If we want to tag more resources, modify this function appropriately
func (tc *TaggingController) tagNodesResources(nodes []*v1.Node) {
for _, node := range nodes {
klog.Infof("Tagging resources for node %s.", node.GetName())
}
}

func (tc *TaggingController) untagNodeResources(nodes []*v1.Node) {
for _, node := range nodes {
klog.Infof("Untagging resources for node %s.", node.GetName())
}
}

// syncDeletedNodes delete (k, v) from taggedNodes
// if it doesn't exist
func (tc *TaggingController) syncDeletedNodesToTaggedNodes() {
for k, v := range tc.taggedNodes {
if v == false {
delete(tc.taggedNodes, k)
}
}
}

//// tagEc2Instances applies the provided tags to each EC2 instances in
//// the cluster.
//func (tc *TaggingController) tagEc2Instances(nodes []*v1.Node) {
// var instanceIds []*string
// for _, node := range nodes {
// instanceId, _ := awsv1.KubernetesInstanceID(node.Spec.ProviderID).MapToAWSInstanceID()
// instanceIds = append(instanceIds, aws.String(string(instanceId)))
// }
//
// tc.tagResources(instanceIds)
//}

//func (tc *TaggingController) tagResources(resourceIds []*string) {
// request := &ec2.CreateTagsInput{
// Resources: resourceIds,
// Tags: tc.getTagsFromInputs(),
// }
//
// _, error := awsv1.awsSdkEC2.CreateTags(request)
// awsv1.Cloud.TagResoures(request)
//
// if error != nil {
// klog.Errorf("Error occurred trying to tag resources, %s", error)
// }
//}
//
//// Sample function demonstrating that we'll get the tag list from user
//func (tc *TaggingController) getTagsFromInputs() []*ec2.Tag {
// var awsTags []*ec2.Tag
// tag := &ec2.Tag{
// Key: aws.String("Sample Key"),
// Value: aws.String("Sample value"),
// }
// awsTags = append(awsTags, tag)
//
// return awsTags
//}
14 changes: 14 additions & 0 deletions pkg/controllers/tagging/tagging_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package tagging