Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metrics for PeersUpdateManager #1310

Merged
merged 2 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/cmd/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func startManagement(t *testing.T, config *mgmt.Config) (*grpc.Server, net.Liste
t.Fatal(err)
}

peersUpdateManager := mgmt.NewPeersUpdateManager()
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, nil
Expand Down
2 changes: 1 addition & 1 deletion client/internal/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1044,7 +1044,7 @@ func startManagement(dataDir string) (*grpc.Server, string, error) {
return nil, "", err
}

peersUpdateManager := server.NewPeersUpdateManager()
peersUpdateManager := server.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
Expand Down
2 changes: 1 addition & 1 deletion management/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
t.Fatal(err)
}

peersUpdateManager := mgmt.NewPeersUpdateManager()
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
accountManager, err := mgmt.BuildManager(store, peersUpdateManager, nil, "", "",
eventStore, false)
Expand Down
2 changes: 1 addition & 1 deletion management/cmd/management.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ var (
if err != nil {
return fmt.Errorf("failed creating Store: %s: %v", config.Datadir, err)
}
peersUpdateManager := server.NewPeersUpdateManager()
peersUpdateManager := server.NewPeersUpdateManager(appMetrics)

var idpManager idp.Manager
if config.IdpManagerConfig != nil {
Expand Down
2 changes: 1 addition & 1 deletion management/server/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2047,7 +2047,7 @@ func createManager(t *testing.T) (*DefaultAccountManager, error) {
return nil, err
}
eventStore := &activity.InMemoryEventStore{}
return BuildManager(store, NewPeersUpdateManager(), nil, "", "netbird.cloud", eventStore, false)
return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, false)
}

func createStore(t *testing.T) (Store, error) {
Expand Down
2 changes: 1 addition & 1 deletion management/server/dns_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
return nil, err
}
eventStore := &activity.InMemoryEventStore{}
return BuildManager(store, NewPeersUpdateManager(), nil, "", "netbird.test", eventStore, false)
return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, false)
}

func createDNSStore(t *testing.T) (Store, error) {
Expand Down
2 changes: 1 addition & 1 deletion management/server/management_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func startManagement(t *testing.T, config *Config) (*grpc.Server, string, error)
if err != nil {
return nil, "", err
}
peersUpdateManager := NewPeersUpdateManager()
peersUpdateManager := NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
accountManager, err := BuildManager(store, peersUpdateManager, nil, "", "",
eventStore, false)
Expand Down
2 changes: 1 addition & 1 deletion management/server/management_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,7 @@ func startServer(config *server.Config) (*grpc.Server, net.Listener) {
if err != nil {
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
}
peersUpdateManager := server.NewPeersUpdateManager()
peersUpdateManager := server.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
accountManager, err := server.BuildManager(store, peersUpdateManager, nil, "", "",
eventStore, false)
Expand Down
2 changes: 1 addition & 1 deletion management/server/nameserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
return nil, err
}
eventStore := &activity.InMemoryEventStore{}
return BuildManager(store, NewPeersUpdateManager(), nil, "", "", eventStore, false)
return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "", eventStore, false)
}

func createNSStore(t *testing.T) (Store, error) {
Expand Down
2 changes: 1 addition & 1 deletion management/server/route_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,7 +1013,7 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, error) {
return nil, err
}
eventStore := &activity.InMemoryEventStore{}
return BuildManager(store, NewPeersUpdateManager(), nil, "", "", eventStore, false)
return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "", eventStore, false)
}

func createRouterStore(t *testing.T) (Store, error) {
Expand Down
60 changes: 44 additions & 16 deletions management/server/telemetry/app_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ const defaultEndpoint = "/metrics"

// MockAppMetrics mocks the AppMetrics interface
type MockAppMetrics struct {
GetMeterFunc func() metric2.Meter
CloseFunc func() error
ExposeFunc func(port int, endpoint string) error
IDPMetricsFunc func() *IDPMetrics
HTTPMiddlewareFunc func() *HTTPMiddleware
GRPCMetricsFunc func() *GRPCMetrics
StoreMetricsFunc func() *StoreMetrics
GetMeterFunc func() metric2.Meter
CloseFunc func() error
ExposeFunc func(port int, endpoint string) error
IDPMetricsFunc func() *IDPMetrics
HTTPMiddlewareFunc func() *HTTPMiddleware
GRPCMetricsFunc func() *GRPCMetrics
StoreMetricsFunc func() *StoreMetrics
UpdateChannelMetricsFunc func() *UpdateChannelMetrics
}

// GetMeter mocks the GetMeter function of the AppMetrics interface
Expand Down Expand Up @@ -85,6 +86,14 @@ func (mock *MockAppMetrics) StoreMetrics() *StoreMetrics {
return nil
}

// UpdateChannelMetrics mocks the MockAppMetrics function of the UpdateChannelMetrics interface
func (mock *MockAppMetrics) UpdateChannelMetrics() *UpdateChannelMetrics {
if mock.UpdateChannelMetricsFunc != nil {
return mock.UpdateChannelMetricsFunc()
}
return nil
}

// AppMetrics is metrics interface
type AppMetrics interface {
GetMeter() metric2.Meter
Expand All @@ -94,18 +103,20 @@ type AppMetrics interface {
HTTPMiddleware() *HTTPMiddleware
GRPCMetrics() *GRPCMetrics
StoreMetrics() *StoreMetrics
UpdateChannelMetrics() *UpdateChannelMetrics
}

// defaultAppMetrics are core application metrics based on OpenTelemetry https://opentelemetry.io/
type defaultAppMetrics struct {
// Meter can be used by different application parts to create counters and measure things
Meter metric2.Meter
listener net.Listener
ctx context.Context
idpMetrics *IDPMetrics
httpMiddleware *HTTPMiddleware
grpcMetrics *GRPCMetrics
storeMetrics *StoreMetrics
Meter metric2.Meter
listener net.Listener
ctx context.Context
idpMetrics *IDPMetrics
httpMiddleware *HTTPMiddleware
grpcMetrics *GRPCMetrics
storeMetrics *StoreMetrics
updateChannelMetrics *UpdateChannelMetrics
}

// IDPMetrics returns metrics for the idp package
Expand All @@ -128,6 +139,11 @@ func (appMetrics *defaultAppMetrics) StoreMetrics() *StoreMetrics {
return appMetrics.storeMetrics
}

// UpdateChannelMetrics returns metrics for the updatechannel
func (appMetrics *defaultAppMetrics) UpdateChannelMetrics() *UpdateChannelMetrics {
return appMetrics.updateChannelMetrics
}

// Close stop application metrics HTTP handler and closes listener.
func (appMetrics *defaultAppMetrics) Close() error {
if appMetrics.listener == nil {
Expand Down Expand Up @@ -199,6 +215,18 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) {
return nil, err
}

return &defaultAppMetrics{Meter: meter, ctx: ctx, idpMetrics: idpMetrics, httpMiddleware: middleware,
grpcMetrics: grpcMetrics, storeMetrics: storeMetrics}, nil
updateChannelMetrics, err := NewUpdateChannelMetrics(ctx, meter)
if err != nil {
return nil, err
}

return &defaultAppMetrics{
Meter: meter,
ctx: ctx,
idpMetrics: idpMetrics,
httpMiddleware: middleware,
grpcMetrics: grpcMetrics,
storeMetrics: storeMetrics,
updateChannelMetrics: updateChannelMetrics,
}, nil
}
2 changes: 1 addition & 1 deletion management/server/telemetry/store_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"go.opentelemetry.io/otel/metric/instrument/syncint64"
)

// StoreMetrics represents all metrics related to the FileStore
// StoreMetrics represents all metrics related to the Store
type StoreMetrics struct {
globalLockAcquisitionDurationMicro syncint64.Histogram
globalLockAcquisitionDurationMs syncint64.Histogram
Expand Down
141 changes: 141 additions & 0 deletions management/server/telemetry/updatechannel_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package telemetry

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/instrument/syncint64"
)

// UpdateChannelMetrics represents all metrics related to the UpdateChannel
type UpdateChannelMetrics struct {
createChannelDurationMs syncint64.Histogram
createChannelDurationMicro syncint64.Histogram
closeChannelDurationMs syncint64.Histogram
closeChannelDurationMicro syncint64.Histogram
closeChannelsDurationMs syncint64.Histogram
closeChannelsDurationMicro syncint64.Histogram
closeChannels syncint64.Histogram
sendUpdateDurationMs syncint64.Histogram
sendUpdateDurationMicro syncint64.Histogram
getAllConnectedPeersDurationMs syncint64.Histogram
getAllConnectedPeersDurationMicro syncint64.Histogram
getAllConnectedPeers syncint64.Histogram
ctx context.Context
}

// NewUpdateChannelMetrics creates an instance of UpdateChannel
func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateChannelMetrics, error) {
createChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.ms")
if err != nil {
return nil, err
}

createChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.micro")
if err != nil {
return nil, err
}

closeChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.ms")
if err != nil {
return nil, err
}

closeChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.micro")
if err != nil {
return nil, err
}

closeChannelsDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.ms")
if err != nil {
return nil, err
}

closeChannelsDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.micro")
if err != nil {
return nil, err
}

closeChannels, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.channels")
if err != nil {
return nil, err
}

sendUpdateDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.ms")
if err != nil {
return nil, err
}

sendUpdateDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.micro")
if err != nil {
return nil, err
}

getAllConnectedPeersDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.ms")
if err != nil {
return nil, err
}

getAllConnectedPeersDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.micro")
if err != nil {
return nil, err
}

getAllConnectedPeers, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.peers")
if err != nil {
return nil, err
}

return &UpdateChannelMetrics{
createChannelDurationMs: createChannelDurationMs,
createChannelDurationMicro: createChannelDurationMicro,
closeChannelDurationMs: closeChannelDurationMs,
closeChannelDurationMicro: closeChannelDurationMicro,
closeChannelsDurationMs: closeChannelsDurationMs,
closeChannelsDurationMicro: closeChannelsDurationMicro,
closeChannels: closeChannels,
sendUpdateDurationMs: sendUpdateDurationMs,
sendUpdateDurationMicro: sendUpdateDurationMicro,
getAllConnectedPeersDurationMs: getAllConnectedPeersDurationMs,
getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro,
getAllConnectedPeers: getAllConnectedPeers,
ctx: ctx,
}, nil
}

// CountCreateChannelDuration counts the duration of the CreateChannel method,
// closed indicates if existing channel was closed before creation of a new one
func (metrics *UpdateChannelMetrics) CountCreateChannelDuration(duration time.Duration, closed bool) {
metrics.createChannelDurationMs.Record(metrics.ctx, duration.Milliseconds(), attribute.Bool("closed", closed))
metrics.createChannelDurationMicro.Record(metrics.ctx, duration.Microseconds(), attribute.Bool("closed", closed))
}

// CountCloseChannelDuration counts the duration of the CloseChannel method
func (metrics *UpdateChannelMetrics) CountCloseChannelDuration(duration time.Duration) {
metrics.closeChannelDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.closeChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
}

// CountCloseChannelsDuration counts the duration of the CloseChannels method and the number of channels have been closed
func (metrics *UpdateChannelMetrics) CountCloseChannelsDuration(duration time.Duration, channels int) {
metrics.closeChannelsDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.closeChannelsDurationMicro.Record(metrics.ctx, duration.Microseconds())
metrics.closeChannels.Record(metrics.ctx, int64(channels))
}

// CountSendUpdateDuration counts the duration of the SendUpdate method
// found indicates if peer had channel, dropped indicates if the message was dropped due channel buffer overload
func (metrics *UpdateChannelMetrics) CountSendUpdateDuration(duration time.Duration, found, dropped bool) {
attrs := []attribute.KeyValue{attribute.Bool("found", found), attribute.Bool("dropped", dropped)}
metrics.sendUpdateDurationMs.Record(metrics.ctx, duration.Milliseconds(), attrs...)
metrics.sendUpdateDurationMicro.Record(metrics.ctx, duration.Microseconds(), attrs...)
}

// CountGetAllConnectedPeersDuration counts the duration of the GetAllConnectedPeers method and the number of peers have been returned
func (metrics *UpdateChannelMetrics) CountGetAllConnectedPeersDuration(duration time.Duration, peers int) {
metrics.getAllConnectedPeersDurationMs.Record(metrics.ctx, duration.Milliseconds())
metrics.getAllConnectedPeersDurationMicro.Record(metrics.ctx, duration.Microseconds())
metrics.getAllConnectedPeers.Record(metrics.ctx, int64(peers))
}
6 changes: 3 additions & 3 deletions management/server/turncredentials_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var TurnTestHost = &Host{
func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
ttl := util.Duration{Duration: time.Hour}
secret := "some_secret"
peersManager := NewPeersUpdateManager()
peersManager := NewPeersUpdateManager(nil)

tested := NewTimeBasedAuthSecretsManager(peersManager, &TURNConfig{
CredentialsTTL: ttl,
Expand All @@ -44,7 +44,7 @@ func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
ttl := util.Duration{Duration: 2 * time.Second}
secret := "some_secret"
peersManager := NewPeersUpdateManager()
peersManager := NewPeersUpdateManager(nil)
peer := "some_peer"
updateChannel := peersManager.CreateChannel(peer)

Expand Down Expand Up @@ -93,7 +93,7 @@ loop:
func TestTimeBasedAuthSecretsManager_CancelRefresh(t *testing.T) {
ttl := util.Duration{Duration: time.Hour}
secret := "some_secret"
peersManager := NewPeersUpdateManager()
peersManager := NewPeersUpdateManager(nil)
peer := "some_peer"

tested := NewTimeBasedAuthSecretsManager(peersManager, &TURNConfig{
Expand Down
Loading