diff --git a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go
index 11d2833b..68e7fd95 100644
--- a/downstreamadapter/dispatchermanager/event_dispatcher_manager.go
+++ b/downstreamadapter/dispatchermanager/event_dispatcher_manager.go
@@ -267,6 +267,8 @@ func (e *EventDispatcherManager) close(removeChangefeed bool) {
 	metrics.CreateDispatcherDuration.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
 	metrics.EventDispatcherManagerCheckpointTsGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
 	metrics.EventDispatcherManagerResolvedTsGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
+	metrics.ChangefeedCheckpointTsLagGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
+	metrics.ChangefeedResolvedTsLagGauge.DeleteLabelValues(e.changefeedID.Namespace(), e.changefeedID.Name())
 
 	e.closed.Store(true)
 	log.Info("event dispatcher manager closed", zap.Stringer("changefeedID", e.changefeedID))
diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go
index 40008f07..8470078a 100644
--- a/maintainer/maintainer.go
+++ b/maintainer/maintainer.go
@@ -310,6 +310,8 @@ func (m *Maintainer) initialize() error {
 func (m *Maintainer) cleanupMetrics() {
 	metrics.ChangefeedCheckpointTsGauge.DeleteLabelValues(m.id.Namespace(), m.id.Name())
 	metrics.ChangefeedCheckpointTsLagGauge.DeleteLabelValues(m.id.Namespace(), m.id.Name())
+	metrics.ChangefeedResolvedTsGauge.DeleteLabelValues(m.id.Namespace(), m.id.Name())
+	metrics.ChangefeedResolvedTsLagGauge.DeleteLabelValues(m.id.Namespace(), m.id.Name())
 	metrics.ChangefeedStatusGauge.DeleteLabelValues(m.id.Namespace(), m.id.Name())
 	metrics.ScheduleTaskGuage.DeleteLabelValues(m.id.Namespace(), m.id.Name())
 	metrics.RunningScheduleTaskGauge.DeleteLabelValues(m.id.Namespace(), m.id.Name())
diff --git a/maintainer/maintainer_manager.go b/maintainer/maintainer_manager.go
index a2dd70c8..25cabcc0 100644
--- a/maintainer/maintainer_manager.go
+++ b/maintainer/maintainer_manager.go
@@ -82,7 +82,7 @@ func NewMaintainerManager(selfNode *node.Info,
 		tsoClient:   pdClient,
 		regionCache: regionCache,
 	}
-	m.stream = dynstream.NewParallelDynamicStream(func(path common.GID) uint64 { return path.FastHash() }, NewStreamHandler())
+	m.stream = dynstream.NewDynamicStream(NewStreamHandler())
 	m.stream.Start()
 
 	mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages)
diff --git a/utils/dynstream/dynamic_stream_test.go b/utils/dynstream/dynamic_stream_test.go
index 1cd357db..bc3c0fac 100644
--- a/utils/dynstream/dynamic_stream_test.go
+++ b/utils/dynstream/dynamic_stream_test.go
@@ -360,8 +360,8 @@ func TestDynamicStreamRemovePath(t *testing.T) {
 	wg := &sync.WaitGroup{}
 	wg.Add(1) // Only one event is processed
-	ds.Push("path", &simpleEvent{path: "p1", wg: wg})
-	ds.Push("path", &simpleEvent{path: "p1", wg: wg})
+	ds.Push("p1", &simpleEvent{path: "p1", wg: wg})
+	ds.Push("p1", &simpleEvent{path: "p1", wg: wg})
 
 	// The case is good if it doesn't panic
 
 	// Sleep is to make sure events are actually processed
@@ -536,13 +536,13 @@ func TestDynamicStreamOrder(t *testing.T) {
 	wg := &sync.WaitGroup{}
 	wg.Add(6)
 
-	ds.Push("path", &testOrder{path: "p1", id: 1, timestamp: 1, wg: wg})
-	ds.Push("path", &testOrder{path: "p2", id: 2, timestamp: 3, wg: wg})
-	ds.Push("path", &testOrder{path: "p3", id: 3, timestamp: 6, wg: wg})
+	ds.Push("p1", &testOrder{path: "p1", id: 1, timestamp: 1, wg: wg})
+	ds.Push("p2", &testOrder{path: "p2", id: 2, timestamp: 3, wg: wg})
+	ds.Push("p3", &testOrder{path: "p3", id: 3, timestamp: 6, wg: wg})
 
-	ds.Push("path", &testOrder{path: "p1", id: 4, timestamp: 2, wg: wg})
-	ds.Push("path", &testOrder{path: "p2", id: 5, timestamp: 4, wg: wg})
-	ds.Push("path", &testOrder{path: "p3", id: 6, timestamp: 5, wg: wg})
+	ds.Push("p1", &testOrder{path: "p1", id: 4, timestamp: 2, wg: wg})
+	ds.Push("p2", &testOrder{path: "p2", id: 5, timestamp: 4, wg: wg})
+	ds.Push("p3", &testOrder{path: "p3", id: 6, timestamp: 5, wg: wg})
 
 	time.Sleep(10 * time.Millisecond) // Make sure all the events are in the pending queue
 	hwg.Done()
diff --git a/utils/dynstream/interfaces.go b/utils/dynstream/interfaces.go
index 7fad227a..2fd44255 100644
--- a/utils/dynstream/interfaces.go
+++ b/utils/dynstream/interfaces.go
@@ -184,7 +184,7 @@ type Option struct {
 
 	ReportInterval time.Duration // The interval of reporting the status of stream, the status is used by the scheduler.
 	StreamCount    int           // The count of streams. I.e. the count of goroutines to handle events. By default 0, means runtime.NumCPU().
-	BatchCount     int           // The batch count of handling events. <= 1 means no batch. By default 1.
+	BatchCount     int           // The batch size of handling events. <= 1 means no batch. By default 1.
 	BatchBytes     int           // The max bytes of the batch. <= 1 means no limit. By default 0.
 
 	EnableMemoryControl bool // Enable the memory control. By default false.
@@ -249,17 +249,14 @@ func (f *Feedback[A, P, D]) String() string {
 	return fmt.Sprintf("DynamicStream Feedback{Area: %v, Path: %v, Pause: %v}", f.Area, f.Path, f.Pause)
 }
 
-// NewDynamicStream creates a new dynamic stream with a single stream by default.
 func NewDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](handler H, option ...Option) DynamicStream[A, P, T, D, H] {
 	opt := NewOption()
-	opt.StreamCount = 1
 	if len(option) > 0 {
 		opt = option[0]
 	}
-	return newParallelDynamicStream(func(path P) uint64 { return 0 }, handler, opt)
+	return newDynamicStreamImpl(handler, opt)
 }
 
-// NewParallelDynamicStream creates a new dynamic stream with CPU number streams by default.
 func NewParallelDynamicStream[A Area, P Path, T Event, D Dest, H Handler[A, P, T, D]](hasher PathHasher[P], handler H, option ...Option) DynamicStream[A, P, T, D, H] {
 	opt := NewOption()
 	if len(option) > 0 {
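Note for reviewers: the interfaces.go and maintainer_manager.go hunks together change how callers construct a stream. Below is a minimal sketch of the call-site migration, not additional changed code; it uses only identifiers that already appear in this diff, and the receiver m and surrounding wiring are assumed to be as in NewMaintainerManager.

	// Before: the caller supplied a PathHasher so events could be sharded
	// across parallel streams by the hash of their path (a common.GID).
	m.stream = dynstream.NewParallelDynamicStream(
		func(path common.GID) uint64 { return path.FastHash() },
		NewStreamHandler(),
	)

	// After: the plain constructor takes only the handler; Option values
	// (e.g. BatchCount, StreamCount) can still be supplied explicitly.
	m.stream = dynstream.NewDynamicStream(NewStreamHandler())
	m.stream.Start()

NewParallelDynamicStream remains available for callers that do need per-path sharding; only the default, hasher-free constructor changes here.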