Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use uint64 for binary log file position #17472

Merged
merged 15 commits into from
Jan 10, 2025
2 changes: 1 addition & 1 deletion go/mysql/binlog_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type BinlogEvent interface {
IsValid() bool

// General protocol events.
NextPosition() uint32
NextPosition() uint64

// IsFormatDescription returns true if this is a
// FORMAT_DESCRIPTION_EVENT. Do not call StripChecksum before
Expand Down
7 changes: 4 additions & 3 deletions go/mysql/binlog_event_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import (
// +----------------------------+
// | extra_headers 19 : x-19 |
// +============================+
// http://dev.mysql.com/doc/internals/en/event-header-fields.html
// https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_replication_binlog_event.html#sect_protocol_replication_binlog_event_header
type binlogEvent []byte

const (
Expand Down Expand Up @@ -119,8 +119,9 @@ func (ev binlogEvent) Length() uint32 {
}

// NextPosition returns the nextPosition field from the header
func (ev binlogEvent) NextPosition() uint32 {
return binary.LittleEndian.Uint32(ev.Bytes()[13 : 13+4])
func (ev binlogEvent) NextPosition() uint64 {
// Only 4 bytes are used for the next_position field in the header.
return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13 : 13+4]))
timvaillancourt marked this conversation as resolved.
Show resolved Hide resolved
}

// IsFormatDescription implements BinlogEvent.IsFormatDescription().
Expand Down
9 changes: 5 additions & 4 deletions go/mysql/binlog_event_filepos.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,13 @@ func (ev *filePosBinlogEvent) StripChecksum(f BinlogFormat) (BinlogEvent, []byte

// nextPosition returns the next file position of the binlog.
// If no information is available, it returns 0.
func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint32 {
func (ev *filePosBinlogEvent) nextPosition(f BinlogFormat) uint64 {
if f.HeaderLength <= 13 {
// Dead code. This is just a failsafe.
return 0
}
return binary.LittleEndian.Uint32(ev.Bytes()[13:17])
// The header only uses 4 bytes for the next_position.
return uint64(binary.LittleEndian.Uint32(ev.Bytes()[13:17]))
}

// rotate implements BinlogEvent.Rotate().
Expand Down Expand Up @@ -139,7 +140,7 @@ type filePosFakeEvent struct {
timestamp uint32
}

func (ev filePosFakeEvent) NextPosition() uint32 {
func (ev filePosFakeEvent) NextPosition() uint64 {
return 0
}

Expand Down Expand Up @@ -283,7 +284,7 @@ type filePosGTIDEvent struct {
gtid replication.FilePosGTID
}

func newFilePosGTIDEvent(file string, pos uint32, timestamp uint32) filePosGTIDEvent {
func newFilePosGTIDEvent(file string, pos uint64, timestamp uint32) filePosGTIDEvent {
return filePosGTIDEvent{
filePosFakeEvent: filePosFakeEvent{
timestamp: timestamp,
Expand Down
12 changes: 9 additions & 3 deletions go/mysql/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package mysql

import (
"fmt"
"math"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/vtrpc"
Expand All @@ -32,9 +33,14 @@ const (
// This file contains the methods related to replication.

// WriteComBinlogDump writes a ComBinlogDump command.
// See http://dev.mysql.com/doc/internals/en/com-binlog-dump.html for syntax.
// Returns a SQLError.
func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint32, flags uint16) error {
// See: https://dev.mysql.com/doc/dev/mysql-server/latest/page_protocol_com_binlog_dump.html
func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlogPos uint64, flags uint16) error {
// The binary log file position is a uint64, but the protocol command
// only uses 4 bytes for the file position.
if binlogPos > math.MaxUint32 {
return vterrors.Errorf(vtrpc.Code_INVALID_ARGUMENT, "binlog position %d is too large, it must fit into 32 bits", binlogPos)
}
c.sequence = 0
length := 1 + // ComBinlogDump
4 + // binlog-pos
Expand All @@ -43,7 +49,7 @@ func (c *Conn) WriteComBinlogDump(serverID uint32, binlogFilename string, binlog
len(binlogFilename) // binlog-filename
data, pos := c.startEphemeralPacketWithHeader(length)
pos = writeByte(data, pos, ComBinlogDump)
pos = writeUint32(data, pos, binlogPos)
pos = writeUint32(data, pos, uint32(binlogPos))
pos = writeUint16(data, pos, flags)
pos = writeUint32(data, pos, serverID)
_ = writeEOFString(data, pos, binlogFilename)
Expand Down
6 changes: 3 additions & 3 deletions go/mysql/replication/filepos_gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ func parseFilePosGTID(s string) (GTID, error) {
return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting file:pos", s)
}

pos, err := strconv.ParseUint(parts[1], 0, 32)
pos, err := strconv.ParseUint(parts[1], 0, 64)
if err != nil {
return nil, fmt.Errorf("invalid FilePos GTID (%v): expecting pos to be an integer", s)
}

return FilePosGTID{
File: parts[0],
Pos: uint32(pos),
Pos: pos,
}, nil
}

Expand All @@ -56,7 +56,7 @@ func ParseFilePosGTIDSet(s string) (GTIDSet, error) {
// FilePosGTID implements GTID.
type FilePosGTID struct {
File string
Pos uint32
Pos uint64
}

// String implements GTID.String().
Expand Down
9 changes: 7 additions & 2 deletions go/mysql/replication/filepos_gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
func Test_filePosGTID_String(t *testing.T) {
type fields struct {
file string
pos uint32
pos uint64
}
tests := []struct {
name string
Expand All @@ -35,6 +35,11 @@ func Test_filePosGTID_String(t *testing.T) {
fields{file: "mysql-bin.166031", pos: 192394},
"mysql-bin.166031:192394",
},
{
"handles large position correctly",
fields{file: "vt-1448040107-bin.003222", pos: 4663881395},
"vt-1448040107-bin.003222:4663881395",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand All @@ -52,7 +57,7 @@ func Test_filePosGTID_String(t *testing.T) {
func Test_filePosGTID_ContainsGTID(t *testing.T) {
type fields struct {
file string
pos uint32
pos uint64
}
type args struct {
other GTID
Expand Down
5 changes: 5 additions & 0 deletions go/mysql/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package mysql

import (
"math"
"reflect"
"testing"

Expand All @@ -37,6 +38,10 @@ func TestComBinlogDump(t *testing.T) {
cConn.Close()
}()

// Try to write a ComBinlogDump packet with a position greater than 4 bytes.
err := cConn.WriteComBinlogDump(1, "moofarm", math.MaxInt64, 0x0d0e)
require.Error(t, err)

// Write ComBinlogDump packet, read it, compare.
if err := cConn.WriteComBinlogDump(0x01020304, "moofarm", 0x05060708, 0x090a); err != nil {
t.Fatalf("WriteComBinlogDump failed: %v", err)
Expand Down
16 changes: 6 additions & 10 deletions go/test/endtoend/migration/migration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,10 +132,6 @@ three streams although only two are required. This is to show that there can exi
streams from the same source. The main difference between an external source vs a vitess
source is that the source proto contains an "external_mysql" field instead of keyspace and shard.
That field is the key into the externalConnections section of the input yaml.

VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('product', 'vt_commerce', 'filter:<rules:<match:\"product\" > > external_mysql:\"product\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('customer', 'vt_commerce', 'filter:<rules:<match:\"customer\" > > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
VReplicationExec: insert into _vt.vreplication (workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values('orders', 'vt_commerce', 'filter:<rules:<match:\"orders\" > > external_mysql:\"customer\" ', ”, 9999, 9999, 'primary', 0, 0, 'Running')
*/
func TestMigration(t *testing.T) {
yamlFile := startCluster(t)
Expand All @@ -155,7 +151,7 @@ func TestMigration(t *testing.T) {
migrate(t, "customer", "commerce", []string{"customer"})
migrate(t, "customer", "commerce", []string{"orders"})
vttablet := keyspaces["commerce"].Shards[0].Vttablets[0].VttabletProcess
waitForVReplicationToCatchup(t, vttablet, 1*time.Second)
waitForVReplicationToCatchup(t, vttablet, 30*time.Second)

testcases := []struct {
query string
Expand Down Expand Up @@ -217,11 +213,11 @@ func migrate(t *testing.T, fromdb, toks string, tables []string) {
var sqlEscaped bytes.Buffer
val.EncodeSQL(&sqlEscaped)
query := fmt.Sprintf("insert into _vt.vreplication "+
"(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state) values"+
"('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running')", tables[0], "vt_"+toks, sqlEscaped.String())
fmt.Printf("VReplicationExec: %s\n", query)
vttablet := keyspaces[toks].Shards[0].Vttablets[0].VttabletProcess
err := clusterInstance.VtctldClientProcess.ExecuteCommand("VReplicationExec", vttablet.TabletPath, query)
"(workflow, db_name, source, pos, max_tps, max_replication_lag, tablet_types, time_updated, transaction_timestamp, state, options) values"+
"('%s', '%s', %s, '', 9999, 9999, 'primary', 0, 0, 'Running', '{}')", tables[0], "vt_"+toks, sqlEscaped.String())
fmt.Printf("VReplication insert: %s\n", query)
vttablet := keyspaces[toks].Shards[0].Vttablets[0].Alias
err := clusterInstance.VtctldClientProcess.ExecuteCommand("ExecuteFetchAsDBA", vttablet, query)
require.NoError(t, err)
}

Expand Down
10 changes: 5 additions & 5 deletions go/test/endtoend/vtorc/readtopologyinstance/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.Equal(t, "ON", primaryInstance.GTIDMode)
assert.Equal(t, "FULL", primaryInstance.BinlogRowImage)
assert.Contains(t, primaryInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", primary.TabletUID))
assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint32(0))
assert.Greater(t, primaryInstance.SelfBinlogCoordinates.LogPos, uint64(0))
assert.True(t, primaryInstance.SemiSyncPrimaryEnabled)
assert.True(t, primaryInstance.SemiSyncReplicaEnabled)
assert.True(t, primaryInstance.SemiSyncPrimaryStatus)
Expand Down Expand Up @@ -138,7 +138,7 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.Equal(t, utils.Hostname, replicaInstance.SourceHost)
assert.Equal(t, primary.MySQLPort, replicaInstance.SourcePort)
assert.Contains(t, replicaInstance.SelfBinlogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-bin", replica.TabletUID))
assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint32(0))
assert.Greater(t, replicaInstance.SelfBinlogCoordinates.LogPos, uint64(0))
assert.False(t, replicaInstance.SemiSyncPrimaryEnabled)
assert.True(t, replicaInstance.SemiSyncReplicaEnabled)
assert.False(t, replicaInstance.SemiSyncPrimaryStatus)
Expand All @@ -156,11 +156,11 @@ func TestReadTopologyInstanceBufferable(t *testing.T) {
assert.True(t, replicaInstance.ReplicationIOThreadRuning)
assert.True(t, replicaInstance.ReplicationSQLThreadRuning)
assert.Equal(t, replicaInstance.ReadBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile)
assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint32(0))
assert.Greater(t, replicaInstance.ReadBinlogCoordinates.LogPos, uint64(0))
assert.Equal(t, replicaInstance.ExecBinlogCoordinates.LogFile, primaryInstance.SelfBinlogCoordinates.LogFile)
assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint32(0))
assert.Greater(t, replicaInstance.ExecBinlogCoordinates.LogPos, uint64(0))
assert.Contains(t, replicaInstance.RelaylogCoordinates.LogFile, fmt.Sprintf("vt-0000000%d-relay", replica.TabletUID))
assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint32(0))
assert.Greater(t, replicaInstance.RelaylogCoordinates.LogPos, uint64(0))
assert.Empty(t, replicaInstance.LastIOError)
assert.Empty(t, replicaInstance.LastSQLError)
assert.EqualValues(t, 0, replicaInstance.SQLDelay)
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtorc/inst/analysis_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func GetReplicationAnalysis(keyspace string, shard string, hints *ReplicationAna
a.AnalyzedInstancePrimaryAlias = topoproto.TabletAliasString(primaryTablet.Alias)
a.AnalyzedInstanceBinlogCoordinates = BinlogCoordinates{
LogFile: m.GetString("binary_log_file"),
LogPos: m.GetUint32("binary_log_pos"),
LogPos: m.GetUint64("binary_log_pos"),
Type: BinaryLog,
}
isStaleBinlogCoordinates := m.GetBool("is_stale_binlog_coordinates")
Expand Down
11 changes: 6 additions & 5 deletions go/vt/vtorc/inst/binlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,22 +40,23 @@ const (
// BinlogCoordinates described binary log coordinates in the form of log file & log position.
type BinlogCoordinates struct {
LogFile string
LogPos uint32
LogPos uint64
Type BinlogType
}

// ParseInstanceKey will parse an InstanceKey from a string representation such as 127.0.0.1:3306
// ParseBinlogCoordinates will parse a string representation such as "mysql-bin.000001:12345"
// into a BinlogCoordinates struct.
func ParseBinlogCoordinates(logFileLogPos string) (*BinlogCoordinates, error) {
tokens := strings.SplitN(logFileLogPos, ":", 2)
if len(tokens) != 2 {
return nil, fmt.Errorf("ParseBinlogCoordinates: Cannot parse BinlogCoordinates from %s. Expected format is file:pos", logFileLogPos)
}

logPos, err := strconv.ParseUint(tokens[1], 10, 32)
logPos, err := strconv.ParseUint(tokens[1], 10, 64)
if err != nil {
return nil, fmt.Errorf("ParseBinlogCoordinates: invalid pos: %s", tokens[1])
}
return &BinlogCoordinates{LogFile: tokens[0], LogPos: uint32(logPos)}, nil
return &BinlogCoordinates{LogFile: tokens[0], LogPos: logPos}, nil
}

// DisplayString returns a user-friendly string representation of these coordinates
Expand Down Expand Up @@ -177,6 +178,6 @@ func (binlogCoordinates *BinlogCoordinates) ExtractDetachedCoordinates() (isDeta
}
detachedCoordinates.LogFile = detachedCoordinatesSubmatch[1]
logPos, _ := strconv.ParseUint(detachedCoordinatesSubmatch[2], 10, 32)
detachedCoordinates.LogPos = uint32(logPos)
detachedCoordinates.LogPos = logPos
return true, detachedCoordinates
}
4 changes: 2 additions & 2 deletions go/vt/vtorc/inst/binlog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,15 @@ func TestPreviousFileCoordinates(t *testing.T) {

require.NoError(t, err)
require.Equal(t, previous.LogFile, "mysql-bin.000009")
require.Equal(t, previous.LogPos, uint32(0))
require.Equal(t, previous.LogPos, uint64(0))
}

func TestNextFileCoordinates(t *testing.T) {
next, err := testCoordinates.NextFileCoordinates()

require.NoError(t, err)
require.Equal(t, next.LogFile, "mysql-bin.000011")
require.Equal(t, next.LogPos, uint32(0))
require.Equal(t, next.LogPos, uint64(0))
}

func TestBinlogCoordinates(t *testing.T) {
Expand Down
8 changes: 4 additions & 4 deletions go/vt/vtorc/inst/instance_dao.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,14 +548,14 @@ func readInstanceRow(m sqlutils.RowMap) *Instance {
instance.GtidPurged = m.GetString("gtid_purged")
instance.GtidErrant = m.GetString("gtid_errant")
instance.SelfBinlogCoordinates.LogFile = m.GetString("binary_log_file")
instance.SelfBinlogCoordinates.LogPos = m.GetUint32("binary_log_pos")
instance.SelfBinlogCoordinates.LogPos = m.GetUint64("binary_log_pos")
instance.ReadBinlogCoordinates.LogFile = m.GetString("source_log_file")
instance.ReadBinlogCoordinates.LogPos = m.GetUint32("read_source_log_pos")
instance.ReadBinlogCoordinates.LogPos = m.GetUint64("read_source_log_pos")
instance.ExecBinlogCoordinates.LogFile = m.GetString("relay_source_log_file")
instance.ExecBinlogCoordinates.LogPos = m.GetUint32("exec_source_log_pos")
instance.ExecBinlogCoordinates.LogPos = m.GetUint64("exec_source_log_pos")
instance.IsDetached, _ = instance.ExecBinlogCoordinates.ExtractDetachedCoordinates()
instance.RelaylogCoordinates.LogFile = m.GetString("relay_log_file")
instance.RelaylogCoordinates.LogPos = m.GetUint32("relay_log_pos")
instance.RelaylogCoordinates.LogPos = m.GetUint64("relay_log_pos")
instance.RelaylogCoordinates.Type = RelayLog
instance.LastSQLError = m.GetString("last_sql_error")
instance.LastIOError = m.GetString("last_io_error")
Expand Down
5 changes: 3 additions & 2 deletions go/vt/wrangler/testlib/emergency_reparent_shard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ func TestEmergencyReparentShard(t *testing.T) {
},
},
})
goodReplica1RelayLogPos, _ := replication.ParseFilePosGTIDSet("relay-bin.000004:455")
goodReplica1RelayLogPos, err := replication.ParseFilePosGTIDSet("relay-bin.003222:18321744073709551612") // Requires all 64 bits or uint64
require.NoError(t, err)
goodReplica1.FakeMysqlDaemon.CurrentSourceFilePosition = replication.Position{
GTIDSet: goodReplica1RelayLogPos,
}
Expand Down Expand Up @@ -182,7 +183,7 @@ func TestEmergencyReparentShard(t *testing.T) {

// run EmergencyReparentShard
waitReplicaTimeout := time.Second * 2
err := vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard,
err = vp.Run([]string{"EmergencyReparentShard", "--wait_replicas_timeout", waitReplicaTimeout.String(), newPrimary.Tablet.Keyspace + "/" + newPrimary.Tablet.Shard,
topoproto.TabletAliasString(newPrimary.Tablet.Alias)})
require.NoError(t, err)
// check what was run
Expand Down
Loading