Skip to content

Commit

Permalink
support scanning more than the default Namespace
Browse files Browse the repository at this point in the history
This change allows ESM to scan all accessable namespaces for services to
monitor. By "accessable" we mean what Consul's ACLs/Tokens are
configured to allow.
  • Loading branch information
eikenb committed Sep 17, 2021
1 parent 4b13779 commit b9f0e68
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 11 deletions.
70 changes: 59 additions & 11 deletions leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"reflect"
"sort"
"strings"
"time"

"github.com/hashicorp/consul/api"
Expand Down Expand Up @@ -256,22 +257,69 @@ func (a *Agent) watchServiceInstances(instanceCh chan []*api.ServiceEntry, stopC
case <-stopCh:
return
case <-time.After(retryTime / 10):
// Sleep here to limit how much load we put on the Consul servers. We can
// wait a lot less than the normal retry time here because the ESM service instance
// list is relatively small and cheap to query.
// Sleep here to limit how much load we put on the Consul servers.
// We can wait a lot less than the normal retry time here because
// the ESM service instance list is relatively small and cheap to
// query.
}

healthyInstances, meta, err := a.client.Health().Service(a.config.Service, a.config.Tag, true, opts)
switch healthyInstances, err := a.getServiceInstances(opts); err {
case nil:
instanceCh <- healthyInstances
default:
a.logger.Warn("[WARN] Error querying for health check info",
"error", err)
continue // not needed, but nice to be explicit
}
}
}

// getServiceInstances retuns a list of services with a 'passing' (healthy) state.
// It loops over all available namespaces to get instances from each.
func (a *Agent) getServiceInstances(opts *api.QueryOptions) ([]*api.ServiceEntry, error) {
var healthyInstances []*api.ServiceEntry
var meta *api.QueryMeta

namespaces, err := namespacesList(a.client)
if err != nil {
return nil, err
}

for _, ns := range namespaces {
if ns.Name != "" {
a.logger.Info("checking namespaces for services", "name", ns.Name)
}
opts.Namespace = ns.Name
healthy, m, err := a.client.Health().Service(a.config.Service,
a.config.Tag, true, opts)
if err != nil {
a.logger.Warn("[WARN] Error querying for health check info", "error", err)
continue
return nil, err
}
sort.Slice(healthyInstances, func(a, b int) bool {
return healthyInstances[a].Service.ID < healthyInstances[b].Service.ID
})
meta = m // keep last good meta
for _, h := range healthy {
healthyInstances = append(healthyInstances, h)
}
}
opts.WaitIndex = meta.LastIndex

opts.WaitIndex = meta.LastIndex
sort.Slice(healthyInstances, func(a, b int) bool {
return healthyInstances[a].Service.ID < healthyInstances[b].Service.ID
})

return healthyInstances, nil
}

instanceCh <- healthyInstances
// namespacesList returns a list of all accessable namespaces.
// Returns namespace "" (none) if none found for consul OSS compatibility.
func namespacesList(client *api.Client) ([]*api.Namespace, error) {
ossErr := "Unexpected response code: 404" // error snippet OSS consul returns
namespaces, _, err := client.Namespaces().List(nil)
switch {
case err == nil:
case strings.Contains(err.Error(), ossErr):
namespaces = []*api.Namespace{{Name: ""}}
case err != nil: // default, but more explicit
return nil, err
}
return namespaces, nil
}
107 changes: 107 additions & 0 deletions leader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ package main
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"reflect"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -417,3 +420,107 @@ func TestLeader_nodeLists(t *testing.T) {
len(health), len(ping))
}
}

const namespacesJSON = `[
{ "Name": "default", "Description": "Builtin Default Namespace" },
{ "Name": "foo", "Description": "foo" }
]`

const healthserviceJSON = `[
{ "Service": {"ID": "one", "Namespace": "foo" } },
{ "Service": {"ID": "two", "Namespace": "default" } }
]`

func Test_namespacesList(t *testing.T) {
testcase := ""
ts := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
switch testcase {
case "ent":
fmt.Fprint(w, namespacesJSON)
case "oss":
http.NotFound(w, r)
case "err":
http.Error(w, "use a french-press", http.StatusTeapot)
default:
t.Fatal("unknown test case:", testcase)
}
}))
defer ts.Close()

//client, err := api.NewClient(&api.Config{Address: "127.0.0.1:8500"})
client, err := api.NewClient(&api.Config{Address: ts.URL})
if err != nil {
t.Fatal(err)
}
// simulate enterprise consul
testcase = "ent"
nss, err := namespacesList(client)
if err != nil {
t.Fatal("unexpected error:", err)
}
if len(nss) != 2 || nss[0].Name != "default" || nss[1].Name != "foo" {
t.Fatalf("bad value for namespace names: %#v\n", nss)
}
// simulate oss consul
testcase = "oss"
nss, err = namespacesList(client)
if err != nil {
t.Fatal("unexpected error:", err)
}
if len(nss) != 1 || nss[0].Name != "" {
t.Fatalf("bad value for namespace names: %#v\n", len(nss))
}
// simulate other random error
testcase = "err"
nss, err = namespacesList(client)
if err == nil {
t.Fatal("unexpected error:", err)
}
if nss != nil {
t.Fatalf("bad value for namespace names: %#v\n", len(nss))
}
}

func Test_getServiceInstances(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
uri := r.RequestURI
switch { // ignore anything that doesn't require a return body
case strings.Contains(uri, "status/leader"):
fmt.Fprint(w, `"127.0.0.1"`)
case strings.Contains(uri, "namespace"):
fmt.Fprint(w, namespacesJSON)
case strings.Contains(uri, "health/service"):
fmt.Fprint(w, healthserviceJSON)
}
}))
defer ts.Close()

agent := testAgent(t, func(c *Config) {
c.HTTPAddr = ts.URL
c.InstanceID = "test-agent"
})
defer agent.Shutdown()

opts := &api.QueryOptions{}
serviceInstances, err := agent.getServiceInstances(opts)
if err != nil {
t.Fatal(err)
}
// 4 because test data has 2 namespaces each with 2 services
if len(serviceInstances) != 4 {
t.Fatal("Wrong number of services", len(serviceInstances))
}
for _, si := range serviceInstances {
sv := si.Service
switch {
case sv.ID == "one" && sv.Namespace == "foo":
case sv.ID == "one" && sv.Namespace == "default":
case sv.ID == "two" && sv.Namespace == "foo":
case sv.ID == "two" && sv.Namespace == "default":
default:
t.Fatalf("Unknown service: %#v\n", si.Service)
}
}
}

0 comments on commit b9f0e68

Please sign in to comment.