[WIP] eventService: reduce resolvedTs lag #526

Draft · wants to merge 2 commits into base: master
8 changes: 4 additions & 4 deletions maintainer/maintainer.go
@@ -431,13 +431,13 @@ func (m *Maintainer) calCheckpointTs() {
 func (m *Maintainer) updateMetrics() {
     phyCkpTs := oracle.ExtractPhysical(m.watermark.CheckpointTs)
     m.changefeedCheckpointTsGauge.Set(float64(phyCkpTs))
-    lag := (oracle.GetPhysical(time.Now()) - phyCkpTs) / 1e3
-    m.changefeedCheckpointTsLagGauge.Set(float64(lag))
+    lag := float64(oracle.GetPhysical(time.Now())-phyCkpTs) / 1e3
+    m.changefeedCheckpointTsLagGauge.Set(lag)
 
     phyResolvedTs := oracle.ExtractPhysical(m.watermark.ResolvedTs)
     m.changefeedResolvedTsGauge.Set(float64(phyResolvedTs))
-    lag = (oracle.GetPhysical(time.Now()) - phyResolvedTs) / 1e3
-    m.changefeedResolvedTsLagGauge.Set(float64(lag))
+    lag = float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3
+    m.changefeedResolvedTsLagGauge.Set(lag)
 
     m.changefeedStatusGauge.Set(float64(m.state))
 }
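Editor's note on this hunk: the change is about precision. oracle.GetPhysical returns milliseconds as int64, so the old `(... - phyCkpTs) / 1e3` performed integer division (the untyped constant 1e3 converts to int64) and truncated the lag to whole seconds before the float conversion; converting to float64 first keeps sub-second resolution in the gauge. A minimal standalone sketch with hypothetical timestamps, not values from the PR:

package main

import "fmt"

func main() {
    // Hypothetical physical timestamps, in milliseconds.
    now, ckpt := int64(1_700_000_001_234), int64(1_700_000_000_000)

    truncated := float64((now - ckpt) / 1e3) // int64 division first: 1
    precise := float64(now-ckpt) / 1e3       // float division: 1.234

    fmt.Println(truncated, precise) // 1 1.234
}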
77 changes: 47 additions & 30 deletions pkg/eventservice/event_broker.go
@@ -2,6 +2,7 @@ package eventservice
 
 import (
     "context"
+    "runtime"
     "strconv"
     "sync"
     "sync/atomic"
@@ -35,6 +36,7 @@ var metricEventServiceSendEventDuration = metrics.EventServiceSendEventDuration.
 var metricEventBrokerDropTaskCount = metrics.EventServiceDropScanTaskCount
 var metricEventBrokerDropResolvedTsCount = metrics.EventServiceDropResolvedTsCount
 var metricScanTaskQueueDuration = metrics.EventServiceScanTaskQueueDuration
+var metricEventBrokerHandleDuration = metrics.EventServiceHandleDuration
 
 // eventBroker gets events from the eventStore and sends them to the dispatchers.
 // Every TiDB cluster has an eventBroker.
@@ -57,10 +59,6 @@ type eventBroker struct {
     // taskPool is used to store the scan tasks and merge the tasks of the same dispatcher.
     // TODO: Make it support merging the tasks of the same table span, even if the tasks are from different dispatchers.
     taskPool *scanTaskPool
-
-    // GID here is the internal changefeedID, use to identify the area of the dispatcher.
-    ds dynstream.DynamicStream[common.GID, common.DispatcherID, scanTask, *eventBroker, *dispatcherEventsHandler]
-
     // scanWorkerCount is the number of the scan workers to spawn.
     scanWorkerCount int
 
@@ -105,13 +103,12 @@ func newEventBroker(
         eventStore:              eventStore,
         mounter:                 pevent.NewMounter(tz),
         schemaStore:             schemaStore,
-        notifyCh:                make(chan *dispatcherStat, defaultChannelSize*16),
+        notifyCh:                make(chan *dispatcherStat, 1024*1024),
         dispatchers:             sync.Map{},
         tableTriggerDispatchers: sync.Map{},
         msgSender:               mc,
         taskPool:                newScanTaskPool(),
         scanWorkerCount:         defaultScanWorkerCount,
-        ds:                      ds,
         messageCh:               make(chan wrapEvent, defaultChannelSize),
         resolvedTsCaches:        make(map[node.ID]*resolvedTsCache),
         cancel:                  cancel,
@@ -141,13 +138,11 @@ func (c *eventBroker) sendWatermark(
     counter prometheus.Counter,
 ) {
     c.emitSyncPointEventIfNeeded(watermark, d, server)
-
     re := pevent.NewResolvedEvent(watermark, d.info.GetID())
     resolvedEvent := newWrapResolvedEvent(
         server,
         re,
         d.getEventSenderState())
-
     select {
     case c.messageCh <- resolvedEvent:
         if counter != nil {
@@ -177,18 +172,24 @@ func (c *eventBroker) runScanWorker(ctx context.Context) {
 }
 
 func (c *eventBroker) runGenTasks(ctx context.Context) {
-    c.wg.Add(1)
-    go func() {
-        defer c.wg.Done()
-        for {
-            select {
-            case <-ctx.Done():
-                return
-            case stat := <-c.notifyCh:
-                c.ds.In() <- newScanTask(stat)
+    workerCnt := runtime.NumCPU()
+    c.wg.Add(workerCnt)
+    for i := 0; i < workerCnt; i++ {
+        go func() {
+            defer c.wg.Done()
+            for {
+                select {
+                case <-ctx.Done():
+                    return
+                case stat := <-c.notifyCh:
+                    //log.Info("receive dispatcher stat", zap.Stringer("dispatcher", stat.info.GetID()))
+                    //stat.watermark.Store(stat.resolvedTs.Load())
+                    task := newScanTask(stat)
+                    doHandle(c, task)
+                }
             }
-        }
-    }()
+        }()
+    }
 }
 
 // TODO: maybe event driven model is better. It is coupled with the detail implementation of
@@ -261,10 +262,6 @@ func (c *eventBroker) sendDDL(ctx context.Context, remoteID node.ID, e pevent.DD
     }
 }
 
-func (c *eventBroker) wakeDispatcher(dispatcherID common.DispatcherID) {
-    c.ds.Wake() <- dispatcherID
-}
-
 // checkNeedScan checks if the dispatcher needs to scan the event store.
 // If the dispatcher needs to scan the event store, it returns true.
 // If the dispatcher does not need to scan the event store, it sends the watermark to the dispatcher
@@ -354,7 +351,9 @@ func (c *eventBroker) doScan(ctx context.Context, task scanTask) {
     remoteID := node.ID(task.dispatcherStat.info.GetServerID())
     dispatcherID := task.dispatcherStat.info.GetID()
 
-    defer c.wakeDispatcher(dispatcherID)
+    log.Info("start to scan", zap.Stringer("dispatcher", dispatcherID), zap.Bool("isRunning", task.dispatcherStat.isRunning.Load()))
+
+    defer task.dispatcherStat.isHandling.CompareAndSwap(true, false)
     // If the target is not ready to send, we don't need to scan the event store,
     // to avoid a useless scan task.
     if !c.msgSender.IsReadyToSend(remoteID) {
@@ -567,6 +566,7 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
         case <-ticker.C:
             pullerMinResolvedTs := uint64(0)
             dispatcherMinWaterMark := uint64(0)
+            var slowestDispatcher *dispatcherStat
             c.dispatchers.Range(func(key, value interface{}) bool {
                 dispatcher := value.(*dispatcherStat)
                 resolvedTs := dispatcher.resolvedTs.Load()
@@ -576,6 +576,7 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
                 watermark := dispatcher.watermark.Load()
                 if dispatcherMinWaterMark == 0 || watermark < dispatcherMinWaterMark {
                     dispatcherMinWaterMark = watermark
+                    slowestDispatcher = dispatcher
                 }
                 return true
             })
@@ -586,9 +587,9 @@ func (c *eventBroker) updateMetrics(ctx context.Context) {
             lag := float64(oracle.GetPhysical(time.Now())-phyResolvedTs) / 1e3
             c.metricEventServicePullerResolvedTs.Set(float64(phyResolvedTs))
             c.metricEventServiceResolvedTsLag.Set(lag)
-
             lag = float64(oracle.GetPhysical(time.Now())-oracle.ExtractPhysical(dispatcherMinWaterMark)) / 1e3
             c.metricEventServiceDispatcherResolvedTs.Set(lag)
+            log.Warn("slowest dispatcher in event broker", zap.Stringer("dispatcher", slowestDispatcher.info.GetID()), zap.Uint64("watermark", slowestDispatcher.watermark.Load()), zap.Uint64("seq", slowestDispatcher.seq.Load()))
         }
     }
 }()
@@ -620,15 +621,19 @@ func (c *eventBroker) updateDispatcherSendTs(ctx context.Context) {
 func (c *eventBroker) close() {
     c.cancel()
     c.wg.Wait()
-    c.ds.Close()
 }
 
 func (c *eventBroker) onNotify(d *dispatcherStat, resolvedTs uint64) {
     if d.onSubscriptionResolvedTs(resolvedTs) {
         // Note: don't block the caller of this function.
-        select {
-        case c.notifyCh <- d:
-        default:
-            metricEventBrokerDropTaskCount.Inc()
+        if d.isHandling.CompareAndSwap(false, true) {
+            // c.notifyCh <- d
+            select {
+            case c.notifyCh <- d:
+            default:
+                d.isHandling.Store(false)
+                metricEventBrokerDropTaskCount.Inc()
+            }
         }
     }
 }
Expand Down Expand Up @@ -690,7 +695,6 @@ func (c *eventBroker) addDispatcher(info DispatcherInfo) {
}
dispatcher.updateTableInfo(tableInfo)
eventStoreRegisterDuration := time.Since(start)
c.ds.AddPath(id, c, dynstream.AreaSettings{})

log.Info("register dispatcher", zap.Uint64("clusterID", c.tidbClusterID),
zap.Any("dispatcherID", id), zap.Int64("tableID", span.TableID),
@@ -728,6 +732,7 @@ func (c *eventBroker) resumeDispatcher(dispatcherInfo DispatcherInfo) {
     log.Info("resume dispatcher", zap.Any("dispatcher", stat.info.GetID()), zap.Uint64("checkpointTs", stat.watermark.Load()), zap.Uint64("seq", stat.seq.Load()))
     // Reset the watermark to the startTs of the dispatcherInfo.
     stat.isRunning.Store(true)
+    stat.isHandling.Store(false)
 }
 
 func (c *eventBroker) resetDispatcher(dispatcherInfo DispatcherInfo) {
@@ -772,6 +777,8 @@ type dispatcherStat struct {
     nextSyncPoint     uint64
     syncPointInterval time.Duration
 
+    isHandling atomic.Bool
+
     metricSorterOutputEventCountKV prometheus.Counter
     metricEventServiceSendKvCount  prometheus.Counter
     metricEventServiceSendDDLCount prometheus.Counter
@@ -801,6 +808,7 @@ func newDispatcherStat(
     dispStat.resolvedTs.Store(startTs)
     dispStat.watermark.Store(startTs)
     dispStat.isRunning.Store(true)
+    dispStat.isHandling.Store(false)
     return dispStat
 }
 
@@ -860,6 +868,8 @@ type scanTaskPool struct {
     // pendingTaskQueue is used to store the tasks that are waiting to be handled by the scan workers.
     // The length of the pendingTaskQueue is equal to the number of the scan workers.
     pendingTaskQueue chan scanTask
+
+    lastPrint time.Time
 }
 
 func newScanTaskPool() *scanTaskPool {
@@ -873,9 +883,16 @@
 func (p *scanTaskPool) pushTask(task scanTask) bool {
     select {
     case p.pendingTaskQueue <- task:
+        if time.Since(p.lastPrint) > time.Second*10 {
+            log.Info("push task to scanTaskPool", zap.Int("pendingTaskQueue", len(p.pendingTaskQueue)))
+            p.lastPrint = time.Now()
+        }
         return true
     default:
         metricEventBrokerDropTaskCount.Inc()
+        if !task.dispatcherStat.isHandling.CompareAndSwap(true, false) {
+            panic("the dispatcher is not handling, but the task is dropped")
+        }
         // If the queue is full, we just drop the task
         return false
     }
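Editor's note on event_broker.go: the diff removes the dynstream path (the ds field, wakeDispatcher, AddPath, Close) and instead has runtime.NumCPU() workers consume notifyCh and call doHandle directly. The new isHandling atomic.Bool ensures each dispatcher has at most one task queued or running, so duplicate notifications are coalesced instead of piling up in the (now much larger) channel. A minimal sketch of the pattern with simplified, illustrative types — stat, run, and onNotify are not the PR's exact names:

package main

import (
    "context"
    "runtime"
    "sync"
    "sync/atomic"
)

type stat struct{ isHandling atomic.Bool }

// run spawns one consumer per CPU, mirroring the new runGenTasks shape.
func run(ctx context.Context, notifyCh chan *stat, handle func(*stat), wg *sync.WaitGroup) {
    workerCnt := runtime.NumCPU()
    wg.Add(workerCnt)
    for i := 0; i < workerCnt; i++ {
        go func() {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                case s := <-notifyCh:
                    handle(s) // the handler clears isHandling when it finishes
                }
            }
        }()
    }
}

// onNotify enqueues a stat at most once: the CAS means a dispatcher has at
// most one pending or in-flight task; on a full channel the flag is rolled
// back so the next resolved-ts notification can retry.
func onNotify(notifyCh chan *stat, s *stat) {
    if s.isHandling.CompareAndSwap(false, true) {
        select {
        case notifyCh <- s:
        default:
            s.isHandling.Store(false)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup
    ch := make(chan *stat, 1024)
    run(ctx, ch, func(s *stat) { s.isHandling.Store(false) }, &wg)
    s := &stat{}
    onNotify(ch, s) // enqueued
    onNotify(ch, s) // no-op if the first task is still pending
    cancel()
    wg.Wait()
}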
16 changes: 14 additions & 2 deletions pkg/eventservice/helper.go
@@ -1,6 +1,8 @@
 package eventservice
 
 import (
+    "time"
+
     "github.com/pingcap/log"
     "github.com/pingcap/ticdc/pkg/common"
     "github.com/pingcap/ticdc/utils/dynstream"
@@ -21,10 +23,18 @@ func (h *dispatcherEventsHandler) Handle(broker *eventBroker, tasks ...scanTask)
     if len(tasks) != 1 {
         log.Panic("only one task is allowed")
     }
-    task := tasks[0]
+    return doHandle(broker, tasks[0])
+}
+
+func doHandle(broker *eventBroker, task scanTask) bool {
+    startTime := time.Now()
+    defer func() {
+        metricEventBrokerHandleDuration.Observe(float64(time.Since(startTime).Milliseconds()))
+    }()
     needScan, _ := broker.checkNeedScan(task)
     if !needScan {
         task.handle()
+        task.dispatcherStat.isHandling.CompareAndSwap(true, false)
         return false
     }
     // The dispatcher has new events. We need to push the task to the task pool.
@@ -47,4 +57,6 @@ func (h *dispatcherEventsHandler) GetArea(path common.DispatcherID, dest *eventB
 }
 func (h *dispatcherEventsHandler) GetTimestamp(event scanTask) dynstream.Timestamp { return 0 }
 func (h *dispatcherEventsHandler) IsPaused(event scanTask) bool                    { return false }
-func (h *dispatcherEventsHandler) OnDrop(event scanTask) {}
+func (h *dispatcherEventsHandler) OnDrop(event scanTask) {
+    event.handle()
+}
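Editor's note on helper.go: Handle now delegates to doHandle, so the same code path serves both the remaining dynstream handler and the direct notifyCh workers, and the defer at the top of doHandle times every invocation into the new histogram (OnDrop also now calls event.handle() so a dropped task still completes its lifecycle). The observed value is in milliseconds. A minimal sketch of the defer-observe pattern, assuming a locally constructed histogram rather than the PR's metrics.EventServiceHandleDuration:

package main

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
)

var handleDuration = prometheus.NewHistogram(prometheus.HistogramOpts{
    Name:    "handle_duration",
    Buckets: prometheus.DefBuckets,
})

// timedWork observes the wall-clock duration of work, in milliseconds,
// no matter which return path work takes.
func timedWork(work func()) {
    startTime := time.Now()
    defer func() {
        handleDuration.Observe(float64(time.Since(startTime).Milliseconds()))
    }()
    work()
}

func main() {
    prometheus.MustRegister(handleDuration)
    timedWork(func() { time.Sleep(5 * time.Millisecond) })
}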
9 changes: 9 additions & 0 deletions pkg/metrics/event_service.go
@@ -93,6 +93,14 @@
             Help:    "The duration of a scan task being queued",
             Buckets: prometheus.DefBuckets,
         })
+    EventServiceHandleDuration = prometheus.NewHistogram(
+        prometheus.HistogramOpts{
+            Namespace: "ticdc",
+            Subsystem: "event_service",
+            Name:      "handle_duration",
+            Help:      "The duration of handling a scan task",
+            Buckets:   prometheus.DefBuckets,
+        })
 )
 
 // InitMetrics registers all metrics in this file.
@@ -107,4 +115,5 @@ func InitEventServiceMetrics(registry *prometheus.Registry) {
     registry.MustRegister(EventServiceDropScanTaskCount)
     registry.MustRegister(EventServiceDropResolvedTsCount)
     registry.MustRegister(EventServiceScanTaskQueueDuration)
+    registry.MustRegister(EventServiceHandleDuration)
 }
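One detail worth double-checking before this leaves WIP: prometheus.DefBuckets (0.005 through 10) is tuned for observations in seconds, while doHandle observes milliseconds, so most samples would land in the top buckets. If millisecond observations are intended, explicit buckets may track the distribution better; a hedged sketch of what that could look like (an assumption, not part of the PR):

    // Hypothetical alternative, not part of the PR: millisecond-oriented buckets.
    EventServiceHandleDuration = prometheus.NewHistogram(
        prometheus.HistogramOpts{
            Namespace: "ticdc",
            Subsystem: "event_service",
            Name:      "handle_duration",
            Help:      "The duration of handling a scan task (ms)",
            Buckets:   prometheus.ExponentialBuckets(1, 2, 12), // 1ms .. 2048ms
        })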