From 8bb50ba555304e7d72621cf94568951c965537a6 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 22 Jan 2025 09:42:16 +0530 Subject: [PATCH 1/6] feat: change how clusters to watch to accept shard ranges instead of exact shard values Signed-off-by: Manan Gupta --- go/vt/key/key.go | 8 + go/vt/topo/shard_test.go | 8 + go/vt/vtorc/logic/keyspace_shard_discovery.go | 26 +-- .../logic/keyspace_shard_discovery_test.go | 3 +- go/vt/vtorc/logic/tablet_discovery.go | 84 +++++---- go/vt/vtorc/logic/tablet_discovery_test.go | 171 +++++++++++++++--- go/vt/vtorc/logic/vtorc.go | 6 - 7 files changed, 217 insertions(+), 89 deletions(-) diff --git a/go/vt/key/key.go b/go/vt/key/key.go index 89d956bd433..d951d123f0b 100644 --- a/go/vt/key/key.go +++ b/go/vt/key/key.go @@ -95,6 +95,14 @@ func NewKeyRange(start []byte, end []byte) *topodatapb.KeyRange { return &topodatapb.KeyRange{Start: start, End: end} } +// NewCompleteKeyRange returns a complete key range. +func NewCompleteKeyRange() *topodatapb.KeyRange { + return &topodatapb.KeyRange{ + Start: []byte{}, + End: []byte{}, + } +} + // KeyRangeAdd adds two adjacent KeyRange values (in any order) into a single value. If the values are not adjacent, // it returns false. func KeyRangeAdd(a, b *topodatapb.KeyRange) (*topodatapb.KeyRange, bool) { diff --git a/go/vt/topo/shard_test.go b/go/vt/topo/shard_test.go index 6bd4aae5b62..915bcd18e3c 100644 --- a/go/vt/topo/shard_test.go +++ b/go/vt/topo/shard_test.go @@ -323,6 +323,14 @@ func TestValidateShardName(t *testing.T) { }, valid: true, }, + { + name: "-", + expectedRange: &topodatapb.KeyRange{ + Start: []byte{}, + End: []byte{}, + }, + valid: true, + }, { name: "40-80", expectedRange: &topodatapb.KeyRange{ diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery.go b/go/vt/vtorc/logic/keyspace_shard_discovery.go index 0dd17cb65fd..8115e614418 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery.go @@ -18,10 +18,10 @@ package logic import ( "context" - "sort" - "strings" "sync" + "golang.org/x/exp/maps" + "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/topo" @@ -31,7 +31,7 @@ import ( // RefreshAllKeyspacesAndShards reloads the keyspace and shard information for the keyspaces that vtorc is concerned with. func RefreshAllKeyspacesAndShards(ctx context.Context) error { var keyspaces []string - if len(clustersToWatch) == 0 { // all known keyspaces + if len(shardsToWatch) == 0 { // all known keyspaces ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer cancel() var err error @@ -41,26 +41,10 @@ func RefreshAllKeyspacesAndShards(ctx context.Context) error { return err } } else { - // Parse input and build list of keyspaces - for _, ks := range clustersToWatch { - if strings.Contains(ks, "/") { - // This is a keyspace/shard specification - input := strings.Split(ks, "/") - keyspaces = append(keyspaces, input[0]) - } else { - // Assume this is a keyspace - keyspaces = append(keyspaces, ks) - } - } - if len(keyspaces) == 0 { - log.Errorf("Found no keyspaces for input: %+v", clustersToWatch) - return nil - } + // Get keyspaces to watch from the list of known keyspaces. + keyspaces = maps.Keys(shardsToWatch) } - // Sort the list of keyspaces. - // The list can have duplicates because the input to clusters to watch may have multiple shards of the same keyspace - sort.Strings(keyspaces) refreshCtx, refreshCancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) defer refreshCancel() var wg sync.WaitGroup diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go index 8218af45db6..5d21bfc4d04 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go @@ -93,6 +93,7 @@ func TestRefreshAllKeyspaces(t *testing.T) { // Set clusters to watch to only watch ks1 and ks3 onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"} clustersToWatch = onlyKs1and3 + initializeShardsToWatch() require.NoError(t, RefreshAllKeyspacesAndShards(context.Background())) // Verify that we only have ks1 and ks3 in vtorc's db. @@ -106,6 +107,7 @@ func TestRefreshAllKeyspaces(t *testing.T) { // Set clusters to watch to watch all keyspaces clustersToWatch = nil + initializeShardsToWatch() // Change the durability policy of ks1 reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", policy.DurabilitySemiSync) require.NoError(t, RefreshAllKeyspacesAndShards(context.Background())) @@ -119,7 +121,6 @@ func TestRefreshAllKeyspaces(t *testing.T) { verifyPrimaryAlias(t, "ks3", "80-", "zone_ks3-0000000101", "") verifyKeyspaceInfo(t, "ks4", keyspaceDurabilityTest, "") verifyPrimaryAlias(t, "ks4", "80-", "zone_ks4-0000000101", "") - } func TestRefreshKeyspace(t *testing.T) { diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index eb10bb2a667..defc2aa227e 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -32,6 +32,7 @@ import ( "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/external/golib/sqlutils" + "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/topo" @@ -48,8 +49,10 @@ var ( clustersToWatch []string shutdownWaitTime = 30 * time.Second shardsLockCounter int32 - shardsToWatch map[string][]string - shardsToWatchMu sync.Mutex + // shardsToWatch is a map storing the shards for a given keyspace that need to be watched. + // We store the key range for all the shards that we want to watch. + // This is populated by parsing `--clusters_to_watch` flag. + shardsToWatch map[string][]*topodatapb.KeyRange // ErrNoPrimaryTablet is a fixed error message. ErrNoPrimaryTablet = errors.New("no primary tablet found") @@ -61,14 +64,14 @@ func RegisterFlags(fs *pflag.FlagSet) { fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM") } -// updateShardsToWatch parses the --clusters_to_watch flag-value +// initializeShardsToWatch parses the --clusters_to_watch flag-value // into a map of keyspace/shards. -func updateShardsToWatch() { +func initializeShardsToWatch() { + shardsToWatch = make(map[string][]*topodatapb.KeyRange) if len(clustersToWatch) == 0 { return } - newShardsToWatch := make(map[string][]string, 0) for _, ks := range clustersToWatch { if strings.Contains(ks, "/") && !strings.HasSuffix(ks, "/") { // Validate keyspace/shard parses. @@ -77,34 +80,51 @@ func updateShardsToWatch() { log.Errorf("Could not parse keyspace/shard %q: %+v", ks, err) continue } - newShardsToWatch[k] = append(newShardsToWatch[k], s) - } else { - ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) - defer cancel() - // Assume this is a keyspace and find all shards in keyspace. - // Remove trailing slash if exists. - ks = strings.TrimSuffix(ks, "/") - shards, err := ts.GetShardNames(ctx, ks) + // Parse the shard name into key range value. + _, keyRange, err := topo.ValidateShardName(s) if err != nil { - // Log the err and continue. - log.Errorf("Error fetching shards for keyspace: %v", ks) - continue + log.Errorf("Could not parse shard name %q: %+v", s, err) } - if len(shards) == 0 { - log.Errorf("Topo has no shards for ks: %v", ks) - continue + // If the key range is nil, then the user is not using RangeBased Sharding. + // So we want to watch all the shards of the keyspace. + if keyRange == nil { + keyRange = key.NewCompleteKeyRange() } - newShardsToWatch[ks] = shards + shardsToWatch[k] = append(shardsToWatch[k], keyRange) + } else { + // Remove trailing slash if exists. + ks = strings.TrimSuffix(ks, "/") + // We store the entire range of key range if nothing is specified. + shardsToWatch[ks] = []*topodatapb.KeyRange{key.NewCompleteKeyRange()} } } - if len(newShardsToWatch) == 0 { - log.Error("No keyspace/shards to watch") - return + + if len(shardsToWatch) == 0 { + log.Error("No keyspace/shards to watch, watching all keyspaces") } +} - shardsToWatchMu.Lock() - defer shardsToWatchMu.Unlock() - shardsToWatch = newShardsToWatch +// tabletPartOfWatch checks if the given tablet is part of the watch list. +func tabletPartOfWatch(tablet *topodatapb.Tablet) bool { + // If we are watching all keyspaces, then we want to watch this tablet too. + if len(shardsToWatch) == 0 { + return true + } + shardRanges, ok := shardsToWatch[tablet.GetKeyspace()] + // If we don't have the keyspace in our map, then this tablet + // doesn't need to be watched. + if !ok { + return false + } + // Get the tablet's key range, and check if + // it is part of the shard ranges we are watching. + kr := tablet.GetKeyRange() + for _, shardRange := range shardRanges { + if key.KeyRangeContainsKeyRange(shardRange, kr) { + return true + } + } + return false } // OpenTabletDiscovery opens the vitess topo if enables and returns a ticker @@ -117,7 +137,7 @@ func OpenTabletDiscovery() <-chan time.Time { log.Error(err) } // Parse --clusters_to_watch into a filter. - updateShardsToWatch() + initializeShardsToWatch() // We refresh all information from the topo once before we start the ticks to do // it on a timer. ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) @@ -179,16 +199,10 @@ func refreshTabletsUsing(ctx context.Context, loader func(tabletAlias string), f // Filter tablets that should not be watched using shardsToWatch map. matchedTablets := make([]*topo.TabletInfo, 0, len(tablets)) func() { - shardsToWatchMu.Lock() - defer shardsToWatchMu.Unlock() for _, t := range tablets { - if len(shardsToWatch) > 0 { - _, ok := shardsToWatch[t.Tablet.Keyspace] - if !ok || !slices.Contains(shardsToWatch[t.Tablet.Keyspace], t.Tablet.Shard) { - continue // filter - } + if tabletPartOfWatch(t.Tablet) { + matchedTablets = append(matchedTablets, t) } - matchedTablets = append(matchedTablets, t) } }() diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index 54284e8a017..09e551d9d31 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -30,6 +30,7 @@ import ( "google.golang.org/protobuf/proto" "vitess.io/vitess/go/vt/external/golib/sqlutils" + "vitess.io/vitess/go/vt/key" topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/proto/vttime" "vitess.io/vitess/go/vt/topo" @@ -102,60 +103,178 @@ var ( } ) -func TestUpdateShardsToWatch(t *testing.T) { +func TestTabletsPartOfWatch(t *testing.T) { oldClustersToWatch := clustersToWatch - oldTs := ts defer func() { clustersToWatch = oldClustersToWatch shardsToWatch = nil - ts = oldTs }() - // Create a memory topo-server and create the keyspace and shard records - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() + testCases := []struct { + in []string + tablet *topodatapb.Tablet + expectedPartOfWatch bool + }{ + { + in: []string{}, + tablet: &topodatapb.Tablet{ + Keyspace: keyspace, + Shard: shard, + }, + expectedPartOfWatch: true, + }, + { + in: []string{keyspace}, + tablet: &topodatapb.Tablet{ + Keyspace: keyspace, + Shard: shard, + }, + expectedPartOfWatch: true, + }, + { + in: []string{keyspace + "/-"}, + tablet: &topodatapb.Tablet{ + Keyspace: keyspace, + Shard: shard, + }, + expectedPartOfWatch: true, + }, + { + in: []string{keyspace + "/" + shard}, + tablet: &topodatapb.Tablet{ + Keyspace: keyspace, + Shard: shard, + }, + expectedPartOfWatch: true, + }, + { + in: []string{"ks/-70", "ks/70-"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x50}, []byte{0x70}), + }, + expectedPartOfWatch: true, + }, + { + in: []string{"ks/-70", "ks/70-"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x40}, []byte{0x50}), + }, + expectedPartOfWatch: true, + }, + { + in: []string{"ks/-70", "ks/70-"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x70}, []byte{0x90}), + }, + expectedPartOfWatch: true, + }, + { + in: []string{"ks/-70", "ks/70-"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x60}, []byte{0x90}), + }, + expectedPartOfWatch: false, + }, + { + in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x60}, []byte{0x80}), + }, + expectedPartOfWatch: true, + }, + { + in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x80}, []byte{0x90}), + }, + expectedPartOfWatch: false, + }, + { + in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x90}, []byte{0xa0}), + }, + expectedPartOfWatch: false, + }, + } - ts = memorytopo.NewServer(ctx, cell1) - _, err := ts.GetOrCreateShard(context.Background(), keyspace, shard) - require.NoError(t, err) + for _, tt := range testCases { + t.Run(fmt.Sprintf("%v-Tablet-%v-%v", strings.Join(tt.in, ","), tt.tablet.GetKeyspace(), tt.tablet.GetShard()), func(t *testing.T) { + clustersToWatch = tt.in + initializeShardsToWatch() + assert.Equal(t, tt.expectedPartOfWatch, tabletPartOfWatch(tt.tablet)) + }) + } +} + +// TestInitializeShardsToWatch tests that we initialize the shardsToWatch map correctly +// using the `--clusters_to_watch` flag. +func TestInitializeShardsToWatch(t *testing.T) { + oldClustersToWatch := clustersToWatch + defer func() { + clustersToWatch = oldClustersToWatch + shardsToWatch = nil + }() testCases := []struct { in []string - expected map[string][]string + expected map[string][]*topodatapb.KeyRange }{ { in: []string{}, - expected: nil, + expected: map[string][]*topodatapb.KeyRange{}, }, { - in: []string{""}, - expected: map[string][]string{}, + in: []string{"unknownKs"}, + expected: map[string][]*topodatapb.KeyRange{ + "unknownKs": { + key.NewCompleteKeyRange(), + }, + }, }, { in: []string{"test/-"}, - expected: map[string][]string{ - "test": {"-"}, + expected: map[string][]*topodatapb.KeyRange{ + "test": { + key.NewCompleteKeyRange(), + }, }, }, { in: []string{"test/-", "test2/-80", "test2/80-"}, - expected: map[string][]string{ - "test": {"-"}, - "test2": {"-80", "80-"}, + expected: map[string][]*topodatapb.KeyRange{ + "test": { + key.NewCompleteKeyRange(), + }, + "test2": { + key.NewKeyRange([]byte{}, []byte{0x80}), + key.NewKeyRange([]byte{0x80}, []byte{}), + }, }, }, { - // confirm shards fetch from topo + // known keyspace in: []string{keyspace}, - expected: map[string][]string{ - keyspace: {shard}, + expected: map[string][]*topodatapb.KeyRange{ + keyspace: { + key.NewCompleteKeyRange(), + }, }, }, { - // confirm shards fetch from topo when keyspace has trailing-slash + // keyspace with trailing-slash in: []string{keyspace + "/"}, - expected: map[string][]string{ - keyspace: {shard}, + expected: map[string][]*topodatapb.KeyRange{ + keyspace: { + key.NewCompleteKeyRange(), + }, }, }, } @@ -163,10 +282,10 @@ func TestUpdateShardsToWatch(t *testing.T) { for _, testCase := range testCases { t.Run(strings.Join(testCase.in, ","), func(t *testing.T) { defer func() { - shardsToWatch = make(map[string][]string, 0) + shardsToWatch = make(map[string][]*topodatapb.KeyRange, 0) }() clustersToWatch = testCase.in - updateShardsToWatch() + initializeShardsToWatch() require.Equal(t, testCase.expected, shardsToWatch) }) } diff --git a/go/vt/vtorc/logic/vtorc.go b/go/vt/vtorc/logic/vtorc.go index 1fde6e31c0d..5ac5af50d47 100644 --- a/go/vt/vtorc/logic/vtorc.go +++ b/go/vt/vtorc/logic/vtorc.go @@ -326,12 +326,6 @@ func refreshAllInformation(ctx context.Context) error { return RefreshAllKeyspacesAndShards(ctx) }) - // Refresh shards to watch. - eg.Go(func() error { - updateShardsToWatch() - return nil - }) - // Refresh all tablets. eg.Go(func() error { return refreshAllTablets(ctx) From a1b000b608f562176cc11858a647d54b64d7551d Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 22 Jan 2025 09:48:46 +0530 Subject: [PATCH 2/6] feat: minor refactor and summary changes Signed-off-by: Manan Gupta --- changelog/22.0/22.0.0/summary.md | 6 ++++++ go/flags/endtoend/vtorc.txt | 2 +- go/vt/vtorc/logic/tablet_discovery.go | 2 +- go/vt/vtorc/logic/tablet_discovery_test.go | 8 ++++++++ 4 files changed, 16 insertions(+), 2 deletions(-) diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md index 2fb66ea8969..764a68dd7a0 100644 --- a/changelog/22.0/22.0.0/summary.md +++ b/changelog/22.0/22.0.0/summary.md @@ -13,6 +13,7 @@ - **[Support for LAST_INSERT_ID(x)](#last-insert-id)** - **[Support for Maximum Idle Connections in the Pool](#max-idle-connections)** - **[Stalled Disk Recovery in VTOrc](#stall-disk-recovery)** + - **[KeyRanges in `--clusters_to_watch` in VTOrc](#key-range-vtorc)** - **[Minor Changes](#minor-changes)** - **[VTTablet Flags](#flags-vttablet)** - **[Topology read concurrency behaviour changes](#topo-read-concurrency-changes)** @@ -106,6 +107,11 @@ VTOrc can now identify and recover from stalled disk errors. VTTablets test whet To opt into this feature, `--enable-primary-disk-stalled-recovery` flag has to be specified on VTOrc, and `--disk-write-dir` flag has to be specified on the vttablets. `--disk-write-interval` and `--disk-write-timeout` flags can be used to configure the polling interval and timeout respectively. +### KeyRanges in `--clusters_to_watch` in VTOrc +VTOrc now supports specifying KeyRanges in the `--clusters_to_watch` flag. This is useful in scenarios where you don't need to restart a VTOrc instance if you run a reshard. +For example, if a VTOrc is configured to watch `ks/-80`, then it would watch all the shards that fall under the KeyRange `-80`. If a reshard is run and, `-80` is split into new shards `-40`, and `40-80`, the VTOrc instance will automatically start watching the new shard without needing a restart. +The users can still continue to specify exact key ranges too, and the new feature is backward compatible. + ## Minor Changes #### VTTablet Flags diff --git a/go/flags/endtoend/vtorc.txt b/go/flags/endtoend/vtorc.txt index ca8083709e5..c9d77be6082 100644 --- a/go/flags/endtoend/vtorc.txt +++ b/go/flags/endtoend/vtorc.txt @@ -24,7 +24,7 @@ Flags: --bind-address string Bind address for the server. If empty, the server will listen on all available unicast and anycast IP addresses of the local system. --catch-sigpipe catch and ignore SIGPIPE on stdout and stderr if specified --change-tablets-with-errant-gtid-to-drained Whether VTOrc should be changing the type of tablets with errant GTIDs to DRAINED - --clusters_to_watch strings Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80" + --clusters_to_watch strings Comma-separated list of keyspaces or keyspace/key-ranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: "ks1,ks2/-80" --config-file string Full path of the config file (with extension) to use. If set, --config-path, --config-type, and --config-name are ignored. --config-file-not-found-handling ConfigFileNotFoundHandling Behavior when a config file is not found. (Options: error, exit, ignore, warn) (default warn) --config-name string Name of the config file (without extension) to search for. (default "vtconfig") diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index defc2aa227e..3dfec344654 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -60,7 +60,7 @@ var ( // RegisterFlags registers the flags required by VTOrc func RegisterFlags(fs *pflag.FlagSet) { - fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/shards that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") + fs.StringSliceVar(&clustersToWatch, "clusters_to_watch", clustersToWatch, "Comma-separated list of keyspaces or keyspace/key-ranges that this instance will monitor and repair. Defaults to all clusters in the topology. Example: \"ks1,ks2/-80\"") fs.DurationVar(&shutdownWaitTime, "shutdown_wait_time", shutdownWaitTime, "Maximum time to wait for VTOrc to release all the locks that it is holding before shutting down on SIGTERM") } diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index 09e551d9d31..b528624a746 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -179,6 +179,14 @@ func TestTabletsPartOfWatch(t *testing.T) { }, expectedPartOfWatch: false, }, + { + in: []string{"ks/50-70"}, + tablet: &topodatapb.Tablet{ + Keyspace: "ks", + KeyRange: key.NewKeyRange([]byte{0x50}, []byte{0x70}), + }, + expectedPartOfWatch: true, + }, { in: []string{"ks2/-70", "ks2/70-", "unknownKs/-", "ks/-80"}, tablet: &topodatapb.Tablet{ From 8c12c59bf974b0c561006f18e2049d05fc793d35 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 24 Jan 2025 15:49:26 +0530 Subject: [PATCH 3/6] feat: add summary changes Signed-off-by: Manan Gupta --- changelog/22.0/22.0.0/summary.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changelog/22.0/22.0.0/summary.md b/changelog/22.0/22.0.0/summary.md index c6c1eb0148e..b3916b8f0e8 100644 --- a/changelog/22.0/22.0.0/summary.md +++ b/changelog/22.0/22.0.0/summary.md @@ -136,7 +136,7 @@ The base system now uses Debian Bookworm instead of Debian Bullseye for the `vit ### KeyRanges in `--clusters_to_watch` in VTOrc VTOrc now supports specifying KeyRanges in the `--clusters_to_watch` flag. This is useful in scenarios where you don't need to restart a VTOrc instance if you run a reshard. -For example, if a VTOrc is configured to watch `ks/-80`, then it would watch all the shards that fall under the KeyRange `-80`. If a reshard is run and, `-80` is split into new shards `-40`, and `40-80`, the VTOrc instance will automatically start watching the new shard without needing a restart. +For example, if a VTOrc is configured to watch `ks/-80`, then it would watch all the shards that fall under the KeyRange `-80`. If a reshard is run and, `-80` is split into new shards `-40`, and `40-80`, the VTOrc instance will automatically start watching the new shard without needing a restart. In the previous logic, watching a shard -80 would watch 1 (or 0) shard only. In the new system, since we interpret -80 as a key range, it can lead to a watch on multiple shards as described in the example. The users can still continue to specify exact key ranges too, and the new feature is backward compatible. ### Support for Filtering Query logs on Error From c70744483914c86b44082e6873059b0615fec126 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 24 Jan 2025 19:15:49 +0530 Subject: [PATCH 4/6] feat: fix parsing of keyranges and return an error if invalid Signed-off-by: Manan Gupta --- .../logic/keyspace_shard_discovery_test.go | 6 +++-- go/vt/vtorc/logic/tablet_discovery.go | 15 ++++++++--- go/vt/vtorc/logic/tablet_discovery_test.go | 27 ++++++++++++++++--- 3 files changed, 38 insertions(+), 10 deletions(-) diff --git a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go index 5d21bfc4d04..f05295416d0 100644 --- a/go/vt/vtorc/logic/keyspace_shard_discovery_test.go +++ b/go/vt/vtorc/logic/keyspace_shard_discovery_test.go @@ -93,7 +93,8 @@ func TestRefreshAllKeyspaces(t *testing.T) { // Set clusters to watch to only watch ks1 and ks3 onlyKs1and3 := []string{"ks1/-80", "ks3/-80", "ks3/80-"} clustersToWatch = onlyKs1and3 - initializeShardsToWatch() + err := initializeShardsToWatch() + require.NoError(t, err) require.NoError(t, RefreshAllKeyspacesAndShards(context.Background())) // Verify that we only have ks1 and ks3 in vtorc's db. @@ -107,7 +108,8 @@ func TestRefreshAllKeyspaces(t *testing.T) { // Set clusters to watch to watch all keyspaces clustersToWatch = nil - initializeShardsToWatch() + err = initializeShardsToWatch() + require.NoError(t, err) // Change the durability policy of ks1 reparenttestutil.SetKeyspaceDurability(ctx, t, ts, "ks1", policy.DurabilitySemiSync) require.NoError(t, RefreshAllKeyspacesAndShards(context.Background())) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 3dfec344654..5b3bf246e50 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -66,10 +66,10 @@ func RegisterFlags(fs *pflag.FlagSet) { // initializeShardsToWatch parses the --clusters_to_watch flag-value // into a map of keyspace/shards. -func initializeShardsToWatch() { +func initializeShardsToWatch() error { shardsToWatch = make(map[string][]*topodatapb.KeyRange) if len(clustersToWatch) == 0 { - return + return nil } for _, ks := range clustersToWatch { @@ -80,10 +80,13 @@ func initializeShardsToWatch() { log.Errorf("Could not parse keyspace/shard %q: %+v", ks, err) continue } + if !key.IsValidKeyRange(s) { + return fmt.Errorf("Invalid key range %q while parsing clusters to watch", s) + } // Parse the shard name into key range value. _, keyRange, err := topo.ValidateShardName(s) if err != nil { - log.Errorf("Could not parse shard name %q: %+v", s, err) + return fmt.Errorf("Could not parse shard name %q: %+v", s, err) } // If the key range is nil, then the user is not using RangeBased Sharding. // So we want to watch all the shards of the keyspace. @@ -102,6 +105,7 @@ func initializeShardsToWatch() { if len(shardsToWatch) == 0 { log.Error("No keyspace/shards to watch, watching all keyspaces") } + return nil } // tabletPartOfWatch checks if the given tablet is part of the watch list. @@ -137,7 +141,10 @@ func OpenTabletDiscovery() <-chan time.Time { log.Error(err) } // Parse --clusters_to_watch into a filter. - initializeShardsToWatch() + err := initializeShardsToWatch() + if err != nil { + log.Fatalf("Error parsing clusters to watch: %v", err) + } // We refresh all information from the topo once before we start the ticks to do // it on a timer. ctx, cancel := context.WithTimeout(context.Background(), topo.RemoteOperationTimeout) diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index b528624a746..ce01e8f9c04 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -216,7 +216,8 @@ func TestTabletsPartOfWatch(t *testing.T) { for _, tt := range testCases { t.Run(fmt.Sprintf("%v-Tablet-%v-%v", strings.Join(tt.in, ","), tt.tablet.GetKeyspace(), tt.tablet.GetShard()), func(t *testing.T) { clustersToWatch = tt.in - initializeShardsToWatch() + err := initializeShardsToWatch() + require.NoError(t, err) assert.Equal(t, tt.expectedPartOfWatch, tabletPartOfWatch(tt.tablet)) }) } @@ -232,8 +233,9 @@ func TestInitializeShardsToWatch(t *testing.T) { }() testCases := []struct { - in []string - expected map[string][]*topodatapb.KeyRange + in []string + expected map[string][]*topodatapb.KeyRange + expectedErr string }{ { in: []string{}, @@ -255,6 +257,18 @@ func TestInitializeShardsToWatch(t *testing.T) { }, }, }, + { + in: []string{"test/324"}, + expectedErr: `Invalid key range "324" while parsing clusters to watch`, + }, + { + in: []string{"test/0"}, + expected: map[string][]*topodatapb.KeyRange{ + "test": { + key.NewCompleteKeyRange(), + }, + }, + }, { in: []string{"test/-", "test2/-80", "test2/80-"}, expected: map[string][]*topodatapb.KeyRange{ @@ -293,7 +307,12 @@ func TestInitializeShardsToWatch(t *testing.T) { shardsToWatch = make(map[string][]*topodatapb.KeyRange, 0) }() clustersToWatch = testCase.in - initializeShardsToWatch() + err := initializeShardsToWatch() + if testCase.expectedErr != "" { + require.EqualError(t, err, testCase.expectedErr) + return + } + require.NoError(t, err) require.Equal(t, testCase.expected, shardsToWatch) }) } From b51bcc76a3ba9513475f339f3b330af0c4b8d8fc Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Fri, 24 Jan 2025 19:20:27 +0530 Subject: [PATCH 5/6] feat: use simpler code Signed-off-by: Manan Gupta --- go/vt/key/key.go | 4 ++-- go/vt/vtorc/logic/tablet_discovery.go | 9 ++------- go/vt/vtorc/logic/tablet_discovery_test.go | 4 ++-- 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/go/vt/key/key.go b/go/vt/key/key.go index d951d123f0b..82852daa16e 100644 --- a/go/vt/key/key.go +++ b/go/vt/key/key.go @@ -98,8 +98,8 @@ func NewKeyRange(start []byte, end []byte) *topodatapb.KeyRange { // NewCompleteKeyRange returns a complete key range. func NewCompleteKeyRange() *topodatapb.KeyRange { return &topodatapb.KeyRange{ - Start: []byte{}, - End: []byte{}, + Start: nil, + End: nil, } } diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 5b3bf246e50..5fca6de542d 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -84,16 +84,11 @@ func initializeShardsToWatch() error { return fmt.Errorf("Invalid key range %q while parsing clusters to watch", s) } // Parse the shard name into key range value. - _, keyRange, err := topo.ValidateShardName(s) + keyRanges, err := key.ParseShardingSpec(s) if err != nil { return fmt.Errorf("Could not parse shard name %q: %+v", s, err) } - // If the key range is nil, then the user is not using RangeBased Sharding. - // So we want to watch all the shards of the keyspace. - if keyRange == nil { - keyRange = key.NewCompleteKeyRange() - } - shardsToWatch[k] = append(shardsToWatch[k], keyRange) + shardsToWatch[k] = append(shardsToWatch[k], keyRanges...) } else { // Remove trailing slash if exists. ks = strings.TrimSuffix(ks, "/") diff --git a/go/vt/vtorc/logic/tablet_discovery_test.go b/go/vt/vtorc/logic/tablet_discovery_test.go index ce01e8f9c04..eaa1eb4389d 100644 --- a/go/vt/vtorc/logic/tablet_discovery_test.go +++ b/go/vt/vtorc/logic/tablet_discovery_test.go @@ -276,8 +276,8 @@ func TestInitializeShardsToWatch(t *testing.T) { key.NewCompleteKeyRange(), }, "test2": { - key.NewKeyRange([]byte{}, []byte{0x80}), - key.NewKeyRange([]byte{0x80}, []byte{}), + key.NewKeyRange(nil, []byte{0x80}), + key.NewKeyRange([]byte{0x80}, nil), }, }, }, From c4050340e0f85c78f6512c751087e6b952277298 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Mon, 27 Jan 2025 15:53:24 +0530 Subject: [PATCH 6/6] feat: address review comments Signed-off-by: Manan Gupta --- go/vt/vtorc/logic/tablet_discovery.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/go/vt/vtorc/logic/tablet_discovery.go b/go/vt/vtorc/logic/tablet_discovery.go index 5fca6de542d..9351b7d8549 100644 --- a/go/vt/vtorc/logic/tablet_discovery.go +++ b/go/vt/vtorc/logic/tablet_discovery.go @@ -81,12 +81,12 @@ func initializeShardsToWatch() error { continue } if !key.IsValidKeyRange(s) { - return fmt.Errorf("Invalid key range %q while parsing clusters to watch", s) + return fmt.Errorf("invalid key range %q while parsing clusters to watch", s) } // Parse the shard name into key range value. keyRanges, err := key.ParseShardingSpec(s) if err != nil { - return fmt.Errorf("Could not parse shard name %q: %+v", s, err) + return fmt.Errorf("could not parse shard name %q: %+v", s, err) } shardsToWatch[k] = append(shardsToWatch[k], keyRanges...) } else { @@ -138,7 +138,7 @@ func OpenTabletDiscovery() <-chan time.Time { // Parse --clusters_to_watch into a filter. err := initializeShardsToWatch() if err != nil { - log.Fatalf("Error parsing clusters to watch: %v", err) + log.Fatalf("Error parsing --clusters-to-watch: %v", err) } // We refresh all information from the topo once before we start the ticks to do // it on a timer.