From f6cb51680fc6586f2601d0e429c242dcfc814e08 Mon Sep 17 00:00:00 2001 From: shaohoukun Date: Mon, 5 Feb 2024 18:21:24 +0800 Subject: [PATCH 1/5] feat(feat-binlog-apply-optimization): feat-binlog-apply-optimization 1. Support for ignoring binlog events that exceed chunk boundary values. 2. Support for binlog merge processing. --- doc/command-line-flags.md | 4 + go/base/context.go | 3 + go/cmd/gh-ost/main.go | 2 + go/logic/applier.go | 331 ++++++++++++++++++++++++++++++++++---- go/logic/applier_test.go | 177 ++++++++++++++++++++ go/logic/inspect.go | 28 ++++ go/logic/migrator.go | 6 +- go/sql/builder.go | 16 +- go/sql/builder_test.go | 26 ++- go/sql/types.go | 43 ++++- 10 files changed, 589 insertions(+), 47 deletions(-) diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index ac491b268..efac3c23b 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -125,6 +125,10 @@ Why is this behavior configurable? Different workloads have different characteri Noteworthy is that setting `--dml-batch-size` to higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log, it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size. +### ignore-over-iteration-range-max-binlog +When binlog unique key value is over `MigrationIterationRangeMaxValues`, and less than `MigrationRangeMaxValues`, the binlog will be ignored. Because the data will be synced by copy chunk. +When binlog unique key value is over `MigrationRangeMaxValues` or less than `MigrationIterationRangeMaxValues`, the binlog will be applied. 
+ ### exact-rowcount A `gh-ost` execution need to copy whatever rows you have in your existing table onto the ghost table. This can and often will be, a large number. Exactly what that number is? diff --git a/go/base/context.go b/go/base/context.go index 300ec1201..fd9aae97f 100644 --- a/go/base/context.go +++ b/go/base/context.go @@ -194,7 +194,10 @@ type MigrationContext struct { controlReplicasLagResult mysql.ReplicationLagResult TotalRowsCopied int64 TotalDMLEventsApplied int64 + TotalDMLEventsIgnored int64 DMLBatchSize int64 + IgnoreOverIterationRangeMaxBinlog bool + IsMergeDMLEvents bool isThrottled bool throttleReason string throttleReasonHint ThrottleReasonHint diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go index 139703077..d0feba2b6 100644 --- a/go/cmd/gh-ost/main.go +++ b/go/cmd/gh-ost/main.go @@ -108,6 +108,8 @@ func main() { defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking") cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)") niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after") + flag.BoolVar(&migrationContext.IgnoreOverIterationRangeMaxBinlog, "ignore-over-iteration-range-max-binlog", false, "When binlog unique key value is over MigrationIterationRangeMaxValues, and less than MigrationRangeMaxValues, the binlog will be ignored. 
Because the data will be synced by copy chunk") + flag.BoolVar(&migrationContext.IsMergeDMLEvents, "is-merge-dml-event", false, "Merge DML Binlog Event") maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation") replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query") diff --git a/go/logic/applier.go b/go/logic/applier.go index fa374a70f..78eed1ec5 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -27,18 +27,22 @@ const ( ) type dmlBuildResult struct { - query string - args []interface{} - rowsDelta int64 - err error + dml binlog.EventDML + query string + args []interface{} + uniqueValues []interface{} + rowsDelta int64 + err error } -func newDmlBuildResult(query string, args []interface{}, rowsDelta int64, err error) *dmlBuildResult { +func newDmlBuildResult(dml binlog.EventDML, query string, args []interface{}, uniqueValues []interface{}, rowsDelta int64, err error) *dmlBuildResult { return &dmlBuildResult{ - query: query, - args: args, - rowsDelta: rowsDelta, - err: err, + dml: dml, + query: query, + args: args, + uniqueValues: uniqueValues, + rowsDelta: rowsDelta, + err: err, } } @@ -1110,12 +1114,22 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (result case binlog.DeleteDML: { query, uniqueKeyArgs, err := sql.BuildDMLDeleteQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.WhereColumnValues.AbstractValues()) - return append(results, newDmlBuildResult(query, uniqueKeyArgs, -1, err)) + if ignore, err := this.isIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs); err != nil { + return append(results, newDmlBuildResultError(fmt.Errorf("Check isIgnoreOverMaxChunkRangeEvent error: %+v", err))) + } else if ignore { + return results + } + return append(results, newDmlBuildResult(binlog.DeleteDML, query, 
uniqueKeyArgs, uniqueKeyArgs, -1, err)) } case binlog.InsertDML: { - query, sharedArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, dmlEvent.NewColumnValues.AbstractValues()) - return append(results, newDmlBuildResult(query, sharedArgs, 1, err)) + query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLInsertQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues()) + if ignore, err := this.isIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs); err != nil { + return append(results, newDmlBuildResultError(fmt.Errorf("Check isIgnoreOverMaxChunkRangeEvent error: %+v", err))) + } else if ignore { + return results + } + return append(results, newDmlBuildResult(binlog.InsertDML, query, sharedArgs, uniqueKeyArgs, 1, err)) } case binlog.UpdateDML: { @@ -1127,20 +1141,114 @@ func (this *Applier) buildDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) (result return results } query, sharedArgs, uniqueKeyArgs, err := sql.BuildDMLUpdateQuery(dmlEvent.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns, &this.migrationContext.UniqueKey.Columns, dmlEvent.NewColumnValues.AbstractValues(), dmlEvent.WhereColumnValues.AbstractValues()) + if ignore, err := this.isIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs); err != nil { + return append(results, newDmlBuildResultError(fmt.Errorf("Check isIgnoreOverMaxChunkRangeEvent error: %+v", err))) + } else if ignore { + return results + } args := sqlutils.Args() args = append(args, sharedArgs...) args = append(args, uniqueKeyArgs...) 
- return append(results, newDmlBuildResult(query, args, 0, err)) + return append(results, newDmlBuildResult(binlog.UpdateDML, query, args, uniqueKeyArgs, 0, err)) } } return append(results, newDmlBuildResultError(fmt.Errorf("Unknown dml event type: %+v", dmlEvent.DML))) } +func (this *Applier) generateDeleteQuery(uniqueKeyValuesList [][]string) string { + var stmt string + if len(uniqueKeyValuesList) == 0 { + return stmt + } + + var whereClause string + for _, uniqueKeyValues := range uniqueKeyValuesList { + if uniqueKeyValues == nil || len(uniqueKeyValues) == 0 { + continue + } + _clause := "" + for _, val := range uniqueKeyValues { + if _clause == "" { + _clause = fmt.Sprintf("%v", val) + continue + } + _clause += fmt.Sprintf(", %v", val) + } + + if whereClause == "" { + whereClause = fmt.Sprintf("(%s)", _clause) + continue + } + whereClause = fmt.Sprintf("%s, (%s)", whereClause, _clause) + } + + stmt = fmt.Sprintf(` + DELETE /* gh-ost %s.%s */ + FROM %s.%s + WHERE (%s) IN (%s)`, + sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetGhostTableName()), + this.migrationContext.UniqueKey.Columns.EscapeString(), + whereClause, + ) + return stmt +} + +func (this *Applier) generateReplaceQuery(uniqueKeyValuesList [][]string) string { + var stmt string + if len(uniqueKeyValuesList) == 0 { + return stmt + } + + var whereClause string + for _, uniqueKeyValues := range uniqueKeyValuesList { + if uniqueKeyValues == nil || len(uniqueKeyValues) == 0 { + continue + } + _clause := "" + for _, val := range uniqueKeyValues { + if _clause == "" { + _clause = fmt.Sprintf("%v", val) + continue + } + _clause += fmt.Sprintf(", %v", val) + } + + if whereClause == "" { + whereClause = fmt.Sprintf(`(%s)`, _clause) + continue + } + whereClause = fmt.Sprintf(`%s, (%s)`, whereClause, _clause) + } + + stmt = fmt.Sprintf(` + REPLACE /* gh-ost 
%s.%s */ + INTO %s.%s (%s) + SELECT %s + FROM %s.%s + FORCE INDEX (%s) + WHERE (%s) IN (%s)`, + sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.GetGhostTableName()), this.migrationContext.SharedColumns.EscapeString(), + this.migrationContext.SharedColumns.EscapeString(), + sql.EscapeName(this.migrationContext.DatabaseName), sql.EscapeName(this.migrationContext.OriginalTableName), + sql.EscapeName(this.migrationContext.UniqueKey.Name), + this.migrationContext.UniqueKey.Columns.EscapeString(), + whereClause, + ) + + return stmt +} + // ApplyDMLEventQueries applies multiple DML queries onto the _ghost_ table func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) error { var totalDelta int64 + var dmlEventSize int64 + var ignoredEventSize int64 - err := func() error { + var err error + + dbTxFunc := func(applyFunc func(*gosql.Tx) error) error { tx, err := this.db.Begin() if err != nil { return err @@ -1157,38 +1265,139 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) if _, err := tx.Exec(sessionQuery); err != nil { return rollback(err) } + + if err := applyFunc(tx); err != nil { + return rollback(err) + } + + if err := tx.Commit(); err != nil { + return err + } + return nil + } + + resultFunc := func(result gosql.Result, delta int64) { + if result == nil { + return + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + log.Warningf("error getting rows affected from DML event query: %s. 
i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err) + rowsAffected = 1 + } + totalDelta += delta * rowsAffected + } + + applyMapFunc := func(tx *gosql.Tx) error { + dmlMap := make(map[string]*dmlBuildResult) + const ValSep = "#gho#" for _, dmlEvent := range dmlEvents { - for _, buildResult := range this.buildDMLEventQuery(dmlEvent) { + buildResults := this.buildDMLEventQuery(dmlEvent) + if len(buildResults) == 0 { + ignoredEventSize++ + continue + } + dmlEventSize++ + for _, buildResult := range buildResults { if buildResult.err != nil { - return rollback(buildResult.err) + return buildResult.err } - result, err := tx.Exec(buildResult.query, buildResult.args...) + + values, err := this.migrationContext.UniqueKey.FormatValues(buildResult.uniqueValues) if err != nil { - err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args) - return rollback(err) + return err } + dmlMap[strings.Join(values, ValSep)] = buildResult + } + } + delArgs := make([][]string, 0) + uptArgs := make([][]string, 0) + insArgs := make([][]string, 0) + for key, buildResult := range dmlMap { + if buildResult == nil { + continue + } + values := strings.Split(key, ValSep) + switch buildResult.dml { + case binlog.DeleteDML: + delArgs = append(delArgs, values) + case binlog.InsertDML: + insArgs = append(insArgs, values) + case binlog.UpdateDML: + uptArgs = append(uptArgs, values) + default: + return fmt.Errorf("dost not support dml event %s", buildResult.dml) + } + } + + if len(delArgs) > 0 { + query := this.generateDeleteQuery(delArgs) + result, err := tx.Exec(query) + if err != nil { + err = fmt.Errorf("%w; query=%s", err, query) + return err + } + resultFunc(result, -1) + } + + if len(insArgs) > 0 { + query := this.generateReplaceQuery(insArgs) + result, err := tx.Exec(query) + if err != nil { + err = fmt.Errorf("%w; query=%s", err, query) + return err + } + resultFunc(result, 1) + } + + if len(uptArgs) > 0 { + query 
:= this.generateReplaceQuery(uptArgs) + _, err := tx.Exec(query) + if err != nil { + err = fmt.Errorf("%w; query=%s", err, query) + return err + } + } + return nil + } - rowsAffected, err := result.RowsAffected() + applyAllFunc := func(tx *gosql.Tx) error { + for _, dmlEvent := range dmlEvents { + buildResults := this.buildDMLEventQuery(dmlEvent) + if len(buildResults) == 0 { + ignoredEventSize++ + continue + } + dmlEventSize++ + for _, buildResult := range buildResults { + if buildResult.err != nil { + return buildResult.err + } + result, err := tx.Exec(buildResult.query, buildResult.args...) if err != nil { - log.Warningf("error getting rows affected from DML event query: %s. i'm going to assume that the DML affected a single row, but this may result in inaccurate statistics", err) - rowsAffected = 1 + err = fmt.Errorf("%w; query=%s; args=%+v", err, buildResult.query, buildResult.args) + return err } - // each DML is either a single insert (delta +1), update (delta +0) or delete (delta -1). 
- // multiplying by the rows actually affected (either 0 or 1) will give an accurate row delta for this DML event - totalDelta += buildResult.rowsDelta * rowsAffected + + resultFunc(result, buildResult.rowsDelta) } } - if err := tx.Commit(); err != nil { - return err - } return nil - }() + } + + if this.migrationContext.IsMergeDMLEvents && this.migrationContext.UniqueKey.IsMemoryComparable { + err = dbTxFunc(applyMapFunc) + } else { + err = dbTxFunc(applyAllFunc) + } if err != nil { return this.migrationContext.Log.Errore(err) } // no error - atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, int64(len(dmlEvents))) + atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, dmlEventSize) + atomic.AddInt64(&this.migrationContext.TotalDMLEventsIgnored, ignoredEventSize) if this.migrationContext.CountTableRows { atomic.AddInt64(&this.migrationContext.RowsDeltaEstimate, totalDelta) } @@ -1202,3 +1411,73 @@ func (this *Applier) Teardown() { this.singletonDB.Close() atomic.StoreInt64(&this.finishedMigrating, 1) } + +// isIgnoreOverMaxChunkRangeEvent reports whether a binlog event may be skipped. +// Key layout: min ... MigrationIterationRangeMaxValues ... MigrationRangeMaxValues. +// Ignore when iteration range max < key < range max, since the copy chunk will sync that row; +// otherwise (including equality with either boundary) the event must be applied. +func (this *Applier) isIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{}) (bool, error) { + if !this.migrationContext.IgnoreOverIterationRangeMaxBinlog { + return false, nil + } + // uniqueKeyArgs can be nil when the query builder returned an error (e.g. BuildDMLInsertQuery); never ignore then, so the caller reports that error + if len(uniqueKeyArgs) < this.migrationContext.UniqueKey.Columns.Len() { + return false, nil + } + + // Check whether the key is greater than or equal to the overall max boundary; if so it cannot be ignored and the binlog event must be applied + ignore, err := func() (bool, error) { + for order, uniqueKeyCol := range this.migrationContext.UniqueKey.Columns.Columns() { + if uniqueKeyCol.CompareValueFunc == nil { + return false, nil + } + + than, err := uniqueKeyCol.CompareValueFunc(uniqueKeyArgs[order], this.migrationContext.MigrationRangeMaxValues.StringColumn(order)) + if err != nil { + return false, err + } + if than < 0 { + return true, nil + } else if than > 0 { + return false, nil + } + } + + // Equal to the boundary value: cannot be ignored + return false, nil + }() + if err != nil { + return false, err 
+ } + + if !ignore { + return false, nil + } + + // Check whether the key exceeds the IterationRangeMax boundary: greater can be ignored, less cannot + ignore, err = func() (bool, error) { + for order, uniqueKeyCol := range this.migrationContext.UniqueKey.Columns.Columns() { + // The first pass may return before reaching this column, so a missing compare function must be handled here too + if uniqueKeyCol.CompareValueFunc == nil { + return false, nil + } + than, err := uniqueKeyCol.CompareValueFunc(uniqueKeyArgs[order], this.migrationContext.MigrationIterationRangeMaxValues.StringColumn(order)) + if err != nil { + return false, err + } + if than > 0 { + return true, nil + } else if than < 0 { + return false, nil + } + } + + // Equal to the boundary value: cannot be ignored + return false, nil + }() + if err != nil { + return false, err + } + + return ignore, nil +} diff --git a/go/logic/applier_test.go b/go/logic/applier_test.go index 36c28d9e8..acb2d7813 100644 --- a/go/logic/applier_test.go +++ b/go/logic/applier_test.go @@ -6,6 +6,8 @@ package logic import ( + "fmt" + "math/big" "strings" "testing" @@ -183,3 +185,178 @@ func TestApplierInstantDDL(t *testing.T) { test.S(t).ExpectEquals(stmt, "ALTER /* gh-ost */ TABLE `test`.`mytable` ADD INDEX (foo), ALGORITHM=INSTANT") }) } + +func TestGenerateQuery(t *testing.T) { + migrationContext := base.NewMigrationContext() + migrationContext.DatabaseName = "test" + migrationContext.OriginalTableName = "mytable" + uniqueColumns := sql.NewColumnList([]string{"id", "order_id"}) + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY KEY", + Columns: *uniqueColumns, + } + sharedColumns := sql.NewColumnList([]string{"id", "order_id", "name", "age"}) + migrationContext.SharedColumns = sharedColumns + + applier := NewApplier(migrationContext) + + t.Run("generateDeleteQuery1", func(t *testing.T) { + stmt := applier.generateDeleteQuery([][]string{{"1", "2"}}) + test.S(t).ExpectEquals(stmt, ` + DELETE /* 
gh-ost `+"`test`.`mytable`"+` */ + FROM `+"`test`.`_mytable_gho`"+` + WHERE `+"(`id`,`order_id`)"+` IN (('1', '2'))`) + }) + t.Run("generateDeleteQuery3", func(t *testing.T) { + stmt := applier.generateDeleteQuery([][]string{{"'1'", "'2'"}, {"1", "23"}}) + test.S(t).ExpectEquals(stmt, ` + DELETE /* gh-ost `+"`test`.`mytable`"+` */ + FROM `+"`test`.`_mytable_gho`"+` + WHERE `+"(`id`,`order_id`)"+` IN (('1', '2'), (1, 23))`) + }) + t.Run("generateReplaceQuery1", func(t *testing.T) { + stmt := applier.generateReplaceQuery([][]string{{"1", "2"}}) + test.S(t).ExpectEquals(stmt, ` + REPLACE /* gh-ost `+"`test`.`mytable`"+` */ + INTO `+"`test`.`_mytable_gho` (`id`,`order_id`,`name`,`age`)"+` + SELECT `+"`id`,`order_id`,`name`,`age`"+` + FROM `+"`test`.`mytable`"+` + FORCE INDEX `+"(`PRIMARY KEY`)"+` + WHERE (`+"`id`,`order_id`"+`) IN ((1, 2))`) + }) + t.Run("generateReplaceQuery2", func(t *testing.T) { + stmt := applier.generateReplaceQuery([][]string{{"'1'", "'2'"}}) + test.S(t).ExpectEquals(stmt, ` + REPLACE /* gh-ost `+"`test`.`mytable`"+` */ + INTO `+"`test`.`_mytable_gho` (`id`,`order_id`,`name`,`age`)"+` + SELECT `+"`id`,`order_id`,`name`,`age`"+` + FROM `+"`test`.`mytable`"+` + FORCE INDEX `+"(`PRIMARY KEY`)"+` + WHERE (`+"`id`,`order_id`"+`) IN (('1', '2'))`) + }) + t.Run("generateReplaceQuery3", func(t *testing.T) { + stmt := applier.generateReplaceQuery([][]string{{"'1'", "'2'"}, {"1", "23"}}) + test.S(t).ExpectEquals(stmt, ` + REPLACE /* gh-ost `+"`test`.`mytable`"+` */ + INTO `+"`test`.`_mytable_gho` (`id`,`order_id`,`name`,`age`)"+` + SELECT `+"`id`,`order_id`,`name`,`age`"+` + FROM `+"`test`.`mytable`"+` + FORCE INDEX `+"(`PRIMARY KEY`)"+` + WHERE (`+"`id`,`order_id`"+`) IN (('1', '2'), (1, 23))`) + }) +} + +func TestIsIgnoreOverMaxChunkRangeEvent(t *testing.T) { + migrationContext := base.NewMigrationContext() + uniqueColumns := sql.NewColumnList([]string{"id", "date"}) + uniqueColumns.SetColumnCompareValueFunc("id", func(a interface{}, b interface{}) (int, 
error) { + _a := new(big.Int) + if _, ok := _a.SetString(fmt.Sprintf("%+v", a), 10); !ok { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", a) + } + _b := new(big.Int) + if _, ok := _b.SetString(fmt.Sprintf("%+v", b), 10); !ok { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", b) + } + return _a.Cmp(_b), nil + }) + + uniqueColumns.SetColumnCompareValueFunc("date", func(a interface{}, b interface{}) (int, error) { + _a := new(big.Int) + if _, ok := _a.SetString(fmt.Sprintf("%+v", a), 10); !ok { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", a) + } + _b := new(big.Int) + if _, ok := _b.SetString(fmt.Sprintf("%+v", b), 10); !ok { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", b) + } + return _a.Cmp(_b), nil + }) + + migrationContext.UniqueKey = &sql.UniqueKey{ + Name: "PRIMARY KEY", + Columns: *uniqueColumns, + } + migrationContext.MigrationRangeMinValues = sql.ToColumnValues([]interface{}{10, 20240110}) + migrationContext.MigrationRangeMaxValues = sql.ToColumnValues([]interface{}{123456, 20240205}) + migrationContext.MigrationIterationRangeMaxValues = sql.ToColumnValues([]interface{}{11111, 20240103}) + + applier := NewApplier(migrationContext) + + t.Run("setFalse", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = false + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{1, 20240101}) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) + + t.Run("lessRangeMaxValue1", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{100, 20240101}) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) + + t.Run("lessRangeMaxValue2", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{11111, 20240101}) + 
test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) + + t.Run("equalRangeMaxValue", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{11111, 20240103}) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) + + t.Run("greatRangeMaxValue1", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{11111, 20240104}) + test.S(t).ExpectNil(err) + test.S(t).ExpectTrue(isIgnore) + }) + + t.Run("greatRangeMaxValue2", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{11112, 20240103}) + test.S(t).ExpectNil(err) + test.S(t).ExpectTrue(isIgnore) + }) + + t.Run("lessMaxValue1", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{123456, 20240204}) + test.S(t).ExpectNil(err) + test.S(t).ExpectTrue(isIgnore) + }) + + t.Run("equalMaxValue", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{123456, 20240205}) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) + + t.Run("greatMaxValue1", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{123456, 20240207}) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) + + t.Run("greatMaxValue2", func(t *testing.T) { + migrationContext.IgnoreOverIterationRangeMaxBinlog = true + isIgnore, err := applier.isIgnoreOverMaxChunkRangeEvent([]interface{}{123457, 20240204}) + test.S(t).ExpectNil(err) + test.S(t).ExpectFalse(isIgnore) + }) +} diff --git a/go/logic/inspect.go b/go/logic/inspect.go index 
9d414a43e..268c4bb96 100644 --- a/go/logic/inspect.go +++ b/go/logic/inspect.go @@ -10,6 +10,7 @@ import ( gosql "database/sql" "errors" "fmt" + "math/big" "reflect" "strings" "sync/atomic" @@ -137,6 +138,7 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { for i, sharedUniqueKey := range sharedUniqueKeys { this.applyColumnTypes(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, &sharedUniqueKey.Columns) uniqueKeyIsValid := true + isMemoryComparable := true for _, column := range sharedUniqueKey.Columns.Columns() { switch column.Type { case sql.FloatColumnType: @@ -152,9 +154,15 @@ func (this *Inspector) inspectOriginalAndGhostTables() (err error) { uniqueKeyIsValid = false } } + if isMemoryComparable && column.FormatValueFunc != nil { + isMemoryComparable = true + } else { + isMemoryComparable = false + } } if uniqueKeyIsValid { this.migrationContext.UniqueKey = sharedUniqueKeys[i] + this.migrationContext.UniqueKey.IsMemoryComparable = isMemoryComparable break } } @@ -598,6 +606,23 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL if strings.Contains(columnType, "unsigned") { column.IsUnsigned = true } + if strings.Contains(columnType, "int") { + column.CompareValueFunc = func(a interface{}, b interface{}) (int, error) { + _a := new(big.Int) + if _, ok := _a.SetString(fmt.Sprintf("%+v", a), 10); !ok { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", a) + } + _b := new(big.Int) + if _, ok := _b.SetString(fmt.Sprintf("%+v", b), 10); !ok { + return 0, fmt.Errorf("CompareValueFunc err, %+v convert int is nil", b) + } + return _a.Cmp(_b), nil + } + + column.FormatValueFunc = func(a interface{}) (string, error) { + return fmt.Sprintf("%+v", a), nil + } + } if strings.Contains(columnType, "mediumint") { column.Type = sql.MediumIntColumnType } @@ -612,6 +637,9 @@ func (this *Inspector) applyColumnTypes(databaseName, tableName string, columnsL } if 
strings.Contains(columnType, "float") { column.Type = sql.FloatColumnType + column.FormatValueFunc = func(a interface{}) (string, error) { + return fmt.Sprintf("%+v", a), nil + } } if strings.HasPrefix(columnType, "enum") { column.Type = sql.EnumColumnType diff --git a/go/logic/migrator.go b/go/logic/migrator.go index b4d0a9ae1..a9d0f2cd4 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1025,9 +1025,10 @@ func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) { currentBinlogCoordinates := *this.eventsStreamer.GetCurrentBinlogCoordinates() - status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", + status := fmt.Sprintf("Copy: %d/%d %.1f%%; Applied: %d; Ignored: %d; Backlog: %d/%d; Time: %+v(total), %+v(copy); streamer: %+v; Lag: %.2fs, HeartbeatLag: %.2fs, State: %s; ETA: %s", totalRowsCopied, rowsEstimate, progressPct, atomic.LoadInt64(&this.migrationContext.TotalDMLEventsApplied), + atomic.LoadInt64(&this.migrationContext.TotalDMLEventsIgnored), len(this.applyEventsQueue), cap(this.applyEventsQueue), base.PrettifyDurationOutput(elapsedTime), base.PrettifyDurationOutput(this.migrationContext.ElapsedRowCopyTime()), currentBinlogCoordinates, @@ -1260,6 +1261,9 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { } // Create a task to apply the DML event; this will be execute by executeWriteFuncs() var applyEventFunc tableWriteFunc = func() error { + if this.migrationContext.UniqueKey.IsMemoryComparable { + return this.applier.ApplyDMLEventQueries(dmlEvents) + } return this.applier.ApplyDMLEventQueries(dmlEvents) } if err := this.retryOperation(applyEventFunc); err != nil { diff --git a/go/sql/builder.go b/go/sql/builder.go index 7be428f93..a1bab49e4 100644 --- a/go/sql/builder.go +++ b/go/sql/builder.go @@ -433,15 +433,15 @@ func BuildDMLDeleteQuery(databaseName, tableName string, 
tableColumns, uniqueKey return result, uniqueKeyArgs, nil } -func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns *ColumnList, args []interface{}) (result string, sharedArgs []interface{}, err error) { +func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns, uniqueKeyColumns *ColumnList, args []interface{}) (result string, sharedArgs, uniqueKeyArgs []interface{}, err error) { if len(args) != tableColumns.Len() { - return result, args, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery") + return result, args, nil, fmt.Errorf("args count differs from table column count in BuildDMLInsertQuery") } if !sharedColumns.IsSubsetOf(tableColumns) { - return result, args, fmt.Errorf("shared columns is not a subset of table columns in BuildDMLInsertQuery") + return result, args, nil, fmt.Errorf("shared columns is not a subset of table columns in BuildDMLInsertQuery") } if sharedColumns.Len() == 0 { - return result, args, fmt.Errorf("No shared columns found in BuildDMLInsertQuery") + return result, args, nil, fmt.Errorf("No shared columns found in BuildDMLInsertQuery") } databaseName = EscapeName(databaseName) tableName = EscapeName(tableName) @@ -452,6 +452,12 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol sharedArgs = append(sharedArgs, arg) } + for _, column := range uniqueKeyColumns.Columns() { + tableOrdinal := tableColumns.Ordinals[column.Name] + arg := column.convertArg(args[tableOrdinal], true) + uniqueKeyArgs = append(uniqueKeyArgs, arg) + } + mappedSharedColumnNames := duplicateNames(mappedSharedColumns.Names()) for i := range mappedSharedColumnNames { mappedSharedColumnNames[i] = EscapeName(mappedSharedColumnNames[i]) @@ -470,7 +476,7 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol strings.Join(mappedSharedColumnNames, ", "), strings.Join(preparedValues, ", "), ) - 
return result, sharedArgs, nil + return result, sharedArgs, uniqueKeyArgs, nil } func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedColumns, mappedSharedColumns, uniqueKeyColumns *ColumnList, valueArgs, whereArgs []interface{}) (result string, sharedArgs, uniqueKeyArgs []interface{}, err error) { diff --git a/go/sql/builder_test.go b/go/sql/builder_test.go index 574e8bb1b..dd49e7f29 100644 --- a/go/sql/builder_test.go +++ b/go/sql/builder_test.go @@ -491,7 +491,8 @@ func TestBuildDMLInsertQuery(t *testing.T) { args := []interface{}{3, "testname", "first", 17, 23} { sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) - query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{"position"}) + query, sharedArgs, uniqueKeyArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNil(err) expected := ` replace /* gh-ost mydb.tbl */ @@ -502,10 +503,12 @@ func TestBuildDMLInsertQuery(t *testing.T) { ` test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", 17, 23})) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{17})) } { sharedColumns := NewColumnList([]string{"position", "name", "age", "id"}) - query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{"position", "name"}) + query, sharedArgs, uniqueKeyArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNil(err) expected := ` replace /* gh-ost mydb.tbl */ @@ -516,15 +519,18 @@ func TestBuildDMLInsertQuery(t *testing.T) { ` test.S(t).ExpectEquals(normalizeQuery(query), 
normalizeQuery(expected)) test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{17, "testname", 23, 3})) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{17, "testname"})) } { sharedColumns := NewColumnList([]string{"position", "name", "surprise", "id"}) - _, _, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{"id"}) + _, _, _, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNotNil(err) } { sharedColumns := NewColumnList([]string{}) - _, _, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{}) + _, _, _, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNotNil(err) } } @@ -538,7 +544,8 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { // testing signed args := []interface{}{3, "testname", "first", int8(-1), 23} sharedColumns := NewColumnList([]string{"id", "name", "position", "age"}) - query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{"position"}) + query, sharedArgs, uniqueKeyArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNil(err) expected := ` replace /* gh-ost mydb.tbl */ @@ -549,12 +556,14 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { ` test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", int8(-1), 23})) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{int8(-1)})) } { // testing unsigned args := []interface{}{3, 
"testname", "first", int8(-1), 23} sharedColumns.SetUnsigned("position") - query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{"position"}) + query, sharedArgs, uniqueKeyArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNil(err) expected := ` replace /* gh-ost mydb.tbl */ @@ -565,12 +574,14 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { ` test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", uint8(255), 23})) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{uint8(255)})) } { // testing unsigned args := []interface{}{3, "testname", "first", int32(-1), 23} sharedColumns.SetUnsigned("position") - query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, args) + uniqueKeyColumns := NewColumnList([]string{"position"}) + query, sharedArgs, uniqueKeyArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, sharedColumns, uniqueKeyColumns, args) test.S(t).ExpectNil(err) expected := ` replace /* gh-ost mydb.tbl */ @@ -581,6 +592,7 @@ func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) { ` test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected)) test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", uint32(4294967295), 23})) + test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{uint32(4294967295)})) } } diff --git a/go/sql/types.go b/go/sql/types.go index 3be1a44ca..a25efde85 100644 --- a/go/sql/types.go +++ b/go/sql/types.go @@ -49,6 +49,9 @@ type Column struct { // https://github.com/github/gh-ost/issues/909 BinaryOctetLength uint charsetConversion *CharacterSetConversion + // compare a and b 
using this function, when a equal b, return 0, when a > b, return 1, when a < b, return -1 + CompareValueFunc func(a interface{}, b interface{}) (int, error) + FormatValueFunc func(a interface{}) (string, error) } func (this *Column) convertArg(arg interface{}, isUniqueKeyColumn bool) interface{} { @@ -225,6 +228,14 @@ func (this *ColumnList) String() string { return strings.Join(this.Names(), ",") } +func (this *ColumnList) EscapeString() string { + var cols []string + for _, col := range this.Names() { + cols = append(cols, fmt.Sprintf("`%s`", col)) + } + return strings.Join(cols, ",") +} + func (this *ColumnList) Equals(other *ColumnList) bool { return reflect.DeepEqual(this.Columns, other.Columns) } @@ -252,12 +263,21 @@ func (this *ColumnList) SetCharsetConversion(columnName string, fromCharset stri this.GetColumn(columnName).charsetConversion = &CharacterSetConversion{FromCharset: fromCharset, ToCharset: toCharset} } +func (this *ColumnList) SetColumnCompareValueFunc(columnName string, f func(a interface{}, b interface{}) (int, error)) { + this.GetColumn(columnName).CompareValueFunc = f +} + +func (this *ColumnList) GetColumnCompareValueFunc(columnName string) func(a interface{}, b interface{}) (int, error) { + return this.GetColumn(columnName).CompareValueFunc +} + // UniqueKey is the combination of a key's name and columns type UniqueKey struct { - Name string - Columns ColumnList - HasNullable bool - IsAutoIncrement bool + Name string + Columns ColumnList + HasNullable bool + IsAutoIncrement bool + IsMemoryComparable bool } // IsPrimary checks if this unique key is primary @@ -277,6 +297,21 @@ func (this *UniqueKey) String() string { return fmt.Sprintf("%s: %s; has nullable: %+v", description, this.Columns.Names(), this.HasNullable) } +func (this *UniqueKey) FormatValues(args []interface{}) ([]string, error) { + var values []string + for i, column := range this.Columns.Columns() { + if column.FormatValueFunc == nil { + return nil, fmt.Errorf("column %s 
does not support format value", column.Name) } + val, err := column.FormatValueFunc(args[i]) + if err != nil { + return nil, err + } + values = append(values, val) + } + return values, nil +} + type ColumnValues struct { abstractValues []interface{} ValuesPointers []interface{} From 6ae134024d8093e2559b4f84f2d2c84cca990dcb Mon Sep 17 00:00:00 2001 From: shaohoukun Date: Mon, 5 Feb 2024 18:38:02 +0800 Subject: [PATCH 2/5] docs(x): x --- doc/command-line-flags.md | 4 ++++ go/logic/applier.go | 8 ++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md index efac3c23b..6a379d576 100644 --- a/doc/command-line-flags.md +++ b/doc/command-line-flags.md @@ -129,6 +129,10 @@ Noteworthy is that setting `--dml-batch-size` to higher value _does not_ mean `g When binlog unique key value is over `MigrationIterationRangeMaxValues`, and less than `MigrationRangeMaxValues`, the binlog will be ignored. Because the data will be synced by copy chunk. When binlog unique key value is over `MigrationRangeMaxValues` or less than `MigrationIterationRangeMaxValues`, the binlog will be applied. +### is-merge-dml-event +When is-merge-dml-event is `true` and the chunk unique key is an int or float type, the synced binlog events will be merged into a map whose key is the unique key value. +Then the map is traversed: all delete operations are merged into one `delete sql`, all insert and update operations are merged into a `replace sql`, and the statements are then executed. + ### exact-rowcount A `gh-ost` execution need to copy whatever rows you have in your existing table onto the ghost table. This can and often will be, a large number. Exactly what that number is? 
diff --git a/go/logic/applier.go b/go/logic/applier.go index 78eed1ec5..6284fd722 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -1421,7 +1421,7 @@ func (this *Applier) isIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{}) return false, nil } - // 比较是否大于等于最大边界值,如果是就不能忽略,需要应用对应的binlog + // Compare whether it is greater than or equal to the maximum boundary value. If it is, it cannot be ignored and the corresponding binlog needs to be applied. ignore, err := func() (bool, error) { for order, uniqueKeyCol := range this.migrationContext.UniqueKey.Columns.Columns() { if uniqueKeyCol.CompareValueFunc == nil { @@ -1439,7 +1439,7 @@ func (this *Applier) isIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{}) } } - // 等于边界值的时候,不能忽略 + // When it is equal to the boundary value, it cannot be ignored. return false, nil }() if err != nil { @@ -1450,7 +1450,7 @@ func (this *Applier) isIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{}) return false, nil } - // 比较是否超过IterationRangeMax的边界值,如果大于可以忽略,小于就不能忽略 + // Compare whether it exceeds the boundary value of IterationRangeMax. If it is greater, it can be ignored, if it is less, it cannot be ignored. ignore, err = func() (bool, error) { for order, uniqueKeyCol := range this.migrationContext.UniqueKey.Columns.Columns() { than, err := uniqueKeyCol.CompareValueFunc(uniqueKeyArgs[order], this.migrationContext.MigrationIterationRangeMaxValues.StringColumn(order)) @@ -1464,7 +1464,7 @@ func (this *Applier) isIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{}) } } - // 等于边界值的时候,不能忽略 + //When it is equal to the boundary value, it cannot be ignored. 
return false, nil }() if err != nil { From 1746b18da1c3178499952cd82a711ff3b4c76720 Mon Sep 17 00:00:00 2001 From: shaohoukun Date: Tue, 6 Feb 2024 14:39:02 +0800 Subject: [PATCH 3/5] fix(x): x --- go/logic/migrator.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/go/logic/migrator.go b/go/logic/migrator.go index a9d0f2cd4..5db54ad12 100644 --- a/go/logic/migrator.go +++ b/go/logic/migrator.go @@ -1261,9 +1261,6 @@ func (this *Migrator) onApplyEventStruct(eventStruct *applyEventStruct) error { } // Create a task to apply the DML event; this will be execute by executeWriteFuncs() var applyEventFunc tableWriteFunc = func() error { - if this.migrationContext.UniqueKey.IsMemoryComparable { - return this.applier.ApplyDMLEventQueries(dmlEvents) - } return this.applier.ApplyDMLEventQueries(dmlEvents) } if err := this.retryOperation(applyEventFunc); err != nil { From d6a536b1adb403cafcd83ca5221c73d329f03a8d Mon Sep 17 00:00:00 2001 From: shaohoukun Date: Mon, 26 Feb 2024 18:50:42 +0800 Subject: [PATCH 4/5] fix(feat-binlog-apply-optimization): feat-binlog-apply-optimization When there is only one column in a unique index, merging of DML binlog events is permitted. 
--- go/logic/applier.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/go/logic/applier.go b/go/logic/applier.go index 6284fd722..83fd51c53 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -1386,7 +1386,8 @@ func (this *Applier) ApplyDMLEventQueries(dmlEvents [](*binlog.BinlogDMLEvent)) return nil } - if this.migrationContext.IsMergeDMLEvents && this.migrationContext.UniqueKey.IsMemoryComparable { + // IsMergeDMLEvents is enabled and unique key is memory comparable and unique key has only one column + if this.migrationContext.IsMergeDMLEvents && this.migrationContext.UniqueKey.IsMemoryComparable && this.migrationContext.UniqueKey.Len() == 1 { err = dbTxFunc(applyMapFunc) } else { err = dbTxFunc(applyAllFunc) From ea8378b12cef87a86a0afa5a1f9e12a6e2bfc09d Mon Sep 17 00:00:00 2001 From: shaohoukun Date: Wed, 13 Mar 2024 10:59:35 +0800 Subject: [PATCH 5/5] fix(feat-binlog-apply-optimization): add handling when MigrationIterationRangeMaxValues is nil Add handling for the case when MigrationIterationRangeMaxValues is nil --- go/logic/applier.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/go/logic/applier.go b/go/logic/applier.go index 83fd51c53..60a53366a 100644 --- a/go/logic/applier.go +++ b/go/logic/applier.go @@ -1453,6 +1453,10 @@ func (this *Applier) isIgnoreOverMaxChunkRangeEvent(uniqueKeyArgs []interface{}) // Compare whether it exceeds the boundary value of IterationRangeMax. If it is greater, it can be ignored, if it is less, it cannot be ignored. ignore, err = func() (bool, error) { + if this.migrationContext.MigrationIterationRangeMaxValues == nil { + return true, nil + } + for order, uniqueKeyCol := range this.migrationContext.UniqueKey.Columns.Columns() { than, err := uniqueKeyCol.CompareValueFunc(uniqueKeyArgs[order], this.migrationContext.MigrationIterationRangeMaxValues.StringColumn(order)) if err != nil {