Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: integrate circuitbreaker for get region calls to PD #58737

Merged
merged 8 commits into from
Jan 24, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/domain/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ go_library(
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//http",
"@com_github_tikv_pd_client//opt",
"@com_github_tikv_pd_client//pkg/circuitbreaker",
"@com_github_tikv_pd_client//resource_group/controller",
"@io_etcd_go_etcd_client_v3//:client",
"@io_etcd_go_etcd_client_v3//concurrency",
Expand Down
10 changes: 10 additions & 0 deletions pkg/domain/domain_sysvars.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@ import (
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta"
"github.com/pingcap/tidb/pkg/sessionctx/variable"
"github.com/tikv/client-go/v2/tikv"
pd "github.com/tikv/pd/client"
"github.com/tikv/pd/client/opt"
"github.com/tikv/pd/client/pkg/circuitbreaker"
)

// initDomainSysVars() is called when a domain is initialized.
Expand All @@ -45,6 +47,8 @@ func (do *Domain) initDomainSysVars() {
variable.SetLowResolutionTSOUpdateInterval = do.setLowResolutionTSOUpdateInterval

variable.ChangeSchemaCacheSize = do.changeSchemaCacheSize

variable.ChangePDMetadataCircuitBreakerErrorRateThresholdPct = changePDMetadataCircuitBreakerErrorRateThresholdPct
}

// setStatsCacheCapacity sets statsCache cap
Expand Down Expand Up @@ -150,3 +154,9 @@ func (do *Domain) changeSchemaCacheSize(ctx context.Context, size uint64) error
do.infoCache.Data.SetCacheCapacity(size)
return nil
}

func changePDMetadataCircuitBreakerErrorRateThresholdPct(errorRatePct uint32) {
tikv.ChangePDRegionMetaCircuitBreakerSettings(func(config *circuitbreaker.Settings) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems this method does not need the Domain state, why not just call tikv. ChangePDRegionMetaCircuitBreakerSettings in SetGlobal ?

config.ErrorRateThresholdPct = errorRatePct
})
}
8 changes: 8 additions & 0 deletions pkg/sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -3447,6 +3447,14 @@ var defaultSysVars = []*SysVar{
return (*SetPDClientDynamicOption.Load())(TiDBTSOClientRPCMode, val)
},
},
{Scope: ScopeGlobal, Name: TiDBCircuitBreakerPDMetadataErrorRateThresholdPct, Value: strconv.Itoa(DefTiDBCircuitBreakerPDMetaErrorRatePct), Type: TypeUnsigned, MinValue: 0, MaxValue: 100,
SetGlobal: func(_ context.Context, s *SessionVars, val string) error {
if ChangePDMetadataCircuitBreakerErrorRateThresholdPct != nil {
ChangePDMetadataCircuitBreakerErrorRateThresholdPct(uint32(tidbOptPositiveInt32(val, DefTiDBCircuitBreakerPDMetaErrorRatePct)))
}
return nil
},
},
}

// GlobalSystemVariableInitialValue gets the default value for a system variable including ones that are dynamically set (e.g. based on the store)
Expand Down
24 changes: 24 additions & 0 deletions pkg/sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1742,6 +1742,30 @@ func TestTiDBSchemaCacheSize(t *testing.T) {
require.Error(t, err)
}

func TestTiDBCircuitBreakerPDMetadataErrorRateThresholdPct(t *testing.T) {
sv := GetSysVar(TiDBCircuitBreakerPDMetadataErrorRateThresholdPct)
vars := NewSessionVars(nil)

// Too low, will get raised to the min value
val, err := sv.Validate(vars, "-1", ScopeGlobal)
require.NoError(t, err)
require.Equal(t, strconv.FormatInt(GetSysVar(TiDBCircuitBreakerPDMetadataErrorRateThresholdPct).MinValue, 10), val)
warn := vars.StmtCtx.GetWarnings()[0].Err
require.Equal(t, "[variable:1292]Truncated incorrect tidb_cb_pd_metadata_error_rate_threshold_pct value: '-1'", warn.Error())

// Too high, will get lowered to the max value
val, err = sv.Validate(vars, "101", ScopeGlobal)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(GetSysVar(TiDBCircuitBreakerPDMetadataErrorRateThresholdPct).MaxValue, 10), val)
warn = vars.StmtCtx.GetWarnings()[1].Err
require.Equal(t, "[variable:1292]Truncated incorrect tidb_cb_pd_metadata_error_rate_threshold_pct value: '101'", warn.Error())

// valid
val, err = sv.Validate(vars, "10", ScopeGlobal)
require.NoError(t, err)
require.Equal(t, "10", val)
}

func TestEnableWindowFunction(t *testing.T) {
vars := NewSessionVars(nil)
require.Equal(t, vars.EnableWindowFunction, DefEnableWindowFunction)
Expand Down
6 changes: 6 additions & 0 deletions pkg/sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,9 @@ const (
// TiDBTSOClientRPCMode controls how the TSO client performs the TSO RPC requests. It internally controls the
// concurrency of the RPC. This variable provides an approach to tune the latency of getting timestamps from PD.
TiDBTSOClientRPCMode = "tidb_tso_client_rpc_mode"
// TiDBCircuitBreakerPDMetadataErrorRateThresholdPct variable is used to set percent of errors to trip the circuit breaker for get region calls to PD
// https://github.com/tikv/rfcs/blob/master/text/0115-circuit-breaker.md
TiDBCircuitBreakerPDMetadataErrorRateThresholdPct = "tidb_cb_pd_metadata_error_rate_threshold_pct"
)

// TiDB intentional limits
Expand Down Expand Up @@ -1571,6 +1574,7 @@ const (
DefOptEnableProjectionPushDown = true
DefTiDBEnableSharedLockPromotion = false
DefTiDBTSOClientRPCMode = TSOClientRPCModeDefault
DefTiDBCircuitBreakerPDMetaErrorRatePct = 0
)

// Process global variables.
Expand Down Expand Up @@ -1730,6 +1734,8 @@ var (
EnableStatsOwner func() error = nil
// DisableStatsOwner is the func registered by stats to disable running stats in this instance.
DisableStatsOwner func() error = nil
// ChangePDMetadataCircuitBreakerErrorRateThresholdPct changes the error rate threshold of the PD metadata circuit breaker.
ChangePDMetadataCircuitBreakerErrorRateThresholdPct func(uint32) = nil
)

// Hooks functions for Cluster Resource Control.
Expand Down