Skip to content

Commit

Permalink
test: refactored waitinghandler unit-tests
Browse files Browse the repository at this point in the history
  • Loading branch information
WRichter72 committed Jan 27, 2025
1 parent 1e41502 commit b49cc5c
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 30 deletions.
30 changes: 19 additions & 11 deletions internal/handler/waiting.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,18 @@ import (
"time"
)

var funcGetRepublishingCacheMap = getRepublishingCacheMap
var funcGetCircuitBreakerCacheMap = getCircuitBreakerCacheMap
type (
WaitingHandlerInterface interface {
CheckWaitingEvents()
GetCircuitBreakerSubscriptionsMap() (map[string]struct{}, error)
GetRepublishingSubscriptionsMap() (map[string]struct{}, error)
}
waitingHandler struct{}
)

var WaitingHandlerService WaitingHandlerInterface = new(waitingHandler)

func CheckWaitingEvents() {
func (waitingHandler *waitingHandler) CheckWaitingEvents() {
log.Info().Msgf("Republish messages stucked in state WAITING")

minMessageAge := config.Current.WaitingHandler.MinMessageAge
Expand Down Expand Up @@ -47,14 +55,14 @@ func CheckWaitingEvents() {
}

// Get all republishing cache entries
republishingSubscriptionMap, err := funcGetRepublishingCacheMap()
republishingSubscriptionsMap, err := WaitingHandlerService.GetRepublishingSubscriptionsMap()
if err != nil {
log.Error().Err(err).Msgf("Error while fetching rebublishing cache entries for events stucked in state WAITING")
return
}

// Get all circuit-breaker entries with status OPEN
circuitBreakerSubscriptionMap, err := funcGetCircuitBreakerCacheMap()
circuitBreakerSubscriptionsMap, err := WaitingHandlerService.GetCircuitBreakerSubscriptionsMap()
if err != nil {
log.Error().Err(err).Msgf("Error while fetching circuit breaker cache entries for events stucked in state WAITING")
return
Expand All @@ -63,8 +71,8 @@ func CheckWaitingEvents() {
// Check if subscription is in republishing cache or in circuit breaker cache. If not create a republishing cache entry
for _, subscriptionId := range dbSubscriptionsForWaitingEvents {
log.Debug().Msgf("Checking subscription for events stucked in state WAITING. subscription: %v", subscriptionId)
if _, inRepublishing := republishingSubscriptionMap[subscriptionId]; !inRepublishing {
if _, inCircuitBreaker := circuitBreakerSubscriptionMap[subscriptionId]; !inCircuitBreaker {
if _, inRepublishing := republishingSubscriptionsMap[subscriptionId]; !inRepublishing {
if _, inCircuitBreaker := circuitBreakerSubscriptionsMap[subscriptionId]; !inCircuitBreaker {
log.Warn().Msgf("Subscription %v has waiting messages and no circuitbreaker entry or republishing entry. Events stucked in state WAITING", subscriptionId)

// Create republishing cache entry for subscription with stuck waiting events
Expand All @@ -83,12 +91,12 @@ func CheckWaitingEvents() {
}

// ToDo Only for testing
log.Info().Msgf("Found republishing entries: %v", republishingSubscriptionMap)
log.Info().Msgf("Found circuitbreaker entries: %v", circuitBreakerSubscriptionMap)
log.Info().Msgf("Found republishing entries: %v", republishingSubscriptionsMap)
log.Info().Msgf("Found circuitbreaker entries: %v", circuitBreakerSubscriptionsMap)
log.Info().Msgf("Found waiting messages: %v", dbSubscriptionsForWaitingEvents)
}

func getCircuitBreakerCacheMap() (map[string]struct{}, error) {
func (waitingHandler *waitingHandler) GetCircuitBreakerSubscriptionsMap() (map[string]struct{}, error) {

statusQuery := predicate.Equal("status", string(enum.CircuitBreakerStatusOpen))
circuitBreakerEntries, err := cache.CircuitBreakerCache.GetQuery(config.Current.Hazelcast.Caches.CircuitBreakerCache, statusQuery)
Expand All @@ -103,7 +111,7 @@ func getCircuitBreakerCacheMap() (map[string]struct{}, error) {
return circuitBreakerMap, nil
}

func getRepublishingCacheMap() (map[string]struct{}, error) {
func (waitingHandler *waitingHandler) GetRepublishingSubscriptionsMap() (map[string]struct{}, error) {

cacheRepublishingEntries, err := cache.RepublishingCache.GetEntrySet(context.Background())
if err != nil {
Expand Down
28 changes: 10 additions & 18 deletions internal/handler/waiting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,14 @@ import (
"testing"
)

var Mock = mock.Mock{}

//func getCircuitBreakerCacheMap() (map[string]struct{}, error) {
// args := Mock.Called()
// return args.Get(0).(map[string]struct{}), args.Error(1)
//}

func TestCheckWaitingEvents(t *testing.T) {

mockMongo := new(test.MockMongoHandler)
mongo.CurrentConnection = mockMongo

mockWaitingHandler := new(test.MockWaitingHandler)
WaitingHandlerService = mockWaitingHandler

mockCircuitBreakerCache := new(test.CircuitBreakerMockCache)
cache.CircuitBreakerCache = mockCircuitBreakerCache

Expand All @@ -34,27 +30,23 @@ func TestCheckWaitingEvents(t *testing.T) {
mockHandlerCache := new(test.MockHandlerCache)
cache.HandlerCache = mockHandlerCache

// Testdata
// Prepare testdata
var mockedDbSubscriptionIds = []string{"subscription-1", "subscription-2"}
mockedCircuitBreakerSubscriptionsMap := map[string]struct{}{"subscription-1": struct{}{}, "subscription-2": struct{}{}}

// Mocks
// Prepare mocks
mockHandlerCache.On("NewLockContext", mock.Anything).Return(context.Background())
mockHandlerCache.On("TryLockWithTimeout", mock.Anything, mock.Anything, mock.Anything).Return(true, nil)
mockHandlerCache.On("Unlock", mock.Anything, mock.Anything).Return(nil)

mockMongo.On("FindDistinctSubscriptionsForWaitingEvents", mock.Anything, mock.Anything).Return(mockedDbSubscriptionIds, nil)

funcGetCircuitBreakerCacheMap = func() (map[string]struct{}, error) {
return mockedCircuitBreakerSubscriptionsMap, nil
}

funcGetRepublishingCacheMap = func() (map[string]struct{}, error) {
return nil, nil
}
mockWaitingHandler.On("GetCircuitBreakerSubscriptionsMap").Return(mockedCircuitBreakerSubscriptionsMap, nil)
mockWaitingHandler.On("GetRepublishingSubscriptionsMap").Return(mockedCircuitBreakerSubscriptionsMap, nil)

// Call the function to test
CheckWaitingEvents()
// Call function to test
waitingHandler := new(waitingHandler)
waitingHandler.CheckWaitingEvents()

// Assertions
mockMongo.AssertExpectations(t)
Expand Down
2 changes: 1 addition & 1 deletion internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func StartScheduler() {

// Schedule the task for checking for stuck waiting events
if _, err := scheduler.Every(config.Current.WaitingHandler.CheckInterval).Do(func() {
handler.CheckWaitingEvents()
handler.WaitingHandlerService.CheckWaitingEvents()
}); err != nil {
log.Error().Err(err).Msgf("Error while scheduling WAITING-Handler: %v", err)
}
Expand Down
21 changes: 21 additions & 0 deletions internal/test/waitinghandler_mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package test

import "github.com/stretchr/testify/mock"

type MockWaitingHandler struct {
mock.Mock
}

func (f *MockWaitingHandler) CheckWaitingEvents() {
return
}

func (f *MockWaitingHandler) GetCircuitBreakerSubscriptionsMap() (map[string]struct{}, error) {
args := f.Called()
return args.Get(0).(map[string]struct{}), args.Error(1)
}

func (f *MockWaitingHandler) GetRepublishingSubscriptionsMap() (map[string]struct{}, error) {
args := f.Called()
return args.Get(0).(map[string]struct{}), args.Error(1)
}

0 comments on commit b49cc5c

Please sign in to comment.