Skip to content

Commit

Permalink
remove notify chan
Browse files Browse the repository at this point in the history
Signed-off-by: dongmen <[email protected]>
  • Loading branch information
asddongmen committed Nov 13, 2024
1 parent 24e442f commit 080cfd6
Showing 1 changed file with 23 additions and 24 deletions.
47 changes: 23 additions & 24 deletions pkg/eventservice/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ type eventBroker struct {
// and a goroutine is responsible for sending the message to the dispatchers.
messageCh chan wrapEvent
resolvedTsCaches map[node.ID]*resolvedTsCache
notifyCh chan *dispatcherStat
//notifyCh chan *dispatcherStat

// wg is used to spawn the goroutines.
wg *sync.WaitGroup
Expand Down Expand Up @@ -102,11 +102,11 @@ func newEventBroker(
ds.Start()

c := &eventBroker{
tidbClusterID: id,
eventStore: eventStore,
mounter: pevent.NewMounter(tz),
schemaStore: schemaStore,
notifyCh: make(chan *dispatcherStat, defaultChannelSize*16),
tidbClusterID: id,
eventStore: eventStore,
mounter: pevent.NewMounter(tz),
schemaStore: schemaStore,
//notifyCh: make(chan *dispatcherStat, defaultChannelSize*16),
dispatchers: sync.Map{},
tableTriggerDispatchers: sync.Map{},
msgSender: mc,
Expand All @@ -130,7 +130,7 @@ func newEventBroker(
c.runSendMessageWorker(ctx)
c.updateMetrics(ctx)
c.updateDispatcherSendTs(ctx)
c.runGenTasks(ctx)
//c.runGenTasks(ctx)
log.Info("new event broker created", zap.Uint64("id", id))
return c
}
Expand Down Expand Up @@ -175,22 +175,21 @@ func (c *eventBroker) runScanWorker(ctx context.Context) {
}
}

func (c *eventBroker) runGenTasks(ctx context.Context) {
c.wg.Add(1)
go func() {
defer c.wg.Done()
for {
select {
case <-ctx.Done():
return
case stat := <-c.notifyCh:
//log.Info("receive dispatcher stat", zap.Stringer("dispatcher", stat.info.GetID()))
stat.watermark.Store(stat.resolvedTs.Load())
//c.ds.In() <- newScanTask(stat)
}
}
}()
}
// func (c *eventBroker) runGenTasks(ctx context.Context) {
// c.wg.Add(1)
// go func() {
// defer c.wg.Done()
// for {
// select {
// case <-ctx.Done():
// return
// case stat := <-c.notifyCh:
// stat.watermark.Store(stat.resolvedTs.Load())
// //c.ds.In() <- newScanTask(stat)
// }
// }
// }()
// }

// TODO: maybe event driven model is better. It is coupled with the detail implementation of
// the schemaStore, we will refactor it later.
Expand Down Expand Up @@ -626,7 +625,7 @@ func (c *eventBroker) close() {
func (c *eventBroker) onNotify(d *dispatcherStat, resolvedTs uint64) {
if d.onSubscriptionResolvedTs(resolvedTs) {
// Note: don't block the caller of this function.
c.notifyCh <- d
c.ds.In() <- newScanTask(d)
}
}

Expand Down

0 comments on commit 080cfd6

Please sign in to comment.