Skip to content

Commit

Permalink
runaway: fix race problem when using multiple coprocessor workers (#5…
Browse files Browse the repository at this point in the history
  • Loading branch information
nolouch authored Jan 17, 2025
1 parent 889bf45 commit 365a722
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 10 deletions.
10 changes: 8 additions & 2 deletions pkg/resourcegroup/runaway/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@ go_library(
go_test(
name = "runaway_test",
timeout = "short",
srcs = ["record_test.go"],
srcs = [
"checker_test.go",
"record_test.go",
],
embed = [":runaway"],
flaky = True,
deps = ["@com_github_stretchr_testify//assert"],
deps = [
"@com_github_stretchr_testify//assert",
"@com_github_tikv_client_go_v2//util",
],
)
19 changes: 11 additions & 8 deletions pkg/resourcegroup/runaway/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,20 @@ type Checker struct {
deadline time.Time
ruThreshold int64
processedKeysThreshold int64
// using total processed_keys to accumulate all coprocessor tasks.
totalProcessedKeys int64
// From the group runaway settings, which will be applied when a query lacks a specified watch rule.
settings *rmpb.RunawaySettings

// watchAction is the specified watch action for the runaway query.
// If it's not given, the action defined in `settings` will be used.
watchAction rmpb.RunawayAction

// mutable fields below
// using total processed_keys to accumulate all coprocessor tasks.
totalProcessedKeys int64
// markedByIdentifyInRunawaySettings is set to true when the query matches the group runaway settings.
markedByIdentifyInRunawaySettings atomic.Bool
// markedByQueryWatchRule is set to true when the query matches the specified watch rules.
markedByQueryWatchRule bool
// watchAction is the specified watch action for the runaway query.
// If it's not given, the action defined in `settings` will be used.
watchAction rmpb.RunawayAction
}

// NewChecker creates a new RunawayChecker.
Expand Down Expand Up @@ -328,8 +330,9 @@ func (r *Checker) CheckThresholds(ruDetail *util.RUDetails, processKeys int64, e
checkTime = now
}
// add the processed keys to the total processed keys.
r.totalProcessedKeys += processKeys
exceedCause := r.exceedsThresholds(checkTime, ruDetail, r.totalProcessedKeys)
atomic.AddInt64(&r.totalProcessedKeys, processKeys)
totalProcessedKeys := atomic.LoadInt64(&r.totalProcessedKeys)
exceedCause := r.exceedsThresholds(checkTime, ruDetail, totalProcessedKeys)
if !r.markedByIdentifyInRunawaySettings.Load() {
if exceedCause != "" && r.markRunawayByIdentifyInRunawaySettings(&now, exceedCause) {
if r.markRunawayByIdentifyInRunawaySettings(&now, exceedCause) {
Expand Down Expand Up @@ -405,5 +408,5 @@ func (r *Checker) ResetTotalProcessedKeys() {
if r == nil {
return
}
r.totalProcessedKeys = 0
atomic.StoreInt64(&r.totalProcessedKeys, 0)
}
64 changes: 64 additions & 0 deletions pkg/resourcegroup/runaway/checker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package runaway

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/tikv/client-go/v2/util"
)

// TestConcurrentResetAndCheckThresholds drives a zero-value Checker from two
// sets of goroutines at the same time: one set repeatedly calls
// CheckThresholds, the other repeatedly calls ResetTotalProcessedKeys.
// Once every goroutine has finished, it checks that the accumulated
// totalProcessedKeys counter is not negative.
//
// The test is most useful when run with the race detector enabled
// (go test -race), which reports unsynchronized access to the counter.
func TestConcurrentResetAndCheckThresholds(t *testing.T) {
	const (
		workers    = 5   // goroutines per kind of operation
		iterations = 100 // calls made by each goroutine
	)

	var (
		checker     = &Checker{}
		keysPerCall = int64(10)
		group       sync.WaitGroup
	)

	// spawn starts `workers` goroutines, each running body `iterations` times,
	// and registers every one of them with the shared WaitGroup.
	spawn := func(body func()) {
		for w := 0; w < workers; w++ {
			group.Add(1)
			go func() {
				defer group.Done()
				for n := 0; n < iterations; n++ {
					body()
				}
			}()
		}
	}

	// Accumulating side: every call adds keysPerCall processed keys.
	spawn(func() {
		_ = checker.CheckThresholds(&util.RUDetails{}, keysPerCall, nil)
	})

	// Resetting side: clear the counter, then pause briefly so the resets
	// are spread out among the CheckThresholds calls.
	spawn(func() {
		checker.ResetTotalProcessedKeys()
		time.Sleep(time.Millisecond)
	})

	// Block until both sets of goroutines are done.
	group.Wait()

	// Read the counter atomically and make sure it never ended up negative.
	remaining := atomic.LoadInt64(&checker.totalProcessedKeys)
	assert.GreaterOrEqual(t, remaining, int64(0), "unexpected negative totalProcessedKeys value")
}

0 comments on commit 365a722

Please sign in to comment.