Skip to content

Commit

Permalink
puller: fix metrics (#692)
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <[email protected]>
  • Loading branch information
asddongmen authored Dec 18, 2024
1 parent 21d100f commit 2d950b4
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 15 deletions.
6 changes: 3 additions & 3 deletions coordinator/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Controller struct {

nodeChanged *atomic.Bool

cfScheduller *scheduler.Controller
cfScheduler *scheduler.Controller
operatorController *operator.Controller
changefeedDB *changefeed.ChangefeedDB
messageCenter messaging.MessageCenter
Expand Down Expand Up @@ -94,7 +94,7 @@ func NewController(
version: version,
batchSize: batchSize,
bootstrapped: atomic.NewBool(false),
cfScheduller: scheduler.NewController(map[string]scheduler.Scheduler{
cfScheduler: scheduler.NewController(map[string]scheduler.Scheduler{
scheduler.BasicScheduler: scheduler.NewBasicScheduler(selfNode.ID.String(), batchSize, oc, changefeedDB, nodeManager, oc.NewAddMaintainerOperator),
scheduler.BalanceScheduler: scheduler.NewBalanceScheduler(selfNode.ID.String(), batchSize, oc, changefeedDB, nodeManager, balanceInterval, oc.NewMoveMaintainerOperator),
}),
Expand Down Expand Up @@ -358,7 +358,7 @@ func (c *Controller) FinishBootstrap(workingMap map[common.ChangeFeedID]remoteMa
}

// start operator and scheduler
c.taskHandlers = append(c.taskHandlers, c.cfScheduller.Start(c.taskScheduler)...)
c.taskHandlers = append(c.taskHandlers, c.cfScheduler.Start(c.taskScheduler)...)
operatorControllerHandle := c.taskScheduler.Submit(c.operatorController, time.Now())
c.taskHandlers = append(c.taskHandlers, operatorControllerHandle)
c.bootstrapped.Store(true)
Expand Down
21 changes: 16 additions & 5 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,14 @@ import (
// coordinator implements the Coordinator interface
type coordinator struct {
nodeInfo *node.Info
initialized bool
version int64
lastTickTime time.Time

controller *Controller

mc messaging.MessageCenter
stream dynstream.DynamicStream[int, string, *Event, *Controller, *StreamHandler]
taskScheduler threadpool.ThreadPool
controller *Controller

gcManager gc.Manager
pdClient pd.Client
Expand Down Expand Up @@ -85,9 +85,20 @@ func New(node *node.Info,
c.stream.Start()
c.taskScheduler = threadpool.NewThreadPoolDefault()

ctl := NewController(c.version, c.nodeInfo, c.updatedChangefeedCh, c.stateChangedCh, backend, c.stream, c.taskScheduler, batchSize, balanceCheckInterval)
c.controller = ctl
if err := c.stream.AddPath("coordinator", ctl); err != nil {
controller := NewController(
c.version,
c.nodeInfo,
c.updatedChangefeedCh,
c.stateChangedCh,
backend,
c.stream,
c.taskScheduler,
batchSize,
balanceCheckInterval,
)

c.controller = controller
if err := c.stream.AddPath("coordinator", controller); err != nil {
log.Panic("failed to add path",
zap.Error(err))
}
Expand Down
7 changes: 7 additions & 0 deletions logservice/logpuller/region_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,16 @@ import (
"github.com/pingcap/kvproto/pkg/cdcpb"
"github.com/pingcap/log"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/metrics"
"github.com/pingcap/ticdc/utils/dynstream"
"go.uber.org/zap"
)

// Puller event counters, bound to their label values once at package init
// so the hot paths below avoid a WithLabelValues lookup per event.
var (
// Incremented per region carried in a ResolvedTs batch
// (see dispatchResolvedTsEvent).
metricsResolvedTsCount = metrics.PullerEventCounter.WithLabelValues("resolved_ts")
// Incremented per KV event handed to the consumer from the
// span's event cache (see regionEventHandler.Handle).
metricsEventCount = metrics.PullerEventCounter.WithLabelValues("event")
)

const (
DataGroupResolvedTs = 1
DataGroupEntries = 2
Expand Down Expand Up @@ -79,6 +85,7 @@ func (h *regionEventHandler) Handle(span *subscribedSpan, events ...regionEvent)
}
}
if len(span.kvEventsCache) > 0 {
metricsEventCount.Add(float64(len(span.kvEventsCache)))
await := span.consumeKVEvents(span.kvEventsCache, func() {
span.clearKVEventsCache()
h.subClient.wakeSubscription(span.subID)
Expand Down
1 change: 1 addition & 0 deletions logservice/logpuller/region_request_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,7 @@ func (s *regionRequestWorker) dispatchRegionChangeEvents(ctx context.Context, ev

func (s *regionRequestWorker) dispatchResolvedTsEvent(resolvedTsEvent *cdcpb.ResolvedTs) {
subscriptionID := SubscriptionID(resolvedTsEvent.RequestId)
metricsResolvedTsCount.Add(float64(len(resolvedTsEvent.Regions)))
s.client.metrics.batchResolvedSize.Observe(float64(len(resolvedTsEvent.Regions)))
for _, regionID := range resolvedTsEvent.Regions {
if state := s.getRegionState(subscriptionID, regionID); state != nil {
Expand Down
17 changes: 10 additions & 7 deletions maintainer/maintainer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func NewMaintainerManager(selfNode *node.Info,
}
m.stream = dynstream.NewDynamicStream(NewStreamHandler())
m.stream.Start()
mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages)

mc.RegisterHandler(messaging.MaintainerManagerTopic, m.recvMessages)
mc.RegisterHandler(messaging.MaintainerTopic,
func(ctx context.Context, msg *messaging.TargetMessage) error {
req := msg.Message[0].(*heartbeatpb.MaintainerCloseResponse)
Expand All @@ -94,12 +94,13 @@ func NewMaintainerManager(selfNode *node.Info,
return m
}

// recvMessages is the message handler for maintainer manager
func (m *Manager) recvMessages(ctx context.Context, msg *messaging.TargetMessage) error {
switch msg.Type {
// receive message from coordinator
case messaging.TypeAddMaintainerRequest, messaging.TypeRemoveMaintainerRequest:
fallthrough
case messaging.TypeCoordinatorBootstrapRequest:
// Coordinator related messages
case messaging.TypeAddMaintainerRequest,
messaging.TypeRemoveMaintainerRequest,
messaging.TypeCoordinatorBootstrapRequest:
select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -145,7 +146,6 @@ func (m *Manager) Run(ctx context.Context) error {
case <-ticker.C:
//1. try to send heartbeat to coordinator
m.sendHeartbeat()

//2. cleanup removed maintainers
m.maintainers.Range(func(key, value interface{}) bool {
cf := value.(*Maintainer)
Expand Down Expand Up @@ -204,6 +204,7 @@ func (m *Manager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) {
m.maintainers.Range(func(key, value interface{}) bool {
maintainer := value.(*Maintainer)
response.Statuses = append(response.Statuses, maintainer.GetMaintainerStatus())
// fizz: what is this for? (review note — confirm whether resetting statusChanged here is still needed)
maintainer.statusChanged.Store(false)
maintainer.lastReportTime = time.Now()
return true
Expand All @@ -212,6 +213,7 @@ func (m *Manager) onCoordinatorBootstrapRequest(msg *messaging.TargetMessage) {
msg = m.newCoordinatorTopicMessage(response)
err := m.mc.SendCommand(msg)
if err != nil {
// fizz: why is no resend needed here? (review note — confirm it is safe to only log this send failure)
log.Warn("send command failed", zap.Error(err))
}
log.Info("new coordinator online",
Expand Down Expand Up @@ -327,7 +329,8 @@ func (m *Manager) handleMessage(msg *messaging.TargetMessage) {
case messaging.TypeCoordinatorBootstrapRequest:
log.Info("received coordinator bootstrap request", zap.String("from", msg.From.String()))
m.onCoordinatorBootstrapRequest(msg)
case messaging.TypeAddMaintainerRequest, messaging.TypeRemoveMaintainerRequest:
case messaging.TypeAddMaintainerRequest,
messaging.TypeRemoveMaintainerRequest:
if m.coordinatorVersion > 0 {
status := m.onDispatchMaintainerRequest(msg)
if status == nil {
Expand Down

0 comments on commit 2d950b4

Please sign in to comment.