Skip to content

Commit

Permalink
stop double-buffering (#1643)
Browse files Browse the repository at this point in the history
Since we dropped Go 1.20 support, we do not need double buffering.

This pull request stop double buffering and simplify buffer
implementation a lot.

Fix #1435
  • Loading branch information
methane authored Dec 1, 2024
1 parent 2df7a26 commit 575e1b2
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 84 deletions.
118 changes: 44 additions & 74 deletions buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,47 +22,30 @@ const maxCachedBufSize = 256 * 1024
// In other words, we can't write and read simultaneously on the same connection.
// The buffer is similar to bufio.Reader / Writer but zero-copy-ish
// Also highly optimized for this particular use case.
// This buffer is backed by two byte slices in a double-buffering scheme
type buffer struct {
buf []byte // buf is a byte buffer who's length and capacity are equal.
nc net.Conn
idx int
length int
timeout time.Duration
dbuf [2][]byte // dbuf is an array with the two byte slices that back this buffer
flipcnt uint // flipccnt is the current buffer counter for double-buffering
buf []byte // read buffer.
cachedBuf []byte // buffer that will be reused. len(cachedBuf) <= maxCachedBufSize.
nc net.Conn
timeout time.Duration
}

// newBuffer allocates and returns a new buffer.
func newBuffer(nc net.Conn) buffer {
fg := make([]byte, defaultBufSize)
return buffer{
buf: fg,
nc: nc,
dbuf: [2][]byte{fg, nil},
cachedBuf: make([]byte, defaultBufSize),
nc: nc,
}
}

// busy returns true if the buffer contains some read data.
// busy returns true if the read buffer is not empty.
func (b *buffer) busy() bool {
return b.length > 0
return len(b.buf) > 0
}

// flip replaces the active buffer with the background buffer
// this is a delayed flip that simply increases the buffer counter;
// the actual flip will be performed the next time we call `buffer.fill`
func (b *buffer) flip() {
b.flipcnt += 1
}

// fill reads into the buffer until at least _need_ bytes are in it
// fill reads into the read buffer until at least _need_ bytes are in it.
func (b *buffer) fill(need int) error {
n := b.length
// fill data into its double-buffering target: if we've called
// flip on this buffer, we'll be copying to the background buffer,
// and then filling it with network data; otherwise we'll just move
// the contents of the current buffer to the front before filling it
dest := b.dbuf[b.flipcnt&1]
// we'll move the contents of the current buffer to dest before filling it.
dest := b.cachedBuf

// grow buffer if necessary to fit the whole packet.
if need > len(dest) {
Expand All @@ -72,18 +55,13 @@ func (b *buffer) fill(need int) error {
// if the allocated buffer is not too large, move it to backing storage
// to prevent extra allocations on applications that perform large reads
if len(dest) <= maxCachedBufSize {
b.dbuf[b.flipcnt&1] = dest
b.cachedBuf = dest
}
}

// if we're filling the fg buffer, move the existing data to the start of it.
// if we're filling the bg buffer, copy over the data
if n > 0 {
copy(dest[:n], b.buf[b.idx:])
}

b.buf = dest
b.idx = 0
// move the existing data to the start of the buffer.
n := len(b.buf)
copy(dest[:n], b.buf)

for {
if b.timeout > 0 {
Expand All @@ -92,63 +70,58 @@ func (b *buffer) fill(need int) error {
}
}

nn, err := b.nc.Read(b.buf[n:])
nn, err := b.nc.Read(dest[n:])
n += nn

switch err {
case nil:
if n < need {
continue
}
b.length = n
return nil
if err == nil && n < need {
continue
}

case io.EOF:
if n >= need {
b.length = n
return nil
}
return io.ErrUnexpectedEOF
b.buf = dest[:n]

default:
return err
if err == io.EOF {
if n < need {
err = io.ErrUnexpectedEOF
} else {
err = nil
}
}
return err
}
}

// returns next N bytes from buffer.
// The returned slice is only guaranteed to be valid until the next read
func (b *buffer) readNext(need int) ([]byte, error) {
if b.length < need {
if len(b.buf) < need {
// refill
if err := b.fill(need); err != nil {
return nil, err
}
}

offset := b.idx
b.idx += need
b.length -= need
return b.buf[offset:b.idx], nil
data := b.buf[:need]
b.buf = b.buf[need:]
return data, nil
}

// takeBuffer returns a buffer with the requested size.
// If possible, a slice from the existing buffer is returned.
// Otherwise a bigger buffer is made.
// Only one buffer (total) can be used at a time.
func (b *buffer) takeBuffer(length int) ([]byte, error) {
if b.length > 0 {
if b.busy() {
return nil, ErrBusyBuffer
}

// test (cheap) general case first
if length <= cap(b.buf) {
return b.buf[:length], nil
if length <= len(b.cachedBuf) {
return b.cachedBuf[:length], nil
}

if length < maxPacketSize {
b.buf = make([]byte, length)
return b.buf, nil
if length < maxCachedBufSize {
b.cachedBuf = make([]byte, length)
return b.cachedBuf, nil
}

// buffer is larger than we want to store.
Expand All @@ -159,29 +132,26 @@ func (b *buffer) takeBuffer(length int) ([]byte, error) {
// known to be smaller than defaultBufSize.
// Only one buffer (total) can be used at a time.
func (b *buffer) takeSmallBuffer(length int) ([]byte, error) {
if b.length > 0 {
if b.busy() {
return nil, ErrBusyBuffer
}
return b.buf[:length], nil
return b.cachedBuf[:length], nil
}

// takeCompleteBuffer returns the complete existing buffer.
// This can be used if the necessary buffer size is unknown.
// cap and len of the returned buffer will be equal.
// Only one buffer (total) can be used at a time.
func (b *buffer) takeCompleteBuffer() ([]byte, error) {
if b.length > 0 {
if b.busy() {
return nil, ErrBusyBuffer
}
return b.buf, nil
return b.cachedBuf, nil
}

// store stores buf, an updated buffer, if its suitable to do so.
func (b *buffer) store(buf []byte) error {
if b.length > 0 {
return ErrBusyBuffer
} else if cap(buf) <= maxPacketSize && cap(buf) > cap(b.buf) {
b.buf = buf[:cap(buf)]
func (b *buffer) store(buf []byte) {
if cap(buf) <= maxCachedBufSize && cap(buf) > cap(b.cachedBuf) {
b.cachedBuf = buf[:cap(buf)]
}
return nil
}
4 changes: 1 addition & 3 deletions packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,9 +1191,7 @@ func (stmt *mysqlStmt) writeExecutePacket(args []driver.Value) error {
// In that case we must build the data packet with the new values buffer
if valuesCap != cap(paramValues) {
data = append(data[:pos], paramValues...)
if err = mc.buf.store(data); err != nil {
return err
}
mc.buf.store(data) // allow this buffer to be reused
}

pos += len(paramValues)
Expand Down
7 changes: 0 additions & 7 deletions rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,6 @@ func (rows *mysqlRows) Close() (err error) {
return err
}

// flip the buffer for this connection if we need to drain it.
// note that for a successful query (i.e. one where rows.next()
// has been called until it returns false), `rows.mc` will be nil
// by the time the user calls `(*Rows).Close`, so we won't reach this
// see: https://github.com/golang/go/commit/651ddbdb5056ded455f47f9c494c67b389622a47
mc.buf.flip()

// Remove unread packets from stream
if !rows.rs.done {
err = mc.readUntilEOF()
Expand Down

0 comments on commit 575e1b2

Please sign in to comment.