From 27d970e8cccfcfdb4666e7921fc1bb3d98738923 Mon Sep 17 00:00:00 2001 From: thomasjungblut Date: Wed, 31 Jul 2024 20:26:03 +0200 Subject: [PATCH 1/3] fix partial sstable writes In some failure scenarios we might have inconsistent data and index files. This adds seeking support to recordio writers, which can be used to rewind partially written data records. --- recordio/buffered_io.go | 2 +- recordio/bufio_vendor.go | 26 +++++++++++++++++++++----- recordio/bufio_vendor_test.go | 23 +++++++++++++++++++++++ recordio/direct_io.go | 2 +- recordio/file_writer.go | 16 +++++++++++++--- recordio/file_writer_test.go | 29 +++++++++++++++++++++++++++++ recordio/recordio.go | 5 ++++- sstables/sstable_writer.go | 5 ++++- sstables/sstable_writer_test.go | 9 ++++++--- 9 files changed, 102 insertions(+), 15 deletions(-) diff --git a/recordio/buffered_io.go b/recordio/buffered_io.go index 865ca2d..ef54988 100644 --- a/recordio/buffered_io.go +++ b/recordio/buffered_io.go @@ -17,7 +17,7 @@ func (d BufferedIOFactory) CreateNewReader(filePath string, bufSize int) (*os.Fi return readFile, NewCountingByteReader(NewReaderBuf(readFile, block)), nil } -func (d BufferedIOFactory) CreateNewWriter(filePath string, bufSize int) (*os.File, WriteCloserFlusher, error) { +func (d BufferedIOFactory) CreateNewWriter(filePath string, bufSize int) (*os.File, WriteSeekerCloserFlusher, error) { writeFile, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0666) if err != nil { return nil, nil, err diff --git a/recordio/bufio_vendor.go b/recordio/bufio_vendor.go index 530925e..b6c91aa 100644 --- a/recordio/bufio_vendor.go +++ b/recordio/bufio_vendor.go @@ -5,8 +5,14 @@ import ( "io" ) -type WriteCloserFlusher interface { - io.WriteCloser +type WriteSeekerCloser interface { + io.Writer + io.Seeker + io.Closer +} + +type WriteSeekerCloserFlusher interface { + WriteSeekerCloser Flush() error Size() int } @@ -19,15 +25,25 @@ type WriteCloserFlusher interface { // the underlying io.Writer. // This is the same writer as bufio.Writer, but it allows us to supply the buffer from the outside. // Namely, it has a new constructor in NewWriterBuf & NewAlignedWriterBuf, implements Close() and supports block aligned flushes. +// Later the interface of the writer added io.Seeker, which the writer now fully implements as well. // Additionally, several methods that were not needed are removed to reduce the test surface of the original. type Writer struct { err error buf []byte n int - wr io.WriteCloser + wr WriteSeekerCloser alignFlush bool } +func (b *Writer) Seek(offset int64, whence int) (int64, error) { + err := b.Flush() + if err != nil { + return 0, err + } + + return b.wr.Seek(offset, whence) +} + func (b *Writer) Close() error { err := b.Flush() if err != nil { @@ -36,14 +52,14 @@ func (b *Writer) Close() error { return b.wr.Close() } -func NewWriterBuf(w io.WriteCloser, buf []byte) WriteCloserFlusher { +func NewWriterBuf(w WriteSeekerCloser, buf []byte) WriteSeekerCloserFlusher { return &Writer{ buf: buf, wr: w, } } -func NewAlignedWriterBuf(w io.WriteCloser, buf []byte) WriteCloserFlusher { +func NewAlignedWriterBuf(w WriteSeekerCloser, buf []byte) WriteSeekerCloserFlusher { return &Writer{ buf: buf, wr: w, diff --git a/recordio/bufio_vendor_test.go b/recordio/bufio_vendor_test.go index 13f6dda..5185497 100644 --- a/recordio/bufio_vendor_test.go +++ b/recordio/bufio_vendor_test.go @@ -3,6 +3,7 @@ package recordio import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "io" "testing" ) @@ -15,6 +16,10 @@ func (w *closingWriter) Write(p []byte) (n int, err error) { return len(p), nil } +func (w *closingWriter) Seek(offset int64, whence int) (int64, error) { + return offset, nil +} + func (*closingWriter) Close() error { return nil } @@ -45,6 +50,24 @@ func TestCreateNewBufferCloseFlushes(t *testing.T) { assert.Equal(t, []byte{13, 6, 91, 22, 0, 0}, sink.buf) } +func TestSeekFlushes(t *testing.T) { + sink := &closingWriter{make([]byte, 6)} + wBuf := NewWriterBuf(sink, make([]byte, 4)) + assert.Equal(t, 4, wBuf.Size()) + + _, err := wBuf.Write([]byte{13, 6, 91, 22}) + require.NoError(t, err) + // buffer should not been flushed so far + assert.Equal(t, []byte{0, 0, 0, 0, 0, 0}, sink.buf) + + _, err = wBuf.Seek(0, io.SeekStart) + require.NoError(t, err) + assert.Equal(t, []byte{13, 6, 91, 22, 0, 0}, sink.buf) + + require.NoError(t, wBuf.Close()) + assert.Equal(t, []byte{13, 6, 91, 22, 0, 0}, sink.buf) +} + func TestCreateNewBufferWithAlignedSlice(t *testing.T) { sink := &closingWriter{make([]byte, 8)} wBuf := NewAlignedWriterBuf(sink, make([]byte, 4)) diff --git a/recordio/direct_io.go b/recordio/direct_io.go index ac6503a..78fc79a 100644 --- a/recordio/direct_io.go +++ b/recordio/direct_io.go @@ -21,7 +21,7 @@ func (d DirectIOFactory) CreateNewReader(filePath string, bufSize int) (*os.File return readFile, NewCountingByteReader(NewReaderBuf(readFile, block)), nil } -func (d DirectIOFactory) CreateNewWriter(filePath string, bufSize int) (*os.File, WriteCloserFlusher, error) { +func (d DirectIOFactory) CreateNewWriter(filePath string, bufSize int) (*os.File, WriteSeekerCloserFlusher, error) { writeFile, err := directio.OpenFile(filePath, os.O_WRONLY|os.O_CREATE, 0666) if err != nil { return nil, nil, err diff --git a/recordio/file_writer.go b/recordio/file_writer.go index 0cba66b..8ef1a14 100644 --- a/recordio/file_writer.go +++ b/recordio/file_writer.go @@ -4,6 +4,7 @@ import ( "encoding/binary" "errors" "fmt" + "io" "os" pool "github.com/libp2p/go-buffer-pool" @@ -23,7 +24,7 @@ type FileWriter struct { closed bool file *os.File - bufWriter WriteCloserFlusher + bufWriter WriteSeekerCloserFlusher currentOffset uint64 compressionType int compressor compressor.CompressionI @@ -88,7 +89,7 @@ func fileHeaderAsByteSlice(compressionType uint32) []byte { } // for legacy reference still around, main paths unused - mostly for tests writing old versions -//noinspection GoUnusedFunction +// noinspection GoUnusedFunction func writeRecordHeaderV1(writer *FileWriter, payloadSizeUncompressed uint64, payloadSizeCompressed uint64) (int, error) { // 4 byte magic number, 8 byte uncompressed size, 8 bytes for compressed size = 20 bytes bytes := make([]byte, RecordHeaderSizeBytes) @@ -204,6 +205,15 @@ func (w *FileWriter) Size() uint64 { return w.currentOffset } +func (w *FileWriter) Seek(offset uint64) error { + newOffset, err := w.bufWriter.Seek(int64(offset), io.SeekStart) + if err != nil { + return err + } + w.currentOffset = uint64(newOffset) + return nil +} + // options type FileWriterOptions struct { @@ -301,7 +311,7 @@ func NewFileWriter(writerOptions ...FileWriterOption) (WriterI, error) { } // creates a new writer with the given os.File, with the desired compression -func newCompressedFileWriterWithFile(file *os.File, bufWriter WriteCloserFlusher, compType int, alignedBlockWrites bool) (WriterI, error) { +func newCompressedFileWriterWithFile(file *os.File, bufWriter WriteSeekerCloserFlusher, compType int, alignedBlockWrites bool) (WriterI, error) { return &FileWriter{ file: file, bufWriter: bufWriter, diff --git a/recordio/file_writer_test.go b/recordio/file_writer_test.go index 3c2feba..117c572 100644 --- a/recordio/file_writer_test.go +++ b/recordio/file_writer_test.go @@ -221,6 +221,35 @@ func TestWriterNotAllowsSyncsWithDirectIO(t *testing.T) { require.ErrorIs(t, err, DirectIOSyncWriteErr) } +func TestWriterSeek(t *testing.T) { + writer := newOpenedWriter(t) + defer removeFileWriterFile(t, writer) + + previousOffset := writer.Size() + offset, err := writer.Write([]byte{12, 13, 14, 15, 16}) + assert.Equal(t, uint64(FileHeaderSizeBytes), offset) + assert.Equal(t, uint64(0x12), writer.Size()) + require.Nil(t, err) + + require.NoError(t, writer.Seek(previousOffset)) + + offset, err = writer.Write([]byte{1, 2, 3, 4, 5}) + assert.Equal(t, uint64(FileHeaderSizeBytes), offset) + assert.Equal(t, uint64(0x12), writer.Size()) + require.Nil(t, err) + + require.NoError(t, writer.Close()) + reader := newReaderOnTopOfWriter(t, writer) + defer closeFileReader(t, reader) + + // this should only be exactly one record + next, err := reader.ReadNext() + require.NoError(t, err) + require.Equal(t, []byte{1, 2, 3, 4, 5}, next) + + readNextExpectEOF(t, reader) +} + func newUncompressedTestWriter() (*FileWriter, error) { tmpFile, err := os.CreateTemp("", "recordio_UncompressedWriter") if err != nil { diff --git a/recordio/recordio.go b/recordio/recordio.go index 39edbae..350919d 100644 --- a/recordio/recordio.go +++ b/recordio/recordio.go @@ -65,6 +65,9 @@ type WriterI interface { Write(record []byte) (uint64, error) // WriteSync appends a record of bytes and forces a disk sync, returns the current offset this item was written to WriteSync(record []byte) (uint64, error) + // Seek will reset the current offset to the given offset. The offset is always + // denoted as a value from the start (origin) of the file at offset zero. + Seek(offset uint64) error } type ReaderI interface { @@ -85,7 +88,7 @@ type ReadAtI interface { type ReaderWriterCloserFactory interface { CreateNewReader(filePath string, bufSize int) (*os.File, ByteReaderResetCount, error) - CreateNewWriter(filePath string, bufSize int) (*os.File, WriteCloserFlusher, error) + CreateNewWriter(filePath string, bufSize int) (*os.File, WriteSeekerCloserFlusher, error) } // NewCompressorForType returns an instance of the desired compressor defined by its identifier. diff --git a/sstables/sstable_writer.go b/sstables/sstable_writer.go index 325712c..11e830d 100644 --- a/sstables/sstable_writer.go +++ b/sstables/sstable_writer.go @@ -123,6 +123,7 @@ func (writer *SSTableStreamWriter) WriteNext(key []byte, value []byte) error { return fmt.Errorf("error while writing crc64 hash in '%s': %w", writer.opts.basePath, err) } + preWriteOffset := writer.dataWriter.Size() recordOffset, err := writer.dataWriter.Write(value) if err != nil { return fmt.Errorf("error writeNext data writer error in '%s': %w", writer.opts.basePath, err) @@ -130,7 +131,9 @@ func (writer *SSTableStreamWriter) WriteNext(key []byte, value []byte) error { _, err = writer.indexWriter.Write(&sProto.IndexEntry{Key: key, ValueOffset: recordOffset, Checksum: crc.Sum64()}) if err != nil { - return fmt.Errorf("error writeNext index writer error in '%s': %w", writer.opts.basePath, err) + // in case of failures we need to try to rewind the data writer's offset to preWriteOffset + seekErr := writer.dataWriter.Seek(preWriteOffset) + return fmt.Errorf("error writeNext index writer/seeker error in '%s': %w", writer.opts.basePath, errors.Join(err, seekErr)) } writer.metaData.NumRecords += 1 diff --git a/sstables/sstable_writer_test.go b/sstables/sstable_writer_test.go index fd010fa..a8beed0 100644 --- a/sstables/sstable_writer_test.go +++ b/sstables/sstable_writer_test.go @@ -152,11 +152,10 @@ func TestFailedIndexAppend(t *testing.T) { require.NoError(t, writer.WriteNext(intToByteSlice(44), intToByteSlice(45))) require.NoError(t, writer.Close()) - reader, _ := getFullScanIterator(t, writer.opts.basePath) + reader, it := getFullScanIterator(t, writer.opts.basePath) defer closeReader(t, reader) - // TODO(thomas): this is failing, as the "optimized" iterator is not skipping partial records - // assertIteratorMatchesSlice(t, it, []int{42, 44}) + assertIteratorMatchesSlice(t, it, []int{42, 44}) assertContentMatchesSlice(t, reader, []int{42, 44}) _, err = reader.Get(intToByteSlice(43)) require.Equal(t, NotFound, err) @@ -179,6 +178,10 @@ func (f *failingRecordIoWriter) Size() uint64 { return f.w.Size() } +func (f *failingRecordIoWriter) Seek(o uint64) error { + return f.w.Seek(o) +} + func (f *failingRecordIoWriter) WriteSync(record []byte) (uint64, error) { if f.failNext { return 0, errors.New("failing record") From 4f7b476b453398e2718b42c41e725be2cb2b43b9 Mon Sep 17 00:00:00 2001 From: thomasjungblut Date: Thu, 1 Aug 2024 21:23:10 +0200 Subject: [PATCH 2/3] tests --- recordio/file_writer.go | 16 ++++++++++--- recordio/file_writer_test.go | 45 +++++++++++++++++++++++++++++++++++- recordio/recordio.go | 1 + 3 files changed, 58 insertions(+), 4 deletions(-) diff --git a/recordio/file_writer.go b/recordio/file_writer.go index 8ef1a14..e404b3b 100644 --- a/recordio/file_writer.go +++ b/recordio/file_writer.go @@ -23,9 +23,11 @@ type FileWriter struct { open bool closed bool - file *os.File - bufWriter WriteSeekerCloserFlusher - currentOffset uint64 + file *os.File + bufWriter WriteSeekerCloserFlusher + currentOffset uint64 + headerOffset uint64 + compressionType int compressor compressor.CompressionI recordHeaderCache []byte @@ -55,6 +57,7 @@ func (w *FileWriter) Open() error { } w.currentOffset = uint64(offset) + w.headerOffset = w.currentOffset w.open = true w.recordHeaderCache = make([]byte, RecordHeaderV2MaxSizeBytes) w.bufferPool = new(pool.BufferPool) @@ -206,6 +209,13 @@ func (w *FileWriter) Size() uint64 { } func (w *FileWriter) Seek(offset uint64) error { + if offset < w.headerOffset { + return fmt.Errorf("can't seek into the header range, supplied: %d header: %d", offset, w.headerOffset) + } + if offset > w.Size() { + return fmt.Errorf("can't seek past file size, supplied: %d header: %d", offset, w.Size()) + } + newOffset, err := w.bufWriter.Seek(int64(offset), io.SeekStart) if err != nil { return err diff --git a/recordio/file_writer_test.go b/recordio/file_writer_test.go index 117c572..0acfd75 100644 --- a/recordio/file_writer_test.go +++ b/recordio/file_writer_test.go @@ -221,7 +221,7 @@ func TestWriterNotAllowsSyncsWithDirectIO(t *testing.T) { require.ErrorIs(t, err, DirectIOSyncWriteErr) } -func TestWriterSeek(t *testing.T) { +func TestWriterSeekHappyPath(t *testing.T) { writer := newOpenedWriter(t) defer removeFileWriterFile(t, writer) @@ -250,6 +250,43 @@ func TestWriterSeek(t *testing.T) { readNextExpectEOF(t, reader) } +func TestWriterSeekOutOfBounds(t *testing.T) { + writer := newOpenedWriter(t) + defer removeFileWriterFile(t, writer) + + _, err := writer.Write(ascendingBytes(5)) + require.NoError(t, err) + + require.Error(t, writer.Seek(0)) + require.Error(t, writer.Seek(writer.headerOffset-1)) + require.NoError(t, writer.Seek(writer.headerOffset)) + require.Error(t, writer.Seek(writer.Size()+1)) + require.NoError(t, writer.Seek(writer.Size())) + require.NoError(t, writer.Close()) +} + +func TestWriterSeekShorterReplacementWrite(t *testing.T) { + writer := newOpenedWriter(t) + defer removeFileWriterFile(t, writer) + + o, err := writer.Write(ascendingBytes(5)) + require.NoError(t, err) + require.NoError(t, writer.Seek(o)) + + // this should create a two byte suffix to the file that should not be readable as a record + _, err = writer.Write(ascendingBytes(3)) + require.NoError(t, err) + require.NoError(t, writer.Close()) + + reader := newReaderOnTopOfWriter(t, writer) + defer func() { + require.NoError(t, reader.Close()) + }() + readNextExpectAscendingBytesOfLen(t, reader, 3) + + readNextExpectMagicNumberMismatch(t, reader) +} + func newUncompressedTestWriter() (*FileWriter, error) { tmpFile, err := os.CreateTemp("", "recordio_UncompressedWriter") if err != nil { @@ -344,6 +381,12 @@ func readNextExpectEOF(t *testing.T, reader *FileReader) { assert.Equal(t, io.EOF, errors.Unwrap(err)) } +func readNextExpectMagicNumberMismatch(t *testing.T, reader *FileReader) { + buf, err := reader.ReadNext() + require.Nil(t, buf) + require.ErrorIs(t, err, MagicNumberMismatchErr) +} + func newReaderOnTopOfWriter(t *testing.T, writer *FileWriter) *FileReader { reader, err := NewFileReaderWithPath(writer.file.Name()) require.Nil(t, err) diff --git a/recordio/recordio.go b/recordio/recordio.go index 350919d..0f9d8df 100644 --- a/recordio/recordio.go +++ b/recordio/recordio.go @@ -67,6 +67,7 @@ type WriterI interface { WriteSync(record []byte) (uint64, error) // Seek will reset the current offset to the given offset. The offset is always // denoted as a value from the start (origin) of the file at offset zero. + // An error will be returned when trying to seek into the file header or beyond the current size of the file. Seek(offset uint64) error } From 20ca00eccc60aaf570cfbbd273a23b8152891b24 Mon Sep 17 00:00:00 2001 From: thomasjungblut Date: Thu, 1 Aug 2024 21:25:09 +0200 Subject: [PATCH 3/3] todo --- recordio/file_writer_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/recordio/file_writer_test.go b/recordio/file_writer_test.go index 0acfd75..c88d6d6 100644 --- a/recordio/file_writer_test.go +++ b/recordio/file_writer_test.go @@ -284,6 +284,9 @@ func TestWriterSeekShorterReplacementWrite(t *testing.T) { }() readNextExpectAscendingBytesOfLen(t, reader, 3) + // TODO(thomas): can we wipe the remainder of the file? + // there's a more general concern about having short replacement writes within a record though + // maybe we need to leave a marker to skip until the next record / EOF readNextExpectMagicNumberMismatch(t, reader) }