diff --git a/doc/command-line-flags.md b/doc/command-line-flags.md
index 021462fa2..5c0a0045b 100644
--- a/doc/command-line-flags.md
+++ b/doc/command-line-flags.md
@@ -61,6 +61,12 @@ It is not reliable to parse the `ALTER` statement to determine if it is instant
 
 `gh-ost` will automatically fallback to the normal DDL process if the attempt to use instant DDL is unsuccessful.
 
+### chunk-size
+
+The number of rows to copy in a single batch when copying data from the original table to the ghost table. The default value is 1000. Increasing the chunk size can improve performance (via more batching) but also increases the risk of replica delay.
+
+See also: [`dynamic-chunking`](#dynamic-chunking)
+
 ### conf
 
 `--conf=/path/to/my.cnf`: file where credentials are specified. Should be in (or contain) the following format:
@@ -122,6 +128,27 @@ Why is this behavior configurable? Different workloads have different characteri
 
 Noteworthy is that setting `--dml-batch-size` to higher value _does not_ mean `gh-ost` blocks or waits on writes. The batch size is an upper limit on transaction size, not a minimal one. If `gh-ost` doesn't have "enough" events in the pipe, it does not wait on the binary log, it just writes what it already has. This conveniently suggests that if write load is light enough for `gh-ost` to only see a few events in the binary log at a given time, then it is also light enough for `gh-ost` to apply a fraction of the batch size.
 
+### dynamic-chunking
+
+Dynamic chunking (default: `OFF`) allows `gh-ost` to automatically increase or decrease the `--chunk-size` by up to 50x, based on the execution time of previous copy-row operations. The goal is to find the batch size that comes closest to `--dynamic-chunk-size-target-millis` (default: 50).
+
+For example, assume `--chunk-size=1000`, `--dynamic-chunking=true` and `--dynamic-chunk-size-target-millis=50`:
+
+- The actual "target" chunk size used will always be in the range of `[20,50000]` (within 50x of the chunk size).
+- Approximately every 1 second, `gh-ost` re-assesses whether the target chunk size is optimal, based on the `p90` of recent executions.
+- Increases in target chunk size are scaled up by no more than 50% of the current target size at a time.
+- If any copy-row operation exceeds 250ms (5x the target), the target chunk size is immediately reduced to 10% of its current value.
+
+Enabling dynamic chunking can be more reliable than a static `--chunk-size=N`, because not all tables are created equal. For a table with a very high number of columns and several indexes, `1000` rows may be too large a chunk size. Similarly, for a table with very few columns and no indexes, the ideal batch size may be 20K+ rows (while still staying under the 50ms target).
+
+See also: [`chunk-size`](#chunk-size), [`dynamic-chunk-size-target-millis`](#dynamic-chunk-size-target-millis)
+
+### dynamic-chunk-size-target-millis
+
+The target execution time for each copy-row operation when [`--dynamic-chunking`](#dynamic-chunking) (default: `OFF`) is enabled.
+
+The default value of `50` is a good starting point for most workloads. If you find that read replicas are intermittently falling behind, you may want to decrease this value. Conversely, if you do not use read replicas, you may benefit from increasing this value slightly. The recommended range is `[10,10000]`; values larger than this have limited added benefit and are not recommended.
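+
+To illustrate how the target is recalculated (a simplified sketch of the rule, not an exact specification), suppose the current target chunk size is 100 rows and the `p90` of the last 10 copy-row operations is 40ms against a 50ms target:
+
+```
+new_target = current_target * (target_millis / p90_millis)
+           = 100 * (50 / 40)
+           = 125 rows
+```
+
+The result is then bounded by the `[20,50000]` range and the "no more than +50% per step" rule described above.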
+
 ### exact-rowcount
 
 A `gh-ost` execution need to copy whatever rows you have in your existing table onto the ghost table. This can and often will be, a large number. Exactly what that number is?
diff --git a/go/base/context.go b/go/base/context.go
index e3472f5bd..7a4311783 100644
--- a/go/base/context.go
+++ b/go/base/context.go
@@ -52,6 +52,22 @@ const (
 	HTTPStatusOK       = 200
 	MaxEventsBatchSize = 1000
 	ETAUnknown         = math.MinInt64
+
+	// MaxDynamicScaleFactor is the maximum factor by which dynamic scaling can change the chunkSize
+	// from the configured chunkSize. For example, with a factor of 50 and a chunkSize of 1000, the
+	// values will be in the range of 20 to 50000.
+	MaxDynamicScaleFactor = 50
+	// MaxDynamicStepFactor is the maximum amount each recalculation of the dynamic chunkSize can
+	// increase by. For example, if the newTarget is 5000 but the current target is 1000, the newTarget
+	// will be capped back down to 1500. Over time the number 5000 will be reached, but not straight away.
+	MaxDynamicStepFactor = 1.5
+	// MinDynamicRowSize is the minimum chunkSize that can be used when dynamic chunkSize is enabled.
+	// This helps prevent a scenario where the chunk size is too small (it can never be less than 1).
+	MinDynamicRowSize = 10
+	// DynamicPanicFactor is the factor by which the feedback process takes immediate action when
+	// the chunkSize appears to be too large. For example, if the PanicFactor is 5 and the target *time*
+	// is 50ms, an actual time of 250ms+ will cause the dynamic chunk size to be reduced immediately.
+	DynamicPanicFactor = 5
 )
 
 var (
@@ -118,7 +134,7 @@ type MigrationContext struct {
 	HeartbeatIntervalMilliseconds       int64
 	defaultNumRetries                   int64
-	ChunkSize                           int64
+	chunkSize                           int64
 	niceRatio                           float64
 	MaxLagMillisecondsThrottleThreshold int64
 	throttleControlReplicaKeys          *mysql.InstanceKeyMap
@@ -146,6 +162,12 @@ type MigrationContext struct {
 	HooksHintToken         string
 	HooksStatusIntervalSec int64
 
+	DynamicChunking              bool
+	DynamicChunkSizeTargetMillis int64
+	targetChunkSizeMutex         sync.Mutex
+	targetChunkFeedback          []time.Duration
+	targetChunkSize              int64
+
 	DropServeSocket bool
 	ServeSocketFile string
 	ServeTCPPort    int64
@@ -269,7 +291,7 @@ func NewMigrationContext() *MigrationContext {
 	return &MigrationContext{
 		Uuid:                                uuid.NewV4().String(),
 		defaultNumRetries:                   60,
-		ChunkSize:                           1000,
+		chunkSize:                           1000,
 		InspectorConnectionConfig:           mysql.NewConnectionConfig(),
 		ApplierConnectionConfig:             mysql.NewConnectionConfig(),
 		MaxLagMillisecondsThrottleThreshold: 1500,
@@ -287,6 +309,7 @@
 		ColumnRenameMap:                     make(map[string]string),
 		PanicAbort:                          make(chan error),
 		Log:                                 NewDefaultLogger(),
+		DynamicChunking:                     false,
 	}
 }
 
@@ -554,6 +577,94 @@ func (this *MigrationContext) GetTotalRowsCopied() int64 {
 	return atomic.LoadInt64(&this.TotalRowsCopied)
 }
 
+// ChunkDurationFeedback collects samples from copy-rows tasks and feeds them
+// back into a moving p90 that is used to return a more precise value
+// in GetChunkSize() calls. Usually we wait for multiple samples and then recalculate
+// in GetChunkSize(); however, if the input value far exceeds what was expected (>5x)
+// we synchronously reduce the chunk size. If it was a one-off, it's not an issue
+// because the next few samples will scale the value back up.
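+// For example, with the default 50ms target, a single 300ms sample exceeds the
+// 5x panic threshold (250ms) and immediately divides the target chunk size by 10.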
+func (this *MigrationContext) ChunkDurationFeedback(d time.Duration) (outOfRange bool) {
+	if !this.DynamicChunking {
+		return false
+	}
+	this.targetChunkSizeMutex.Lock()
+	defer this.targetChunkSizeMutex.Unlock()
+	if int64(d) > (this.DynamicChunkSizeTargetMillis * DynamicPanicFactor * int64(time.Millisecond)) {
+		this.targetChunkFeedback = []time.Duration{}
+		newTarget := float64(this.targetChunkSize) / float64(DynamicPanicFactor*2)
+		this.targetChunkSize = this.boundaryCheckTargetChunkSize(newTarget)
+		return true // don't include in feedback
+	}
+	this.targetChunkFeedback = append(this.targetChunkFeedback, d)
+	return false
+}
+
+// calculateNewTargetChunkSize is called by GetChunkSize()
+// under a mutex. It's safe to read this.targetChunkFeedback.
+func (this *MigrationContext) calculateNewTargetChunkSize() int64 {
+	// We do all our math as float64 of time in ns
+	p90 := float64(lazyFindP90(this.targetChunkFeedback))
+	targetTime := float64(this.DynamicChunkSizeTargetMillis * int64(time.Millisecond))
+	newTargetRows := float64(this.targetChunkSize) * (targetTime / p90)
+	return this.boundaryCheckTargetChunkSize(newTargetRows)
+}
+
+// boundaryCheckTargetChunkSize makes sure the new target is not
+// too large/small, since we are only allowed to scale up/down 50x from
+// the original ("reference") chunk size, and only permitted to increase
+// by 50% at a time. This is called under a mutex.
+func (this *MigrationContext) boundaryCheckTargetChunkSize(newTargetRows float64) int64 {
+	referenceSize := float64(atomic.LoadInt64(&this.chunkSize))
+	if newTargetRows < (referenceSize / MaxDynamicScaleFactor) {
+		newTargetRows = referenceSize / MaxDynamicScaleFactor
+	}
+	if newTargetRows > (referenceSize * MaxDynamicScaleFactor) {
+		newTargetRows = referenceSize * MaxDynamicScaleFactor
+	}
+	if newTargetRows > float64(this.targetChunkSize)*MaxDynamicStepFactor {
+		newTargetRows = float64(this.targetChunkSize) * MaxDynamicStepFactor
+	}
+	if newTargetRows < MinDynamicRowSize {
+		newTargetRows = MinDynamicRowSize
+	}
+	return int64(newTargetRows)
+}
+
+// GetChunkSize returns the number of rows to copy in a single chunk:
+// - If Dynamic Chunking is disabled, it will return this.chunkSize.
+// - If Dynamic Chunking is enabled, it will return a value that
+//   automatically adjusts based on the duration of the last few
+//   copy-rows tasks.
+//
+// Historically gh-ost has used a static chunk size (i.e. 1000 rows)
+// which can be adjusted while gh-ost is running.
+// An ideal chunk size is large enough that it can batch operations,
+// but small enough that it doesn't cause spikes in replica lag.
+//
+// The problem with basing the configurable on a row count is twofold:
+// - For very narrow rows, 1000 might not be enough (leaving performance on the table).
+// - For very wide rows (or rows with many secondary indexes), 1000 might be too high!
+//
+// Dynamic chunking addresses this by using the row count as a starting point,
+// *but* the main configurable is based on time (in ms).
+func (this *MigrationContext) GetChunkSize() int64 {
+	if !this.DynamicChunking {
+		return atomic.LoadInt64(&this.chunkSize)
+	}
+	this.targetChunkSizeMutex.Lock()
+	defer this.targetChunkSizeMutex.Unlock()
+	if this.targetChunkSize == 0 {
+		this.targetChunkSize = atomic.LoadInt64(&this.chunkSize)
+	}
+	// We need 10 samples to make a decision because we
+	// calculate it from the p90 (i.e. the 2nd-highest value).
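+	// At the default 50ms target, ten samples accumulate roughly once per
+	// second of copy time, which is how often the target is re-assessed.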
+	if len(this.targetChunkFeedback) >= 10 {
+		this.targetChunkSize = this.calculateNewTargetChunkSize()
+		this.targetChunkFeedback = []time.Duration{} // reset
+	}
+	return this.targetChunkSize
+}
+
 func (this *MigrationContext) GetIteration() int64 {
 	return atomic.LoadInt64(&this.Iteration)
 }
@@ -611,7 +722,7 @@ func (this *MigrationContext) SetChunkSize(chunkSize int64) {
 	if chunkSize > 100000 {
 		chunkSize = 100000
 	}
-	atomic.StoreInt64(&this.ChunkSize, chunkSize)
+	atomic.StoreInt64(&this.chunkSize, chunkSize)
 }
 
 func (this *MigrationContext) SetDMLBatchSize(batchSize int64) {
diff --git a/go/base/context_test.go b/go/base/context_test.go
index de208bae4..397ef95dd 100644
--- a/go/base/context_test.go
+++ b/go/base/context_test.go
@@ -120,3 +120,78 @@ func TestReadConfigFile(t *testing.T) {
 		}
 	}
 }
+
+func TestDynamicChunker(t *testing.T) {
+	context := NewMigrationContext()
+	context.chunkSize = 1000
+	context.DynamicChunking = true
+	context.DynamicChunkSizeTargetMillis = 50
+
+	// Before feedback it should match the static chunk size
+	test.S(t).ExpectEquals(context.GetChunkSize(), int64(1000))
+
+	// 1s is >5x the target, so it should immediately /10 the target
+	context.ChunkDurationFeedback(1 * time.Second)
+	test.S(t).ExpectEquals(context.GetChunkSize(), int64(100))
+
+	// Let's provide 10 pieces of feedback and see the chunk size
+	// adjusted based on the p90 value.
+	context.ChunkDurationFeedback(33 * time.Millisecond) // 1st
+	context.ChunkDurationFeedback(33 * time.Millisecond) // 2nd
+	context.ChunkDurationFeedback(32 * time.Millisecond) // 3rd
+	context.ChunkDurationFeedback(40 * time.Millisecond)
+	context.ChunkDurationFeedback(61 * time.Millisecond)
+	context.ChunkDurationFeedback(37 * time.Millisecond)
+	context.ChunkDurationFeedback(38 * time.Millisecond)
+	context.ChunkDurationFeedback(35 * time.Millisecond)
+	context.ChunkDurationFeedback(29 * time.Millisecond)
+	test.S(t).ExpectEquals(context.GetChunkSize(), int64(100)) // 9th
+	context.ChunkDurationFeedback(38 * time.Millisecond) // 10th
+	// Because 10 items of feedback have been received,
+	// the chunk size is recalculated. The p90 is 40ms (below our target)
+	// so the adjusted chunk size increases 25% to 125
+	test.S(t).ExpectEquals(context.GetChunkSize(), int64(125))
+
+	// Collect some new feedback where the p90 is 500us (much lower than our target).
+	// We have boundary checking on the value which limits it to 50% greater
+	// than the previous chunk size.
+
+	context.ChunkDurationFeedback(400 * time.Microsecond)
+	context.ChunkDurationFeedback(450 * time.Microsecond)
+	context.ChunkDurationFeedback(470 * time.Microsecond)
+	context.ChunkDurationFeedback(520 * time.Microsecond)
+	context.ChunkDurationFeedback(500 * time.Microsecond)
+	context.ChunkDurationFeedback(490 * time.Microsecond)
+	context.ChunkDurationFeedback(300 * time.Microsecond)
+	context.ChunkDurationFeedback(450 * time.Microsecond)
+	context.ChunkDurationFeedback(460 * time.Microsecond)
+	context.ChunkDurationFeedback(480 * time.Microsecond)
+	test.S(t).ExpectEquals(context.GetChunkSize(), int64(187)) // very minor increase
+
+	// Test that the chunk size is not allowed to grow larger than 50x
+	// the original chunk size. Because of the gradual step up, we need to
+	// provide a lot of feedback first.
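+	// (The cap here is chunkSize 1000 * MaxDynamicScaleFactor 50 = 50000 rows.)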
+	for i := 0; i < 1000; i++ {
+		context.ChunkDurationFeedback(480 * time.Microsecond)
+		context.GetChunkSize()
+	}
+	test.S(t).ExpectEquals(context.GetChunkSize(), int64(50000))
+
+	// Similarly, the minimum chunk size is 1000/50=20 rows no matter what the feedback.
+	// The downscaling rule is /10 for values that immediately exceed 5x the target,
+	// so it usually scales down before the feedback re-evaluation kicks in.
+	for i := 0; i < 100; i++ {
+		context.ChunkDurationFeedback(10 * time.Second)
+		context.GetChunkSize()
+	}
+	test.S(t).ExpectEquals(context.GetChunkSize(), int64(20))
+
+	// If we set the chunkSize to 100, then 100/50=2 is the minimum.
+	// But there is a hard-coded minimum of 10 rows for safety.
+	context.chunkSize = 100
+	for i := 0; i < 100; i++ {
+		context.ChunkDurationFeedback(10 * time.Second)
+		context.GetChunkSize()
+	}
+	test.S(t).ExpectEquals(context.GetChunkSize(), int64(10))
+}
diff --git a/go/base/utils.go b/go/base/utils.go
index e3950f2bd..f51d0ceae 100644
--- a/go/base/utils.go
+++ b/go/base/utils.go
@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"os"
 	"regexp"
+	"sort"
 	"strings"
 	"time"
@@ -93,3 +94,13 @@
 	return "", fmt.Errorf("Unexpected database port reported: %+v / extra_port: %+v", port, extraPort)
 	}
 }
+
+// lazyFindP90 sorts the slice in descending order and returns the value at
+// index len/10. With 10 samples this is the second-highest value, which is
+// approximately the p90; with other sample counts it is only a rough p90.
+func lazyFindP90(a []time.Duration) time.Duration {
+	sort.Slice(a, func(i, j int) bool {
+		return a[i] > a[j]
+	})
+	return a[len(a)/10]
+}
diff --git a/go/base/utils_test.go b/go/base/utils_test.go
index da98aeced..a3cf12d59 100644
--- a/go/base/utils_test.go
+++ b/go/base/utils_test.go
@@ -7,6 +7,7 @@ package base
 
 import (
 	"testing"
+	"time"
 
 	"github.com/openark/golib/log"
 	test "github.com/openark/golib/tests"
@@ -27,3 +28,19 @@ func TestStringContainsAll(t *testing.T) {
 	test.S(t).ExpectTrue(StringContainsAll(s, "insert", ""))
 	test.S(t).ExpectTrue(StringContainsAll(s, "insert", "update", "delete"))
 }
+
+func TestFindP90(t *testing.T) {
+	times := []time.Duration{
+		1 * time.Second,
+		2 * time.Second,
+		1 * time.Second,
+		3 * time.Second,
+		10 * time.Second,
+		1 * time.Second,
+		1 * time.Second,
+		1 * time.Second,
+		1 * time.Second,
+		1 * time.Second,
+	}
+	test.S(t).ExpectEquals(lazyFindP90(times), 3*time.Second)
+}
diff --git a/go/cmd/gh-ost/main.go b/go/cmd/gh-ost/main.go
index 3daf24441..6d7667677 100644
--- a/go/cmd/gh-ost/main.go
+++ b/go/cmd/gh-ost/main.go
@@ -104,6 +104,8 @@ func main() {
 	flag.BoolVar(&migrationContext.CutOverExponentialBackoff, "cut-over-exponential-backoff", false, "Wait exponentially longer intervals between failed cut-over attempts. Wait intervals obey a maximum configurable with 'exponential-backoff-max-interval').")
 	exponentialBackoffMaxInterval := flag.Int64("exponential-backoff-max-interval", 64, "Maximum number of seconds to wait between attempts when performing various operations with exponential backoff.")
 	chunkSize := flag.Int64("chunk-size", 1000, "amount of rows to handle in each iteration (allowed range: 10-100,000)")
+	flag.BoolVar(&migrationContext.DynamicChunking, "dynamic-chunking", false, "automatically adjust the chunk size based on a time-target")
+	flag.Int64Var(&migrationContext.DynamicChunkSizeTargetMillis, "dynamic-chunk-size-target-millis", 50, "target duration of a chunk when dynamic-chunking is enabled")
 	dmlBatchSize := flag.Int64("dml-batch-size", 10, "batch size for DML events to apply in a single transaction (range 1-100)")
 	defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
 	cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
diff --git a/go/logic/applier.go b/go/logic/applier.go
index ad6368e61..abe814811 100644
--- a/go/logic/applier.go
+++ b/go/logic/applier.go
@@ -577,7 +577,7 @@ func (this *Applier) CalculateNextIterationRangeEndValues() (hasFurtherRange boo
 		&this.migrationContext.UniqueKey.Columns,
 		this.migrationContext.MigrationIterationRangeMinValues.AbstractValues(),
 		this.migrationContext.MigrationRangeMaxValues.AbstractValues(),
-		atomic.LoadInt64(&this.migrationContext.ChunkSize),
+		this.migrationContext.GetChunkSize(),
 		this.migrationContext.GetIteration() == 0,
 		fmt.Sprintf("iteration:%d", this.migrationContext.GetIteration()),
 	)
@@ -614,7 +614,7 @@
 // data actually gets copied from original table.
 func (this *Applier) ApplyIterationInsertQuery() (chunkSize int64, rowsAffected int64, duration time.Duration, err error) {
 	startTime := time.Now()
-	chunkSize = atomic.LoadInt64(&this.migrationContext.ChunkSize)
+	chunkSize = this.migrationContext.GetChunkSize()
 
 	query, explodedArgs, err := sql.BuildRangeInsertPreparedQuery(
 		this.migrationContext.DatabaseName,
diff --git a/go/logic/migrator.go b/go/logic/migrator.go
index a102188a8..d1e5f95f7 100644
--- a/go/logic/migrator.go
+++ b/go/logic/migrator.go
@@ -844,7 +844,7 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
 	maxLoad := this.migrationContext.GetMaxLoad()
 	criticalLoad := this.migrationContext.GetCriticalLoad()
 	fmt.Fprintf(w, "# chunk-size: %+v; max-lag-millis: %+vms; dml-batch-size: %+v; max-load: %s; critical-load: %s; nice-ratio: %f\n",
-		atomic.LoadInt64(&this.migrationContext.ChunkSize),
+		this.migrationContext.GetChunkSize(),
 		atomic.LoadInt64(&this.migrationContext.MaxLagMillisecondsThrottleThreshold),
 		atomic.LoadInt64(&this.migrationContext.DMLBatchSize),
 		maxLoad.String(),
@@ -1315,8 +1315,13 @@
 			if err := copyRowsFunc(); err != nil {
 				return this.migrationContext.Log.Errore(err)
 			}
+			// Send feedback to the chunker.
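+			// An outOfRange result means ChunkDurationFeedback has already reduced
+			// the target chunk size; here we only log the event.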
+			copyRowsDuration := time.Since(copyRowsStartTime)
+			outOfRange := this.migrationContext.ChunkDurationFeedback(copyRowsDuration)
+			if outOfRange {
+				this.migrationContext.Log.Warningf("Copy-rows chunk took %s (above dynamic-chunking panic threshold); reducing target chunk size", copyRowsDuration)
+			}
 			if niceRatio := this.migrationContext.GetNiceRatio(); niceRatio > 0 {
-				copyRowsDuration := time.Since(copyRowsStartTime)
 				sleepTimeNanosecondFloat64 := niceRatio * float64(copyRowsDuration.Nanoseconds())
 				sleepTime := time.Duration(int64(sleepTimeNanosecondFloat64)) * time.Nanosecond
 				time.Sleep(sleepTime)
diff --git a/go/logic/server.go b/go/logic/server.go
index 4b1b87023..5516d3ccc 100644
--- a/go/logic/server.go
+++ b/go/logic/server.go
@@ -196,7 +196,7 @@ help                                 # This message
 	case "chunk-size":
 		{
 			if argIsQuestion {
-				fmt.Fprintf(writer, "%+v\n", atomic.LoadInt64(&this.migrationContext.ChunkSize))
+				fmt.Fprintf(writer, "%+v\n", this.migrationContext.GetChunkSize())
 				return NoPrintStatusRule, nil
 			}
 			if chunkSize, err := strconv.Atoi(arg); err != nil {
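
As a rough sketch of how the new APIs introduced by this patch are meant to be driven (simplified from the migrator wiring above; the import path, loop shape, and timing values here are assumptions for illustration, not part of the patch):

package main

import (
	"time"

	"github.com/github/gh-ost/go/base"
)

func main() {
	ctx := base.NewMigrationContext()
	ctx.SetChunkSize(1000)
	ctx.DynamicChunking = true
	ctx.DynamicChunkSizeTargetMillis = 50

	for i := 0; i < 20; i++ {
		chunkSize := ctx.GetChunkSize() // rows to copy in this batch
		start := time.Now()
		copyRows(chunkSize) // stand-in for the applier's copy-rows query
		// Feed the observed duration back; a true result means the batch was far
		// over target and the target chunk size was cut immediately.
		_ = ctx.ChunkDurationFeedback(time.Since(start))
	}
}

// copyRows simulates a copy-rows batch whose cost grows with the row count.
func copyRows(rows int64) {
	time.Sleep(time.Duration(rows) * 40 * time.Microsecond)
}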