Skip to content

Commit

Permalink
Add configMap informer (julienschmidt#326)
Browse files Browse the repository at this point in the history
Signed-off-by: laminar <[email protected]>
Signed-off-by: laminar <[email protected]>

Co-authored-by: Aaron Schlesinger <[email protected]>
  • Loading branch information
tpiperatgod and arschles authored Dec 14, 2021
1 parent 8622cf5 commit 201c7cb
Show file tree
Hide file tree
Showing 8 changed files with 255 additions and 68 deletions.
14 changes: 5 additions & 9 deletions interceptor/config/serving.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package config

import (
"time"

"github.com/kelseyhightower/envconfig"
)

Expand All @@ -16,15 +18,9 @@ type Serving struct {
// This is the server that the external scaler will issue metrics
// requests to
AdminPort int `envconfig:"KEDA_HTTP_ADMIN_PORT" required:"true"`
// RoutingTableUpdateDurationMS is the interval (in milliseconds) representing how
// often to do a complete update of the routing table ConfigMap.
//
// The interceptor will also open a watch stream to the routing table
// ConfigMap and attempt to update the routing table on every update.
//
// Since it does full updates alongside watch stream updates, it can
// only process one at a time. Therefore, this is a best effort timeout
RoutingTableUpdateDurationMS int `envconfig:"KEDA_HTTP_ROUTING_TABLE_UPDATE_DURATION_MS" default:"500"`
// ConfigMapCacheRsyncPeriod is the time interval
// for the configmap informer to rsync the local cache.
ConfigMapCacheRsyncPeriod time.Duration `envconfig:"KEDA_HTTP_SCALER_CONFIG_MAP_INFORMER_RSYNC_PERIOD" default:"60m"`
// The interceptor has an internal process that periodically fetches the state
// of deployment that is running the servers it forwards to.
//
Expand Down
14 changes: 12 additions & 2 deletions interceptor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ func main() {
q := queue.NewMemory()
routingTable := routing.NewTable()

// Create the informer of ConfigMap resource,
// the resynchronization period of the informer should be not less than 1s,
// refer to: https://github.com/kubernetes/client-go/blob/v0.22.2/tools/cache/shared_informer.go#L475
configMapInformer := k8s.NewInformerConfigMapUpdater(
lggr,
cl,
servingCfg.ConfigMapCacheRsyncPeriod,
)

lggr.Info(
"Fetching initial routing table",
)
Expand Down Expand Up @@ -109,10 +118,11 @@ func main() {
err := routing.StartConfigMapRoutingTableUpdater(
ctx,
lggr,
time.Duration(servingCfg.RoutingTableUpdateDurationMS)*time.Millisecond,
configMapsInterface,
configMapInformer,
servingCfg.CurrentNamespace,
routingTable,
q,
nil,
)
lggr.Error(err, "config map routing table updater failed")
return err
Expand Down
132 changes: 132 additions & 0 deletions pkg/k8s/config_map_cache_informer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
package k8s

import (
"context"
"encoding/json"
"fmt"
"time"

"github.com/go-logr/logr"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/informers"
infcorev1 "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)

type InformerConfigMapUpdater struct {
lggr logr.Logger
cmInformer infcorev1.ConfigMapInformer
bcaster *watch.Broadcaster
}

func (i *InformerConfigMapUpdater) MarshalJSON() ([]byte, error) {
lst := i.cmInformer.Lister()
cms, err := lst.List(labels.Everything())
if err != nil {
return nil, err
}
return json.Marshal(&cms)
}

func (i *InformerConfigMapUpdater) Start(ctx context.Context) error {
i.cmInformer.Informer().Run(ctx.Done())
return errors.Wrap(
ctx.Err(),
"configMap informer was stopped",
)
}

func (i *InformerConfigMapUpdater) Get(
ns,
name string,
) (corev1.ConfigMap, error) {
cm, err := i.cmInformer.Lister().ConfigMaps(ns).Get(name)
if err != nil {
return corev1.ConfigMap{}, err
}
return *cm, nil
}

func (i *InformerConfigMapUpdater) Watch(
ns,
name string,
) watch.Interface {
return watch.Filter(i.bcaster.Watch(), func(e watch.Event) (watch.Event, bool) {
cm, ok := e.Object.(*corev1.ConfigMap)
if !ok {
i.lggr.Error(
fmt.Errorf("informer expected ConfigMap, ignoring this event"),
"event",
e,
)
return e, false
}
if cm.Namespace == ns && cm.Name == name {
return e, true
}
return e, false
})
}

func (i *InformerConfigMapUpdater) addEvtHandler(obj interface{}) {
cm, ok := obj.(*corev1.ConfigMap)
if !ok {
i.lggr.Error(
fmt.Errorf("informer expected configMap, got %v", obj),
"not forwarding event",
)
return
}
i.bcaster.Action(watch.Added, cm)
}

func (i *InformerConfigMapUpdater) updateEvtHandler(oldObj, newObj interface{}) {
cm, ok := newObj.(*corev1.ConfigMap)
if !ok {
i.lggr.Error(
fmt.Errorf("informer expected configMap, got %v", newObj),
"not forwarding event",
)
return
}
i.bcaster.Action(watch.Modified, cm)
}

func (i *InformerConfigMapUpdater) deleteEvtHandler(obj interface{}) {
cm, ok := obj.(*corev1.ConfigMap)
if !ok {
i.lggr.Error(
fmt.Errorf("informer expected configMap, got %v", obj),
"not forwarding event",
)
return
}
i.bcaster.Action(watch.Deleted, cm)
}

func NewInformerConfigMapUpdater(
lggr logr.Logger,
cl kubernetes.Interface,
defaultResync time.Duration,
) *InformerConfigMapUpdater {
factory := informers.NewSharedInformerFactory(
cl,
defaultResync,
)
cmInformer := factory.Core().V1().ConfigMaps()
ret := &InformerConfigMapUpdater{
lggr: lggr,
bcaster: watch.NewBroadcaster(0, watch.WaitIfChannelFull),
cmInformer: cmInformer,
}
ret.cmInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ret.addEvtHandler,
UpdateFunc: ret.updateEvtHandler,
DeleteFunc: ret.deleteEvtHandler,
})
return ret
}
80 changes: 44 additions & 36 deletions pkg/routing/config_map_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ package routing

import (
"context"
"time"

"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/queue"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)

// StartConfigMapRoutingTableUpdater starts a loop that does the following:
Expand All @@ -21,49 +19,42 @@ import (
// called ConfigMapRoutingTableName. On either of those events, decodes
// that ConfigMap into a routing table and stores the new table into table
// using table.Replace(newTable)
// - Execute the callback function, if one exists
// - Returns an appropriate non-nil error if ctx.Done() receives
func StartConfigMapRoutingTableUpdater(
ctx context.Context,
lggr logr.Logger,
updateEvery time.Duration,
getterWatcher k8s.ConfigMapGetterWatcher,
cmInformer *k8s.InformerConfigMapUpdater,
ns string,
table *Table,
q queue.Counter,
cbFunc func() error,
) error {
lggr = lggr.WithName("pkg.routing.StartConfigMapRoutingTableUpdater")
watchIface, err := getterWatcher.Watch(ctx, metav1.ListOptions{})
if err != nil {
return err
}
defer watchIface.Stop()

ticker := time.NewTicker(updateEvery)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "context is done")
case <-ticker.C:
if err := GetTable(ctx, lggr, getterWatcher, table, q); err != nil {
return errors.Wrap(err, "failed to fetch routing table")
}
watcher := cmInformer.Watch(ns, ConfigMapRoutingTableName)
defer watcher.Stop()

case evt := <-watchIface.ResultChan():
evtType := evt.Type
obj := evt.Object
if evtType == watch.Added || evtType == watch.Modified {
cm, ok := obj.(*corev1.ConfigMap)
// by definition of watchIface, all returned objects should
// be assertable to a ConfigMap. In the likely impossible
// case that it isn't, just ignore and move on.
// This check is just to be defensive.
ctx, done := context.WithCancel(ctx)
defer done()
grp, ctx := errgroup.WithContext(ctx)

grp.Go(func() error {
defer done()
return cmInformer.Start(ctx)
})

grp.Go(func() error {
defer done()
for {
select {
case event := <-watcher.ResultChan():
cm, ok := event.Object.(*corev1.ConfigMap)
// Theoretically this will not happen
if !ok {
continue
}
// the watcher is open on all ConfigMaps in the namespace, so
// bail out of this loop iteration immediately if the event
// isn't for the routing table ConfigMap.
if cm.Name != ConfigMapRoutingTableName {
lggr.Info(
"The event object observed is not a configmap",
)
continue
}
newTable, err := FetchTableFromConfigMap(cm, q)
Expand All @@ -81,8 +72,25 @@ func StartConfigMapRoutingTableUpdater(
)
continue
}
// Execute the callback function, if one exists
if cbFunc != nil {
if err := cbFunc(); err != nil {
lggr.Error(
err,
"failed to exec the callback function",
)
continue
}
}
case <-ctx.Done():
return errors.Wrap(ctx.Err(), "context is done")
}
}
}
})

if err := grp.Wait(); err != nil {
lggr.Error(err, "config map routing updater is failed")
return err
}
return nil
}
Loading

0 comments on commit 201c7cb

Please sign in to comment.