Skip to content

Commit

Permalink
br: add retry for raw kv client put (#58963) (#59082)
Browse files Browse the repository at this point in the history
close #58845
  • Loading branch information
ti-chi-bot authored Jan 22, 2025
1 parent b0819b6 commit 5fbcd40
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 4 deletions.
12 changes: 11 additions & 1 deletion br/pkg/restore/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3278,7 +3278,7 @@ func (rc *Client) restoreMetaKvEntries(
failpoint.Inject("failed-to-restore-metakv", func(_ failpoint.Value) {
failpoint.Return(0, 0, errors.Errorf("failpoint: failed to restore metakv"))
})
if err := rc.rawKVClient.Put(ctx, newEntry.Key, newEntry.Value, entry.ts); err != nil {
if err := PutRawKvWithRetry(ctx, rc.rawKVClient, newEntry.Key, newEntry.Value, entry.ts); err != nil {
return 0, 0, errors.Trace(err)
}
// for failpoint, we need to flush the cache in rawKVClient every time
Expand Down Expand Up @@ -3867,3 +3867,13 @@ func (b *waitTiFlashBackoffer) NextBackoff(error) time.Duration {
func (b *waitTiFlashBackoffer) Attempt() int {
return b.Attempts
}

func PutRawKvWithRetry(ctx context.Context, client *RawKVBatchClient, key, value []byte, originTs uint64) error {
err := utils.WithRetry(ctx, func() error {
return client.Put(ctx, key, value, originTs)
}, utils.NewRawClientBackoffStrategy())
if err != nil {
return errors.Errorf("failed to put raw kv after retry")
}
return nil
}
67 changes: 67 additions & 0 deletions br/pkg/restore/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/tidb/pkg/tablecodec"
filter "github.com/pingcap/tidb/pkg/util/table-filter"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/rawkv"
pd "github.com/tikv/pd/client"
"google.golang.org/grpc/keepalive"
)
Expand Down Expand Up @@ -2007,3 +2008,69 @@ func TestCheckNewCollationEnable(t *testing.T) {
require.Equal(t, ca.newCollationEnableInCluster == "True", enabled)
}
}

type mockRawKVClient struct {
rawkv.Client
putCount int
errThreshold int
}

func (m *mockRawKVClient) BatchPut(ctx context.Context, keys, values [][]byte, options ...rawkv.RawOption) error {
m.putCount += 1
if m.errThreshold >= m.putCount {
return errors.New("rpcClient is idle")
}
return nil
}

func TestPutRawKvWithRetry(t *testing.T) {
tests := []struct {
name string
errThreshold int
cancelAfter time.Duration
wantErr string
wantPuts int
}{
{
name: "success on first try",
errThreshold: 0,
wantPuts: 1,
},
{
name: "success on after failure",
errThreshold: 2,
wantPuts: 3,
},
{
name: "fails all retries",
errThreshold: 5,
wantErr: "failed to put raw kv after retry",
wantPuts: 5,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
mockRawClient := &mockRawKVClient{
errThreshold: tt.errThreshold,
}
client := restore.NewRawKVBatchClient(mockRawClient, 1)

ctx := context.Background()
if tt.cancelAfter > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, tt.cancelAfter)
defer cancel()
}

err := restore.PutRawKvWithRetry(ctx, client, []byte("key"), []byte("value"), 1)

if tt.wantErr != "" {
require.ErrorContains(t, err, tt.wantErr)
} else {
require.NoError(t, err)
}
require.Equal(t, tt.wantPuts, mockRawClient.putCount)
})
}
}
36 changes: 36 additions & 0 deletions br/pkg/utils/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ const (
ChecksumWaitInterval = 1 * time.Second
ChecksumMaxWaitInterval = 30 * time.Second

rawClientMaxAttempts = 5
rawClientDelayTime = 500 * time.Millisecond
rawClientMaxDelayTime = 5 * time.Second

gRPC_Cancel = "the client connection is closing"
)

Expand Down Expand Up @@ -280,3 +284,35 @@ func (bo *pdReqBackoffer) NextBackoff(err error) time.Duration {
func (bo *pdReqBackoffer) Attempt() int {
return bo.attempt
}

type RawClientBackoffStrategy struct {
Attempts int
BaseBackoff time.Duration
MaxBackoff time.Duration
}

func NewRawClientBackoffStrategy() Backoffer {
return &RawClientBackoffStrategy{
Attempts: rawClientMaxAttempts,
BaseBackoff: rawClientDelayTime,
MaxBackoff: rawClientMaxAttempts,
}
}

// NextBackoff returns a duration to wait before retrying again
func (b *RawClientBackoffStrategy) NextBackoff(error) time.Duration {
bo := b.BaseBackoff
b.Attempts--
if b.Attempts == 0 {
return 0
}
b.BaseBackoff *= 2
if b.BaseBackoff > b.MaxBackoff {
b.BaseBackoff = b.MaxBackoff
}
return bo
}

func (b *RawClientBackoffStrategy) Attempt() int {
return b.Attempts
}
6 changes: 3 additions & 3 deletions build/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package(default_visibility = ["//visibility:public"])

load("@io_bazel_rules_go//go:def.bzl", "go_library", "nogo")
load("@bazel_skylib//rules:common_settings.bzl", "bool_flag")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "nogo")
load("//build/linter/staticcheck:def.bzl", "staticcheck_analyzers")

package(default_visibility = ["//visibility:public"])

bool_flag(
name = "with_nogo_flag",
build_setting_default = False,
Expand Down

0 comments on commit 5fbcd40

Please sign in to comment.