
Add auto refresh of Clusters API (#591)
Refactored the code to load the clusters and keep them
in a cache: the cache is filled explicitly at `cluster.Service`
initialization so it is always pre-filled, and a `sync.RWMutex`
lets readers access the data concurrently while blocking access
when the writer (running in a separate goroutine) needs to
replace the values.

Also, return `[]cluster.Cluster` instead of `[]*cluster.Cluster`
when getting all clusters, and return `cluster.Cluster` by value
when resolving a single cluster.
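
In outline, the caching pattern described above looks like the minimal sketch below (illustrative only: `cache`, `newCache`, and the `fetch` callback are hypothetical stand-ins for the real `clusterService`, `NewService`, and the Auth-service call):

```go
package cluster

import (
	"sync"
	"time"
)

// Cluster is a simplified stand-in for the real cluster.Cluster type.
type Cluster struct {
	APIURL string
}

// cache guards the cluster list with a RWMutex: many readers may proceed
// concurrently, while the single writer blocks them only while swapping
// in a freshly fetched list.
type cache struct {
	mu       sync.RWMutex
	clusters []Cluster
	ticker   *time.Ticker
}

// newCache pre-fills the cache once (failing fast on error), then keeps
// refreshing it from a separate goroutine driven by a ticker.
func newCache(delay time.Duration, fetch func() ([]Cluster, error)) (*cache, error) {
	c := &cache{ticker: time.NewTicker(delay)}
	if err := c.refresh(fetch); err != nil {
		return nil, err
	}
	go func() {
		for range c.ticker.C {
			c.refresh(fetch) // the real service logs refresh errors
		}
	}()
	return c, nil
}

func (c *cache) refresh(fetch func() ([]Cluster, error)) error {
	clusters, err := fetch()
	if err != nil {
		return err
	}
	c.mu.Lock() // block readers only for the swap itself
	c.clusters = clusters
	c.mu.Unlock()
	return nil
}

// get returns a copy so callers never observe the slice being replaced.
func (c *cache) get() []Cluster {
	c.mu.RLock()
	defer c.mu.RUnlock()
	out := make([]Cluster, len(c.clusters))
	copy(out, c.clusters)
	return out
}
```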

Fixes #515

Signed-off-by: Xavier Coulon <[email protected]>
aslakknutsen authored May 5, 2018
1 parent b83a946 commit 5714dfd
Showing 18 changed files with 409 additions and 95 deletions.
5 changes: 3 additions & 2 deletions Gopkg.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion Gopkg.toml
@@ -75,7 +75,9 @@ ignored = [

[[constraint]]
name = "github.com/dnaeon/go-vcr"
revision= "9d71b8a6df86e00127f96bc8dabc09856ab8afdb"
#revision= "9d71b8a6df86e00127f96bc8dabc09856ab8afdb"
source= "https://github.com/xcoulon/go-vcr/"
revision= "fd097d581a47517ee36686adfd3153d6f8eca367"

[[constraint]]
name = "github.com/fabric8-services/fabric8-auth"
5 changes: 2 additions & 3 deletions Makefile
@@ -217,8 +217,7 @@ app/controllers.go: $(DESIGNS) $(GOAGEN_BIN) $(VENDOR_DIR)
$(GOAGEN_BIN) client -d ${PACKAGE_NAME}/${DESIGN_DIR}
$(GOAGEN_BIN) swagger -d ${PACKAGE_NAME}/${DESIGN_DIR}
$(GOAGEN_BIN) client -d github.com/fabric8-services/fabric8-auth/design --notool --out auth --pkg client



.PHONY: migrate-database
## Compiles the server and runs the database migration with it
migrate-database: $(BINARY_SERVER_BIN)
@@ -289,4 +288,4 @@ fast-docker: bin/docker/fabric8-tenant-linux
docker build -t fabric8/fabric8-tenant:dev bin/docker

kube-redeploy: fast-docker
-	kubectl delete pod -l service=init-tenant
+	kubectl delete pod -l service=init-tenant
5 changes: 5 additions & 0 deletions OpenShiftTemplate.yml
@@ -77,6 +77,11 @@ objects:
configMapKeyRef:
name: f8tenant
key: auth.url
+        - name: F8_CLUSTER_REFRESH_DELAY
+          valueFrom:
+            configMapKeyRef:
+              name: f8tenant
+              key: cluster.refresh.delay
- name: F8_TEMPLATE_RECOMMENDER_EXTERNAL_NAME
valueFrom:
configMapKeyRef:
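For reference, this new environment variable is what ends up as the `clustersRefreshDelay` argument of `cluster.NewService` in the Go changes below. A sketch of how such a value could be parsed into a `time.Duration` (the helper name, the minute-based unit, and the default are assumptions, not the project's actual configuration code):

```go
package config

import (
	"os"
	"strconv"
	"time"
)

// clusterRefreshDelay reads F8_CLUSTER_REFRESH_DELAY as a number of
// minutes (an assumed unit) and falls back to a default when the
// variable is unset or invalid.
func clusterRefreshDelay() time.Duration {
	const defaultDelay = 60 * time.Minute // assumed default
	v := os.Getenv("F8_CLUSTER_REFRESH_DELAY")
	if v == "" {
		return defaultDelay
	}
	minutes, err := strconv.Atoi(v)
	if err != nil || minutes <= 0 {
		return defaultDelay
	}
	return time.Duration(minutes) * time.Minute
}
```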
18 changes: 14 additions & 4 deletions cluster/resolve.go
@@ -3,19 +3,29 @@ package cluster
import (
"context"
"fmt"

"github.com/fabric8-services/fabric8-wit/log"
errs "github.com/pkg/errors"
)

// Resolve a func to resolve a cluster
-type Resolve func(ctx context.Context, target string) (*Cluster, error)
+type Resolve func(ctx context.Context, target string) (Cluster, error)

// NewResolve returns a new Cluster
-func NewResolve(clusters []*Cluster) Resolve {
-	return func(ctx context.Context, target string) (*Cluster, error) {
+func NewResolve(clusterService Service) Resolve {
+	return func(ctx context.Context, target string) (Cluster, error) {
+		clusters, err := clusterService.GetClusters(context.Background())
+		if err != nil {
+			log.Panic(nil, map[string]interface{}{
+				"err": err,
+			}, "unable to resolve clusters")
+			return Cluster{}, errs.Wrapf(err, "unable to resolve cluster")
+		}
for _, cluster := range clusters {
if cleanURL(target) == cleanURL(cluster.APIURL) {
return cluster, nil
}
}
-		return nil, fmt.Errorf("unable to resolve cluster")
+		return Cluster{}, fmt.Errorf("unable to resolve cluster")
}
}
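
With `Resolve` now returning `cluster.Cluster` by value, callers get a copy that cannot be mutated through the cache. A usage sketch (illustrative only: `resolveTarget` is not part of this commit, and `svc` is assumed to be a `cluster.Service` built elsewhere):

```go
package example

import (
	"context"

	"github.com/fabric8-services/fabric8-tenant/cluster"
)

// resolveTarget shows the new by-value API of cluster.Resolve; it is an
// illustrative helper, not code from this commit.
func resolveTarget(ctx context.Context, svc cluster.Service, target string) (cluster.Cluster, error) {
	resolve := cluster.NewResolve(svc)
	// The resolver fetches the cached clusters via svc.GetClusters and
	// compares normalized URLs; a match comes back as a value copy.
	return resolve(ctx, target)
}
```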
106 changes: 88 additions & 18 deletions cluster/service.go
@@ -4,12 +4,15 @@ import (
"context"
"io/ioutil"
"strings"
"sync"
"time"

"github.com/fabric8-services/fabric8-tenant/auth"
authclient "github.com/fabric8-services/fabric8-tenant/auth/client"
"github.com/fabric8-services/fabric8-tenant/configuration"
"github.com/fabric8-services/fabric8-tenant/openshift"
"github.com/fabric8-services/fabric8-tenant/token"
"github.com/fabric8-services/fabric8-wit/log"
goaclient "github.com/goadesign/goa/client"
"github.com/pkg/errors"
)
@@ -36,26 +39,79 @@ func cleanURL(url string) string {

// Service the interface for the cluster service
type Service interface {
-	GetClusters(context.Context) ([]*Cluster, error)
+	GetClusters(context.Context) ([]Cluster, error)
+	Stats() Stats
+	Stop()
}

+// Stats some stats about the cached data, for verifying during the tests, at first.
+type Stats struct {
+	CacheHits      int
+	CacheMissed    int
+	CacheRefreshes int
+}

// NewService creates a Resolver that rely on the Auth service to retrieve tokens
-func NewService(authURL string, serviceToken string, resolveToken token.Resolve, decode token.Decode, options ...configuration.HTTPClientOption) Service {
-	return &clusterService{authURL: authURL, serviceToken: serviceToken, resolveToken: resolveToken, decode: decode, clientOptions: options}
+func NewService(authURL string, clustersRefreshDelay time.Duration, serviceToken string, resolveToken token.Resolve, decode token.Decode, options ...configuration.HTTPClientOption) (Service, error) {
+	// setup a ticker to refresh the cluster cache at regular intervals
+	cacheRefresher := time.NewTicker(clustersRefreshDelay)
+	s := &clusterService{
+		authURL:          authURL,
+		serviceToken:     serviceToken,
+		resolveToken:     resolveToken,
+		decode:           decode,
+		cacheRefresher:   cacheRefresher,
+		cacheRefreshLock: &sync.RWMutex{},
+		clientOptions:    options}
+	// immediately load the list of clusters before returning
+	err := s.refreshCache(context.Background())
+	if err != nil {
+		log.Error(nil, map[string]interface{}{"error_message": err}, "failed to load the list of clusters during service initialization")
+		return nil, err
+	}
+	go func() {
+		for range cacheRefresher.C { // while the `cacheRefresh` ticker is running
+			s.refreshCache(context.Background())
+		}
+	}()
+	return s, nil
}

type clusterService struct {
-	authURL       string
-	clientOptions []configuration.HTTPClientOption
-	serviceToken  string
-	resolveToken  token.Resolve
-	decode        token.Decode
+	authURL          string
+	serviceToken     string
+	resolveToken     token.Resolve
+	cacheRefresher   *time.Ticker
+	cacheRefreshLock *sync.RWMutex
+	cacheHits        int
+	cacheMissed      int
+	cacheRefreshes   int
+	cachedClusters   []Cluster
+	decode           token.Decode
+	clientOptions    []configuration.HTTPClientOption
}

-func (s *clusterService) GetClusters(ctx context.Context) ([]*Cluster, error) {
+func (s *clusterService) GetClusters(ctx context.Context) ([]Cluster, error) {
+	s.cacheRefreshLock.RLock()
+	log.Debug(ctx, nil, "read lock acquired")
+	clusters := make([]Cluster, len(s.cachedClusters))
+	copy(clusters, s.cachedClusters)
+	s.cacheRefreshLock.RUnlock()
+	log.Debug(ctx, nil, "read lock released")
+	return clusters, nil // return a copy of the cached clusters
+}
+
+func (s *clusterService) Stop() {
+	s.cacheRefresher.Stop()
+}
+
+func (s *clusterService) refreshCache(ctx context.Context) error {
+	log.Debug(ctx, nil, "refreshing cached list of clusters...")
+	defer log.Debug(ctx, nil, "refreshed cached list of clusters.")
+	s.cacheRefreshes = s.cacheRefreshes + 1
client, err := auth.NewClient(s.authURL, s.serviceToken, s.clientOptions...)
if err != nil {
-		return nil, err
+		return err
}
client.SetJWTSigner(
&goaclient.JWTSigner{
@@ -66,7 +122,7 @@ func (s *clusterService) GetClusters(ctx context.Context) ([]*Cluster, error) {

res, err := client.ShowClusters(ctx, authclient.ShowClustersPath())
if err != nil {
return nil, errors.Wrapf(err, "error while doing the request")
return errors.Wrapf(err, "error while doing the request")
}
defer func() {
ioutil.ReadAll(res.Body)
@@ -75,28 +131,28 @@

validationerror := auth.ValidateResponse(ctx, client, res)
if validationerror != nil {
-		return nil, errors.Wrapf(validationerror, "error from server %q", s.authURL)
+		return errors.Wrapf(validationerror, "error from server %q", s.authURL)
}

clusters, err := client.DecodeClusterList(res)
if err != nil {
return nil, errors.Wrapf(err, "error from server %q", s.authURL)
return errors.Wrapf(err, "error from server %q", s.authURL)
}

-	var cls []*Cluster
+	var cls []Cluster
for _, cluster := range clusters.Data {
// resolve/obtain the cluster token
clusterUser, clusterToken, err := s.resolveToken(ctx, cluster.APIURL, s.serviceToken, false, s.decode) // can't use "forcePull=true" to validate the `tenant service account` token since it's encrypted on auth
if err != nil {
return nil, errors.Wrapf(err, "Unable to resolve token for cluster %v", cluster.APIURL)
return errors.Wrapf(err, "Unable to resolve token for cluster %v", cluster.APIURL)
}
// verify the token
_, err = openshift.WhoAmI(ctx, cluster.APIURL, clusterToken, s.clientOptions...)
if err != nil {
return nil, errors.Wrapf(err, "token retrieved for cluster %v is invalid", cluster.APIURL)
return errors.Wrapf(err, "token retrieved for cluster %v is invalid", cluster.APIURL)
}

cls = append(cls, &Cluster{
cls = append(cls, Cluster{
APIURL: cluster.APIURL,
AppDNS: cluster.AppDNS,
ConsoleURL: cluster.ConsoleURL,
@@ -108,5 +164,19 @@ Token: clusterToken,
Token: clusterToken,
})
}
-	return cls, nil
+	// lock to avoid concurrent writes
+	s.cacheRefreshLock.Lock()
+	log.Debug(ctx, nil, "write lock acquired")
+	s.cachedClusters = cls // only replace at the end of this function and within a Write lock scope, i.e., when all retrieved clusters have been processed
+	s.cacheRefreshLock.Unlock()
+	log.Debug(ctx, nil, "write lock released")
+	return nil
}

+func (s *clusterService) Stats() Stats {
+	return Stats{
+		CacheHits:      s.cacheHits,
+		CacheMissed:    s.cacheMissed,
+		CacheRefreshes: s.cacheRefreshes,
+	}
+}
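
Putting the pieces together, a wiring sketch for the reworked API (illustrative only: `startClusterService` and the 5-minute delay are placeholders; `resolveToken` and `decode` are assumed to be valid implementations from the project's `token` package):

```go
package example

import (
	"context"
	"time"

	"github.com/fabric8-services/fabric8-tenant/cluster"
	"github.com/fabric8-services/fabric8-tenant/token"
)

// startClusterService sketches how a caller might use the reworked service.
func startClusterService(authURL, serviceToken string, resolveToken token.Resolve, decode token.Decode) error {
	// NewService now pre-fills the cache before returning, so a failed
	// first load surfaces here instead of on the first request.
	svc, err := cluster.NewService(authURL, 5*time.Minute, serviceToken, resolveToken, decode)
	if err != nil {
		return err
	}
	defer svc.Stop() // stops the refresh ticker and its goroutine

	// Reads are concurrency-safe copies of the cache; no network call
	// happens on this path.
	clusters, err := svc.GetClusters(context.Background())
	if err != nil {
		return err
	}
	_ = clusters // use the clusters...
	return nil
}
```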