Skip to content

Commit

Permalink
[TT-13139] Remove unused stream managers map
Browse files Browse the repository at this point in the history
  • Loading branch information
buraksezer committed Oct 7, 2024
1 parent 4b6e0b8 commit 3b60f35
Showing 1 changed file with 6 additions and 20 deletions.
26 changes: 6 additions & 20 deletions gateway/mw_streaming.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,10 @@ type StreamingMiddleware struct {

createStreamManagerLock sync.Mutex
streamManagerCache sync.Map // Map of payload hash to StreamManager

streamManagers sync.Map // Map of consumer group IDs to StreamManager
ctx context.Context
cancel context.CancelFunc
allowedUnsafe []string
defaultStreamManager *StreamManager
ctx context.Context
cancel context.CancelFunc
allowedUnsafe []string
defaultStreamManager *StreamManager
}

// StreamManager is responsible for creating a single stream
Expand Down Expand Up @@ -425,25 +423,13 @@ func (s *StreamingMiddleware) Unload() {
s.Logger().Debugf("Unloading streaming middleware %s", s.Spec.Name)

totalStreams := 0
s.streamManagers.Range(func(_, value interface{}) bool {
manager, ok := value.(*StreamManager)
if !ok {
return true
}
manager.streams.Range(func(_, _ interface{}) bool {
totalStreams++
return true
})
return true
})
globalStreamCounter.Add(-int64(totalStreams))

s.cancel()

s.Logger().Debug("Closing active streams")
s.streamManagerCache.Range(func(_, value interface{}) bool {
manager := value.(*StreamManager)
manager.streams.Range(func(_, streamValue interface{}) bool {
totalStreams++
if stream, ok := streamValue.(*streaming.Stream); ok {
if err := stream.Reset(); err != nil {
return true
Expand All @@ -454,7 +440,7 @@ func (s *StreamingMiddleware) Unload() {
return true
})

s.streamManagers = sync.Map{}
globalStreamCounter.Add(-int64(totalStreams))
s.streamManagerCache = sync.Map{}

s.Logger().Info("All streams successfully removed")
Expand Down

0 comments on commit 3b60f35

Please sign in to comment.