swapping watch broadcaster channel behavior (julienschmidt#283)
* swapping watch broadcaster channel behavior

Signed-off-by: Aaron Schlesinger <[email protected]>

* small test refactor

Signed-off-by: Aaron Schlesinger <[email protected]>

* more comments in test

Signed-off-by: Aaron Schlesinger <[email protected]>

* more comments and interface implementation check for fake deployment cache

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding comment to closeableWatcher

Signed-off-by: Aaron Schlesinger <[email protected]>

* forgot the beginning of comment

Signed-off-by: Aaron Schlesinger <[email protected]>

* adding error checking to the closeableWatcher

Signed-off-by: Aaron Schlesinger <[email protected]>

* removing resultCalls from the closeable watcher

Signed-off-by: Aaron Schlesinger <[email protected]>

* removing the closeableWatcher in favor of the fake watcher functionality in client-go

Signed-off-by: Aaron Schlesinger <[email protected]>

* closeableWatcher => reopeningWatcher

The core functionality needed was automatic reopening of watch channels. This commit
implements that while removing ancillary features such as the option to
specify whether to reopen.

Signed-off-by: Aaron <[email protected]>

* fixing nil ptr exception

Signed-off-by: Aaron <[email protected]>

* clarifying comments

Signed-off-by: Aaron <[email protected]>
arschles authored Oct 25, 2021
1 parent 41e86c1 commit 8ebbabc
Showing 4 changed files with 146 additions and 83 deletions.
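
The headline change swaps the deployment cache's broadcaster from watch.DropIfChannelFull to watch.WaitIfChannelFull, so slow subscribers now apply backpressure instead of silently missing events. Below is a rough conceptual sketch of what the two modes mean for a single subscriber channel; the real fan-out logic lives in k8s.io/apimachinery/pkg/watch.Broadcaster, and this is not that implementation.

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/watch"
)

// send sketches, for one subscriber channel, the difference between
// the two FullChannelBehavior modes. it reports whether the event
// was delivered.
func send(ch chan watch.Event, evt watch.Event, waitIfFull bool) bool {
	if waitIfFull {
		// watch.WaitIfChannelFull: block until the subscriber drains
		// its buffer, so no event is ever dropped (a slow consumer
		// can stall the broadcaster)
		ch <- evt
		return true
	}
	// watch.DropIfChannelFull: never block; a slow subscriber
	// silently misses this event
	select {
	case ch <- evt:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan watch.Event, 1)
	ch <- watch.Event{Type: watch.Added} // fill the buffer
	// with drop semantics the second event is lost rather than blocking
	fmt.Println(send(ch, watch.Event{Type: watch.Modified}, false)) // prints false
}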
9 changes: 8 additions & 1 deletion pkg/k8s/deployment_cache.go
@@ -35,7 +35,7 @@ func NewK8sDeploymentCache(
cl DeploymentListerWatcher,
) (*K8sDeploymentCache, error) {
lggr = lggr.WithName("pkg.k8s.NewK8sDeploymentCache")
bcaster := watch.NewBroadcaster(5, watch.DropIfChannelFull)
bcaster := watch.NewBroadcaster(5, watch.WaitIfChannelFull)

ret := &K8sDeploymentCache{
latest: map[string]appsv1.Deployment{},
@@ -87,6 +87,7 @@ func (k *K8sDeploymentCache) StartWatcher(
"error creating new watch stream",
)
}
// wrap Stop in a closure so the deferred call stops whatever
// watcher holds at return time, not only the first watcher
defer func() { watcher.Stop() }()

ch := watcher.ResultChan()
fetchTicker := time.NewTicker(fetchTickDur)
@@ -109,6 +110,10 @@
case evt, validRecv := <-ch:
// handle closed watch stream
if !validRecv {
// make sure to stop the dead watcher before doing anything else.
// below, we assign watcher to a new watcher, and the defer we set
// up earlier will stop that one when this function returns
watcher.Stop()
newWatcher, err := k.cl.Watch(ctx, metav1.ListOptions{})
if err != nil {
lggr.Error(
@@ -120,7 +125,9 @@
"failed to re-open watch stream",
)
}

ch = newWatcher.ResultChan()
watcher = newWatcher
} else {
if err := k.addEvt(evt); err != nil {
lggr.Error(
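
The hunk above boils down to a common re-open-on-close pattern: one deferred Stop wrapped in a closure, and, whenever the result channel closes, stop the dead watcher, open a new one, and swap channels. A simplified sketch follows, under the assumption that watchFn stands in for the real k.cl.Watch call; event handling is elided.

package k8s

import (
	"context"

	"k8s.io/apimachinery/pkg/watch"
)

// watchLoop is a sketch of the re-open-on-close pattern used by
// StartWatcher; it is not the actual implementation
func watchLoop(
	ctx context.Context,
	watchFn func(context.Context) (watch.Interface, error),
) error {
	watcher, err := watchFn(ctx)
	if err != nil {
		return err
	}
	// the closure makes the deferred Stop apply to whatever watcher
	// holds at return time, including re-opened ones
	defer func() { watcher.Stop() }()
	ch := watcher.ResultChan()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case evt, ok := <-ch:
			if !ok {
				// stream closed: stop the dead watcher, then re-open
				watcher.Stop()
				watcher, err = watchFn(ctx)
				if err != nil {
					return err
				}
				ch = watcher.ResultChan()
				continue
			}
			_ = evt // hand the event to the cache here
		}
	}
}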
4 changes: 4 additions & 0 deletions pkg/k8s/deployment_cache_fake.go
@@ -9,6 +9,10 @@ import (
"k8s.io/apimachinery/pkg/watch"
)

// FakeDeploymentCache is a fake implementation of
// DeploymentCache, suitable for testing interceptor-level
// logic without requiring a real Kubernetes client or any
// API interaction
type FakeDeploymentCache struct {
json.Marshaler
Mut *sync.RWMutex
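
The hunk above shows only the new doc comment; per the commit message, this change also added an interface implementation check for the fake. That is presumably the standard compile-time assertion idiom, along these lines (the DeploymentCache interface name is an assumption here, not visible in the hunk):

// compile-time assertion (hypothetical sketch): if FakeDeploymentCache
// drifts from the DeploymentCache interface, the build breaks here
// rather than deep inside a test
var _ DeploymentCache = &FakeDeploymentCache{}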
84 changes: 62 additions & 22 deletions pkg/k8s/deployment_cache_test.go
@@ -3,6 +3,7 @@ package k8s
import (
"context"
"errors"
"fmt"
"sync"
"testing"
"time"
@@ -63,7 +64,13 @@ func TestK8sDeploymentCacheMergeAndBroadcastList(t *testing.T) {
context.Background(),
)
defer done()
cache, err := NewK8sDeploymentCache(ctx, logr.Discard(), newFakeDeploymentListerWatcher())
lggr := logr.Discard()
listerWatcher := newFakeDeploymentListerWatcher()
cache, err := NewK8sDeploymentCache(
ctx,
lggr,
listerWatcher,
)
r.NoError(err)
deplList := &appsv1.DeploymentList{
Items: []appsv1.Deployment{
@@ -176,6 +183,8 @@ func callMergeAndBcast(
cache.mergeAndBroadcastList(deplList)
}()

// wait until all events have been received, then return
// them so the caller can compare them against expectations
wg.Wait()
return evts
}
@@ -252,24 +261,55 @@ func TestK8sDeploymentCachePeriodicFetch(t *testing.T) {
// add the deployment without sending an event, to make sure that
// the internal loop won't receive any events and will rely on
// just the ticker
lw.addDeployment(*depl, false)
lw.addDeployment(*depl)
time.Sleep(tickDur * 2)
// make sure that the deployment was fetched
fetched, err := cache.Get(depl.ObjectMeta.Name)
r.NoError(err)
r.Equal(*depl, fetched)
r.Equal(0, len(lw.getWatcher().getEvents()))
r.Equal(0, len(lw.row.getEvents()))
}

func signalSent(name string, sig <-chan struct{}) error {
const dur = 100 * time.Millisecond
select {
case <-time.After(dur):
return fmt.Errorf("%s signal not received within %s", name, dur)
case <-sig:
}
return nil
}

func errSignalSent(name string, sig <-chan error) error {
const dur = 100 * time.Millisecond
select {
case <-time.After(dur):
return fmt.Errorf("%s signal not received within %s", name, dur)
case <-sig:
}
return nil
}

// test to make sure that the update loop tries to
// re-establish watch streams when they're broken
func TestK8sDeploymentCacheRewatch(t *testing.T) {
r := require.New(t)
ctx, done := context.WithCancel(
context.Background(),
)
defer done()
lw := newFakeDeploymentListerWatcher()
// channel that will receive when watch is called
watchCh := make(chan struct{})
watchCallNum := 0
lw.watchCB = func() {
select {
case watchCh <- struct{}{}:
case <-time.After(100 * time.Millisecond):
t.Fatalf("watch callback called but not acknowledged within 100ms for call num %d", watchCallNum+1)
}
watchCallNum++
}
cache, err := NewK8sDeploymentCache(ctx, logr.Discard(), lw)
r.NoError(err)

@@ -282,32 +322,32 @@ func TestK8sDeploymentCacheRewatch(t *testing.T) {
watcherErrCh <- cache.StartWatcher(ctx, logr.Discard(), tickDur)
}()

// wait 1/2 second to make sure the watcher goroutine can start up
// and doesn't return any errors
select {
case err := <-watcherErrCh:
r.NoError(err)
case <-time.After(500 * time.Millisecond):
}
// make sure the Watch() was called and that the watcher
// itself didn't error
r.NoError(signalSent("watch", watchCh))
r.Error(errSignalSent("watcher error", watcherErrCh))

// close all open watch channels after waiting a bit for the watcher to start.
// in this call we're allowing channels to be reopened
lw.getWatcher().closeOpenChans(true)
time.Sleep(500 * time.Millisecond)
// close all open watch channels, then make sure Watch()
// was called again and there was no watch error
lw.row.Stop()
r.NoError(signalSent("watch", watchCh))
r.Error(errSignalSent("watcher error", watcherErrCh))

// add the deployment and send an event.
depl := newDeployment("testns", "testdepl", "testing", nil, nil, nil, core.PullAlways)
lw.addDeployment(*depl, true)
// sleep for a bit to make sure the watcher has had time to re-establish the watch
// and receive the event
time.Sleep(500 * time.Millisecond)
r.NoError(lw.addDeploymentAndSendEvent(*depl, 100*time.Millisecond))

// make sure that watch wasn't called a third time
// and that the watcher itself didn't error
r.Error(signalSent("watch", watchCh))
r.Error(errSignalSent("watcher error", watcherErrCh))

// make sure that an event came through
r.Equal(1, len(lw.getWatcher().getEvents()))
r.Equal(1, len(lw.row.getEvents()))
// make sure that the deployment was fetched
fetched, err := cache.Get(depl.ObjectMeta.Name)
r.NoError(err)
r.Equal(*depl, fetched)

}

// test to make sure that when the context is closed, the deployment
132 changes: 72 additions & 60 deletions pkg/k8s/watch_test.go
@@ -2,108 +2,116 @@ package k8s

import (
"context"
"errors"
"fmt"
"sync"
"time"

"github.com/google/uuid"
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
)

// closeableWatcher is a watch.Interface that can be closed
// and optionally reopened
type closeableWatcher struct {
uid uuid.UUID
mut *sync.RWMutex
ch chan watch.Event
events []watch.Event
closed bool
allowReopen bool
// reopeningWatcher is a watch.Interface that, when closed, is
// reopened immediately upon the next call to ResultChan().
//
// it's similar to the RaceFreeFakeWatcher in k8s.io/apimachinery/pkg/watch,
// except that that watcher requires a manual Reset() call to be
// reopened. the tests here need the reopen to happen automatically.
type reopeningWatcher struct {
mut *sync.RWMutex
ch chan watch.Event
events []watch.Event
closed bool
}

func newCloseableWatcher() *closeableWatcher {
return &closeableWatcher{
uid: uuid.New(),
mut: new(sync.RWMutex),
ch: make(chan watch.Event),
closed: false,
allowReopen: true,
func newReopeningWatcher() *reopeningWatcher {
return &reopeningWatcher{
mut: new(sync.RWMutex),
ch: make(chan watch.Event),
events: nil,
closed: false,
}
}

func (w *closeableWatcher) String() string {
return fmt.Sprintf(
"closeableWatcher %s. events = %v",
w.uid.String(),
w.events,
)
}

func (w *closeableWatcher) Stop() {
func (w *reopeningWatcher) Stop() {
w.mut.Lock()
defer w.mut.Unlock()
close(w.ch)
if !w.closed {
close(w.ch)
w.closed = true
}
}

func (w *closeableWatcher) ResultChan() <-chan watch.Event {
func (w *reopeningWatcher) ResultChan() <-chan watch.Event {
w.mut.Lock()
defer w.mut.Unlock()
if w.closed && w.allowReopen {
if w.closed {
w.ch = make(chan watch.Event)
w.closed = false
}
return w.ch
}

func (w *closeableWatcher) closeOpenChans(allowReopen bool) {
w.mut.Lock()
defer w.mut.Unlock()
close(w.ch)
w.closed = true
w.allowReopen = allowReopen
}

func (w *closeableWatcher) Add(d *appsv1.Deployment) {
func (w *reopeningWatcher) Add(d *appsv1.Deployment, dur time.Duration) error {
w.mut.RLock()
defer w.mut.RUnlock()
if w.closed {
return errors.New("watcher is closed")
}
evt := watch.Event{
Type: watch.Added,
Object: d,
}
w.ch <- evt
select {
case w.ch <- evt:
case <-time.After(dur):
return fmt.Errorf("couldn't send ADD event within %s", dur)
}
w.events = append(w.events, evt)
return nil
}

func (w *closeableWatcher) Modify(d *appsv1.Deployment) {
func (w *reopeningWatcher) Modify(d *appsv1.Deployment, dur time.Duration) error {
w.mut.RLock()
defer w.mut.RUnlock()
if w.closed {
return errors.New("watcher is closed")
}
evt := watch.Event{
Type: watch.Modified,
Object: d,
}
w.ch <- evt
select {
case w.ch <- evt:
case <-time.After(dur):
return fmt.Errorf("couldn't send MODIFY event within %s", dur)
}
w.events = append(w.events, evt)
return nil
}

func (w *closeableWatcher) getEvents() []watch.Event {
func (w *reopeningWatcher) getEvents() []watch.Event {
w.mut.RLock()
defer w.mut.RUnlock()
return w.events
}

type fakeDeploymentListerWatcher struct {
mut *sync.RWMutex
watcher *closeableWatcher
row *reopeningWatcher
items map[string]appsv1.Deployment
watchCB func()
}

var _ DeploymentListerWatcher = &fakeDeploymentListerWatcher{}

func newFakeDeploymentListerWatcher() *fakeDeploymentListerWatcher {
w := newCloseableWatcher()
return &fakeDeploymentListerWatcher{
mut: new(sync.RWMutex),
watcher: w,
items: map[string]appsv1.Deployment{},
mut: new(sync.RWMutex),
row: newReopeningWatcher(),
items: map[string]appsv1.Deployment{},
}
}

@@ -118,27 +126,31 @@ func (lw *fakeDeploymentListerWatcher) List(ctx context.Context, options metav1.
}

func (lw *fakeDeploymentListerWatcher) Watch(ctx context.Context, options metav1.ListOptions) (watch.Interface, error) {
return lw.watcher, nil
}

func (lw *fakeDeploymentListerWatcher) getWatcher() *closeableWatcher {
return lw.watcher
if lw.watchCB != nil {
lw.watchCB()
}
return lw.row, nil
}

// addDeployment adds d to the internal deployments list, or overwrites it if it
// already existed. in either case, it will be returned by a future call to List.
// no watch event is sent; use addDeploymentAndSendEvent to also send an ADD
// event for a new deployment or a MODIFY event for an existing one
func (lw *fakeDeploymentListerWatcher) addDeployment(d appsv1.Deployment, sendEvent bool) {
func (lw *fakeDeploymentListerWatcher) addDeployment(d appsv1.Deployment) {
lw.mut.Lock()
defer lw.mut.Unlock()
_, existed := lw.items[d.ObjectMeta.Name]
lw.items[d.ObjectMeta.Name] = d
if sendEvent {
if existed {
lw.watcher.Modify(&d)
} else {
lw.watcher.Add(&d)
}
}

func (lw *fakeDeploymentListerWatcher) addDeploymentAndSendEvent(d appsv1.Deployment, waitDur time.Duration) error {
lw.mut.Lock()
defer lw.mut.Unlock()
_, existed := lw.items[d.GetName()]
lw.items[d.GetName()] = d
if existed {
return lw.row.Modify(&d, waitDur)
} else {
return lw.row.Add(&d, waitDur)
}
}
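
To make the reopening contract concrete, here is a minimal hypothetical test, not part of this commit, that exercises what the reopeningWatcher comment describes: Stop closes the current channel, and the next call to ResultChan hands out a fresh, open one.

package k8s

import "testing"

// TestReopeningWatcherReopens is a sketch only; it is not part of
// the commit above
func TestReopeningWatcherReopens(t *testing.T) {
	w := newReopeningWatcher()
	ch1 := w.ResultChan()
	w.Stop()
	// a receive from the closed channel returns immediately with ok == false
	if _, ok := <-ch1; ok {
		t.Fatal("expected the first channel to be closed after Stop")
	}
	// the next ResultChan call replaces the closed channel
	ch2 := w.ResultChan()
	select {
	case _, ok := <-ch2:
		if !ok {
			t.Fatal("expected the new channel to be open")
		}
	default:
		// nothing pending on the fresh, open channel: the expected path
	}
}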
