diff --git a/dispatcher.go b/dispatcher.go index 38c2be5..66ba857 100644 --- a/dispatcher.go +++ b/dispatcher.go @@ -25,5 +25,5 @@ type outboundEnvelope struct { cancel func() } -// eventTypeContextKey is the internal key type for storing the event type -type eventTypeContextKey struct{} +// schemeContextKey is the internal key type for storing the event type +type schemeContextKey struct{} diff --git a/eventDispatcher.go b/eventDispatcher.go index 51f68ff..5f27e68 100644 --- a/eventDispatcher.go +++ b/eventDispatcher.go @@ -10,7 +10,6 @@ import ( "io" "net/http" "runtime/debug" - "strings" "time" "go.uber.org/zap" @@ -88,11 +87,8 @@ func NewEventDispatcher(om OutboundMeasures, o *Outbounder, urlFilter URLFilter) func (d *eventDispatcher) OnDeviceEvent(event *device.Event) { // TODO improve how we test dispatchEvent & dispatchTo var ( - err error - message *wrp.Message - eventType = unknown - url = unknown - code = messageDroppedCode + err error + scheme = unknown ) defer func() { @@ -100,9 +96,9 @@ func (d *eventDispatcher) OnDeviceEvent(event *device.Event) { d.logger.Debug("stacktrace from panic", zap.String("stacktrace", string(debug.Stack())), zap.Any("panic", r)) switch event.Type { case device.Connect, device.Disconnect, device.MessageReceived: - d.logger.Error("Dropped message, event not sent", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, panicReason), zap.String(urlLabel, url), zap.Any("panic", r)) - d.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: panicReason, urlLabel: url}).Add(1.0) - d.outboundEvents.With(prometheus.Labels{eventLabel: eventType, reasonLabel: panicReason, urlLabel: url, outcomeLabel: failureOutcome}).Add(1.0) + d.logger.Error("Dropped message, event not sent", zap.String(schemeLabel, scheme), zap.String(codeLabel, messageDroppedCode), zap.String(reasonLabel, panicReason), zap.Any("panic", r)) + d.droppedMessages.With(prometheus.Labels{schemeLabel: scheme, codeLabel: messageDroppedCode, reasonLabel: panicReason}).Add(1.0) + d.outboundEvents.With(prometheus.Labels{schemeLabel: scheme, reasonLabel: panicReason, outcomeLabel: failureOutcome}).Add(1.0) } } }() @@ -114,70 +110,81 @@ func (d *eventDispatcher) OnDeviceEvent(event *device.Event) { switch event.Type { case device.Connect: - eventType, message = newOnlineMessage(d.source, event.Device) - url, err = d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message) + scheme = wrp.SchemeEvent + eventType, message := newOnlineMessage(d.source, event.Device) + _, err = d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message) if err != nil { d.logger.Error("Error dispatching online event", zap.Any("eventType", eventType), zap.Any("destination", message.Destination), zap.Error(err)) } case device.Disconnect: - eventType, message = newOfflineMessage(d.source, event.Device) - url, err = d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message) + scheme = wrp.SchemeEvent + eventType, message := newOfflineMessage(d.source, event.Device) + _, err = d.encodeAndDispatchEvent(eventType, wrp.Msgpack, message) if err != nil { d.logger.Error("Error dispatching offline event", zap.Any("eventType", eventType), zap.Any("destination", message.Destination), zap.Error(err)) } case device.MessageReceived: - if routable, ok := event.Message.(wrp.Routable); ok { - destination := routable.To() - contentType := event.Format.ContentType() - if strings.HasPrefix(destination, EventPrefix) { - var l wrp.Locator - if l, err = wrp.ParseLocator(destination); err == nil { - eventType = l.Authority - url, err = d.dispatchEvent(eventType, contentType, event.Contents) - if err != nil { - d.logger.Error("Error dispatching event", zap.Any("eventType", eventType), zap.Any("destination", destination), zap.Error(err)) - } - } - } else if strings.HasPrefix(destination, DNSPrefix) { - eventType = event.Type.String() - unfilteredURL := destination[len(DNSPrefix):] - url, err = d.dispatchTo(unfilteredURL, contentType, event.Contents, eventType) - if err != nil { - d.logger.Error("Error dispatching to endpoint", zap.Any("destination", destination), zap.Error(err)) - } - } else { - eventType = event.Type.String() - err = ErrorUnroutableDestination - d.logger.Error("Unroutable destination", zap.Any("destination", destination)) - } + scheme, err = d.routeMessageReceivedEvent(event) + if err != nil { + scheme = unknown } default: - eventType = event.Type.String() err = ErrorUnsupportedEvent - if routable, ok := event.Message.(wrp.Routable); ok { - url = routable.To() - } } var outboundEventsLabels prometheus.Labels if err != nil { reason := getDroppedMessageReason(err) - outboundEventsLabels = prometheus.Labels{eventLabel: eventType, reasonLabel: reason, urlLabel: url, outcomeLabel: failureOutcome} + outboundEventsLabels = prometheus.Labels{schemeLabel: scheme, reasonLabel: reason, outcomeLabel: failureOutcome} if errors.Is(err, ErrorUnsupportedEvent) { - d.logger.Debug("Dropped message, event not sent", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, reason), zap.String(urlLabel, url), zap.Error(err)) + d.logger.Debug("Dropped message, event not sent", zap.String(schemeLabel, scheme), zap.String(codeLabel, messageDroppedCode), zap.String(reasonLabel, reason), zap.Error(err)) } else { - d.logger.Error("Dropped message, event not sent", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, reason), zap.String(urlLabel, url), zap.Error(err)) + d.logger.Error("Dropped message, event not sent", zap.String(schemeLabel, scheme), zap.String(codeLabel, messageDroppedCode), zap.String(reasonLabel, reason), zap.Error(err)) } - d.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: reason, urlLabel: url}).Add(1.0) + d.droppedMessages.With(prometheus.Labels{schemeLabel: scheme, codeLabel: messageDroppedCode, reasonLabel: reason}).Add(1.0) } else { - outboundEventsLabels = prometheus.Labels{eventLabel: eventType, reasonLabel: noErrReason, urlLabel: url, outcomeLabel: successOutcome} + outboundEventsLabels = prometheus.Labels{schemeLabel: scheme, reasonLabel: noErrReason, outcomeLabel: successOutcome} } d.outboundEvents.With(outboundEventsLabels).Add(1.0) } +func (d *eventDispatcher) routeMessageReceivedEvent(event *device.Event) (scheme string, err error) { + routable, ok := event.Message.(wrp.Routable) + if !ok { + return "", errors.New("wrp event message is not routable") + } + + destination := routable.To() + contentType := event.Format.ContentType() + var l wrp.Locator + if l, err = wrp.ParseLocator(destination); err != nil { + return "", err + } + + scheme = l.Scheme + eventType := l.Authority + switch scheme { + case wrp.SchemeEvent: + _, err = d.dispatchEvent(eventType, contentType, event.Contents) + case wrp.SchemeDNS: + url := l.Authority + l.Ignored + // `l.Authority + l.Ignored` is used because incoming dns events are expected to have the format `dns:some_url` or dns:some_scheme://some_url. + _, err = d.dispatchTo(url, contentType, event.Contents) + default: + scheme = unknown + err = ErrorUnroutableDestination + } + + if err != nil { + d.logger.Error("Error dispatching event", zap.String(schemeLabel, scheme), zap.Any("destination", destination), zap.Error(err)) + } + + return scheme, err +} + // send wraps the given request in an outboundEnvelope together with a cancellable context, // then asynchronously sends that request to the outbounds channel. This method will // block on the outbound channel only as long as the context is not canceled, i.e. does not time out. @@ -224,8 +231,8 @@ func (d *eventDispatcher) dispatchEvent(eventType, contentType string, contents } ctx := context.WithValue( - context.Background(), eventTypeContextKey{}, - eventType, + context.Background(), schemeContextKey{}, + wrp.SchemeEvent, ) for _, url = range endpoints { @@ -264,7 +271,7 @@ func (d *eventDispatcher) encodeAndDispatchEvent(eventType string, format wrp.Fo return url, nil } -func (d *eventDispatcher) dispatchTo(unfiltered string, contentType string, contents []byte, eventType string) (string, error) { +func (d *eventDispatcher) dispatchTo(unfiltered string, contentType string, contents []byte) (string, error) { var ( err error url = unfiltered @@ -281,7 +288,8 @@ func (d *eventDispatcher) dispatchTo(unfiltered string, contentType string, cont } return request.URL.String(), d.send( - context.WithValue(context.Background(), eventTypeContextKey{}, eventType), + context.WithValue(context.Background(), schemeContextKey{}, + wrp.SchemeDNS), request, ) } diff --git a/eventDispatcher_test.go b/eventDispatcher_test.go index b12be6a..41a2e5d 100644 --- a/eventDispatcher_test.go +++ b/eventDispatcher_test.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "math" "testing" "time" @@ -122,8 +123,6 @@ func testEventDispatcherOnDeviceEventBadURLFilter(t *testing.T) { func testEventDispatcherOnDeviceEventDispatchEvent(t *testing.T) { var ( - assert = assert.New(t) - require = require.New(t) testData = []struct { outbounder *Outbounder destination string @@ -193,54 +192,59 @@ func testEventDispatcherOnDeviceEventDispatchEvent(t *testing.T) { ) for _, record := range testData { - for _, format := range []wrp.Format{wrp.Msgpack, wrp.JSON} { - t.Logf("%#v, method=%s, format=%s", record, record.outbounder.method(), format) - - var ( - expectedContents = []byte{1, 2, 3, 4} - urlFilter = new(mockURLFilter) - ) - - om, err := NewTestOutboundMeasures() - require.NoError(err) - dispatcher, outbounds, err := NewEventDispatcher(om, record.outbounder, urlFilter) - require.NotNil(dispatcher) - require.NotNil(outbounds) - require.NoError(err) - - dispatcher.OnDeviceEvent(&device.Event{ - Type: device.MessageReceived, - Message: &wrp.Message{Destination: record.destination}, - Format: format, - Contents: expectedContents, - }) - - assert.Equal(len(record.expectedEndpoints), len(outbounds), "incorrect envelope count") - actualEndpoints := make(map[string]bool, len(record.expectedEndpoints)) - for len(outbounds) > 0 { - select { - case e := <-outbounds: - e.cancel() - <-e.request.Context().Done() - - assert.Equal(record.outbounder.method(), e.request.Method) - assert.Equal(format.ContentType(), e.request.Header.Get("Content-Type")) - - urlString := e.request.URL.String() - assert.False(actualEndpoints[urlString]) - actualEndpoints[urlString] = true - - actualContents, err := io.ReadAll(e.request.Body) - assert.NoError(err) - assert.Equal(expectedContents, actualContents) - - default: + t.Run(record.destination, func(t *testing.T) { + for _, format := range []wrp.Format{wrp.Msgpack, wrp.JSON} { + assert := assert.New(t) + require := require.New(t) + + t.Logf("%#v, method=%s, format=%s", record, record.outbounder.method(), format) + + var ( + expectedContents = []byte{1, 2, 3, 4} + urlFilter = new(mockURLFilter) + ) + + om, err := NewTestOutboundMeasures() + require.NoError(err) + dispatcher, outbounds, err := NewEventDispatcher(om, record.outbounder, urlFilter) + require.NotNil(dispatcher) + require.NotNil(outbounds) + require.NoError(err) + + dispatcher.OnDeviceEvent(&device.Event{ + Type: device.MessageReceived, + Message: &wrp.Message{Destination: record.destination}, + Format: format, + Contents: expectedContents, + }) + + assert.Equal(len(record.expectedEndpoints), len(outbounds), "incorrect envelope count") + actualEndpoints := make(map[string]bool, len(record.expectedEndpoints)) + for len(outbounds) > 0 { + select { + case e := <-outbounds: + e.cancel() + <-e.request.Context().Done() + + assert.Equal(record.outbounder.method(), e.request.Method) + assert.Equal(format.ContentType(), e.request.Header.Get("Content-Type")) + + urlString := e.request.URL.String() + assert.False(actualEndpoints[urlString]) + actualEndpoints[urlString] = true + + actualContents, err := io.ReadAll(e.request.Body) + assert.NoError(err) + assert.Equal(expectedContents, actualContents) + + default: + } } - } - assert.Equal(record.expectedEndpoints, actualEndpoints) - urlFilter.AssertExpectations(t) - } + assert.Equal(record.expectedEndpoints, actualEndpoints) + urlFilter.AssertExpectations(t) + } + }) } } @@ -273,7 +277,7 @@ func testEventDispatcherOnDeviceEventFullQueue(t *testing.T) { require.NoError(err) d.(*eventDispatcher).outbounds = make(chan outboundEnvelope) - dm.On("With", prometheus.Labels{eventLabel: expectedEventType, codeLabel: messageDroppedCode, reasonLabel: fullQueueReason, urlLabel: "nowhere.com"}).Return().Once() + dm.On("With", prometheus.Labels{schemeLabel: unknown, codeLabel: messageDroppedCode, reasonLabel: fullQueueReason}).Return().Once() dm.On("Add", 1.).Return().Once() d.OnDeviceEvent(&device.Event{ Type: device.MessageReceived, @@ -284,47 +288,100 @@ func testEventDispatcherOnDeviceEventFullQueue(t *testing.T) { dm.AssertExpectations(t) } -func testEventDispatcherOnDeviceEventMessageReceived(t *testing.T) { - var ( - assert = assert.New(t) - require = require.New(t) - b bytes.Buffer - expectedEventType = "node-change" - m = wrp.Message{Destination: fmt.Sprintf("event:%s/mac:11:22:33:44:55:66/Online/unknown/deb2eb69999", expectedEventType)} - o = Outbounder{ - Method: "PATCH", - EventEndpoints: map[string]interface{}{"default": []string{"nowhere.com"}}, - Logger: zap.New( - zapcore.NewCore(zapcore.NewJSONEncoder( - zapcore.EncoderConfig{ - MessageKey: "message", - }), zapcore.AddSync(&b), zapcore.ErrorLevel), - ), - } - ) +func testEventDispatcherEventTypes(t *testing.T) { - om, err := NewTestOutboundMeasures() - require.NoError(err) - dispatcher, outbounds, err := NewEventDispatcher(om, &o, nil) - require.NotNil(dispatcher) - require.NotNil(outbounds) - require.NoError(err) + tests := []struct { + description string + event device.Event + expectedScheme string + }{ + { + description: "event", + event: device.Event{ + Type: device.MessageReceived, + Message: &wrp.Message{Destination: "event:node-change/mac:11:22:33:44:55:66/Online/unknown/deb2eb69999"}, + }, + expectedScheme: wrp.SchemeEvent, + }, + { + description: "dns", + event: device.Event{ + Type: device.MessageReceived, + Message: &wrp.Message{Destination: "dns:node-change/mac:11:22:33:44:55:66/Online/unknown/deb2eb69999"}, + }, + expectedScheme: wrp.SchemeDNS, + }, + { + description: "device-status: Connect", + event: device.Event{ + Type: device.Connect, + }, + expectedScheme: wrp.SchemeEvent, + }, + { + description: "device-status: Disconnect", + event: device.Event{ + Type: device.Disconnect, + }, + expectedScheme: wrp.SchemeEvent, + }, + // Note, event types MessageSent, MessageFailed, TransactionComplete and TransactionBroken are always dropped. + } - dispatcher.OnDeviceEvent(&device.Event{ - Type: device.MessageReceived, - Message: &m, - }) + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + + var ( + assert = assert.New(t) + require = require.New(t) + d = new(device.MockDevice) + deviceMetadata = genTestMetadata() + b bytes.Buffer + o = Outbounder{ + Method: "PATCH", + EventEndpoints: map[string]interface{}{"default": []string{"nowhere.com"}}, + Logger: zap.New( + zapcore.NewCore(zapcore.NewJSONEncoder( + zapcore.EncoderConfig{ + MessageKey: "message", + }), zapcore.AddSync(&b), zapcore.ErrorLevel), + ), + } + ) + + switch tc.event.Type { + case device.Connect, device.Disconnect: + d.On("ID").Return(device.ID("mac:123412341234")) + d.On("Metadata").Return(deviceMetadata) + d.On("Convey").Return(convey.C(nil)) + if tc.event.Type == device.Disconnect { + d.On("Statistics").Return(device.NewStatistics(nil, time.Now())) + d.On("CloseReason").Return(device.CloseReason{}) + } + } - require.Equal(1, len(outbounds)) - e := <-outbounds - e.cancel() - <-e.request.Context().Done() + om, err := NewTestOutboundMeasures() + require.NoError(err) + dispatcher, outbounds, err := NewEventDispatcher(om, &o, nil) + require.NotNil(dispatcher) + require.NotNil(outbounds) + require.NoError(err) + + tc.event.Device = d + dispatcher.OnDeviceEvent(&tc.event) + + require.Equal(1, len(outbounds)) + e := <-outbounds + e.cancel() + <-e.request.Context().Done() - assert.Equal(o.method(), e.request.Method) - assert.Zero(b) - eventType, ok := e.request.Context().Value(eventTypeContextKey{}).(string) - require.True(ok) - assert.Equal(expectedEventType, eventType) + assert.Equal(o.method(), e.request.Method) + assert.Zero(b) + scheme, ok := e.request.Context().Value(schemeContextKey{}).(string) + require.True(ok) + assert.Equal(tc.expectedScheme, scheme) + }) + } } func testEventDispatcherOnDeviceEventFilterError(t *testing.T) { @@ -511,13 +568,163 @@ func testEventDispatcherOnDeviceEventEventMapError(t *testing.T) { assert.Error(err) } +func testEventDispatcherMetrics(t *testing.T) { + url := "nowhere.com" + tests := []struct { + description string + event device.Event + expectedDropMessageLabels prometheus.Labels + expectedOutboundEventsLabels prometheus.Labels + expectedOutboundTotal int + unsupportedEventType bool + }{ + { + description: "Connect", + event: device.Event{Type: device.Connect}, + expectedOutboundEventsLabels: prometheus.Labels{schemeLabel: wrp.SchemeEvent, reasonLabel: noErrReason, outcomeLabel: successOutcome}, + expectedOutboundTotal: 1, + }, + { + description: "Disconnect", + event: device.Event{Type: device.Disconnect}, + expectedOutboundEventsLabels: prometheus.Labels{schemeLabel: wrp.SchemeEvent, reasonLabel: noErrReason, outcomeLabel: successOutcome}, + expectedOutboundTotal: 1, + }, + { + description: "MessageReceived: event scheme", + event: device.Event{Type: device.MessageReceived, Message: &wrp.Message{Destination: "event:custom-event"}}, + expectedOutboundEventsLabels: prometheus.Labels{schemeLabel: wrp.SchemeEvent, reasonLabel: noErrReason, outcomeLabel: successOutcome}, + expectedOutboundTotal: 1, + }, + { + description: "MessageReceived: dns scheme", + event: device.Event{Type: device.MessageReceived, Message: &wrp.Message{Destination: "dns:some_url/custom-event"}}, + expectedOutboundEventsLabels: prometheus.Labels{schemeLabel: wrp.SchemeDNS, reasonLabel: noErrReason, outcomeLabel: successOutcome}, + expectedOutboundTotal: 1, + }, + { + description: "MessageReceived: dns scheme (with https protocol)", + event: device.Event{Type: device.MessageReceived, Message: &wrp.Message{Destination: "dns:https://some_url/custom-event"}}, + expectedOutboundEventsLabels: prometheus.Labels{schemeLabel: wrp.SchemeDNS, reasonLabel: noErrReason, outcomeLabel: successOutcome}, + expectedOutboundTotal: 1, + }, + { + description: "MessageSent", + event: device.Event{Type: device.MessageSent}, + expectedDropMessageLabels: prometheus.Labels{schemeLabel: unknown, codeLabel: messageDroppedCode, reasonLabel: notSupportedEventReason}, + expectedOutboundEventsLabels: prometheus.Labels{schemeLabel: unknown, reasonLabel: notSupportedEventReason, outcomeLabel: failureOutcome}, + unsupportedEventType: true, + }, + { + description: "MessageFailed", + event: device.Event{Type: device.MessageFailed}, + expectedDropMessageLabels: prometheus.Labels{schemeLabel: unknown, codeLabel: messageDroppedCode, reasonLabel: notSupportedEventReason}, + expectedOutboundEventsLabels: prometheus.Labels{schemeLabel: unknown, reasonLabel: notSupportedEventReason, outcomeLabel: failureOutcome}, + unsupportedEventType: true, + }, + { + description: "TransactionComplete", + event: device.Event{Type: device.TransactionComplete}, + expectedDropMessageLabels: prometheus.Labels{schemeLabel: unknown, codeLabel: messageDroppedCode, reasonLabel: notSupportedEventReason}, + expectedOutboundEventsLabels: prometheus.Labels{schemeLabel: unknown, reasonLabel: notSupportedEventReason, outcomeLabel: failureOutcome}, + unsupportedEventType: true, + }, + { + description: "TransactionBroken", + event: device.Event{Type: device.TransactionBroken}, + expectedDropMessageLabels: prometheus.Labels{schemeLabel: unknown, codeLabel: messageDroppedCode, reasonLabel: notSupportedEventReason}, + expectedOutboundEventsLabels: prometheus.Labels{schemeLabel: unknown, reasonLabel: notSupportedEventReason, outcomeLabel: failureOutcome}, + unsupportedEventType: true, + }, + { + description: "Nonexistent positive event type", + event: device.Event{Type: device.EventType(math.MaxUint8)}, + expectedDropMessageLabels: prometheus.Labels{schemeLabel: unknown, codeLabel: messageDroppedCode, reasonLabel: notSupportedEventReason}, + expectedOutboundEventsLabels: prometheus.Labels{schemeLabel: unknown, reasonLabel: notSupportedEventReason, outcomeLabel: failureOutcome}, + unsupportedEventType: true, + }, + } + + for _, tc := range tests { + t.Run(tc.description, func(t *testing.T) { + var ( + assert = assert.New(t) + require = require.New(t) + d = new(device.MockDevice) + b = bytes.Buffer{} + o = Outbounder{ + Method: "PATCH", + EventEndpoints: map[string]interface{}{"default": []string{url}}, + Logger: zap.New( + zapcore.NewCore(zapcore.NewJSONEncoder( + zapcore.EncoderConfig{ + MessageKey: "message", + }), zapcore.AddSync(&b), zapcore.ErrorLevel), + ), + } + + dropMessagesCounter = new(mockCounter) + outboundEventsCounter = new(mockCounter) + deviceMetadata = genTestMetadata() + ) + + om, err := NewTestOutboundMeasures() + require.NoError(err) + + om.DroppedMessages = dropMessagesCounter + om.OutboundEvents = outboundEventsCounter + dispatcher, outbounds, err := NewEventDispatcher(om, &o, nil) + + require.NotNil(dispatcher) + require.NotNil(outbounds) + require.NoError(err) + + switch tc.event.Type { + case device.Connect, device.Disconnect: + d.On("ID").Return(device.ID("mac:123412341234")) + d.On("Metadata").Return(deviceMetadata) + d.On("Convey").Return(convey.C(nil)) + if tc.event.Type == device.Disconnect { + d.On("Statistics").Return(device.NewStatistics(nil, time.Now())) + d.On("CloseReason").Return(device.CloseReason{}) + } + } + + if tc.unsupportedEventType { + require.NotEmpty(tc.expectedDropMessageLabels, "expectedDropMessageLabels should not be empty") + + dropMessagesCounter.On("With", tc.expectedDropMessageLabels).Return().Once() + dropMessagesCounter.On("Add", 1.).Return().Once() + } else { + dropMessagesCounter.On("With", tc.expectedDropMessageLabels).Panic("Func dropMessagesCounter.With should have not been called") + dropMessagesCounter.On("Add", 1.).Panic("Func dropMessagesCounter.Add should have not been called") + } + + require.NotEmpty(tc.expectedOutboundEventsLabels, "expectedOutboundEventsLabels should not be empty") + outboundEventsCounter.On("With", tc.expectedOutboundEventsLabels).Return().Once() + outboundEventsCounter.On("Add", 1.).Return().Once() + + tc.event.Device = d + dispatcher.OnDeviceEvent(&tc.event) + assert.Equal(tc.expectedOutboundTotal, len(outbounds)) + assert.Zero(b) + + d.AssertExpectations(t) + outboundEventsCounter.AssertExpectations(t) + if tc.unsupportedEventType { + dropMessagesCounter.AssertExpectations(t) + } + }) + } +} + func TestEventDispatcherOnDeviceEvent(t *testing.T) { tests := []struct { description string test func(*testing.T) }{ {"ConnectEvent", testEventDispatcherOnDeviceEventConnectEvent}, - {"CorrectEventType", testEventDispatcherOnDeviceEventMessageReceived}, + {"EventTypes", testEventDispatcherEventTypes}, {"DisconnectEvent", testEventDispatcherOnDeviceEventDisconnectEvent}, {"Unroutable", testEventDispatcherOnDeviceEventUnroutable}, {"BadURLFilter", testEventDispatcherOnDeviceEventBadURLFilter}, @@ -527,6 +734,7 @@ func TestEventDispatcherOnDeviceEvent(t *testing.T) { {"DispatchTo", testEventDispatcherOnDeviceEventDispatchTo}, {"NilEventError", testEventDispatcherOnDeviceEventNilEventError}, {"EventMapError", testEventDispatcherOnDeviceEventEventMapError}, + {"EventDispatcherMetrics", testEventDispatcherMetrics}, } for _, tc := range tests { diff --git a/metrics.go b/metrics.go index 85e5c4e..48be8dc 100644 --- a/metrics.go +++ b/metrics.go @@ -46,9 +46,8 @@ const ( qosLevelLabel = "qos_level" partnerIDLabel = "partner_id" messageType = "message_type" - urlLabel = "url" codeLabel = "code" - eventLabel = "event" + schemeLabel = "scheme" ) // label values @@ -171,7 +170,7 @@ func NewOutboundMeasures(tf *touchstone.Factory) (om OutboundMeasures, errs erro NativeHistogramMaxExemplars: -1, NativeHistogramExemplarTTL: time.Minute * 5, }, - []string{eventLabel, codeLabel, reasonLabel, urlLabel}..., + []string{schemeLabel, codeLabel, reasonLabel}..., ) errs = errors.Join(errs, err) @@ -180,7 +179,7 @@ func NewOutboundMeasures(tf *touchstone.Factory) (om OutboundMeasures, errs erro Name: OutboundRequestCounter, Help: "The count of outbound requests", }, - []string{eventLabel, codeLabel, reasonLabel, urlLabel}..., + []string{schemeLabel, codeLabel, reasonLabel}..., ) errs = errors.Join(errs, err) @@ -201,7 +200,7 @@ func NewOutboundMeasures(tf *touchstone.Factory) (om OutboundMeasures, errs erro NativeHistogramMaxExemplars: -1, NativeHistogramExemplarTTL: time.Minute * 5, }, - []string{eventLabel, codeLabel}..., + []string{schemeLabel, codeLabel}..., ) errs = errors.Join(errs, err) @@ -210,7 +209,7 @@ func NewOutboundMeasures(tf *touchstone.Factory) (om OutboundMeasures, errs erro Name: TotalOutboundEvents, Help: "Total count of outbound events", }, - []string{eventLabel, reasonLabel, urlLabel, outcomeLabel}..., + []string{schemeLabel, reasonLabel, outcomeLabel}..., ) errs = errors.Join(errs, err) @@ -235,7 +234,7 @@ func NewOutboundMeasures(tf *touchstone.Factory) (om OutboundMeasures, errs erro Name: OutboundDroppedMessageCounter, Help: "The total count of messages dropped", }, - []string{eventLabel, codeLabel, reasonLabel, urlLabel}..., + []string{schemeLabel, codeLabel, reasonLabel}..., ) errs = errors.Join(errs, err) @@ -303,9 +302,9 @@ func NewOutboundMeasures(tf *touchstone.Factory) (om OutboundMeasures, errs erro func InstrumentOutboundSize(obs HistogramVec, next http.RoundTripper) promhttp.RoundTripperFunc { return promhttp.RoundTripperFunc(func(request *http.Request) (*http.Response, error) { - eventType, ok := request.Context().Value(eventTypeContextKey{}).(string) + scheme, ok := request.Context().Value(schemeContextKey{}).(string) if !ok { - eventType = unknown + scheme = unknown } response, err := next.RoundTrip(request) @@ -318,9 +317,9 @@ func InstrumentOutboundSize(obs HistogramVec, next http.RoundTripper) promhttp.R code = strconv.Itoa(response.StatusCode) } - labels = prometheus.Labels{eventLabel: eventType, codeLabel: code} + labels = prometheus.Labels{schemeLabel: scheme, codeLabel: code} } else { - labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode)} + labels = prometheus.Labels{schemeLabel: scheme, codeLabel: strconv.Itoa(response.StatusCode)} } obs.With(labels).Observe(float64(size)) @@ -331,9 +330,9 @@ func InstrumentOutboundSize(obs HistogramVec, next http.RoundTripper) promhttp.R func InstrumentOutboundDuration(obs HistogramVec, next http.RoundTripper) promhttp.RoundTripperFunc { return promhttp.RoundTripperFunc(func(request *http.Request) (*http.Response, error) { - eventType, ok := request.Context().Value(eventTypeContextKey{}).(string) + scheme, ok := request.Context().Value(schemeContextKey{}).(string) if !ok { - eventType = unknown + scheme = unknown } start := time.Now() @@ -347,9 +346,9 @@ func InstrumentOutboundDuration(obs HistogramVec, next http.RoundTripper) promht code = strconv.Itoa(response.StatusCode) } - labels = prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: getDoErrReason(err), urlLabel: request.URL.String()} + labels = prometheus.Labels{schemeLabel: scheme, codeLabel: code, reasonLabel: getDoErrReason(err)} } else { - labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: expectedCodeReason, urlLabel: request.URL.String()} + labels = prometheus.Labels{schemeLabel: scheme, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: expectedCodeReason} if response.StatusCode != http.StatusAccepted { labels[reasonLabel] = non202CodeReason } @@ -363,9 +362,9 @@ func InstrumentOutboundDuration(obs HistogramVec, next http.RoundTripper) promht func InstrumentOutboundCounter(counter CounterVec, next http.RoundTripper) promhttp.RoundTripperFunc { return promhttp.RoundTripperFunc(func(request *http.Request) (*http.Response, error) { - eventType, ok := request.Context().Value(eventTypeContextKey{}).(string) + scheme, ok := request.Context().Value(schemeContextKey{}).(string) if !ok { - eventType = unknown + scheme = unknown } response, err := next.RoundTrip(request) @@ -377,9 +376,9 @@ func InstrumentOutboundCounter(counter CounterVec, next http.RoundTripper) promh code = strconv.Itoa(response.StatusCode) } - labels = prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: getDoErrReason(err), urlLabel: request.URL.String()} + labels = prometheus.Labels{schemeLabel: scheme, codeLabel: code, reasonLabel: getDoErrReason(err)} } else { - labels = prometheus.Labels{eventLabel: eventType, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: noErrReason, urlLabel: request.URL.String()} + labels = prometheus.Labels{schemeLabel: scheme, codeLabel: strconv.Itoa(response.StatusCode), reasonLabel: noErrReason} if response.StatusCode != http.StatusAccepted { labels[reasonLabel] = non202CodeReason } diff --git a/outbounder.go b/outbounder.go index 27621c2..4821957 100644 --- a/outbounder.go +++ b/outbounder.go @@ -22,12 +22,6 @@ const ( // OutbounderKey is the Viper subkey which is expected to hold Outbounder configuration OutbounderKey = "device.outbound" - // EventPrefix is the string prefix for WRP destinations that should be treated as events - EventPrefix = "event:" - - // DNSPrefix is the string prefix for WRP destinations that should be treated as exact URLs - DNSPrefix = "dns:" - DefaultDefaultScheme = "https" DefaultAllowedScheme = "https" diff --git a/workerPool.go b/workerPool.go index 2df1692..4117117 100644 --- a/workerPool.go +++ b/workerPool.go @@ -60,44 +60,41 @@ func (wp *WorkerPool) Run() { func (wp *WorkerPool) transact(e outboundEnvelope) { defer e.cancel() - eventType, ok := e.request.Context().Value(eventTypeContextKey{}).(string) + scheme, ok := e.request.Context().Value(schemeContextKey{}).(string) if !ok { - eventType = unknown + scheme = unknown } // bail out early if the request has been on the queue too long if err := e.request.Context().Err(); err != nil { - url := e.request.URL.String() reason := getDroppedMessageReason(err) - wp.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: messageDroppedCode, reasonLabel: reason, urlLabel: url}).Add(1) - wp.logger.Error("Outbound message expired while on queue", zap.String("event", eventType), zap.String("reason", reason), zap.Error(err), zap.String("url", url)) + wp.droppedMessages.With(prometheus.Labels{schemeLabel: scheme, codeLabel: messageDroppedCode, reasonLabel: reason}).Add(1) + wp.logger.Error("Outbound message expired while on queue", zap.String(schemeLabel, scheme), zap.String("reason", reason), zap.Error(err)) return } response, err := wp.transactor(e.request) if err != nil { - url := e.request.URL.String() reason := getDroppedMessageReason(err) code := messageDroppedCode if response != nil { code = strconv.Itoa(response.StatusCode) } - wp.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: reason, urlLabel: url}).Add(1) - wp.logger.Error("HTTP transaction error", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, reason), zap.Error(err), zap.String(urlLabel, url)) + wp.droppedMessages.With(prometheus.Labels{schemeLabel: scheme, codeLabel: code, reasonLabel: reason}).Add(1) + wp.logger.Error("HTTP transaction error", zap.String(schemeLabel, scheme), zap.String(codeLabel, code), zap.String(reasonLabel, reason), zap.Error(err)) return } code := strconv.Itoa(response.StatusCode) - url := e.request.URL.String() switch response.StatusCode { case http.StatusAccepted: - wp.logger.Debug("HTTP response", zap.String("status", response.Status), zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, expectedCodeReason), zap.String(urlLabel, url)) + wp.logger.Debug("HTTP response", zap.String("status", response.Status), zap.String(schemeLabel, scheme), zap.String(codeLabel, code), zap.String(reasonLabel, expectedCodeReason)) default: - wp.droppedMessages.With(prometheus.Labels{eventLabel: eventType, codeLabel: code, reasonLabel: non202CodeReason, urlLabel: url}).Add(1) - wp.logger.Warn("HTTP response", zap.String(eventLabel, eventType), zap.String(codeLabel, code), zap.String(reasonLabel, non202CodeReason), zap.String(urlLabel, url)) + wp.droppedMessages.With(prometheus.Labels{schemeLabel: scheme, codeLabel: code, reasonLabel: non202CodeReason}).Add(1) + wp.logger.Warn("HTTP response", zap.String(schemeLabel, scheme), zap.String(codeLabel, code), zap.String(reasonLabel, non202CodeReason)) } io.Copy(io.Discard, response.Body) diff --git a/workerPool_test.go b/workerPool_test.go index 8fd8234..e2dd896 100644 --- a/workerPool_test.go +++ b/workerPool_test.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/xmidt-org/wrp-go/v3" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -30,7 +31,7 @@ func testWorkerPoolTransactHTTPSuccess(t *testing.T) { }), zapcore.AddSync(&b), zapcore.ErrorLevel), ) target = "http://localhost/foo" - expectedRequest = httptest.NewRequest("POST", target, nil).WithContext(context.WithValue(context.Background(), eventTypeContextKey{}, EventPrefix)) + expectedRequest = httptest.NewRequest("POST", target, nil).WithContext(context.WithValue(context.Background(), schemeContextKey{}, wrp.SchemeEvent)) envelope = outboundEnvelope{expectedRequest, func() {}} dm = new(mockCounter) wp = &WorkerPool{ @@ -48,7 +49,7 @@ func testWorkerPoolTransactHTTPSuccess(t *testing.T) { } ) - dm.On("With", prometheus.Labels{eventLabel: EventPrefix, codeLabel: strconv.Itoa(http.StatusAccepted), reasonLabel: non202CodeReason, urlLabel: target}).Panic("Func dm.With should have not been called") + dm.On("With", prometheus.Labels{schemeLabel: wrp.SchemeEvent, codeLabel: strconv.Itoa(http.StatusAccepted), reasonLabel: non202CodeReason}).Panic("Func dm.With should have not been called") dm.On("Add", 1.).Panic("Func dm.Add should have not been called") require.NotPanics(func() { wp.transact(envelope) }) assert.Equal(b.Len(), 0) @@ -58,7 +59,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) { var ( assert = assert.New(t) target = "http://localhost/foo" - expectedRequest = httptest.NewRequest("POST", target, nil).WithContext(context.WithValue(context.Background(), eventTypeContextKey{}, EventPrefix)) + expectedRequest = httptest.NewRequest("POST", target, nil).WithContext(context.WithValue(context.Background(), schemeContextKey{}, wrp.SchemeEvent)) envelope = outboundEnvelope{expectedRequest, func() {}} tests = []struct { description string @@ -160,7 +161,7 @@ func testWorkerPoolTransactHTTPError(t *testing.T) { ) dm := new(mockCounter) tc.wp.droppedMessages = dm - dm.On("With", prometheus.Labels{eventLabel: EventPrefix, codeLabel: strconv.Itoa(tc.expectedCode), reasonLabel: tc.expectedReason, urlLabel: target}).Return().Once() + dm.On("With", prometheus.Labels{schemeLabel: wrp.SchemeEvent, codeLabel: strconv.Itoa(tc.expectedCode), reasonLabel: tc.expectedReason}).Return().Once() dm.On("Add", 1.).Return().Once() tc.wp.transact(envelope) assert.Greater(b.Len(), 0)