Skip to content

Commit

Permalink
Handle watch errors
Browse files Browse the repository at this point in the history
If a watch error is received, we might have lost events,
so reset the local stores and reconnect to receive
everything again.

Watch errors have been seen when reconnecting to a
restarted API server.
  • Loading branch information
jsoriano committed Jun 26, 2017
1 parent 226200b commit 6854982
Showing 1 changed file with 23 additions and 14 deletions.
37 changes: 23 additions & 14 deletions kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api"
"k8s.io/client-go/pkg/api/meta"
"k8s.io/client-go/pkg/api/unversioned"
"k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/pkg/util/wait"
"k8s.io/client-go/pkg/watch"
Expand Down Expand Up @@ -285,19 +286,16 @@ func (c *KubernetesClient) Watch() error {
})
go updater.Run()

c.nodeStore = NodeStore{NewLocalStore()}
c.serviceStore = ServiceStore{NewLocalStore()}
c.endpointsStore = EndpointsStore{NewLocalStore()}
resetStores := func() {
isFirstUpdate = true
c.nodeStore = NodeStore{NewLocalStore()}
c.serviceStore = ServiceStore{NewLocalStore()}
c.endpointsStore = EndpointsStore{NewLocalStore()}
c.lastResourceVersion = ""
}
resetStores()

updateStore := func(s Store, e watch.Event) {
if e.Object == nil {
return
}
defer func() {
accessor, _ := meta.Accessor(e.Object)
c.lastResourceVersion = accessor.GetResourceVersion()
}()

switch e.Type {
case watch.Added:
s.Update(e.Object)
Expand All @@ -317,6 +315,18 @@ func (c *KubernetesClient) Watch() error {
}
case watch.Deleted:
s.Delete(e.Object)
case watch.Error:
status, ok := e.Object.(*unversioned.Status)
if ok {
log.Printf("Error received while watching: %s", status.Message)
}
log.Println("Local caches will be rebuilt")
resetStores()
return
}
accessor, _ := meta.Accessor(e.Object)
if accessor != nil {
c.lastResourceVersion = accessor.GetResourceVersion()
}
updater.Signal()
}
Expand All @@ -338,14 +348,13 @@ func (c *KubernetesClient) Watch() error {
c.eventForwarder(e)
}

if !more {
c.stopWatchers()
if !more || e.Type == watch.Error {
log.Printf("Connection closed, trying to reconnect...")
timeout := time.Duration(reconnectTimeoutSeconds) * time.Second
err := wait.Poll(5*time.Second, timeout, func() (bool, error) {
err := c.connect()
if err != nil {
log.Println(err)
log.Println("Couldn't reconnect: ", err)
}
return err == nil, nil
})
Expand Down

0 comments on commit 6854982

Please sign in to comment.