Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Sharded DB #76

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 42 additions & 25 deletions ss/pebbledb/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,52 +66,69 @@
return b.batch.Commit(defaultWriteOpts)
}

// For writing kv pairs in any order of version
// RawBatch handles writing key-value pairs where versions and storeKeys can vary.
// It maintains separate batches for each storage (pebble.DB instance).
type RawBatch struct {
storage *pebble.DB
batch *pebble.Batch
batches map[*pebble.DB]*pebble.Batch // Map from storage to batch
}

func NewRawBatch(storage *pebble.DB) (*RawBatch, error) {
batch := storage.NewBatch()

func NewRawBatch() *RawBatch {
return &RawBatch{
storage: storage,
batch: batch,
}, nil
batches: make(map[*pebble.DB]*pebble.Batch),
}
}

func (b *RawBatch) Size() int {
return b.batch.Len()
func (rb *RawBatch) Size() int {
size := 0
for _, batch := range rb.batches {
size += batch.Len()
}
Comment on lines +83 to +85

Check warning

Code scanning / CodeQL

Iteration over map Warning

Iteration over map may be a possible source of non-determinism
return size
}

func (b *RawBatch) Reset() {
b.batch.Reset()
func (rb *RawBatch) Reset() {
for _, batch := range rb.batches {
batch.Reset()
}
Comment on lines +90 to +92

Check warning

Code scanning / CodeQL

Iteration over map Warning

Iteration over map may be a possible source of non-determinism
}

func (b *RawBatch) set(storeKey string, tombstone int64, key, value []byte, version int64) error {
func (rb *RawBatch) set(db *pebble.DB, storeKey string, tombstone int64, key, value []byte, version int64) error {
batch, ok := rb.batches[db]
if !ok {
batch = db.NewBatch()
rb.batches[db] = batch
}

prefixedKey := MVCCEncode(prependStoreKey(storeKey, key), version)
prefixedVal := MVCCEncode(value, tombstone)

if err := b.batch.Set(prefixedKey, prefixedVal, nil); err != nil {
if err := batch.Set(prefixedKey, prefixedVal, nil); err != nil {
return fmt.Errorf("failed to write PebbleDB batch: %w", err)
}

return nil
}

func (b *RawBatch) Set(storeKey string, key, value []byte, version int64) error {
return b.set(storeKey, 0, key, value, version)
func (rb *RawBatch) Set(db *pebble.DB, storeKey string, key, value []byte, version int64) error {
return rb.set(db, storeKey, 0, key, value, version)
}

func (b *RawBatch) Delete(storeKey string, key []byte, version int64) error {
return b.set(storeKey, version, key, []byte(tombstoneVal), version)
func (rb *RawBatch) Delete(db *pebble.DB, storeKey string, key []byte, version int64) error {
return rb.set(db, storeKey, version, key, []byte(tombstoneVal), version)
}

func (b *RawBatch) Write() (err error) {
defer func() {
err = errors.Join(err, b.batch.Close())
}()

return b.batch.Commit(defaultWriteOpts)
func (rb *RawBatch) Write() (err error) {
for _, batch := range rb.batches {
if batch.Count() > 0 {
if commitErr := batch.Commit(defaultWriteOpts); commitErr != nil {
err = errors.Join(err, commitErr)
}
}
if closeErr := batch.Close(); closeErr != nil {
err = errors.Join(err, closeErr)
}
}
Comment on lines +121 to +130

Check warning

Code scanning / CodeQL

Iteration over map Warning

Iteration over map may be a possible source of non-determinism
// Reset the batches after writing
rb.batches = make(map[*pebble.DB]*pebble.Batch)
return err
}
Loading
Loading