Skip to content

Commit

Permalink
Merge pull request #468 from xmidt-org/denopink/feat/upgrade-to-touch…
Browse files Browse the repository at this point in the history
…stone-httpaux

feat: upgrade to httpaux & touchstone (remove gokit metrics)
  • Loading branch information
denopink authored Sep 9, 2024
2 parents ec37b0a + f74a2a2 commit 4b44b82
Show file tree
Hide file tree
Showing 15 changed files with 464 additions and 281 deletions.
6 changes: 3 additions & 3 deletions binOp.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (

// Errors
var (
errIterableTypeOnly = errors.New("Only slices and arrays are currently supported as iterable")
errNumericalTypeOnly = errors.New("Only numerical values are supported")
errOpNotSupported = errors.New("Operation not supported")
errIterableTypeOnly = errors.New("only slices and arrays are currently supported as iterable")
errNumericalTypeOnly = errors.New("only numerical values are supported")
errOpNotSupported = errors.New("operation not supported")
)

// Supported operations
Expand Down
42 changes: 35 additions & 7 deletions control.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
package main

import (
"errors"
"fmt"
"net/http"

"github.com/prometheus/client_golang/prometheus"
"github.com/xmidt-org/candlelight"
"github.com/xmidt-org/touchstone"
"go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux"
"go.uber.org/zap"

Expand All @@ -20,9 +23,6 @@ import (
// nolint:staticcheck
"github.com/xmidt-org/webpa-common/v2/xhttp"
"github.com/xmidt-org/webpa-common/v2/xhttp/gate"

// nolint:staticcheck
"github.com/xmidt-org/webpa-common/v2/xmetrics"
)

const (
Expand All @@ -32,7 +32,7 @@ const (
drainPath = "/device/drain"
)

func StartControlServer(logger *zap.Logger, manager device.Manager, deviceGate devicegate.Interface, registry xmetrics.Registry, v *viper.Viper, tracing candlelight.Tracing) (func(http.Handler) http.Handler, error) {
func StartControlServer(logger *zap.Logger, manager device.Manager, deviceGate devicegate.Interface, tf *touchstone.Factory, v *viper.Viper, tracing candlelight.Tracing) (func(http.Handler) http.Handler, error) {
if !v.IsSet(ControlKey) {
return xhttp.NilConstructor, nil
}
Expand All @@ -44,17 +44,45 @@ func StartControlServer(logger *zap.Logger, manager device.Manager, deviceGate d

options.Logger = logger

var errs error
gateStatus, err := tf.NewGauge(
prometheus.GaugeOpts{
Name: GateStatus,
Help: "Indicates whether the device gate is open (1.0) or closed (0.0)",
},
)
errs = errors.Join(errs, err)

drainStatus, err := tf.NewGauge(
prometheus.GaugeOpts{
Name: DrainStatus,
Help: "Indicates whether a device drain operation is currently running",
},
)
errs = errors.Join(errs, err)

drainCounter, err := tf.NewGauge(
prometheus.GaugeOpts{
Name: DrainCounter,
Help: "The total count of devices disconnected due to a drain since the server started",
},
)
errs = errors.Join(errs, err)
if errs != nil {
return xhttp.NilConstructor, err
}

var (
g = gate.New(
true,
gate.WithGauge(registry.NewGauge(GateStatus)),
gate.WithGauge(gateStatus),
)

d = drain.New(
drain.WithLogger(logger),
drain.WithManager(manager),
drain.WithStateGauge(registry.NewGauge(DrainStatus)),
drain.WithDrainCounter(registry.NewCounter(DrainCounter)),
drain.WithStateGauge(drainStatus),
drain.WithDrainCounter(drainCounter),
)

gateLogger = devicegate.GateLogger{Logger: logger}
Expand Down
16 changes: 8 additions & 8 deletions deviceAccess.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"net/http"

"github.com/fatih/structs"
"github.com/go-kit/kit/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/thedevsaddam/gojsonq/v2"
"github.com/xmidt-org/webpa-common/v2/device"
"go.uber.org/zap"
Expand Down Expand Up @@ -77,26 +77,26 @@ type deviceAccess interface {

type talariaDeviceAccess struct {
strict bool
wrpMessagesCounter metrics.Counter
wrpMessagesCounter CounterVec
deviceRegistry device.Registry
checks []*parsedCheck
sep string
logger *zap.Logger
}

func (t *talariaDeviceAccess) withFailure(labelValues ...string) metrics.Counter {
func (t *talariaDeviceAccess) withFailure(labelValues ...string) prometheus.Counter {
if !t.strict {
return t.withSuccess(labelValues...)
}
return t.wrpMessagesCounter.With(append(labelValues, outcomeLabel, rejected)...)
return t.wrpMessagesCounter.WithLabelValues(append(labelValues, outcomeLabel, rejected)...)
}

func (t *talariaDeviceAccess) withFatal(labelValues ...string) metrics.Counter {
return t.wrpMessagesCounter.With(append(labelValues, outcomeLabel, rejected)...)
func (t *talariaDeviceAccess) withFatal(labelValues ...string) prometheus.Counter {
return t.wrpMessagesCounter.WithLabelValues(append(labelValues, outcomeLabel, rejected)...)
}

func (t *talariaDeviceAccess) withSuccess(labelValues ...string) metrics.Counter {
return t.wrpMessagesCounter.With(append(labelValues, outcomeLabel, accepted)...)
func (t *talariaDeviceAccess) withSuccess(labelValues ...string) prometheus.Counter {
return t.wrpMessagesCounter.WithLabelValues(append(labelValues, outcomeLabel, accepted)...)
}

func getRight(check *parsedCheck, wrpCredentials *gojsonq.JSONQ) interface{} {
Expand Down
31 changes: 4 additions & 27 deletions deviceAccess_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (
"errors"
"testing"

// nolint:staticcheck

"github.com/go-kit/kit/metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/xmidt-org/webpa-common/v2/device"
Expand Down Expand Up @@ -43,7 +40,7 @@ func testAuthorizeWRP(t *testing.T, testCases []deviceAccessTestCase, strict boo
mockDevice = new(device.MockDevice)
mockBinOp = new(mockBinOp)
testLogger = zaptest.NewLogger(t)
counter = newTestCounter()
counter = mockCounter{labelPairs: make(map[string]string)}
expectedLabels = getLabelMaps(testCase.ExpectedError, testCase.IsFatal, strict, testCase.BaseLabelPairs)

wrpMsg = &wrp.Message{
Expand All @@ -69,9 +66,11 @@ func testAuthorizeWRP(t *testing.T, testCases []deviceAccessTestCase, strict boo
checks = getChecks(t, mockBinOp, testCase.IncompleteCheck, testCase.Authorized)
}

counter.On("WithLabelValues", []string{reasonLabel, invalidWRPDest, outcomeLabel, rejected}).Return().Once()
counter.On("Add", 1.).Return().Once()
deviceAccessAuthority := &talariaDeviceAccess{
strict: strict,
wrpMessagesCounter: counter,
wrpMessagesCounter: &counter,
deviceRegistry: mockDeviceRegistry,
sep: ">",
logger: testLogger,
Expand Down Expand Up @@ -184,28 +183,6 @@ func getLabelMaps(err error, isFatal, strict bool, baseLabelPairs map[string]str
return out
}

type testCounter struct {
count float64
labelPairs map[string]string
}

func (c *testCounter) Add(delta float64) {
c.count += delta
}

func (c *testCounter) With(labelValues ...string) metrics.Counter {
for i := 0; i < len(labelValues)-1; i += 2 {
c.labelPairs[labelValues[i]] = labelValues[i+1]
}
return c
}

func newTestCounter() *testCounter {
return &testCounter{
labelPairs: make(map[string]string),
}
}

func getTestDeviceMetadata() *device.Metadata {
metadata := new(device.Metadata)
claims := map[string]interface{}{
Expand Down
3 changes: 1 addition & 2 deletions eventDispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"go.uber.org/zap"

"github.com/go-kit/kit/metrics"
"github.com/prometheus/client_golang/prometheus"
"github.com/xmidt-org/webpa-common/v2/device"
"github.com/xmidt-org/webpa-common/v2/event"
Expand Down Expand Up @@ -43,7 +42,7 @@ type eventDispatcher struct {
authorizationKey string
source string
eventMap event.MultiMap
queueSize metrics.Gauge
queueSize prometheus.Gauge
droppedMessages CounterVec
outboundEvents CounterVec
outbounds chan<- outboundEnvelope
Expand Down
72 changes: 48 additions & 24 deletions eventDispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,14 @@ func genTestMetadata() *device.Metadata {

func testEventDispatcherOnDeviceEventConnectEvent(t *testing.T) {
var (
assert = assert.New(t)
require = require.New(t)
d = new(device.MockDevice)
dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), nil, nil)
assert = assert.New(t)
require = require.New(t)
d = new(device.MockDevice)
)

om, err := NewTestOutboundMeasures()
require.NoError(err)
dispatcher, outbounds, err := NewEventDispatcher(om, nil, nil)
require.NotNil(dispatcher)
require.NotNil(outbounds)
require.NoError(err)
Expand All @@ -57,12 +59,14 @@ func testEventDispatcherOnDeviceEventConnectEvent(t *testing.T) {

func testEventDispatcherOnDeviceEventDisconnectEvent(t *testing.T) {
var (
assert = assert.New(t)
require = require.New(t)
d = new(device.MockDevice)
dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), nil, nil)
assert = assert.New(t)
require = require.New(t)
d = new(device.MockDevice)
)

om, err := NewTestOutboundMeasures()
require.NoError(err)
dispatcher, outbounds, err := NewEventDispatcher(om, nil, nil)
require.NotNil(dispatcher)
require.NotNil(outbounds)
require.NoError(err)
Expand All @@ -83,11 +87,13 @@ func testEventDispatcherOnDeviceEventDisconnectEvent(t *testing.T) {

func testEventDispatcherOnDeviceEventUnroutable(t *testing.T) {
var (
assert = assert.New(t)
require = require.New(t)
dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), nil, nil)
assert = assert.New(t)
require = require.New(t)
)

om, err := NewTestOutboundMeasures()
require.NoError(err)
dispatcher, outbounds, err := NewEventDispatcher(om, nil, nil)
require.NotNil(dispatcher)
require.NotNil(outbounds)
require.NoError(err)
Expand All @@ -102,10 +108,13 @@ func testEventDispatcherOnDeviceEventUnroutable(t *testing.T) {

func testEventDispatcherOnDeviceEventBadURLFilter(t *testing.T) {
var (
assert = assert.New(t)
dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), &Outbounder{DefaultScheme: "bad"}, nil)
assert = assert.New(t)
require = require.New(t)
)

om, err := NewTestOutboundMeasures()
require.NoError(err)
dispatcher, outbounds, err := NewEventDispatcher(om, &Outbounder{DefaultScheme: "bad"}, nil)
assert.Nil(dispatcher)
assert.Nil(outbounds)
assert.Error(err)
Expand Down Expand Up @@ -188,11 +197,13 @@ func testEventDispatcherOnDeviceEventDispatchEvent(t *testing.T) {
t.Logf("%#v, method=%s, format=%s", record, record.outbounder.method(), format)

var (
expectedContents = []byte{1, 2, 3, 4}
urlFilter = new(mockURLFilter)
dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), record.outbounder, urlFilter)
expectedContents = []byte{1, 2, 3, 4}
urlFilter = new(mockURLFilter)
)

om, err := NewTestOutboundMeasures()
require.NoError(err)
dispatcher, outbounds, err := NewEventDispatcher(om, record.outbounder, urlFilter)
require.NotNil(dispatcher)
require.NotNil(outbounds)
require.NoError(err)
Expand Down Expand Up @@ -250,8 +261,10 @@ func testEventDispatcherOnDeviceEventFullQueue(t *testing.T) {
}), zapcore.AddSync(&b), zapcore.ErrorLevel),
),
}
om = NewTestOutboundMeasures()
)

om, err := NewTestOutboundMeasures()
require.NoError(err)
dm := new(mockCounter)
om.DroppedMessages = dm
d, _, err := NewEventDispatcher(om, outbounder, nil)
Expand Down Expand Up @@ -288,9 +301,11 @@ func testEventDispatcherOnDeviceEventMessageReceived(t *testing.T) {
}), zapcore.AddSync(&b), zapcore.ErrorLevel),
),
}
dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), &o, nil)
)

om, err := NewTestOutboundMeasures()
require.NoError(err)
dispatcher, outbounds, err := NewEventDispatcher(om, &o, nil)
require.NotNil(dispatcher)
require.NotNil(outbounds)
require.NoError(err)
Expand Down Expand Up @@ -329,9 +344,11 @@ func testEventDispatcherOnDeviceEventFilterError(t *testing.T) {
}), zapcore.AddSync(&b), zapcore.ErrorLevel),
),
}
dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), &o, urlFilter)
)

om, err := NewTestOutboundMeasures()
require.NoError(err)
dispatcher, outbounds, err := NewEventDispatcher(om, &o, urlFilter)
require.NotNil(dispatcher)
require.NotNil(outbounds)
require.NoError(err)
Expand Down Expand Up @@ -411,11 +428,13 @@ func testEventDispatcherOnDeviceEventDispatchTo(t *testing.T) {
t.Logf("%#v, method=%s, format=%s", record, record.outbounder.method(), format)

var (
expectedContents = []byte{4, 7, 8, 1}
urlFilter = new(mockURLFilter)
dispatcher, outbounds, err = NewEventDispatcher(NewTestOutboundMeasures(), record.outbounder, urlFilter)
expectedContents = []byte{4, 7, 8, 1}
urlFilter = new(mockURLFilter)
)

om, err := NewTestOutboundMeasures()
require.NoError(err)
dispatcher, outbounds, err := NewEventDispatcher(om, record.outbounder, urlFilter)
require.NotNil(dispatcher)
require.NotNil(outbounds)
require.NoError(err)
Expand Down Expand Up @@ -468,7 +487,9 @@ func testEventDispatcherOnDeviceEventNilEventError(t *testing.T) {
}), zapcore.AddSync(&b), zapcore.ErrorLevel),
)
o.Logger = logger
dp, _, err := NewEventDispatcher(NewTestOutboundMeasures(), o, nil)
om, err := NewTestOutboundMeasures()
require.NoError(err)
dp, _, err := NewEventDispatcher(om, o, nil)
require.NotNil(dp)
require.NoError(err)
// Purge init logs
Expand All @@ -481,8 +502,11 @@ func testEventDispatcherOnDeviceEventNilEventError(t *testing.T) {

func testEventDispatcherOnDeviceEventEventMapError(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
o := &Outbounder{EventEndpoints: map[string]interface{}{"bad": -17.6}}
dp, _, err := NewEventDispatcher(NewTestOutboundMeasures(), o, nil)
om, err := NewTestOutboundMeasures()
require.NoError(err)
dp, _, err := NewEventDispatcher(om, o, nil)
assert.Nil(dp)
assert.Error(err)
}
Expand Down
Loading

0 comments on commit 4b44b82

Please sign in to comment.