Skip to content

Commit

Permalink
update to use Consul's new StatusHandler, et. al.
Browse files Browse the repository at this point in the history
StatusHandler is an internal public struct used to track service status.
This replaces the older Notify interface, subsuming it as a field.

The Notify interface itself (used on a field of the StatusHandler) also
has added a new method, ServiceExists, which was added as part of the
Consul's Alias feature and which we don't support so use add a stubbed
out method.

The IDs used to track the checks have also changed from a string to
a struct. The struct contains the string and some enterprise meta data.
For now we just ignore the enterprise portion internally.
  • Loading branch information
eikenb committed May 26, 2022
1 parent f5dc65c commit 824ec85
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 17 deletions.
42 changes: 28 additions & 14 deletions check.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (

"github.com/armon/go-metrics"
consulchecks "github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/types"
Expand Down Expand Up @@ -101,15 +102,16 @@ func (c *CheckRunner) updateCheckHTTP(latestCheck *api.HealthCheck, checkHash ty
tlsConfig.ServerName = definition.TLSServerName

http := &consulchecks.CheckHTTP{
Notify: c,
CheckID: checkHash,
CheckID: structs.CheckID{ID: checkHash},
HTTP: definition.HTTP,
Header: definition.Header,
Method: definition.Method,
Interval: definition.IntervalDuration,
Timeout: definition.TimeoutDuration,
Logger: c.logger,
TLSClientConfig: tlsConfig,
StatusHandler: consulchecks.NewStatusHandler(c, c.logger,
c.PassingThreshold, c.CriticalThreshold, c.CriticalThreshold),
}

if check, checkExists := c.checks[checkHash]; checkExists {
Expand Down Expand Up @@ -151,15 +153,18 @@ func (c *CheckRunner) updateCheckHTTP(latestCheck *api.HealthCheck, checkHash ty
return true
}

func (c *CheckRunner) updateCheckTCP(latestCheck *api.HealthCheck, checkHash types.CheckID,
definition *api.HealthCheckDefinition, updated, added checkIDSet) bool {
func (c *CheckRunner) updateCheckTCP(
latestCheck *api.HealthCheck, checkHash types.CheckID,
definition *api.HealthCheckDefinition, updated, added checkIDSet,
) bool {
tcp := &consulchecks.CheckTCP{
Notify: c,
CheckID: checkHash,
CheckID: structs.CheckID{ID: checkHash},
TCP: definition.TCP,
Interval: definition.IntervalDuration,
Timeout: definition.TimeoutDuration,
Logger: c.logger,
StatusHandler: consulchecks.NewStatusHandler(c, c.logger,
c.PassingThreshold, c.CriticalThreshold, c.CriticalThreshold),
}

if check, checkExists := c.checks[checkHash]; checkExists {
Expand Down Expand Up @@ -288,13 +293,22 @@ func (c *CheckRunner) UpdateChecks(checks api.HealthChecks) {
}
}

// ServiceExists is part of the consulchecks.CheckNotifier interface.
// It is currently used as part of Consul's alias service feature.
// This function is used to check for localality of service.
// Unsuppported at this time, so hardcoded false return.
func (c *CheckRunner) ServiceExists(serviceID structs.ServiceID) bool {
return false
}

// UpdateCheck handles the output of an HTTP/TCP check and decides whether or not
// to push an update to the catalog.
func (c *CheckRunner) UpdateCheck(checkID types.CheckID, status, output string) {
func (c *CheckRunner) UpdateCheck(checkID structs.CheckID, status, output string) {
c.Lock()
defer c.Unlock()

check, ok := c.checks[checkID]
checkHash := checkID.ID
check, ok := c.checks[checkHash]
if !ok {
return
}
Expand Down Expand Up @@ -322,11 +336,11 @@ func (c *CheckRunner) UpdateCheck(checkID types.CheckID, status, output string)

// Update the critical time tracking
if status == api.HealthCritical {
if _, ok := c.checksCritical[checkID]; !ok {
c.checksCritical[checkID] = time.Now()
if _, ok := c.checksCritical[checkHash]; !ok {
c.checksCritical[checkHash] = time.Now()
}
} else {
delete(c.checksCritical, checkID)
delete(c.checksCritical, checkHash)
}

// Defer a sync if the output has changed. This is an optimization around
Expand All @@ -335,15 +349,15 @@ func (c *CheckRunner) UpdateCheck(checkID types.CheckID, status, output string)
// change we do the write immediately.
if c.CheckUpdateInterval > 0 && check.Status == status {
check.Output = output
if _, ok := c.deferCheck[checkID]; !ok {
if _, ok := c.deferCheck[checkHash]; !ok {
intv := time.Duration(uint64(c.CheckUpdateInterval)/2) + lib.RandomStagger(c.CheckUpdateInterval)
deferSync := time.AfterFunc(intv, func() {
c.Lock()
c.handleCheckUpdate(&check.HealthCheck, status, output)
delete(c.deferCheck, checkID)
delete(c.deferCheck, checkHash)
c.Unlock()
})
c.deferCheck[checkID] = deferSync
c.deferCheck[checkHash] = deferSync
}
return
}
Expand Down
8 changes: 5 additions & 3 deletions check_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -348,9 +349,10 @@ func TestCheck_NoFlapping(t *testing.T) {

runner.UpdateChecks(checks)

id := checkHash(checks[0])
hash := checkHash(checks[0])
id := structs.CheckID{ID: hash}

originalCheck, ok := runner.checks[id]
originalCheck, ok := runner.checks[hash]
if !ok {
t.Fatalf("Check was not stored on runner.checks as expected. Checks: %v", runner.checks)
}
Expand Down Expand Up @@ -412,7 +414,7 @@ func TestCheck_NoFlapping(t *testing.T) {
assert.Equal(t, api.HealthCritical, originalCheck.Status)

runner.UpdateChecks(checks)
currentCheck, ok := runner.checks[id]
currentCheck, ok := runner.checks[hash]
if !ok {
t.Fatalf("Current check was not stored on runner.checks as expected. Checks: %v", runner.checks)
}
Expand Down

0 comments on commit 824ec85

Please sign in to comment.