Skip to content

Commit

Permalink
add option for protocol reader
Browse files Browse the repository at this point in the history
  • Loading branch information
sebheitzmann committed Dec 14, 2024
1 parent e5a0823 commit 8e107ad
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 27 deletions.
9 changes: 5 additions & 4 deletions _examples/recordio.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package main

import (
"errors"
"github.com/thomasjungblut/go-sstables/_examples/proto"
"github.com/thomasjungblut/go-sstables/recordio"
rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
"io"
"log"
"os"

"github.com/thomasjungblut/go-sstables/_examples/proto"
"github.com/thomasjungblut/go-sstables/recordio"
rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
)

func main() {
Expand All @@ -21,7 +22,7 @@ func main() {
}

func simpleRead(path string) {
reader, err := rProto.NewProtoReaderWithPath(path)
reader, err := rProto.NewReader(rProto.ReaderPath(path))
if err != nil {
log.Fatalf("error: %v", err)
}
Expand Down
72 changes: 61 additions & 11 deletions recordio/proto/proto_reader.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package proto

import (
"errors"
"os"

"github.com/thomasjungblut/go-sstables/recordio"
"google.golang.org/protobuf/proto"
"os"
)

type Reader struct {
Expand Down Expand Up @@ -36,27 +38,75 @@ func (r *Reader) Close() error {
return r.reader.Close()
}

func NewProtoReaderWithPath(path string) (ReaderI, error) {
f, err := os.Open(path)
if err != nil {
return nil, err
// options

type ReaderOptions struct {
path string
file *os.File
bufSizeBytes int
}

type ReaderOption func(*ReaderOptions)

func ReaderPath(p string) ReaderOption {
return func(args *ReaderOptions) {
args.path = p
}
}

r, err := NewProtoReaderWithFile(f)
if err != nil {
return nil, err
func ReaderFile(p *os.File) ReaderOption {
return func(args *ReaderOptions) {
args.file = p
}
}

return r, nil
func ReadBufferSizeBytes(p int) ReaderOption {
return func(args *ReaderOptions) {
args.bufSizeBytes = p
}
}

func NewProtoReaderWithFile(file *os.File) (ReaderI, error) {
reader, err := recordio.NewFileReaderWithFile(file)
// create a new reader with the given options. Either Path or File must be supplied
func NewReader(readerOptions ...ReaderOption) (ReaderI, error) {
opts := &ReaderOptions{
path: "",
file: nil,
bufSizeBytes: 1024 * 1024 * 4,
}

for _, readerOption := range readerOptions {
readerOption(opts)
}

if (opts.file != nil) && (opts.path != "") {
return nil, errors.New("either os.File or string path must be supplied, never both")
}

if opts.file == nil {
if opts.path == "" {
return nil, errors.New("path was not supplied")
}
}
reader, err := recordio.NewFileReader(
recordio.ReaderPath(opts.path),
recordio.ReaderFile(opts.file),
recordio.ReaderBufferSizeBytes(opts.bufSizeBytes))
if err != nil {
return nil, err
}

return &Reader{
reader: reader,
}, nil

}

// Deprecated: use the NewProtoReader with options.
func NewProtoReaderWithPath(path string) (ReaderI, error) {
return NewReader(ReaderPath(path))
}

// Deprecated: use the NewProtoReader with options.
func NewProtoReaderWithFile(file *os.File) (ReaderI, error) {
return NewReader(ReaderFile(file))
}
9 changes: 5 additions & 4 deletions recordio/proto/recordio_proto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package proto

import (
"bufio"
"math/rand"
"os"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thomasjungblut/go-sstables/recordio"
"github.com/thomasjungblut/go-sstables/recordio/test_files"
"math/rand"
"os"
"testing"
)

const TestFile = "../test_files/berlin52.tsp"
Expand Down Expand Up @@ -63,7 +64,7 @@ func endToEndReadWriteProtobuf(writer WriterI, t *testing.T, tmpFile *os.File) {
require.NoError(t, writer.Close())
require.NoError(t, inFile.Close())

reader, err := NewProtoReaderWithPath(tmpFile.Name())
reader, err := NewReader(ReaderPath(tmpFile.Name()))
require.NoError(t, err)
require.NoError(t, reader.Open())

Expand Down
2 changes: 1 addition & 1 deletion simpledb/recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (db *DB) repairCompactions() error {
}

// try to read it, if it's corrupted we would also delete it
reader, err := rProto.NewProtoReaderWithPath(metaPath)
reader, err := rProto.NewReader(rProto.ReaderPath(metaPath))
if err != nil {
return err
}
Expand Down
15 changes: 8 additions & 7 deletions sstables/sstable_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ package sstables
import (
"errors"
"fmt"
"hash/crc64"
"hash/fnv"
"io"
"os"
"path/filepath"

"github.com/steakknife/bloomfilter"
"github.com/thomasjungblut/go-sstables/recordio"
rProto "github.com/thomasjungblut/go-sstables/recordio/proto"
"github.com/thomasjungblut/go-sstables/skiplist"
"github.com/thomasjungblut/go-sstables/sstables/proto"
pb "google.golang.org/protobuf/proto"
"hash/crc64"
"hash/fnv"
"io"
"os"
"path/filepath"
)

var ChecksumErr = ChecksumError{}
Expand Down Expand Up @@ -117,7 +118,7 @@ func (reader *SSTableReader) getValueAtOffset(iVal indexVal, skipHashCheck bool)

func (reader *SSTableReader) Scan() (SSTableIteratorI, error) {
if reader.v0DataReader != nil {
dataReader, err := rProto.NewProtoReaderWithPath(filepath.Join(reader.opts.basePath, DataFileName))
dataReader, err := rProto.NewReader(rProto.ReaderPath(filepath.Join(reader.opts.basePath, DataFileName)))
if err != nil {
return nil, fmt.Errorf("error in sstable '%s' while creating a scanner: %w", reader.opts.basePath, err)
}
Expand Down Expand Up @@ -335,7 +336,7 @@ func NewSSTableReader(readerOptions ...ReadOption) (SSTableReaderI, error) {
}

func readIndex(indexPath string, keyComparator skiplist.Comparator[[]byte]) (indexMap skiplist.MapI[[]byte, indexVal], err error) {
reader, err := rProto.NewProtoReaderWithPath(indexPath)
reader, err := rProto.NewReader(rProto.ReaderPath(indexPath))
if err != nil {
return nil, fmt.Errorf("error while creating index reader of sstable in '%s': %w", indexPath, err)
}
Expand Down

0 comments on commit 8e107ad

Please sign in to comment.