Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

br: add retry for raw kv client put (#58963) #59078

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2521,7 +2521,7 @@
log.Debug("after rewrite entry", zap.Int("new-key-len", len(newEntry.Key)),
zap.Int("new-value-len", len(entry.e.Value)), zap.ByteString("new-key", newEntry.Key))

if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.ts); err != nil {
if err := PutRawKvWithRetry(ctx, rc.rawKVClient, newEntry.Key, newEntry.Value, entry.ts); err != nil {

Check warning on line 2524 in br/pkg/restore/client.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/restore/client.go#L2524

Added line #L2524 was not covered by tests
return 0, 0, errors.Trace(err)
}

Expand Down Expand Up @@ -2933,3 +2933,13 @@
// Attempt returns the current value of the backoffer's Attempts field.
// NOTE(review): presumably this is the number of attempts still available,
// as with the other backoffers — confirm against NextBackoff for this type.
func (b *waitTiFlashBackoffer) Attempt() int {
return b.Attempts
}

// PutRawKvWithRetry writes a single key/value pair through client.Put,
// retrying failed attempts according to the backoff strategy returned by
// utils.NewRawClientBackoffStrategy.
//
// It returns nil as soon as one attempt succeeds. If utils.WithRetry gives up,
// the returned error is annotated with "failed to put raw kv after retry" and
// still carries the underlying cause, so callers and logs can tell what
// actually went wrong.
func PutRawKvWithRetry(ctx context.Context, client *RawKVBatchClient, key, value []byte, originTs uint64) error {
	err := utils.WithRetry(ctx, func() error {
		return client.Put(ctx, key, value, originTs)
	}, utils.NewRawClientBackoffStrategy())
	if err != nil {
		// Keep the original error as the cause instead of replacing it with
		// a bare message; the message prefix is unchanged for existing callers.
		return errors.Annotate(err, "failed to put raw kv after retry")
	}
	return nil
}
67 changes: 67 additions & 0 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/parser/types"
"github.com/pingcap/tidb/tablecodec"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/rawkv"
pd "github.com/tikv/pd/client"
"golang.org/x/exp/slices"
"google.golang.org/grpc/keepalive"
Expand Down Expand Up @@ -1865,3 +1866,69 @@ func TestCheckNewCollationEnable(t *testing.T) {
require.Equal(t, ca.newCollationEnableInCluster == "True", enabled)
}
}

// mockRawKVClient is a test double for restore.RawkvClient whose BatchPut
// fails a configurable number of times before succeeding.
type mockRawKVClient struct {
// Embedding the interface supplies the methods this mock does not override.
restore.RawkvClient
// putCount is the number of BatchPut calls made so far.
putCount int
// errThreshold is the number of initial BatchPut calls that return an error;
// calls made while putCount <= errThreshold fail.
errThreshold int
}

// BatchPut records the call and simulates a flaky client: the first
// errThreshold calls fail with "rpcClient is idle", later calls succeed.
// The keys, values and options arguments are ignored.
func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error {
	m.putCount++
	shouldFail := m.putCount <= m.errThreshold
	if !shouldFail {
		return nil
	}
	return errors.New("rpcClient is idle")
}

// TestPutRawKvWithRetry drives restore.PutRawKvWithRetry against a mock raw KV
// client that fails a fixed number of times, and checks both the returned
// error and how many put attempts reached the client.
func TestPutRawKvWithRetry(t *testing.T) {
	type testCase struct {
		name         string
		errThreshold int
		// cancelAfter, when positive, bounds the call with a context timeout.
		// No case below sets it at the moment.
		cancelAfter time.Duration
		wantErr     string
		wantPuts    int
	}

	cases := []testCase{
		{name: "success on first try", errThreshold: 0, wantPuts: 1},
		{name: "success on after failure", errThreshold: 2, wantPuts: 3},
		{
			name:         "fails all retries",
			errThreshold: 5,
			wantErr:      "failed to put raw kv after retry",
			wantPuts:     5,
		},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			mock := &mockRawKVClient{errThreshold: tc.errThreshold}
			batchClient := restore.NewRawKVBatchClient(mock, 1)

			ctx := context.Background()
			if tc.cancelAfter > 0 {
				timeoutCtx, cancel := context.WithTimeout(ctx, tc.cancelAfter)
				defer cancel()
				ctx = timeoutCtx
			}

			err := restore.PutRawKvWithRetry(ctx, batchClient, []byte("key"), []byte("value"), 1)

			if tc.wantErr == "" {
				require.NoError(t, err)
			} else {
				require.ErrorContains(t, err, tc.wantErr)
			}
			require.Equal(t, tc.wantPuts, mock.putCount)
		})
	}
}
Empty file.
36 changes: 36 additions & 0 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@
flashbackRetryTime = 3
flashbackWaitInterval = 3000 * time.Millisecond
flashbackMaxWaitInterval = 15 * time.Second

rawClientMaxAttempts = 5
rawClientDelayTime = 500 * time.Millisecond
rawClientMaxDelayTime = 5 * time.Second
)

// ConstantBackoff is a backoffer that retries forever until success.
Expand Down Expand Up @@ -289,3 +293,35 @@
// Attempt returns the current value of the backoffer's attempt counter.
// NOTE(review): presumably the number of attempts still available — confirm
// against this type's NextBackoff.
func (bo *flashbackBackoffer) Attempt() int {
return bo.attempt
}

// RawClientBackoffStrategy is a backoffer with a bounded number of attempts
// and a delay that doubles after each failed attempt up to a maximum.
type RawClientBackoffStrategy struct {
// Attempts is the number of attempts still available; NextBackoff
// decrements it on every call.
Attempts int
// BaseBackoff is the delay returned by the next call to NextBackoff; it is
// doubled after each call and capped at MaxBackoff.
BaseBackoff time.Duration

Check warning on line 299 in br/pkg/utils/backoff.go

View check run for this annotation

Codecov / codecov/patch

br/pkg/utils/backoff.go#L298-L299

Added lines #L298 - L299 were not covered by tests
// MaxBackoff is the upper bound applied to BaseBackoff.
MaxBackoff time.Duration
}

// NewRawClientBackoffStrategy builds a RawClientBackoffStrategy initialised
// from the package defaults rawClientMaxAttempts, rawClientDelayTime and
// rawClientMaxDelayTime, and returns it as a Backoffer.
func NewRawClientBackoffStrategy() Backoffer {
	strategy := new(RawClientBackoffStrategy)
	strategy.Attempts = rawClientMaxAttempts
	strategy.BaseBackoff = rawClientDelayTime
	strategy.MaxBackoff = rawClientMaxDelayTime
	return strategy
}

// NextBackoff consumes one attempt and returns how long to wait before the
// next retry. The error argument is ignored.
//
// When the call uses up the last attempt (Attempts reaches zero) it returns 0
// and leaves BaseBackoff untouched. Otherwise it returns the current
// BaseBackoff and doubles it for the following call, capped at MaxBackoff.
func (b *RawClientBackoffStrategy) NextBackoff(error) time.Duration {
	b.Attempts--
	if b.Attempts == 0 {
		return 0
	}

	current := b.BaseBackoff
	next := current * 2
	if next > b.MaxBackoff {
		next = b.MaxBackoff
	}
	b.BaseBackoff = next
	return current
}

// Attempt returns the number of attempts still available. The count is
// decremented by each call to NextBackoff.
func (b *RawClientBackoffStrategy) Attempt() int {
return b.Attempts
}
6 changes: 3 additions & 3 deletions build/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package(default_visibility = ["//visibility:public"])

load("@io_bazel_rules_go//go:def.bzl", "nogo")
load("@bazel_skylib//rules:common_settings.bzl", "bool_flag")
load("@io_bazel_rules_go//go:def.bzl", "nogo")
load("//build/linter/staticcheck:def.bzl", "staticcheck_analyzers")

package(default_visibility = ["//visibility:public"])

bool_flag(
name = "with_nogo_flag",
build_setting_default = False,
Expand Down