Skip to content

Commit

Permalink
maintainer: mainter manager use parallel DS (#706)
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <[email protected]>
  • Loading branch information
asddongmen authored Dec 20, 2024
1 parent 807962b commit cf291d1
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 3 deletions.
1 change: 0 additions & 1 deletion .github/workflows/uint_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -88,5 +88,4 @@ jobs:
- name: Unit Test
run: |
go test --tags=intest -timeout 120s github.com/pingcap/ticdc/coordinator/...
go test --tags=intest -timeout 120s github.com/pingcap/ticdc/pkg/eventservice...
2 changes: 1 addition & 1 deletion maintainer/maintainer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewMaintainerManager(selfNode *node.Info,
tsoClient: pdClient,
regionCache: regionCache,
}
m.stream = dynstream.NewDynamicStream(NewStreamHandler())
m.stream = dynstream.NewParallelDynamicStream(func(path common.GID) uint64 { return path.FastHash() }, NewStreamHandler())
m.stream.Start()

mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages)
Expand Down
4 changes: 3 additions & 1 deletion utils/dynstream/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,15 +249,17 @@ func (f *Feedback[A, P, D]) String() string {
return fmt.Sprintf("DynamicStream Feedback{Area: %v, Path: %v, Pause: %v}", f.Area, f.Path, f.Pause)
}

// NewDynamicStream creates a new dynamic stream with a single stream by default.
func NewDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](handler H, option ...Option) DynamicStream[A, P, T, D, H] {
opt := NewOption()
opt.StreamCount = 1
if len(option) > 0 {
opt = option[0]
}
opt.StreamCount = 1
return newParallelDynamicStream(func(path P) uint64 { return 0 }, handler, opt)
}

// NewParallelDynamicStream creates a new dynamic stream with CPU number streams by default.
func NewParallelDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](hasher PathHasher[P], handler H, option ...Option) DynamicStream[A, P, T, D, H] {
opt := NewOption()
if len(option) > 0 {
Expand Down

0 comments on commit cf291d1

Please sign in to comment.