From 1910e29570e6be3a98a579794d180419840552cd Mon Sep 17 00:00:00 2001 From: anatoleam Date: Mon, 4 Dec 2023 10:45:01 -0600 Subject: [PATCH 1/2] feat: add more tracked stats to prometheus --- internal/app/connect.go | 20 +++++++ internal/app/connection/connection.go | 9 ++++ .../app/connection/eventstream/eventstream.go | 4 ++ .../eventstream/eventstream_read.go | 4 ++ .../app/connection/websocket/websocket.go | 8 +++ .../connection/websocket/websocket_read.go | 4 ++ internal/app/server.go | 20 +++++-- internal/instance/monitoring.go | 15 ++---- internal/monitoring/prom.go | 53 +++++++++---------- 9 files changed, 92 insertions(+), 45 deletions(-) diff --git a/internal/app/connect.go b/internal/app/connect.go index 22a91b1..f2f0c37 100644 --- a/internal/app/connect.go +++ b/internal/app/connect.go @@ -24,6 +24,25 @@ func (s *Server) TrackConnection(gctx global.Context, r *http.Request, con clien // Increment counters atomic.AddInt32(s.activeConns, 1) + switch con.Transport() { + case client.TransportEventStream: + atomic.AddInt32(s.activeEventStreams, 1) + gctx.Inst().Monitoring.EventV3().CurrentEventStreams.Inc() + + defer func() { + atomic.AddInt32(s.activeEventStreams, -1) + gctx.Inst().Monitoring.EventV3().CurrentEventStreams.Dec() + }() + case client.TransportWebSocket: + atomic.AddInt32(s.activeWebSockets, 1) + gctx.Inst().Monitoring.EventV3().CurrentWebSockets.Inc() + + defer func() { + atomic.AddInt32(s.activeWebSockets, -1) + gctx.Inst().Monitoring.EventV3().CurrentWebSockets.Dec() + }() + } + gctx.Inst().Monitoring.EventV3().CurrentConnections.Inc() gctx.Inst().Monitoring.EventV3().TotalConnections.Observe(1) @@ -51,6 +70,7 @@ func (s *Server) TrackConnection(gctx global.Context, r *http.Request, con clien gctx.Inst().Monitoring.EventV3().CurrentConnections.Dec() gctx.Inst().Monitoring.EventV3().TotalConnections.Observe(float64(time.Since(start)/time.Millisecond) / 1000) + gctx.Inst().Monitoring.EventV3().TotalConnectionDurationSeconds.Observe(float64(time.Since(start).Seconds())) zap.S().Debugw("connection ended", "client_addr", clientAddr, diff --git a/internal/app/connection/connection.go b/internal/app/connection/connection.go index 0c1a6f2..c9ff3e8 100644 --- a/internal/app/connection/connection.go +++ b/internal/app/connection/connection.go @@ -52,6 +52,8 @@ type Connection interface { SendClose(code events.CloseCode, after time.Duration) // SetWriter defines the connection's writable stream (SSE only) SetWriter(w *bufio.Writer, f http.Flusher) + // Return the name of the transport used by this connection + Transport() Transport } func IsClientSentOp(op events.Opcode) bool { @@ -315,3 +317,10 @@ var ( ErrAlreadySubscribed = fmt.Errorf("already subscribed") ErrNotSubscribed = fmt.Errorf("not subscribed") ) + +type Transport string + +const ( + TransportWebSocket Transport = "WebSocket" + TransportEventStream Transport = "EventStream" +) diff --git a/internal/app/connection/eventstream/eventstream.go b/internal/app/connection/eventstream/eventstream.go index 4f14821..da54c00 100644 --- a/internal/app/connection/eventstream/eventstream.go +++ b/internal/app/connection/eventstream/eventstream.go @@ -250,3 +250,7 @@ func SetEventStreamHeaders(w http.ResponseWriter) { w.WriteHeader(http.StatusOK) } + +func (es *EventStream) Transport() client.Transport { + return client.TransportEventStream +} diff --git a/internal/app/connection/eventstream/eventstream_read.go b/internal/app/connection/eventstream/eventstream_read.go index 7c0dfd8..bdb0716 100644 --- a/internal/app/connection/eventstream/eventstream_read.go +++ b/internal/app/connection/eventstream/eventstream_read.go @@ -37,6 +37,8 @@ func (es *EventStream) Read(gctx global.Context) { es.SendClose(events.CloseCodeRestart, time.Second*5) return case <-heartbeat.C: + gctx.Inst().Monitoring.EventV3().Heartbeats.Observe(1) + if err := es.SendHeartbeat(); err != nil { return } @@ -56,6 +58,8 @@ func (es *EventStream) Read(gctx global.Context) { // Dispatch the event to the client es.handler.OnDispatch(gctx, msg) + + gctx.Inst().Monitoring.EventV3().Dispatches.Observe(1) } } } diff --git a/internal/app/connection/websocket/websocket.go b/internal/app/connection/websocket/websocket.go index 73f6e23..63ea507 100644 --- a/internal/app/connection/websocket/websocket.go +++ b/internal/app/connection/websocket/websocket.go @@ -81,6 +81,10 @@ func (w *WebSocket) Greet(gctx global.Context) error { HeartbeatInterval: uint32(w.heartbeatInterval), SessionID: hex.EncodeToString(w.sessionID), SubscriptionLimit: w.subscriptionLimit, + Instance: events.HelloPayloadInstanceInfo{ + Name: gctx.Config().Pod.Name, + Population: gctx.Inst().ConcurrencyValue, + }, }) return w.Write(msg.ToRaw()) @@ -215,3 +219,7 @@ func (w *WebSocket) Destroy(gctx global.Context) { w.SetReady() w.evm.Destroy(gctx) } + +func (w *WebSocket) Transport() client.Transport { + return client.TransportWebSocket +} diff --git a/internal/app/connection/websocket/websocket_read.go b/internal/app/connection/websocket/websocket_read.go index 769458a..74f8f66 100644 --- a/internal/app/connection/websocket/websocket_read.go +++ b/internal/app/connection/websocket/websocket_read.go @@ -146,6 +146,8 @@ func (w *WebSocket) Read(gctx global.Context) { return case <-heartbeat.C: // Send a heartbeat if !deferred { + gctx.Inst().Monitoring.EventV3().Heartbeats.Observe(1) + if err := w.SendHeartbeat(); err != nil { return } @@ -169,6 +171,8 @@ func (w *WebSocket) Read(gctx global.Context) { // Dispatch the event to the client w.handler.OnDispatch(gctx, msg) + + gctx.Inst().Monitoring.EventV3().Dispatches.Observe(1) } } } diff --git a/internal/app/server.go b/internal/app/server.go index 12c5163..0eae1f9 100644 --- a/internal/app/server.go +++ b/internal/app/server.go @@ -29,7 +29,9 @@ type Server struct { locked bool shutdown chan struct{} - activeConns *int32 + activeConns *int32 + activeEventStreams *int32 + activeWebSockets *int32 } func New(gctx global.Context) (*Server, <-chan struct{}) { @@ -48,7 +50,9 @@ func New(gctx global.Context) (*Server, <-chan struct{}) { shutdown: make(chan struct{}), - activeConns: new(int32), + activeConns: new(int32), + activeEventStreams: new(int32), + activeWebSockets: new(int32), } srv.setRoutes() @@ -125,11 +129,17 @@ func New(gctx global.Context) (*Server, <-chan struct{}) { ticker.Stop() return case <-ticker.C: - v := atomic.LoadInt32(srv.activeConns) + a := atomic.LoadInt32(srv.activeConns) + es := atomic.LoadInt32(srv.activeEventStreams) + ws := atomic.LoadInt32(srv.activeWebSockets) - gctx.Inst().ConcurrencyValue = v + gctx.Inst().ConcurrencyValue = a - zap.S().Infof("concurrency: %d", v) + zap.S().Infow("stats", + "concurrency", a, + "eventstreams", es, + "websockets", ws, + ) } } }() diff --git a/internal/instance/monitoring.go b/internal/instance/monitoring.go index 22a64f3..b5f2c31 100644 --- a/internal/instance/monitoring.go +++ b/internal/instance/monitoring.go @@ -5,23 +5,16 @@ import ( ) type Monitoring interface { - EventV1() EventV1 EventV3() EventV3 Register(prometheus.Registerer) } -type EventV1 struct { - ChannelEmotes EventV1ChannelEmotes -} - type EventV3 struct { TotalConnections prometheus.Histogram TotalConnectionDurationSeconds prometheus.Histogram CurrentConnections prometheus.Gauge -} - -type EventV1ChannelEmotes struct { - TotalConnections prometheus.Histogram - TotalConnectionDurationSeconds prometheus.Histogram - CurrentConnections prometheus.Gauge + CurrentEventStreams prometheus.Gauge + CurrentWebSockets prometheus.Gauge + Heartbeats prometheus.Histogram + Dispatches prometheus.Histogram } diff --git a/internal/monitoring/prom.go b/internal/monitoring/prom.go index f7d9d41..c028c4b 100644 --- a/internal/monitoring/prom.go +++ b/internal/monitoring/prom.go @@ -8,29 +8,23 @@ import ( ) type mon struct { - eventv1 instance.EventV1 eventv3 instance.EventV3 } -func (m *mon) EventV1() instance.EventV1 { - return m.eventv1 -} - func (m *mon) EventV3() instance.EventV3 { return m.eventv3 } func (m *mon) Register(r prometheus.Registerer) { r.MustRegister( - // v1 channel-emotes - m.eventv1.ChannelEmotes.TotalConnections, - m.eventv1.ChannelEmotes.TotalConnectionDurationSeconds, - m.eventv1.ChannelEmotes.CurrentConnections, - // v3 m.eventv3.TotalConnections, m.eventv3.TotalConnectionDurationSeconds, m.eventv3.CurrentConnections, + m.eventv3.CurrentEventStreams, + m.eventv3.CurrentWebSockets, + m.eventv3.Heartbeats, + m.eventv3.Dispatches, ) } @@ -46,25 +40,6 @@ func labelsFromKeyValue(kv []configure.KeyValue) prometheus.Labels { func NewPrometheus(gCtx global.Context) instance.Monitoring { return &mon{ - eventv1: instance.EventV1{ - ChannelEmotes: instance.EventV1ChannelEmotes{ - TotalConnections: prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "events_total_connections", - ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels), - Help: "The total number of connections", - }), - TotalConnectionDurationSeconds: prometheus.NewHistogram(prometheus.HistogramOpts{ - Name: "events_total_connection_duration_seconds", - ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels), - Help: "The total number of seconds used on connections", - }), - CurrentConnections: prometheus.NewGauge(prometheus.GaugeOpts{ - Name: "events_current_connections", - ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels), - Help: "The current number of connections", - }), - }, - }, eventv3: instance.EventV3{ TotalConnections: prometheus.NewHistogram(prometheus.HistogramOpts{ Name: "events_v3_total_connections", @@ -81,6 +56,26 @@ func NewPrometheus(gCtx global.Context) instance.Monitoring { ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels), Help: "The current number of connections", }), + CurrentEventStreams: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "events_v3_current_event_streams", + ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels), + Help: "The current number of connections via EventStream transport", + }), + CurrentWebSockets: prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "events_v3_current_event_websockets", + ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels), + Help: "The current number of connections via WebSocket transport", + }), + Heartbeats: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "events_v3_heartbeats", + ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels), + Help: "The number of heartbeats sent out to clients", + }), + Dispatches: prometheus.NewHistogram(prometheus.HistogramOpts{ + Name: "events_v3_dispatches", + ConstLabels: labelsFromKeyValue(gCtx.Config().Monitoring.Labels), + Help: "The number of dispatches sent out to clients", + }), }, } } From 927c308828b2c35f9579123133ccde14c6259714 Mon Sep 17 00:00:00 2001 From: anatoleam Date: Mon, 4 Dec 2023 10:47:20 -0600 Subject: [PATCH 2/2] increase rollout rate --- terraform/deployment.tf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/terraform/deployment.tf b/terraform/deployment.tf index a21579e..17f9170 100644 --- a/terraform/deployment.tf +++ b/terraform/deployment.tf @@ -51,8 +51,8 @@ resource "kubernetes_deployment" "app" { strategy { rolling_update { - max_surge = "2" - max_unavailable = "2" + max_surge = "4" + max_unavailable = "4" } type = "RollingUpdate" }