Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add stream analytics to ee (TT-13233) #6671

Merged
merged 3 commits into from
Nov 6, 2024

Conversation

pvormste
Copy link
Contributor

@pvormste pvormste commented Oct 25, 2024

User description

TT-13233
Summary Streams in Pump - Enable Pump to purge Stream API records
Type Story Story
Status In Dev
Points N/A
Labels -

This PR adds analytics to stream APIs for EE.

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • Refactoring or add test (improvements in base code or adds test coverage to functionality)

PR Type

Enhancement, Tests


Description

  • Introduced stream analytics interfaces and no-op implementations to support analytics recording.
  • Enhanced Manager and Middleware to integrate with StreamAnalyticsFactory.
  • Implemented DefaultStreamAnalyticsFactory and StreamAnalyticsResponseWriter for detailed analytics.
  • Added comprehensive tests for new analytics components and functionalities.

Changes walkthrough 📝

Relevant files
Enhancement
7 files
analytics.go
Add interfaces and no-op implementations for stream analytics

ee/middleware/streams/analytics.go

  • Introduced StreamAnalyticsFactory and StreamAnalyticsRecorder
    interfaces.
  • Added NoopStreamAnalyticsFactory and NoopStreamAnalyticsRecorder
    implementations.
  • Defined error ErrResponseWriterNotHijackable.
  • +42/-0   
    manager.go
    Integrate analytics factory into stream manager                   

    ee/middleware/streams/manager.go

  • Added analyticsFactory to Manager struct.
  • Implemented SetAnalyticsFactory method.
  • Updated stream creation logic to use HandleFuncAdapter.
  • +21/-13 
    middleware.go
    Enhance middleware with analytics factory support               

    ee/middleware/streams/middleware.go

  • Added analyticsFactory to Middleware struct.
  • Updated NewMiddleware to accept analyticsFactory.
  • Implemented SetAnalyticsFactory and GetStreamManager methods.
  • +35/-12 
    util.go
    Refactor and enhance handle function adapter                         

    ee/middleware/streams/util.go

  • Refactored handleFuncAdapter to HandleFuncAdapter.
  • Integrated analytics recording in HandleFunc.
  • +20/-17 
    analytics_streams.go
    Implement default stream analytics factory and response writer

    gateway/analytics_streams.go

  • Added DefaultStreamAnalyticsFactory and related recorder
    implementations.
  • Implemented StreamAnalyticsResponseWriter.
  • Provided utility functions for analytics recording.
  • +230/-0 
    handler_success.go
    Refactor record detail logic for streams                                 

    gateway/handler_success.go

  • Added recordDetailUnsafe function.
  • Refactored recordDetail to use recordDetailUnsafe.
  • +6/-1     
    mw_streaming_ee.go
    Integrate analytics factory into streaming middleware       

    gateway/mw_streaming_ee.go

  • Integrated StreamAnalyticsFactory into streaming middleware
    initialization.
  • +3/-1     
    Tests
    2 files
    analytics_streams_test.go
    Add tests for stream analytics components                               

    gateway/analytics_streams_test.go

  • Added tests for websocket upgrade detection.
  • Tested DefaultStreamAnalyticsFactory and
    StreamAnalyticsResponseWriter.
  • Verified analytics recording functionality.
  • +316/-0 
    mw_streaming_test.go
    Update streaming middleware tests for analytics integration

    gateway/mw_streaming_test.go

  • Updated streaming middleware test setup to include analytics factory.
  • +1/-1     

    💡 PR-Agent usage: Comment /help "your question" on any pull request to receive relevant information

    @buger
    Copy link
    Member

    buger commented Oct 25, 2024

    Knock Knock! 🔍

    Just thought I'd let you know that your PR title and story title look quite different. PR titles that closely resemble the story title make it easier for reviewers to understand the context of the PR.

    An easy-to-understand PR title a day makes the reviewer review away! 😛⚡️
    Story Title Streams in Pump - Enable Pump to purge Stream API records
    PR Title add stream analytics to ee (TT-13233)

    Check out this guide to learn more about PR best-practices.

    Copy link
    Contributor

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 PR contains tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Error Handling
    The implementation of error handling in StreamAnalyticsRecorder and StreamAnalyticsResponseWriter should be reviewed to ensure that errors are handled appropriately and not just logged.

    Websocket Handling
    The handling of websocket upgrades and the recording of analytics for websocket connections should be carefully reviewed to ensure accuracy and efficiency.

    Copy link
    Contributor

    github-actions bot commented Oct 25, 2024

    API Changes

    --- prev.txt	2024-11-06 11:49:13.942652537 +0000
    +++ current.txt	2024-11-06 11:49:07.322674355 +0000
    @@ -8174,6 +8174,9 @@
     
     VARIABLES
     
    +var (
    +	ErrResponseWriterNotHijackable = errors.New("ResponseWriter is not hijackable")
    +)
     var GlobalStreamCounter atomic.Int64
         GlobalStreamCounter is used for testing.
     
    @@ -8214,11 +8217,23 @@
     }
         Gateway is the subset of Gateway APIs that the middleware uses.
     
    +type HandleFuncAdapter struct {
    +	StreamID         string
    +	StreamManager    *Manager
    +	StreamMiddleware *Middleware
    +	Muxer            *mux.Router
    +	Logger           *logrus.Entry
    +}
    +
    +func (h *HandleFuncAdapter) HandleFunc(path string, f func(http.ResponseWriter, *http.Request))
    +
     type Manager struct {
     	// Has unexported fields.
     }
         Manager is responsible for creating a single stream.
     
    +func (sm *Manager) SetAnalyticsFactory(factory StreamAnalyticsFactory)
    +
     type Middleware struct {
     	Spec *APISpec
     	Gw   Gateway
    @@ -8229,7 +8244,7 @@
     }
         Middleware implements a streaming middleware.
     
    -func NewMiddleware(gw Gateway, mw BaseMiddleware, spec *APISpec) *Middleware
    +func NewMiddleware(gw Gateway, mw BaseMiddleware, spec *APISpec, analyticsFactory StreamAnalyticsFactory) *Middleware
         NewMiddleware returns a new instance of Middleware.
     
     func (s *Middleware) CreateStreamManager(r *http.Request) *Manager
    @@ -8242,6 +8257,8 @@
     func (s *Middleware) GC()
         GC removes inactive stream managers.
     
    +func (s *Middleware) GetStreamManager() *Manager
    +
     func (s *Middleware) Init()
         Init initializes the middleware
     
    @@ -8254,9 +8271,23 @@
     func (s *Middleware) ProcessRequest(w http.ResponseWriter, r *http.Request, _ interface{}) (error, int)
         ProcessRequest will handle the streaming functionality.
     
    +func (s *Middleware) SetAnalyticsFactory(factory StreamAnalyticsFactory)
    +
     func (s *Middleware) Unload()
         Unload closes and remove active streams.
     
    +type NoopStreamAnalyticsFactory struct{}
    +
    +func (n *NoopStreamAnalyticsFactory) CreateRecorder(r *http.Request) StreamAnalyticsRecorder
    +
    +func (n *NoopStreamAnalyticsFactory) CreateResponseWriter(w http.ResponseWriter, r *http.Request, streamID string, recorder StreamAnalyticsRecorder) http.ResponseWriter
    +
    +type NoopStreamAnalyticsRecorder struct{}
    +
    +func (n *NoopStreamAnalyticsRecorder) PrepareRecord(r *http.Request)
    +
    +func (n *NoopStreamAnalyticsRecorder) RecordHit(statusCode int, latency analytics.Latency) error
    +
     type Stream struct {
     	// Has unexported fields.
     }
    @@ -8280,6 +8311,16 @@
     func (s *Stream) Stop() error
         Stop cleans up the stream
     
    +type StreamAnalyticsFactory interface {
    +	CreateRecorder(r *http.Request) StreamAnalyticsRecorder
    +	CreateResponseWriter(w http.ResponseWriter, r *http.Request, streamID string, recorder StreamAnalyticsRecorder) http.ResponseWriter
    +}
    +
    +type StreamAnalyticsRecorder interface {
    +	PrepareRecord(r *http.Request)
    +	RecordHit(statusCode int, latency analytics.Latency) error
    +}
    +
     type StreamsConfig struct {
     	Info struct {
     		Version string `json:"version"`

    Copy link
    Contributor

    PR Code Suggestions ✨

    Explore these optional code suggestions:

    CategorySuggestion                                                                                                                                    Score
    Possible bug
    Add nil checks for parameters in CreateResponseWriter to ensure stability

    Validate the recorder and w for nil before proceeding with operations in
    CreateResponseWriter to prevent runtime errors.

    gateway/analytics_streams.go [47-48]

    +if recorder == nil || w == nil {
    +  return nil
    +}
     return NewStreamAnalyticsResponseWriter(d.Logger, w, r, streamID, recorder)
    Suggestion importance[1-10]: 8

    Why: The suggestion addresses a potential runtime error by adding nil checks for critical parameters, enhancing the robustness and stability of the CreateResponseWriter function.

    8
    Enhancement
    Remove redundant setting of analyticsFactory when it is not nil

    Ensure that s.analyticsFactory is not nil before setting it in Init to avoid
    redundant operations.

    ee/middleware/streams/middleware.go [94-99]

     if s.analyticsFactory == nil {
       s.SetAnalyticsFactory(&NoopStreamAnalyticsFactory{})
    -} else {
    -  s.SetAnalyticsFactory(s.analyticsFactory)
     }
    Suggestion importance[1-10]: 7

    Why: The suggestion correctly identifies a redundant operation in the Init method and proposes a simplification that improves code clarity without affecting functionality.

    7

    @pvormste pvormste force-pushed the feat/TT-13233/stream-analytics branch from ef65c61 to 3036180 Compare October 25, 2024 10:26
    @pvormste pvormste force-pushed the feat/TT-13233/stream-analytics branch from d37cf5c to 1371089 Compare November 6, 2024 11:47
    @pvormste pvormste enabled auto-merge (squash) November 6, 2024 11:49
    @pvormste pvormste merged commit 810f981 into master Nov 6, 2024
    26 of 39 checks passed
    @pvormste pvormste deleted the feat/TT-13233/stream-analytics branch November 6, 2024 12:23
    Copy link

    sonarcloud bot commented Nov 6, 2024

    Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
    Projects
    None yet
    Development

    Successfully merging this pull request may close these issues.

    3 participants