Skip to content

Commit

Permalink
Merge branch 'ossmaster' into MESH-5201-Statesyncer-Registry
Browse files Browse the repository at this point in the history
# Conflicts:
#	admiral/pkg/clusters/util.go
  • Loading branch information
Ryan Tay committed Dec 12, 2024
2 parents 23e3d42 + 4c606db commit 62ff439
Show file tree
Hide file tree
Showing 28 changed files with 410 additions and 12 deletions.
1 change: 1 addition & 0 deletions admiral/cmd/admiral/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func GetRootCmd(args []string) *cobra.Command {
rootCmd.PersistentFlags().Int64Var(&params.DefaultWarmupDurationSecs, "default_warmup_duration_in_seconds", 45, "The default value for the warmupDurationSecs to be used on Destination Rules created by admiral")

rootCmd.PersistentFlags().BoolVar(&params.EnableGenerationCheck, "enable_generation_check", true, "Enable/Disable Generation Check")
rootCmd.PersistentFlags().BoolVar(&params.EnableIsOnlyReplicaCountChangedCheck, "enable_replica_count_check", false, "Enable/Disable Replica Count Check")
rootCmd.PersistentFlags().BoolVar(&params.ClientInitiatedProcessingEnabled, "client_initiated_processing_enabled", true, "Enable/Disable Client Initiated Processing")
rootCmd.PersistentFlags().BoolVar(&params.PreventSplitBrain, "prevent_split_brain", true, "Enable/Disable Explicit Split Brain prevention logic")

Expand Down
1 change: 1 addition & 0 deletions admiral/pkg/clusters/deployment_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func HandleEventForDeployment(ctx context.Context, event admiral.EventType, obj
// Use the same function as added deployment function to update and put new service entry in place to replace old one
_, err := modifyServiceEntryForNewServiceOrPod(ctx, event, env, globalIdentifier, remoteRegistry)
if common.ClientInitiatedProcessingEnabled() {
log.Infof(LogFormat, event, common.DeploymentResourceType, obj.Name, clusterName, "Client initiated processing started for "+globalIdentifier)
depProcessErr := processClientDependencyRecord(ctx, remoteRegistry, globalIdentifier, clusterName, obj.Namespace)
if depProcessErr != nil {
return common.AppendError(err, depProcessErr)
Expand Down
1 change: 1 addition & 0 deletions admiral/pkg/clusters/rollout_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func HandleEventForRollout(ctx context.Context, event admiral.EventType, obj *ar
_, err := modifyServiceEntryForNewServiceOrPod(ctx, event, env, globalIdentifier, remoteRegistry)

if common.ClientInitiatedProcessingEnabled() {
log.Infof(LogFormat, event, common.DeploymentResourceType, obj.Name, clusterName, "Client initiated processing started for "+globalIdentifier)
rolloutProcessErr := processClientDependencyRecord(ctx, remoteRegistry, globalIdentifier, clusterName, obj.Namespace)
if rolloutProcessErr != nil {
return common.AppendError(err, rolloutProcessErr)
Expand Down
26 changes: 14 additions & 12 deletions admiral/pkg/clusters/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,20 @@ func parseMigrationService(migrationServices map[string]*k8sV1.Service, meshPort
})
return services
}
func processClientDependencyRecord(ctx context.Context, remoteRegistry *RemoteRegistry, globalIdentifier string, clusterName string, clientNs string) error {
var destinationsToBeProcessed []string

destinationsToBeProcessed = getDestinationsToBeProcessedForClientInitiatedProcessing(remoteRegistry, globalIdentifier, clusterName, clientNs, destinationsToBeProcessed)
log.Infof(LogFormat, "Update", common.DependencyResourceType, globalIdentifier, clusterName, fmt.Sprintf("destinationsToBeProcessed=%v", destinationsToBeProcessed))
var sourceClusterMap = common.NewMap()
sourceClusterMap.Put(clusterName, clusterName)

err := processDestinationsForSourceIdentity(ctx, remoteRegistry, "Update", true, sourceClusterMap, destinationsToBeProcessed, globalIdentifier, modifyServiceEntryForNewServiceOrPod)
if err != nil {
return errors.New("failed to perform client initiated processing for " + globalIdentifier + ", got error: " + err.Error())
}
return nil
}

func getDestinationsToBeProcessedForClientInitiatedProcessing(remoteRegistry *RemoteRegistry, globalIdentifier string, clusterName string, clientNs string, destinationsToBeProcessed []string) []string {
actualServerIdentities := remoteRegistry.AdmiralCache.SourceToDestinations.Get(globalIdentifier)
Expand Down Expand Up @@ -516,15 +530,3 @@ func (r RouteDestinationSorted) Less(i, j int) bool {
func (r RouteDestinationSorted) Swap(i, j int) {
r[i], r[j] = r[j], r[i]
}

func processClientDependencyRecord(ctx context.Context, remoteRegistry *RemoteRegistry, globalIdentifier string, clusterName string, clientNs string) error {
var destinationsToBeProcessed []string

destinationsToBeProcessed = getDestinationsToBeProcessedForClientInitiatedProcessing(remoteRegistry, globalIdentifier, clusterName, clientNs, destinationsToBeProcessed)

var sourceClusterMap = common.NewMap()
sourceClusterMap.Put(clusterName, clusterName)

err := processDestinationsForSourceIdentity(ctx, remoteRegistry, "Update", true, sourceClusterMap, destinationsToBeProcessed, globalIdentifier, modifyServiceEntryForNewServiceOrPod)
return err
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (c *ClientConnectionConfigController) DoesGenerationMatch(*log.Entry, inter
return false, nil
}

func (c *ClientConnectionConfigController) IsOnlyReplicaCountChanged(*log.Entry, interface{}, interface{}) (bool, error) {
return false, nil
}

type clientConnectionSettingsItem struct {
clientConnectionSettings *v1.ClientConnectionConfig
status string
Expand Down
14 changes: 14 additions & 0 deletions admiral/pkg/controller/admiral/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type Delegator interface {
LogValueOfAdmiralIoIgnore(interface{})
Get(ctx context.Context, isRetry bool, obj interface{}) (interface{}, error)
DoesGenerationMatch(*log.Entry, interface{}, interface{}) (bool, error)
IsOnlyReplicaCountChanged(*log.Entry, interface{}, interface{}) (bool, error)
}

type EventType string
Expand Down Expand Up @@ -172,6 +173,19 @@ func NewController(name, clusterEndpoint string, stopCh <-chan struct{}, delegat
return
}

// Check if the generation of the object has changed
// if the generation of old and new object is same then we do not process the object
isOnlyReplicaCountChanged, err := controller.delegator.IsOnlyReplicaCountChanged(ctxLogger, oldObj, newObj)
if err != nil {
ctxLogger.Errorf(ControllerLogFormat, taskAddEventToQueue, controller.queue.Len(), err.Error())
}
if status == common.Processed && isOnlyReplicaCountChanged {
ctxLogger.Infof(ControllerLogFormat, taskAddEventToQueue, controller.queue.Len(),
fmt.Sprintf("skipped processing event due to status=%s isOnlyReplicaCountChanged=%v",
status, isOnlyReplicaCountChanged))
return
}

controller.delegator.LogValueOfAdmiralIoIgnore(newObj)
latestObj, isVersionChanged := checkIfResourceVersionHasIncreased(ctxLogger, ctx, oldObj, newObj, delegator)
txId, ctxLogger = updateTxId(ctx, newObj, latestObj, txId, ctxLogger, controller)
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/controller/admiral/delegator_mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ func (m *MockDelegator) DoesGenerationMatch(ctx *log.Entry, i interface{}, i2 in
return false, nil
}

func (m *MockDelegator) IsOnlyReplicaCountChanged(ctx *log.Entry, i interface{}, i2 interface{}) (bool, error) {
return false, nil
}

func NewMockDelegator() *MockDelegator {
return &MockDelegator{}
}
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/controller/admiral/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (d *DependencyController) DoesGenerationMatch(*log.Entry, interface{}, inte
return false, nil
}

func (c *DependencyController) IsOnlyReplicaCountChanged(*log.Entry, interface{}, interface{}) (bool, error) {
return false, nil
}

type DependencyItem struct {
Dependency *v1.Dependency
Status string
Expand Down
36 changes: 36 additions & 0 deletions admiral/pkg/controller/admiral/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package admiral
import (
"context"
"fmt"
"reflect"
"time"

"github.com/istio-ecosystem/admiral/admiral/pkg/client/loader"
Expand Down Expand Up @@ -72,6 +73,41 @@ func (d *DeploymentController) DoesGenerationMatch(ctxLogger *log.Entry, obj int
return false, nil
}

func (d *DeploymentController) IsOnlyReplicaCountChanged(ctxLogger *log.Entry, obj interface{}, oldObj interface{}) (bool, error) {
if !common.IsOnlyReplicaCountChanged() {
ctxLogger.Debugf(ControllerLogFormat, "IsOnlyReplicaCountChanged", "",
fmt.Sprintf("replica count check is disabled"))
return false, nil
}
deploymentNew, ok := obj.(*k8sAppsV1.Deployment)
if !ok {
return false, fmt.Errorf("type assertion failed, %v is not of type *v1.Deployment", obj)
}
deploymentOld, ok := oldObj.(*k8sAppsV1.Deployment)
if !ok {
return false, fmt.Errorf("type assertion failed, %v is not of type *v1.Deployment", oldObj)
}

// Temporarily storing replica count to use later after the check is complete
newReplicaCount := deploymentNew.Spec.Replicas
oldReplicaCount := deploymentOld.Spec.Replicas

deploymentNew.Spec.Replicas = nil
deploymentOld.Spec.Replicas = nil

if reflect.DeepEqual(deploymentOld.Spec, deploymentNew.Spec) {
ctxLogger.Infof(ControllerLogFormat, "IsOnlyReplicaCountChanged", "",
fmt.Sprintf("old and new spec matched for deployment excluding replica count %s", deploymentNew.Name))
deploymentNew.Spec.Replicas = newReplicaCount
deploymentOld.Spec.Replicas = oldReplicaCount
return true, nil
}

deploymentNew.Spec.Replicas = newReplicaCount
deploymentOld.Spec.Replicas = oldReplicaCount
return false, nil
}

type deploymentCache struct {
//map of dependencies key=identity value array of onboarded identities
cache map[string]*DeploymentClusterEntry
Expand Down
116 changes: 116 additions & 0 deletions admiral/pkg/controller/admiral/deployment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,122 @@ func TestDeploymentControlle_DoesGenerationMatch(t *testing.T) {

}

func TestDeploymentController_IsOnlyReplicaCountChanged(t *testing.T) {
dc := DeploymentController{}
replicaNewCount := int32(1)
replicaOldCount := int32(2)

admiralParams := common.AdmiralParams{}

testCases := []struct {
name string
deploymentNew interface{}
deploymentOld interface{}
enableIsOnlyReplicaCountChangedCheck bool
expectedValue bool
expectedError error
}{
{
name: "Given context, new deploy and old deploy object " +
"When new deploy is not of type *v1.Deployment " +
"Then func should return an error",
deploymentNew: struct{}{},
deploymentOld: struct{}{},
enableIsOnlyReplicaCountChangedCheck: true,
expectedError: fmt.Errorf("type assertion failed, {} is not of type *v1.Deployment"),
},
{
name: "Given context, new deploy and old deploy object " +
"When old deploy is not of type *v1.Deployment " +
"Then func should return an error",
deploymentNew: struct{}{},
deploymentOld: struct{}{},
enableIsOnlyReplicaCountChangedCheck: true,
expectedError: fmt.Errorf("type assertion failed, {} is not of type *v1.Deployment"),
},
{
name: "Given context, new deploy and old deploy object " +
"When is replica count changed check is enabled " +
"And everything in the spec expect the count is the same " +
"Then func should return true ",
deploymentNew: &k8sAppsV1.Deployment{
Spec: k8sAppsV1.DeploymentSpec{
Replicas: &replicaNewCount,
},
},
deploymentOld: &k8sAppsV1.Deployment{
Spec: k8sAppsV1.DeploymentSpec{
Replicas: &replicaOldCount,
},
},
expectedValue: true,
expectedError: nil,
enableIsOnlyReplicaCountChangedCheck: true,
},
{
name: "Given context, new deploy and old deploy object " +
"When deploy is replica count changed check is disabled " +
"Then func should return false",
deploymentNew: &k8sAppsV1.Deployment{
Spec: k8sAppsV1.DeploymentSpec{
Replicas: &replicaNewCount,
},
},
deploymentOld: &k8sAppsV1.Deployment{
Spec: k8sAppsV1.DeploymentSpec{
Replicas: &replicaOldCount,
},
},
expectedValue: false,
expectedError: nil,
enableIsOnlyReplicaCountChangedCheck: false,
},
{
name: "Given context, new deploy and old deploy object " +
"When is replica count changed check is enabled " +
"And something in the spec expect the count is different " +
"Then func should return false ",
deploymentNew: &k8sAppsV1.Deployment{
Spec: k8sAppsV1.DeploymentSpec{
Replicas: &replicaNewCount,
Paused: false,
},
},
deploymentOld: &k8sAppsV1.Deployment{
Spec: k8sAppsV1.DeploymentSpec{
Replicas: &replicaOldCount,
Paused: true,
},
},
expectedValue: false,
enableIsOnlyReplicaCountChangedCheck: true,
expectedError: nil,
},
}

ctxLogger := log.WithFields(log.Fields{
"txId": "abc",
})

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
admiralParams.EnableIsOnlyReplicaCountChangedCheck = tc.enableIsOnlyReplicaCountChangedCheck
common.ResetSync()
common.InitializeConfig(admiralParams)
actual, err := dc.IsOnlyReplicaCountChanged(ctxLogger, tc.deploymentNew, tc.deploymentOld)
if !ErrorEqualOrSimilar(err, tc.expectedError) {
t.Errorf("expected: %v, got: %v", tc.expectedError, err)
}
if err == nil {
if tc.expectedValue != actual {
t.Errorf("expected: %v, got: %v", tc.expectedValue, actual)
}
}
})
}

}

func TestNewDeploymentController(t *testing.T) {
config, err := clientcmd.BuildConfigFromFlags("", "../../test/resources/[email protected]")
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/controller/admiral/envoyfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func (e *EnvoyFilterController) DoesGenerationMatch(*log.Entry, interface{}, int
return false, nil
}

func (e *EnvoyFilterController) IsOnlyReplicaCountChanged(*log.Entry, interface{}, interface{}) (bool, error) {
return false, nil
}

func (e *EnvoyFilterController) Added(ctx context.Context, obj interface{}) error {
ef, ok := obj.(*networking.EnvoyFilter)
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/controller/admiral/globaltraffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ func (d *GlobalTrafficController) DoesGenerationMatch(*logrus.Entry, interface{}
return false, nil
}

func (d *GlobalTrafficController) IsOnlyReplicaCountChanged(*logrus.Entry, interface{}, interface{}) (bool, error) {
return false, nil
}

type gtpItem struct {
GlobalTrafficPolicy *v1.GlobalTrafficPolicy
Status string
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/controller/admiral/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,10 @@ func (p *JobController) DoesGenerationMatch(ctxLogger *log.Entry, obj interface{
return false, nil
}

func (p *JobController) IsOnlyReplicaCountChanged(*log.Entry, interface{}, interface{}) (bool, error) {
return false, nil
}

func NewJobController(stopCh <-chan struct{}, handler ClientDiscoveryHandler, config *rest.Config, resyncPeriod time.Duration, clientLoader loader.ClientLoader) (*JobController, error) {

jobController := JobController{}
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/controller/admiral/monovertex.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,10 @@ func (p *MonoVertexController) DoesGenerationMatch(ctxLogger *log.Entry, obj int
return false, nil
}

func (p *MonoVertexController) IsOnlyReplicaCountChanged(*log.Entry, interface{}, interface{}) (bool, error) {
return false, nil
}

func NewMonoVertexController(stopCh <-chan struct{}, handler ClientDiscoveryHandler, config *rest.Config, resyncPeriod time.Duration, clientLoader loader.ClientLoader) (*MonoVertexController, error) {

monoVertexController := MonoVertexController{}
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/controller/admiral/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ func (p *NodeController) DoesGenerationMatch(*log.Entry, interface{}, interface{
return false, nil
}

func (p *NodeController) IsOnlyReplicaCountChanged(*log.Entry, interface{}, interface{}) (bool, error) {
return false, nil
}

type Locality struct {
Region string
}
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/controller/admiral/outlierdetection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func (o *OutlierDetectionController) DoesGenerationMatch(*logrus.Entry, interfac
return false, nil
}

func (o *OutlierDetectionController) IsOnlyReplicaCountChanged(*logrus.Entry, interface{}, interface{}) (bool, error) {
return false, nil
}

func (c *odCache) Put(od *v1.OutlierDetection) {

defer c.mutex.Unlock()
Expand Down
Loading

0 comments on commit 62ff439

Please sign in to comment.