lightning: check and restore pd scheduler even if our task failed (#1336) (#1422)
ti-chi-bot authored Sep 13, 2021
1 parent adff4c1 commit 1eb2415
Showing 2 changed files with 71 additions and 30 deletions.
75 changes: 55 additions & 20 deletions pkg/lightning/restore/meta_manager.go
@@ -462,7 +462,10 @@ func (m *dbTableMetaMgr) FinishTable(ctx context.Context) error {
type taskMetaMgr interface {
InitTask(ctx context.Context) error
CheckAndPausePdSchedulers(ctx context.Context) (pdutil.UndoFunc, error)
CheckAndFinishRestore(ctx context.Context) (bool, error)
// CheckAndFinishRestore checks the task meta and returns whether to switch the cluster back to the normal state and whether to clean up the metadata.
// Return values: the first boolean indicates whether to switch the TiDB cluster back to the normal state (restore schedulers, switch TiKV back to normal mode);
// the second boolean indicates whether to clean up the metadata in TiDB.
CheckAndFinishRestore(ctx context.Context, finished bool) (shouldSwitchBack bool, shouldCleanupMeta bool, err error)
Cleanup(ctx context.Context) error
CleanupAllMetas(ctx context.Context) error
}
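The signature change is the heart of this commit: a caller now reports whether its own task finished, and gets back two independent decisions instead of one. A minimal sketch of the intended call site (mgr, ctx, and finished are illustrative names, not code from this diff):

shouldSwitchBack, shouldCleanupMeta, err := mgr.CheckAndFinishRestore(ctx, finished)
if err != nil {
	return errors.Trace(err)
}
if shouldSwitchBack {
	// restore the paused PD schedulers and switch TiKV back to normal mode
}
if shouldCleanupMeta {
	// every task has finished: the shared task metadata in TiDB can be dropped
}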
@@ -485,6 +488,11 @@ const (
taskMetaStatusSwitchBack
)

const (
taskStateNormal int = iota
taskStateExited
)

func (m taskMetaStatus) String() string {
switch m {
case taskMetaStatusInitial:
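The new state values complement the existing status enum: status records how far a task has progressed, while state records whether the task's process is still alive to make progress. Restated as a helper (hypothetical, not part of the diff), the check that the finish logic below performs for each peer task is:

// otherTaskBlocksSwitchBack reports whether a peer task should delay the
// switch back to normal mode: only a live task (taskStateNormal) that has
// not yet reached taskMetaStatusSwitchSkipped is a blocker; a task that
// exited before finishing (taskStateExited) no longer is.
func otherTaskBlocksSwitchBack(status taskMetaStatus, state int) bool {
	return status < taskMetaStatusSwitchSkipped && state == taskStateNormal
}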
@@ -526,8 +534,8 @@ func (m *dbTaskMetaMgr) InitTask(ctx context.Context) error {
Logger: log.L(),
}
// avoid overriding existing metadata if the meta has already been inserted.
stmt := fmt.Sprintf(`INSERT IGNORE INTO %s (task_id, status) values (?, ?)`, m.tableName)
err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String())
stmt := fmt.Sprintf(`INSERT INTO %s (task_id, status) values (?, ?) ON DUPLICATE KEY UPDATE state = ?`, m.tableName)
err := exec.Exec(ctx, "init task meta", stmt, m.taskID, taskMetaStatusInitial.String(), taskStateNormal)
return errors.Trace(err)
}
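Replacing INSERT IGNORE with an upsert is what lets a restarted task rejoin the protocol: if its row already exists with state = taskStateExited, re-initialization flips the state back to normal while leaving the recorded status untouched. A sketch of the two cases (tableName and taskID stand in for m.tableName and m.taskID):

// first run:  row absent  -> inserted with taskMetaStatusInitial, taskStateNormal
// restarted:  row present -> status kept, state reset to taskStateNormal,
//                            so peers stop treating this task as exited
stmt := fmt.Sprintf(`INSERT INTO %s (task_id, status) values (?, ?) ON DUPLICATE KEY UPDATE state = ?`, tableName)
err := exec.Exec(ctx, "init task meta", stmt, taskID, taskMetaStatusInitial.String(), taskStateNormal)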

@@ -551,7 +559,7 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
paused := false
var pausedCfg storedCfgs
err = exec.Transact(ctx, "check and pause schedulers", func(ctx context.Context, tx *sql.Tx) error {
query := fmt.Sprintf("SELECT task_id, pd_cfgs, status from %s FOR UPDATE", m.tableName)
query := fmt.Sprintf("SELECT task_id, pd_cfgs, status, state from %s FOR UPDATE", m.tableName)
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return errors.Annotate(err, "fetch task meta failed")
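Adding state to the SELECT ... FOR UPDATE keeps the whole check inside one pessimistic critical section (hence the earlier SET SESSION tidb_txn_mode = 'pessimistic'): concurrent Lightning instances block on the row locks, so check-then-update races between parallel imports cannot occur. The pattern, reduced to its essentials (a sketch; exec is the SQLWithRetry wrapper from this file, and the bare table name is assumed):

err = exec.Transact(ctx, "check and pause schedulers", func(ctx context.Context, tx *sql.Tx) error {
	// peers serialize here: the row locks are held until this transaction ends
	rows, err := tx.QueryContext(ctx, "SELECT task_id, pd_cfgs, status, state FROM task_meta FOR UPDATE")
	if err != nil {
		return errors.Annotate(err, "fetch task meta failed")
	}
	defer rows.Close()
	// inspect every task's row, then update our own row before committing
	return nil
})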
@@ -566,10 +574,11 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
taskID int64
cfg string
statusValue string
state int
)
var cfgStr string
for rows.Next() {
if err = rows.Scan(&taskID, &cfg, &statusValue); err != nil {
if err = rows.Scan(&taskID, &cfg, &statusValue, &state); err != nil {
return errors.Trace(err)
}
status, err := parseTaskMetaStatus(statusValue)
@@ -643,10 +652,13 @@ func (m *dbTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.U
}, nil
}

func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) {
// CheckAndFinishRestore checks the task meta and returns whether to switch the cluster back to the normal state and whether to clean up the metadata.
// Return values: the first boolean indicates whether to switch the TiDB cluster back to the normal state (restore schedulers, switch TiKV back to normal mode);
// the second boolean indicates whether to clean up the metadata in TiDB.
func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context, finished bool) (bool, bool, error) {
conn, err := m.session.Conn(ctx)
if err != nil {
return false, errors.Trace(err)
return false, false, errors.Trace(err)
}
defer conn.Close()
exec := &common.SQLWithRetry{
@@ -655,12 +667,13 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error)
}
err = exec.Exec(ctx, "enable pessimistic transaction", "SET SESSION tidb_txn_mode = 'pessimistic';")
if err != nil {
return false, errors.Annotate(err, "enable pessimistic transaction failed")
return false, false, errors.Annotate(err, "enable pessimistic transaction failed")
}

switchBack := true
allFinished := finished
err = exec.Transact(ctx, "check and finish schedulers", func(ctx context.Context, tx *sql.Tx) error {
query := fmt.Sprintf("SELECT task_id, status from %s FOR UPDATE", m.tableName)
query := fmt.Sprintf("SELECT task_id, status, state from %s FOR UPDATE", m.tableName)
rows, err := tx.QueryContext(ctx, query)
if err != nil {
return errors.Annotate(err, "fetch task meta failed")
@@ -674,10 +687,12 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error)
var (
taskID int64
statusValue string
state int
)
newStatus := taskMetaStatusSwitchBack

taskStatus := taskMetaStatusInitial
for rows.Next() {
if err = rows.Scan(&taskID, &statusValue); err != nil {
if err = rows.Scan(&taskID, &statusValue, &state); err != nil {
return errors.Trace(err)
}
status, err := parseTaskMetaStatus(statusValue)
@@ -686,27 +701,47 @@ func (m *dbTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error)
}

if taskID == m.taskID {
taskStatus = status
continue
}

if status < taskMetaStatusSwitchSkipped {
newStatus = taskMetaStatusSwitchSkipped
switchBack = false
break
allFinished = false
// check if another task is still running
if state == taskStateNormal {
log.L().Info("unfinished task found", zap.Int64("task_id", taskID),
zap.Stringer("status", status))
switchBack = false
}
}
}
if err = rows.Close(); err != nil {
return errors.Trace(err)
}
closed = true

query = fmt.Sprintf("update %s set status = ? where task_id = ?", m.tableName)
_, err = tx.ExecContext(ctx, query, newStatus.String(), m.taskID)
if taskStatus < taskMetaStatusSwitchSkipped {
newStatus := taskMetaStatusSwitchBack
newState := taskStateNormal
if !finished {
newStatus = taskStatus
newState = taskStateExited
} else if !allFinished {
newStatus = taskMetaStatusSwitchSkipped
}

return errors.Trace(err)
query = fmt.Sprintf("update %s set status = ?, state = ? where task_id = ?", m.tableName)
if _, err = tx.ExecContext(ctx, query, newStatus.String(), newState, m.taskID); err != nil {
return errors.Trace(err)
}
}

return nil
})
log.L().Info("check all task finish status", zap.Bool("task_finished", finished),
zap.Bool("all_finished", allFinished), zap.Bool("switch_back", switchBack))

return switchBack, err
return switchBack, allFinished, err
}
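The update branch above compresses the whole policy into a few assignments, and it runs only while this task's own status is still below taskMetaStatusSwitchSkipped. Condensed (a restatement of the code above, not an addition to it):

newStatus, newState := taskMetaStatusSwitchBack, taskStateNormal
switch {
case !finished: // this task failed: keep its status, but record that it exited
	newStatus, newState = taskStatus, taskStateExited
case !allFinished: // we finished, but a live peer is still running
	newStatus = taskMetaStatusSwitchSkipped // leave the switch-back to the last finisher
}

Either way the row is rewritten, so later tasks (or a restart of this one) can tell whether the cluster was left in import mode.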

func (m *dbTaskMetaMgr) Cleanup(ctx context.Context) error {
@@ -773,8 +808,8 @@ func (m noopTaskMetaMgr) CheckAndPausePdSchedulers(ctx context.Context) (pdutil.
}, nil
}

func (m noopTaskMetaMgr) CheckAndFinishRestore(ctx context.Context) (bool, error) {
return false, nil
func (m noopTaskMetaMgr) CheckAndFinishRestore(context.Context, bool) (bool, bool, error) {
return false, true, nil
}

func (m noopTaskMetaMgr) Cleanup(ctx context.Context) error {
26 changes: 16 additions & 10 deletions pkg/lightning/restore/restore.go
@@ -105,6 +105,7 @@ const (
task_id BIGINT(20) UNSIGNED NOT NULL,
pd_cfgs VARCHAR(2048) NOT NULL DEFAULT '',
status VARCHAR(32) NOT NULL,
state TINYINT(1) NOT NULL DEFAULT 0 COMMENT '0: normal, 1: exited before finish',
PRIMARY KEY (task_id)
);`
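Because state defaults to 0, only the early-exit path has to set it explicitly. The column also gives operators a direct view of what is holding the cluster in import mode; for example (schema and table name are assumed here, since Lightning derives the real ones from its configuration):

// 0 = normal, 1 = exited before finish
rows, err := db.QueryContext(ctx, "SELECT task_id, status, state FROM lightning_metadata.task_meta")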

@@ -1163,6 +1164,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error {
// we do not switch back automatically
cleanupFunc := func() {}
switchBack := false
taskFinished := false
if rc.cfg.TikvImporter.Backend == config.BackendLocal {
// disable some pd schedulers
pdController, err := pdutil.NewPdController(ctx, rc.cfg.TiDB.PdAddr,
@@ -1183,7 +1185,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error {
if restoreFn != nil {
// use context.Background to make sure this restore function can still be executed even if ctx is canceled
restoreCtx := context.Background()
needSwitchBack, err := mgr.CheckAndFinishRestore(restoreCtx)
needSwitchBack, needCleanup, err := mgr.CheckAndFinishRestore(restoreCtx, taskFinished)
if err != nil {
logTask.Warn("check restore pd schedulers failed", zap.Error(err))
return
@@ -1193,19 +1195,22 @@ func (rc *Controller) restoreTables(ctx context.Context) error {
if restoreE := restoreFn(restoreCtx); restoreE != nil {
logTask.Warn("failed to restore removed schedulers, you may need to restore them manually", zap.Error(restoreE))
}

logTask.Info("add back PD leader&region schedulers")
// clean up task metas
if cleanupErr := mgr.Cleanup(restoreCtx); cleanupErr != nil {
logTask.Warn("failed to clean task metas, you may need to restore them manually", zap.Error(cleanupErr))
}
// cleanup table meta and schema db if needed.
cleanupFunc = func() {
if e := mgr.CleanupAllMetas(restoreCtx); e != nil {
logTask.Warn("failed to clean table task metas, you may need to restore them manually", zap.Error(e))
if needCleanup {
logTask.Info("cleanup task metas")
if cleanupErr := mgr.Cleanup(restoreCtx); cleanupErr != nil {
logTask.Warn("failed to clean task metas, you may need to restore them manually", zap.Error(cleanupErr))
}
// cleanup table meta and schema db if needed.
cleanupFunc = func() {
if e := mgr.CleanupAllMetas(restoreCtx); e != nil {
logTask.Warn("failed to clean table task metas, you may need to restore them manually", zap.Error(e))
}
}
}
}

logTask.Info("add back PD leader&region schedulers")
}

pdController.Close()
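Taken together, the restore.go changes make the deferred scheduler-restore path honest about failure: taskFinished stays false unless the import ran to completion, yet CheckAndFinishRestore is still consulted, so a failed task records itself as exited instead of leaving PD stuck in import mode. In outline (a sketch of the flow, not the literal code; doImport stands in for the real table-restore work):

taskFinished := false
finishSchedulers := func() {
	// context.Background(): restoring schedulers must still work if ctx was canceled
	restoreCtx := context.Background()
	switchBack, cleanup, err := mgr.CheckAndFinishRestore(restoreCtx, taskFinished)
	if err != nil {
		logTask.Warn("check restore pd schedulers failed", zap.Error(err))
		return
	}
	if switchBack {
		// add back the PD leader&region schedulers via the saved undo function
	}
	if cleanup {
		// last finisher: drop this run's task metadata
	}
}
defer finishSchedulers()

doImport()          // stand-in for restoring all the tables
taskFinished = true // reached only on success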
@@ -1403,6 +1408,7 @@ func (rc *Controller) restoreTables(ctx context.Context) error {
// finishSchedulers()
// cancelFunc(switchBack)
// finishFuncCalled = true
taskFinished = true

close(postProcessTaskChan)
// otherwise, we should run all tasks in the post-process task chan