Skip to content
This repository has been archived by the owner on Mar 18, 2024. It is now read-only.

implement polling to periodically refresh the backend services #4

Merged
merged 1 commit into from
Dec 9, 2015
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"regexp"
"strings"
"sync"
"time"

"github.com/gorilla/rpc/v2"
"github.com/levenlabs/gatewayrpc"
Expand All @@ -25,7 +26,8 @@ import (
type remoteService struct {
gatewayrpc.Service
*url.URL
srv bool
origURL string
srv bool
}

// Request contains all the data about an incoming request which is currently
Expand Down Expand Up @@ -54,6 +56,7 @@ type Gateway struct {
services map[string]remoteService
mutex sync.RWMutex
codecs map[string]rpc.Codec
poll <-chan time.Time

// BackupHandler, if not nil, will be used to handle the requests which
// don't have a corresponding backend service to forward to (based on their
Expand All @@ -77,6 +80,7 @@ func NewGateway() Gateway {
return Gateway{
services: map[string]remoteService{},
codecs: map[string]rpc.Codec{},
poll: time.Tick(30 * time.Second),
}
}

Expand Down Expand Up @@ -123,12 +127,32 @@ func (g Gateway) AddURL(u string) error {
g.services[srv.Name] = remoteService{
Service: srv,
URL: uu,
origURL: u,
srv: srvOK,
}
}
return nil
}

func (g Gateway) refreshURLs() {
llog.Debug("refreshing urls")
g.mutex.RLock()
srvs := make([]remoteService, 0, len(g.services))
for _, srv := range g.services {
srvs = append(srvs, srv)
}
g.mutex.RUnlock()

for _, srv := range srvs {
if err := g.AddURL(srv.origURL); err != nil {
llog.Error("error refreshing url", llog.KV{
"url": srv.origURL,
"err": err,
})
}
}
}

// RegisterCodec is used to register an encoder/decoder which will operate on
// requests with the given contentType
func (g Gateway) RegisterCodec(codec rpc.Codec, contentType string) {
Expand Down Expand Up @@ -188,6 +212,17 @@ type serverRequest struct {
}

func (g Gateway) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// Periodically we want to refresh the services that gateway knows about. We
// do it in a new goroutine so we don't block this actual request. We don't
// want to simply have a dedicated go routine looping over the poll channel
// to do this because having an http.Handler spawn up its own routine that's
// making requests and doing stuff is kind of unexpected behavior
select {
case <-g.poll:
go g.refreshURLs()
default:
}

kv := rpcutil.RequestKV(r)
llog.Debug("ServeHTTP called", kv)

Expand Down