Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix partial sstable writes #31

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion recordio/buffered_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func (d BufferedIOFactory) CreateNewReader(filePath string, bufSize int) (*os.Fi
return readFile, NewCountingByteReader(NewReaderBuf(readFile, block)), nil
}

func (d BufferedIOFactory) CreateNewWriter(filePath string, bufSize int) (*os.File, WriteCloserFlusher, error) {
func (d BufferedIOFactory) CreateNewWriter(filePath string, bufSize int) (*os.File, WriteSeekerCloserFlusher, error) {
writeFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return nil, nil, err
Expand Down
26 changes: 21 additions & 5 deletions recordio/bufio_vendor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,14 @@ import (
"io"
)

type WriteCloserFlusher interface {
io.WriteCloser
// WriteSeekerCloser bundles io.Writer, io.Seeker and io.Closer for
// destinations that can be written to, repositioned, and closed.
type WriteSeekerCloser interface {
	io.Writer
	io.Seeker
	io.Closer
}

// WriteSeekerCloserFlusher extends WriteSeekerCloser with explicit buffer
// control: Flush writes any buffered bytes to the underlying writer and
// Size reports the capacity of the internal buffer in bytes.
type WriteSeekerCloserFlusher interface {
	WriteSeekerCloser
	Flush() error
	Size() int
}
Expand All @@ -19,15 +25,25 @@ type WriteCloserFlusher interface {
// the underlying io.Writer.
// This is the same writer as bufio.Writer, but it allows us to supply the buffer from the outside.
// Namely, it has a new constructor in NewWriterBuf & NewAlignedWriterBuf, implements Close() and supports block aligned flushes.
// Later the interface of the writer added io.Seeker, which the writer now fully implements as well.
// Additionally, several methods that were not needed are removed to reduce the test surface of the original.
type Writer struct {
err error
buf []byte
n int
wr io.WriteCloser
wr WriteSeekerCloser
alignFlush bool
}

// Seek flushes any buffered bytes to the underlying writer and then
// repositions it according to offset and whence. The flush must happen
// first so that pending data lands at the pre-seek position instead of
// being written at the new one.
func (b *Writer) Seek(offset int64, whence int) (int64, error) {
	if err := b.Flush(); err != nil {
		return 0, err
	}
	return b.wr.Seek(offset, whence)
}

func (b *Writer) Close() error {
err := b.Flush()
if err != nil {
Expand All @@ -36,14 +52,14 @@ func (b *Writer) Close() error {
return b.wr.Close()
}

func NewWriterBuf(w io.WriteCloser, buf []byte) WriteCloserFlusher {
func NewWriterBuf(w WriteSeekerCloser, buf []byte) WriteSeekerCloserFlusher {
return &Writer{
buf: buf,
wr: w,
}
}

func NewAlignedWriterBuf(w io.WriteCloser, buf []byte) WriteCloserFlusher {
func NewAlignedWriterBuf(w WriteSeekerCloser, buf []byte) WriteSeekerCloserFlusher {
return &Writer{
buf: buf,
wr: w,
Expand Down
23 changes: 23 additions & 0 deletions recordio/bufio_vendor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package recordio
import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"io"
"testing"
)

Expand All @@ -15,6 +16,10 @@ func (w *closingWriter) Write(p []byte) (n int, err error) {
return len(p), nil
}

// Seek on the test sink is a stub: it reports the requested offset as the
// new position without repositioning anything.
func (w *closingWriter) Seek(offset int64, whence int) (int64, error) {
	return offset, nil
}

// Close is a no-op so the buffered writer under test can close cleanly.
func (*closingWriter) Close() error {
	return nil
}
Expand Down Expand Up @@ -45,6 +50,24 @@ func TestCreateNewBufferCloseFlushes(t *testing.T) {
assert.Equal(t, []byte{13, 6, 91, 22, 0, 0}, sink.buf)
}

// TestSeekFlushes verifies that Seek on the buffered writer flushes any
// pending bytes to the sink before delegating the seek.
func TestSeekFlushes(t *testing.T) {
	sink := &closingWriter{make([]byte, 6)}
	wBuf := NewWriterBuf(sink, make([]byte, 4))
	assert.Equal(t, 4, wBuf.Size())

	_, err := wBuf.Write([]byte{13, 6, 91, 22})
	require.NoError(t, err)
	// the buffer should not have been flushed so far
	assert.Equal(t, []byte{0, 0, 0, 0, 0, 0}, sink.buf)

	// seeking must flush the four buffered bytes into the sink
	_, err = wBuf.Seek(0, io.SeekStart)
	require.NoError(t, err)
	assert.Equal(t, []byte{13, 6, 91, 22, 0, 0}, sink.buf)

	// closing after the seek-flush must not write anything further
	require.NoError(t, wBuf.Close())
	assert.Equal(t, []byte{13, 6, 91, 22, 0, 0}, sink.buf)
}

func TestCreateNewBufferWithAlignedSlice(t *testing.T) {
sink := &closingWriter{make([]byte, 8)}
wBuf := NewAlignedWriterBuf(sink, make([]byte, 4))
Expand Down
2 changes: 1 addition & 1 deletion recordio/direct_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func (d DirectIOFactory) CreateNewReader(filePath string, bufSize int) (*os.File
return readFile, NewCountingByteReader(NewReaderBuf(readFile, block)), nil
}

func (d DirectIOFactory) CreateNewWriter(filePath string, bufSize int) (*os.File, WriteCloserFlusher, error) {
func (d DirectIOFactory) CreateNewWriter(filePath string, bufSize int) (*os.File, WriteSeekerCloserFlusher, error) {
writeFile, err := directio.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return nil, nil, err
Expand Down
30 changes: 25 additions & 5 deletions recordio/file_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/binary"
"errors"
"fmt"
"io"
"os"

pool "github.com/libp2p/go-buffer-pool"
Expand All @@ -22,9 +23,11 @@ type FileWriter struct {
open bool
closed bool

file *os.File
bufWriter WriteCloserFlusher
currentOffset uint64
file *os.File
bufWriter WriteSeekerCloserFlusher
currentOffset uint64
headerOffset uint64

compressionType int
compressor compressor.CompressionI
recordHeaderCache []byte
Expand Down Expand Up @@ -54,6 +57,7 @@ func (w *FileWriter) Open() error {
}

w.currentOffset = uint64(offset)
w.headerOffset = w.currentOffset
w.open = true
w.recordHeaderCache = make([]byte, RecordHeaderV2MaxSizeBytes)
w.bufferPool = new(pool.BufferPool)
Expand Down Expand Up @@ -88,7 +92,7 @@ func fileHeaderAsByteSlice(compressionType uint32) []byte {
}

// for legacy reference still around, main paths unused - mostly for tests writing old versions
//noinspection GoUnusedFunction
// noinspection GoUnusedFunction
func writeRecordHeaderV1(writer *FileWriter, payloadSizeUncompressed uint64, payloadSizeCompressed uint64) (int, error) {
// 4 byte magic number, 8 byte uncompressed size, 8 bytes for compressed size = 20 bytes
bytes := make([]byte, RecordHeaderSizeBytes)
Expand Down Expand Up @@ -204,6 +208,22 @@ func (w *FileWriter) Size() uint64 {
return w.currentOffset
}

// Seek repositions the writer to the given offset, measured from the
// start of the file (origin zero). The underlying buffered writer
// flushes pending data before the seek happens. Seeking into the file
// header range or past the current size of the file returns an error.
func (w *FileWriter) Seek(offset uint64) error {
	if offset < w.headerOffset {
		return fmt.Errorf("can't seek into the header range, supplied: %d header: %d", offset, w.headerOffset)
	}
	if offset > w.Size() {
		// note: label the second value correctly as the size, not the header
		return fmt.Errorf("can't seek past file size, supplied: %d size: %d", offset, w.Size())
	}

	newOffset, err := w.bufWriter.Seek(int64(offset), io.SeekStart)
	if err != nil {
		return err
	}
	w.currentOffset = uint64(newOffset)
	return nil
}

// options

type FileWriterOptions struct {
Expand Down Expand Up @@ -301,7 +321,7 @@ func NewFileWriter(writerOptions ...FileWriterOption) (WriterI, error) {
}

// creates a new writer with the given os.File, with the desired compression
func newCompressedFileWriterWithFile(file *os.File, bufWriter WriteCloserFlusher, compType int, alignedBlockWrites bool) (WriterI, error) {
func newCompressedFileWriterWithFile(file *os.File, bufWriter WriteSeekerCloserFlusher, compType int, alignedBlockWrites bool) (WriterI, error) {
return &FileWriter{
file: file,
bufWriter: bufWriter,
Expand Down
75 changes: 75 additions & 0 deletions recordio/file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,75 @@ func TestWriterNotAllowsSyncsWithDirectIO(t *testing.T) {
require.ErrorIs(t, err, DirectIOSyncWriteErr)
}

// TestWriterSeekHappyPath writes one record, seeks back to its start and
// overwrites it with a same-sized record; the reader must then see only
// the replacement record.
func TestWriterSeekHappyPath(t *testing.T) {
	writer := newOpenedWriter(t)
	defer removeFileWriterFile(t, writer)

	previousOffset := writer.Size()
	offset, err := writer.Write([]byte{12, 13, 14, 15, 16})
	assert.Equal(t, uint64(FileHeaderSizeBytes), offset)
	assert.Equal(t, uint64(0x12), writer.Size())
	require.Nil(t, err)

	// rewind to where the first record began
	require.NoError(t, writer.Seek(previousOffset))

	// the replacement write must land at the same offset and keep the size
	offset, err = writer.Write([]byte{1, 2, 3, 4, 5})
	assert.Equal(t, uint64(FileHeaderSizeBytes), offset)
	assert.Equal(t, uint64(0x12), writer.Size())
	require.Nil(t, err)

	require.NoError(t, writer.Close())
	reader := newReaderOnTopOfWriter(t, writer)
	defer closeFileReader(t, reader)

	// this should only be exactly one record
	next, err := reader.ReadNext()
	require.NoError(t, err)
	require.Equal(t, []byte{1, 2, 3, 4, 5}, next)

	readNextExpectEOF(t, reader)
}

// TestWriterSeekOutOfBounds checks the boundary validation of Seek:
// offsets inside the header range or past the current file size are
// rejected, while the header boundary and the current size are allowed.
func TestWriterSeekOutOfBounds(t *testing.T) {
	writer := newOpenedWriter(t)
	defer removeFileWriterFile(t, writer)

	_, err := writer.Write(ascendingBytes(5))
	require.NoError(t, err)

	// below the header boundary must fail, the boundary itself is fine
	require.Error(t, writer.Seek(0))
	require.Error(t, writer.Seek(writer.headerOffset-1))
	require.NoError(t, writer.Seek(writer.headerOffset))
	// past the current size must fail, the size itself is fine
	require.Error(t, writer.Seek(writer.Size()+1))
	require.NoError(t, writer.Seek(writer.Size()))
	require.NoError(t, writer.Close())
}

// TestWriterSeekShorterReplacementWrite overwrites a record with a
// shorter one, leaving stale trailing bytes of the old record in the
// file; the reader reads the replacement and then fails on the remnant.
func TestWriterSeekShorterReplacementWrite(t *testing.T) {
	writer := newOpenedWriter(t)
	defer removeFileWriterFile(t, writer)

	o, err := writer.Write(ascendingBytes(5))
	require.NoError(t, err)
	require.NoError(t, writer.Seek(o))

	// this should create a two byte suffix to the file that should not be readable as a record
	_, err = writer.Write(ascendingBytes(3))
	require.NoError(t, err)
	require.NoError(t, writer.Close())

	reader := newReaderOnTopOfWriter(t, writer)
	defer func() {
		require.NoError(t, reader.Close())
	}()
	readNextExpectAscendingBytesOfLen(t, reader, 3)

	// TODO(thomas): can we wipe the remainder of the file?
	// there's a more general concern about having short replacement writes within a record though
	// maybe we need to leave a marker to skip until the next record / EOF
	readNextExpectMagicNumberMismatch(t, reader)
}

func newUncompressedTestWriter() (*FileWriter, error) {
tmpFile, err := os.CreateTemp("", "recordio_UncompressedWriter")
if err != nil {
Expand Down Expand Up @@ -315,6 +384,12 @@ func readNextExpectEOF(t *testing.T, reader *FileReader) {
assert.Equal(t, io.EOF, errors.Unwrap(err))
}

// readNextExpectMagicNumberMismatch asserts that the next read yields no
// record and fails with MagicNumberMismatchErr (i.e. the reader hit
// bytes that do not start a valid record).
func readNextExpectMagicNumberMismatch(t *testing.T, reader *FileReader) {
	buf, err := reader.ReadNext()
	require.Nil(t, buf)
	require.ErrorIs(t, err, MagicNumberMismatchErr)
}

func newReaderOnTopOfWriter(t *testing.T, writer *FileWriter) *FileReader {
reader, err := NewFileReaderWithPath(writer.file.Name())
require.Nil(t, err)
Expand Down
6 changes: 5 additions & 1 deletion recordio/recordio.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ type WriterI interface {
Write(record []byte) (uint64, error)
// WriteSync appends a record of bytes and forces a disk sync, returns the current offset this item was written to
WriteSync(record []byte) (uint64, error)
// Seek will reset the current offset to the given offset. The offset is always
// denoted as a value from the start (origin) of the file at offset zero.
// An error will be returned when trying to seek into the file header or beyond the current size of the file.
Seek(offset uint64) error
}

type ReaderI interface {
Expand All @@ -85,7 +89,7 @@ type ReadAtI interface {

type ReaderWriterCloserFactory interface {
CreateNewReader(filePath string, bufSize int) (*os.File, ByteReaderResetCount, error)
CreateNewWriter(filePath string, bufSize int) (*os.File, WriteCloserFlusher, error)
CreateNewWriter(filePath string, bufSize int) (*os.File, WriteSeekerCloserFlusher, error)
}

// NewCompressorForType returns an instance of the desired compressor defined by its identifier.
Expand Down
5 changes: 4 additions & 1 deletion sstables/sstable_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,17 @@ func (writer *SSTableStreamWriter) WriteNext(key []byte, value []byte) error {
return fmt.Errorf("error while writing crc64 hash in '%s': %w", writer.opts.basePath, err)
}

preWriteOffset := writer.dataWriter.Size()
recordOffset, err := writer.dataWriter.Write(value)
if err != nil {
return fmt.Errorf("error writeNext data writer error in '%s': %w", writer.opts.basePath, err)
}

_, err = writer.indexWriter.Write(&sProto.IndexEntry{Key: key, ValueOffset: recordOffset, Checksum: crc.Sum64()})
if err != nil {
return fmt.Errorf("error writeNext index writer error in '%s': %w", writer.opts.basePath, err)
// in case of failures we need to try to rewind the data writer's offset to preWriteOffset
seekErr := writer.dataWriter.Seek(preWriteOffset)
return fmt.Errorf("error writeNext index writer/seeker error in '%s': %w", writer.opts.basePath, errors.Join(err, seekErr))
}

writer.metaData.NumRecords += 1
Expand Down
9 changes: 6 additions & 3 deletions sstables/sstable_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,10 @@ func TestFailedIndexAppend(t *testing.T) {
require.NoError(t, writer.WriteNext(intToByteSlice(44), intToByteSlice(45)))
require.NoError(t, writer.Close())

reader, _ := getFullScanIterator(t, writer.opts.basePath)
reader, it := getFullScanIterator(t, writer.opts.basePath)
defer closeReader(t, reader)

// TODO(thomas): this is failing, as the "optimized" iterator is not skipping partial records
// assertIteratorMatchesSlice(t, it, []int{42, 44})
assertIteratorMatchesSlice(t, it, []int{42, 44})
assertContentMatchesSlice(t, reader, []int{42, 44})
_, err = reader.Get(intToByteSlice(43))
require.Equal(t, NotFound, err)
Expand All @@ -179,6 +178,10 @@ func (f *failingRecordIoWriter) Size() uint64 {
return f.w.Size()
}

// Seek delegates to the wrapped writer; seeks themselves never fail in
// this test double, only the record writes do.
func (f *failingRecordIoWriter) Seek(o uint64) error {
	return f.w.Seek(o)
}

func (f *failingRecordIoWriter) WriteSync(record []byte) (uint64, error) {
if f.failNext {
return 0, errors.New("failing record")
Expand Down
Loading