Skip to content

Commit

Permalink
DS: revert some changes (#707)
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <[email protected]>
  • Loading branch information
asddongmen authored Dec 20, 2024
1 parent cf291d1 commit aa442e9
Show file tree
Hide file tree
Showing 5 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,8 @@ func (e *EventDispatcherManager) close(removeChangefeed bool) {
metrics.CreateDispatcherDuration.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
metrics.EventDispatcherManagerCheckpointTsGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
metrics.EventDispatcherManagerResolvedTsGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
metrics.ChangefeedCheckpointTsLagGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
metrics.ChangefeedResolvedTsLagGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())

e.closed.Store(true)
log.Info("event dispatcher manager closed", zap.Stringer("changefeedID", e.changefeedID))
Expand Down
2 changes: 2 additions & 0 deletions maintainer/maintainer.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@ func (m *Maintainer) initialize() error {
func (m *Maintainer) cleanupMetrics() {
metrics.ChangefeedCheckpointTsGauge.DeleteLabelValues(m.id.Namespace(), m.id.Name())
metrics.ChangefeedCheckpointTsLagGauge.DeleteLabelValues(m.id.Namespace(), m.id.Name())
metrics.ChangefeedResolvedTsGauge.DeleteLabelValues(m.id.Namespace(), m.id.Name())
metrics.ChangefeedResolvedTsLagGauge.DeleteLabelValues(m.id.Namespace(), m.id.Name())
metrics.ChangefeedStatusGauge.DeleteLabelValues(m.id.Namespace(), m.id.Name())
metrics.ScheduleTaskGuage.DeleteLabelValues(m.id.Namespace(), m.id.Name())
metrics.RunningScheduleTaskGauge.DeleteLabelValues(m.id.Namespace(), m.id.Name())
Expand Down
2 changes: 1 addition & 1 deletion maintainer/maintainer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func NewMaintainerManager(selfNode *node.Info,
tsoClient: pdClient,
regionCache: regionCache,
}
m.stream = dynstream.NewParallelDynamicStream(func(path common.GID) uint64 { return path.FastHash() }, NewStreamHandler())
m.stream = dynstream.NewDynamicStream(NewStreamHandler())
m.stream.Start()

mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages)
Expand Down
16 changes: 8 additions & 8 deletions utils/dynstream/dynamic_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ func TestDynamicStreamRemovePath(t *testing.T) {

wg := &sync.WaitGroup{}
wg.Add(1) // Only one event is processed
ds.Push("p1", &simpleEvent{path: "p1", wg: wg})
ds.Push("p1", &simpleEvent{path: "p1", wg: wg})
ds.Push("path", &simpleEvent{path: "p1", wg: wg})
ds.Push("path", &simpleEvent{path: "p1", wg: wg})

// The case is good if it doesn't panic
// Sleep is to make sure events are actually processed
Expand Down Expand Up @@ -536,13 +536,13 @@ func TestDynamicStreamOrder(t *testing.T) {

wg := &sync.WaitGroup{}
wg.Add(6)
ds.Push("p1", &testOrder{path: "p1", id: 1, timestamp: 1, wg: wg})
ds.Push("p2", &testOrder{path: "p2", id: 2, timestamp: 3, wg: wg})
ds.Push("p3", &testOrder{path: "p3", id: 3, timestamp: 6, wg: wg})
ds.Push("path", &testOrder{path: "p1", id: 1, timestamp: 1, wg: wg})
ds.Push("path", &testOrder{path: "p2", id: 2, timestamp: 3, wg: wg})
ds.Push("path", &testOrder{path: "p3", id: 3, timestamp: 6, wg: wg})

ds.Push("p1", &testOrder{path: "p1", id: 4, timestamp: 2, wg: wg})
ds.Push("p2", &testOrder{path: "p2", id: 5, timestamp: 4, wg: wg})
ds.Push("p3", &testOrder{path: "p3", id: 6, timestamp: 5, wg: wg})
ds.Push("path", &testOrder{path: "p1", id: 4, timestamp: 2, wg: wg})
ds.Push("path", &testOrder{path: "p2", id: 5, timestamp: 4, wg: wg})
ds.Push("path", &testOrder{path: "p3", id: 6, timestamp: 5, wg: wg})

time.Sleep(10 * time.Millisecond) // Make sure all the events are in the pending queue
hwg.Done()
Expand Down
7 changes: 2 additions & 5 deletions utils/dynstream/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ type Option struct {
ReportInterval time.Duration // The interval of reporting the status of stream, the status is used by the scheduler.

StreamCount int // The count of streams. I.e. the count of goroutines to handle events. By default 0, means runtime.NumCPU().
BatchCount int // The batch count of handling events. <= 1 means no batch. By default 1.
BatchCount int // The batch size of handling events. <= 1 means no batch. By default 1.
BatchBytes int // The max bytes of the batch. <= 1 means no limit. By default 0.

EnableMemoryControl bool // Enable the memory control. By default false.
Expand Down Expand Up @@ -249,17 +249,14 @@ func (f *Feedback[A, P, D]) String() string {
return fmt.Sprintf("DynamicStream Feedback{Area: %v, Path: %v, Pause: %v}", f.Area, f.Path, f.Pause)
}

// NewDynamicStream creates a new dynamic stream with a single stream by default.
func NewDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](handler H, option ...Option) DynamicStream[A, P, T, D, H] {
opt := NewOption()
opt.StreamCount = 1
if len(option) > 0 {
opt = option[0]
}
return newParallelDynamicStream(func(path P) uint64 { return 0 }, handler, opt)
return newDynamicStreamImpl(handler, opt)
}

// NewParallelDynamicStream creates a new dynamic stream with CPU number streams by default.
func NewParallelDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](hasher PathHasher[P], handler H, option ...Option) DynamicStream[A, P, T, D, H] {
opt := NewOption()
if len(option) > 0 {
Expand Down

0 comments on commit aa442e9

Please sign in to comment.