From 756598d883af981278ebe5288b65c582165959fd Mon Sep 17 00:00:00 2001 From: "vitess-bot[bot]" <108069721+vitess-bot[bot]@users.noreply.github.com> Date: Tue, 28 Jan 2025 14:08:04 -0500 Subject: [PATCH 1/2] Cherry-pick 39a0ddde8f27431f890662d3de8c62fcda530f7e with conflicts --- go/test/endtoend/vreplication/cluster_test.go | 2 +- .../vreplication/vreplication_test.go | 211 ++++++ go/vt/vtctl/workflow/server.go | 65 +- go/vt/vtctl/workflow/stream_migrator.go | 14 +- go/vt/vtctl/workflow/switcher.go | 4 +- go/vt/vtctl/workflow/switcher_dry_run.go | 3 +- go/vt/vtctl/workflow/switcher_interface.go | 5 + go/vt/vtctl/workflow/traffic_switcher.go | 112 ++- go/vt/vtctl/workflow/workflows.go | 686 ++++++++++++++++++ 9 files changed, 1079 insertions(+), 23 deletions(-) create mode 100644 go/vt/vtctl/workflow/workflows.go diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index 7d22d063945..7f06dc87680 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -886,7 +886,7 @@ func (vc *VitessCluster) getVttabletsInKeyspace(t *testing.T, cell *Cell, ksName tablets := make(map[string]*cluster.VttabletProcess) for _, shard := range keyspace.Shards { for _, tablet := range shard.Tablets { - if tablet.Vttablet.GetTabletStatus() == "SERVING" { + if tablet.Vttablet.GetTabletStatus() == "SERVING" && (tabletType == "" || strings.EqualFold(tablet.Vttablet.GetTabletType(), tabletType)) { log.Infof("Serving status of tablet %s is %s, %s", tablet.Name, tablet.Vttablet.ServingStatus, tablet.Vttablet.GetTabletStatus()) tablets[tablet.Name] = tablet.Vttablet } diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 4e50ea12af3..e8f04280122 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -767,6 +767,19 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl commit, _ = vc.startQuery(t, openTxQuery) } switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard) +<<<<<<< HEAD +======= + shardNames := make([]string, 0, len(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards)) + for shardName := range maps.Keys(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards) { + shardNames = append(shardNames, shardName) + } + testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow) + + testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2}, + workflow, workflowType) + + // Now let's confirm that it works as expected with an error. +>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) switchWrites(t, workflowType, ksWorkflow, false) checkThatVDiffFails(t, targetKs, workflow) @@ -998,6 +1011,7 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou require.NoError(t, vc.AddShards(t, cells, keyspace, targetShards, defaultReplicas, defaultRdonly, tabletIDBase, targetKsOpts)) tablets := vc.getVttabletsInKeyspace(t, defaultCell, ksName, "primary") + var sourceTablets, targetTablets []*cluster.VttabletProcess // Test multi-primary setups, like a Galera cluster, which have auto increment steps > 1. for _, tablet := range tablets { @@ -1010,9 +1024,11 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou targetShards = "," + targetShards + "," for _, tab := range tablets { if strings.Contains(targetShards, ","+tab.Shard+",") { + targetTablets = append(targetTablets, tab) log.Infof("Waiting for vrepl to catch up on %s since it IS a target shard", tab.Shard) catchup(t, tab, workflow, "Reshard") } else { + sourceTablets = append(sourceTablets, tab) log.Infof("Not waiting for vrepl to catch up on %s since it is NOT a target shard", tab.Shard) continue } @@ -1026,6 +1042,10 @@ func reshard(t *testing.T, ksName string, tableName string, workflow string, sou if dryRunResultSwitchWrites != nil { reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary", "--dry-run") } + if tableName == "customer" { + testSwitchWritesErrorHandling(t, sourceTablets, targetTablets, workflow, "reshard") + } + // Now let's confirm that it works as expected with an error. reshardAction(t, "SwitchTraffic", workflow, ksName, "", "", callNames, "primary") reshardAction(t, "Complete", workflow, ksName, "", "", "", "") for tabletName, count := range counts { @@ -1534,6 +1554,197 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes validateDryRunResults(t, output, dryRunResults) } +<<<<<<< HEAD +======= +// testSwitchTrafficPermissionsChecks confirms that for the SwitchTraffic command, the +// necessary permissions are checked properly on the source keyspace's primary tablets. +// This ensures that we can create and manage the reverse vreplication workflow. +func testSwitchTrafficPermissionChecks(t *testing.T, workflowType, sourceKeyspace string, sourceShards []string, targetKeyspace, workflow string) { + applyPrivileges := func(query string) { + for _, shard := range sourceShards { + primary := vc.getPrimaryTablet(t, sourceKeyspace, shard) + _, err := primary.QueryTablet(query, primary.Keyspace, false) + require.NoError(t, err) + } + } + runDryRunCmd := func(expectErr bool) { + _, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKeyspace, + "SwitchTraffic", "--tablet-types=primary", "--dry-run") + require.True(t, ((err != nil) == expectErr), "expected error: %t, got: %v", expectErr, err) + } + + defer func() { + // Put the default global privs back in place. + applyPrivileges("grant select,insert,update,delete on *.* to vt_filtered@localhost") + }() + + t.Run("test switch traffic permission checks", func(t *testing.T) { + t.Run("test without global privileges", func(t *testing.T) { + applyPrivileges("revoke select,insert,update,delete on *.* from vt_filtered@localhost") + runDryRunCmd(true) + }) + + t.Run("test with db level privileges", func(t *testing.T) { + applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.* to vt_filtered@localhost", + sidecarDBIdentifier)) + runDryRunCmd(false) + }) + + t.Run("test without global or db level privileges", func(t *testing.T) { + applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.* from vt_filtered@localhost", + sidecarDBIdentifier)) + runDryRunCmd(true) + }) + + t.Run("test with table level privileges", func(t *testing.T) { + applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.vreplication to vt_filtered@localhost", + sidecarDBIdentifier)) + runDryRunCmd(false) + }) + + t.Run("test without global, db, or table level privileges", func(t *testing.T) { + applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.vreplication from vt_filtered@localhost", + sidecarDBIdentifier)) + runDryRunCmd(true) + }) + }) +} + +// testSwitchWritesErrorHandling confirms that switching writes works as expected +// in the face of vreplication lag (canSwitch() precheck) and when canceling the +// switch due to replication failing to catch up in time. +// The workflow MUST be migrating the customer table from the source to the +// target keyspace AND the workflow must currently have reads switched but not +// writes. +func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets []*cluster.VttabletProcess, workflow, workflowType string) { + t.Run("validate switch writes error handling", func(t *testing.T) { + vtgateConn := getConnection(t, vc.ClusterConfig.hostname, vc.ClusterConfig.vtgateMySQLPort) + defer vtgateConn.Close() + require.NotZero(t, len(sourceTablets), "no source tablets provided") + require.NotZero(t, len(targetTablets), "no target tablets provided") + sourceKs := sourceTablets[0].Keyspace + targetKs := targetTablets[0].Keyspace + ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) + var err error + sourceConns := make([]*mysql.Conn, len(sourceTablets)) + for i, tablet := range sourceTablets { + sourceConns[i], err = tablet.TabletConn(tablet.Keyspace, true) + require.NoError(t, err) + defer sourceConns[i].Close() + } + targetConns := make([]*mysql.Conn, len(targetTablets)) + for i, tablet := range targetTablets { + targetConns[i], err = tablet.TabletConn(tablet.Keyspace, true) + require.NoError(t, err) + defer targetConns[i].Close() + } + startingTestRowID := 10000000 + numTestRows := 100 + addTestRows := func() { + for i := 0; i < numTestRows; i++ { + execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("insert into customer (cid, name) values (%d, 'laggingCustomer')", + startingTestRowID+i)) + } + } + deleteTestRows := func() { + execVtgateQuery(t, vtgateConn, sourceTablets[0].Keyspace, fmt.Sprintf("delete from customer where cid >= %d", startingTestRowID)) + } + addIndex := func() { + for _, targetConn := range targetConns { + execQuery(t, targetConn, "set session sql_mode=''") + execQuery(t, targetConn, "alter table customer add unique index name_idx (name)") + } + } + dropIndex := func() { + for _, targetConn := range targetConns { + execQuery(t, targetConn, "alter table customer drop index name_idx") + } + } + lockTargetTable := func() { + for _, targetConn := range targetConns { + execQuery(t, targetConn, "lock table customer read") + } + } + unlockTargetTable := func() { + for _, targetConn := range targetConns { + execQuery(t, targetConn, "unlock tables") + } + } + cleanupTestData := func() { + dropIndex() + deleteTestRows() + } + restartWorkflow := func() { + err = vc.VtctldClient.ExecuteCommand("workflow", "--keyspace", targetKs, "start", "--workflow", workflow) + require.NoError(t, err, "failed to start workflow: %v", err) + } + waitForTargetToCatchup := func() { + waitForWorkflowState(t, vc, ksWorkflow, binlogdatapb.VReplicationWorkflowState_Running.String()) + waitForNoWorkflowLag(t, vc, targetKs, workflow) + } + + // First let's test that the prechecks work as expected. We ALTER + // the table on the target shards to add a unique index on the name + // field. + addIndex() + // Then we replicate some test rows across the target shards by + // inserting them in the source keyspace. + addTestRows() + // Now the workflow should go into the error state and the lag should + // start to climb. So we sleep for twice the max lag duration that we + // will set for the SwitchTraffic call. + lagDuration := 3 * time.Second + time.Sleep(lagDuration * 3) + out, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs, + "SwitchTraffic", "--tablet-types=primary", "--timeout=30s", "--max-replication-lag-allowed", lagDuration.String()) + // It should fail in the canSwitch() precheck. + require.Error(t, err) + require.Regexp(t, fmt.Sprintf(".*cannot switch traffic for workflow %s at this time: replication lag [0-9]+s is higher than allowed lag %s.*", + workflow, lagDuration.String()), out) + require.NotContains(t, out, "cancel migration failed") + // Confirm that queries still work fine. + execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1") + cleanupTestData() + // We have to restart the workflow again as the duplicate key error + // is a permanent/terminal one. + restartWorkflow() + waitForTargetToCatchup() + + // Now let's test that the cancel works by setting the command timeout + // to a fraction (6s) of the default max repl lag duration (30s). First + // we lock the customer table on the target tablets so that we cannot + // apply the INSERTs and catch up. + lockTargetTable() + addTestRows() + timeout := lagDuration * 2 // 6s + // Use the default max-replication-lag-allowed value of 30s. + // We run the command in a goroutine so that we can unblock things + // after the timeout is reached -- as the vplayer query is blocking + // on the table lock in the MySQL layer. + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + out, err = vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKs, + "SwitchTraffic", "--tablet-types=primary", "--timeout", timeout.String()) + }() + time.Sleep(timeout) + // Now we can unblock things and let it continue. + unlockTargetTable() + wg.Wait() + // It should fail due to the command context timeout and we should + // successfully cancel. + require.Error(t, err) + require.Contains(t, out, "failed to sync up replication between the source and target") + require.NotContains(t, out, "cancel migration failed") + // Confirm that queries still work fine. + execVtgateQuery(t, vtgateConn, sourceKs, "select * from customer limit 1") + deleteTestRows() + waitForTargetToCatchup() + }) +} + +>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) // restartWorkflow confirms that a workflow can be successfully // stopped and started. func restartWorkflow(t *testing.T, ksWorkflow string) { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index f4c761e8b5a..f01cccbe27b 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -3242,10 +3242,30 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return handleError("failed to migrate the workflow streams", err) } if cancel { - sw.cancelMigration(ctx, sm) - return 0, sw.logs(), nil + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } + return 0, sw.logs(), err + } + +<<<<<<< HEAD +======= + // We stop writes on the source before stopping the source streams so that the catchup time + // is lessened and other workflows that we have to migrate such as intra-keyspace materialize + // workflows also have a chance to catch up as well because those are internally generated + // GTIDs within the shards we're switching traffic away from. + // For intra-keyspace materialization streams that we migrate where the source and target are + // the keyspace being resharded, we wait for those to catchup in the stopStreams path before + // we actually stop them. + ts.Logger().Infof("Stopping source writes") + if err := sw.stopSourceWrites(ctx); err != nil { + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } + return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) } +>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) ts.Logger().Infof("Stopping streams") sourceWorkflows, err = sw.stopStreams(ctx, sm) if err != nil { @@ -3254,6 +3274,7 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource) } } +<<<<<<< HEAD sw.cancelMigration(ctx, sm) return handleError("failed to stop the workflow streams", err) } @@ -3262,6 +3283,12 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit if err := sw.stopSourceWrites(ctx); err != nil { sw.cancelMigration(ctx, sm) return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) +======= + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } + return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err) +>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) } if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { @@ -3270,7 +3297,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit // the tablet's deny list check and the first mysqld side table lock. for cnt := 1; cnt <= lockTablesCycles; cnt++ { if err := ts.executeLockTablesOnSource(ctx); err != nil { - sw.cancelMigration(ctx, sm) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } return handleError(fmt.Sprintf("failed to execute LOCK TABLES (attempt %d of %d) on sources", cnt, lockTablesCycles), err) } // No need to UNLOCK the tables as the connection was closed once the locks were acquired @@ -3280,26 +3309,39 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } ts.Logger().Infof("Waiting for streams to catchup") +<<<<<<< HEAD if err := sw.waitForCatchup(ctx, timeout); err != nil { sw.cancelMigration(ctx, sm) +======= + if err := sw.waitForCatchup(ctx, waitTimeout); err != nil { + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } +>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) return handleError("failed to sync up replication between the source and target", err) } ts.Logger().Infof("Migrating streams") if err := sw.migrateStreams(ctx, sm); err != nil { - sw.cancelMigration(ctx, sm) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } return handleError("failed to migrate the workflow streams", err) } ts.Logger().Infof("Resetting sequences") if err := sw.resetSequences(ctx); err != nil { - sw.cancelMigration(ctx, sm) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } return handleError("failed to reset the sequences", err) } ts.Logger().Infof("Creating reverse streams") if err := sw.createReverseVReplication(ctx); err != nil { - sw.cancelMigration(ctx, sm) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } return handleError("failed to create the reverse vreplication streams", err) } @@ -3312,7 +3354,9 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit initSeqCtx, cancel := context.WithTimeout(ctx, timeout/2) defer cancel() if err := sw.initializeTargetSequences(initSeqCtx, sequenceMetadata); err != nil { - sw.cancelMigration(ctx, sm) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } return handleError(fmt.Sprintf("failed to initialize the sequences used in the %s keyspace", ts.TargetKeyspaceName()), err) } } @@ -3365,15 +3409,14 @@ func (s *Server) canSwitch(ctx context.Context, ts *trafficSwitcher, state *Stat if err != nil { return "", err } + if wf.MaxVReplicationTransactionLag > maxAllowedReplLagSecs { + return fmt.Sprintf(cannotSwitchHighLag, wf.MaxVReplicationTransactionLag, maxAllowedReplLagSecs), nil + } for _, stream := range wf.ShardStreams { for _, st := range stream.GetStreams() { if st.Message == Frozen { return cannotSwitchFrozen, nil } - // If no new events have been replicated after the copy phase then it will be 0. - if vreplLag := time.Now().Unix() - st.TimeUpdated.Seconds; vreplLag > maxAllowedReplLagSecs { - return fmt.Sprintf(cannotSwitchHighLag, vreplLag, maxAllowedReplLagSecs), nil - } switch st.State { case binlogdatapb.VReplicationWorkflowState_Copying.String(): return cannotSwitchCopyIncomplete, nil diff --git a/go/vt/vtctl/workflow/stream_migrator.go b/go/vt/vtctl/workflow/stream_migrator.go index 7d225f6dd9f..1a7ffc71f24 100644 --- a/go/vt/vtctl/workflow/stream_migrator.go +++ b/go/vt/vtctl/workflow/stream_migrator.go @@ -158,12 +158,15 @@ func (sm *StreamMigrator) Templates() []*VReplicationStream { } // CancelStreamMigrations cancels the stream migrations. -func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) { +func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) error { if sm.streams == nil { - return + return nil } + errs := &concurrency.AllErrorRecorder{} - _ = sm.deleteTargetStreams(ctx) + if err := sm.deleteTargetStreams(ctx); err != nil { + errs.RecordError(fmt.Errorf("could not delete target streams: %v", err)) + } // Restart the source streams, but leave the Reshard workflow's reverse // variant stopped. @@ -176,8 +179,13 @@ func (sm *StreamMigrator) CancelStreamMigrations(ctx context.Context) { return err }) if err != nil { + errs.RecordError(fmt.Errorf("could not restart source streams: %v", err)) sm.logger.Errorf("Cancel stream migrations failed: could not restart source streams: %v", err) } + if errs.HasErrors() { + return errs.AggrError(vterrors.Aggregate) + } + return nil } // MigrateStreams migrates N streams diff --git a/go/vt/vtctl/workflow/switcher.go b/go/vt/vtctl/workflow/switcher.go index 0cbdce164dc..5e95e648299 100644 --- a/go/vt/vtctl/workflow/switcher.go +++ b/go/vt/vtctl/workflow/switcher.go @@ -110,8 +110,8 @@ func (r *switcher) stopStreams(ctx context.Context, sm *StreamMigrator) ([]strin return sm.StopStreams(ctx) } -func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { - r.ts.cancelMigration(ctx, sm) +func (r *switcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error { + return r.ts.cancelMigration(ctx, sm) } func (r *switcher) lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) { diff --git a/go/vt/vtctl/workflow/switcher_dry_run.go b/go/vt/vtctl/workflow/switcher_dry_run.go index 21b975a0d6b..b7ad8207574 100644 --- a/go/vt/vtctl/workflow/switcher_dry_run.go +++ b/go/vt/vtctl/workflow/switcher_dry_run.go @@ -214,8 +214,9 @@ func (dr *switcherDryRun) stopStreams(ctx context.Context, sm *StreamMigrator) ( return nil, nil } -func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) { +func (dr *switcherDryRun) cancelMigration(ctx context.Context, sm *StreamMigrator) error { dr.drLog.Log("Cancel migration as requested") + return nil } func (dr *switcherDryRun) lockKeyspace(ctx context.Context, keyspace, _ string) (context.Context, func(*error), error) { diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go index 8d0f9e847be..970f12ca4b2 100644 --- a/go/vt/vtctl/workflow/switcher_interface.go +++ b/go/vt/vtctl/workflow/switcher_interface.go @@ -24,8 +24,13 @@ import ( ) type iswitcher interface { +<<<<<<< HEAD lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) cancelMigration(ctx context.Context, sm *StreamMigrator) +======= + lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error) + cancelMigration(ctx context.Context, sm *StreamMigrator) error +>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error) stopSourceWrites(ctx context.Context) error waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index f4d8a13054b..c5e1241e388 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -681,7 +681,11 @@ func (ts *trafficSwitcher) changeShardsAccess(ctx context.Context, keyspace stri func (ts *trafficSwitcher) allowTargetWrites(ctx context.Context) error { if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { +<<<<<<< HEAD return ts.allowTableTargetWrites(ctx) +======= + return ts.switchDeniedTables(ctx, false) +>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) } return ts.changeShardsAccess(ctx, ts.TargetKeyspaceName(), ts.TargetShards(), allowWrites) } @@ -948,7 +952,11 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { var err error if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { +<<<<<<< HEAD err = ts.changeTableSourceWrites(ctx, disallowWrites) +======= + err = ts.switchDeniedTables(ctx, false) +>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) } else { err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), disallowWrites) } @@ -968,11 +976,52 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { }) } +<<<<<<< HEAD func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error { err := ts.ForAllSources(func(source *MigrationSource) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables()) }); err != nil { +======= +// switchDeniedTables switches the denied tables rules for the traffic switch. +// They are added on the source side and removed on the target side. +// If backward is true, then we swap this logic, removing on the source side +// and adding on the target side. You would want to do that e.g. when canceling +// a failed (and currently partial) traffic switch as we may have already +// switched the denied tables entries and in any event we need to go back to +// the original state. +func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context, backward bool) error { + if ts.MigrationType() != binlogdatapb.MigrationType_TABLES { + return nil + } + + rmsource, rmtarget := false, true + if backward { + rmsource, rmtarget = true, false + } + + egrp, ectx := errgroup.WithContext(ctx) + egrp.Go(func() error { + return ts.ForAllSources(func(source *MigrationSource) error { + if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { + return si.UpdateDeniedTables(ectx, topodatapb.TabletType_PRIMARY, nil, rmsource, ts.Tables()) + }); err != nil { + return err + } + rtbsCtx, cancel := context.WithTimeout(ectx, shardTabletRefreshTimeout) + defer cancel() + isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger()) + if isPartial { + msg := fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s source shard (%v):\n %v", + source.GetShard().Keyspace(), source.GetShard().ShardName(), err, partialDetails) + if ts.force { + log.Warning(msg) + return nil + } else { + return errors.New(msg) + } + } +>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) return err } rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout) @@ -984,8 +1033,36 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a } return err }) +<<<<<<< HEAD if err != nil { log.Warningf("Error in changeTableSourceWrites: %s", err) +======= + egrp.Go(func() error { + return ts.ForAllTargets(func(target *MigrationTarget) error { + if _, err := ts.TopoServer().UpdateShardFields(ectx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { + return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, rmtarget, ts.Tables()) + }); err != nil { + return err + } + rtbsCtx, cancel := context.WithTimeout(ectx, shardTabletRefreshTimeout) + defer cancel() + isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger()) + if isPartial { + msg := fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s target shard (%v):\n %v", + target.GetShard().Keyspace(), target.GetShard().ShardName(), err, partialDetails) + if ts.force { + log.Warning(msg) + return nil + } else { + return errors.New(msg) + } + } + return err + }) + }) + if err := egrp.Wait(); err != nil { + ts.Logger().Warningf("Error in switchDeniedTables: %s", err) +>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) return err } // Note that the denied tables, which are being updated in this method, are not part of the SrvVSchema in the topo. @@ -996,8 +1073,9 @@ func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access a // cancelMigration attempts to revert all changes made during the migration so that we can get back to the // state when traffic switching (or reversing) was initiated. -func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) { +func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrator) error { var err error + cancelErrs := &concurrency.AllErrorRecorder{} if ctx.Err() != nil { // Even though we create a new context later on we still record any context error: @@ -1006,21 +1084,33 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat } // We create a new context while canceling the migration, so that we are independent of the original - // context being cancelled prior to or during the cancel operation. - cmTimeout := 60 * time.Second - cmCtx, cmCancel := context.WithTimeout(context.Background(), cmTimeout) + // context being canceled prior to or during the cancel operation itself. + // First we create a copy of the parent context, so that we maintain the locks, but which cannot be + // canceled by the parent context. + wcCtx := context.WithoutCancel(ctx) + // Now we create a child context from that which has a timeout. + cmTimeout := 2 * time.Minute + cmCtx, cmCancel := context.WithTimeout(wcCtx, cmTimeout) defer cmCancel() if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { +<<<<<<< HEAD err = ts.changeTableSourceWrites(cmCtx, allowWrites) +======= + err = ts.switchDeniedTables(cmCtx, true /* revert */) +>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) } else { err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) } if err != nil { + cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err)) ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err) } - sm.CancelStreamMigrations(cmCtx) + if err := sm.CancelStreamMigrations(cmCtx); err != nil { + cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err)) + ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err) + } err = ts.ForAllTargets(func(target *MigrationTarget) error { query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s", @@ -1029,13 +1119,25 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat return err }) if err != nil { + cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err)) ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err) } +<<<<<<< HEAD err = ts.deleteReverseVReplication(cmCtx) if err != nil { ts.Logger().Errorf("Cancel migration failed: could not delete revers vreplication entries: %v", err) +======= + if err := ts.deleteReverseVReplication(cmCtx); err != nil { + cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err)) + ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err) +>>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) } + + if cancelErrs.HasErrors() { + return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary") + } + return nil } func (ts *trafficSwitcher) freezeTargetVReplication(ctx context.Context) error { diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go new file mode 100644 index 00000000000..a1b4393f2c0 --- /dev/null +++ b/go/vt/vtctl/workflow/workflows.go @@ -0,0 +1,686 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +This file provides functions for fetching and retrieving information about VReplication workflows + +At the moment it is used by the `GetWorkflows` function in `server.go and includes functionality to +get the following: +- Fetch workflows by shard +- Fetch copy states by shard stream +- Build workflows with metadata +- Fetch stream logs +*/ + +package workflow + +import ( + "context" + "encoding/json" + "fmt" + "math" + "sort" + "strings" + "sync" + "time" + + "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" + + "vitess.io/vitess/go/sets" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/trace" + "vitess.io/vitess/go/vt/binlog/binlogplayer" + "vitess.io/vitess/go/vt/logutil" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vtctl/workflow/common" + "vitess.io/vitess/go/vt/vtctl/workflow/vexec" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vttablet/tmclient" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + vttimepb "vitess.io/vitess/go/vt/proto/vttime" +) + +// workflowFetcher is responsible for fetching and retrieving information +// about VReplication workflows. +type workflowFetcher struct { + ts *topo.Server + tmc tmclient.TabletManagerClient + + logger logutil.Logger + parser *sqlparser.Parser +} + +type workflowMetadata struct { + sourceKeyspace string + sourceShards sets.Set[string] + targetKeyspace string + targetShards sets.Set[string] + maxVReplicationLag float64 + maxVReplicationTransactionLag float64 +} + +var vrepLogQuery = strings.TrimSpace(` +SELECT + id, + vrepl_id, + type, + state, + message, + created_at, + updated_at, + count +FROM + _vt.vreplication_log +WHERE vrepl_id IN %a +ORDER BY + vrepl_id ASC, + id ASC +`) + +func (wf *workflowFetcher) fetchWorkflowsByShard( + ctx context.Context, + req *vtctldatapb.GetWorkflowsRequest, +) (map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) { + readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{} + if req.Workflow != "" { + readReq.IncludeWorkflows = []string{req.Workflow} + } + if req.ActiveOnly { + readReq.ExcludeStates = []binlogdatapb.VReplicationWorkflowState{binlogdatapb.VReplicationWorkflowState_Stopped} + } + + m := sync.Mutex{} + + shards, err := common.GetShards(ctx, wf.ts, req.Keyspace, req.Shards) + if err != nil { + return nil, err + } + + results := make(map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, len(shards)) + + err = wf.forAllShards(ctx, req.Keyspace, shards, func(ctx context.Context, si *topo.ShardInfo) error { + primary, err := wf.ts.GetTablet(ctx, si.PrimaryAlias) + if err != nil { + return err + } + if primary == nil { + return fmt.Errorf("%w %s/%s: tablet %v not found", vexec.ErrNoShardPrimary, req.Keyspace, si.ShardName(), topoproto.TabletAliasString(si.PrimaryAlias)) + } + // Clone the request so that we can set the correct DB name for tablet. + req := readReq.CloneVT() + wres, err := wf.tmc.ReadVReplicationWorkflows(ctx, primary.Tablet, req) + if err != nil { + return err + } + m.Lock() + defer m.Unlock() + results[primary] = wres + return nil + }) + if err != nil { + return nil, err + } + + return results, nil +} + +func (wf *workflowFetcher) fetchCopyStatesByShardStream( + ctx context.Context, + workflowsByShard map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, +) (map[string][]*vtctldatapb.Workflow_Stream_CopyState, error) { + m := sync.Mutex{} + + copyStatesByShardStreamId := make(map[string][]*vtctldatapb.Workflow_Stream_CopyState, len(workflowsByShard)) + + fetchCopyStates := func(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) error { + span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchCopyStates") + defer span.Finish() + + span.Annotate("shard", tablet.Shard) + span.Annotate("tablet_alias", tablet.AliasString()) + + copyStates, err := wf.getWorkflowCopyStates(ctx, tablet, streamIds) + if err != nil { + return err + } + + m.Lock() + defer m.Unlock() + + for _, copyState := range copyStates { + shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, copyState.StreamId) + copyStatesByShardStreamId[shardStreamId] = append( + copyStatesByShardStreamId[shardStreamId], + copyState, + ) + } + + return nil + } + + fetchCopyStatesEg, fetchCopyStatesCtx := errgroup.WithContext(ctx) + for tablet, result := range workflowsByShard { + streamIds := make([]int32, 0, len(result.Workflows)) + for _, wf := range result.Workflows { + for _, stream := range wf.Streams { + streamIds = append(streamIds, stream.Id) + } + } + + if len(streamIds) == 0 { + continue + } + + fetchCopyStatesEg.Go(func() error { + return fetchCopyStates(fetchCopyStatesCtx, tablet, streamIds) + }) + } + if err := fetchCopyStatesEg.Wait(); err != nil { + return nil, err + } + + return copyStatesByShardStreamId, nil +} + +func (wf *workflowFetcher) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) ([]*vtctldatapb.Workflow_Stream_CopyState, error) { + span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.getWorkflowCopyStates") + defer span.Finish() + + span.Annotate("keyspace", tablet.Keyspace) + span.Annotate("shard", tablet.Shard) + span.Annotate("tablet_alias", tablet.AliasString()) + span.Annotate("stream_ids", fmt.Sprintf("%#v", streamIds)) + + idsBV, err := sqltypes.BuildBindVariable(streamIds) + if err != nil { + return nil, err + } + query, err := sqlparser.ParseAndBind("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in %a and id in (select max(id) from _vt.copy_state where vrepl_id in %a group by vrepl_id, table_name)", + idsBV, idsBV) + if err != nil { + return nil, err + } + qr, err := wf.tmc.VReplicationExec(ctx, tablet.Tablet, query) + if err != nil { + return nil, err + } + + result := sqltypes.Proto3ToResult(qr) + if result == nil { + return nil, nil + } + + copyStates := make([]*vtctldatapb.Workflow_Stream_CopyState, len(result.Rows)) + for i, row := range result.Named().Rows { + streamId, err := row["vrepl_id"].ToInt64() + if err != nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to cast vrepl_id to int64: %v", err) + } + // These string fields are technically varbinary, but this is close enough. + copyStates[i] = &vtctldatapb.Workflow_Stream_CopyState{ + StreamId: streamId, + Table: row["table_name"].ToString(), + LastPk: row["lastpk"].ToString(), + } + } + + return copyStates, nil +} + +func (wf *workflowFetcher) buildWorkflows( + ctx context.Context, + results map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, + copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState, + req *vtctldatapb.GetWorkflowsRequest, +) ([]*vtctldatapb.Workflow, error) { + workflowsMap := make(map[string]*vtctldatapb.Workflow, len(results)) + workflowMetadataMap := make(map[string]*workflowMetadata, len(results)) + + for tablet, result := range results { + // In the old implementation, we knew we had at most one (0 <= N <= 1) + // workflow for each shard primary we queried. There might be multiple + // rows (streams) comprising that workflow, so we would aggregate the + // rows for a given primary into a single value ("the workflow", + // ReplicationStatusResult in the old types). + // + // In this version, we have many (N >= 0) workflows for each shard + // primary we queried, so we need to determine if each row corresponds + // to a workflow we're already aggregating, or if it's a workflow we + // haven't seen yet for that shard primary. We use the workflow name to + // dedupe for this. + for _, wfres := range result.Workflows { + workflowName := wfres.Workflow + workflow, ok := workflowsMap[workflowName] + if !ok { + workflow = &vtctldatapb.Workflow{ + Name: workflowName, + ShardStreams: map[string]*vtctldatapb.Workflow_ShardStream{}, + } + + workflowsMap[workflowName] = workflow + workflowMetadataMap[workflowName] = &workflowMetadata{ + sourceShards: sets.New[string](), + targetShards: sets.New[string](), + } + } + + metadata := workflowMetadataMap[workflowName] + err := wf.scanWorkflow(ctx, workflow, wfres, tablet, metadata, copyStatesByShardStreamId, req.Keyspace) + if err != nil { + return nil, err + } + } + } + + for name, workflow := range workflowsMap { + meta := workflowMetadataMap[name] + updateWorkflowWithMetadata(workflow, meta) + + // Sort shard streams by stream_id ASC, to support an optimization + // in fetchStreamLogs below. + for _, shardStreams := range workflow.ShardStreams { + sort.Slice(shardStreams.Streams, func(i, j int) bool { + return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id + }) + } + } + + if req.IncludeLogs { + var fetchLogsWG sync.WaitGroup + + for _, workflow := range workflowsMap { + // Fetch logs for all streams associated with this workflow in the background. + fetchLogsWG.Add(1) + go func(ctx context.Context, workflow *vtctldatapb.Workflow) { + defer fetchLogsWG.Done() + wf.fetchStreamLogs(ctx, req.Keyspace, workflow) + }(ctx, workflow) + } + + // Wait for all the log fetchers to finish. + fetchLogsWG.Wait() + } + + return maps.Values(workflowsMap), nil +} + +func (wf *workflowFetcher) scanWorkflow( + ctx context.Context, + workflow *vtctldatapb.Workflow, + res *tabletmanagerdatapb.ReadVReplicationWorkflowResponse, + tablet *topo.TabletInfo, + meta *workflowMetadata, + copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState, + keyspace string, +) error { + shardStreamKey := fmt.Sprintf("%s/%s", tablet.Shard, tablet.AliasString()) + shardStream, ok := workflow.ShardStreams[shardStreamKey] + if !ok { + ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) + defer cancel() + + si, err := wf.ts.GetShard(ctx, keyspace, tablet.Shard) + if err != nil { + return err + } + + shardStream = &vtctldatapb.Workflow_ShardStream{ + Streams: nil, + TabletControls: si.TabletControls, + IsPrimaryServing: si.IsPrimaryServing, + } + + workflow.ShardStreams[shardStreamKey] = shardStream + } + + for _, rstream := range res.Streams { + // The value in the pos column can be compressed and thus not + // have a valid GTID consisting of valid UTF-8 characters so we + // have to decode it so that it's properly decompressed first + // when needed. + pos := rstream.Pos + if pos != "" { + mpos, err := binlogplayer.DecodePosition(pos) + if err != nil { + return err + } + pos = mpos.String() + } + + cells := strings.Split(res.Cells, ",") + for i := range cells { + cells[i] = strings.TrimSpace(cells[i]) + } + options := res.Options + if options != "" { + if err := json.Unmarshal([]byte(options), &workflow.Options); err != nil { + return err + } + } + + stream := &vtctldatapb.Workflow_Stream{ + Id: int64(rstream.Id), + Shard: tablet.Shard, + Tablet: tablet.Alias, + BinlogSource: rstream.Bls, + Position: pos, + StopPosition: rstream.StopPos, + State: rstream.State.String(), + DbName: tablet.DbName(), + TabletTypes: res.TabletTypes, + TabletSelectionPreference: res.TabletSelectionPreference, + Cells: cells, + TransactionTimestamp: rstream.TransactionTimestamp, + TimeUpdated: rstream.TimeUpdated, + Message: rstream.Message, + Tags: strings.Split(res.Tags, ","), + RowsCopied: rstream.RowsCopied, + ThrottlerStatus: &vtctldatapb.Workflow_Stream_ThrottlerStatus{ + ComponentThrottled: rstream.ComponentThrottled, + TimeThrottled: rstream.TimeThrottled, + }, + } + + // Merge in copy states, which we've already fetched. + shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, stream.Id) + if copyStates, ok := copyStatesByShardStreamId[shardStreamId]; ok { + stream.CopyStates = copyStates + } + + if rstream.TimeUpdated == nil { + rstream.TimeUpdated = &vttimepb.Time{} + } + + stream.State = getStreamState(stream, rstream) + + shardStream.Streams = append(shardStream.Streams, stream) + + meta.sourceShards.Insert(stream.BinlogSource.Shard) + meta.targetShards.Insert(tablet.Shard) + + if meta.sourceKeyspace != "" && meta.sourceKeyspace != stream.BinlogSource.Keyspace { + return vterrors.Wrapf(ErrMultipleSourceKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.sourceKeyspace, stream.BinlogSource.Keyspace) + } + + meta.sourceKeyspace = stream.BinlogSource.Keyspace + + if meta.targetKeyspace != "" && meta.targetKeyspace != tablet.Keyspace { + return vterrors.Wrapf(ErrMultipleTargetKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.targetKeyspace, tablet.Keyspace) + } + + meta.targetKeyspace = tablet.Keyspace + + if stream.TimeUpdated == nil { + stream.TimeUpdated = &vttimepb.Time{} + } + timeUpdated := time.Unix(stream.TimeUpdated.Seconds, 0) + vreplicationLag := time.Since(timeUpdated) + + // MaxVReplicationLag represents the time since we last processed any event + // in the workflow. + if vreplicationLag.Seconds() > meta.maxVReplicationLag { + meta.maxVReplicationLag = vreplicationLag.Seconds() + } + + workflow.WorkflowType = res.WorkflowType.String() + workflow.WorkflowSubType = res.WorkflowSubType.String() + workflow.DeferSecondaryKeys = res.DeferSecondaryKeys + + // MaxVReplicationTransactionLag estimates the max statement processing lag + // between the source and the target across all of the workflow streams. + transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.TimeHeartbeat, rstream.State) + if transactionReplicationLag > meta.maxVReplicationTransactionLag { + meta.maxVReplicationTransactionLag = transactionReplicationLag + } + } + + return nil +} + +func updateWorkflowWithMetadata(workflow *vtctldatapb.Workflow, meta *workflowMetadata) { + workflow.Source = &vtctldatapb.Workflow_ReplicationLocation{ + Keyspace: meta.sourceKeyspace, + Shards: sets.List(meta.sourceShards), + } + + workflow.Target = &vtctldatapb.Workflow_ReplicationLocation{ + Keyspace: meta.targetKeyspace, + Shards: sets.List(meta.targetShards), + } + + workflow.MaxVReplicationLag = int64(meta.maxVReplicationLag) + workflow.MaxVReplicationTransactionLag = int64(meta.maxVReplicationTransactionLag) +} + +func (wf *workflowFetcher) fetchStreamLogs(ctx context.Context, keyspace string, workflow *vtctldatapb.Workflow) { + span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchStreamLogs") + defer span.Finish() + + span.Annotate("keyspace", keyspace) + span.Annotate("workflow", workflow.Name) + + vreplIDs := make([]int64, 0, len(workflow.ShardStreams)) + for _, shardStream := range maps.Values(workflow.ShardStreams) { + for _, stream := range shardStream.Streams { + vreplIDs = append(vreplIDs, stream.Id) + } + } + idsBV, err := sqltypes.BuildBindVariable(vreplIDs) + if err != nil { + return + } + + query, err := sqlparser.ParseAndBind(vrepLogQuery, idsBV) + if err != nil { + return + } + + vx := vexec.NewVExec(keyspace, workflow.Name, wf.ts, wf.tmc, wf.parser) + results, err := vx.QueryContext(ctx, query) + if err != nil { + // Note that we do not return here. If there are any query results + // in the map (i.e. some tablets returned successfully), we will + // still try to read log rows from them on a best-effort basis. But, + // we will also pre-emptively record the top-level fetch error on + // every stream in every shard in the workflow. Further processing + // below may override the error message for certain streams. + for _, streams := range workflow.ShardStreams { + for _, stream := range streams.Streams { + stream.LogFetchError = err.Error() + } + } + } + + for target, p3qr := range results { + qr := sqltypes.Proto3ToResult(p3qr) + shardStreamKey := fmt.Sprintf("%s/%s", target.Shard, target.AliasString()) + + ss, ok := workflow.ShardStreams[shardStreamKey] + if !ok || ss == nil { + continue + } + + streams := ss.Streams + streamIdx := 0 + markErrors := func(err error) { + if streamIdx >= len(streams) { + return + } + + streams[streamIdx].LogFetchError = err.Error() + } + + for _, row := range qr.Named().Rows { + id, err := row["id"].ToCastInt64() + if err != nil { + markErrors(err) + continue + } + + streamID, err := row["vrepl_id"].ToCastInt64() + if err != nil { + markErrors(err) + continue + } + + typ := row["type"].ToString() + state := row["state"].ToString() + message := row["message"].ToString() + + createdAt, err := time.Parse("2006-01-02 15:04:05", row["created_at"].ToString()) + if err != nil { + markErrors(err) + continue + } + + updatedAt, err := time.Parse("2006-01-02 15:04:05", row["updated_at"].ToString()) + if err != nil { + markErrors(err) + continue + } + + count, err := row["count"].ToCastInt64() + if err != nil { + markErrors(err) + continue + } + + streamLog := &vtctldatapb.Workflow_Stream_Log{ + Id: id, + StreamId: streamID, + Type: typ, + State: state, + CreatedAt: &vttimepb.Time{ + Seconds: createdAt.Unix(), + }, + UpdatedAt: &vttimepb.Time{ + Seconds: updatedAt.Unix(), + }, + Message: message, + Count: count, + } + + // Earlier, in buildWorkflows, we sorted each ShardStreams + // slice by ascending id, and our _vt.vreplication_log query + // ordered by (stream_id ASC, id ASC), so we can walk the + // streams in index order in O(n) amortized over all the rows + // for this tablet. + for streamIdx < len(streams) { + stream := streams[streamIdx] + if stream.Id < streamLog.StreamId { + streamIdx++ + continue + } + + if stream.Id > streamLog.StreamId { + wf.logger.Warningf("Found stream log for nonexistent stream: %+v", streamLog) + // This can happen on manual/failed workflow cleanup so move to the next log. + break + } + + // stream.Id == streamLog.StreamId + stream.Logs = append(stream.Logs, streamLog) + break + } + } + } +} + +func (wf *workflowFetcher) forAllShards( + ctx context.Context, + keyspace string, + shards []string, + f func(ctx context.Context, shard *topo.ShardInfo) error, +) error { + eg, egCtx := errgroup.WithContext(ctx) + for _, shard := range shards { + eg.Go(func() error { + si, err := wf.ts.GetShard(ctx, keyspace, shard) + if err != nil { + return err + } + if si.PrimaryAlias == nil { + return fmt.Errorf("%w %s/%s", vexec.ErrNoShardPrimary, keyspace, shard) + } + + if err := f(egCtx, si); err != nil { + return err + } + return nil + }) + } + if err := eg.Wait(); err != nil { + return err + } + return nil +} + +func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream) string { + switch { + case strings.Contains(strings.ToLower(stream.Message), "error"): + return binlogdatapb.VReplicationWorkflowState_Error.String() + case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && len(stream.CopyStates) > 0: + return binlogdatapb.VReplicationWorkflowState_Copying.String() + case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && int64(time.Now().Second())-rstream.TimeUpdated.Seconds > 10: + return binlogdatapb.VReplicationWorkflowState_Lagging.String() + } + return rstream.State.String() +} + +// getVReplicationTrxLag estimates the actual statement processing lag between the +// source and the target. If we are still processing source events it is the +// difference between current time and the timestamp of the last event. If +// heartbeats are more recent than the last event, then the lag is the time since +// the last heartbeat as there can be an actual event immediately after the +// heartbeat, but which has not yet been processed on the target. We don't allow +// switching during the copy phase, so in that case we just return a large lag. +// All timestamps are in seconds since epoch. +func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 { + if state == binlogdatapb.VReplicationWorkflowState_Copying { + return math.MaxInt64 + } + if trxTs == nil { + trxTs = &vttimepb.Time{} + } + lastTransactionTime := trxTs.Seconds + if updatedTs == nil { + updatedTs = &vttimepb.Time{} + } + lastUpdatedTime := updatedTs.Seconds + if heartbeatTs == nil { + heartbeatTs = &vttimepb.Time{} + } + lastHeartbeatTime := heartbeatTs.Seconds + // We do NOT update the heartbeat timestamp when we are regularly updating the + // position as we replicate transactions (GTIDs). + // When we DO record a heartbeat, we set the updated time to the same value. + // When recording that we are throttled, we update the updated time but NOT + // the heartbeat time. + if lastTransactionTime == 0 /* No replicated events after copy */ || + (lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */ + lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */) { + lastTransactionTime = lastUpdatedTime + } + now := time.Now().Unix() // Seconds since epoch + return float64(now - lastTransactionTime) +} From 2841577ae4beb76d7b55e0a2a0e176405483d56c Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 28 Jan 2025 22:55:43 -0500 Subject: [PATCH 2/2] Fix conflicts Signed-off-by: Matt Lord --- .../vreplication/vreplication_test.go | 73 +- go/vt/vtctl/workflow/server.go | 111 ++- go/vt/vtctl/workflow/switcher_interface.go | 5 - go/vt/vtctl/workflow/traffic_switcher.go | 87 --- go/vt/vtctl/workflow/workflows.go | 686 ------------------ 5 files changed, 58 insertions(+), 904 deletions(-) delete mode 100644 go/vt/vtctl/workflow/workflows.go diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index e8f04280122..7e3f93b6b20 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -762,24 +762,16 @@ func shardCustomer(t *testing.T, testReverse bool, cells []*Cell, sourceCellOrAl switchReads(t, workflowType, cellNames, ksWorkflow, false) assertQueryExecutesOnTablet(t, vtgateConn, productTab, "customer", query, query) - var commit func(t *testing.T) - if withOpenTx { - commit, _ = vc.startQuery(t, openTxQuery) - } switchWritesDryRun(t, workflowType, ksWorkflow, dryRunResultsSwitchWritesCustomerShard) -<<<<<<< HEAD -======= - shardNames := make([]string, 0, len(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards)) - for shardName := range maps.Keys(vc.Cells[defaultCell.Name].Keyspaces[sourceKs].Shards) { - shardNames = append(shardNames, shardName) - } - testSwitchTrafficPermissionChecks(t, workflowType, sourceKs, shardNames, targetKs, workflow) testSwitchWritesErrorHandling(t, []*cluster.VttabletProcess{productTab}, []*cluster.VttabletProcess{customerTab1, customerTab2}, workflow, workflowType) + var commit func(t *testing.T) + if withOpenTx { + commit, _ = vc.startQuery(t, openTxQuery) + } // Now let's confirm that it works as expected with an error. ->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) switchWrites(t, workflowType, ksWorkflow, false) checkThatVDiffFails(t, targetKs, workflow) @@ -1554,62 +1546,6 @@ func switchWritesDryRun(t *testing.T, workflowType, ksWorkflow string, dryRunRes validateDryRunResults(t, output, dryRunResults) } -<<<<<<< HEAD -======= -// testSwitchTrafficPermissionsChecks confirms that for the SwitchTraffic command, the -// necessary permissions are checked properly on the source keyspace's primary tablets. -// This ensures that we can create and manage the reverse vreplication workflow. -func testSwitchTrafficPermissionChecks(t *testing.T, workflowType, sourceKeyspace string, sourceShards []string, targetKeyspace, workflow string) { - applyPrivileges := func(query string) { - for _, shard := range sourceShards { - primary := vc.getPrimaryTablet(t, sourceKeyspace, shard) - _, err := primary.QueryTablet(query, primary.Keyspace, false) - require.NoError(t, err) - } - } - runDryRunCmd := func(expectErr bool) { - _, err := vc.VtctldClient.ExecuteCommandWithOutput(workflowType, "--workflow", workflow, "--target-keyspace", targetKeyspace, - "SwitchTraffic", "--tablet-types=primary", "--dry-run") - require.True(t, ((err != nil) == expectErr), "expected error: %t, got: %v", expectErr, err) - } - - defer func() { - // Put the default global privs back in place. - applyPrivileges("grant select,insert,update,delete on *.* to vt_filtered@localhost") - }() - - t.Run("test switch traffic permission checks", func(t *testing.T) { - t.Run("test without global privileges", func(t *testing.T) { - applyPrivileges("revoke select,insert,update,delete on *.* from vt_filtered@localhost") - runDryRunCmd(true) - }) - - t.Run("test with db level privileges", func(t *testing.T) { - applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.* to vt_filtered@localhost", - sidecarDBIdentifier)) - runDryRunCmd(false) - }) - - t.Run("test without global or db level privileges", func(t *testing.T) { - applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.* from vt_filtered@localhost", - sidecarDBIdentifier)) - runDryRunCmd(true) - }) - - t.Run("test with table level privileges", func(t *testing.T) { - applyPrivileges(fmt.Sprintf("grant select,insert,update,delete on %s.vreplication to vt_filtered@localhost", - sidecarDBIdentifier)) - runDryRunCmd(false) - }) - - t.Run("test without global, db, or table level privileges", func(t *testing.T) { - applyPrivileges(fmt.Sprintf("revoke select,insert,update,delete on %s.vreplication from vt_filtered@localhost", - sidecarDBIdentifier)) - runDryRunCmd(true) - }) - }) -} - // testSwitchWritesErrorHandling confirms that switching writes works as expected // in the face of vreplication lag (canSwitch() precheck) and when canceling the // switch due to replication failing to catch up in time. @@ -1744,7 +1680,6 @@ func testSwitchWritesErrorHandling(t *testing.T, sourceTablets, targetTablets [] }) } ->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) // restartWorkflow confirms that a workflow can be successfully // stopped and started. func restartWorkflow(t *testing.T, ksWorkflow string) { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index f01cccbe27b..a4f5ba58364 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -688,11 +688,10 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows targetKeyspaceByWorkflow[workflow.Name] = tablet.Keyspace - timeUpdated := time.Unix(timeUpdatedSeconds, 0) - vreplicationLag := time.Since(timeUpdated) - // MaxVReplicationLag represents the time since we last processed any event // in the workflow. + timeUpdated := time.Unix(timeUpdatedSeconds, 0) + vreplicationLag := time.Since(timeUpdated) if currentMaxLag, ok := maxVReplicationLagByWorkflow[workflow.Name]; ok { if vreplicationLag.Seconds() > currentMaxLag { maxVReplicationLagByWorkflow[workflow.Name] = vreplicationLag.Seconds() @@ -701,32 +700,18 @@ func (s *Server) GetWorkflows(ctx context.Context, req *vtctldatapb.GetWorkflows maxVReplicationLagByWorkflow[workflow.Name] = vreplicationLag.Seconds() } - // MaxVReplicationTransactionLag estimates the actual statement processing lag - // between the source and the target. If we are still processing source events it - // is the difference b/w current time and the timestamp of the last event. If - // heartbeats are more recent than the last event, then the lag is the time since - // the last heartbeat as there can be an actual event immediately after the - // heartbeat, but which has not yet been processed on the target. - // We don't allow switching during the copy phase, so in that case we just return - // a large lag. All timestamps are in seconds since epoch. + // MaxVReplicationTransactionLag estimates the max statement processing lag + // between the source and the target across all of the workflow streams. if _, ok := maxVReplicationTransactionLagByWorkflow[workflow.Name]; !ok { maxVReplicationTransactionLagByWorkflow[workflow.Name] = 0 } - lastTransactionTime := transactionTimeSeconds - lastHeartbeatTime := timeHeartbeat - if stream.State == binlogdatapb.VReplicationWorkflowState_Copying.String() { - maxVReplicationTransactionLagByWorkflow[workflow.Name] = math.MaxInt64 - } else { - if lastTransactionTime == 0 /* no new events after copy */ || - lastHeartbeatTime > lastTransactionTime /* no recent transactions, so all caught up */ { - - lastTransactionTime = lastHeartbeatTime - } - now := time.Now().Unix() /* seconds since epoch */ - transactionReplicationLag := float64(now - lastTransactionTime) - if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] { - maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag - } + heartbeatTimestamp := &vttimepb.Time{ + Seconds: timeHeartbeat, + } + transactionReplicationLag := getVReplicationTrxLag(stream.TransactionTimestamp, stream.TimeUpdated, heartbeatTimestamp, + binlogdatapb.VReplicationWorkflowState(binlogdatapb.VReplicationWorkflowState_value[stream.State])) + if transactionReplicationLag > maxVReplicationTransactionLagByWorkflow[workflow.Name] { + maxVReplicationTransactionLagByWorkflow[workflow.Name] = transactionReplicationLag } return nil @@ -3248,24 +3233,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit return 0, sw.logs(), err } -<<<<<<< HEAD -======= - // We stop writes on the source before stopping the source streams so that the catchup time - // is lessened and other workflows that we have to migrate such as intra-keyspace materialize - // workflows also have a chance to catch up as well because those are internally generated - // GTIDs within the shards we're switching traffic away from. - // For intra-keyspace materialization streams that we migrate where the source and target are - // the keyspace being resharded, we wait for those to catchup in the stopStreams path before - // we actually stop them. - ts.Logger().Infof("Stopping source writes") - if err := sw.stopSourceWrites(ctx); err != nil { - if cerr := sw.cancelMigration(ctx, sm); cerr != nil { - err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) - } - return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) - } - ->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) ts.Logger().Infof("Stopping streams") sourceWorkflows, err = sw.stopStreams(ctx, sm) if err != nil { @@ -3274,21 +3241,18 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit ts.Logger().Errorf("stream in stopStreams: key %s shard %s stream %+v", key, stream.BinlogSource.Shard, stream.BinlogSource) } } -<<<<<<< HEAD - sw.cancelMigration(ctx, sm) - return handleError("failed to stop the workflow streams", err) + if cerr := sw.cancelMigration(ctx, sm); cerr != nil { + err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) + } + return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err) } ts.Logger().Infof("Stopping source writes") if err := sw.stopSourceWrites(ctx); err != nil { - sw.cancelMigration(ctx, sm) - return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) -======= if cerr := sw.cancelMigration(ctx, sm); cerr != nil { err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) } - return handleError(fmt.Sprintf("failed to stop the workflow streams in the %s keyspace", ts.SourceKeyspaceName()), err) ->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) + return handleError(fmt.Sprintf("failed to stop writes in the %s keyspace", ts.SourceKeyspaceName()), err) } if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { @@ -3309,15 +3273,10 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit } ts.Logger().Infof("Waiting for streams to catchup") -<<<<<<< HEAD if err := sw.waitForCatchup(ctx, timeout); err != nil { - sw.cancelMigration(ctx, sm) -======= - if err := sw.waitForCatchup(ctx, waitTimeout); err != nil { if cerr := sw.cancelMigration(ctx, sm); cerr != nil { err = vterrors.Errorf(vtrpcpb.Code_CANCELED, "%v\n\n%v", err, cerr) } ->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) return handleError("failed to sync up replication between the source and target", err) } @@ -3942,3 +3901,41 @@ func (s *Server) MigrateCreate(ctx context.Context, req *vtctldatapb.MigrateCrea } return s.moveTablesCreate(ctx, moveTablesCreateRequest, binlogdatapb.VReplicationWorkflowType_Migrate) } + +// getVReplicationTrxLag estimates the actual statement processing lag between the +// source and the target. If we are still processing source events it is the +// difference between current time and the timestamp of the last event. If +// heartbeats are more recent than the last event, then the lag is the time since +// the last heartbeat as there can be an actual event immediately after the +// heartbeat, but which has not yet been processed on the target. We don't allow +// switching during the copy phase, so in that case we just return a large lag. +// All timestamps are in seconds since epoch. +func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 { + if state == binlogdatapb.VReplicationWorkflowState_Copying { + return math.MaxInt64 + } + if trxTs == nil { + trxTs = &vttimepb.Time{} + } + lastTransactionTime := trxTs.Seconds + if updatedTs == nil { + updatedTs = &vttimepb.Time{} + } + lastUpdatedTime := updatedTs.Seconds + if heartbeatTs == nil { + heartbeatTs = &vttimepb.Time{} + } + lastHeartbeatTime := heartbeatTs.Seconds + // We do NOT update the heartbeat timestamp when we are regularly updating the + // position as we replicate transactions (GTIDs). + // When we DO record a heartbeat, we set the updated time to the same value. + // When recording that we are throttled, we update the updated time but NOT + // the heartbeat time. + if lastTransactionTime == 0 /* No replicated events after copy */ || + (lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */ + lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */) { + lastTransactionTime = lastUpdatedTime + } + now := time.Now().Unix() // Seconds since epoch + return float64(now - lastTransactionTime) +} diff --git a/go/vt/vtctl/workflow/switcher_interface.go b/go/vt/vtctl/workflow/switcher_interface.go index 970f12ca4b2..9f73fd45ad6 100644 --- a/go/vt/vtctl/workflow/switcher_interface.go +++ b/go/vt/vtctl/workflow/switcher_interface.go @@ -24,13 +24,8 @@ import ( ) type iswitcher interface { -<<<<<<< HEAD lockKeyspace(ctx context.Context, keyspace, action string) (context.Context, func(*error), error) - cancelMigration(ctx context.Context, sm *StreamMigrator) -======= - lockKeyspace(ctx context.Context, keyspace, action string, opts ...topo.LockOption) (context.Context, func(*error), error) cancelMigration(ctx context.Context, sm *StreamMigrator) error ->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) stopStreams(ctx context.Context, sm *StreamMigrator) ([]string, error) stopSourceWrites(ctx context.Context) error waitForCatchup(ctx context.Context, filteredReplicationWaitTime time.Duration) error diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index c5e1241e388..79a0492750b 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -681,11 +681,7 @@ func (ts *trafficSwitcher) changeShardsAccess(ctx context.Context, keyspace stri func (ts *trafficSwitcher) allowTargetWrites(ctx context.Context) error { if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { -<<<<<<< HEAD return ts.allowTableTargetWrites(ctx) -======= - return ts.switchDeniedTables(ctx, false) ->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) } return ts.changeShardsAccess(ctx, ts.TargetKeyspaceName(), ts.TargetShards(), allowWrites) } @@ -952,11 +948,7 @@ func (ts *trafficSwitcher) waitForCatchup(ctx context.Context, filteredReplicati func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { var err error if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { -<<<<<<< HEAD err = ts.changeTableSourceWrites(ctx, disallowWrites) -======= - err = ts.switchDeniedTables(ctx, false) ->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) } else { err = ts.changeShardsAccess(ctx, ts.SourceKeyspaceName(), ts.SourceShards(), disallowWrites) } @@ -976,52 +968,11 @@ func (ts *trafficSwitcher) stopSourceWrites(ctx context.Context) error { }) } -<<<<<<< HEAD func (ts *trafficSwitcher) changeTableSourceWrites(ctx context.Context, access accessType) error { err := ts.ForAllSources(func(source *MigrationSource) error { if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, access == allowWrites /* remove */, ts.Tables()) }); err != nil { -======= -// switchDeniedTables switches the denied tables rules for the traffic switch. -// They are added on the source side and removed on the target side. -// If backward is true, then we swap this logic, removing on the source side -// and adding on the target side. You would want to do that e.g. when canceling -// a failed (and currently partial) traffic switch as we may have already -// switched the denied tables entries and in any event we need to go back to -// the original state. -func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context, backward bool) error { - if ts.MigrationType() != binlogdatapb.MigrationType_TABLES { - return nil - } - - rmsource, rmtarget := false, true - if backward { - rmsource, rmtarget = true, false - } - - egrp, ectx := errgroup.WithContext(ctx) - egrp.Go(func() error { - return ts.ForAllSources(func(source *MigrationSource) error { - if _, err := ts.TopoServer().UpdateShardFields(ctx, ts.SourceKeyspaceName(), source.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ectx, topodatapb.TabletType_PRIMARY, nil, rmsource, ts.Tables()) - }); err != nil { - return err - } - rtbsCtx, cancel := context.WithTimeout(ectx, shardTabletRefreshTimeout) - defer cancel() - isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), source.GetShard(), nil, ts.Logger()) - if isPartial { - msg := fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s source shard (%v):\n %v", - source.GetShard().Keyspace(), source.GetShard().ShardName(), err, partialDetails) - if ts.force { - log.Warning(msg) - return nil - } else { - return errors.New(msg) - } - } ->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) return err } rtbsCtx, cancel := context.WithTimeout(ctx, shardTabletRefreshTimeout) @@ -1033,36 +984,8 @@ func (ts *trafficSwitcher) switchDeniedTables(ctx context.Context, backward bool } return err }) -<<<<<<< HEAD if err != nil { log.Warningf("Error in changeTableSourceWrites: %s", err) -======= - egrp.Go(func() error { - return ts.ForAllTargets(func(target *MigrationTarget) error { - if _, err := ts.TopoServer().UpdateShardFields(ectx, ts.TargetKeyspaceName(), target.GetShard().ShardName(), func(si *topo.ShardInfo) error { - return si.UpdateDeniedTables(ctx, topodatapb.TabletType_PRIMARY, nil, rmtarget, ts.Tables()) - }); err != nil { - return err - } - rtbsCtx, cancel := context.WithTimeout(ectx, shardTabletRefreshTimeout) - defer cancel() - isPartial, partialDetails, err := topotools.RefreshTabletsByShard(rtbsCtx, ts.TopoServer(), ts.TabletManagerClient(), target.GetShard(), nil, ts.Logger()) - if isPartial { - msg := fmt.Sprintf("failed to successfully refresh all tablets in the %s/%s target shard (%v):\n %v", - target.GetShard().Keyspace(), target.GetShard().ShardName(), err, partialDetails) - if ts.force { - log.Warning(msg) - return nil - } else { - return errors.New(msg) - } - } - return err - }) - }) - if err := egrp.Wait(); err != nil { - ts.Logger().Warningf("Error in switchDeniedTables: %s", err) ->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) return err } // Note that the denied tables, which are being updated in this method, are not part of the SrvVSchema in the topo. @@ -1094,11 +1017,7 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat defer cmCancel() if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { -<<<<<<< HEAD err = ts.changeTableSourceWrites(cmCtx, allowWrites) -======= - err = ts.switchDeniedTables(cmCtx, true /* revert */) ->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) } else { err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) } @@ -1123,15 +1042,9 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err) } -<<<<<<< HEAD - err = ts.deleteReverseVReplication(cmCtx) - if err != nil { - ts.Logger().Errorf("Cancel migration failed: could not delete revers vreplication entries: %v", err) -======= if err := ts.deleteReverseVReplication(cmCtx); err != nil { cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err)) ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err) ->>>>>>> 39a0ddde8f (VReplication: Address SwitchTraffic bugs around replication lag and cancel on error (#17616)) } if cancelErrs.HasErrors() { diff --git a/go/vt/vtctl/workflow/workflows.go b/go/vt/vtctl/workflow/workflows.go deleted file mode 100644 index a1b4393f2c0..00000000000 --- a/go/vt/vtctl/workflow/workflows.go +++ /dev/null @@ -1,686 +0,0 @@ -/* -Copyright 2024 The Vitess Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -/* -This file provides functions for fetching and retrieving information about VReplication workflows - -At the moment it is used by the `GetWorkflows` function in `server.go and includes functionality to -get the following: -- Fetch workflows by shard -- Fetch copy states by shard stream -- Build workflows with metadata -- Fetch stream logs -*/ - -package workflow - -import ( - "context" - "encoding/json" - "fmt" - "math" - "sort" - "strings" - "sync" - "time" - - "golang.org/x/exp/maps" - "golang.org/x/sync/errgroup" - - "vitess.io/vitess/go/sets" - "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/trace" - "vitess.io/vitess/go/vt/binlog/binlogplayer" - "vitess.io/vitess/go/vt/logutil" - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/topo" - "vitess.io/vitess/go/vt/topo/topoproto" - "vitess.io/vitess/go/vt/vtctl/workflow/common" - "vitess.io/vitess/go/vt/vtctl/workflow/vexec" - "vitess.io/vitess/go/vt/vterrors" - "vitess.io/vitess/go/vt/vttablet/tmclient" - - binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" - tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" - vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" - vttimepb "vitess.io/vitess/go/vt/proto/vttime" -) - -// workflowFetcher is responsible for fetching and retrieving information -// about VReplication workflows. -type workflowFetcher struct { - ts *topo.Server - tmc tmclient.TabletManagerClient - - logger logutil.Logger - parser *sqlparser.Parser -} - -type workflowMetadata struct { - sourceKeyspace string - sourceShards sets.Set[string] - targetKeyspace string - targetShards sets.Set[string] - maxVReplicationLag float64 - maxVReplicationTransactionLag float64 -} - -var vrepLogQuery = strings.TrimSpace(` -SELECT - id, - vrepl_id, - type, - state, - message, - created_at, - updated_at, - count -FROM - _vt.vreplication_log -WHERE vrepl_id IN %a -ORDER BY - vrepl_id ASC, - id ASC -`) - -func (wf *workflowFetcher) fetchWorkflowsByShard( - ctx context.Context, - req *vtctldatapb.GetWorkflowsRequest, -) (map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, error) { - readReq := &tabletmanagerdatapb.ReadVReplicationWorkflowsRequest{} - if req.Workflow != "" { - readReq.IncludeWorkflows = []string{req.Workflow} - } - if req.ActiveOnly { - readReq.ExcludeStates = []binlogdatapb.VReplicationWorkflowState{binlogdatapb.VReplicationWorkflowState_Stopped} - } - - m := sync.Mutex{} - - shards, err := common.GetShards(ctx, wf.ts, req.Keyspace, req.Shards) - if err != nil { - return nil, err - } - - results := make(map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, len(shards)) - - err = wf.forAllShards(ctx, req.Keyspace, shards, func(ctx context.Context, si *topo.ShardInfo) error { - primary, err := wf.ts.GetTablet(ctx, si.PrimaryAlias) - if err != nil { - return err - } - if primary == nil { - return fmt.Errorf("%w %s/%s: tablet %v not found", vexec.ErrNoShardPrimary, req.Keyspace, si.ShardName(), topoproto.TabletAliasString(si.PrimaryAlias)) - } - // Clone the request so that we can set the correct DB name for tablet. - req := readReq.CloneVT() - wres, err := wf.tmc.ReadVReplicationWorkflows(ctx, primary.Tablet, req) - if err != nil { - return err - } - m.Lock() - defer m.Unlock() - results[primary] = wres - return nil - }) - if err != nil { - return nil, err - } - - return results, nil -} - -func (wf *workflowFetcher) fetchCopyStatesByShardStream( - ctx context.Context, - workflowsByShard map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, -) (map[string][]*vtctldatapb.Workflow_Stream_CopyState, error) { - m := sync.Mutex{} - - copyStatesByShardStreamId := make(map[string][]*vtctldatapb.Workflow_Stream_CopyState, len(workflowsByShard)) - - fetchCopyStates := func(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) error { - span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchCopyStates") - defer span.Finish() - - span.Annotate("shard", tablet.Shard) - span.Annotate("tablet_alias", tablet.AliasString()) - - copyStates, err := wf.getWorkflowCopyStates(ctx, tablet, streamIds) - if err != nil { - return err - } - - m.Lock() - defer m.Unlock() - - for _, copyState := range copyStates { - shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, copyState.StreamId) - copyStatesByShardStreamId[shardStreamId] = append( - copyStatesByShardStreamId[shardStreamId], - copyState, - ) - } - - return nil - } - - fetchCopyStatesEg, fetchCopyStatesCtx := errgroup.WithContext(ctx) - for tablet, result := range workflowsByShard { - streamIds := make([]int32, 0, len(result.Workflows)) - for _, wf := range result.Workflows { - for _, stream := range wf.Streams { - streamIds = append(streamIds, stream.Id) - } - } - - if len(streamIds) == 0 { - continue - } - - fetchCopyStatesEg.Go(func() error { - return fetchCopyStates(fetchCopyStatesCtx, tablet, streamIds) - }) - } - if err := fetchCopyStatesEg.Wait(); err != nil { - return nil, err - } - - return copyStatesByShardStreamId, nil -} - -func (wf *workflowFetcher) getWorkflowCopyStates(ctx context.Context, tablet *topo.TabletInfo, streamIds []int32) ([]*vtctldatapb.Workflow_Stream_CopyState, error) { - span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.getWorkflowCopyStates") - defer span.Finish() - - span.Annotate("keyspace", tablet.Keyspace) - span.Annotate("shard", tablet.Shard) - span.Annotate("tablet_alias", tablet.AliasString()) - span.Annotate("stream_ids", fmt.Sprintf("%#v", streamIds)) - - idsBV, err := sqltypes.BuildBindVariable(streamIds) - if err != nil { - return nil, err - } - query, err := sqlparser.ParseAndBind("select vrepl_id, table_name, lastpk from _vt.copy_state where vrepl_id in %a and id in (select max(id) from _vt.copy_state where vrepl_id in %a group by vrepl_id, table_name)", - idsBV, idsBV) - if err != nil { - return nil, err - } - qr, err := wf.tmc.VReplicationExec(ctx, tablet.Tablet, query) - if err != nil { - return nil, err - } - - result := sqltypes.Proto3ToResult(qr) - if result == nil { - return nil, nil - } - - copyStates := make([]*vtctldatapb.Workflow_Stream_CopyState, len(result.Rows)) - for i, row := range result.Named().Rows { - streamId, err := row["vrepl_id"].ToInt64() - if err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to cast vrepl_id to int64: %v", err) - } - // These string fields are technically varbinary, but this is close enough. - copyStates[i] = &vtctldatapb.Workflow_Stream_CopyState{ - StreamId: streamId, - Table: row["table_name"].ToString(), - LastPk: row["lastpk"].ToString(), - } - } - - return copyStates, nil -} - -func (wf *workflowFetcher) buildWorkflows( - ctx context.Context, - results map[*topo.TabletInfo]*tabletmanagerdatapb.ReadVReplicationWorkflowsResponse, - copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState, - req *vtctldatapb.GetWorkflowsRequest, -) ([]*vtctldatapb.Workflow, error) { - workflowsMap := make(map[string]*vtctldatapb.Workflow, len(results)) - workflowMetadataMap := make(map[string]*workflowMetadata, len(results)) - - for tablet, result := range results { - // In the old implementation, we knew we had at most one (0 <= N <= 1) - // workflow for each shard primary we queried. There might be multiple - // rows (streams) comprising that workflow, so we would aggregate the - // rows for a given primary into a single value ("the workflow", - // ReplicationStatusResult in the old types). - // - // In this version, we have many (N >= 0) workflows for each shard - // primary we queried, so we need to determine if each row corresponds - // to a workflow we're already aggregating, or if it's a workflow we - // haven't seen yet for that shard primary. We use the workflow name to - // dedupe for this. - for _, wfres := range result.Workflows { - workflowName := wfres.Workflow - workflow, ok := workflowsMap[workflowName] - if !ok { - workflow = &vtctldatapb.Workflow{ - Name: workflowName, - ShardStreams: map[string]*vtctldatapb.Workflow_ShardStream{}, - } - - workflowsMap[workflowName] = workflow - workflowMetadataMap[workflowName] = &workflowMetadata{ - sourceShards: sets.New[string](), - targetShards: sets.New[string](), - } - } - - metadata := workflowMetadataMap[workflowName] - err := wf.scanWorkflow(ctx, workflow, wfres, tablet, metadata, copyStatesByShardStreamId, req.Keyspace) - if err != nil { - return nil, err - } - } - } - - for name, workflow := range workflowsMap { - meta := workflowMetadataMap[name] - updateWorkflowWithMetadata(workflow, meta) - - // Sort shard streams by stream_id ASC, to support an optimization - // in fetchStreamLogs below. - for _, shardStreams := range workflow.ShardStreams { - sort.Slice(shardStreams.Streams, func(i, j int) bool { - return shardStreams.Streams[i].Id < shardStreams.Streams[j].Id - }) - } - } - - if req.IncludeLogs { - var fetchLogsWG sync.WaitGroup - - for _, workflow := range workflowsMap { - // Fetch logs for all streams associated with this workflow in the background. - fetchLogsWG.Add(1) - go func(ctx context.Context, workflow *vtctldatapb.Workflow) { - defer fetchLogsWG.Done() - wf.fetchStreamLogs(ctx, req.Keyspace, workflow) - }(ctx, workflow) - } - - // Wait for all the log fetchers to finish. - fetchLogsWG.Wait() - } - - return maps.Values(workflowsMap), nil -} - -func (wf *workflowFetcher) scanWorkflow( - ctx context.Context, - workflow *vtctldatapb.Workflow, - res *tabletmanagerdatapb.ReadVReplicationWorkflowResponse, - tablet *topo.TabletInfo, - meta *workflowMetadata, - copyStatesByShardStreamId map[string][]*vtctldatapb.Workflow_Stream_CopyState, - keyspace string, -) error { - shardStreamKey := fmt.Sprintf("%s/%s", tablet.Shard, tablet.AliasString()) - shardStream, ok := workflow.ShardStreams[shardStreamKey] - if !ok { - ctx, cancel := context.WithTimeout(ctx, topo.RemoteOperationTimeout) - defer cancel() - - si, err := wf.ts.GetShard(ctx, keyspace, tablet.Shard) - if err != nil { - return err - } - - shardStream = &vtctldatapb.Workflow_ShardStream{ - Streams: nil, - TabletControls: si.TabletControls, - IsPrimaryServing: si.IsPrimaryServing, - } - - workflow.ShardStreams[shardStreamKey] = shardStream - } - - for _, rstream := range res.Streams { - // The value in the pos column can be compressed and thus not - // have a valid GTID consisting of valid UTF-8 characters so we - // have to decode it so that it's properly decompressed first - // when needed. - pos := rstream.Pos - if pos != "" { - mpos, err := binlogplayer.DecodePosition(pos) - if err != nil { - return err - } - pos = mpos.String() - } - - cells := strings.Split(res.Cells, ",") - for i := range cells { - cells[i] = strings.TrimSpace(cells[i]) - } - options := res.Options - if options != "" { - if err := json.Unmarshal([]byte(options), &workflow.Options); err != nil { - return err - } - } - - stream := &vtctldatapb.Workflow_Stream{ - Id: int64(rstream.Id), - Shard: tablet.Shard, - Tablet: tablet.Alias, - BinlogSource: rstream.Bls, - Position: pos, - StopPosition: rstream.StopPos, - State: rstream.State.String(), - DbName: tablet.DbName(), - TabletTypes: res.TabletTypes, - TabletSelectionPreference: res.TabletSelectionPreference, - Cells: cells, - TransactionTimestamp: rstream.TransactionTimestamp, - TimeUpdated: rstream.TimeUpdated, - Message: rstream.Message, - Tags: strings.Split(res.Tags, ","), - RowsCopied: rstream.RowsCopied, - ThrottlerStatus: &vtctldatapb.Workflow_Stream_ThrottlerStatus{ - ComponentThrottled: rstream.ComponentThrottled, - TimeThrottled: rstream.TimeThrottled, - }, - } - - // Merge in copy states, which we've already fetched. - shardStreamId := fmt.Sprintf("%s/%d", tablet.Shard, stream.Id) - if copyStates, ok := copyStatesByShardStreamId[shardStreamId]; ok { - stream.CopyStates = copyStates - } - - if rstream.TimeUpdated == nil { - rstream.TimeUpdated = &vttimepb.Time{} - } - - stream.State = getStreamState(stream, rstream) - - shardStream.Streams = append(shardStream.Streams, stream) - - meta.sourceShards.Insert(stream.BinlogSource.Shard) - meta.targetShards.Insert(tablet.Shard) - - if meta.sourceKeyspace != "" && meta.sourceKeyspace != stream.BinlogSource.Keyspace { - return vterrors.Wrapf(ErrMultipleSourceKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.sourceKeyspace, stream.BinlogSource.Keyspace) - } - - meta.sourceKeyspace = stream.BinlogSource.Keyspace - - if meta.targetKeyspace != "" && meta.targetKeyspace != tablet.Keyspace { - return vterrors.Wrapf(ErrMultipleTargetKeyspaces, "workflow = %v, ks1 = %v, ks2 = %v", workflow.Name, meta.targetKeyspace, tablet.Keyspace) - } - - meta.targetKeyspace = tablet.Keyspace - - if stream.TimeUpdated == nil { - stream.TimeUpdated = &vttimepb.Time{} - } - timeUpdated := time.Unix(stream.TimeUpdated.Seconds, 0) - vreplicationLag := time.Since(timeUpdated) - - // MaxVReplicationLag represents the time since we last processed any event - // in the workflow. - if vreplicationLag.Seconds() > meta.maxVReplicationLag { - meta.maxVReplicationLag = vreplicationLag.Seconds() - } - - workflow.WorkflowType = res.WorkflowType.String() - workflow.WorkflowSubType = res.WorkflowSubType.String() - workflow.DeferSecondaryKeys = res.DeferSecondaryKeys - - // MaxVReplicationTransactionLag estimates the max statement processing lag - // between the source and the target across all of the workflow streams. - transactionReplicationLag := getVReplicationTrxLag(rstream.TransactionTimestamp, rstream.TimeUpdated, rstream.TimeHeartbeat, rstream.State) - if transactionReplicationLag > meta.maxVReplicationTransactionLag { - meta.maxVReplicationTransactionLag = transactionReplicationLag - } - } - - return nil -} - -func updateWorkflowWithMetadata(workflow *vtctldatapb.Workflow, meta *workflowMetadata) { - workflow.Source = &vtctldatapb.Workflow_ReplicationLocation{ - Keyspace: meta.sourceKeyspace, - Shards: sets.List(meta.sourceShards), - } - - workflow.Target = &vtctldatapb.Workflow_ReplicationLocation{ - Keyspace: meta.targetKeyspace, - Shards: sets.List(meta.targetShards), - } - - workflow.MaxVReplicationLag = int64(meta.maxVReplicationLag) - workflow.MaxVReplicationTransactionLag = int64(meta.maxVReplicationTransactionLag) -} - -func (wf *workflowFetcher) fetchStreamLogs(ctx context.Context, keyspace string, workflow *vtctldatapb.Workflow) { - span, ctx := trace.NewSpan(ctx, "workflowFetcher.workflow.fetchStreamLogs") - defer span.Finish() - - span.Annotate("keyspace", keyspace) - span.Annotate("workflow", workflow.Name) - - vreplIDs := make([]int64, 0, len(workflow.ShardStreams)) - for _, shardStream := range maps.Values(workflow.ShardStreams) { - for _, stream := range shardStream.Streams { - vreplIDs = append(vreplIDs, stream.Id) - } - } - idsBV, err := sqltypes.BuildBindVariable(vreplIDs) - if err != nil { - return - } - - query, err := sqlparser.ParseAndBind(vrepLogQuery, idsBV) - if err != nil { - return - } - - vx := vexec.NewVExec(keyspace, workflow.Name, wf.ts, wf.tmc, wf.parser) - results, err := vx.QueryContext(ctx, query) - if err != nil { - // Note that we do not return here. If there are any query results - // in the map (i.e. some tablets returned successfully), we will - // still try to read log rows from them on a best-effort basis. But, - // we will also pre-emptively record the top-level fetch error on - // every stream in every shard in the workflow. Further processing - // below may override the error message for certain streams. - for _, streams := range workflow.ShardStreams { - for _, stream := range streams.Streams { - stream.LogFetchError = err.Error() - } - } - } - - for target, p3qr := range results { - qr := sqltypes.Proto3ToResult(p3qr) - shardStreamKey := fmt.Sprintf("%s/%s", target.Shard, target.AliasString()) - - ss, ok := workflow.ShardStreams[shardStreamKey] - if !ok || ss == nil { - continue - } - - streams := ss.Streams - streamIdx := 0 - markErrors := func(err error) { - if streamIdx >= len(streams) { - return - } - - streams[streamIdx].LogFetchError = err.Error() - } - - for _, row := range qr.Named().Rows { - id, err := row["id"].ToCastInt64() - if err != nil { - markErrors(err) - continue - } - - streamID, err := row["vrepl_id"].ToCastInt64() - if err != nil { - markErrors(err) - continue - } - - typ := row["type"].ToString() - state := row["state"].ToString() - message := row["message"].ToString() - - createdAt, err := time.Parse("2006-01-02 15:04:05", row["created_at"].ToString()) - if err != nil { - markErrors(err) - continue - } - - updatedAt, err := time.Parse("2006-01-02 15:04:05", row["updated_at"].ToString()) - if err != nil { - markErrors(err) - continue - } - - count, err := row["count"].ToCastInt64() - if err != nil { - markErrors(err) - continue - } - - streamLog := &vtctldatapb.Workflow_Stream_Log{ - Id: id, - StreamId: streamID, - Type: typ, - State: state, - CreatedAt: &vttimepb.Time{ - Seconds: createdAt.Unix(), - }, - UpdatedAt: &vttimepb.Time{ - Seconds: updatedAt.Unix(), - }, - Message: message, - Count: count, - } - - // Earlier, in buildWorkflows, we sorted each ShardStreams - // slice by ascending id, and our _vt.vreplication_log query - // ordered by (stream_id ASC, id ASC), so we can walk the - // streams in index order in O(n) amortized over all the rows - // for this tablet. - for streamIdx < len(streams) { - stream := streams[streamIdx] - if stream.Id < streamLog.StreamId { - streamIdx++ - continue - } - - if stream.Id > streamLog.StreamId { - wf.logger.Warningf("Found stream log for nonexistent stream: %+v", streamLog) - // This can happen on manual/failed workflow cleanup so move to the next log. - break - } - - // stream.Id == streamLog.StreamId - stream.Logs = append(stream.Logs, streamLog) - break - } - } - } -} - -func (wf *workflowFetcher) forAllShards( - ctx context.Context, - keyspace string, - shards []string, - f func(ctx context.Context, shard *topo.ShardInfo) error, -) error { - eg, egCtx := errgroup.WithContext(ctx) - for _, shard := range shards { - eg.Go(func() error { - si, err := wf.ts.GetShard(ctx, keyspace, shard) - if err != nil { - return err - } - if si.PrimaryAlias == nil { - return fmt.Errorf("%w %s/%s", vexec.ErrNoShardPrimary, keyspace, shard) - } - - if err := f(egCtx, si); err != nil { - return err - } - return nil - }) - } - if err := eg.Wait(); err != nil { - return err - } - return nil -} - -func getStreamState(stream *vtctldatapb.Workflow_Stream, rstream *tabletmanagerdatapb.ReadVReplicationWorkflowResponse_Stream) string { - switch { - case strings.Contains(strings.ToLower(stream.Message), "error"): - return binlogdatapb.VReplicationWorkflowState_Error.String() - case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && len(stream.CopyStates) > 0: - return binlogdatapb.VReplicationWorkflowState_Copying.String() - case stream.State == binlogdatapb.VReplicationWorkflowState_Running.String() && int64(time.Now().Second())-rstream.TimeUpdated.Seconds > 10: - return binlogdatapb.VReplicationWorkflowState_Lagging.String() - } - return rstream.State.String() -} - -// getVReplicationTrxLag estimates the actual statement processing lag between the -// source and the target. If we are still processing source events it is the -// difference between current time and the timestamp of the last event. If -// heartbeats are more recent than the last event, then the lag is the time since -// the last heartbeat as there can be an actual event immediately after the -// heartbeat, but which has not yet been processed on the target. We don't allow -// switching during the copy phase, so in that case we just return a large lag. -// All timestamps are in seconds since epoch. -func getVReplicationTrxLag(trxTs, updatedTs, heartbeatTs *vttimepb.Time, state binlogdatapb.VReplicationWorkflowState) float64 { - if state == binlogdatapb.VReplicationWorkflowState_Copying { - return math.MaxInt64 - } - if trxTs == nil { - trxTs = &vttimepb.Time{} - } - lastTransactionTime := trxTs.Seconds - if updatedTs == nil { - updatedTs = &vttimepb.Time{} - } - lastUpdatedTime := updatedTs.Seconds - if heartbeatTs == nil { - heartbeatTs = &vttimepb.Time{} - } - lastHeartbeatTime := heartbeatTs.Seconds - // We do NOT update the heartbeat timestamp when we are regularly updating the - // position as we replicate transactions (GTIDs). - // When we DO record a heartbeat, we set the updated time to the same value. - // When recording that we are throttled, we update the updated time but NOT - // the heartbeat time. - if lastTransactionTime == 0 /* No replicated events after copy */ || - (lastUpdatedTime == lastHeartbeatTime && /* The last update was from a heartbeat */ - lastUpdatedTime > lastTransactionTime /* No recent transactions, only heartbeats, so all caught up */) { - lastTransactionTime = lastUpdatedTime - } - now := time.Now().Unix() // Seconds since epoch - return float64(now - lastTransactionTime) -}