From 2592bc6126b6a10d2f59349bea356e6ea7708e1f Mon Sep 17 00:00:00 2001
From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com>
Date: Fri, 15 Nov 2024 01:08:11 +0800
Subject: [PATCH 1/2] fix

---
 maintainer/maintainer.go         |  8 ++--
 pkg/eventservice/event_broker.go | 75 ++++++++++++++++++++------------
 pkg/eventservice/helper.go       | 16 ++++++-
 pkg/metrics/event_service.go     |  9 ++++
 4 files changed, 73 insertions(+), 35 deletions(-)

diff --git a/maintainer/maintainer.go b/maintainer/maintainer.go
index 3f62515f..76b7049a 100644
--- a/maintainer/maintainer.go
+++ b/maintainer/maintainer.go
@@ -431,13 +431,13 @@ func (m *Maintainer) calCheckpointTs() {
 func (m *Maintainer) updateMetrics() {
 	phyCkpTs := oracle.ExtractPhysical(m.watermark.CheckpointTs)
 	m.changefeedCheckpointTsGauge.Set(float64(phyCkpTs))
-	lag := (oracle.GetPhysical(time.Now()) - phyCkpTs) / 1e3
-	m.changefeedCheckpointTsLagGauge.Set(float64(lag))
+	lag := float64(oracle.GetPhysical(time.Now())-phyCkpTs) / 1e3
+	m.changefeedCheckpointTsLagGauge.Set(lag)
 
 	phyResolvedTs := oracle.ExtractPhysical(m.watermark.ResolvedTs)
 	m.changefeedResolvedTsGauge.Set(float64(phyResolvedTs))
-	lag = (oracle.GetPhysical(time.Now()) - phyResolvedTs) / 1e3
-	m.changefeedResolvedTsLagGauge.Set(float64(lag))
+	lag = float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3
+	m.changefeedResolvedTsLagGauge.Set(lag)
 
 	m.changefeedStatusGauge.Set(float64(m.state))
 }
diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go
index 37f3abc2..04ad33c2 100644
--- a/pkg/eventservice/event_broker.go
+++ b/pkg/eventservice/event_broker.go
@@ -2,6 +2,7 @@ package eventservice
 
 import (
 	"context"
+	"runtime"
 	"strconv"
 	"sync"
 	"sync/atomic"
@@ -35,6 +36,7 @@ var metricEventServiceSendEventDuration = metrics.EventServiceSendEventDuration.
 var metricEventBrokerDropTaskCount = metrics.EventServiceDropScanTaskCount
 var metricEventBrokerDropResolvedTsCount = metrics.EventServiceDropResolvedTsCount
 var metricScanTaskQueueDuration = metrics.EventServiceScanTaskQueueDuration
+var metricEventBrokerHandleDuration = metrics.EventServiceHandleDuration
 
 // eventBroker get event from the eventStore, and send the event to the dispatchers.
 // Every TiDB cluster has a eventBroker.
@@ -57,10 +59,6 @@ type eventBroker struct {
 	// taskPool is used to store the scan tasks and merge the tasks of same dispatcher.
 	// TODO: Make it support merge the tasks of the same table span, even if the tasks are from different dispatchers.
 	taskPool *scanTaskPool
-
-	// GID here is the internal changefeedID, use to identify the area of the dispatcher.
-	ds dynstream.DynamicStream[common.GID, common.DispatcherID, scanTask, *eventBroker, *dispatcherEventsHandler]
-
 	// scanWorkerCount is the number of the scan workers to spawn.
 	scanWorkerCount int
 
@@ -111,7 +109,6 @@ func newEventBroker(
 		msgSender:        mc,
 		taskPool:         newScanTaskPool(),
 		scanWorkerCount:  defaultScanWorkerCount,
-		ds:               ds,
 		messageCh:        make(chan wrapEvent, defaultChannelSize),
 		resolvedTsCaches: make(map[node.ID]*resolvedTsCache),
 		cancel:           cancel,
@@ -141,13 +138,11 @@ func (c *eventBroker) sendWatermark(
 	counter prometheus.Counter,
 ) {
 	c.emitSyncPointEventIfNeeded(watermark, d, server)
-
 	re := pevent.NewResolvedEvent(watermark, d.info.GetID())
 	resolvedEvent := newWrapResolvedEvent(
 		server,
 		re,
 		d.getEventSenderState())
-
 	select {
 	case c.messageCh <- resolvedEvent:
 		if counter != nil {
@@ -177,18 +172,24 @@ func (c *eventBroker) runScanWorker(ctx context.Context) {
 }
 
 func (c *eventBroker) runGenTasks(ctx context.Context) {
-	c.wg.Add(1)
-	go func() {
-		defer c.wg.Done()
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			case stat := <-c.notifyCh:
-				c.ds.In() <- newScanTask(stat)
+	workerCnt := runtime.NumCPU()
+	c.wg.Add(workerCnt)
+	for i := 0; i < workerCnt; i++ {
+		go func() {
+			defer c.wg.Done()
+			for {
+				select {
+				case <-ctx.Done():
+					return
+				case stat := <-c.notifyCh:
+					//log.Info("receive dispatcher stat", zap.Stringer("dispatcher", stat.info.GetID()))
+					//stat.watermark.Store(stat.resolvedTs.Load())
+					task := newScanTask(stat)
+					doHandle(c, task)
+				}
 			}
-		}
-	}()
+		}()
+	}
 }
 
 // TODO: maybe event driven model is better. It is coupled with the detail implementation of
@@ -261,10 +262,6 @@ func (c *eventBroker) sendDDL(ctx context.Context, remoteID node.ID, e pevent.DD
 	}
 }
 
-func (c *eventBroker) wakeDispatcher(dispatcherID common.DispatcherID) {
-	c.ds.Wake() <- dispatcherID
-}
-
 // checkNeedScan checks if the dispatcher needs to scan the event store.
 // If the dispatcher needs to scan the event store, it returns true.
 // If the dispatcher does not need to scan the event store, it send the watermark to the dispatcher
@@ -354,7 +351,9 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) {
 	remoteID := node.ID(task.dispatcherStat.info.GetServerID())
 	dispatcherID := task.dispatcherStat.info.GetID()
-	defer c.wakeDispatcher(dispatcherID)
+	log.Info("start to scan", zap.Stringer("dispatcher", dispatcherID), zap.Bool("isRunning", task.dispatcherStat.isRunning.Load()))
+
+	defer task.dispatcherStat.isHandling.CompareAndSwap(true, false)
 
 	// If the target is not ready to send, we don't need to scan the event store.
 	// To avoid the useless scan task.
 	if !c.msgSender.IsReadyToSend(remoteID) {
@@ -567,6 +566,7 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
 			case <-ticker.C:
 				pullerMinResolvedTs := uint64(0)
 				dispatcherMinWaterMark := uint64(0)
+				var slowestDispatcher *dispatcherStat
 				c.dispatchers.Range(func(key, value interface{}) bool {
 					dispatcher := value.(*dispatcherStat)
 					resolvedTs := dispatcher.resolvedTs.Load()
@@ -576,6 +576,7 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
 					watermark := dispatcher.watermark.Load()
 					if dispatcherMinWaterMark == 0 || watermark < dispatcherMinWaterMark {
 						dispatcherMinWaterMark = watermark
+						slowestDispatcher = dispatcher
 					}
 					return true
 				})
@@ -586,9 +587,9 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
 				lag := float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3
 				c.metricEventServicePullerResolvedTs.Set(float64(phyResolvedTs))
 				c.metricEventServiceResolvedTsLag.Set(lag)
-
 				lag = float64(oracle.GetPhysical(time.Now())-oracle.ExtractPhysical(dispatcherMinWaterMark)) / 1e3
 				c.metricEventServiceDispatcherResolvedTs.Set(lag)
+				log.Warn("slowest dispatcher in event broker", zap.Stringer("dispatcher", slowestDispatcher.info.GetID()), zap.Uint64("watermark", slowestDispatcher.watermark.Load()), zap.Uint64("seq", slowestDispatcher.seq.Load()))
 			}
 		}
 	}()
@@ -620,15 +621,19 @@ func (c *eventBroker) updateDispatcherSendTs(ctx context.Context) {
 func (c *eventBroker) close() {
 	c.cancel()
 	c.wg.Wait()
-	c.ds.Close()
 }
 
 func (c *eventBroker) onNotify(d *dispatcherStat, resolvedTs uint64) {
 	if d.onSubscriptionResolvedTs(resolvedTs) {
 		// Note: don't block the caller of this function.
-		select {
-		case c.notifyCh <- d:
-		default:
+		if d.isHandling.CompareAndSwap(false, true) {
+			// c.notifyCh <- d
+			select {
+			case c.notifyCh <- d:
+			default:
+				d.isHandling.Store(false)
+				metricEventBrokerDropTaskCount.Inc()
+			}
 		}
 	}
 }
@@ -690,7 +695,6 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) {
 	}
 	dispatcher.updateTableInfo(tableInfo)
 	eventStoreRegisterDuration := time.Since(start)
-	c.ds.AddPath(id, c, dynstream.AreaSettings{})
 	log.Info("register dispatcher",
 		zap.Uint64("clusterID", c.tidbClusterID),
 		zap.Any("dispatcherID", id), zap.Int64("tableID", span.TableID),
@@ -728,6 +732,7 @@ func (c *eventBroker) resumeDispatcher(dispatcherInfo DispatcherInfo) {
 	log.Info("resume dispatcher", zap.Any("dispatcher", stat.info.GetID()), zap.Uint64("checkpointTs", stat.watermark.Load()), zap.Uint64("seq", stat.seq.Load()))
 	// Reset the watermark to the startTs of the dispatcherInfo.
 	stat.isRunning.Store(true)
+	stat.isHandling.Store(false)
 }
 
 func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) {
@@ -772,6 +777,8 @@ type dispatcherStat struct {
 	nextSyncPoint     uint64
 	syncPointInterval time.Duration
 
+	isHandling atomic.Bool
+
 	metricSorterOutputEventCountKV prometheus.Counter
 	metricEventServiceSendKvCount  prometheus.Counter
 	metricEventServiceSendDDLCount prometheus.Counter
@@ -801,6 +808,7 @@ func newDispatcherStat(
 	dispStat.resolvedTs.Store(startTs)
 	dispStat.watermark.Store(startTs)
 	dispStat.isRunning.Store(true)
+	dispStat.isHandling.Store(false)
 	return dispStat
 }
 
@@ -860,6 +868,8 @@ type scanTaskPool struct {
 	// pendingTaskQueue is used to store the tasks that are waiting to be handled by the scan workers.
 	// The length of the pendingTaskQueue is equal to the number of the scan workers.
 	pendingTaskQueue chan scanTask
+
+	lastPrint time.Time
 }
 
 func newScanTaskPool() *scanTaskPool {
@@ -873,9 +883,16 @@ func newScanTaskPool() *scanTaskPool {
 func (p *scanTaskPool) pushTask(task scanTask) bool {
 	select {
 	case p.pendingTaskQueue <- task:
+		if time.Since(p.lastPrint) > time.Second*10 {
+			log.Info("push task to scanTaskPool", zap.Int("pendingTaskQueue", len(p.pendingTaskQueue)))
+			p.lastPrint = time.Now()
+		}
 		return true
 	default:
 		metricEventBrokerDropTaskCount.Inc()
+		if !task.dispatcherStat.isHandling.CompareAndSwap(true, false) {
+			panic("the dispatcher is not handling, but the task is dropped")
+		}
 		// If the queue is full, we just drop the task
 		return false
 	}
diff --git a/pkg/eventservice/helper.go b/pkg/eventservice/helper.go
index 70bc28b9..6688bd9b 100644
--- a/pkg/eventservice/helper.go
+++ b/pkg/eventservice/helper.go
@@ -1,6 +1,8 @@
 package eventservice
 
 import (
+	"time"
+
 	"github.com/pingcap/log"
 	"github.com/pingcap/ticdc/pkg/common"
 	"github.com/pingcap/ticdc/utils/dynstream"
@@ -21,10 +23,18 @@ func (h *dispatcherEventsHandler) Handle(broker *eventBroker, tasks ...scanTask)
 	if len(tasks) != 1 {
 		log.Panic("only one task is allowed")
 	}
-	task := tasks[0]
+	return doHandle(broker, tasks[0])
+}
+
+func doHandle(broker *eventBroker, task scanTask) bool {
+	startTime := time.Now()
+	defer func() {
+		metricEventBrokerHandleDuration.Observe(float64(time.Since(startTime).Milliseconds()))
+	}()
 	needScan, _ := broker.checkNeedScan(task)
 	if !needScan {
 		task.handle()
+		task.dispatcherStat.isHandling.CompareAndSwap(true, false)
 		return false
 	}
 	// The dispatcher has new events. We need to push the task to the task pool.
@@ -47,4 +57,6 @@ func (h *dispatcherEventsHandler) GetArea(path common.DispatcherID, dest *eventB
 }
 func (h *dispatcherEventsHandler) GetTimestamp(event scanTask) dynstream.Timestamp { return 0 }
 func (h *dispatcherEventsHandler) IsPaused(event scanTask) bool { return false }
-func (h *dispatcherEventsHandler) OnDrop(event scanTask) {}
+func (h *dispatcherEventsHandler) OnDrop(event scanTask) {
+	event.handle()
+}
diff --git a/pkg/metrics/event_service.go b/pkg/metrics/event_service.go
index 778e3b07..0767e948 100644
--- a/pkg/metrics/event_service.go
+++ b/pkg/metrics/event_service.go
@@ -93,6 +93,14 @@
 			Help:      "The duration of a scan task being queued",
 			Buckets:   prometheus.DefBuckets,
 		})
+	EventServiceHandleDuration = prometheus.NewHistogram(
+		prometheus.HistogramOpts{
+			Namespace: "ticdc",
+			Subsystem: "event_service",
+			Name:      "handle_duration",
+			Help:      "The duration of handling a scan task",
+			Buckets:   prometheus.DefBuckets,
+		})
 )
 
 // InitMetrics registers all metrics in this file.
@@ -107,4 +115,5 @@ func InitEventServiceMetrics(registry *prometheus.Registry) {
 	registry.MustRegister(EventServiceDropScanTaskCount)
 	registry.MustRegister(EventServiceDropResolvedTsCount)
 	registry.MustRegister(EventServiceScanTaskQueueDuration)
+	registry.MustRegister(EventServiceHandleDuration)
 }

From 87feefe409ce3370d49979723eb385f89e0a35c4 Mon Sep 17 00:00:00 2001
From: CharlesCheung <61726649+CharlesCheung96@users.noreply.github.com>
Date: Fri, 15 Nov 2024 01:35:33 +0800
Subject: [PATCH 2/2] use 1m channel

---
 pkg/eventservice/event_broker.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/eventservice/event_broker.go b/pkg/eventservice/event_broker.go
index 04ad33c2..715cd5ed 100644
--- a/pkg/eventservice/event_broker.go
+++ b/pkg/eventservice/event_broker.go
@@ -103,7 +103,7 @@ func newEventBroker(
 		eventStore:              eventStore,
 		mounter:                 pevent.NewMounter(tz),
 		schemaStore:             schemaStore,
-		notifyCh:                make(chan *dispatcherStat, defaultChannelSize*16),
+		notifyCh:                make(chan *dispatcherStat, 1024*1024),
 		dispatchers:             sync.Map{},
 		tableTriggerDispatchers: sync.Map{},
 		msgSender:               mc,
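Note on the pattern these two patches introduce (an illustrative sketch only, not the TiCDC code; the dispatcher, notifier, and handleOne names below are made up for the example): onNotify now enqueues a dispatcher onto notifyCh only after winning a CompareAndSwap(false, true) on its isHandling flag, doScan releases the flag with a deferred CompareAndSwap(true, false), pushTask releases it when a task has to be dropped, and runGenTasks runs runtime.NumCPU() workers that consume notifyCh and call doHandle directly. A minimal, self-contained Go sketch of that gating idea:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type dispatcher struct {
	id       int
	handling atomic.Bool // at most one in-flight task per dispatcher
}

type notifier struct {
	ch chan *dispatcher
	wg sync.WaitGroup
}

// notify enqueues the dispatcher only if it is not already being handled;
// duplicate notifications are coalesced instead of piling up in the channel.
func (n *notifier) notify(d *dispatcher) {
	if d.handling.CompareAndSwap(false, true) {
		select {
		case n.ch <- d:
		default:
			// Queue full: release the flag so a later notification can retry.
			d.handling.Store(false)
		}
	}
}

// runWorkers starts count goroutines that drain the channel, mirroring the
// worker pool started in runGenTasks.
func (n *notifier) runWorkers(count int) {
	n.wg.Add(count)
	for i := 0; i < count; i++ {
		go func() {
			defer n.wg.Done()
			for d := range n.ch {
				handleOne(d)
			}
		}()
	}
}

func handleOne(d *dispatcher) {
	// Release the flag when handling finishes, mirroring the deferred
	// CompareAndSwap(true, false) in doScan.
	defer d.handling.CompareAndSwap(true, false)
	fmt.Println("handling dispatcher", d.id)
}

func main() {
	n := &notifier{ch: make(chan *dispatcher, 1024)}
	n.runWorkers(4)
	d := &dispatcher{id: 1}
	n.notify(d)
	n.notify(d) // usually coalesced: while the first task is pending, the flag is still set
	close(n.ch)
	n.wg.Wait()
}

The CompareAndSwap gate means repeated resolved-ts notifications for the same dispatcher collapse into at most one queued task, so notifyCh holds at most one entry per active dispatcher even with the much larger 1024*1024 buffer chosen in the second patch.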