diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..4d1f1fd --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,13 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Launch Consul ESM", + "type": "go", + "request": "launch", + "mode": "auto", + "program": "${workspaceFolder}", + "args": ["--config-file=${workspaceFolder}/config.hcl"] + } + ] +} \ No newline at end of file diff --git a/README.md b/README.md index 39ac897..86cc1e6 100644 --- a/README.md +++ b/README.md @@ -215,6 +215,9 @@ token = "" // The Consul datacenter to use. datacenter = "dc1" +// The target Admin Partition to use. +partition = "" + // The CA file to use for talking to Consul over TLS. Can also be provided // though the CONSUL_CACERT environment variable. ca_file = "" diff --git a/agent.go b/agent.go index 7c30a45..1a9daef 100644 --- a/agent.go +++ b/agent.go @@ -165,7 +165,7 @@ func (a *Agent) Run() error { wg.Wait() // Clean up. - if err := a.client.Agent().ServiceDeregister(a.serviceID()); err != nil { + if err := a.client.Agent().ServiceDeregisterOpts(a.serviceID(), a.ConsulQueryOption()); err != nil { a.logger.Warn("Failed to deregister service", "error", err) } @@ -200,7 +200,7 @@ func (e *alreadyExistsError) Error() string { // register is used to register this agent with Consul service discovery. func (a *Agent) register() error { // agent ids need to be unique to disambiguate different instances on same host - if existing, _, _ := a.client.Agent().Service(a.serviceID(), nil); existing != nil { + if existing, _, _ := a.client.Agent().Service(a.serviceID(), a.ConsulQueryOption()); existing != nil { return &alreadyExistsError{a.serviceID()} } @@ -209,6 +209,10 @@ func (a *Agent) register() error { Name: a.config.Service, Meta: a.serviceMeta(), } + a.HasPartition(func(partition string) { + service.Partition = partition + }) + if a.config.Tag != "" { service.Tags = []string{a.config.Tag} } @@ -279,7 +283,7 @@ func (a *Agent) runRegister() { return case <-time.After(agentTTL): - services, err := a.client.Agent().Services() + services, err := a.client.Agent().ServicesWithFilterOpts("", a.ConsulQueryOption()) if err != nil { a.logger.Error("Failed to check services (will retry)", "error", err) time.Sleep(retryTime) @@ -323,7 +327,10 @@ REGISTER: DeregisterCriticalServiceAfter: deregisterTime.String(), }, } - if err := a.client.Agent().CheckRegister(check); err != nil { + a.HasPartition(func(partition string) { + check.Partition = partition + }) + if err := a.client.Agent().CheckRegisterOpts(check, a.ConsulQueryOption()); err != nil { a.logger.Error("Failed to register TTL check (will retry)", "error", err) time.Sleep(retryTime) goto REGISTER @@ -336,7 +343,7 @@ REGISTER: return case <-time.After(agentTTL / 2): - if err := a.client.Agent().UpdateTTL(ttlID, "", api.HealthPassing); err != nil { + if err := a.client.Agent().UpdateTTLOpts(ttlID, "", api.HealthPassing, a.ConsulQueryOption()); err != nil { a.logger.Error("Failed to refresh agent TTL check (will reregister)", "error", err) time.Sleep(retryTime) goto REGISTER @@ -501,17 +508,21 @@ func (a *Agent) watchHealthChecks(nodeListCh chan map[string]bool) { } func (a *Agent) getHealthChecks(waitIndex uint64, nodes map[string]bool) (api.HealthChecks, uint64) { - namespaces, err := namespacesList(a.client) + namespaces, err := namespacesList(a.client, a.config) if err != nil { a.logger.Warn("Error getting namespaces, falling back to default namespace", "error", err) namespaces = []*api.Namespace{{Name: ""}} } ctx, cancelFunc := context.WithCancel(context.Background()) - opts := (&api.QueryOptions{ + opts := &api.QueryOptions{ NodeMeta: a.config.NodeMeta, WaitIndex: waitIndex, - }).WithContext(ctx) + } + opts = opts.WithContext(ctx) + a.HasPartition(func(partition string) { + opts.Partition = partition + }) defer cancelFunc() go func() { select { @@ -595,7 +606,7 @@ VERIFYCONSULSERVER: } // Fetch server versions - svs, _, err := a.client.Catalog().Service("consul", "", nil) + svs, _, err := a.client.Catalog().Service("consul", "", a.ConsulQueryOption()) if err != nil { if strings.Contains(err.Error(), "429") { // 429 is a warning that something is unhealthy. This may occur when ESM @@ -635,3 +646,37 @@ VERIFYCONSULSERVER: a.logger.Debug("Consul agent and all servers are running compatible versions with ESM") return nil } + +// PartitionOrEmpty returns the partition if it exists, otherwise returns an empty string. +func (a *Agent) PartitionOrEmpty() string { + if a.config == nil || a.config.Partition == "" { + return "" + } + return a.config.Partition +} + +// HasPartition checks if the partition is valid and calls the callback with the partition if it has any. +func (a *Agent) HasPartition(callback func(partition string)) { + partition := a.PartitionOrEmpty() + + if partition == "" || strings.ToLower(partition) == "default" { + // Ignore empty or default partitions + return + } + + callback(a.config.Partition) +} + +// ConsulQueryOption constructs and returns a new api.QueryOptions object. +// If the Agent has a valid partition, it sets the partition in the QueryOptions. +// +// Returns: +// *api.QueryOptions: A new QueryOptions object with the partition set if applicable. + +func (a *Agent) ConsulQueryOption() *api.QueryOptions { + opts := &api.QueryOptions{} + a.HasPartition(func(partition string) { + opts.Partition = partition + }) + return opts +} diff --git a/agent_test.go b/agent_test.go index 84f4789..298f8f4 100644 --- a/agent_test.go +++ b/agent_test.go @@ -15,6 +15,7 @@ import ( "time" "github.com/hashicorp/go-hclog" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/hashicorp/consul/api" @@ -36,7 +37,6 @@ func testAgent(t *testing.T, cb func(*Config)) *Agent { if cb != nil { cb(conf) } - agent, err := NewAgent(conf, logger) if err != nil { t.Fatal(err) @@ -570,3 +570,175 @@ func TestAgent_getHealthChecks(t *testing.T) { } }) } + +func TestAgent_PartitionOrEmpty(t *testing.T) { + conf, err := DefaultConfig() + if err != nil { + t.Fatal(err) + } + + cases := []struct { + name, partition, expected string + }{ + {"No partition", "", ""}, + {"default partition", "default", "default"}, + {"admin partition", "admin", "admin"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + conf.Partition = tc.partition + + agent := &Agent{ + config: conf, + } + + assert.Equal(t, tc.expected, agent.PartitionOrEmpty()) + }) + } +} + +func TestAgent_getHealthChecksWithPartition(t *testing.T) { + testPartition := "test-partition" + notUniqueInstanceID := "not-unique-instance-id" + partitionQueryParamKey := "partition" + t.Run("with-partition", func(t *testing.T) { + listener, err := net.Listen("tcp", ":0") + require.NoError(t, err) + port := listener.Addr().(*net.TCPAddr).Port + addr := fmt.Sprintf("127.0.0.1:%d", port) + testNs := map[string]bool{} + ts := httptest.NewUnstartedServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + switch r.URL.EscapedPath() { + case "/v1/status/leader": + + assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey)) + fmt.Fprint(w, `"`+addr+`"`) + case "/v1/namespaces": + assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey)) + fmt.Fprint(w, testNamespaces()) + case "/v1/health/state/any": + assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey)) + namespace := r.URL.Query()["ns"][0] + testNs[namespace] = true + if namespace == "default" { + fmt.Fprint(w, "[]") + } + fmt.Fprint(w, testHealthChecks(r.URL.Query()["ns"][0])) + case "/v1/agent/service/consul-esm:not-unique-instance-id": + assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey)) + // write status 404, to tell the service is not registered and proceed with registration + w.WriteHeader(http.StatusNotFound) + case "/v1/agent/service/register": + assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey)) + var svc api.AgentServiceRegistration + err := json.NewDecoder(r.Body).Decode(&svc) + require.NoError(t, err) + assert.Equal(t, testPartition, svc.Partition) + w.WriteHeader(http.StatusOK) + case "/v1/kv/consul-esm/agents/consul-esm:not-unique-instance-id": + assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey)) + case "/v1/session/create": + assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey)) + case "/v1/agent/check/register": + assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey)) + case "/v1/agent/check/update/consul-esm:not-unique-instance-id:agent-ttl": + assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey)) + case "/v1/catalog/service/consul-esm": + assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey)) + case "/v1/agent/services": + assert.Equal(t, testPartition, r.URL.Query().Get(partitionQueryParamKey)) + + default: + t.Log("unhandled:", r.URL.EscapedPath()) + } + })) + ts.Listener = listener + ts.Start() + defer ts.Close() + + agent := testAgent(t, func(c *Config) { + c.HTTPAddr = addr + c.Tag = "test" + c.Partition = testPartition + c.InstanceID = notUniqueInstanceID + }) + defer agent.Shutdown() + + ourNodes := map[string]bool{"foo": true} + ourChecks, _ := agent.getHealthChecks(0, ourNodes) + if len(ourChecks) != 2 { + t.Error("should be 2 checks, got", len(ourChecks)) + } + ns1check := ourChecks[0] + ns2check := ourChecks[1] + if ns1check.CheckID != "ns1_svc_ck" { + t.Error("Wrong check id:", ns1check.CheckID) + } + if ns2check.CheckID != "ns2_svc_ck" { + t.Error("Wrong check id:", ns1check.CheckID) + } + + // test the state API is called for each namespace in an agent's partition + assert.Len(t, testNs, 3) + }) +} + +func TestAgent_HasPartition(t *testing.T) { + conf, err := DefaultConfig() + if err != nil { + t.Fatal(err) + } + + cases := []struct { + name, partition, expected string + }{ + {"No partition", "", ""}, + {"default partition", "default", ""}, + {"admin partition", "admin", "admin"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + conf.Partition = tc.partition + + agent := &Agent{ + config: conf, + } + + actualPartition := "" + agent.HasPartition(func(partition string) { + actualPartition = partition + }) + + assert.Equal(t, tc.expected, actualPartition) + }) + } +} + +func TestAgent_ConsulQueryOptions(t *testing.T) { + conf, err := DefaultConfig() + if err != nil { + t.Fatal(err) + } + + cases := []struct { + name, partition, expected string + }{ + {"No partition", "", ""}, + {"default partition", "default", ""}, + {"admin partition", "admin", "admin"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + conf.Partition = tc.partition + + agent := &Agent{ + config: conf, + } + + opts := agent.ConsulQueryOption() + + assert.Equal(t, tc.expected, opts.Partition) + }) + } +} diff --git a/config.go b/config.go index bf322ac..fbe704a 100644 --- a/config.go +++ b/config.go @@ -33,9 +33,10 @@ type Config struct { SyslogFacility string LogJSON bool - Service string - Tag string - KVPath string + Partition string + Service string + Tag string + KVPath string InstanceID string NodeMeta map[string]string @@ -84,6 +85,11 @@ func (c *Config) ClientConfig() *api.Config { if c.Datacenter != "" { conf.Datacenter = c.Datacenter } + + if c.Partition != "" { + conf.Partition = c.Partition + } + if c.CAFile != "" { conf.TLSConfig.CAFile = c.CAFile } @@ -127,6 +133,7 @@ func DefaultConfig() (*Config, error) { NodeReconnectTimeout: 72 * time.Hour, PingType: PingTypeUDP, DisableCoordinateUpdates: false, + Partition: "", }, nil } @@ -169,6 +176,7 @@ type HumanConfig struct { Tag flags.StringValue `mapstructure:"consul_service_tag"` KVPath flags.StringValue `mapstructure:"consul_kv_path"` NodeMeta []map[string]string `mapstructure:"external_node_meta"` + Partition flags.StringValue `mapstructure:"partition"` NodeReconnectTimeout flags.DurationValue `mapstructure:"node_reconnect_timeout"` NodeProbeInterval flags.DurationValue `mapstructure:"node_probe_interval"` @@ -461,6 +469,7 @@ func MergeConfig(dst *Config, src *HumanConfig) error { src.EnableSyslog.Merge(&dst.EnableSyslog) src.InstanceID.Merge(&dst.InstanceID) src.Service.Merge(&dst.Service) + src.Partition.Merge(&dst.Partition) src.Tag.Merge(&dst.Tag) src.KVPath.Merge(&dst.KVPath) if len(src.NodeMeta) == 1 { diff --git a/config_test.go b/config_test.go index 8e7f078..36748fc 100644 --- a/config_test.go +++ b/config_test.go @@ -309,6 +309,66 @@ func TestConvertTelemetry(t *testing.T) { } } +func TestPartition(t *testing.T) { + cases := []struct { + name string + config string + expectedConfig Config + expectError bool + }{ + { + "Partition not defined", + "", + Config{ + Partition: "", + }, + false, + }, + { + "Partition is empty", + "partition = \"\"", + Config{ + Partition: "", + }, + false, + }, + { + "Partition is default", + "partition = \"default\"", + Config{ + Partition: "default", + }, + false, + }, + { + "Partition is non-default", + "partition = \"admin\"", + Config{ + Partition: "admin", + }, + false, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + raw := bytes.NewBufferString(tc.config) + humanConfig, err := DecodeConfig(raw) + if tc.expectError { + assert.Error(t, err) + return + } + + result := &Config{} + MergeConfig(result, humanConfig) + + // comparing partition only string + assert.Equal(t, tc.expectedConfig.Partition, result.Partition) + assert.NoError(t, err) + }) + } +} + func stringPointer(s string) *string { if len(s) == 0 { return nil diff --git a/coordinate.go b/coordinate.go index 497f623..35a1588 100644 --- a/coordinate.go +++ b/coordinate.go @@ -170,12 +170,16 @@ func (a *Agent) updateHealthyNodeTxn(node *api.Node, kvClient *api.KV, key strin // If a critical node went back to passing, delete the KV entry for it. var ops api.TxnOps if kvPair != nil { + kvOps := &api.KVTxnOp{ + Verb: api.KVDeleteCAS, + Key: key, + Index: kvPair.ModifyIndex, + } + a.HasPartition(func(partition string) { + kvOps.Partition = partition + }) ops = append(ops, &api.TxnOp{ - KV: &api.KVTxnOp{ - Verb: api.KVDeleteCAS, - Key: key, - Index: kvPair.ModifyIndex, - }, + KV: kvOps, }) a.logger.Trace("Deleting KV entry", "key", key) } @@ -212,12 +216,16 @@ func (a *Agent) updateFailedNodeTxn(node *api.Node, kvClient *api.KV, key string var ops api.TxnOps if kvPair == nil { bytes, _ := time.Now().UTC().GobEncode() + kvOps := &api.KVTxnOp{ + Verb: api.KVSet, + Key: key, + Value: bytes, + } + a.HasPartition(func(partition string) { + kvOps.Partition = partition + }) ops = append(ops, &api.TxnOp{ - KV: &api.KVTxnOp{ - Verb: api.KVSet, - Key: key, - Value: bytes, - }, + KV: kvOps, }) a.logger.Trace("Writing KV entry for key", "key", key) } else { @@ -233,17 +241,21 @@ func (a *Agent) updateFailedNodeTxn(node *api.Node, kvClient *api.KV, key string node.Node, "failureTimeout", a.config.NodeReconnectTimeout.String()) // Clear the KV entry. + kvOps := &api.KVTxnOp{ + Verb: api.KVDeleteCAS, + Key: key, + Index: kvPair.ModifyIndex, + } + a.HasPartition(func(partition string) { + kvOps.Partition = partition + }) ops = append(ops, &api.TxnOp{ - KV: &api.KVTxnOp{ - Verb: api.KVDeleteCAS, - Key: key, - Index: kvPair.ModifyIndex, - }, + KV: kvOps, }) // If the node still exists in the catalog, add an atomic delete on the node to // the list of operations to run. - existing, _, err := a.client.Catalog().Node(node.Node, nil) + existing, _, err := a.client.Catalog().Node(node.Node, a.ConsulQueryOption()) if err != nil { return fmt.Errorf("could not fetch existing node %q: %v", node.Node, err) } @@ -273,16 +285,20 @@ func (a *Agent) updateFailedNodeTxn(node *api.Node, kvClient *api.KV, key string func (a *Agent) updateNodeCheck(node *api.Node, ops api.TxnOps, status, output string) error { metrics.IncrCounter([]string{"coord", "txn"}, 1) // Update the external health check status. + healthCheck := api.HealthCheck{ + Node: node.Node, + CheckID: externalCheckName, + Name: "External Node Status", + Status: status, + Output: output, + } + a.HasPartition(func(partition string) { + healthCheck.Partition = partition + }) ops = append(ops, &api.TxnOp{ Check: &api.CheckTxnOp{ - Verb: api.CheckSet, - Check: api.HealthCheck{ - Node: node.Node, - CheckID: externalCheckName, - Name: "External Node Status", - Status: status, - Output: output, - }, + Verb: api.CheckSet, + Check: healthCheck, }, }) diff --git a/go.mod b/go.mod index a2bd4dd..1ad0ed9 100644 --- a/go.mod +++ b/go.mod @@ -8,8 +8,8 @@ require ( github.com/armon/go-metrics v0.4.1 github.com/go-ping/ping v1.1.0 github.com/hashicorp/consul v1.16.1 - github.com/hashicorp/consul/api v1.28.3 - github.com/hashicorp/consul/sdk v0.15.0 + github.com/hashicorp/consul/api v1.29.4 + github.com/hashicorp/consul/sdk v0.16.1 github.com/hashicorp/go-hclog v1.6.3 github.com/hashicorp/go-multierror v1.1.1 github.com/hashicorp/go-uuid v1.0.3 @@ -49,7 +49,6 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/consul-net-rpc v0.0.0-20240430162428-577c8c7c7d01 // indirect github.com/hashicorp/consul/envoyextensions v0.6.1 // indirect - github.com/hashicorp/consul/proto-public v0.6.1 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-bexpr v0.1.14 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect @@ -88,15 +87,15 @@ require ( github.com/spf13/cast v1.6.0 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c // indirect - golang.org/x/crypto v0.23.0 // indirect - golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect - golang.org/x/mod v0.17.0 // indirect - golang.org/x/net v0.25.0 // indirect - golang.org/x/sync v0.7.0 // indirect - golang.org/x/sys v0.20.0 // indirect - golang.org/x/text v0.15.0 // indirect + golang.org/x/crypto v0.26.0 // indirect + golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 // indirect + golang.org/x/mod v0.20.0 // indirect + golang.org/x/net v0.28.0 // indirect + golang.org/x/sync v0.8.0 // indirect + golang.org/x/sys v0.24.0 // indirect + golang.org/x/text v0.17.0 // indirect golang.org/x/time v0.5.0 // indirect - golang.org/x/tools v0.21.0 // indirect + golang.org/x/tools v0.24.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240521202816-d264139d666e // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240521202816-d264139d666e // indirect google.golang.org/grpc v1.64.0 // indirect diff --git a/go.sum b/go.sum index 627a599..6a6c492 100644 --- a/go.sum +++ b/go.sum @@ -100,14 +100,14 @@ github.com/hashicorp/consul v1.16.1 h1:3CeNybQgjxJ3wu2IUSi3OySn4bQ70sv1jENtLJCrk github.com/hashicorp/consul v1.16.1/go.mod h1:GH3Ybk4rNKf0wVLfwG3btwPilh+sMwGtvymgFqFRqp0= github.com/hashicorp/consul-net-rpc v0.0.0-20240430162428-577c8c7c7d01 h1:4O69aRyKW3RQlVuipnWAZ7HlF39zhUz4ku6wFyfRD1Q= github.com/hashicorp/consul-net-rpc v0.0.0-20240430162428-577c8c7c7d01/go.mod h1:eRpZHC2gAAPhCdnIo2Jwgw/YrngMgxIwzIWxB6riM10= -github.com/hashicorp/consul/api v1.28.3 h1:IE06LST/knnCQ+cxcvzyXRF/DetkgGhJoaOFd4l9xkk= -github.com/hashicorp/consul/api v1.28.3/go.mod h1:7AGcUFu28HkgOKD/GmsIGIFzRTmN0L02AE9Thsr2OhU= +github.com/hashicorp/consul/api v1.29.4 h1:P6slzxDLBOxUSj3fWo2o65VuKtbtOXFi7TSSgtXutuE= +github.com/hashicorp/consul/api v1.29.4/go.mod h1:HUlfw+l2Zy68ceJavv2zAyArl2fqhGWnMycyt56sBgg= github.com/hashicorp/consul/envoyextensions v0.6.1 h1:s1E0qMlH0MN3JOXK+q2az8i0xsVlFC6likdJb4Ur8bU= github.com/hashicorp/consul/envoyextensions v0.6.1/go.mod h1:OmF8193EYIc0iptUek/HiEB2yk/OotIVDgssB+GLRuc= -github.com/hashicorp/consul/proto-public v0.6.1 h1:+uzH3olCrksXYWAYHKqK782CtK9scfqH+Unlw3UHhCg= -github.com/hashicorp/consul/proto-public v0.6.1/go.mod h1:cXXbOg74KBNGajC+o8RlA502Esf0R9prcoJgiOX/2Tg= -github.com/hashicorp/consul/sdk v0.15.0 h1:2qK9nDrr4tiJKRoxPGhm6B7xJjLVIQqkjiab2M4aKjU= -github.com/hashicorp/consul/sdk v0.15.0/go.mod h1:r/OmRRPbHOe0yxNahLw7G9x5WG17E1BIECMtCjcPSNo= +github.com/hashicorp/consul/proto-public v0.6.2 h1:+DA/3g/IiKlJZb88NBn0ZgXrxJp2NlvCZdEyl+qxvL0= +github.com/hashicorp/consul/proto-public v0.6.2/go.mod h1:cXXbOg74KBNGajC+o8RlA502Esf0R9prcoJgiOX/2Tg= +github.com/hashicorp/consul/sdk v0.16.1 h1:V8TxTnImoPD5cj0U9Spl0TUxcytjcbbJeADFF07KdHg= +github.com/hashicorp/consul/sdk v0.16.1/go.mod h1:fSXvwxB2hmh1FMZCNl6PwX0Q/1wdWtHJcZ7Ea5tns0s= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -309,16 +309,16 @@ golang.org/x/crypto v0.0.0-20200414173820-0848c9571904/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.3.0/go.mod h1:hebNnKkNXi2UzZN1eVRvBB7co0a+JxK6XbPiWVs/3J4= -golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI= -golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8= -golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= -golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= +golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= +golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= +golang.org/x/exp v0.0.0-20240823005443-9b4947da3948 h1:kx6Ds3MlpiUHKj7syVnbp57++8WpuKPcR5yjLBjvLEA= +golang.org/x/exp v0.0.0-20240823005443-9b4947da3948/go.mod h1:akd2r19cwCdwSwWeIdzYQGa/EZZyqcOdwWiwj5L5eKQ= golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug= golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= -golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= -golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= +golang.org/x/mod v0.20.0 h1:utOm6MM3R3dnawAiJgn0y+xvuYRsm1RKM/4giyfDgV0= +golang.org/x/mod v0.20.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -331,16 +331,16 @@ golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLd golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= -golang.org/x/net v0.25.0 h1:d/OCCoBEUq33pjydKrGQhw7IlUPI2Oylr+8qLx49kac= -golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM= +golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= +golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= -golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -370,8 +370,8 @@ golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y= -golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= +golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= @@ -381,8 +381,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= -golang.org/x/text v0.15.0 h1:h1V/4gjBv8v9cjcR6+AR5+/cIYK5N/WAgiv4xlsEtAk= -golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= +golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -392,8 +392,8 @@ golang.org/x/tools v0.0.0-20190907020128-2ca718005c18/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= -golang.org/x/tools v0.21.0 h1:qc0xYgIbsSDt9EyWz05J5wfa7LOVW0YTLOXrqdLAWIw= -golang.org/x/tools v0.21.0/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= +golang.org/x/tools v0.24.0 h1:J1shsA93PJUEVaUSaay7UXAyE8aimq3GW0pjlolpa24= +golang.org/x/tools v0.24.0/go.mod h1:YhNqVBIfWHdzvTLs0d8LCuMhkKUgSUKldakyV7W/WDQ= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/api v0.0.0-20240521202816-d264139d666e h1:SkdGTrROJl2jRGT/Fxv5QUf9jtdKCQh4KQJXbXVLAi0= diff --git a/leader.go b/leader.go index 48e4238..d650f8a 100644 --- a/leader.go +++ b/leader.go @@ -107,7 +107,7 @@ func nodeLists(nodes []*api.Node, insts []*api.ServiceEntry, } func (a *Agent) commitOps(ops api.KVTxnOps) bool { - success, results, _, err := a.client.KV().Txn(ops, nil) + success, results, _, err := a.client.KV().Txn(ops, a.ConsulQueryOption()) if err != nil || !success { a.logger.Error("Error writing state to KV store", "results", results, "error", err) // Try again after the wait because we got an error. @@ -157,11 +157,15 @@ WATCH_NODES_WAIT: healthNodes, pingNodes := nodeLists(externalNodes, healthyInstances) // Write the KV update as a transaction. + kvOps := &api.KVTxnOp{ + Verb: api.KVDeleteTree, + Key: a.kvNodeListPath(), + } + a.HasPartition(func(partition string) { + kvOps.Partition = partition + }) ops := api.KVTxnOps{ - &api.KVTxnOp{ - Verb: api.KVDeleteTree, - Key: a.kvNodeListPath(), - }, + kvOps, } for _, agent := range healthyInstances { bytes, _ := json.Marshal(NodeWatchList{ @@ -173,6 +177,9 @@ WATCH_NODES_WAIT: Key: a.kvNodeListPath() + agent.Service.ID, Value: bytes, } + a.HasPartition(func(partition string) { + op.Partition = partition + }) ops = append(ops, op) // Flush any ops if we're nearing a transaction limit @@ -206,6 +213,10 @@ func (a *Agent) watchExternalNodes(nodeCh chan []*api.Node, stopCh <-chan struct opts := &api.QueryOptions{ NodeMeta: a.config.NodeMeta, } + a.HasPartition(func(partition string) { + opts.Partition = partition + }) + ctx, cancelFunc := context.WithCancel(context.Background()) opts = opts.WithContext(ctx) go func() { @@ -249,6 +260,9 @@ func (a *Agent) watchServiceInstances(instanceCh chan []*api.ServiceEntry, stopC var opts *api.QueryOptions ctx, cancelFunc := context.WithCancel(context.Background()) opts = opts.WithContext(ctx) + a.HasPartition(func(partition string) { + opts.Partition = partition + }) go func() { <-stopCh cancelFunc() @@ -282,7 +296,7 @@ func (a *Agent) getServiceInstances(opts *api.QueryOptions) ([]*api.ServiceEntry var healthyInstances []*api.ServiceEntry var meta *api.QueryMeta - namespaces, err := namespacesList(a.client) + namespaces, err := namespacesList(a.client, a.config) if err != nil { return nil, err } @@ -313,8 +327,11 @@ func (a *Agent) getServiceInstances(opts *api.QueryOptions) ([]*api.ServiceEntry // namespacesList returns a list of all accessable namespaces. // Returns namespace "" (none) if none found for consul OSS compatibility. -func namespacesList(client *api.Client) ([]*api.Namespace, error) { - namespaces, _, err := client.Namespaces().List(nil) +func namespacesList(client *api.Client, config *Config) ([]*api.Namespace, error) { + opts := &api.QueryOptions{ + Partition: config.Partition, + } + namespaces, _, err := client.Namespaces().List(opts) if e, ok := err.(api.StatusError); ok && e.Code == 404 { namespaces = []*api.Namespace{{Name: ""}} } else if err != nil { diff --git a/leader_test.go b/leader_test.go index c8cd1a1..3485e4a 100644 --- a/leader_test.go +++ b/leader_test.go @@ -15,6 +15,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" + "github.com/stretchr/testify/assert" ) func (a *Agent) verifyUpdates(t *testing.T, expectedHealthNodes, expectedProbeNodes []string) { @@ -458,9 +459,11 @@ func Test_namespacesList(t *testing.T) { if err != nil { t.Fatal(err) } + + config := &Config{Partition: "default"} // simulate enterprise consul testcase = "ent" - nss, err := namespacesList(client) + nss, err := namespacesList(client, config) if err != nil { t.Fatal("unexpected error:", err) } @@ -469,7 +472,7 @@ func Test_namespacesList(t *testing.T) { } // simulate oss consul testcase = "oss" - nss, err = namespacesList(client) + nss, err = namespacesList(client, config) if err != nil { t.Fatal("unexpected error:", err) } @@ -478,7 +481,7 @@ func Test_namespacesList(t *testing.T) { } // simulate other random error testcase = "err" - nss, err = namespacesList(client) + nss, err = namespacesList(client, config) if err == nil { t.Fatal("unexpected error:", err) } @@ -488,44 +491,70 @@ func Test_namespacesList(t *testing.T) { } func Test_getServiceInstances(t *testing.T) { - ts := httptest.NewServer(http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - uri := r.RequestURI - switch { // ignore anything that doesn't require a return body - case strings.Contains(uri, "status/leader"): - fmt.Fprint(w, `"127.0.0.1"`) - case strings.Contains(uri, "namespace"): - fmt.Fprint(w, namespacesJSON) - case strings.Contains(uri, "health/service"): - fmt.Fprint(w, healthserviceJSON) + partitionQueryParamKey := "partition" + // parameterized test + cases := []struct { + name, partition string + }{ + {"No partition", ""}, + {"default partition", "default"}, + {"admin partition", "admin"}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + uri := r.RequestURI + switch { // ignore anything that doesn't require a return body + case strings.Contains(uri, "status/leader"): + assert.Equal(t, tc.partition, r.URL.Query().Get(partitionQueryParamKey)) + fmt.Fprint(w, `"127.0.0.1"`) + case strings.Contains(uri, "namespace"): + assert.Equal(t, tc.partition, r.URL.Query().Get(partitionQueryParamKey)) + fmt.Fprint(w, namespacesJSON) + case strings.Contains(uri, "health/service"): + assert.Equal(t, tc.partition, r.URL.Query().Get(partitionQueryParamKey)) + fmt.Fprint(w, healthserviceJSON) + } + })) + defer ts.Close() + + var agent *Agent + if tc.partition == "" { + agent = testAgent(t, func(c *Config) { + c.HTTPAddr = ts.URL + c.InstanceID = "test-agent" + }) + } else { + agent = testAgent(t, func(c *Config) { + c.HTTPAddr = ts.URL + c.InstanceID = "test-agent" + c.Partition = tc.partition + }) } - })) - defer ts.Close() - agent := testAgent(t, func(c *Config) { - c.HTTPAddr = ts.URL - c.InstanceID = "test-agent" - }) - defer agent.Shutdown() + defer agent.Shutdown() - opts := &api.QueryOptions{} - serviceInstances, err := agent.getServiceInstances(opts) - if err != nil { - t.Fatal(err) - } - // 4 because test data has 2 namespaces each with 2 services - if len(serviceInstances) != 4 { - t.Fatal("Wrong number of services", len(serviceInstances)) - } - for _, si := range serviceInstances { - sv := si.Service - switch { - case sv.ID == "one" && sv.Namespace == "foo": - case sv.ID == "one" && sv.Namespace == "default": - case sv.ID == "two" && sv.Namespace == "foo": - case sv.ID == "two" && sv.Namespace == "default": - default: - t.Fatalf("Unknown service: %#v\n", si.Service) - } + opts := &api.QueryOptions{} + serviceInstances, err := agent.getServiceInstances(opts) + if err != nil { + t.Fatal(err) + } + // 4 because test data has 2 namespaces each with 2 services + if len(serviceInstances) != 4 { + t.Fatal("Wrong number of services", len(serviceInstances)) + } + for _, si := range serviceInstances { + sv := si.Service + switch { + case sv.ID == "one" && sv.Namespace == "foo": + case sv.ID == "one" && sv.Namespace == "default": + case sv.ID == "two" && sv.Namespace == "foo": + case sv.ID == "two" && sv.Namespace == "default": + default: + t.Fatalf("Unknown service: %#v\n", si.Service) + } + } + }) } } diff --git a/main.go b/main.go index 22052b0..ee09e8c 100644 --- a/main.go +++ b/main.go @@ -104,6 +104,7 @@ func main() { } else { ui.Info(fmt.Sprintf(" Datacenter: %q", config.Datacenter)) } + ui.Info(fmt.Sprintf(" Partition: %q", config.Partition)) ui.Info(fmt.Sprintf(" Service: %q", config.Service)) ui.Info(fmt.Sprintf(" Service Tag: %q", config.Tag)) ui.Info(fmt.Sprintf(" Service ID: %q", agent.serviceID()))