Skip to content

Commit

Permalink
tests: Add unit test for DS (#756)
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <[email protected]>
  • Loading branch information
asddongmen authored Jan 2, 2025
1 parent 15926d5 commit 1f85c7f
Show file tree
Hide file tree
Showing 9 changed files with 520 additions and 26 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/integration_test_mysql.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ jobs:
mkdir -p ./logs/$CASE
cat $DIR/stdout.log
tail -n 10 $DIR/cdc.log
sudo cp -r -L $DIR/{*.log,sync_diff} ./logs/$CASE/
sudo cp -r -L $DIR/{*.log} ./logs/$CASE/
sudo cp -r -L $DIR/{sync_diff} ./logs/$CASE/ || true
sudo chown -R runner ./logs
sudo tar -czvf ./logs.tar.gz ./logs
# Update logs as artifact seems not stable, so we set `continue-on-error: true` here.
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/uint_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ jobs:
eventservice_unit_test:
runs-on: ubuntu-latest
name: EventService Unit Test
name: Other Unit Tests
steps:
- name: Check out code
uses: actions/checkout@v2
Expand All @@ -104,6 +104,8 @@ jobs:
- name: Unit Test
run: |
go test --tags=intest -timeout 120s github.com/pingcap/ticdc/pkg/eventservice/...
go test --tags=intest -timeout 120s github.com/pingcap/ticdc/utils/dynstream...
Expand Down
7 changes: 5 additions & 2 deletions utils/dynstream/event_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func newEventQueue[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](optio
option: option,
handler: handler,
eventBlockAlloc: deque.NewBlockAllocator[eventWrap[A, P, T, D, H]](32, 1024),
signalQueue: deque.NewDeque[eventSignal[A, P, T, D, H]](1024, deque.NewBlockAllocator[eventSignal[A, P, T, D, H]](1024, 32)),
signalQueue: deque.NewDeque(1024, deque.NewBlockAllocator[eventSignal[A, P, T, D, H]](1024, 32)),
totalPendingLength: &atomic.Int64{},
}

Expand Down Expand Up @@ -97,8 +97,8 @@ func (q *eventQueue[A, P, T, D, H]) popEvents(buf []T) ([]T, *pathInfo[A, P, T,
if path.blocking || path.removed {
// The path is blocking or removed, we should ignore the signal completely.
// Since when it is waked, a signal event will be added to the queue.
q.signalQueue.PopFront()
q.totalPendingLength.Add(-int64(signal.eventCount))
q.signalQueue.PopFront()
continue
}

Expand All @@ -114,6 +114,8 @@ func (q *eventQueue[A, P, T, D, H]) popEvents(buf []T) ([]T, *pathInfo[A, P, T,
continue
}
firstGroup := firstEvent.eventType.DataGroup
firstProperty := firstEvent.eventType.Property

appendToBuf(firstEvent, path)

// Try to batch events with the same data group.
Expand All @@ -126,6 +128,7 @@ func (q *eventQueue[A, P, T, D, H]) popEvents(buf []T) ([]T, *pathInfo[A, P, T,
// Only batch events with the same data group and when the event is batchable.
if !ok ||
(firstGroup != front.eventType.DataGroup) ||
firstProperty == NonBatchable ||
front.eventType.Property == NonBatchable {
break
}
Expand Down
256 changes: 256 additions & 0 deletions utils/dynstream/event_queue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,256 @@
package dynstream

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestNewEventQueue(t *testing.T) {
handler := mockHandler{}
option := Option{BatchCount: 10}

eq := newEventQueue(option, &handler)

require.NotNil(t, eq.eventBlockAlloc)
require.NotNil(t, eq.signalQueue)
require.NotNil(t, eq.totalPendingLength)
require.Equal(t, option, eq.option)
require.Equal(t, &handler, eq.handler)
require.Equal(t, int64(0), eq.totalPendingLength.Load())
}

func TestAppendAndPopSingleEvent(t *testing.T) {
handler := mockHandler{}
eq := newEventQueue(Option{BatchCount: 10}, &handler)

// create a path
path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil)
eq.initPath(path)

// append an event
event := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: 1},
eventType: EventType{
DataGroup: 1,
Property: BatchableData,
},
}

eq.appendEvent(event)

// verify the event is appended
require.Equal(t, int64(1), eq.totalPendingLength.Load())

// pop the event
buf := make([]*mockEvent, 0)
events, popPath := eq.popEvents(buf)

require.Equal(t, 1, len(events))
require.Equal(t, mockEvent{value: 1}, *events[0])
require.Equal(t, path, popPath)
require.Equal(t, int64(0), eq.totalPendingLength.Load())
}

func TestBlockAndWakePath(t *testing.T) {
handler := mockHandler{}
eq := newEventQueue(Option{BatchCount: 10}, &handler)

path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil)
eq.initPath(path)

// append an event
event := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: 1},
eventType: EventType{
DataGroup: 1,
Property: BatchableData,
},
}
eq.appendEvent(event)

// block the path
eq.blockPath(path)

// try to pop the event (should not pop)
buf := make([]*mockEvent, 0)
events, _ := eq.popEvents(buf)
require.Equal(t, 0, len(events))
require.Equal(t, int64(0), eq.totalPendingLength.Load())

// wake the path
eq.wakePath(path)
require.Equal(t, int64(1), eq.totalPendingLength.Load())

// try to pop the event (should pop)
events, popPath := eq.popEvents(buf)
require.Equal(t, 1, len(events))
require.Equal(t, &mockEvent{value: 1}, events[0])
require.Equal(t, path, popPath)
require.Equal(t, int64(0), eq.totalPendingLength.Load())
}

func TestBatchEvents(t *testing.T) {
handler := mockHandler{}
eq := newEventQueue(Option{BatchCount: 3}, &handler)

path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil)
eq.initPath(path)

// append multiple events with the same DataGroup
for i := 1; i <= 5; i++ {
event := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: i},
eventType: EventType{
DataGroup: 1,
Property: BatchableData,
},
}
eq.appendEvent(event)
}

// verify the batch pop
buf := make([]*mockEvent, 0)
events, _ := eq.popEvents(buf)

// since BatchCount = 3, only the first 3 events should be popped
require.Equal(t, 3, len(events))
require.Equal(t, &mockEvent{value: 1}, events[0])
require.Equal(t, &mockEvent{value: 2}, events[1])
require.Equal(t, &mockEvent{value: 3}, events[2])

// verify the remaining event count
require.Equal(t, int64(2), eq.totalPendingLength.Load())
}

func TestBatchableAndNonBatchableEvents(t *testing.T) {
handler := mockHandler{}
eq := newEventQueue(Option{BatchCount: 3}, &handler)

path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil)
eq.initPath(path)

// append a non-batchable event
event1 := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: 1},
eventType: EventType{
DataGroup: 1,
Property: NonBatchable,
},
}
eq.appendEvent(event1)

// append 2 batchable events
for i := 1; i <= 2; i++ {
e := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: i},
eventType: EventType{
DataGroup: 1,
Property: BatchableData,
},
}
eq.appendEvent(e)
}

// add 2 non-batchable events
for i := 1; i <= 2; i++ {
e := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: i},
eventType: EventType{
DataGroup: 1,
Property: NonBatchable,
},
}
eq.appendEvent(e)
}

// append 5 batchable events
for i := 1; i <= 5; i++ {
e := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: i},
eventType: EventType{
DataGroup: 1,
Property: BatchableData,
},
}
eq.appendEvent(e)
}

// case 1: pop the first non-batchable event
buf := make([]*mockEvent, 0)
events, _ := eq.popEvents(buf)
require.Equal(t, 1, len(events))
require.Equal(t, &mockEvent{value: 1}, events[0])
require.Equal(t, int64(9), eq.totalPendingLength.Load())

// case 2: pop the first 2 batchable event
buf = make([]*mockEvent, 0)
events, _ = eq.popEvents(buf)
require.Equal(t, 2, len(events))
require.Equal(t, &mockEvent{value: 1}, events[0])
require.Equal(t, &mockEvent{value: 2}, events[1])
require.Equal(t, int64(7), eq.totalPendingLength.Load())

// case 3: pop a non-batchable event
buf = make([]*mockEvent, 0)
events, _ = eq.popEvents(buf)
require.Equal(t, 1, len(events))
require.Equal(t, &mockEvent{value: 1}, events[0])
require.Equal(t, int64(6), eq.totalPendingLength.Load())

// case 4: pop the second non-batchable event
buf = make([]*mockEvent, 0)
events, _ = eq.popEvents(buf)
require.Equal(t, 1, len(events))
require.Equal(t, &mockEvent{value: 2}, events[0])
require.Equal(t, int64(5), eq.totalPendingLength.Load())

// case 5: pop the first 3 batchable events
buf = make([]*mockEvent, 0)
events, _ = eq.popEvents(buf)
require.Equal(t, 3, len(events))
require.Equal(t, &mockEvent{value: 1}, events[0])
require.Equal(t, &mockEvent{value: 2}, events[1])
require.Equal(t, &mockEvent{value: 3}, events[2])
require.Equal(t, int64(2), eq.totalPendingLength.Load())

// case 6: pop the remaining 2 batchable events
buf = make([]*mockEvent, 0)
events, _ = eq.popEvents(buf)
require.Equal(t, 2, len(events))
require.Equal(t, &mockEvent{value: 4}, events[0])
require.Equal(t, &mockEvent{value: 5}, events[1])
require.Equal(t, int64(0), eq.totalPendingLength.Load())
}

func TestRemovePath(t *testing.T) {
handler := mockHandler{}
eq := newEventQueue(Option{BatchCount: 3}, &handler)

path := newPathInfo[int, string, *mockEvent, any, *mockHandler](0, "test", nil)
eq.initPath(path)

e := eventWrap[int, string, *mockEvent, any, *mockHandler]{
pathInfo: path,
event: &mockEvent{value: 1},
eventType: EventType{
DataGroup: 1,
Property: BatchableData,
},
}
eq.appendEvent(e)
require.Equal(t, int64(1), eq.totalPendingLength.Load())

path.removed = true
buf := make([]*mockEvent, 0)
events, _ := eq.popEvents(buf)
require.Equal(t, 0, len(events))
require.Equal(t, int64(0), eq.totalPendingLength.Load())
}
13 changes: 0 additions & 13 deletions utils/dynstream/memory_control.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,16 +214,3 @@ func (m *memControl[A, P, T, D, H]) getMetrics() (usedMemory int64, maxMemory in
}
return usedMemory, maxMemory
}

func (p *pathInfo[A, P, T, D, H]) SetHeapIndex(index int) {
p.sizeHeapIndex = index
}

func (p *pathInfo[A, P, T, D, H]) GetHeapIndex() int {
return p.sizeHeapIndex
}

func (p *pathInfo[A, P, T, D, H]) LessThan(other *pathInfo[A, P, T, D, H]) bool {
// pathSizeHeap should be in descending order. That say the node with the largest pending size is the top.
return p.pendingSize.Load() > other.pendingSize.Load()
}
2 changes: 1 addition & 1 deletion utils/dynstream/parallel_dynamic_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (s *parallelDynamicStream[A, P, T, D, H]) AddPath(path P, dest D, as ...Are
s.pathMap[path] = pi
s.setMemControl(pi, as...)

pi.stream.in() <- eventWrap[A, P, T, D, H]{pathInfo: pi, newPath: true}
pi.stream.addPath(pi)

s._statAddPathCount.Add(1)
return nil
Expand Down
Loading

0 comments on commit 1f85c7f

Please sign in to comment.