From 5143a485b69002d4e70627f2d3515109058083ff Mon Sep 17 00:00:00 2001 From: Yiding Date: Tue, 21 Jan 2025 22:53:16 +0800 Subject: [PATCH] domain: move deltaUpdateTicker/gcstats into a new goroutine --- pkg/domain/domain.go | 113 +++++++++++++++++++++++++++----------- pkg/domain/domain_test.go | 2 +- 2 files changed, 82 insertions(+), 33 deletions(-) diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index cb76a2fe6b586..dff60ec067103 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -2432,16 +2432,18 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err do.indexUsageWorker() }, "indexUsageWorker") if do.statsLease <= 0 { - // For statsLease > 0, `updateStatsWorker` handles the quit of stats owner. + // For statsLease > 0, `gcStatsWorker` handles the quit of stats owner. do.wg.Run(func() { quitStatsOwner(do, do.statsOwner) }, "quitStatsOwner") return nil } do.SetStatsUpdating(true) do.wg.Run(do.asyncLoadHistogram, "asyncLoadHistogram") - // The stats updated worker doesn't require the stats initialization to be completed. - // This is because the updated worker's primary responsibilities are to update the change delta and handle DDL operations. + // The gcStatsWorker/dumpColStatsUsageWorker/deltaUpdateTickerWorker don't require the stats initialization to be completed. + // This is because those workers' primary responsibilities are to update the change delta and handle DDL operations. // These tasks do not interfere with or depend on the initialization process. - do.wg.Run(func() { do.updateStatsWorker(ctx) }, "updateStatsWorker") + do.wg.Run(do.gcStatsWorker, "gcStatsWorker") + do.wg.Run(do.dumpColStatsUsageWorker, "dumpColStatsUsageWorker") + do.wg.Run(do.deltaUpdateTickerWorker, "deltaUpdateTickerWorker") // Wait for the stats worker to finish the initialization. // Otherwise, we may start the auto analyze worker before the stats cache is initialized. 
do.wg.Run( @@ -2647,60 +2649,66 @@ func (do *Domain) indexUsageWorker() { } } -func (do *Domain) updateStatsWorkerExitPreprocessing(statsHandle *handle.Handle) { +func (do *Domain) gcStatsWorkerExitPreprocessing() { ch := make(chan struct{}, 1) timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() go func() { - logutil.BgLogger().Info("updateStatsWorker is going to exit, start to flush stats") - statsHandle.FlushStats() - logutil.BgLogger().Info("updateStatsWorker ready to release owner") + logutil.BgLogger().Info("gcStatsWorker ready to release owner") do.statsOwner.Close() ch <- struct{}{} }() select { case <-ch: - logutil.BgLogger().Info("updateStatsWorker exit preprocessing finished") + logutil.BgLogger().Info("gcStatsWorker exit preprocessing finished") + return + case <-timeout.Done(): + logutil.BgLogger().Warn("gcStatsWorker exit preprocessing timeout, force exiting") + return + } +} + +func (*Domain) deltaUpdateTickerWorkerExitPreprocessing(statsHandle *handle.Handle) { + ch := make(chan struct{}, 1) + timeout, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + go func() { + logutil.BgLogger().Info("deltaUpdateTicker is going to exit, start to flush stats") + statsHandle.FlushStats() + ch <- struct{}{} + }() + select { + case <-ch: + logutil.BgLogger().Info("deltaUpdateTicker exit preprocessing finished") return case <-timeout.Done(): - logutil.BgLogger().Warn("updateStatsWorker exit preprocessing timeout, force exiting") + logutil.BgLogger().Warn("deltaUpdateTicker exit preprocessing timeout, force exiting") return } } -func (do *Domain) updateStatsWorker(_ sessionctx.Context) { - defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false) - logutil.BgLogger().Info("updateStatsWorker started.") +func (do *Domain) gcStatsWorker() { + defer util.Recover(metrics.LabelDomain, "gcStatsWorker", nil, false) + logutil.BgLogger().Info("gcStatsWorker started.") lease := 
do.statsLease - // We need to have different nodes trigger tasks at different times to avoid the herd effect. - randDuration := time.Duration(rand.Int63n(int64(time.Minute))) - deltaUpdateTicker := time.NewTicker(20*lease + randDuration) gcStatsTicker := time.NewTicker(100 * lease) - dumpColStatsUsageTicker := time.NewTicker(100 * lease) updateStatsHealthyTicker := time.NewTicker(20 * lease) readMemTicker := time.NewTicker(memory.ReadMemInterval) statsHandle := do.StatsHandle() defer func() { - dumpColStatsUsageTicker.Stop() gcStatsTicker.Stop() - deltaUpdateTicker.Stop() readMemTicker.Stop() updateStatsHealthyTicker.Stop() do.SetStatsUpdating(false) - logutil.BgLogger().Info("updateStatsWorker exited.") + logutil.BgLogger().Info("gcStatsWorker exited.") }() - defer util.Recover(metrics.LabelDomain, "updateStatsWorker", nil, false) + defer util.Recover(metrics.LabelDomain, "gcStatsWorker", nil, false) for { select { case <-do.exit: - do.updateStatsWorkerExitPreprocessing(statsHandle) + do.gcStatsWorkerExitPreprocessing() return - case <-deltaUpdateTicker.C: - err := statsHandle.DumpStatsDeltaToKV(false) - if err != nil { - logutil.BgLogger().Warn("dump stats delta failed", zap.Error(err)) - } case <-gcStatsTicker.C: if !do.statsOwner.IsOwner() { continue @@ -2710,11 +2718,6 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context) { logutil.BgLogger().Warn("GC stats failed", zap.Error(err)) } do.CheckAutoAnalyzeWindows() - case <-dumpColStatsUsageTicker.C: - err := statsHandle.DumpColStatsUsageToKV() - if err != nil { - logutil.BgLogger().Warn("dump column stats usage failed", zap.Error(err)) - } case <-readMemTicker.C: memory.ForceReadMemStats() do.StatsHandle().StatsCache.TriggerEvict() @@ -2724,6 +2727,52 @@ func (do *Domain) updateStatsWorker(_ sessionctx.Context) { } } +func (do *Domain) dumpColStatsUsageWorker() { + logutil.BgLogger().Info("dumpColStatsUsageWorker started.") + lease := do.statsLease + dumpColStatsUsageTicker := time.NewTicker(100 * lease) 
+ statsHandle := do.StatsHandle() + defer func() { + dumpColStatsUsageTicker.Stop() + logutil.BgLogger().Info("dumpColStatsUsageWorker exited.") + }() + defer util.Recover(metrics.LabelDomain, "dumpColStatsUsageWorker", nil, false) + + for { + select { + case <-do.exit: + return + case <-dumpColStatsUsageTicker.C: + err := statsHandle.DumpColStatsUsageToKV() + if err != nil { + logutil.BgLogger().Warn("dump column stats usage failed", zap.Error(err)) + } + } + } +} + +func (do *Domain) deltaUpdateTickerWorker() { + defer util.Recover(metrics.LabelDomain, "deltaUpdateTickerWorker", nil, false) + logutil.BgLogger().Info("deltaUpdateTickerWorker started.") + lease := do.statsLease + // We need to have different nodes trigger tasks at different times to avoid the herd effect. + randDuration := time.Duration(rand.Int63n(int64(time.Minute))) + deltaUpdateTicker := time.NewTicker(20*lease + randDuration) + statsHandle := do.StatsHandle() + for { + select { + case <-do.exit: + do.deltaUpdateTickerWorkerExitPreprocessing(statsHandle) + return + case <-deltaUpdateTicker.C: + err := statsHandle.DumpStatsDeltaToKV(false) + if err != nil { + logutil.BgLogger().Warn("dump stats delta failed", zap.Error(err)) + } + } + } +} + func (do *Domain) autoAnalyzeWorker() { defer util.Recover(metrics.LabelDomain, "autoAnalyzeWorker", nil, false) statsHandle := do.StatsHandle() diff --git a/pkg/domain/domain_test.go b/pkg/domain/domain_test.go index 3a4665c376eaa..3f58566521d9f 100644 --- a/pkg/domain/domain_test.go +++ b/pkg/domain/domain_test.go @@ -181,7 +181,7 @@ func TestStatWorkRecoverFromPanic(t *testing.T) { metrics.PanicCounter.Reset() // Since the stats lease is 0 now, so create a new ticker will panic. // Test that they can recover from panic correctly. - dom.updateStatsWorker(mock.NewContext()) + dom.gcStatsWorker() dom.autoAnalyzeWorker() counter := metrics.PanicCounter.WithLabelValues(metrics.LabelDomain) pb := &dto.Metric{}