[release-19.0] VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616) #17642

Merged 2 commits on Jan 29, 2025
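This backport tightens two SwitchTraffic paths: the replication-lag precheck that refuses to switch traffic while the workflow is too far behind, and clean cancellation when replication fails to catch up before the command times out. As a rough mental model only, the lag precheck behaves like the minimal Go sketch below; the function shape and the workflow name "wf1" are illustrative, not the actual canSwitch() internals, though the error text matches what the new test asserts.

package main

import (
    "fmt"
    "time"
)

// canSwitchSketch refuses the traffic switch when observed replication lag
// exceeds the limit given via --max-replication-lag-allowed.
func canSwitchSketch(workflow string, replicationLag, maxAllowedLag time.Duration) error {
    if replicationLag > maxAllowedLag {
        return fmt.Errorf("cannot switch traffic for workflow %s at this time: replication lag %s is higher than allowed lag %s",
            workflow, replicationLag, maxAllowedLag)
    }
    return nil
}

func main() {
    // With 9s of lag and a 3s allowance, the switch is refused up front.
    fmt.Println(canSwitchSketch("wf1", 9*time.Second, 3*time.Second))
}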
2 changes: 1 addition & 1 deletion in go/test/endtoend/vreplication/cluster_test.go

@@ -886,7 +886,7 @@ func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName
     tablets := make(map[string]*cluster.VttabletProcess)
     for _, shard := range keyspace.Shards {
         for _, tablet := range shard.Tablets {
-            if tablet.Vttablet.GetTabletStatus() == "SERVING" {
+            if tablet.Vttablet.GetTabletStatus() == "SERVING" && (tabletType == "" || strings.EqualFold(tablet.Vttablet.GetTabletType(), tabletType)) {
                 log.Infof("Serving status of tablet %s is %s, %s", tablet.Name, tablet.Vttablet.ServingStatus, tablet.Vttablet.GetTabletStatus())
                 tablets[tablet.Name] = tablet.Vttablet
             }
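For context, the new condition only keeps tablets whose type matches the optional filter, compared case-insensitively via strings.EqualFold from the standard library; an empty filter preserves the old match-everything behavior. A tiny standalone sketch of just that matching logic (the helper name is made up for illustration):

package main

import (
    "fmt"
    "strings"
)

// matchesTabletType mirrors the new guard: an empty filter matches all
// tablets, otherwise the comparison ignores case.
func matchesTabletType(actual, wanted string) bool {
    return wanted == "" || strings.EqualFold(actual, wanted)
}

func main() {
    fmt.Println(matchesTabletType("PRIMARY", "primary")) // true
    fmt.Println(matchesTabletType("replica", ""))        // true (no filter)
    fmt.Println(matchesTabletType("replica", "primary")) // false
}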
148 changes: 147 additions & 1 deletion in go/test/endtoend/vreplication/vreplication_test.go

@@ -762,11 +762,16 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl
     switchReads(t, workflowType, cellNames, ksWorkflow, false)
     assertQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query)
 
+    switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
+
+    testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2},
+        workflow, workflowType)
+
     var commit func(t *testing.T)
     if withOpenTx {
         commit, _ = vc.startQuery(t, openTxQuery)
     }
-    switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard)
+    // Now let's confirm that it works as expected with an error.
     switchWrites(t, workflowType, ksWorkflow, false)
 
     checkThatVDiffFails(t, targetKs, workflow)
@@ -998,6 +1003,7 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou
     require.NoError(t, vc.AddShards(t, cells, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase, targetKsOpts))
 
     tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "primary")
+    var sourceTablets, targetTablets []*cluster.VttabletProcess
 
     // Test multi-primary setups, like a Galera cluster, which have auto increment steps > 1.
     for _, tablet := range tablets {
@@ -1010,9 +1016,11 @@
     targetShards = "," + targetShards + ","
     for _, tab := range tablets {
         if strings.Contains(targetShards, ","+tab.Shard+",") {
+            targetTablets = append(targetTablets, tab)
             log.Infof("Waiting for vrepl to catch up on %s since it IS a target shard", tab.Shard)
             catchup(t, tab, workflow, "Reshard")
         } else {
+            sourceTablets = append(sourceTablets, tab)
             log.Infof("Not waiting for vrepl to catch up on %s since it is NOT a target shard", tab.Shard)
             continue
         }
@@ -1026,6 +1034,10 @@
     if dryRunResultSwitchWrites != nil {
         reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary", "--dry-run")
     }
+    if tableName == "customer" {
+        testSwitchWritesErrorHandling(t, sourceTablets, targetTablets, workflow, "reshard")
+    }
+    // Now let's confirm that it works as expected with an error.
     reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary")
     reshardAction(t, "Complete", workflow, ksName, "", "", "", "")
     for tabletName, count := range counts {
@@ -1534,6 +1546,140 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes
     validateDryRunResults(t, output, dryRunResults)
 }
 
+// testSwitchWritesErrorHandling confirms that switching writes works as expected
+// in the face of vreplication lag (canSwitch() precheck) and when canceling the
+// switch due to replication failing to catch up in time.
+// The workflow MUST be migrating the customer table from the source to the
+// target keyspace AND the workflow must currently have reads switched but not
+// writes.
+func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []*cluster.VttabletProcess, workflow, workflowType string) {
+    t.Run("validate switch writes error handling", func(t *testing.T) {
+        vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort)
+        defer vtgateConn.Close()
+        require.NotZero(t, len(sourceTablets), "no source tablets provided")
+        require.NotZero(t, len(targetTablets), "no target tablets provided")
+        sourceKs := sourceTablets[0].Keyspace
+        targetKs := targetTablets[0].Keyspace
+        ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
+        var err error
+        sourceConns := make([]*mysql.Conn, len(sourceTablets))
+        for i, tablet := range sourceTablets {
+            sourceConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
+            require.NoError(t, err)
+            defer sourceConns[i].Close()
+        }
+        targetConns := make([]*mysql.Conn, len(targetTablets))
+        for i, tablet := range targetTablets {
+            targetConns[i], err = tablet.TabletConn(tablet.Keyspace, true)
+            require.NoError(t, err)
+            defer targetConns[i].Close()
+        }
+        startingTestRowID := 10000000
+        numTestRows := 100
+        addTestRows := func() {
+            for i := 0; i < numTestRows; i++ {
+                execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')",
+                    startingTestRowID+i))
+            }
+        }
+        deleteTestRows := func() {
+            execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID))
+        }
+        addIndex := func() {
+            for _, targetConn := range targetConns {
+                execQuery(t, targetConn, "set session sql_mode=''")
+                execQuery(t, targetConn, "alter table customer add unique index name_idx (name)")
+            }
+        }
+        dropIndex := func() {
+            for _, targetConn := range targetConns {
+                execQuery(t, targetConn, "alter table customer drop index name_idx")
+            }
+        }
+        lockTargetTable := func() {
+            for _, targetConn := range targetConns {
+                execQuery(t, targetConn, "lock table customer read")
+            }
+        }
+        unlockTargetTable := func() {
+            for _, targetConn := range targetConns {
+                execQuery(t, targetConn, "unlock tables")
+            }
+        }
+        cleanupTestData := func() {
+            dropIndex()
+            deleteTestRows()
+        }
+        restartWorkflow := func() {
+            err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow)
+            require.NoError(t, err, "failed to start workflow: %v", err)
+        }
+        waitForTargetToCatchup := func() {
+            waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String())
+            waitForNoWorkflowLag(t, vc, targetKs, workflow)
+        }
+
+        // First let's test that the prechecks work as expected. We ALTER
+        // the table on the target shards to add a unique index on the name
+        // field.
+        addIndex()
+        // Then we replicate some test rows across the target shards by
+        // inserting them in the source keyspace.
+        addTestRows()
+        // Now the workflow should go into the error state and the lag should
+        // start to climb. So we sleep for three times the max lag duration
+        // that we will set for the SwitchTraffic call.
+        lagDuration := 3 * time.Second
+        time.Sleep(lagDuration * 3)
+        out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
+            "SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String())
+        // It should fail in the canSwitch() precheck.
+        require.Error(t, err)
+        require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*",
+            workflow, lagDuration.String()), out)
+        require.NotContains(t, out, "cancel migration failed")
+        // Confirm that queries still work fine.
+        execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
+        cleanupTestData()
+        // We have to restart the workflow again as the duplicate key error
+        // is a permanent/terminal one.
+        restartWorkflow()
+        waitForTargetToCatchup()
+
+        // Now let's test that the cancel works by setting the command timeout
+        // to a fraction (6s) of the default max repl lag duration (30s). First
+        // we lock the customer table on the target tablets so that we cannot
+        // apply the INSERTs and catch up.
+        lockTargetTable()
+        addTestRows()
+        timeout := lagDuration * 2 // 6s
+        // Use the default max-replication-lag-allowed value of 30s.
+        // We run the command in a goroutine so that we can unblock things
+        // after the timeout is reached -- as the vplayer query is blocking
+        // on the table lock in the MySQL layer.
+        wg := sync.WaitGroup{}
+        wg.Add(1)
+        go func() {
+            defer wg.Done()
+            out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs,
+                "SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String())
+        }()
+        time.Sleep(timeout)
+        // Now we can unblock things and let it continue.
+        unlockTargetTable()
+        wg.Wait()
+        // It should fail due to the command context timeout and we should
+        // successfully cancel.
+        require.Error(t, err)
+        require.Contains(t, out, "failed to sync up replication between the source and target")
+        require.NotContains(t, out, "cancel migration failed")
+        // Confirm that queries still work fine.
+        execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1")
+        deleteTestRows()
+        waitForTargetToCatchup()
+    })
+}
+
 // restartWorkflow confirms that a workflow can be successfully
 // stopped and started.
 func restartWorkflow(t *testing.T, ksWorkflow string) {
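To summarize the second half of the new test: the target's customer table is read-locked so the vplayer cannot apply the inserts, the SwitchTraffic command hits its timeout, and the command must then cancel cleanly rather than leave traffic half-switched. Below is a self-contained, hypothetical Go sketch of that cancel-on-error shape; waitForCatchup, cancelMigration, and completeSwitch are invented stub names rather than Vitess APIs, and only the quoted error text comes from the test above.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// Stub that simulates replication failing to catch up before ctx expires
// (the test forces this with a table-level read lock on the target).
func waitForCatchup(ctx context.Context) error {
    select {
    case <-ctx.Done():
        return ctx.Err()
    case <-time.After(10 * time.Second):
        return nil
    }
}

func cancelMigration(ctx context.Context) error { return nil } // stub: rollback succeeds

func completeSwitch(ctx context.Context) error { return nil } // stub

func switchWritesSketch(ctx context.Context) error {
    if err := waitForCatchup(ctx); err != nil {
        // Roll back rather than leave the switch half-applied. A fresh
        // context is used because the command context has already expired.
        if cerr := cancelMigration(context.Background()); cerr != nil {
            return fmt.Errorf("failed to sync up replication between the source and target: %v; cancel migration failed: %v", err, cerr)
        }
        return errors.New("failed to sync up replication between the source and target")
    }
    return completeSwitch(ctx)
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    // Prints the sync failure after the 2s timeout; no "cancel migration
    // failed" suffix appears because the rollback succeeded.
    fmt.Println(switchWritesSketch(ctx))
}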