diff --git a/config.go b/config.go index 73c5c40..2c5e59b 100644 --- a/config.go +++ b/config.go @@ -21,6 +21,7 @@ import ( const ( PingTypeUDP = "udp" PingTypeSocket = "socket" + PingTypeTCP = "tcp:" ) type Config struct { diff --git a/coordinate.go b/coordinate.go index 4cf3e00..d270224 100644 --- a/coordinate.go +++ b/coordinate.go @@ -4,6 +4,8 @@ import ( "errors" "fmt" "math/rand" + "net" + "strconv" "strings" "time" @@ -76,7 +78,7 @@ func (a *Agent) updateCoords(nodeCh <-chan []*api.Node) { node := nodes[index] a.inflightLock.Lock() if _, ok := a.inflightPings[node.Node]; ok { - a.logger.Warn("Error pinging node, last request still outstanding", "node", node.Node, "nodeId" , node.ID) + a.logger.Warn("Error pinging node, last request still outstanding", "node", node.Node, "nodeId", node.ID) } else { a.inflightPings[node.Node] = struct{}{} go a.runNodePing(node) @@ -95,9 +97,13 @@ func (a *Agent) runNodePing(node *api.Node) { a.logger.Error("could not get critical status for node", "node", node.Node, "error", err) } - // Run an ICMP ping to the node. - rtt, err := pingNode(node.Address, a.config.PingType) - + // Run a ping to the node. + var rtt time.Duration + if pingType, ok := node.Meta["ping-type"]; ok { + rtt, err = pingNodeCustom(node.Address, pingType) + } else { + rtt, err = pingNode(node.Address, a.config.PingType) + } // Update the node's health based on the results of the ping. if err == nil { if err := a.updateHealthyNode(node, kvClient, key, kvPair); err != nil { @@ -171,7 +177,7 @@ func (a *Agent) updateHealthyNodeTxn(node *api.Node, kvClient *api.KV, key strin Index: kvPair.ModifyIndex, }, }) - a.logger.Trace("Deleting KV entry", "key", key) + a.logger.Trace("Deleting KV entry", "key", key) } // Batch the possible KV deletion operation with the external health check update. 
@@ -363,7 +369,7 @@ func (a *Agent) updateNodeCoordinate(node *api.Node, rtt time.Duration) error { // Don't update the coordinate in the catalog if the coordinate already // exists and the change is insignificant if len(coords) > 0 && coord.Coord.DistanceTo(newCoord) <= time.Millisecond { - a.logger.Trace("Skipped update for coordinates", "node", node.Node, "distanceFromPreviousCoord", coord.Coord.DistanceTo(newCoord)) + a.logger.Trace("Skipped update for coordinates", "node", node.Node, "distanceFromPreviousCoord", coord.Coord.DistanceTo(newCoord)) return nil } @@ -417,3 +423,36 @@ func pingNode(addr string, method string) (time.Duration, error) { return 0, pingErr } } + +// pingNodeCustom pings addr using the ping-type given in the node metadata. +func pingNodeCustom(addr string, method string) (time.Duration, error) { + switch { + case strings.HasPrefix(strings.ToLower(method), PingTypeTCP): + tcp := strings.SplitN(method, `:`, 2) + port, err := strconv.ParseUint(tcp[1], 10, 16) // ensure 0 <= port <= 65535 + if err != nil { + return 0, fmt.Errorf("invalid tcp port %q", method) + } + return pingNodeTCP(addr, uint16(port)) + case strings.ToLower(method) == PingTypeUDP: + return pingNode(addr, PingTypeUDP) + case strings.ToLower(method) == PingTypeSocket || strings.ToLower(method) == "icmp": + return pingNode(addr, PingTypeSocket) + default: + return pingNode(addr, method) + } +} + +// pingNodeTCP runs a TCP handshake against an address. 
+// It returns the round-trip time for establishing the connection. +func pingNodeTCP(addr string, port uint16) (time.Duration, error) { + var tcp_timeout time.Duration = 2 * time.Second + start := time.Now() + conn, err := net.DialTimeout("tcp", strings.Join([]string{addr, fmt.Sprintf("%d", port)}, `:`), tcp_timeout) + if err != nil { + return 0, fmt.Errorf("cannot establish tcp connection %q", err) + } + elapsed := time.Since(start) + conn.Close() + return elapsed, nil +} diff --git a/coordinate_test.go b/coordinate_test.go index a0eb21c..ff68e9c 100644 --- a/coordinate_test.go +++ b/coordinate_test.go @@ -1,11 +1,14 @@ package main import ( - "github.com/hashicorp/go-hclog" "os" + "strconv" + "strings" "testing" "time" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" ) @@ -40,10 +43,10 @@ func TestCoordinate_updateNodeCoordinate(t *testing.T) { } agent := &Agent{ - client: client, - config: conf, - logger: hclog.New(&hclog.LoggerOptions{ - Name: "consul-esm", + client: client, + config: conf, + logger: hclog.New(&hclog.LoggerOptions{ + Name: "consul-esm", Level: hclog.LevelFromString("INFO"), IncludeLocation: true, Output: LOGOUT, @@ -94,9 +97,9 @@ func TestCoordinate_updateNodeCheck(t *testing.T) { } agent := &Agent{ - client: client, - config: conf, - logger: hclog.New(&hclog.LoggerOptions{ + client: client, + config: conf, + logger: hclog.New(&hclog.LoggerOptions{ Name: "consul-esm", Level: hclog.LevelFromString("INFO"), IncludeLocation: true, @@ -199,9 +202,9 @@ func TestCoordinate_reapFailedNode(t *testing.T) { } agent := &Agent{ - client: client, - config: conf, - logger: hclog.New(&hclog.LoggerOptions{ + client: client, + config: conf, + logger: hclog.New(&hclog.LoggerOptions{ Name: "consul-esm", Level: hclog.LevelFromString("INFO"), IncludeLocation: true, @@ -303,6 +306,31 @@ func TestCoordinate_parallelPings(t *testing.T) { } } + // Register some nodes with custom checks + tcp_nodes 
:= map[string]string{ + // when running test with raw socket privilege + // "node6": "ICMP", + "node7": "UDP", + "node8": strings.Join([]string{"tcp", strconv.Itoa(s.Config.Ports.SerfLan)}, `:`), + "node9": strings.Join([]string{"tcp", strconv.Itoa(s.Config.Ports.HTTP)}, `:`), + } + for node, method := range tcp_nodes { + meta := map[string]string{ + "external-node": "true", + "external-probe": "true", + "ping-type": method, + } + _, err := client.Catalog().Register(&api.CatalogRegistration{ + Node: node, + Address: "127.0.0.1", + Datacenter: "dc1", + NodeMeta: meta, + }, nil) + if err != nil { + t.Fatal(err) + } + } + // Register an ESM agent. agent1 := testAgent(t, func(c *Config) { c.HTTPAddr = s.HTTPAddr @@ -332,8 +360,31 @@ func TestCoordinate_parallelPings(t *testing.T) { ModifyIndex: checks[0].ModifyIndex, } if err := compareHealthCheck(checks[0], expected); err != nil { + r.Fatalf("Node %s: %q\n", node, err) + } + } + + for node, _ := range tcp_nodes { + checks, _, err := client.Health().Node(node, nil) + if err != nil { r.Fatal(err) } + if len(checks) != 1 { + r.Fatal("Bad number of checks; wanted 1, got ", len(checks)) + } + expected := &api.HealthCheck{ + Node: node, + CheckID: externalCheckName, + Name: "External Node Status", + Status: api.HealthPassing, + Output: NodeAliveStatus, + CreateIndex: checks[0].CreateIndex, + ModifyIndex: checks[0].ModifyIndex, + } + if err := compareHealthCheck(checks[0], expected); err != nil { + + r.Fatalf("Node %s: %q\n", node, err) + } } }) } diff --git a/version/version.go b/version/version.go index 4d84ba3..ca88f1a 100644 --- a/version/version.go +++ b/version/version.go @@ -22,7 +22,7 @@ var ( // Version is the main version number that is being run at the moment. // Note: our current release process does a pattern match on this variable. - Version = "0.5.0" + Version = "0.6.0" // VersionPrerelease is a pre-release marker for the version. If this is "" // (empty string) then it means that it is a final release. 
Otherwise, this @@ -44,7 +44,7 @@ func init() { // GetHumanVersion composes the parts of the version in a way that's suitable // for displaying to humans. func GetHumanVersion() string { - version := fmt.Sprintf("%s v%s",Name, Version) + version := fmt.Sprintf("%s v%s", Name, Version) release := VersionPrerelease if GitDescribe == "" && release == "" {