Skip to content

Commit

Permalink
Merge pull request #231 from github/named-cut-over
Browse files Browse the repository at this point in the history
Named cut over
  • Loading branch information
Shlomi Noach authored Sep 13, 2016
2 parents 5215dd5 + 7517d48 commit eac6a72
Show file tree
Hide file tree
Showing 6 changed files with 192 additions and 165 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
#

RELEASE_VERSION="1.0.18"
RELEASE_VERSION="1.0.20"

function build {
osname=$1
Expand Down
4 changes: 4 additions & 0 deletions go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ type MigrationContext struct {
criticalLoad LoadMap
PostponeCutOverFlagFile string
CutOverLockTimeoutSeconds int64
ForceNamedCutOverCommand bool
PanicFlagFile string
HooksPath string
HooksHintMessage string
Expand Down Expand Up @@ -140,6 +141,8 @@ type MigrationContext struct {
CountingRowsFlag int64
AllEventsUpToLockProcessedInjectedFlag int64
CleanupImminentFlag int64
UserCommandedUnpostponeFlag int64
PanicAbort chan error

OriginalTableColumns *sql.ColumnList
OriginalTableUniqueKeys [](*sql.UniqueKey)
Expand Down Expand Up @@ -192,6 +195,7 @@ func newMigrationContext() *MigrationContext {
configMutex: &sync.Mutex{},
pointOfInterestTimeMutex: &sync.Mutex{},
ColumnRenameMap: make(map[string]string),
PanicAbort: make(chan error),
}
}

Expand Down
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func main() {
flag.BoolVar(&migrationContext.InitiallyDropOldTable, "initially-drop-old-table", false, "Drop a possibly existing OLD table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
flag.BoolVar(&migrationContext.InitiallyDropGhostTable, "initially-drop-ghost-table", false, "Drop a possibly existing Ghost table (remains from a previous run?) before beginning operation. Default is to panic and abort if such table exists")
cutOver := flag.String("cut-over", "atomic", "choose cut-over type (default|atomic, two-step)")
flag.BoolVar(&migrationContext.ForceNamedCutOverCommand, "force-named-cut-over", false, "When true, the 'unpostpone|cut-over' interactive command must name the migrated table")

flag.BoolVar(&migrationContext.SwitchToRowBinlogFormat, "switch-to-rbr", false, "let this tool automatically switch binary log format to 'ROW' on the replica, if needed. The format will NOT be switched back. I'm too scared to do that, and wish to protect you if you happen to execute another migration while this one is running")
flag.BoolVar(&migrationContext.AssumeRBR, "assume-rbr", false, "set to 'true' when you know for certain your server uses 'ROW' binlog_format. gh-ost is unable to tell, event after reading binlog_format, whether the replication process does indeed use 'ROW', and restarts replication to be certain RBR setting is applied. Such operation requires SUPER privileges which you might not have. Setting this flag avoids restarting replication and you can proceed to use gh-ost without SUPER privileges")
Expand Down
168 changes: 14 additions & 154 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@
package logic

import (
"bufio"
"fmt"
"io"
"math"
"os"
"os/signal"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
Expand Down Expand Up @@ -42,7 +39,8 @@ const (
type PrintStatusRule int

const (
HeuristicPrintStatusRule PrintStatusRule = iota
NoPrintStatusRule PrintStatusRule = iota
HeuristicPrintStatusRule = iota
ForcePrintStatusRule = iota
ForcePrintStatusOnlyRule = iota
ForcePrintStatusAndHintRule = iota
Expand All @@ -63,11 +61,9 @@ type Migrator struct {
tablesInPlace chan bool
rowCopyComplete chan bool
allEventsUpToLockProcessed chan bool
panicAbort chan error

rowCopyCompleteFlag int64
inCutOverCriticalActionFlag int64
userCommandedUnpostponeFlag int64
// copyRowsQueue should not be buffered; if buffered some non-damaging but
// excessive work happens at the end of the iteration as new copy-jobs arrive before realizing the copy is complete
copyRowsQueue chan tableWriteFunc
Expand All @@ -84,7 +80,6 @@ func NewMigrator() *Migrator {
firstThrottlingCollected: make(chan bool, 1),
rowCopyComplete: make(chan bool),
allEventsUpToLockProcessed: make(chan bool),
panicAbort: make(chan error),

copyRowsQueue: make(chan tableWriteFunc),
applyEventsQueue: make(chan tableWriteFunc, applyEventsQueueBuffer),
Expand Down Expand Up @@ -148,7 +143,7 @@ func (this *Migrator) retryOperation(operation func() error, notFatalHint ...boo
// there's an error. Let's try again.
}
if len(notFatalHint) == 0 {
this.panicAbort <- err
this.migrationContext.PanicAbort <- err
}
return err
}
Expand Down Expand Up @@ -217,7 +212,7 @@ func (this *Migrator) onChangelogStateEvent(dmlEvent *binlog.BinlogDMLEvent) (er

// listenOnPanicAbort aborts on abort request
func (this *Migrator) listenOnPanicAbort() {
err := <-this.panicAbort
err := <-this.migrationContext.PanicAbort
log.Fatale(err)
}

Expand Down Expand Up @@ -385,7 +380,7 @@ func (this *Migrator) cutOver() (err error) {
if this.migrationContext.PostponeCutOverFlagFile == "" {
return false, nil
}
if atomic.LoadInt64(&this.userCommandedUnpostponeFlag) > 0 {
if atomic.LoadInt64(&this.migrationContext.UserCommandedUnpostponeFlag) > 0 {
return false, nil
}
if base.FileExists(this.migrationContext.PostponeCutOverFlagFile) {
Expand Down Expand Up @@ -584,150 +579,12 @@ func (this *Migrator) atomicCutOver() (err error) {
return nil
}

// onServerCommand responds to a user's interactive command.
//
// command is a single request of the form "name" or "name=argument"; both the
// name and the argument are whitespace-trimmed before use. Any response is
// written to writer, which is flushed when the function returns.
//
// The interactive-command hook is invoked with the command name before the
// command is dispatched; if the hook returns an error, that error is returned
// and the command is not executed.
//
// Returns nil on success, the hook's error, an argument parsing/validation
// error (also echoed to writer), or an "Unknown command" error.
func (this *Migrator) onServerCommand(command string, writer *bufio.Writer) (err error) {
	defer writer.Flush()

	// Split into "name" and optional "argument" on the first '=' only, so the
	// argument itself may contain '=' (e.g. a query).
	tokens := strings.SplitN(command, "=", 2)
	command = strings.TrimSpace(tokens[0])
	arg := ""
	if len(tokens) > 1 {
		arg = strings.TrimSpace(tokens[1])
	}

	throttleHint := "# Note: you may only throttle for as long as your binary logs are not purged\n"

	if err := this.hooksExecutor.onInteractiveCommand(command); err != nil {
		return err
	}

	switch command {
	case "help":
		fmt.Fprintln(writer, `available commands:
status # Print a detailed status message
sup # Print a short status message
chunk-size=<newsize> # Set a new chunk-size
nice-ratio=<ratio> # Set a new nice-ratio, immediate sleep after each row-copy operation, float (examples: 0 is agrressive, 0.7 adds 70% runtime, 1.0 doubles runtime, 2.0 triples runtime, ...)
critical-load=<load> # Set a new set of max-load thresholds
max-lag-millis=<max-lag> # Set a new replication lag threshold
replication-lag-query=<query> # Set a new query that determines replication lag (no quotes)
max-load=<load> # Set a new set of max-load thresholds
throttle-query=<query> # Set a new throttle-query (no quotes)
throttle-control-replicas=<replicas> # Set a new comma delimited list of throttle control replicas
throttle # Force throttling
no-throttle # End forced throttling (other throttling may still apply)
unpostpone # Bail out a cut-over postpone; proceed to cut-over
panic # panic and quit without cleanup
help # This message
`)
	case "sup":
		this.printStatus(ForcePrintStatusOnlyRule, writer)
	case "info", "status":
		this.printStatus(ForcePrintStatusAndHintRule, writer)
	case "chunk-size":
		chunkSize, err := strconv.Atoi(arg)
		if err != nil {
			fmt.Fprintf(writer, "%s\n", err.Error())
			return log.Errore(err)
		}
		this.migrationContext.SetChunkSize(int64(chunkSize))
		this.printStatus(ForcePrintStatusAndHintRule, writer)
	case "max-lag-millis":
		maxLagMillis, err := strconv.Atoi(arg)
		if err != nil {
			fmt.Fprintf(writer, "%s\n", err.Error())
			return log.Errore(err)
		}
		this.migrationContext.SetMaxLagMillisecondsThrottleThreshold(int64(maxLagMillis))
		this.printStatus(ForcePrintStatusAndHintRule, writer)
	case "replication-lag-query":
		this.migrationContext.SetReplicationLagQuery(arg)
		this.printStatus(ForcePrintStatusAndHintRule, writer)
	case "nice-ratio":
		niceRatio, err := strconv.ParseFloat(arg, 64)
		if err != nil {
			fmt.Fprintf(writer, "%s\n", err.Error())
			return log.Errore(err)
		}
		this.migrationContext.SetNiceRatio(niceRatio)
		this.printStatus(ForcePrintStatusAndHintRule, writer)
	case "max-load":
		if err := this.migrationContext.ReadMaxLoad(arg); err != nil {
			fmt.Fprintf(writer, "%s\n", err.Error())
			return log.Errore(err)
		}
		this.printStatus(ForcePrintStatusAndHintRule, writer)
	case "critical-load":
		if err := this.migrationContext.ReadCriticalLoad(arg); err != nil {
			fmt.Fprintf(writer, "%s\n", err.Error())
			return log.Errore(err)
		}
		this.printStatus(ForcePrintStatusAndHintRule, writer)
	case "throttle-query":
		this.migrationContext.SetThrottleQuery(arg)
		// Fprint, not Fprintf: throttleHint is a message, not a format string.
		fmt.Fprint(writer, throttleHint)
		this.printStatus(ForcePrintStatusAndHintRule, writer)
	case "throttle-control-replicas":
		if err := this.migrationContext.ReadThrottleControlReplicaKeys(arg); err != nil {
			fmt.Fprintf(writer, "%s\n", err.Error())
			return log.Errore(err)
		}
		fmt.Fprintf(writer, "%s\n", this.migrationContext.GetThrottleControlReplicaKeys().ToCommaDelimitedList())
		this.printStatus(ForcePrintStatusAndHintRule, writer)
	case "throttle", "pause", "suspend":
		atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 1)
		// Fprint, not Fprintf: throttleHint is a message, not a format string.
		fmt.Fprint(writer, throttleHint)
		this.printStatus(ForcePrintStatusAndHintRule, writer)
	case "no-throttle", "unthrottle", "resume", "continue":
		atomic.StoreInt64(&this.migrationContext.ThrottleCommandedByUser, 0)
	case "unpostpone", "no-postpone", "cut-over":
		// Only honor the request while a cut-over postponement is in effect.
		if atomic.LoadInt64(&this.migrationContext.IsPostponingCutOver) > 0 {
			atomic.StoreInt64(&this.userCommandedUnpostponeFlag, 1)
			fmt.Fprintf(writer, "Unpostponed\n")
		} else {
			fmt.Fprintf(writer, "You may only invoke this when gh-ost is actively postponing migration. At this time it is not.\n")
		}
	case "panic":
		// Echo the message to the requester, then hand the error to the
		// panicAbort channel.
		err := fmt.Errorf("User commanded 'panic'. I will now panic, without cleanup. PANIC!")
		fmt.Fprintf(writer, "%s\n", err.Error())
		this.panicAbort <- err
	default:
		err = fmt.Errorf("Unknown command: %s", command)
		fmt.Fprintf(writer, "%s\n", err.Error())
		return err
	}
	return nil
}

// initiateServer begins listening on unix socket/tcp for incoming interactive commands
func (this *Migrator) initiateServer() (err error) {
this.server = NewServer(this.onServerCommand)
var f printStatusFunc = func(rule PrintStatusRule, writer io.Writer) {
this.printStatus(rule, writer)
}
this.server = NewServer(this.hooksExecutor, f)
if err := this.server.BindSocketFile(); err != nil {
return err
}
Expand Down Expand Up @@ -887,6 +744,9 @@ func (this *Migrator) printMigrationStatusHint(writers ...io.Writer) {
// By default the status is written to standard output, but other writers can
// be used as well.
func (this *Migrator) printStatus(rule PrintStatusRule, writers ...io.Writer) {
if rule == NoPrintStatusRule {
return
}
writers = append(writers, os.Stdout)

elapsedTime := this.migrationContext.ElapsedTime()
Expand Down Expand Up @@ -1007,7 +867,7 @@ func (this *Migrator) initiateStreaming() error {
log.Debugf("Beginning streaming")
err := this.eventsStreamer.StreamEvents(this.canStopStreaming)
if err != nil {
this.panicAbort <- err
this.migrationContext.PanicAbort <- err
}
log.Debugf("Done streaming")
}()
Expand Down Expand Up @@ -1035,7 +895,7 @@ func (this *Migrator) addDMLEventsListener() error {

// initiateThrottler kicks in the throttling collection and the throttling checks.
func (this *Migrator) initiateThrottler() error {
this.throttler = NewThrottler(this.applier, this.inspector, this.panicAbort)
this.throttler = NewThrottler(this.applier, this.inspector)

go this.throttler.initiateThrottlerCollection(this.firstThrottlingCollected)
log.Infof("Waiting for first throttle metrics to be collected")
Expand Down
Loading

0 comments on commit eac6a72

Please sign in to comment.