Skip to content
This repository has been archived by the owner on Feb 22, 2024. It is now read-only.

Add extra metrics #124

Merged
merged 2 commits into from
Dec 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions internal/app/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,25 @@ func (s *Server) TrackConnection(gctx global.Context, r *http.Request, con clien
// Increment counters
atomic.AddInt32(s.activeConns, 1)

switch con.Transport() {
case client.TransportEventStream:
atomic.AddInt32(s.activeEventStreams, 1)
gctx.Inst().Monitoring.EventV3().CurrentEventStreams.Inc()

defer func() {
atomic.AddInt32(s.activeEventStreams, -1)
gctx.Inst().Monitoring.EventV3().CurrentEventStreams.Dec()
}()
case client.TransportWebSocket:
atomic.AddInt32(s.activeWebSockets, 1)
gctx.Inst().Monitoring.EventV3().CurrentWebSockets.Inc()

defer func() {
atomic.AddInt32(s.activeWebSockets, -1)
gctx.Inst().Monitoring.EventV3().CurrentWebSockets.Dec()
}()
}

gctx.Inst().Monitoring.EventV3().CurrentConnections.Inc()
gctx.Inst().Monitoring.EventV3().TotalConnections.Observe(1)

Expand Down Expand Up @@ -51,6 +70,7 @@ func (s *Server) TrackConnection(gctx global.Context, r *http.Request, con clien

gctx.Inst().Monitoring.EventV3().CurrentConnections.Dec()
gctx.Inst().Monitoring.EventV3().TotalConnections.Observe(float64(time.Since(start)/time.Millisecond) / 1000)
gctx.Inst().Monitoring.EventV3().TotalConnectionDurationSeconds.Observe(float64(time.Since(start).Seconds()))

zap.S().Debugw("connection ended",
"client_addr", clientAddr,
Expand Down
9 changes: 9 additions & 0 deletions internal/app/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type Connection interface {
SendClose(code events.CloseCode, after time.Duration)
// SetWriter defines the connection's writable stream (SSE only)
SetWriter(w *bufio.Writer, f http.Flusher)
// Return the name of the transport used by this connection
Transport() Transport
}

func IsClientSentOp(op events.Opcode) bool {
Expand Down Expand Up @@ -315,3 +317,10 @@ var (
ErrAlreadySubscribed = fmt.Errorf("already subscribed")
ErrNotSubscribed = fmt.Errorf("not subscribed")
)

type Transport string

const (
TransportWebSocket Transport = "WebSocket"
TransportEventStream Transport = "EventStream"
)
4 changes: 4 additions & 0 deletions internal/app/connection/eventstream/eventstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,7 @@ func SetEventStreamHeaders(w http.ResponseWriter) {

w.WriteHeader(http.StatusOK)
}

func (es *EventStream) Transport() client.Transport {
return client.TransportEventStream
}
4 changes: 4 additions & 0 deletions internal/app/connection/eventstream/eventstream_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ func (es *EventStream) Read(gctx global.Context) {
es.SendClose(events.CloseCodeRestart, time.Second*5)
return
case <-heartbeat.C:
gctx.Inst().Monitoring.EventV3().Heartbeats.Observe(1)

if err := es.SendHeartbeat(); err != nil {
return
}
Expand All @@ -56,6 +58,8 @@ func (es *EventStream) Read(gctx global.Context) {

// Dispatch the event to the client
es.handler.OnDispatch(gctx, msg)

gctx.Inst().Monitoring.EventV3().Dispatches.Observe(1)
}
}
}
8 changes: 8 additions & 0 deletions internal/app/connection/websocket/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,10 @@ func (w *WebSocket) Greet(gctx global.Context) error {
HeartbeatInterval: uint32(w.heartbeatInterval),
SessionID: hex.EncodeToString(w.sessionID),
SubscriptionLimit: w.subscriptionLimit,
Instance: events.HelloPayloadInstanceInfo{
Name: gctx.Config().Pod.Name,
Population: gctx.Inst().ConcurrencyValue,
},
})

return w.Write(msg.ToRaw())
Expand Down Expand Up @@ -215,3 +219,7 @@ func (w *WebSocket) Destroy(gctx global.Context) {
w.SetReady()
w.evm.Destroy(gctx)
}

func (w *WebSocket) Transport() client.Transport {
return client.TransportWebSocket
}
4 changes: 4 additions & 0 deletions internal/app/connection/websocket/websocket_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ func (w *WebSocket) Read(gctx global.Context) {
return
case <-heartbeat.C: // Send a heartbeat
if !deferred {
gctx.Inst().Monitoring.EventV3().Heartbeats.Observe(1)

if err := w.SendHeartbeat(); err != nil {
return
}
Expand All @@ -169,6 +171,8 @@ func (w *WebSocket) Read(gctx global.Context) {

// Dispatch the event to the client
w.handler.OnDispatch(gctx, msg)

gctx.Inst().Monitoring.EventV3().Dispatches.Observe(1)
}
}
}
20 changes: 15 additions & 5 deletions internal/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ type Server struct {
locked bool
shutdown chan struct{}

activeConns *int32
activeConns *int32
activeEventStreams *int32
activeWebSockets *int32
}

func New(gctx global.Context) (*Server, <-chan struct{}) {
Expand All @@ -48,7 +50,9 @@ func New(gctx global.Context) (*Server, <-chan struct{}) {

shutdown: make(chan struct{}),

activeConns: new(int32),
activeConns: new(int32),
activeEventStreams: new(int32),
activeWebSockets: new(int32),
}

srv.setRoutes()
Expand Down Expand Up @@ -125,11 +129,17 @@ func New(gctx global.Context) (*Server, <-chan struct{}) {
ticker.Stop()
return
case <-ticker.C:
v := atomic.LoadInt32(srv.activeConns)
a := atomic.LoadInt32(srv.activeConns)
es := atomic.LoadInt32(srv.activeEventStreams)
ws := atomic.LoadInt32(srv.activeWebSockets)

gctx.Inst().ConcurrencyValue = v
gctx.Inst().ConcurrencyValue = a

zap.S().Infof("concurrency: %d", v)
zap.S().Infow("stats",
"concurrency", a,
"eventstreams", es,
"websockets", ws,
)
}
}
}()
Expand Down
15 changes: 4 additions & 11 deletions internal/instance/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,16 @@ import (
)

type Monitoring interface {
EventV1() EventV1
EventV3() EventV3
Register(prometheus.Registerer)
}

type EventV1 struct {
ChannelEmotes EventV1ChannelEmotes
}

type EventV3 struct {
TotalConnections prometheus.Histogram
TotalConnectionDurationSeconds prometheus.Histogram
CurrentConnections prometheus.Gauge
}

type EventV1ChannelEmotes struct {
TotalConnections prometheus.Histogram
TotalConnectionDurationSeconds prometheus.Histogram
CurrentConnections prometheus.Gauge
CurrentEventStreams prometheus.Gauge
CurrentWebSockets prometheus.Gauge
Heartbeats prometheus.Histogram
Dispatches prometheus.Histogram
}
53 changes: 24 additions & 29 deletions internal/monitoring/prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,23 @@ import (
)

type mon struct {
eventv1 instance.EventV1
eventv3 instance.EventV3
}

func (m *mon) EventV1() instance.EventV1 {
return m.eventv1
}

func (m *mon) EventV3() instance.EventV3 {
return m.eventv3
}

func (m *mon) Register(r prometheus.Registerer) {
r.MustRegister(
// v1 channel-emotes
m.eventv1.ChannelEmotes.TotalConnections,
m.eventv1.ChannelEmotes.TotalConnectionDurationSeconds,
m.eventv1.ChannelEmotes.CurrentConnections,

// v3
m.eventv3.TotalConnections,
m.eventv3.TotalConnectionDurationSeconds,
m.eventv3.CurrentConnections,
m.eventv3.CurrentEventStreams,
m.eventv3.CurrentWebSockets,
m.eventv3.Heartbeats,
m.eventv3.Dispatches,
)
}

Expand All @@ -46,25 +40,6 @@ func labelsFromKeyValue(kv []configure.KeyValue) prometheus.Labels {

func NewPrometheus(gCtx global.Context) instance.Monitoring {
return &mon{
eventv1: instance.EventV1{
ChannelEmotes: instance.EventV1ChannelEmotes{
TotalConnections: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "events_total_connections",
ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels),
Help: "The total number of connections",
}),
TotalConnectionDurationSeconds: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "events_total_connection_duration_seconds",
ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels),
Help: "The total number of seconds used on connections",
}),
CurrentConnections: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "events_current_connections",
ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels),
Help: "The current number of connections",
}),
},
},
eventv3: instance.EventV3{
TotalConnections: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "events_v3_total_connections",
Expand All @@ -81,6 +56,26 @@ func NewPrometheus(gCtx global.Context) instance.Monitoring {
ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels),
Help: "The current number of connections",
}),
CurrentEventStreams: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "events_v3_current_event_streams",
ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels),
Help: "The current number of connections via EventStream transport",
}),
CurrentWebSockets: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "events_v3_current_event_websockets",
ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels),
Help: "The current number of connections via WebSocket transport",
}),
Heartbeats: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "events_v3_heartbeats",
ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels),
Help: "The number of heartbeats sent out to clients",
}),
Dispatches: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "events_v3_dispatches",
ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels),
Help: "The number of dispatches sent out to clients",
}),
},
}
}
4 changes: 2 additions & 2 deletions terraform/deployment.tf
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ resource "kubernetes_deployment" "app" {

strategy {
rolling_update {
max_surge = "2"
max_unavailable = "2"
max_surge = "4"
max_unavailable = "4"
}
type = "RollingUpdate"
}
Expand Down
Loading