Support KeyRange in --clusters_to_watch flag #17604

Status: Open. Wants to merge 7 commits into main. Showing changes from 4 commits.
6 changes: 6 additions & 0 deletions changelog/22.0/22.0.0/summary.md
@@ -15,6 +15,7 @@
- **[Stalled Disk Recovery in VTOrc](#stall-disk-recovery)**
- **[Update default MySQL version to 8.0.40](#mysql-8-0-40)**
- **[Update lite images to Debian Bookworm](#debian-bookworm)**
- **[KeyRanges in `--clusters_to_watch` in VTOrc](#key-range-vtorc)**
- **[Support for Filtering Query logs on Error](#query-logs)**
- **[Minor Changes](#minor-changes)**
- **[VTTablet Flags](#flags-vttablet)**
@@ -133,6 +134,11 @@ This is the last time this will be needed in the `8.0.x` series, as starting wit

The base system now uses Debian Bookworm instead of Debian Bullseye for the `vitess/lite` images. This change was brought by [Pull Request #17552].

### <a id="key-range-vtorc"/>KeyRanges in `--clusters_to_watch` in VTOrc</a>
VTOrc now supports specifying KeyRanges in the `--clusters_to_watch` flag, so a VTOrc instance no longer needs to be restarted when a reshard changes the set of shards.
For example, if a VTOrc instance is configured to watch `ks/-80`, it watches all the shards that fall within the KeyRange `-80`. If a reshard splits `-80` into the new shards `-40` and `40-80`, the VTOrc instance automatically starts watching the new shards without needing a restart.
timvaillancourt (Contributor) commented on Jan 22, 2025:

@GuptaManan100 I think this explains the "new" behaviour very well, but I feel like a user of the old way won't fully understand the risks

For example, a user with --clusters_to_watch foo/-80 in the previous logic will watch 1 (or 0) shards only. After this change, with the same flag value they would watch any shard from - to 80, which could be surprising. So I guess I'm saying explaining the difference between the old logic and this PR might make this risk more clear

GuptaManan100 (Member, Author) replied:

What should I add to make it more clear? Should I add a line something like -
In the previous logic, watching a shard -80 would watch 1 (or 0) shard only. In the new system, since we interpret -80 as a key range, it can lead to a watch on multiple shards as described in the example.

timvaillancourt (Contributor) replied:

@GuptaManan100 that extra detail is great 👍

Users can still specify exact key ranges, and the new feature is backward compatible.

### <a id="query-logs"/>Support for Filtering Query logs on Error</a>

The `querylog-mode` setting can be configured to `error` to log only queries that result in errors. This option is supported in both VTGate and VTTablet.
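To make the new matching semantics concrete, here is a minimal editor's sketch (not part of the diff) of the containment check the changelog entry above describes. It uses `topo.ValidateShardName` and `key.KeyRangeContainsKeyRange`, the same helpers this PR wires into `tablet_discovery.go`; the shard list is a hypothetical post-reshard topology:

```go
package main

import (
	"fmt"

	"vitess.io/vitess/go/vt/key"
	"vitess.io/vitess/go/vt/topo"
)

func main() {
	// The watched entry ks/-80 parses to the key range ending at 0x80.
	_, watched, err := topo.ValidateShardName("-80")
	if err != nil {
		panic(err)
	}

	// Hypothetical shards after resharding -80 into -40 and 40-80.
	for _, shard := range []string{"-80", "-40", "40-80", "80-"} {
		_, kr, err := topo.ValidateShardName(shard)
		if err != nil {
			panic(err)
		}
		// A shard is watched if its key range lies entirely inside the
		// watched range: -80, -40 and 40-80 match; 80- does not.
		fmt.Printf("%-6s watched: %v\n", shard, key.KeyRangeContainsKeyRange(watched, kr))
	}
}
```

This also illustrates the behaviour change raised in the review thread above: under the old logic `ks/-80` matched only a shard literally named `-80`, while after this PR it matches every shard whose range is contained in `-80`.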
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtorc.txt
@@ -24,7 +24,7 @@ Flags:
--bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system.
--catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified
--change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--clusters_to_watch strings Comma-separated list of keyspaces or keyspace/key-ranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80"
--config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored.
--config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn)
--config-name string Name of the config file (without extension) to search for. (default "vtconfig")
8 changes: 8 additions & 0 deletions go/vt/key/key.go
@@ -95,6 +95,14 @@ func NewKeyRange(start []byte, end []byte) *topodatapb.KeyRange {
return &topodatapb.KeyRange{Start: start, End: end}
}

// NewCompleteKeyRange returns a complete key range.
func NewCompleteKeyRange() *topodatapb.KeyRange {
return &topodatapb.KeyRange{
Start: []byte{},
End: []byte{},
}
}

// KeyRangeAdd adds two adjacent KeyRange values (in any order) into a single value. If the values are not adjacent,
// it returns false.
func KeyRangeAdd(a, b *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) {
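The new `key.NewCompleteKeyRange` helper returns the full `-` range (empty `Start` and `End`). A complete range contains every other key range, which is what lets a bare keyspace entry in `--clusters_to_watch` (and, later in this diff, a non range-based shard name) match all shards of a keyspace. A short editor's sketch of why it works as a catch-all, assuming the helpers shown in this PR:

```go
package main

import (
	"fmt"

	"vitess.io/vitess/go/vt/key"
	"vitess.io/vitess/go/vt/topo"
)

func main() {
	// The complete key range spans everything: empty Start and empty End.
	complete := key.NewCompleteKeyRange()

	// Every shard's key range is contained in the complete range, so a bare
	// keyspace entry in --clusters_to_watch matches all of its shards.
	for _, shard := range []string{"-", "-80", "40-80", "80-"} {
		_, kr, err := topo.ValidateShardName(shard)
		if err != nil {
			panic(err)
		}
		fmt.Printf("- contains %q: %v\n", shard, key.KeyRangeContainsKeyRange(complete, kr))
		// All of these print true.
	}
}
```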
8 changes: 8 additions & 0 deletions go/vt/topo/shard_test.go
@@ -323,6 +323,14 @@ func TestValidateShardName(t *testing.T) {
},
valid: true,
},
{
name: "-",
expectedRange: &topodatapb.KeyRange{
Start: []byte{},
End: []byte{},
},
valid: true,
},
{
name: "40-80",
expectedRange: &topodatapb.KeyRange{
26 changes: 5 additions & 21 deletions go/vt/vtorc/logic/keyspace_shard_discovery.go
@@ -18,10 +18,10 @@ package logic

import (
"context"
"sort"
"strings"
"sync"

"golang.org/x/exp/maps"

"vitess.io/vitess/go/vt/log"

"vitess.io/vitess/go/vt/topo"
@@ -31,7 +31,7 @@ import (
// RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with.
func RefreshAllKeyspacesAndShards(ctx context.Context) error {
var keyspaces []string
if len(clustersToWatch) == 0 { // all known keyspaces
if len(shardsToWatch) == 0 { // all known keyspaces
ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer cancel()
var err error
@@ -41,26 +41,10 @@
return err
}
} else {
// Parse input and build list of keyspaces
for _, ks := range clustersToWatch {
if strings.Contains(ks, "/") {
// This is a keyspace/shard specification
input := strings.Split(ks, "/")
keyspaces = append(keyspaces, input[0])
} else {
// Assume this is a keyspace
keyspaces = append(keyspaces, ks)
}
}
if len(keyspaces) == 0 {
log.Errorf("Found no keyspaces for input: %+v", clustersToWatch)
return nil
}
// Get keyspaces to watch from the list of known keyspaces.
keyspaces = maps.Keys(shardsToWatch)
}

// Sort the list of keyspaces.
// The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace
sort.Strings(keyspaces)
refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout)
defer refreshCancel()
var wg sync.WaitGroup
3 changes: 2 additions & 1 deletion go/vt/vtorc/logic/keyspace_shard_discovery_test.go
@@ -93,6 +93,7 @@ func TestRefreshAllKeyspaces(t *testing.T) {
// Set clusters to watch to only watch ks1 and ks3
onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"}
clustersToWatch = onlyKs1and3
initializeShardsToWatch()
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))

// Verify that we only have ks1 and ks3 in vtorc's db.
@@ -106,6 +107,7 @@

// Set clusters to watch to watch all keyspaces
clustersToWatch = nil
initializeShardsToWatch()
// Change the durability policy of ks1
reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", policy.DurabilitySemiSync)
require.NoError(t, RefreshAllKeyspacesAndShards(context.Background()))
@@ -119,7 +121,6 @@
verifyPrimaryAlias(t, "ks3", "80-", "zone_ks3-0000000101", "")
verifyKeyspaceInfo(t, "ks4", keyspaceDurabilityTest, "")
verifyPrimaryAlias(t, "ks4", "80-", "zone_ks4-0000000101", "")

}

func TestRefreshKeyspace(t *testing.T) {
86 changes: 50 additions & 36 deletions go/vt/vtorc/logic/tablet_discovery.go
@@ -32,6 +32,7 @@ import (
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/vt/external/golib/sqlutils"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/topo"
@@ -48,27 +49,29 @@ var (
clustersToWatch []string
shutdownWaitTime = 30 * time.Second
shardsLockCounter int32
shardsToWatch map[string][]string
shardsToWatchMu sync.Mutex
// shardsToWatch is a map storing the shards for a given keyspace that need to be watched.
// We store the key range for all the shards that we want to watch.
// This is populated by parsing the `--clusters_to_watch` flag.
shardsToWatch map[string][]*topodatapb.KeyRange

// ErrNoPrimaryTablet is a fixed error message.
ErrNoPrimaryTablet = errors.New("no primary tablet found")
)

// RegisterFlags registers the flags required by VTOrc
func RegisterFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/key-ranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"")
fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM")
}

// updateShardsToWatch parses the --clusters_to_watch flag-value
// initializeShardsToWatch parses the --clusters_to_watch flag-value
// into a map of keyspace/shards.
func updateShardsToWatch() {
func initializeShardsToWatch() {
shardsToWatch = make(map[string][]*topodatapb.KeyRange)
if len(clustersToWatch) == 0 {
return
}

newShardsToWatch := make(map[string][]string, 0)
for _, ks := range clustersToWatch {
if strings.Contains(ks, "/") && !strings.HasSuffix(ks, "/") {
// Validate keyspace/shard parses.
@@ -77,34 +80,51 @@
log.Errorf("Could not parse keyspace/shard %q: %+v", ks, err)
continue
}
newShardsToWatch[k] = append(newShardsToWatch[k], s)
} else {
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
defer cancel()
// Assume this is a keyspace and find all shards in keyspace.
// Remove trailing slash if exists.
ks = strings.TrimSuffix(ks, "/")
shards, err := ts.GetShardNames(ctx, ks)
// Parse the shard name into key range value.
_, keyRange, err := topo.ValidateShardName(s)
if err != nil {
// Log the err and continue.
log.Errorf("Error fetching shards for keyspace: %v", ks)
continue
log.Errorf("Could not parse shard name %q: %+v", s, err)
}
if len(shards) == 0 {
log.Errorf("Topo has no shards for ks: %v", ks)
continue
// If the key range is nil, then the user is not using RangeBased Sharding.
// So we want to watch all the shards of the keyspace.
if keyRange == nil {
keyRange = key.NewCompleteKeyRange()
}
newShardsToWatch[ks] = shards
shardsToWatch[k] = append(shardsToWatch[k], keyRange)
} else {
// Remove trailing slash if exists.
ks = strings.TrimSuffix(ks, "/")
// We store the entire range of key range if nothing is specified.
shardsToWatch[ks] = []*topodatapb.KeyRange{key.NewCompleteKeyRange()}
}
}
if len(newShardsToWatch) == 0 {
log.Error("No keyspace/shards to watch")
return

if len(shardsToWatch) == 0 {
log.Error("No keyspace/shards to watch, watching all keyspaces")
}
}

shardsToWatchMu.Lock()
defer shardsToWatchMu.Unlock()
shardsToWatch = newShardsToWatch
// tabletPartOfWatch checks if the given tablet is part of the watch list.
func tabletPartOfWatch(tablet *topodatapb.Tablet) bool {
// If we are watching all keyspaces, then we want to watch this tablet too.
if len(shardsToWatch) == 0 {
return true
}
shardRanges, ok := shardsToWatch[tablet.GetKeyspace()]
// If we don't have the keyspace in our map, then this tablet
// doesn't need to be watched.
if !ok {
return false
}
// Get the tablet's key range, and check if
// it is part of the shard ranges we are watching.
kr := tablet.GetKeyRange()
for _, shardRange := range shardRanges {
if key.KeyRangeContainsKeyRange(shardRange, kr) {
return true
}
}
return false
}

// OpenTabletDiscovery opens the vitess topo if enables and returns a ticker
Expand All @@ -117,7 +137,7 @@ func OpenTabletDiscovery() <-chan time.Time {
log.Error(err)
}
// Parse --clusters_to_watch into a filter.
updateShardsToWatch()
initializeShardsToWatch()
// We refresh all information from the topo once before we start the ticks to do
// it on a timer.
ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout)
@@ -179,16 +199,10 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f
// Filter tablets that should not be watched using shardsToWatch map.
matchedTablets := make([]*topo.TabletInfo, 0, len(tablets))
func() {
shardsToWatchMu.Lock()
defer shardsToWatchMu.Unlock()
for _, t := range tablets {
if len(shardsToWatch) > 0 {
_, ok := shardsToWatch[t.Tablet.Keyspace]
if !ok || !slices.Contains(shardsToWatch[t.Tablet.Keyspace], t.Tablet.Shard) {
continue // filter
}
if tabletPartOfWatch(t.Tablet) {
matchedTablets = append(matchedTablets, t)
}
matchedTablets = append(matchedTablets, t)
}
}()

Expand Down
Loading
Loading