Skip to content

Commit

Permalink
Merge pull request #160 from github/fix-unsigned-columns
Browse files Browse the repository at this point in the history
Fix unsigned columns
  • Loading branch information
Shlomi Noach authored Aug 17, 2016
2 parents 01d48e6 + 3a0ee9b commit 825c64f
Show file tree
Hide file tree
Showing 7 changed files with 245 additions and 21 deletions.
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#
#

RELEASE_VERSION="1.0.9"
RELEASE_VERSION="1.0.10"

function build {
osname=$1
Expand Down
10 changes: 0 additions & 10 deletions go/binlog/gomysql_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,33 +128,23 @@ func (this *GoMySQLReader) StreamEvents(canStopStreaming func() bool, entriesCha
if err != nil {
return err
}
// if rand.Intn(1000) == 0 {
// this.binlogSyncer.Close()
// log.Debugf("current: %+v, hint: %+v", this.currentCoordinates, this.LastAppliedRowsEventHint)
// return log.Errorf(".............haha got random error")
// }
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
func() {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogPos = int64(ev.Header.LogPos)
}()
if rotateEvent, ok := ev.Event.(*replication.RotateEvent); ok {
// log.Debugf("0008 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
// ev.Dump(os.Stdout)
func() {
this.currentCoordinatesMutex.Lock()
defer this.currentCoordinatesMutex.Unlock()
this.currentCoordinates.LogFile = string(rotateEvent.NextLogName)
}()
// log.Debugf("0001 ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
log.Infof("rotate to next log name: %s", rotateEvent.NextLogName)
} else if rowsEvent, ok := ev.Event.(*replication.RowsEvent); ok {
if err := this.handleRowsEvent(ev, rowsEvent, entriesChannel); err != nil {
return err
}
}
// log.Debugf("TODO ........ currentCoordinates: %+v", this.currentCoordinates) //TODO
}
log.Debugf("done streaming events")

Expand Down
29 changes: 29 additions & 0 deletions go/logic/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,12 +853,41 @@ func (this *Applier) ApplyDMLEventQuery(dmlEvent *binlog.BinlogDMLEvent) error {
if err != nil {
return err
}

// TODO The below is commented, and is in preparation for transactional writes on the ghost tables.
// Such writes would be, for example:
// - prepended with sql_mode setup
// - prepended with SET SQL_LOG_BIN=0
// - prepended with SET FK_CHECKS=0
// etc.
//
// Current known problem: https://github.com/golang/go/issues/9373 -- bitint unsigned values, not supported in database/sql
//

// err = func() error {
// tx, err := this.db.Begin()
// if err != nil {
// return err
// }
// if _, err := tx.Exec(query, args...); err != nil {
// return err
// }
// if err := tx.Commit(); err != nil {
// return err
// }
// return nil
// }()

_, err = sqlutils.Exec(this.db, query, args...)
if err == nil {
atomic.AddInt64(&this.migrationContext.TotalDMLEventsApplied, 1)
}
if this.migrationContext.CountTableRows {
atomic.AddInt64(&this.migrationContext.RowsEstimate, rowDelta)
}
if err != nil {
err = fmt.Errorf("%s; query=%s; args=%+v", err.Error(), query, args)
log.Errore(err)
}
return err
}
27 changes: 27 additions & 0 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ func (this *Inspector) InspectOriginalAndGhostTables() (err error) {
this.migrationContext.SharedColumns, this.migrationContext.MappedSharedColumns = this.getSharedColumns(this.migrationContext.OriginalTableColumns, this.migrationContext.GhostTableColumns, this.migrationContext.ColumnRenameMap)
log.Infof("Shared columns are %s", this.migrationContext.SharedColumns)
// By fact that a non-empty unique key exists we also know the shared columns are non-empty

// This additional step looks at which columns are unsigned. We could have merged this within
// the `getTableColumns()` function, but it's a later patch and introduces some complexity; I feel
// comfortable in doing this as a separate step.
this.applyUnsignedColumns(this.migrationContext.DatabaseName, this.migrationContext.OriginalTableName, this.migrationContext.OriginalTableColumns, this.migrationContext.SharedColumns)
this.applyUnsignedColumns(this.migrationContext.DatabaseName, this.migrationContext.GetGhostTableName(), this.migrationContext.GhostTableColumns, this.migrationContext.MappedSharedColumns)

return nil
}

Expand Down Expand Up @@ -450,6 +457,26 @@ func (this *Inspector) getTableColumns(databaseName, tableName string) (*sql.Col
return sql.NewColumnList(columnNames), nil
}

// applyUnsignedColumns
func (this *Inspector) applyUnsignedColumns(databaseName, tableName string, columnsLists ...*sql.ColumnList) error {
query := fmt.Sprintf(`
show columns from %s.%s
`,
sql.EscapeName(databaseName),
sql.EscapeName(tableName),
)
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
columnName := rowMap.GetString("Field")
if strings.Contains(rowMap.GetString("Type"), "unsigned") {
for _, columnsList := range columnsLists {
columnsList.SetUnsigned(columnName)
}
}
return nil
})
return err
}

// getCandidateUniqueKeys investigates a table and returns the list of unique keys
// candidate for chunking
func (this *Inspector) getCandidateUniqueKeys(tableName string) (uniqueKeys [](*sql.UniqueKey), err error) {
Expand Down
35 changes: 31 additions & 4 deletions go/sql/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,29 @@ func EscapeName(name string) string {
return fmt.Sprintf("`%s`", name)
}

func fixArgType(arg interface{}, isUnsigned bool) interface{} {
if !isUnsigned {
return arg
}
// unsigned
if i, ok := arg.(int8); ok {
return uint8(i)
}
if i, ok := arg.(int16); ok {
return uint16(i)
}
if i, ok := arg.(int32); ok {
return uint32(i)
}
if i, ok := arg.(int64); ok {
return uint64(i)
}
if i, ok := arg.(int); ok {
return uint(i)
}
return arg
}

func buildPreparedValues(length int) []string {
values := make([]string, length, length)
for i := 0; i < length; i++ {
Expand Down Expand Up @@ -309,7 +332,8 @@ func BuildDMLDeleteQuery(databaseName, tableName string, tableColumns, uniqueKey
}
for _, column := range uniqueKeyColumns.Names {
tableOrdinal := tableColumns.Ordinals[column]
uniqueKeyArgs = append(uniqueKeyArgs, args[tableOrdinal])
arg := fixArgType(args[tableOrdinal], uniqueKeyColumns.IsUnsigned(column))
uniqueKeyArgs = append(uniqueKeyArgs, arg)
}
databaseName = EscapeName(databaseName)
tableName = EscapeName(tableName)
Expand Down Expand Up @@ -345,7 +369,8 @@ func BuildDMLInsertQuery(databaseName, tableName string, tableColumns, sharedCol

for _, column := range sharedColumns.Names {
tableOrdinal := tableColumns.Ordinals[column]
sharedArgs = append(sharedArgs, args[tableOrdinal])
arg := fixArgType(args[tableOrdinal], sharedColumns.IsUnsigned(column))
sharedArgs = append(sharedArgs, arg)
}

sharedColumnNames := duplicateNames(sharedColumns.Names)
Expand Down Expand Up @@ -392,12 +417,14 @@ func BuildDMLUpdateQuery(databaseName, tableName string, tableColumns, sharedCol

for _, column := range sharedColumns.Names {
tableOrdinal := tableColumns.Ordinals[column]
sharedArgs = append(sharedArgs, valueArgs[tableOrdinal])
arg := fixArgType(valueArgs[tableOrdinal], sharedColumns.IsUnsigned(column))
sharedArgs = append(sharedArgs, arg)
}

for _, column := range uniqueKeyColumns.Names {
tableOrdinal := tableColumns.Ordinals[column]
uniqueKeyArgs = append(uniqueKeyArgs, whereArgs[tableOrdinal])
arg := fixArgType(whereArgs[tableOrdinal], uniqueKeyColumns.IsUnsigned(column))
uniqueKeyArgs = append(uniqueKeyArgs, arg)
}

sharedColumnNames := duplicateNames(sharedColumns.Names)
Expand Down
135 changes: 135 additions & 0 deletions go/sql/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,6 +397,44 @@ func TestBuildDMLDeleteQuery(t *testing.T) {
}
}

func TestBuildDMLDeleteQuerySignedUnsigned(t *testing.T) {
databaseName := "mydb"
tableName := "tbl"
tableColumns := NewColumnList([]string{"id", "name", "rank", "position", "age"})
uniqueKeyColumns := NewColumnList([]string{"position"})
{
// test signed (expect no change)
args := []interface{}{3, "testname", "first", -1, 23}
query, uniqueKeyArgs, err := BuildDMLDeleteQuery(databaseName, tableName, tableColumns, uniqueKeyColumns, args)
test.S(t).ExpectNil(err)
expected := `
delete /* gh-ost mydb.tbl */
from
mydb.tbl
where
((position = ?))
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{-1}))
}
{
// test unsigned
args := []interface{}{3, "testname", "first", int8(-1), 23}
uniqueKeyColumns.SetUnsigned("position")
query, uniqueKeyArgs, err := BuildDMLDeleteQuery(databaseName, tableName, tableColumns, uniqueKeyColumns, args)
test.S(t).ExpectNil(err)
expected := `
delete /* gh-ost mydb.tbl */
from
mydb.tbl
where
((position = ?))
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{uint8(255)}))
}
}

func TestBuildDMLInsertQuery(t *testing.T) {
databaseName := "mydb"
tableName := "tbl"
Expand Down Expand Up @@ -442,6 +480,61 @@ func TestBuildDMLInsertQuery(t *testing.T) {
}
}

func TestBuildDMLInsertQuerySignedUnsigned(t *testing.T) {
databaseName := "mydb"
tableName := "tbl"
tableColumns := NewColumnList([]string{"id", "name", "rank", "position", "age"})
sharedColumns := NewColumnList([]string{"id", "name", "position", "age"})
{
// testing signed
args := []interface{}{3, "testname", "first", int8(-1), 23}
sharedColumns := NewColumnList([]string{"id", "name", "position", "age"})
query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, args)
test.S(t).ExpectNil(err)
expected := `
replace /* gh-ost mydb.tbl */
into mydb.tbl
(id, name, position, age)
values
(?, ?, ?, ?)
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", int8(-1), 23}))
}
{
// testing unsigned
args := []interface{}{3, "testname", "first", int8(-1), 23}
sharedColumns.SetUnsigned("position")
query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, args)
test.S(t).ExpectNil(err)
expected := `
replace /* gh-ost mydb.tbl */
into mydb.tbl
(id, name, position, age)
values
(?, ?, ?, ?)
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", uint8(255), 23}))
}
{
// testing unsigned
args := []interface{}{3, "testname", "first", int32(-1), 23}
sharedColumns.SetUnsigned("position")
query, sharedArgs, err := BuildDMLInsertQuery(databaseName, tableName, tableColumns, sharedColumns, args)
test.S(t).ExpectNil(err)
expected := `
replace /* gh-ost mydb.tbl */
into mydb.tbl
(id, name, position, age)
values
(?, ?, ?, ?)
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", uint32(4294967295), 23}))
}
}

func TestBuildDMLUpdateQuery(t *testing.T) {
databaseName := "mydb"
tableName := "tbl"
Expand Down Expand Up @@ -525,3 +618,45 @@ func TestBuildDMLUpdateQuery(t *testing.T) {
test.S(t).ExpectNotNil(err)
}
}

func TestBuildDMLUpdateQuerySignedUnsigned(t *testing.T) {
databaseName := "mydb"
tableName := "tbl"
tableColumns := NewColumnList([]string{"id", "name", "rank", "position", "age"})
valueArgs := []interface{}{3, "testname", "newval", int8(-17), int8(-2)}
whereArgs := []interface{}{3, "testname", "findme", int8(-3), 56}
sharedColumns := NewColumnList([]string{"id", "name", "position", "age"})
uniqueKeyColumns := NewColumnList([]string{"position"})
{
// test signed
query, sharedArgs, uniqueKeyArgs, err := BuildDMLUpdateQuery(databaseName, tableName, tableColumns, sharedColumns, uniqueKeyColumns, valueArgs, whereArgs)
test.S(t).ExpectNil(err)
expected := `
update /* gh-ost mydb.tbl */
mydb.tbl
set id=?, name=?, position=?, age=?
where
((position = ?))
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", int8(-17), int8(-2)}))
test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{int8(-3)}))
}
{
// test unsigned
sharedColumns.SetUnsigned("age")
uniqueKeyColumns.SetUnsigned("position")
query, sharedArgs, uniqueKeyArgs, err := BuildDMLUpdateQuery(databaseName, tableName, tableColumns, sharedColumns, uniqueKeyColumns, valueArgs, whereArgs)
test.S(t).ExpectNil(err)
expected := `
update /* gh-ost mydb.tbl */
mydb.tbl
set id=?, name=?, position=?, age=?
where
((position = ?))
`
test.S(t).ExpectEquals(normalizeQuery(query), normalizeQuery(expected))
test.S(t).ExpectTrue(reflect.DeepEqual(sharedArgs, []interface{}{3, "testname", int8(-17), uint8(254)}))
test.S(t).ExpectTrue(reflect.DeepEqual(uniqueKeyArgs, []interface{}{uint8(253)}))
}
}
Loading

0 comments on commit 825c64f

Please sign in to comment.