Skip to content

Commit

Permalink
Support for migration to NLB from CLB (#381)
Browse files Browse the repository at this point in the history
Adding support for migration from CLB to NLB.
This will be rolled out phase by phase.

---------

Signed-off-by: Viraj Kulkarni <[email protected]>
Co-authored-by: Viraj Kulkarni <[email protected]>
  • Loading branch information
virajrk and Viraj Kulkarni authored Feb 11, 2025
1 parent 1a28c55 commit 94daadf
Show file tree
Hide file tree
Showing 12 changed files with 440 additions and 101 deletions.
5 changes: 4 additions & 1 deletion admiral/cmd/admiral/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ func GetRootCmd(args []string) *cobra.Command {
}

if common.IsAdmiralDynamicConfigEnabled() {
go clusters.UpdateASyncAdmiralConfig(remoteRegistry.DynamicConfigDatabaseClient, params.DynamicSyncPeriod)
ctxDynamicConfig, cancel := context.WithCancel(context.Background())
defer cancel()
go clusters.UpdateASyncAdmiralConfig(ctxDynamicConfig, remoteRegistry, params.DynamicSyncPeriod)
}

// This is required for PERF tests only.
Expand Down Expand Up @@ -292,6 +294,7 @@ func GetRootCmd(args []string) *cobra.Command {
rootCmd.PersistentFlags().StringSliceVar(&params.NLBEnabledClusters, "nlb_enabled_clusters", []string{}, "Comma seperated list of enabled clusters to be enabled for NLB")
rootCmd.PersistentFlags().StringSliceVar(&params.NLBEnabledIdentityList, "nlb_enabled_identity_list", []string{}, "Comma seperated list of enabled idenity list to be enabled for NLB")
rootCmd.PersistentFlags().StringSliceVar(&params.CLBEnabledClusters, "clb_enabled_clusters", []string{}, "Comma seperated list of enabled clusters to be enabled for CLB")
rootCmd.PersistentFlags().StringVar(&params.NLBIngressLabel, "nlb_ingress_label", common.NLBIstioIngressGatewayLabelValue, "The value of the `app` label to use to match and find the service that represents the NLB ingress for cross cluster traffic")

return rootCmd
}
Expand Down
91 changes: 86 additions & 5 deletions admiral/pkg/clusters/admiralDatabaseClient.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package clusters

import (
"context"
"crypto/md5"
"errors"
"fmt"
Expand All @@ -9,6 +10,7 @@ import (
log "github.com/sirupsen/logrus"
"gopkg.in/yaml.v2"
"io/ioutil"
"slices"
"time"
)

Expand Down Expand Up @@ -88,9 +90,27 @@ func (dynamicConfigDatabaseClient *DynamicConfigDatabaseClient) Delete(data inte

// Get fetches the dynamic-config record for the given env/identity pair from
// the configured table, after verifying the client is fully initialized.
func (dynamicConfigDatabaseClient *DynamicConfigDatabaseClient) Get(env, identity string) (interface{}, error) {
	//Variable renaming is done to re-purpose existing interface
	if err := checkIfDynamicConfigDatabaseClientIsInitialized(dynamicConfigDatabaseClient); err != nil {
		return nil, err
	}
	return dynamicConfigDatabaseClient.dynamoClient.getDynamicConfig(env, identity, dynamicConfigDatabaseClient.database.TableName)
}

// checkIfDynamicConfigDatabaseClientIsInitialized reports an error when the
// client, its dynamo client, or its database configuration is missing.
func checkIfDynamicConfigDatabaseClientIsInitialized(client *DynamicConfigDatabaseClient) error {
	switch {
	case client == nil || client.dynamoClient == nil:
		return fmt.Errorf("task=%s, dynamoClient is not initialized", common.DynamicConfigUpdate)
	case client.database == nil:
		return fmt.Errorf("task=%s, database is not initialized", common.DynamicConfigUpdate)
	default:
		return nil
	}
}

// Update on DummyDatabaseClient is a no-op: it ignores the payload and logger
// and always returns nil.
func (databaseClient *DummyDatabaseClient) Update(data interface{}, logger *log.Entry) error {
return nil
}
Expand Down Expand Up @@ -158,16 +178,25 @@ func NewDynamicConfigDatabaseClient(path string, dynamoClientInitFunc func(role
return &dynamicConfigClient, nil
}

func UpdateASyncAdmiralConfig(dbClient AdmiralDatabaseManager, syncTime int) {
func UpdateASyncAdmiralConfig(ctxDynamicConfig context.Context, rr *RemoteRegistry, syncTime int) {

for range time.Tick(time.Minute * time.Duration(syncTime)) {
ReadAndUpdateSyncAdmiralConfig(dbClient)
ticker := time.NewTicker(time.Minute * time.Duration(syncTime))
defer ticker.Stop()

for {
select {
case <-ctxDynamicConfig.Done():
log.Infof("task=%v, context done stopping ticker", common.DynamicConfigUpdate)
return
case <-ticker.C:
ReadAndUpdateSyncAdmiralConfig(rr)
}
}
}

func ReadAndUpdateSyncAdmiralConfig(dbClient AdmiralDatabaseManager) error {
func ReadAndUpdateSyncAdmiralConfig(rr *RemoteRegistry) error {

dbRawData, err := dbClient.Get("EnableDynamicConfig", common.Admiral)
dbRawData, err := rr.DynamicConfigDatabaseClient.Get("EnableDynamicConfig", common.Admiral)
if err != nil {
log.Errorf("task=%s, error getting EnableDynamicConfig admiral config, err: %v", common.DynamicConfigUpdate, err)
return err
Expand All @@ -181,13 +210,65 @@ func ReadAndUpdateSyncAdmiralConfig(dbClient AdmiralDatabaseManager) error {
if IsDynamicConfigChanged(configData) {
log.Infof(fmt.Sprintf("task=%s, updating DynamicConfigData with Admiral config", common.DynamicConfigUpdate))
UpdateSyncAdmiralConfig(configData)

ctx := context.Context(context.Background())
//Process NLB Cluster
processLBMigration(ctx, rr, common.GetAdmiralParams().NLBEnabledClusters, &rr.AdmiralCache.NLBEnabledCluster, common.GetAdmiralParams().NLBIngressLabel)
//Process CLB Cluster
processLBMigration(ctx, rr, common.GetAdmiralParams().CLBEnabledClusters, &rr.AdmiralCache.CLBEnabledCluster, common.GetAdmiralParams().LabelSet.GatewayApp)
} else {
log.Infof(fmt.Sprintf("task=%s, no need to update DynamicConfigData", common.DynamicConfigUpdate))
}

return nil
}

// processLBMigration reconciles the cached list of LB-enabled clusters with
// the freshly configured list and, for every cluster whose membership changed,
// replays service events for the istio-system service matching lbLabel so the
// deployment and rollout handlers pick up the new ingress.
// Note: the event replays run in untracked goroutines — assumed fire-and-forget,
// matching how the handlers are invoked elsewhere (TODO confirm).
func processLBMigration(ctx context.Context, rr *RemoteRegistry, updatedLBs []string, existingCache *[]string, lbLabel string) {

	log.Infof("task=%s, Processing LB migration for %s. UpdateReceived=%v, ExistingCache=%v", common.LBUpdateProcessor, lbLabel, updatedLBs, existingCache)

	for _, cluster := range getLBToProcess(updatedLBs, existingCache) {
		// Use the accessor consistently instead of mixing direct map reads
		// with GetRemoteController calls.
		rc := rr.GetRemoteController(cluster)
		if err := isServiceControllerInitialized(rc); err != nil {
			// Surface the underlying error instead of discarding it.
			log.Infof("task=%s, Cluster=%s, Service Controller not initialized. Skipped LB migration for Cluster. err=%v", common.LBUpdateProcessor, cluster, err)
			continue
		}
		for _, fetchService := range rc.ServiceController.Cache.Get(common.NamespaceIstioSystem) {
			if fetchService.Labels[common.App] == lbLabel {
				log.Infof("task=%s, Cluster=%s, Processing LB migration for Cluster.", common.LBUpdateProcessor, cluster)
				go handleServiceEventForDeployment(ctx, fetchService, rr, cluster, rc.DeploymentController, rc.ServiceController, HandleEventForDeployment)
				go handleServiceEventForRollout(ctx, fetchService, rr, cluster, rc.RolloutController, rc.ServiceController, HandleEventForRollout)
			}
		}
	}
}

func getLBToProcess(updatedLB []string, cache *[]string) []string {

var clusersToProcess []string
if cache == nil || len(*cache) == 0 {
*cache = updatedLB
return updatedLB
}
//Validate if New ClusterAdded
for _, clusterFromAdmiralParam := range updatedLB {
if !slices.Contains(*cache, clusterFromAdmiralParam) {
clusersToProcess = append(clusersToProcess, clusterFromAdmiralParam)
*cache = append(*cache, clusterFromAdmiralParam)
}
}

//Validate if cluster Removed
for i, clusterFromCache := range *cache {
if !slices.Contains(updatedLB, clusterFromCache) {
clusersToProcess = append(clusersToProcess, clusterFromCache)
*cache = slices.Delete(*cache, i, i+1)
}
}

return clusersToProcess
}

func IsDynamicConfigChanged(config DynamicConfigData) bool {

if config.EnableDynamicConfig != common.Admiral {
Expand Down
125 changes: 122 additions & 3 deletions admiral/pkg/clusters/admiralDatabaseClient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@ import (
"errors"
"fmt"
v1 "github.com/istio-ecosystem/admiral/admiral/apis/v1"
"github.com/istio-ecosystem/admiral/admiral/pkg/client/loader"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
"github.com/istio-ecosystem/admiral/admiral/pkg/test"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
k8sv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"testing"
"time"
)

/*
Expand Down Expand Up @@ -571,9 +578,9 @@ func (d DummyDynamicConfigDatabaseClient) Delete(data interface{}, logger *log.E
func (d DummyDynamicConfigDatabaseClient) Get(env, identity string) (interface{}, error) {
dummyDynamicConfigData := DynamicConfigData{
EnableDynamicConfig: common.Admiral,
NLBEnabledClusters: []string{"cluster1", "cluster2"},
NLBEnabledClusters: []string{"cluster1"},
NLBEnabledIdentityList: []string{"identity1", "identity2"},
CLBEnabledClusters: []string{"cluster1", "cluster2"},
CLBEnabledClusters: []string{"cluster1"},
}

return dummyDynamicConfigData, nil
Expand All @@ -582,6 +589,90 @@ func (d DummyDynamicConfigDatabaseClient) Get(env, identity string) (interface{}
func TestReadAndUpdateSyncAdmiralConfig(t *testing.T) {

var testData DummyDynamicConfigDatabaseClient

testAdmiralParam := common.GetAdmiralParams()
testAdmiralParam.LabelSet.GatewayApp = common.IstioIngressGatewayLabelValue
testAdmiralParam.NLBIngressLabel = common.NLBIstioIngressGatewayLabelValue
testAdmiralParam.NLBEnabledClusters = []string{"cluster1"}

common.UpdateAdmiralParams(testAdmiralParam)

rr := NewRemoteRegistry(nil, common.AdmiralParams{})

stop := make(chan struct{})
config := rest.Config{
Host: "localhost",
}

testService := k8sv1.Service{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "clb",
Namespace: common.NamespaceIstioSystem,
Generation: 0,
CreationTimestamp: metav1.Time{},
Labels: map[string]string{common.App: common.IstioIngressGatewayLabelValue},
},
Spec: k8sv1.ServiceSpec{},
Status: k8sv1.ServiceStatus{
LoadBalancer: k8sv1.LoadBalancerStatus{Ingress: make([]k8sv1.LoadBalancerIngress, 0)},
Conditions: nil,
},
}

portStatus := k8sv1.PortStatus{
Port: 007,
Protocol: "HTTP",
Error: nil,
}

testLoadBalancerIngress := k8sv1.LoadBalancerIngress{
IP: "007.007.007.007",
Hostname: "clb.istio.com",
IPMode: nil,
Ports: make([]k8sv1.PortStatus, 0),
}
testLoadBalancerIngress.Ports = append(testLoadBalancerIngress.Ports, portStatus)
testService.Status.LoadBalancer.Ingress = append(testService.Status.LoadBalancer.Ingress, testLoadBalancerIngress)

testService1 := testService.DeepCopy()
testService1.Name = "nlb"
testService1.Labels[common.App] = common.NLBIstioIngressGatewayLabelValue
testService1.Status.LoadBalancer.Ingress[0].Hostname = "nlb.istio.com"

testService2 := testService1.DeepCopy()
testService2.Labels[common.App] = common.NLBIstioIngressGatewayLabelValue + "TEST"
testService2.Name = "nlb2"

testServiceControler, _ := admiral.NewServiceController(stop, &test.MockServiceHandler{}, &config, time.Second*time.Duration(300), loader.GetFakeClientLoader())
testServiceControler.Cache.Put(&testService)
testServiceControler.Cache.Put(testService1)
testServiceControler.Cache.Put(testService2)

rr.remoteControllers["cluster1"] = &RemoteController{
ClusterID: "",
ApiServer: "",
StartTime: time.Time{},
GlobalTraffic: nil,
DeploymentController: nil,
ServiceController: testServiceControler,
NodeController: nil,
ServiceEntryController: nil,
DestinationRuleController: nil,
VirtualServiceController: nil,
SidecarController: nil,
RolloutController: nil,
RoutingPolicyController: nil,
OutlierDetectionController: nil,
ClientConnectionConfigController: nil,
JobController: nil,
VertexController: nil,
MonoVertexController: nil,
stop: nil,
}

rr.DynamicConfigDatabaseClient = testData

type args struct {
dbClient AdmiralDatabaseManager
}
Expand All @@ -596,7 +687,7 @@ func TestReadAndUpdateSyncAdmiralConfig(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := ReadAndUpdateSyncAdmiralConfig(tt.args.dbClient)
err := ReadAndUpdateSyncAdmiralConfig(rr)
if tt.wantErr != nil {
assert.Contains(t, err.Error(), tt.wantErr.Error(), "ReadAndUpdateSyncAdmiralConfig(). Expect error containing %s but got error = %v", tt.wantErr.Error(), err.Error())
} else {
Expand All @@ -605,3 +696,31 @@ func TestReadAndUpdateSyncAdmiralConfig(t *testing.T) {
})
}
}

func Test_getLBToProcess(t *testing.T) {
rr := NewRemoteRegistry(nil, common.AdmiralParams{})
rr.AdmiralCache.NLBEnabledCluster = []string{}

type args struct {
updatedLB []string
cache *[]string
}
tests := []struct {
name string
args args
want []string
}{
{"When cache is not updated then getLBToProcess should be all updated list ",
args{updatedLB: []string{"cluster1", "cluster2"}, cache: &[]string{}}, []string{"cluster1", "cluster2"}},
{"When cluster is removed from update list then getLBToProcess should return removed cluster",
args{updatedLB: []string{"cluster1", "cluster2"}, cache: &[]string{"cluster1", "cluster2", "cluster3"}}, []string{"cluster3"}},
{"When cluster is added from update list then getLBToProcess should return added cluster",
args{updatedLB: []string{"cluster1", "cluster2", "cluster3"}, cache: &[]string{"cluster1", "cluster2"}}, []string{"cluster3"}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, getLBToProcess(tt.args.updatedLB, tt.args.cache), "getLBToProcess(%v, %v)", tt.args.updatedLB, *tt.args.cache)
assert.Equal(t, tt.args.updatedLB, *tt.args.cache, "getLBToProcess should update cache based upon params")
})
}
}
Loading

0 comments on commit 94daadf

Please sign in to comment.