Skip to content

Commit

Permalink
Merge pull request #2789 from actiontech/2028-1
Browse files Browse the repository at this point in the history
modify: backup compatible with other plugins
  • Loading branch information
ColdWaterLW authored Dec 6, 2024
2 parents 15b560b + a2e322e commit 7d916e2
Show file tree
Hide file tree
Showing 5 changed files with 22 additions and 28 deletions.
3 changes: 2 additions & 1 deletion sqle/driver/mysql/audit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,8 @@ func NewMockInspectWithIsExecutedSQL(e *executor.Executor) *MysqlDriverImpl {
Password: "123456",
DatabaseName: "mysql",
},
Ctx: session.NewMockContext(e),
isConnected: true,
Ctx: session.NewMockContext(e),
cnf: &Config{
DDLOSCMinSize: 16,
DDLGhostMinSize: 16,
Expand Down
10 changes: 5 additions & 5 deletions sqle/driver/mysql/rollback.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,7 +415,7 @@ func (i *MysqlDriverImpl) generateInsertRollbackSql(stmt *ast.InsertStmt) (strin
for n, name := range columnsName {
_, isPk := pkColumnsName[name]
if isPk {
where = append(where, fmt.Sprintf("%s = '%s'", name, util.ExprFormat(value[n])))
where = append(where, fmt.Sprintf("%s = '%s'", name, restore(value[n])))
}
}
if len(where) != len(pkColumnsName) {
Expand All @@ -437,7 +437,7 @@ func (i *MysqlDriverImpl) generateInsertRollbackSql(stmt *ast.InsertStmt) (strin
name := setExpr.Column.Name.String()
_, isPk := pkColumnsName[name]
if isPk {
where = append(where, fmt.Sprintf("%s = '%s'", name, util.ExprFormat(setExpr.Expr)))
where = append(where, fmt.Sprintf("%s = '%s'", name, restore(setExpr.Expr)))
}
}
if len(where) != len(pkColumnsName) {
Expand Down Expand Up @@ -617,7 +617,7 @@ func (i *MysqlDriverImpl) generateUpdateRollbackSql(stmt *ast.UpdateStmt) (strin
colChanged = true
if isPk {
isPkChanged = true
pkValue = util.ExprFormat(l.Expr)
pkValue = restore(l.Expr)
}
}
}
Expand Down Expand Up @@ -702,12 +702,12 @@ func (i *MysqlDriverImpl) generateGetRecordsSql(expr string, tableName *ast.Tabl
recordSql = fmt.Sprintf("%s AS %s", recordSql, tableAlias)
}
if where != nil {
recordSql = fmt.Sprintf("%s WHERE %s", recordSql, util.ExprFormat(where))
recordSql = fmt.Sprintf("%s WHERE %s", recordSql, restore(where))
}
if order != nil {
recordSql = fmt.Sprintf("%s ORDER BY", recordSql)
for _, item := range order.Items {
recordSql = fmt.Sprintf("%s %s", recordSql, util.ExprFormat(item.Expr))
recordSql = fmt.Sprintf("%s %s", recordSql, restore(item.Expr))
if item.Desc {
recordSql = fmt.Sprintf("%s DESC", recordSql)
}
Expand Down
8 changes: 2 additions & 6 deletions sqle/server/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,13 @@ package server

import "github.com/actiontech/sqle/sqle/model"

type BackupTask interface {
Backup() error
}

type BackupStrategy string

const (
BackupStrategyNone BackupStrategy = "none" // 不备份(不支持备份、无需备份、选择不备份)
BackupStrategyReverseSql BackupStrategy = "reverse_sql" // 备份为反向SQL
BackupStrategyOriginalRow BackupStrategy = "original_row" // 备份为原始行
BackupStrategyManually BackupStrategy = "manual" // 标记为人工备份
BackupStrategyManually BackupStrategy = "manual" // 标记为人工备份
BackupRowsAffectedLimit int = 1000 // SQL影响行数上限,超过该上限的SQL不进行备份
)

Expand Down Expand Up @@ -63,4 +59,4 @@ func (m backupTaskMap) AddBackupTask(backupTask *model.BackupTask) {
if _, exist := m[backupTask.ExecuteSqlId]; !exist {
m[backupTask.ExecuteSqlId] = backupTask
}
}
}
8 changes: 4 additions & 4 deletions sqle/server/backup_ce.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ func (BackupService) CheckIsDbTypeSupportEnableBackup(dbType string) error {
return nil
}

type BaseBackupTask struct{}
type BackupManager struct{}

func (t BaseBackupTask) Backup() error {
func (t BackupManager) Backup() error {
return nil
}

func initModelBackupTask(p driver.Plugin, task *model.Task, sql *model.ExecuteSQL) *model.BackupTask {
return &model.BackupTask{}
}

func toBackupTask(a driver.Plugin, sql *model.ExecuteSQL) (BackupTask, error) {
return &BaseBackupTask{}, nil
func getBackupManager(p driver.Plugin, sql *model.ExecuteSQL, dbType string) (*BackupManager, error) {
return &BackupManager{}, nil
}

func (BackupService) GetRollbackSqlsMap(taskId uint) (map[uint][]string, error) {
Expand Down
21 changes: 9 additions & 12 deletions sqle/server/sqled.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@ func (a *action) GetTaskStatus(st *model.Storage) string {
}

func (a *action) execTask() (err error) {
// TODO if enable backup and plugin support backup
if a.task.EnableBackup {
svc := BackupService{}
if svc.CheckCanTaskBackup(a.task) {
err = a.backupAndExecSql()
if err != nil {
return err
Expand Down Expand Up @@ -483,19 +483,16 @@ backupAndExecSql() 备份与执行SQL:
按照顺序,先根据一条SQL备份,再执行该SQL。备份过程中涉及连库查询和保存数据。
*/
func (a *action) backupAndExecSql() error {
svc := BackupService{}
for _, executeSQL := range a.task.ExecuteSQLs {
if svc.CheckCanTaskBackup(a.task) {
backupTask, err := toBackupTask(a.plugin, executeSQL)
if err != nil {
return fmt.Errorf("in backupAndExecSql when convert toBackupTask, err %w , backup task: %v, task: %v", err, executeSQL.BackupTask.ID, a.task.ID)
}
if err = backupTask.Backup(); err != nil {
return fmt.Errorf("in backupAndExecSql when backupTask Backup, err %w, backup task: %v, task: %v", err, executeSQL.BackupTask.ID, a.task.ID)
}
backupMgr, err := getBackupManager(a.plugin, executeSQL, a.task.DBType)
if err != nil {
return fmt.Errorf("in backupAndExecSql when getBackupManager, err %w , task: %v", err, a.task.ID)
}
if err = backupMgr.Backup(); err != nil {
return fmt.Errorf("in backupAndExecSql when backupMgr Backup, err %w, backup manager: %v, task: %v", err, backupMgr, a.task.ID)
}
if err := a.execSQL(executeSQL); err != nil {
return fmt.Errorf("in backupAndExecSql when execSQL %v, err %w, backup task: %v, task: %v", executeSQL, err, executeSQL.BackupTask.ID, a.task.ID)
return fmt.Errorf("in backupAndExecSql when execSQL %v, err %w, backup manager: %v, task: %v", executeSQL, err, backupMgr, a.task.ID)
}
}
return nil
Expand Down

0 comments on commit 7d916e2

Please sign in to comment.