From 1f85c7fc01b2f0baa9b8ca6ad83f9152255125a2 Mon Sep 17 00:00:00 2001 From: dongmen <20351731+asddongmen@users.noreply.github.com> Date: Thu, 2 Jan 2025 15:37:27 +0800 Subject: [PATCH] tests: Add unit test for DS (#756) Signed-off-by: dongmen <414110582@qq.com> --- .github/workflows/integration_test_mysql.yaml | 3 +- .github/workflows/uint_test.yaml | 4 +- utils/dynstream/event_queue.go | 7 +- utils/dynstream/event_queue_test.go | 256 ++++++++++++++++++ utils/dynstream/memory_control.go | 13 - utils/dynstream/parallel_dynamic_stream.go | 2 +- .../dynstream/parallel_dynamic_stream_test.go | 134 +++++++++ utils/dynstream/stream.go | 18 +- utils/dynstream/stream_test.go | 109 +++++++- 9 files changed, 520 insertions(+), 26 deletions(-) create mode 100644 utils/dynstream/event_queue_test.go create mode 100644 utils/dynstream/parallel_dynamic_stream_test.go diff --git a/.github/workflows/integration_test_mysql.yaml b/.github/workflows/integration_test_mysql.yaml index ab0ac5a04..21c55c5a1 100644 --- a/.github/workflows/integration_test_mysql.yaml +++ b/.github/workflows/integration_test_mysql.yaml @@ -80,7 +80,8 @@ jobs: mkdir -p ./logs/$CASE cat $DIR/stdout.log tail -n 10 $DIR/cdc.log - sudo cp -r -L $DIR/{*.log,sync_diff} ./logs/$CASE/ + sudo cp -r -L $DIR/{*.log} ./logs/$CASE/ + sudo cp -r -L $DIR/{sync_diff} ./logs/$CASE/ || true sudo chown -R runner ./logs sudo tar -czvf ./logs.tar.gz ./logs # Update logs as artifact seems not stable, so we set `continue-on-error: true` here. diff --git a/.github/workflows/uint_test.yaml b/.github/workflows/uint_test.yaml index 9f1bda947..d4ee537cd 100644 --- a/.github/workflows/uint_test.yaml +++ b/.github/workflows/uint_test.yaml @@ -91,7 +91,7 @@ jobs: eventservice_unit_test: runs-on: ubuntu-latest - name: EventService Unit Test + name: Other Unit Tests steps: - name: Check out code uses: actions/checkout@v2 @@ -104,6 +104,8 @@ jobs: - name: Unit Test run: | go test --tags=intest -timeout 120s github.com/pingcap/ticdc/pkg/eventservice/... + go test --tags=intest -timeout 120s github.com/pingcap/ticdc/utils/dynstream... + diff --git a/utils/dynstream/event_queue.go b/utils/dynstream/event_queue.go index b7a11653c..6ffa8ae26 100644 --- a/utils/dynstream/event_queue.go +++ b/utils/dynstream/event_queue.go @@ -28,7 +28,7 @@ func newEventQueue[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](optio option: option, handler: handler, eventBlockAlloc: deque.NewBlockAllocator[eventWrap[A, P, T, D, H]](32, 1024), - signalQueue: deque.NewDeque[eventSignal[A, P, T, D, H]](1024, deque.NewBlockAllocator[eventSignal[A, P, T, D, H]](1024, 32)), + signalQueue: deque.NewDeque(1024, deque.NewBlockAllocator[eventSignal[A, P, T, D, H]](1024, 32)), totalPendingLength: &atomic.Int64{}, } @@ -97,8 +97,8 @@ func (q *eventQueue[A, P, T, D, H]) popEvents(buf []T) ([]T, *pathInfo[A, P, T, if path.blocking || path.removed { // The path is blocking or removed, we should ignore the signal completely. // Since when it is waked, a signal event will be added to the queue. - q.signalQueue.PopFront() q.totalPendingLength.Add(-int64(signal.eventCount)) + q.signalQueue.PopFront() continue } @@ -114,6 +114,8 @@ func (q *eventQueue[A, P, T, D, H]) popEvents(buf []T) ([]T, *pathInfo[A, P, T, continue } firstGroup := firstEvent.eventType.DataGroup + firstProperty := firstEvent.eventType.Property + appendToBuf(firstEvent, path) // Try to batch events with the same data group. @@ -126,6 +128,7 @@ func (q *eventQueue[A, P, T, D, H]) popEvents(buf []T) ([]T, *pathInfo[A, P, T, // Only batch events with the same data group and when the event is batchable. if !ok || (firstGroup != front.eventType.DataGroup) || + firstProperty == NonBatchable || front.eventType.Property == NonBatchable { break } diff --git a/utils/dynstream/event_queue_test.go b/utils/dynstream/event_queue_test.go new file mode 100644 index 000000000..89782bdd7 --- /dev/null +++ b/utils/dynstream/event_queue_test.go @@ -0,0 +1,256 @@ +package dynstream + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestNewEventQueue(t *testing.T) { + handler := mockHandler{} + option := Option{BatchCount: 10} + + eq := newEventQueue(option, &handler) + + require.NotNil(t, eq.eventBlockAlloc) + require.NotNil(t, eq.signalQueue) + require.NotNil(t, eq.totalPendingLength) + require.Equal(t, option, eq.option) + require.Equal(t, &handler, eq.handler) + require.Equal(t, int64(0), eq.totalPendingLength.Load()) +} + +func TestAppendAndPopSingleEvent(t *testing.T) { + handler := mockHandler{} + eq := newEventQueue(Option{BatchCount: 10}, &handler) + + // create a path + path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil) + eq.initPath(path) + + // append an event + event := eventWrap[int, string, *mockEvent, any, *mockHandler]{ + pathInfo: path, + event: &mockEvent{value: 1}, + eventType: EventType{ + DataGroup: 1, + Property: BatchableData, + }, + } + + eq.appendEvent(event) + + // verify the event is appended + require.Equal(t, int64(1), eq.totalPendingLength.Load()) + + // pop the event + buf := make([]*mockEvent, 0) + events, popPath := eq.popEvents(buf) + + require.Equal(t, 1, len(events)) + require.Equal(t, mockEvent{value: 1}, *events[0]) + require.Equal(t, path, popPath) + require.Equal(t, int64(0), eq.totalPendingLength.Load()) +} + +func TestBlockAndWakePath(t *testing.T) { + handler := mockHandler{} + eq := newEventQueue(Option{BatchCount: 10}, &handler) + + path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil) + eq.initPath(path) + + // append an event + event := eventWrap[int, string, *mockEvent, any, *mockHandler]{ + pathInfo: path, + event: &mockEvent{value: 1}, + eventType: EventType{ + DataGroup: 1, + Property: BatchableData, + }, + } + eq.appendEvent(event) + + // block the path + eq.blockPath(path) + + // try to pop the event (should not pop) + buf := make([]*mockEvent, 0) + events, _ := eq.popEvents(buf) + require.Equal(t, 0, len(events)) + require.Equal(t, int64(0), eq.totalPendingLength.Load()) + + // wake the path + eq.wakePath(path) + require.Equal(t, int64(1), eq.totalPendingLength.Load()) + + // try to pop the event (should pop) + events, popPath := eq.popEvents(buf) + require.Equal(t, 1, len(events)) + require.Equal(t, &mockEvent{value: 1}, events[0]) + require.Equal(t, path, popPath) + require.Equal(t, int64(0), eq.totalPendingLength.Load()) +} + +func TestBatchEvents(t *testing.T) { + handler := mockHandler{} + eq := newEventQueue(Option{BatchCount: 3}, &handler) + + path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil) + eq.initPath(path) + + // append multiple events with the same DataGroup + for i := 1; i <= 5; i++ { + event := eventWrap[int, string, *mockEvent, any, *mockHandler]{ + pathInfo: path, + event: &mockEvent{value: i}, + eventType: EventType{ + DataGroup: 1, + Property: BatchableData, + }, + } + eq.appendEvent(event) + } + + // verify the batch pop + buf := make([]*mockEvent, 0) + events, _ := eq.popEvents(buf) + + // since BatchCount = 3, only the first 3 events should be popped + require.Equal(t, 3, len(events)) + require.Equal(t, &mockEvent{value: 1}, events[0]) + require.Equal(t, &mockEvent{value: 2}, events[1]) + require.Equal(t, &mockEvent{value: 3}, events[2]) + + // verify the remaining event count + require.Equal(t, int64(2), eq.totalPendingLength.Load()) +} + +func TestBatchableAndNonBatchableEvents(t *testing.T) { + handler := mockHandler{} + eq := newEventQueue(Option{BatchCount: 3}, &handler) + + path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil) + eq.initPath(path) + + // append a non-batchable event + event1 := eventWrap[int, string, *mockEvent, any, *mockHandler]{ + pathInfo: path, + event: &mockEvent{value: 1}, + eventType: EventType{ + DataGroup: 1, + Property: NonBatchable, + }, + } + eq.appendEvent(event1) + + // append 2 batchable events + for i := 1; i <= 2; i++ { + e := eventWrap[int, string, *mockEvent, any, *mockHandler]{ + pathInfo: path, + event: &mockEvent{value: i}, + eventType: EventType{ + DataGroup: 1, + Property: BatchableData, + }, + } + eq.appendEvent(e) + } + + // add 2 non-batchable events + for i := 1; i <= 2; i++ { + e := eventWrap[int, string, *mockEvent, any, *mockHandler]{ + pathInfo: path, + event: &mockEvent{value: i}, + eventType: EventType{ + DataGroup: 1, + Property: NonBatchable, + }, + } + eq.appendEvent(e) + } + + // append 5 batchable events + for i := 1; i <= 5; i++ { + e := eventWrap[int, string, *mockEvent, any, *mockHandler]{ + pathInfo: path, + event: &mockEvent{value: i}, + eventType: EventType{ + DataGroup: 1, + Property: BatchableData, + }, + } + eq.appendEvent(e) + } + + // case 1: pop the first non-batchable event + buf := make([]*mockEvent, 0) + events, _ := eq.popEvents(buf) + require.Equal(t, 1, len(events)) + require.Equal(t, &mockEvent{value: 1}, events[0]) + require.Equal(t, int64(9), eq.totalPendingLength.Load()) + + // case 2: pop the first 2 batchable event + buf = make([]*mockEvent, 0) + events, _ = eq.popEvents(buf) + require.Equal(t, 2, len(events)) + require.Equal(t, &mockEvent{value: 1}, events[0]) + require.Equal(t, &mockEvent{value: 2}, events[1]) + require.Equal(t, int64(7), eq.totalPendingLength.Load()) + + // case 3: pop a non-batchable event + buf = make([]*mockEvent, 0) + events, _ = eq.popEvents(buf) + require.Equal(t, 1, len(events)) + require.Equal(t, &mockEvent{value: 1}, events[0]) + require.Equal(t, int64(6), eq.totalPendingLength.Load()) + + // case 4: pop the second non-batchable event + buf = make([]*mockEvent, 0) + events, _ = eq.popEvents(buf) + require.Equal(t, 1, len(events)) + require.Equal(t, &mockEvent{value: 2}, events[0]) + require.Equal(t, int64(5), eq.totalPendingLength.Load()) + + // case 5: pop the first 3 batchable events + buf = make([]*mockEvent, 0) + events, _ = eq.popEvents(buf) + require.Equal(t, 3, len(events)) + require.Equal(t, &mockEvent{value: 1}, events[0]) + require.Equal(t, &mockEvent{value: 2}, events[1]) + require.Equal(t, &mockEvent{value: 3}, events[2]) + require.Equal(t, int64(2), eq.totalPendingLength.Load()) + + // case 6: pop the remaining 2 batchable events + buf = make([]*mockEvent, 0) + events, _ = eq.popEvents(buf) + require.Equal(t, 2, len(events)) + require.Equal(t, &mockEvent{value: 4}, events[0]) + require.Equal(t, &mockEvent{value: 5}, events[1]) + require.Equal(t, int64(0), eq.totalPendingLength.Load()) +} + +func TestRemovePath(t *testing.T) { + handler := mockHandler{} + eq := newEventQueue(Option{BatchCount: 3}, &handler) + + path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil) + eq.initPath(path) + + e := eventWrap[int, string, *mockEvent, any, *mockHandler]{ + pathInfo: path, + event: &mockEvent{value: 1}, + eventType: EventType{ + DataGroup: 1, + Property: BatchableData, + }, + } + eq.appendEvent(e) + require.Equal(t, int64(1), eq.totalPendingLength.Load()) + + path.removed = true + buf := make([]*mockEvent, 0) + events, _ := eq.popEvents(buf) + require.Equal(t, 0, len(events)) + require.Equal(t, int64(0), eq.totalPendingLength.Load()) +} diff --git a/utils/dynstream/memory_control.go b/utils/dynstream/memory_control.go index 353c1eacb..b511d2383 100644 --- a/utils/dynstream/memory_control.go +++ b/utils/dynstream/memory_control.go @@ -214,16 +214,3 @@ func (m *memControl[A, P, T, D, H]) getMetrics() (usedMemory int64, maxMemory in } return usedMemory, maxMemory } - -func (p *pathInfo[A, P, T, D, H]) SetHeapIndex(index int) { - p.sizeHeapIndex = index -} - -func (p *pathInfo[A, P, T, D, H]) GetHeapIndex() int { - return p.sizeHeapIndex -} - -func (p *pathInfo[A, P, T, D, H]) LessThan(other *pathInfo[A, P, T, D, H]) bool { - // pathSizeHeap should be in descending order. That say the node with the largest pending size is the top. - return p.pendingSize.Load() > other.pendingSize.Load() -} diff --git a/utils/dynstream/parallel_dynamic_stream.go b/utils/dynstream/parallel_dynamic_stream.go index a9fd7ea4a..a4f2bb538 100644 --- a/utils/dynstream/parallel_dynamic_stream.go +++ b/utils/dynstream/parallel_dynamic_stream.go @@ -128,7 +128,7 @@ func (s *parallelDynamicStream[A, P, T, D, H]) AddPath(path P, dest D, as ...Are s.pathMap[path] = pi s.setMemControl(pi, as...) - pi.stream.in() <- eventWrap[A, P, T, D, H]{pathInfo: pi, newPath: true} + pi.stream.addPath(pi) s._statAddPathCount.Add(1) return nil diff --git a/utils/dynstream/parallel_dynamic_stream_test.go b/utils/dynstream/parallel_dynamic_stream_test.go new file mode 100644 index 000000000..4f3078f1f --- /dev/null +++ b/utils/dynstream/parallel_dynamic_stream_test.go @@ -0,0 +1,134 @@ +package dynstream + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// Mock hasher +func mockHasher(p string) uint64 { + return uint64(len(p)) +} + +func TestParallelDynamicStreamBasic(t *testing.T) { + handler := &mockHandler{} + option := Option{StreamCount: 4} + stream := NewParallelDynamicStream(mockHasher, handler, option) + stream.Start() + defer stream.Close() + + t.Run("add path", func(t *testing.T) { + err := stream.AddPath("path1", "dest1") + require.NoError(t, err) + // Test duplicate path + err = stream.AddPath("path1", "dest1") + require.Error(t, err) + }) + + t.Run("remove path", func(t *testing.T) { + err := stream.RemovePath("path1") + require.NoError(t, err) + // Test non-existent path + err = stream.RemovePath("path1") + require.Error(t, err) + }) +} + +func TestParallelDynamicStreamPush(t *testing.T) { + handler := &mockHandler{} + option := Option{StreamCount: 4} + stream := newParallelDynamicStream(mockHasher, handler, option) + stream.Start() + defer stream.Close() + + // case 1: push to non-existent path + event := mockEvent{id: 1, path: "non-existent", value: 10, sleep: 10 * time.Millisecond} + stream.Push("non-existent", &event) // Should be dropped silently + require.Equal(t, 1, len(handler.droppedEvents)) + require.Equal(t, event, *handler.droppedEvents[0]) + handler.droppedEvents = handler.droppedEvents[:0] + + // case 2: push to existing path + path := "test/path" + err := stream.AddPath(path, "dest1") + require.NoError(t, err) + event = mockEvent{id: 1, path: path, value: 10, sleep: 10 * time.Millisecond} + stream.Push(path, &event) + require.Equal(t, 0, len(handler.droppedEvents)) +} + +func TestParallelDynamicStreamMetrics(t *testing.T) { + handler := &mockHandler{} + option := Option{StreamCount: 4} + stream := newParallelDynamicStream(mockHasher, handler, option) + + stream.Start() + defer stream.Close() + + // Add some paths + err := stream.AddPath("path1", "dest1") + require.NoError(t, err) + err = stream.AddPath("path2", "dest2") + require.NoError(t, err) + + // Remove one path + err = stream.RemovePath("path1") + require.NoError(t, err) + + metrics := stream.GetMetrics() + require.Equal(t, 2, metrics.AddPath) + require.Equal(t, 1, metrics.RemovePath) +} + +func TestParallelDynamicStreamMemoryControl(t *testing.T) { + handler := &mockHandler{} + option := Option{ + StreamCount: 4, + EnableMemoryControl: true, + } + stream := newParallelDynamicStream(mockHasher, handler, option) + + stream.Start() + defer stream.Close() + + // case 1: memory control enabled + require.NotNil(t, stream.memControl) + require.NotNil(t, stream.feedbackChan) + settings := AreaSettings{MaxPendingSize: 1024, FeedbackInterval: 10 * time.Millisecond} + // The path is belong to area 0 + stream.AddPath("path1", "dest1", settings) + stream.mutex.Lock() + require.Equal(t, 1, len(stream.pathMap)) + pi := stream.pathMap["path1"] + stream.mutex.Unlock() + require.Equal(t, 0, pi.area) + require.Equal(t, 1024, pi.areaMemStat.settings.Load().MaxPendingSize) + require.Equal(t, 10*time.Millisecond, pi.areaMemStat.settings.Load().FeedbackInterval) + + // case 2: add event to the path + startNotify := &sync.WaitGroup{} + doneNotify := &sync.WaitGroup{} + inc := &atomic.Int64{} + work := newInc(1, inc) + stream.Push("path1", newMockEvent(1, "path1", 10*time.Millisecond, work, startNotify, doneNotify)) + startNotify.Wait() + require.Equal(t, int64(0), inc.Load()) + doneNotify.Wait() + require.Equal(t, int64(1), inc.Load()) +} + +func TestFeedBack(t *testing.T) { + fb1 := Feedback[int, string, any]{ + FeedbackType: 0, + } + require.False(t, fb1.IsAreaFeedback()) + require.False(t, fb1.PauseArea) + require.False(t, fb1.PausePath) + + fb1.PauseArea = true + require.True(t, fb1.PauseArea) +} diff --git a/utils/dynstream/stream.go b/utils/dynstream/stream.go index dfab9999d..299eb18d3 100644 --- a/utils/dynstream/stream.go +++ b/utils/dynstream/stream.go @@ -37,7 +37,7 @@ type stream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]] struct { isClosed atomic.Bool - handleWg sync.WaitGroup + wg sync.WaitGroup startTime time.Time } @@ -66,6 +66,10 @@ func newStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]]( return s } +func (s *stream[A, P, T, D, H]) addPath(path *pathInfo[A, P, T, D, H]) { + s.in() <- eventWrap[A, P, T, D, H]{pathInfo: path, newPath: true} +} + func (s *stream[A, P, T, D, H]) getPendingSize() int { if s.option.UseBuffer { return len(s.inChan) + int(s.bufferCount.Load()) + len(s.outChan) + int(s.eventQueue.totalPendingLength.Load()) @@ -90,10 +94,11 @@ func (s *stream[A, P, T, D, H]) start() { } if s.option.UseBuffer { + s.wg.Add(1) go s.receiver() } - s.handleWg.Add(1) + s.wg.Add(1) go s.handleLoop() } @@ -108,7 +113,7 @@ func (s *stream[A, P, T, D, H]) close(wait ...bool) { } } if len(wait) == 0 || wait[0] { - s.handleWg.Wait() + s.wg.Wait() } } @@ -127,7 +132,9 @@ func (s *stream[A, P, T, D, H]) receiver() { } } close(s.outChan) + s.wg.Done() }() + for { event, ok := buffer.FrontRef() if !ok { @@ -182,7 +189,7 @@ func (s *stream[A, P, T, D, H]) handleLoop() { handleEvent(e) } - s.handleWg.Done() + s.wg.Done() }() // Variables below will be used in the Loop below. @@ -268,8 +275,7 @@ type pathInfo[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]] struct { pendingQueue *deque.Deque[eventWrap[A, P, T, D, H]] // Fields used by the memory control. - areaMemStat *areaMemStat[A, P, T, D, H] - sizeHeapIndex int + areaMemStat *areaMemStat[A, P, T, D, H] pendingSize atomic.Uint32 // The total size(bytes) of pending events in the pendingQueue of the path. paused atomic.Bool // The path is paused to send events. diff --git a/utils/dynstream/stream_test.go b/utils/dynstream/stream_test.go index b261d04dc..87a9f3774 100644 --- a/utils/dynstream/stream_test.go +++ b/utils/dynstream/stream_test.go @@ -3,7 +3,10 @@ package dynstream import ( "sync" "sync/atomic" + "testing" "time" + + "github.com/stretchr/testify/require" ) type mockWork interface { @@ -13,6 +16,7 @@ type mockWork interface { type mockEvent struct { id int path string + value int sleep time.Duration work mockWork @@ -76,10 +80,111 @@ func (h *mockHandler) drainDroppedEvents() []*mockEvent { } type Inc struct { - num int64 - inc *atomic.Int64 + num int64 + inc *atomic.Int64 + notify *sync.WaitGroup } func (i *Inc) Do() { i.inc.Add(i.num) + if i.notify != nil { + i.notify.Done() + } +} + +func newInc(num int64, inc *atomic.Int64, notify ...*sync.WaitGroup) *Inc { + i := &Inc{num: num, inc: inc} + if len(notify) > 0 { + i.notify = notify[0] + i.notify.Add(1) + } + return i +} + +func TestStreamBasic(t *testing.T) { + handler := mockHandler{} + stream := newStream(1, &handler, Option{UseBuffer: false}) + require.Equal(t, 0, stream.getPendingSize()) + + stream.start() + defer stream.close() + pi := newPathInfo[int, string, *mockEvent, any, *mockHandler](1, "test/path", nil) + stream.addPath(pi) + // Test basic event handling + inc := &atomic.Int64{} + // notify is used to wait for the event to be processed + notify := &sync.WaitGroup{} + + newEvent := func(num int) eventWrap[int, string, *mockEvent, any, *mockHandler] { + return eventWrap[int, string, *mockEvent, any, *mockHandler]{ + pathInfo: pi, + event: newMockEvent(num, pi.path, 0, newInc(int64(num), inc, notify), nil, nil), + } + } + + // Send event to stream + stream.in() <- newEvent(1) + + notify.Wait() + // Verify event was processed + require.Equal(t, int64(1), inc.Load()) + require.Equal(t, 0, stream.getPendingSize()) + + // Test multiple events + stream.in() <- newEvent(2) + stream.in() <- newEvent(3) + + notify.Wait() + // Verify all events were processed + require.Equal(t, int64(6), inc.Load()) // 1 + 2 + 3 = 6 + require.Equal(t, 0, stream.getPendingSize()) +} + +func TestStreamBasicWithBuffer(t *testing.T) { + handler := mockHandler{} + stream := newStream(1, &handler, Option{UseBuffer: true}) + require.Equal(t, 0, stream.getPendingSize()) + + stream.start() + defer stream.close() + pi := newPathInfo[int, string, *mockEvent, any, *mockHandler](1, "test/path", nil) + stream.addPath(pi) + // Test basic event handling + inc := &atomic.Int64{} + // notify is used to wait for the event to be processed + notify := &sync.WaitGroup{} + + newEvent := func(num int) eventWrap[int, string, *mockEvent, any, *mockHandler] { + return eventWrap[int, string, *mockEvent, any, *mockHandler]{ + pathInfo: pi, + event: newMockEvent(num, pi.path, 0, newInc(int64(num), inc, notify), nil, nil), + } + } + + // Send event to stream + stream.in() <- newEvent(1) + + notify.Wait() + // Verify event was processed + require.Equal(t, int64(1), inc.Load()) + require.Equal(t, 0, stream.getPendingSize()) + + // Test multiple events + stream.in() <- newEvent(2) + stream.in() <- newEvent(3) + + notify.Wait() + // Verify all events were processed + require.Equal(t, int64(6), inc.Load()) // 1 + 2 + 3 = 6 + require.Equal(t, 0, stream.getPendingSize()) +} + +func TestPathInfo(t *testing.T) { + // case 1: new path info + pi := newPathInfo[int, string, *mockEvent, any, *mockHandler](1, "test/path", nil) + require.Equal(t, 1, pi.area) + require.Equal(t, "test/path", pi.path) + require.Equal(t, uint32(0), pi.pendingSize.Load()) + require.Equal(t, false, pi.paused.Load()) + require.Equal(t, time.Unix(0, 0), pi.lastSendFeedbackTime.Load()) }