Skip to content

Commit

Permalink
domain: move deltaUpdateTicker/gcstats into a new goroutine
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros committed Jan 21, 2025
1 parent a71c8cc commit 5143a48
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 33 deletions.
113 changes: 81 additions & 32 deletions pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2432,16 +2432,18 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
do.indexUsageWorker()
}, "indexUsageWorker")
if do.statsLease <= 0 {
// For statsLease > 0, `updateStatsWorker` handles the quit of stats owner.
// For statsLease > 0, `gcStatsWorker` handles the quit of stats owner.
do.wg.Run(func() { quitStatsOwner(do, do.statsOwner) }, "quitStatsOwner")
return nil
}
do.SetStatsUpdating(true)
do.wg.Run(do.asyncLoadHistogram, "asyncLoadHistogram")
// The stats updated worker doesn't require the stats initialization to be completed.
// This is because the updated worker's primary responsibilities are to update the change delta and handle DDL operations.
// The gcStatsWorker/dumpColStatsUsageWorker/deltaUpdateTickerWorker doesn't require the stats initialization to be completed.
// This is because thos workers' primary responsibilities are to update the change delta and handle DDL operations.
// These tasks do not interfere with or depend on the initialization process.
do.wg.Run(func() { do.updateStatsWorker(ctx) }, "updateStatsWorker")
do.wg.Run(do.gcStatsWorker, "gcStatsWorker")
do.wg.Run(do.dumpColStatsUsageWorker, "dumpColStatsUsageWorker")
do.wg.Run(do.deltaUpdateTickerWorker, "deltaUpdateTickerWorker")
// Wait for the stats worker to finish the initialization.
// Otherwise, we may start the auto analyze worker before the stats cache is initialized.
do.wg.Run(
Expand Down Expand Up @@ -2647,60 +2649,66 @@ func (do *Domain) indexUsageWorker() {
}
}

func (do *Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle) {
func (do *Domain) gcStatsWorkerExitPreprocessing() {
ch := make(chan struct{}, 1)
timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
logutil.BgLogger().Info("updateStatsWorker is going to exit, start to flush stats")
statsHandle.FlushStats()
logutil.BgLogger().Info("updateStatsWorker ready to release owner")
logutil.BgLogger().Info("gcStatsWorker ready to release owner")
do.statsOwner.Close()
ch <- struct{}{}
}()
select {
case <-ch:
logutil.BgLogger().Info("updateStatsWorker exit preprocessing finished")
logutil.BgLogger().Info("gcStatsWorker exit preprocessing finished")
return
case <-timeout.Done():
logutil.BgLogger().Warn("gcStatsWorker exit preprocessing timeout, force exiting")
return
}
}

func (*Domain) deltaUpdateTickerWorkerExitPreprocessing(statsHandle *handle.Handle) {
ch := make(chan struct{}, 1)
timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
logutil.BgLogger().Info("deltaUpdateTicker is going to exit, start to flush stats")
statsHandle.FlushStats()
ch <- struct{}{}
}()
select {
case <-ch:
logutil.BgLogger().Info("deltaUpdateTicker exit preprocessing finished")
return
case <-timeout.Done():
logutil.BgLogger().Warn("updateStatsWorker exit preprocessing timeout, force exiting")
logutil.BgLogger().Warn("deltaUpdateTicker exit preprocessing timeout, force exiting")
return
}
}

func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false)
logutil.BgLogger().Info("updateStatsWorker started.")
func (do *Domain) gcStatsWorker() {
defer util.Recover(metrics.LabelDomain, "gcStatsWorker", nil, false)
logutil.BgLogger().Info("gcStatsWorker started.")
lease := do.statsLease
// We need to have different nodes trigger tasks at different times to avoid the herd effect.
randDuration := time.Duration(rand.Int63n(int64(time.Minute)))
deltaUpdateTicker := time.NewTicker(20*lease + randDuration)
gcStatsTicker := time.NewTicker(100 * lease)
dumpColStatsUsageTicker := time.NewTicker(100 * lease)
updateStatsHealthyTicker := time.NewTicker(20 * lease)
readMemTicker := time.NewTicker(memory.ReadMemInterval)
statsHandle := do.StatsHandle()
defer func() {
dumpColStatsUsageTicker.Stop()
gcStatsTicker.Stop()
deltaUpdateTicker.Stop()
readMemTicker.Stop()
updateStatsHealthyTicker.Stop()
do.SetStatsUpdating(false)
logutil.BgLogger().Info("updateStatsWorker exited.")
logutil.BgLogger().Info("gcStatsWorker exited.")
}()
defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false)
defer util.Recover(metrics.LabelDomain, "gcStatsWorker", nil, false)

for {
select {
case <-do.exit:
do.updateStatsWorkerExitPreprocessing(statsHandle)
do.gcStatsWorkerExitPreprocessing()
return
case <-deltaUpdateTicker.C:
err := statsHandle.DumpStatsDeltaToKV(false)
if err != nil {
logutil.BgLogger().Warn("dump stats delta failed", zap.Error(err))
}
case <-gcStatsTicker.C:
if !do.statsOwner.IsOwner() {
continue
Expand All @@ -2710,11 +2718,6 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
logutil.BgLogger().Warn("GC stats failed", zap.Error(err))
}
do.CheckAutoAnalyzeWindows()
case <-dumpColStatsUsageTicker.C:
err := statsHandle.DumpColStatsUsageToKV()
if err != nil {
logutil.BgLogger().Warn("dump column stats usage failed", zap.Error(err))
}
case <-readMemTicker.C:
memory.ForceReadMemStats()
do.StatsHandle().StatsCache.TriggerEvict()
Expand All @@ -2724,6 +2727,52 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context) {
}
}

func (do *Domain) dumpColStatsUsageWorker() {
logutil.BgLogger().Info("dumpColStatsUsageWorker started.")
lease := do.statsLease
dumpColStatsUsageTicker := time.NewTicker(100 * lease)
statsHandle := do.StatsHandle()
defer func() {
dumpColStatsUsageTicker.Stop()
logutil.BgLogger().Info("dumpColStatsUsageWorker exited.")
}()
defer util.Recover(metrics.LabelDomain, "dumpColStatsUsageWorker", nil, false)

for {
select {
case <-do.exit:
return
case <-dumpColStatsUsageTicker.C:
err := statsHandle.DumpColStatsUsageToKV()
if err != nil {
logutil.BgLogger().Warn("dump column stats usage failed", zap.Error(err))
}
}
}
}

func (do *Domain) deltaUpdateTickerWorker() {
defer util.Recover(metrics.LabelDomain, "deltaUpdateTickerWorker", nil, false)
logutil.BgLogger().Info("deltaUpdateTickerWorker started.")
lease := do.statsLease
// We need to have different nodes trigger tasks at different times to avoid the herd effect.
randDuration := time.Duration(rand.Int63n(int64(time.Minute)))
deltaUpdateTicker := time.NewTicker(20*lease + randDuration)
statsHandle := do.StatsHandle()
for {
select {
case <-do.exit:
do.deltaUpdateTickerWorkerExitPreprocessing(statsHandle)
return
case <-deltaUpdateTicker.C:
err := statsHandle.DumpStatsDeltaToKV(false)
if err != nil {
logutil.BgLogger().Warn("dump stats delta failed", zap.Error(err))
}
}
}
}

func (do *Domain) autoAnalyzeWorker() {
defer util.Recover(metrics.LabelDomain, "autoAnalyzeWorker", nil, false)
statsHandle := do.StatsHandle()
Expand Down
2 changes: 1 addition & 1 deletion pkg/domain/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestStatWorkRecoverFromPanic(t *testing.T) {
metrics.PanicCounter.Reset()
// Since the stats lease is 0 now, so create a new ticker will panic.
// Test that they can recover from panic correctly.
dom.updateStatsWorker(mock.NewContext())
dom.gcStatsWorker()
dom.autoAnalyzeWorker()
counter := metrics.PanicCounter.WithLabelValues(metrics.LabelDomain)
pb := &dto.Metric{}
Expand Down

0 comments on commit 5143a48

Please sign in to comment.