Skip to content

Commit

Permalink
Add replica count check
Browse files Browse the repository at this point in the history
  • Loading branch information
Kartikeya Pharasi authored and kpharasi committed Dec 9, 2024
1 parent 747a58e commit 564f504
Show file tree
Hide file tree
Showing 21 changed files with 933 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ func (c *ClientConnectionConfigController) DoesGenerationMatch(*log.Entry, inter
return false, nil
}

func (c *ClientConnectionConfigController) IsOnlyReplicaCountChanged(*log.Entry, interface{}, interface{}) (bool, error) {
return false, nil
}

type clientConnectionSettingsItem struct {
clientConnectionSettings *v1.ClientConnectionConfig
status string
Expand Down
14 changes: 14 additions & 0 deletions admiral/pkg/controller/admiral/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Delegator interface {
LogValueOfAdmiralIoIgnore(interface{})
Get(ctx context.Context, isRetry bool, obj interface{}) (interface{}, error)
DoesGenerationMatch(*log.Entry, interface{}, interface{}) (bool, error)
IsOnlyReplicaCountChanged(*log.Entry, interface{}, interface{}) (bool, error)
}

type EventType string
Expand Down Expand Up @@ -167,6 +168,19 @@ func NewController(name, clusterEndpoint string, stopCh <-chan struct{}, delegat
return
}

// Check if the generation of the object has changed
// if the generation of old and new object is same then we do not process the object
isOnlyReplicaCountChanged, err := controller.delegator.IsOnlyReplicaCountChanged(ctxLogger, oldObj, newObj)
if err != nil {
ctxLogger.Errorf(ControllerLogFormat, taskAddEventToQueue, controller.queue.Len(), err.Error())
}
if status == common.Processed && isOnlyReplicaCountChanged {
ctxLogger.Infof(ControllerLogFormat, taskAddEventToQueue, controller.queue.Len(),
fmt.Sprintf("skipped processing event due to status=%s isOnlyReplicaCountChanged=%v",
status, isOnlyReplicaCountChanged))
return
}

controller.delegator.LogValueOfAdmiralIoIgnore(newObj)
latestObj, isVersionChanged := checkIfResourceVersionHasIncreased(ctxLogger, ctx, oldObj, newObj, delegator)
txId, ctxLogger = updateTxId(ctx, newObj, latestObj, txId, ctxLogger, controller)
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/controller/admiral/dependency.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ func (d *DependencyController) DoesGenerationMatch(*log.Entry, interface{}, inte
return false, nil
}

func (c *DependencyController) IsOnlyReplicaCountChanged(*log.Entry, interface{}, interface{}) (bool, error) {
return false, nil
}

type DependencyItem struct {
Dependency *v1.Dependency
Status string
Expand Down
28 changes: 28 additions & 0 deletions admiral/pkg/controller/admiral/deployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package admiral
import (
"context"
"fmt"
"reflect"
"time"

"github.com/istio-ecosystem/admiral/admiral/pkg/client/loader"
Expand Down Expand Up @@ -72,6 +73,33 @@ func (d *DeploymentController) DoesGenerationMatch(ctxLogger *log.Entry, obj int
return false, nil
}

func (d *DeploymentController) IsOnlyReplicaCountChanged(ctxLogger *log.Entry, obj interface{}, oldObj interface{}) (bool, error) {
if !common.IsOnlyReplicaCountChanged() {
ctxLogger.Debugf(ControllerLogFormat, "IsOnlyReplicaCountChanged", "",
fmt.Sprintf("replica count check is disabled"))
return false, nil
}
deploymentNew, ok := obj.(*k8sAppsV1.Deployment)
if !ok {
return false, fmt.Errorf("type assertion failed, %v is not of type *v1.Deployment", obj)
}
deploymentOld, ok := oldObj.(*k8sAppsV1.Deployment)
if !ok {
return false, fmt.Errorf("type assertion failed, %v is not of type *v1.Deployment", oldObj)
}

deploymentNew.Spec.Replicas = nil
deploymentOld.Spec.Replicas = nil

if reflect.DeepEqual(deploymentOld.Spec, deploymentNew.Spec) {
ctxLogger.Infof(ControllerLogFormat, "IsOnlyReplicaCountChanged", "",
fmt.Sprintf("old and new spec matched for deployment excluding replica count %s", deploymentNew.Name))
return true, nil
}

return false, nil
}

type deploymentCache struct {
//map of dependencies key=identity value array of onboarded identities
cache map[string]*DeploymentClusterEntry
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/controller/admiral/envoyfilter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func (e *EnvoyFilterController) DoesGenerationMatch(*log.Entry, interface{}, int
return false, nil
}

func (e *EnvoyFilterController) IsOnlyReplicaCountChanged(*log.Entry, interface{}, interface{}) (bool, error) {
return false, nil
}

func (e *EnvoyFilterController) Added(ctx context.Context, obj interface{}) error {
ef, ok := obj.(*networking.EnvoyFilter)
if !ok {
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/controller/admiral/globaltraffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ func (d *GlobalTrafficController) DoesGenerationMatch(*logrus.Entry, interface{}
return false, nil
}

func (d *GlobalTrafficController) IsOnlyReplicaCountChanged(*logrus.Entry, interface{}, interface{}) (bool, error) {
return false, nil
}

type gtpItem struct {
GlobalTrafficPolicy *v1.GlobalTrafficPolicy
Status string
Expand Down
258 changes: 258 additions & 0 deletions admiral/pkg/controller/admiral/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,258 @@
package admiral

import (
"context"
"fmt"
v12 "k8s.io/api/batch/v1"
meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/informers/batch/v1"
"sync"
"time"

"github.com/istio-ecosystem/admiral/admiral/pkg/client/loader"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/rest"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

//Job controller discovers jobs as mesh clients (its assumed that k8s Job doesn't have any ingress communication)

type JobController struct {
K8sClient kubernetes.Interface
JobHandler ClientDiscoveryHandler
informer cache.SharedIndexInformer
Cache *jobCache
}

type JobEntry struct {
Identity string
Jobs map[string]*common.K8sObject
}

type jobCache struct {
//map of dependencies key=identity value array of onboarded identities
cache map[string]*JobEntry
mutex *sync.Mutex
}

func NewJobCache() *jobCache {
return &jobCache{
cache: make(map[string]*JobEntry),
mutex: &sync.Mutex{},
}
}

func getK8sObjectFromJob(job *v12.Job) *common.K8sObject {
return &common.K8sObject{
Name: job.Name,
Namespace: job.Namespace,
Annotations: job.Spec.Template.Annotations,
Labels: job.Spec.Template.Labels,
Status: common.NotProcessed,
Type: common.Job,
}
}

func (p *jobCache) Put(job *common.K8sObject) *common.K8sObject {
defer p.mutex.Unlock()
p.mutex.Lock()
identity := common.GetGlobalIdentifier(job.Annotations, job.Labels)
existingJobs := p.cache[identity]
if existingJobs == nil {
existingJobs = &JobEntry{
Identity: identity,
Jobs: map[string]*common.K8sObject{job.Namespace: job},
}
p.cache[identity] = existingJobs
return job
} else {
jobInCache := existingJobs.Jobs[job.Namespace]
if jobInCache == nil {
existingJobs.Jobs[job.Namespace] = job
p.cache[identity] = existingJobs
return job
}
}
return job
}

func (p *jobCache) Get(key string, namespace string) *common.K8sObject {
defer p.mutex.Unlock()
p.mutex.Lock()

jce, ok := p.cache[key]
if ok {
j, ok := jce.Jobs[namespace]
if ok {
return j
}
}
return nil
}

func (p *jobCache) GetJobProcessStatus(job *v12.Job) (string, error) {
defer p.mutex.Unlock()
p.mutex.Lock()
jobObj := getK8sObjectFromJob(job)
identity := common.GetGlobalIdentifier(jobObj.Annotations, jobObj.Labels)

jce, ok := p.cache[identity]
if ok {
jobFromNamespace, ok := jce.Jobs[job.Namespace]
if ok {
return jobFromNamespace.Status, nil
}
}

return common.NotProcessed, nil
}

func (p *jobCache) UpdateJobProcessStatus(job *v12.Job, status string) error {
defer p.mutex.Unlock()
p.mutex.Lock()
jobObj := getK8sObjectFromJob(job)
identity := common.GetGlobalIdentifier(jobObj.Annotations, jobObj.Labels)

jce, ok := p.cache[identity]
if ok {
jobFromNamespace, ok := jce.Jobs[job.Namespace]
if ok {
jobFromNamespace.Status = status
p.cache[jce.Identity] = jce
return nil
} else {
newJob := getK8sObjectFromJob(job)
newJob.Status = status
jce.Jobs[job.Namespace] = newJob
p.cache[jce.Identity] = jce
return nil
}
}

return fmt.Errorf(LogCacheFormat, "UpdateStatus", "Job",
job.Name, job.Namespace, "", "nothing to update, job not found in cache")
}

func (p *JobController) DoesGenerationMatch(ctxLogger *log.Entry, obj interface{}, oldObj interface{}) (bool, error) {
if !common.DoGenerationCheck() {
ctxLogger.Debugf(ControllerLogFormat, "DoesGenerationMatch", "",
fmt.Sprintf("generation check is disabled"))
return false, nil
}
jobNew, ok := obj.(*v12.Job)
if !ok {
return false, fmt.Errorf("type assertion failed, %v is not of type *Job", obj)
}
jobOld, ok := oldObj.(*v12.Job)
if !ok {
return false, fmt.Errorf("type assertion failed, %v is not of type *Job", oldObj)
}
if jobNew.Generation == jobOld.Generation {
ctxLogger.Infof(ControllerLogFormat, "DoesGenerationMatch", "",
fmt.Sprintf("old and new generation matched for job %s", jobNew.Name))
return true, nil
}
return false, nil
}

func (p *JobController) IsOnlyReplicaCountChanged(*log.Entry, interface{}, interface{}) (bool, error) {
return false, nil
}

func NewJobController(stopCh <-chan struct{}, handler ClientDiscoveryHandler, config *rest.Config, resyncPeriod time.Duration, clientLoader loader.ClientLoader) (*JobController, error) {

jobController := JobController{}
jobController.JobHandler = handler

var err error

jobController.K8sClient, err = clientLoader.LoadKubeClientFromConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create dependency controller k8s client: %v", err)
}

jobController.informer = v1.NewJobInformer(
jobController.K8sClient,
meta_v1.NamespaceAll,
resyncPeriod,
cache.Indexers{},
)

jobController.Cache = NewJobCache()

NewController("job-ctrl", config.Host, stopCh, &jobController, jobController.informer)

return &jobController, nil
}

func (d *JobController) Added(ctx context.Context, obj interface{}) error {
return addUpdateJob(d, ctx, obj)
}

func (d *JobController) Updated(ctx context.Context, obj interface{}, oldObj interface{}) error {
//Not Required, this is a no-op as as Add event already handles registering this as a mesh client
return nil
}

func addUpdateJob(j *JobController, ctx context.Context, obj interface{}) error {
job, ok := obj.(*v12.Job)
if !ok {
return fmt.Errorf("failed to covert informer object to Job")
}
if !common.ShouldIgnore(job.Spec.Template.Annotations, job.Spec.Template.Labels) {
k8sObj := getK8sObjectFromJob(job)
newK8sObj := j.Cache.Put(k8sObj)
newK8sObj.Status = common.ProcessingInProgress
return j.JobHandler.Added(ctx, newK8sObj)
}
return nil
}

func (p *JobController) Deleted(ctx context.Context, obj interface{}) error {
//Not Required (to be handled via asset off boarding)
return nil
}

func (d *JobController) GetProcessItemStatus(obj interface{}) (string, error) {
job, ok := obj.(*v12.Job)
if !ok {
return common.NotProcessed, fmt.Errorf("type assertion failed, %v is not of type *common.K8sObject", obj)
}
return d.Cache.GetJobProcessStatus(job)
}

func (d *JobController) UpdateProcessItemStatus(obj interface{}, status string) error {
job, ok := obj.(*v12.Job)
if !ok {
return fmt.Errorf("type assertion failed, %v is not of type *Job", obj)
}
return d.Cache.UpdateJobProcessStatus(job, status)
}

func (d *JobController) LogValueOfAdmiralIoIgnore(obj interface{}) {
job, ok := obj.(*v12.Job)
if !ok {
return
}
jobObj := getK8sObjectFromJob(job)
if jobObj.Annotations[common.AdmiralIgnoreAnnotation] == "true" {
log.Infof("op=%s type=%v name=%v namespace=%s cluster=%s message=%s", "admiralIoIgnoreAnnotationCheck", common.MonoVertex,
job.Name, job.Namespace, "", "Value=true")
}
}

func (j *JobController) Get(ctx context.Context, isRetry bool, obj interface{}) (interface{}, error) {
job, ok := obj.(*v12.Job)
if ok && isRetry {
jobObj := getK8sObjectFromJob(job)
identity := common.GetGlobalIdentifier(jobObj.Annotations, jobObj.Labels)
return j.Cache.Get(identity, job.Namespace), nil
}
if ok && j.K8sClient != nil {
return j.K8sClient.BatchV1().Jobs(job.Namespace).Get(ctx, job.Name, meta_v1.GetOptions{})
}
return nil, fmt.Errorf("kubernetes client is not initialized, txId=%s", ctx.Value("txId"))
}
Loading

0 comments on commit 564f504

Please sign in to comment.