-
Notifications
You must be signed in to change notification settings - Fork 1.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add stream analytics to ee (TT-13233) #6671
Conversation
Knock Knock! 🔍 Just thought I'd let you know that your PR title and story title look quite different. PR titles that closely resemble the story title make it easier for reviewers to understand the context of the PR. An easy-to-understand PR title a day makes the reviewer review away! 😛⚡️
Check out this guide to learn more about PR best-practices. |
PR Reviewer Guide 🔍Here are some key observations to aid the review process:
|
API Changes --- prev.txt 2024-11-06 11:49:13.942652537 +0000
+++ current.txt 2024-11-06 11:49:07.322674355 +0000
@@ -8174,6 +8174,9 @@
VARIABLES
+var (
+ ErrResponseWriterNotHijackable = errors.New("ResponseWriter is not hijackable")
+)
var GlobalStreamCounter atomic.Int64
GlobalStreamCounter is used for testing.
@@ -8214,11 +8217,23 @@
}
Gateway is the subset of Gateway APIs that the middleware uses.
+type HandleFuncAdapter struct {
+ StreamID string
+ StreamManager *Manager
+ StreamMiddleware *Middleware
+ Muxer *mux.Router
+ Logger *logrus.Entry
+}
+
+func (h *HandleFuncAdapter) HandleFunc(path string, f func(http.ResponseWriter, *http.Request))
+
type Manager struct {
// Has unexported fields.
}
Manager is responsible for creating a single stream.
+func (sm *Manager) SetAnalyticsFactory(factory StreamAnalyticsFactory)
+
type Middleware struct {
Spec *APISpec
Gw Gateway
@@ -8229,7 +8244,7 @@
}
Middleware implements a streaming middleware.
-func NewMiddleware(gw Gateway, mw BaseMiddleware, spec *APISpec) *Middleware
+func NewMiddleware(gw Gateway, mw BaseMiddleware, spec *APISpec, analyticsFactory StreamAnalyticsFactory) *Middleware
NewMiddleware returns a new instance of Middleware.
func (s *Middleware) CreateStreamManager(r *http.Request) *Manager
@@ -8242,6 +8257,8 @@
func (s *Middleware) GC()
GC removes inactive stream managers.
+func (s *Middleware) GetStreamManager() *Manager
+
func (s *Middleware) Init()
Init initializes the middleware
@@ -8254,9 +8271,23 @@
func (s *Middleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ interface{}) (error, int)
ProcessRequest will handle the streaming functionality.
+func (s *Middleware) SetAnalyticsFactory(factory StreamAnalyticsFactory)
+
func (s *Middleware) Unload()
Unload closes and remove active streams.
+type NoopStreamAnalyticsFactory struct{}
+
+func (n *NoopStreamAnalyticsFactory) CreateRecorder(r *http.Request) StreamAnalyticsRecorder
+
+func (n *NoopStreamAnalyticsFactory) CreateResponseWriter(w http.ResponseWriter, r *http.Request, streamID string, recorder StreamAnalyticsRecorder) http.ResponseWriter
+
+type NoopStreamAnalyticsRecorder struct{}
+
+func (n *NoopStreamAnalyticsRecorder) PrepareRecord(r *http.Request)
+
+func (n *NoopStreamAnalyticsRecorder) RecordHit(statusCode int, latency analytics.Latency) error
+
type Stream struct {
// Has unexported fields.
}
@@ -8280,6 +8311,16 @@
func (s *Stream) Stop() error
Stop cleans up the stream
+type StreamAnalyticsFactory interface {
+ CreateRecorder(r *http.Request) StreamAnalyticsRecorder
+ CreateResponseWriter(w http.ResponseWriter, r *http.Request, streamID string, recorder StreamAnalyticsRecorder) http.ResponseWriter
+}
+
+type StreamAnalyticsRecorder interface {
+ PrepareRecord(r *http.Request)
+ RecordHit(statusCode int, latency analytics.Latency) error
+}
+
type StreamsConfig struct {
Info struct {
Version string `json:"version"` |
PR Code Suggestions ✨Explore these optional code suggestions:
|
ef65c61
to
3036180
Compare
d37cf5c
to
1371089
Compare
Quality Gate passedIssues Measures |
User description
TT-13233
This PR adds analytics to stream APIs for EE.
Types of changes
PR Type
Enhancement, Tests
Description
Manager
andMiddleware
to integrate withStreamAnalyticsFactory
.DefaultStreamAnalyticsFactory
andStreamAnalyticsResponseWriter
for detailed analytics.Changes walkthrough 📝
7 files
analytics.go
Add interfaces and no-op implementations for stream analytics
ee/middleware/streams/analytics.go
StreamAnalyticsFactory
andStreamAnalyticsRecorder
interfaces.
NoopStreamAnalyticsFactory
andNoopStreamAnalyticsRecorder
implementations.
ErrResponseWriterNotHijackable
.manager.go
Integrate analytics factory into stream manager
ee/middleware/streams/manager.go
analyticsFactory
toManager
struct.SetAnalyticsFactory
method.HandleFuncAdapter
.middleware.go
Enhance middleware with analytics factory support
ee/middleware/streams/middleware.go
analyticsFactory
toMiddleware
struct.NewMiddleware
to acceptanalyticsFactory
.SetAnalyticsFactory
andGetStreamManager
methods.util.go
Refactor and enhance handle function adapter
ee/middleware/streams/util.go
handleFuncAdapter
toHandleFuncAdapter
.HandleFunc
.analytics_streams.go
Implement default stream analytics factory and response writer
gateway/analytics_streams.go
DefaultStreamAnalyticsFactory
and related recorderimplementations.
StreamAnalyticsResponseWriter
.handler_success.go
Refactor record detail logic for streams
gateway/handler_success.go
recordDetailUnsafe
function.recordDetail
to userecordDetailUnsafe
.mw_streaming_ee.go
Integrate analytics factory into streaming middleware
gateway/mw_streaming_ee.go
StreamAnalyticsFactory
into streaming middlewareinitialization.
2 files
analytics_streams_test.go
Add tests for stream analytics components
gateway/analytics_streams_test.go
DefaultStreamAnalyticsFactory
andStreamAnalyticsResponseWriter
.mw_streaming_test.go
Update streaming middleware tests for analytics integration
gateway/mw_streaming_test.go