diff --git a/badger_db.go b/badger_db.go new file mode 100644 index 0000000..08f201f --- /dev/null +++ b/badger_db.go @@ -0,0 +1,295 @@ +//go:build badgerdb +// +build badgerdb + +package db + +import ( + "bytes" + "fmt" + "os" + "path/filepath" + + "github.com/dgraph-io/badger/v2" +) + +func init() { registerDBCreator(BadgerDBBackend, badgerDBCreator) } + +func badgerDBCreator(dbName, dir string) (DB, error) { + return NewBadgerDB(dbName, dir) +} + +// NewBadgerDB creates a Badger key-value store backed to the +// directory dir supplied. If dir does not exist, it will be created. +func NewBadgerDB(dbName, dir string) (*BadgerDB, error) { + // Since Badger doesn't support database names, we join both to obtain + // the final directory to use for the database. + path := filepath.Join(dir, dbName) + + if err := os.MkdirAll(path, 0755); err != nil { + return nil, err + } + opts := badger.DefaultOptions(path) + opts.SyncWrites = false // note that we have Sync methods + opts.Logger = nil // badger is too chatty by default + return NewBadgerDBWithOptions(opts) +} + +// NewBadgerDBWithOptions creates a BadgerDB key value store +// gives the flexibility of initializing a database with the +// respective options. +func NewBadgerDBWithOptions(opts badger.Options) (*BadgerDB, error) { + db, err := badger.Open(opts) + if err != nil { + return nil, err + } + return &BadgerDB{db: db}, nil +} + +type BadgerDB struct { + db *badger.DB +} + +var _ DB = (*BadgerDB)(nil) + +func (b *BadgerDB) Get(key []byte) ([]byte, error) { + if len(key) == 0 { + return nil, errKeyEmpty + } + var val []byte + err := b.db.View(func(txn *badger.Txn) error { + item, err := txn.Get(key) + if err == badger.ErrKeyNotFound { + return nil + } else if err != nil { + return err + } + val, err = item.ValueCopy(nil) + if err == nil && val == nil { + val = []byte{} + } + return err + }) + return val, err +} + +func (b *BadgerDB) Has(key []byte) (bool, error) { + if len(key) == 0 { + return false, errKeyEmpty + } + var found bool + err := b.db.View(func(txn *badger.Txn) error { + _, err := txn.Get(key) + if err != nil && err != badger.ErrKeyNotFound { + return err + } + found = (err != badger.ErrKeyNotFound) + return nil + }) + return found, err +} + +func (b *BadgerDB) Set(key, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + return b.db.Update(func(txn *badger.Txn) error { + return txn.Set(key, value) + }) +} + +func withSync(db *badger.DB, err error) error { + if err != nil { + return err + } + return db.Sync() +} + +func (b *BadgerDB) SetSync(key, value []byte) error { + return withSync(b.db, b.Set(key, value)) +} + +func (b *BadgerDB) Delete(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + return b.db.Update(func(txn *badger.Txn) error { + return txn.Delete(key) + }) +} + +func (b *BadgerDB) DeleteSync(key []byte) error { + return withSync(b.db, b.Delete(key)) +} + +func (b *BadgerDB) Close() error { + return b.db.Close() +} + +func (b *BadgerDB) Print() error { + return nil +} + +func (b *BadgerDB) iteratorOpts(start, end []byte, opts badger.IteratorOptions) (*badgerDBIterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, errKeyEmpty + } + txn := b.db.NewTransaction(false) + iter := txn.NewIterator(opts) + iter.Rewind() + iter.Seek(start) + if opts.Reverse && iter.Valid() && bytes.Equal(iter.Item().Key(), start) { + // If we're going in reverse, our starting point was "end", + // which is exclusive. + iter.Next() + } + return &badgerDBIterator{ + reverse: opts.Reverse, + start: start, + end: end, + + txn: txn, + iter: iter, + }, nil +} + +func (b *BadgerDB) Iterator(start, end []byte) (Iterator, error) { + opts := badger.DefaultIteratorOptions + return b.iteratorOpts(start, end, opts) +} + +func (b *BadgerDB) ReverseIterator(start, end []byte) (Iterator, error) { + opts := badger.DefaultIteratorOptions + opts.Reverse = true + return b.iteratorOpts(end, start, opts) +} + +func (b *BadgerDB) Stats() map[string]string { + return nil +} + +func (b *BadgerDB) NewBatch() Batch { + wb := &badgerDBBatch{ + db: b.db, + wb: b.db.NewWriteBatch(), + firstFlush: make(chan struct{}, 1), + } + wb.firstFlush <- struct{}{} + return wb +} + +var _ Batch = (*badgerDBBatch)(nil) + +type badgerDBBatch struct { + db *badger.DB + wb *badger.WriteBatch + + // Calling db.Flush twice panics, so we must keep track of whether we've + // flushed already on our own. If Write can receive from the firstFlush + // channel, then it's the first and only Flush call we should do. + // + // Upstream bug report: + // https://github.com/dgraph-io/badger/issues/1394 + firstFlush chan struct{} +} + +func (b *badgerDBBatch) Set(key, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + return b.wb.Set(key, value) +} + +func (b *badgerDBBatch) Delete(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + return b.wb.Delete(key) +} + +func (b *badgerDBBatch) Write() error { + select { + case <-b.firstFlush: + return b.wb.Flush() + default: + return fmt.Errorf("batch already flushed") + } +} + +func (b *badgerDBBatch) WriteSync() error { + return withSync(b.db, b.Write()) +} + +func (b *badgerDBBatch) Close() error { + select { + case <-b.firstFlush: // a Flush after Cancel panics too + default: + } + b.wb.Cancel() + return nil +} + +type badgerDBIterator struct { + reverse bool + start, end []byte + + txn *badger.Txn + iter *badger.Iterator + + lastErr error +} + +func (i *badgerDBIterator) Close() error { + i.iter.Close() + i.txn.Discard() + return nil +} + +func (i *badgerDBIterator) Domain() (start, end []byte) { return i.start, i.end } +func (i *badgerDBIterator) Error() error { return i.lastErr } + +func (i *badgerDBIterator) Next() { + if !i.Valid() { + panic("iterator is invalid") + } + i.iter.Next() +} + +func (i *badgerDBIterator) Valid() bool { + if !i.iter.Valid() { + return false + } + if len(i.end) > 0 { + key := i.iter.Item().Key() + if c := bytes.Compare(key, i.end); (!i.reverse && c >= 0) || (i.reverse && c < 0) { + // We're at the end key, or past the end. + return false + } + } + return true +} + +func (i *badgerDBIterator) Key() []byte { + if !i.Valid() { + panic("iterator is invalid") + } + // Note that we don't use KeyCopy, so this is only valid until the next + // call to Next. + return i.iter.Item().KeyCopy(nil) +} + +func (i *badgerDBIterator) Value() []byte { + if !i.Valid() { + panic("iterator is invalid") + } + val, err := i.iter.Item().ValueCopy(nil) + if err != nil { + i.lastErr = err + } + return val +} diff --git a/boltdb.go b/boltdb.go new file mode 100644 index 0000000..e452105 --- /dev/null +++ b/boltdb.go @@ -0,0 +1,207 @@ +//go:build boltdb +// +build boltdb + +package db + +import ( + "errors" + "fmt" + "os" + "path/filepath" + + "go.etcd.io/bbolt" +) + +var ( + bucket = []byte("tm") +) + +func init() { + registerDBCreator(BoltDBBackend, func(name, dir string) (DB, error) { + return NewBoltDB(name, dir) + }) +} + +// BoltDB is a wrapper around etcd's fork of bolt (https://github.com/etcd-io/bbolt). +// +// NOTE: All operations (including Set, Delete) are synchronous by default. One +// can globally turn it off by using NoSync config option (not recommended). +// +// A single bucket ([]byte("tm")) is used per a database instance. This could +// lead to performance issues when/if there will be lots of keys. +type BoltDB struct { + db *bbolt.DB +} + +var _ DB = (*BoltDB)(nil) + +// NewBoltDB returns a BoltDB with default options. +func NewBoltDB(name, dir string) (DB, error) { + return NewBoltDBWithOpts(name, dir, bbolt.DefaultOptions) +} + +// NewBoltDBWithOpts allows you to supply *bbolt.Options. ReadOnly: true is not +// supported because NewBoltDBWithOpts creates a global bucket. +func NewBoltDBWithOpts(name string, dir string, opts *bbolt.Options) (DB, error) { + if opts.ReadOnly { + return nil, errors.New("ReadOnly: true is not supported") + } + + dbPath := filepath.Join(dir, name+".db") + db, err := bbolt.Open(dbPath, os.ModePerm, opts) + if err != nil { + return nil, err + } + + // create a global bucket + err = db.Update(func(tx *bbolt.Tx) error { + _, err := tx.CreateBucketIfNotExists(bucket) + return err + }) + if err != nil { + return nil, err + } + + return &BoltDB{db: db}, nil +} + +// Get implements DB. +func (bdb *BoltDB) Get(key []byte) (value []byte, err error) { + if len(key) == 0 { + return nil, errKeyEmpty + } + err = bdb.db.View(func(tx *bbolt.Tx) error { + b := tx.Bucket(bucket) + if v := b.Get(key); v != nil { + value = append([]byte{}, v...) + } + return nil + }) + if err != nil { + return nil, err + } + return +} + +// Has implements DB. +func (bdb *BoltDB) Has(key []byte) (bool, error) { + bytes, err := bdb.Get(key) + if err != nil { + return false, err + } + return bytes != nil, nil +} + +// Set implements DB. +func (bdb *BoltDB) Set(key, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + err := bdb.db.Update(func(tx *bbolt.Tx) error { + b := tx.Bucket(bucket) + return b.Put(key, value) + }) + if err != nil { + return err + } + return nil +} + +// SetSync implements DB. +func (bdb *BoltDB) SetSync(key, value []byte) error { + return bdb.Set(key, value) +} + +// Delete implements DB. +func (bdb *BoltDB) Delete(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + err := bdb.db.Update(func(tx *bbolt.Tx) error { + return tx.Bucket(bucket).Delete(key) + }) + if err != nil { + return err + } + return nil +} + +// DeleteSync implements DB. +func (bdb *BoltDB) DeleteSync(key []byte) error { + return bdb.Delete(key) +} + +// Close implements DB. +func (bdb *BoltDB) Close() error { + return bdb.db.Close() +} + +// Print implements DB. +func (bdb *BoltDB) Print() error { + stats := bdb.db.Stats() + fmt.Printf("%v\n", stats) + + err := bdb.db.View(func(tx *bbolt.Tx) error { + tx.Bucket(bucket).ForEach(func(k, v []byte) error { + fmt.Printf("[%X]:\t[%X]\n", k, v) + return nil + }) + return nil + }) + if err != nil { + return err + } + return nil +} + +// Stats implements DB. +func (bdb *BoltDB) Stats() map[string]string { + stats := bdb.db.Stats() + m := make(map[string]string) + + // Freelist stats + m["FreePageN"] = fmt.Sprintf("%v", stats.FreePageN) + m["PendingPageN"] = fmt.Sprintf("%v", stats.PendingPageN) + m["FreeAlloc"] = fmt.Sprintf("%v", stats.FreeAlloc) + m["FreelistInuse"] = fmt.Sprintf("%v", stats.FreelistInuse) + + // Transaction stats + m["TxN"] = fmt.Sprintf("%v", stats.TxN) + m["OpenTxN"] = fmt.Sprintf("%v", stats.OpenTxN) + + return m +} + +// NewBatch implements DB. +func (bdb *BoltDB) NewBatch() Batch { + return newBoltDBBatch(bdb) +} + +// WARNING: Any concurrent writes or reads will block until the iterator is +// closed. +func (bdb *BoltDB) Iterator(start, end []byte) (Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, errKeyEmpty + } + tx, err := bdb.db.Begin(false) + if err != nil { + return nil, err + } + return newBoltDBIterator(tx, start, end, false), nil +} + +// WARNING: Any concurrent writes or reads will block until the iterator is +// closed. +func (bdb *BoltDB) ReverseIterator(start, end []byte) (Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, errKeyEmpty + } + tx, err := bdb.db.Begin(false) + if err != nil { + return nil, err + } + return newBoltDBIterator(tx, start, end, true), nil +} diff --git a/boltdb_batch.go b/boltdb_batch.go new file mode 100644 index 0000000..cd22c67 --- /dev/null +++ b/boltdb_batch.go @@ -0,0 +1,87 @@ +//go:build boltdb +// +build boltdb + +package db + +import "go.etcd.io/bbolt" + +// boltDBBatch stores operations internally and dumps them to BoltDB on Write(). +type boltDBBatch struct { + db *BoltDB + ops []operation +} + +var _ Batch = (*boltDBBatch)(nil) + +func newBoltDBBatch(db *BoltDB) *boltDBBatch { + return &boltDBBatch{ + db: db, + ops: []operation{}, + } +} + +// Set implements Batch. +func (b *boltDBBatch) Set(key, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + if b.ops == nil { + return errBatchClosed + } + b.ops = append(b.ops, operation{opTypeSet, key, value}) + return nil +} + +// Delete implements Batch. +func (b *boltDBBatch) Delete(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if b.ops == nil { + return errBatchClosed + } + b.ops = append(b.ops, operation{opTypeDelete, key, nil}) + return nil +} + +// Write implements Batch. +func (b *boltDBBatch) Write() error { + if b.ops == nil { + return errBatchClosed + } + err := b.db.db.Batch(func(tx *bbolt.Tx) error { + bkt := tx.Bucket(bucket) + for _, op := range b.ops { + switch op.opType { + case opTypeSet: + if err := bkt.Put(op.key, op.value); err != nil { + return err + } + case opTypeDelete: + if err := bkt.Delete(op.key); err != nil { + return err + } + } + } + return nil + }) + if err != nil { + return err + } + // Make sure batch cannot be used afterwards. Callers should still call Close(), for errors. + return b.Close() +} + +// WriteSync implements Batch. +func (b *boltDBBatch) WriteSync() error { + return b.Write() +} + +// Close implements Batch. +func (b *boltDBBatch) Close() error { + b.ops = nil + return nil +} diff --git a/boltdb_iterator.go b/boltdb_iterator.go new file mode 100644 index 0000000..a62e2ab --- /dev/null +++ b/boltdb_iterator.go @@ -0,0 +1,142 @@ +//go:build boltdb +// +build boltdb + +package db + +import ( + "bytes" + + "go.etcd.io/bbolt" +) + +// boltDBIterator allows you to iterate on range of keys/values given some +// start / end keys (nil & nil will result in doing full scan). +type boltDBIterator struct { + tx *bbolt.Tx + + itr *bbolt.Cursor + start []byte + end []byte + + currentKey []byte + currentValue []byte + + isInvalid bool + isReverse bool +} + +var _ Iterator = (*boltDBIterator)(nil) + +// newBoltDBIterator creates a new boltDBIterator. +func newBoltDBIterator(tx *bbolt.Tx, start, end []byte, isReverse bool) *boltDBIterator { + itr := tx.Bucket(bucket).Cursor() + + var ck, cv []byte + if isReverse { + switch { + case end == nil: + ck, cv = itr.Last() + default: + _, _ = itr.Seek(end) // after key + ck, cv = itr.Prev() // return to end key + } + } else { + switch { + case start == nil: + ck, cv = itr.First() + default: + ck, cv = itr.Seek(start) + } + } + + return &boltDBIterator{ + tx: tx, + itr: itr, + start: start, + end: end, + currentKey: ck, + currentValue: cv, + isReverse: isReverse, + isInvalid: false, + } +} + +// Domain implements Iterator. +func (itr *boltDBIterator) Domain() ([]byte, []byte) { + return itr.start, itr.end +} + +// Valid implements Iterator. +func (itr *boltDBIterator) Valid() bool { + if itr.isInvalid { + return false + } + + if itr.Error() != nil { + itr.isInvalid = true + return false + } + + // iterated to the end of the cursor + if itr.currentKey == nil { + itr.isInvalid = true + return false + } + + if itr.isReverse { + if itr.start != nil && bytes.Compare(itr.currentKey, itr.start) < 0 { + itr.isInvalid = true + return false + } + } else { + if itr.end != nil && bytes.Compare(itr.end, itr.currentKey) <= 0 { + itr.isInvalid = true + return false + } + } + + // Valid + return true +} + +// Next implements Iterator. +func (itr *boltDBIterator) Next() { + itr.assertIsValid() + if itr.isReverse { + itr.currentKey, itr.currentValue = itr.itr.Prev() + } else { + itr.currentKey, itr.currentValue = itr.itr.Next() + } +} + +// Key implements Iterator. +func (itr *boltDBIterator) Key() []byte { + itr.assertIsValid() + return append([]byte{}, itr.currentKey...) +} + +// Value implements Iterator. +func (itr *boltDBIterator) Value() []byte { + itr.assertIsValid() + var value []byte + if itr.currentValue != nil { + value = append([]byte{}, itr.currentValue...) + } + return value +} + +// Error implements Iterator. +func (itr *boltDBIterator) Error() error { + return nil +} + +// Close implements Iterator. +func (itr *boltDBIterator) Close() error { + return itr.tx.Rollback() +} + +func (itr *boltDBIterator) assertIsValid() { + if !itr.Valid() { + panic("iterator is invalid") + } +} diff --git a/boltdb_test.go b/boltdb_test.go new file mode 100644 index 0000000..e68c85b --- /dev/null +++ b/boltdb_test.go @@ -0,0 +1,36 @@ +//go:build boltdb +// +build boltdb + +package db + +import ( + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBoltDBNewBoltDB(t *testing.T) { + name := fmt.Sprintf("test_%x", randStr(12)) + dir := os.TempDir() + defer cleanupDBDir(dir, name) + + db, err := NewBoltDB(name, dir) + require.NoError(t, err) + db.Close() +} + +func BenchmarkBoltDBRandomReadsWrites(b *testing.B) { + name := fmt.Sprintf("test_%x", randStr(12)) + db, err := NewBoltDB(name, "") + if err != nil { + b.Fatal(err) + } + defer func() { + db.Close() + cleanupDBDir("", name) + }() + + benchmarkRandomReadsWrites(b, db) +} diff --git a/cleveldb.go b/cleveldb.go new file mode 100644 index 0000000..3a42a31 --- /dev/null +++ b/cleveldb.go @@ -0,0 +1,196 @@ +//go:build cleveldb +// +build cleveldb + +package db + +import ( + "fmt" + "path/filepath" + + "github.com/jmhodges/levigo" +) + +func init() { + dbCreator := func(name string, dir string) (DB, error) { + return NewCLevelDB(name, dir) + } + registerDBCreator(CLevelDBBackend, dbCreator) +} + +// CLevelDB uses the C LevelDB database via a Go wrapper. +type CLevelDB struct { + db *levigo.DB + ro *levigo.ReadOptions + wo *levigo.WriteOptions + woSync *levigo.WriteOptions +} + +var _ DB = (*CLevelDB)(nil) + +// NewCLevelDB creates a new CLevelDB. +func NewCLevelDB(name string, dir string) (*CLevelDB, error) { + dbPath := filepath.Join(dir, name+".db") + + opts := levigo.NewOptions() + opts.SetCache(levigo.NewLRUCache(1 << 30)) + opts.SetCreateIfMissing(true) + db, err := levigo.Open(dbPath, opts) + if err != nil { + return nil, err + } + ro := levigo.NewReadOptions() + wo := levigo.NewWriteOptions() + woSync := levigo.NewWriteOptions() + woSync.SetSync(true) + database := &CLevelDB{ + db: db, + ro: ro, + wo: wo, + woSync: woSync, + } + return database, nil +} + +// Get implements DB. +func (db *CLevelDB) Get(key []byte) ([]byte, error) { + if len(key) == 0 { + return nil, errKeyEmpty + } + res, err := db.db.Get(db.ro, key) + if err != nil { + return nil, err + } + return res, nil +} + +// Has implements DB. +func (db *CLevelDB) Has(key []byte) (bool, error) { + bytes, err := db.Get(key) + if err != nil { + return false, err + } + return bytes != nil, nil +} + +// Set implements DB. +func (db *CLevelDB) Set(key []byte, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + if err := db.db.Put(db.wo, key, value); err != nil { + return err + } + return nil +} + +// SetSync implements DB. +func (db *CLevelDB) SetSync(key []byte, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + if err := db.db.Put(db.woSync, key, value); err != nil { + return err + } + return nil +} + +// Delete implements DB. +func (db *CLevelDB) Delete(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if err := db.db.Delete(db.wo, key); err != nil { + return err + } + return nil +} + +// DeleteSync implements DB. +func (db *CLevelDB) DeleteSync(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if err := db.db.Delete(db.woSync, key); err != nil { + return err + } + return nil +} + +// FIXME This should not be exposed +func (db *CLevelDB) DB() *levigo.DB { + return db.db +} + +// Close implements DB. +func (db *CLevelDB) Close() error { + db.db.Close() + db.ro.Close() + db.wo.Close() + db.woSync.Close() + return nil +} + +// Print implements DB. +func (db *CLevelDB) Print() error { + itr, err := db.Iterator(nil, nil) + if err != nil { + return err + } + defer itr.Close() + for ; itr.Valid(); itr.Next() { + key := itr.Key() + value := itr.Value() + fmt.Printf("[%X]:\t[%X]\n", key, value) + } + return nil +} + +// Stats implements DB. +func (db *CLevelDB) Stats() map[string]string { + keys := []string{ + "leveldb.aliveiters", + "leveldb.alivesnaps", + "leveldb.blockpool", + "leveldb.cachedblock", + "leveldb.num-files-at-level{n}", + "leveldb.openedtables", + "leveldb.sstables", + "leveldb.stats", + } + + stats := make(map[string]string, len(keys)) + for _, key := range keys { + str := db.db.PropertyValue(key) + stats[key] = str + } + return stats +} + +// NewBatch implements DB. +func (db *CLevelDB) NewBatch() Batch { + return newCLevelDBBatch(db) +} + +// Iterator implements DB. +func (db *CLevelDB) Iterator(start, end []byte) (Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, errKeyEmpty + } + itr := db.db.NewIterator(db.ro) + return newCLevelDBIterator(itr, start, end, false), nil +} + +// ReverseIterator implements DB. +func (db *CLevelDB) ReverseIterator(start, end []byte) (Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, errKeyEmpty + } + itr := db.db.NewIterator(db.ro) + return newCLevelDBIterator(itr, start, end, true), nil +} diff --git a/cleveldb_batch.go b/cleveldb_batch.go new file mode 100644 index 0000000..b77bd52 --- /dev/null +++ b/cleveldb_batch.go @@ -0,0 +1,82 @@ +//go:build cleveldb +// +build cleveldb + +package db + +import "github.com/jmhodges/levigo" + +// cLevelDBBatch is a LevelDB batch. +type cLevelDBBatch struct { + db *CLevelDB + batch *levigo.WriteBatch +} + +func newCLevelDBBatch(db *CLevelDB) *cLevelDBBatch { + return &cLevelDBBatch{ + db: db, + batch: levigo.NewWriteBatch(), + } +} + +// Set implements Batch. +func (b *cLevelDBBatch) Set(key, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + if b.batch == nil { + return errBatchClosed + } + b.batch.Put(key, value) + return nil +} + +// Delete implements Batch. +func (b *cLevelDBBatch) Delete(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if b.batch == nil { + return errBatchClosed + } + b.batch.Delete(key) + return nil +} + +// Write implements Batch. +func (b *cLevelDBBatch) Write() error { + if b.batch == nil { + return errBatchClosed + } + err := b.db.db.Write(b.db.wo, b.batch) + if err != nil { + return err + } + // Make sure batch cannot be used afterwards. Callers should still call Close(), for errors. + return b.Close() +} + +// WriteSync implements Batch. +func (b *cLevelDBBatch) WriteSync() error { + if b.batch == nil { + return errBatchClosed + } + err := b.db.db.Write(b.db.woSync, b.batch) + if err != nil { + return err + } + // Make sure batch cannot be used afterwards. Callers should still call Close(), for errors. + b.Close() + return nil +} + +// Close implements Batch. +func (b *cLevelDBBatch) Close() error { + if b.batch != nil { + b.batch.Close() + b.batch = nil + } + return nil +} diff --git a/cleveldb_iterator.go b/cleveldb_iterator.go new file mode 100644 index 0000000..e56c35e --- /dev/null +++ b/cleveldb_iterator.go @@ -0,0 +1,135 @@ +//go:build cleveldb +// +build cleveldb + +package db + +import ( + "bytes" + + "github.com/jmhodges/levigo" +) + +// cLevelDBIterator is a cLevelDB iterator. +type cLevelDBIterator struct { + source *levigo.Iterator + start, end []byte + isReverse bool + isInvalid bool +} + +var _ Iterator = (*cLevelDBIterator)(nil) + +func newCLevelDBIterator(source *levigo.Iterator, start, end []byte, isReverse bool) *cLevelDBIterator { + if isReverse { + if end == nil || len(end) == 0 { + source.SeekToLast() + } else { + source.Seek(end) + if source.Valid() { + eoakey := source.Key() // end or after key + if bytes.Compare(end, eoakey) <= 0 { + source.Prev() + } + } else { + source.SeekToLast() + } + } + } else { + if start == nil || len(start) == 0 { + source.SeekToFirst() + } else { + source.Seek(start) + } + } + return &cLevelDBIterator{ + source: source, + start: start, + end: end, + isReverse: isReverse, + isInvalid: false, + } +} + +// Domain implements Iterator. +func (itr cLevelDBIterator) Domain() ([]byte, []byte) { + return itr.start, itr.end +} + +// Valid implements Iterator. +func (itr cLevelDBIterator) Valid() bool { + + // Once invalid, forever invalid. + if itr.isInvalid { + return false + } + + // If source errors, invalid. + if itr.source.GetError() != nil { + itr.isInvalid = true + return false + } + + // If source is invalid, invalid. + if !itr.source.Valid() { + itr.isInvalid = true + return false + } + + // If key is end or past it, invalid. + var start = itr.start + var end = itr.end + var key = itr.source.Key() + if itr.isReverse { + if start != nil && bytes.Compare(key, start) < 0 { + itr.isInvalid = true + return false + } + } else { + if end != nil && bytes.Compare(end, key) <= 0 { + itr.isInvalid = true + return false + } + } + + // It's valid. + return true +} + +// Key implements Iterator. +func (itr cLevelDBIterator) Key() []byte { + itr.assertIsValid() + return itr.source.Key() +} + +// Value implements Iterator. +func (itr cLevelDBIterator) Value() []byte { + itr.assertIsValid() + return itr.source.Value() +} + +// Next implements Iterator. +func (itr cLevelDBIterator) Next() { + itr.assertIsValid() + if itr.isReverse { + itr.source.Prev() + } else { + itr.source.Next() + } +} + +// Error implements Iterator. +func (itr cLevelDBIterator) Error() error { + return itr.source.GetError() +} + +// Close implements Iterator. +func (itr cLevelDBIterator) Close() error { + itr.source.Close() + return nil +} + +func (itr cLevelDBIterator) assertIsValid() { + if !itr.Valid() { + panic("iterator is invalid") + } +} diff --git a/cleveldb_test.go b/cleveldb_test.go new file mode 100644 index 0000000..42a1bd9 --- /dev/null +++ b/cleveldb_test.go @@ -0,0 +1,101 @@ +//go:build cleveldb +// +build cleveldb + +package db + +import ( + "bytes" + "fmt" + "math/rand" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func BenchmarkRandomReadsWrites2(b *testing.B) { + b.StopTimer() + + numItems := int64(1000000) + internal := map[int64]int64{} + for i := 0; i < int(numItems); i++ { + internal[int64(i)] = int64(0) + } + db, err := NewCLevelDB(fmt.Sprintf("test_%x", randStr(12)), "") + if err != nil { + b.Fatal(err.Error()) + return + } + + b.StartTimer() + + for i := 0; i < b.N; i++ { + // Write something + { + idx := (int64(rand.Int()) % numItems) + internal[idx]++ + val := internal[idx] + idxBytes := int642Bytes(int64(idx)) + valBytes := int642Bytes(int64(val)) + db.Set( + idxBytes, + valBytes, + ) + } + // Read something + { + idx := (int64(rand.Int()) % numItems) + val := internal[idx] + idxBytes := int642Bytes(int64(idx)) + valBytes, err := db.Get(idxBytes) + if err != nil { + b.Error(err) + } + if val == 0 { + if !bytes.Equal(valBytes, nil) { + b.Errorf("Expected %v for %v, got %X", + nil, idx, valBytes) + break + } + } else { + if len(valBytes) != 8 { + b.Errorf("Expected length 8 for %v, got %X", + idx, valBytes) + break + } + valGot := bytes2Int64(valBytes) + if val != valGot { + b.Errorf("Expected %v for %v, got %v", + val, idx, valGot) + break + } + } + } + } + + db.Close() +} + +func TestCLevelDBBackend(t *testing.T) { + name := fmt.Sprintf("test_%x", randStr(12)) + // Can't use "" (current directory) or "./" here because levigo.Open returns: + // "Error initializing DB: IO error: test_XXX.db: Invalid argument" + dir := os.TempDir() + db, err := NewDB(name, CLevelDBBackend, dir) + require.NoError(t, err) + defer cleanupDBDir(dir, name) + + _, ok := db.(*CLevelDB) + assert.True(t, ok) +} + +func TestCLevelDBStats(t *testing.T) { + name := fmt.Sprintf("test_%x", randStr(12)) + dir := os.TempDir() + db, err := NewDB(name, CLevelDBBackend, dir) + require.NoError(t, err) + defer cleanupDBDir(dir, name) + + assert.NotEmpty(t, db.Stats()) +} diff --git a/go.mod b/go.mod index 64662c1..44ca59f 100644 --- a/go.mod +++ b/go.mod @@ -3,15 +3,20 @@ module github.com/cometbft/cometbft-db go 1.21 require ( + github.com/dgraph-io/badger/v2 v2.2007.4 github.com/gogo/protobuf v1.3.2 github.com/google/btree v1.1.2 + github.com/jmhodges/levigo v1.0.0 + github.com/linxGnu/grocksdb v1.8.10 github.com/stretchr/testify v1.8.4 github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca + go.etcd.io/bbolt v1.3.8 google.golang.org/grpc v1.60.1 ) require ( github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash v1.1.0 // indirect github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 // indirect github.com/getsentry/sentry-go v0.26.0 // indirect github.com/prometheus/client_golang v1.18.0 // indirect @@ -29,6 +34,9 @@ require ( github.com/cockroachdb/pebble v0.0.0-20240112000813-0effd2429fca github.com/cockroachdb/redact v1.1.5 // indirect github.com/davecgh/go-spew v1.1.1 // indirect + github.com/dgraph-io/ristretto v0.0.3 // indirect + github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/klauspost/compress v1.17.4 // indirect diff --git a/go.sum b/go.sum index 9b25e8b..6e2da10 100644 --- a/go.sum +++ b/go.sum @@ -1,7 +1,13 @@ +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/zstd v1.5.5 h1:oWf5W7GtOLgp6bciQYDmhHHjdhYkALu6S/5Ni9ZgSvQ= github.com/DataDog/zstd v1.5.5/go.mod h1:g4AWEaM3yOg3HYfnJ3YIawPnVdXJh9QME85blwSAmyw= +github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= +github.com/cespare/xxhash v1.1.0 h1:a6HrQnmkObjyL+Gs60czilIUGqrzKutQD6XZog3p+ko= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cockroachdb/datadriven v1.0.3-0.20230413201302-be42291fc80f h1:otljaYPt5hWxV3MUfO5dFPFiOXg9CyG5/kCfayTqsJ4= @@ -18,9 +24,25 @@ github.com/cockroachdb/redact v1.1.5 h1:u1PMllDkdFfPWaNGMyLD1+so+aq3uUItthCFqzwP github.com/cockroachdb/redact v1.1.5/go.mod h1:BVNblN9mBWFyMyqK1k3AAiSxhvhfK2oOZZ2lK+dpvRg= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06 h1:zuQyyAKVxetITBuuhv3BI9cMrmStnpT18zmgmTxunpo= github.com/cockroachdb/tokenbucket v0.0.0-20230807174530-cc333fc44b06/go.mod h1:7nc4anLGjupUW/PeY5qiNYsdNXj7zopG+eqsS7To5IQ= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= +github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgraph-io/badger/v2 v2.2007.4 h1:TRWBQg8UrlUhaFdco01nO2uXwzKS7zd+HVdwV/GHc4o= +github.com/dgraph-io/badger/v2 v2.2007.4/go.mod h1:vSw/ax2qojzbN6eXHIx6KPKtCSHJN/Uz0X0VPruTIhk= +github.com/dgraph-io/ristretto v0.0.3-0.20200630154024-f66de99634de/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgraph-io/ristretto v0.0.3 h1:jh22xisGBjrEVnRZ1DVTpBVQm0Xndu8sMl0CWDzSIBI= +github.com/dgraph-io/ristretto v0.0.3/go.mod h1:KPxhHT9ZxKefz+PCeOGsrHpl1qZ7i70dGTu2u+Ahh6E= +github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13 h1:fAjc9m62+UWV/WAFKLNi6ZS0675eEUC9y3AlwSbQu1Y= +github.com/dgryski/go-farm v0.0.0-20200201041132-a6ae2369ad13/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -31,6 +53,7 @@ github.com/go-errors/errors v1.4.2/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3Bop github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= @@ -41,6 +64,7 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= @@ -51,15 +75,28 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/jmhodges/levigo v1.0.0 h1:q5EC36kV79HWeTBWsod3mG11EgStG3qArTKcvlksN1U= +github.com/jmhodges/levigo v1.0.0/go.mod h1:Q6Qx+uH3RAqyK4rFQroq9RL7mdkABMcfhEI+nNuzMJQ= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4= github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/linxGnu/grocksdb v1.8.10 h1:6FAhBThErRfJaevGOZISYvkG7RD4gfzeq452X4r8pes= +github.com/linxGnu/grocksdb v1.8.10/go.mod h1:xZCIb5Muw+nhbDK4Y5UJuOrin5MceOuiXkVUR7vp4WY= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -69,9 +106,11 @@ github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4= github.com/pingcap/errors v0.11.4/go.mod h1:Oi8TUi2kEtXXLMJk9l1cGmz20kV3TaQ0usTwv5KuLY8= github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -87,12 +126,30 @@ github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3c github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= +github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= +github.com/spf13/cobra v0.0.5/go.mod h1:3K3wKZymM7VvHMDS9+Akkh4K60UwM26emMESw8tLCHU= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca h1:Ld/zXl5t4+D69SiV4JoN7kkfvJdOWlPpfxrzxpLMoUk= github.com/syndtr/goleveldb v1.0.1-0.20200815110645-5c35d600f0ca/go.mod h1:u2MKkTVTVJWe5D1rCvame8WqhBd88EuIwODJZ1VHCPM= +github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= +github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.etcd.io/bbolt v1.3.8 h1:xs88BrvEv273UsB79e0hcVrlUWmS0a8upikMFhSyAtA= +go.etcd.io/bbolt v1.3.8/go.mod h1:N9Mkw9X8x5fupy0IKsmuqVtoGDyxsaDlbk4Rd05IAQw= +golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -116,8 +173,10 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -156,11 +215,13 @@ google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ google.golang.org/protobuf v1.32.0 h1:pPC6BG5ex8PDFnkbrGU3EixyhKcQ2aDuBS36lqK/C7I= google.golang.org/protobuf v1.32.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= diff --git a/rocksdb.go b/rocksdb.go new file mode 100644 index 0000000..d508b82 --- /dev/null +++ b/rocksdb.go @@ -0,0 +1,204 @@ +//go:build rocksdb +// +build rocksdb + +package db + +import ( + "fmt" + "path/filepath" + "runtime" + + "github.com/linxGnu/grocksdb" +) + +func init() { + dbCreator := func(name string, dir string) (DB, error) { + return NewRocksDB(name, dir) + } + registerDBCreator(RocksDBBackend, dbCreator) +} + +// RocksDB is a RocksDB backend. +type RocksDB struct { + db *grocksdb.DB + ro *grocksdb.ReadOptions + wo *grocksdb.WriteOptions + woSync *grocksdb.WriteOptions +} + +var _ DB = (*RocksDB)(nil) + +func NewRocksDB(name string, dir string) (*RocksDB, error) { + // default rocksdb option, good enough for most cases, including heavy workloads. + // 1GB table cache, 512MB write buffer(may use 50% more on heavy workloads). + // compression: snappy as default, need to -lsnappy to enable. + bbto := grocksdb.NewDefaultBlockBasedTableOptions() + bbto.SetBlockCache(grocksdb.NewLRUCache(1 << 30)) + bbto.SetFilterPolicy(grocksdb.NewBloomFilter(10)) + + opts := grocksdb.NewDefaultOptions() + opts.SetBlockBasedTableFactory(bbto) + opts.SetCreateIfMissing(true) + opts.IncreaseParallelism(runtime.NumCPU()) + // 1.5GB maximum memory use for writebuffer. + opts.OptimizeLevelStyleCompaction(512 * 1024 * 1024) + return NewRocksDBWithOptions(name, dir, opts) +} + +func NewRocksDBWithOptions(name string, dir string, opts *grocksdb.Options) (*RocksDB, error) { + dbPath := filepath.Join(dir, name+".db") + db, err := grocksdb.OpenDb(opts, dbPath) + if err != nil { + return nil, err + } + ro := grocksdb.NewDefaultReadOptions() + wo := grocksdb.NewDefaultWriteOptions() + woSync := grocksdb.NewDefaultWriteOptions() + woSync.SetSync(true) + return NewRocksDBWithRawDB(db, ro, wo, woSync), nil +} + +func NewRocksDBWithRawDB(db *grocksdb.DB, ro *grocksdb.ReadOptions, wo *grocksdb.WriteOptions, woSync *grocksdb.WriteOptions) *RocksDB { + return &RocksDB{ + db: db, + ro: ro, + wo: wo, + woSync: woSync, + } +} + +// Get implements DB. +func (db *RocksDB) Get(key []byte) ([]byte, error) { + if len(key) == 0 { + return nil, errKeyEmpty + } + res, err := db.db.Get(db.ro, key) + if err != nil { + return nil, err + } + return moveSliceToBytes(res), nil +} + +// Has implements DB. +func (db *RocksDB) Has(key []byte) (bool, error) { + bytes, err := db.Get(key) + if err != nil { + return false, err + } + return bytes != nil, nil +} + +// Set implements DB. +func (db *RocksDB) Set(key []byte, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + err := db.db.Put(db.wo, key, value) + if err != nil { + return err + } + return nil +} + +// SetSync implements DB. +func (db *RocksDB) SetSync(key []byte, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + err := db.db.Put(db.woSync, key, value) + if err != nil { + return err + } + return nil +} + +// Delete implements DB. +func (db *RocksDB) Delete(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + err := db.db.Delete(db.wo, key) + if err != nil { + return err + } + return nil +} + +// DeleteSync implements DB. +func (db *RocksDB) DeleteSync(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + err := db.db.Delete(db.woSync, key) + if err != nil { + return nil + } + return nil +} + +func (db *RocksDB) DB() *grocksdb.DB { + return db.db +} + +// Close implements DB. +func (db *RocksDB) Close() error { + db.ro.Destroy() + db.wo.Destroy() + db.woSync.Destroy() + db.db.Close() + return nil +} + +// Print implements DB. +func (db *RocksDB) Print() error { + itr, err := db.Iterator(nil, nil) + if err != nil { + return err + } + defer itr.Close() + for ; itr.Valid(); itr.Next() { + key := itr.Key() + value := itr.Value() + fmt.Printf("[%X]:\t[%X]\n", key, value) + } + return nil +} + +// Stats implements DB. +func (db *RocksDB) Stats() map[string]string { + keys := []string{"rocksdb.stats"} + stats := make(map[string]string, len(keys)) + for _, key := range keys { + stats[key] = db.db.GetProperty(key) + } + return stats +} + +// NewBatch implements DB. +func (db *RocksDB) NewBatch() Batch { + return newRocksDBBatch(db) +} + +// Iterator implements DB. +func (db *RocksDB) Iterator(start, end []byte) (Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, errKeyEmpty + } + itr := db.db.NewIterator(db.ro) + return newRocksDBIterator(itr, start, end, false), nil +} + +// ReverseIterator implements DB. +func (db *RocksDB) ReverseIterator(start, end []byte) (Iterator, error) { + if (start != nil && len(start) == 0) || (end != nil && len(end) == 0) { + return nil, errKeyEmpty + } + itr := db.db.NewIterator(db.ro) + return newRocksDBIterator(itr, start, end, true), nil +} diff --git a/rocksdb_batch.go b/rocksdb_batch.go new file mode 100644 index 0000000..6ebc8da --- /dev/null +++ b/rocksdb_batch.go @@ -0,0 +1,83 @@ +//go:build rocksdb +// +build rocksdb + +package db + +import "github.com/linxGnu/grocksdb" + +type rocksDBBatch struct { + db *RocksDB + batch *grocksdb.WriteBatch +} + +var _ Batch = (*rocksDBBatch)(nil) + +func newRocksDBBatch(db *RocksDB) *rocksDBBatch { + return &rocksDBBatch{ + db: db, + batch: grocksdb.NewWriteBatch(), + } +} + +// Set implements Batch. +func (b *rocksDBBatch) Set(key, value []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if value == nil { + return errValueNil + } + if b.batch == nil { + return errBatchClosed + } + b.batch.Put(key, value) + return nil +} + +// Delete implements Batch. +func (b *rocksDBBatch) Delete(key []byte) error { + if len(key) == 0 { + return errKeyEmpty + } + if b.batch == nil { + return errBatchClosed + } + b.batch.Delete(key) + return nil +} + +// Write implements Batch. +func (b *rocksDBBatch) Write() error { + if b.batch == nil { + return errBatchClosed + } + err := b.db.db.Write(b.db.wo, b.batch) + if err != nil { + return err + } + // Make sure batch cannot be used afterwards. Callers should still call Close(), for errors. + b.Close() + return nil +} + +// WriteSync implements Batch. +func (b *rocksDBBatch) WriteSync() error { + if b.batch == nil { + return errBatchClosed + } + err := b.db.db.Write(b.db.woSync, b.batch) + if err != nil { + return err + } + // Make sure batch cannot be used afterwards. Callers should still call Close(), for errors. + return b.Close() +} + +// Close implements Batch. +func (b *rocksDBBatch) Close() error { + if b.batch != nil { + b.batch.Destroy() + b.batch = nil + } + return nil +} diff --git a/rocksdb_iterator.go b/rocksdb_iterator.go new file mode 100644 index 0000000..495517c --- /dev/null +++ b/rocksdb_iterator.go @@ -0,0 +1,147 @@ +//go:build rocksdb +// +build rocksdb + +package db + +import ( + "bytes" + + "github.com/linxGnu/grocksdb" +) + +type rocksDBIterator struct { + source *grocksdb.Iterator + start, end []byte + isReverse bool + isInvalid bool +} + +var _ Iterator = (*rocksDBIterator)(nil) + +func newRocksDBIterator(source *grocksdb.Iterator, start, end []byte, isReverse bool) *rocksDBIterator { + if isReverse { + if end == nil { + source.SeekToLast() + } else { + source.Seek(end) + if source.Valid() { + eoakey := moveSliceToBytes(source.Key()) // end or after key + if bytes.Compare(end, eoakey) <= 0 { + source.Prev() + } + } else { + source.SeekToLast() + } + } + } else { + if start == nil { + source.SeekToFirst() + } else { + source.Seek(start) + } + } + return &rocksDBIterator{ + source: source, + start: start, + end: end, + isReverse: isReverse, + isInvalid: false, + } +} + +// Domain implements Iterator. +func (itr *rocksDBIterator) Domain() ([]byte, []byte) { + return itr.start, itr.end +} + +// Valid implements Iterator. +func (itr *rocksDBIterator) Valid() bool { + + // Once invalid, forever invalid. + if itr.isInvalid { + return false + } + + // If source has error, invalid. + if err := itr.source.Err(); err != nil { + itr.isInvalid = true + return false + } + + // If source is invalid, invalid. + if !itr.source.Valid() { + itr.isInvalid = true + return false + } + + // If key is end or past it, invalid. + var start = itr.start + var end = itr.end + var key = moveSliceToBytes(itr.source.Key()) + if itr.isReverse { + if start != nil && bytes.Compare(key, start) < 0 { + itr.isInvalid = true + return false + } + } else { + if end != nil && bytes.Compare(end, key) <= 0 { + itr.isInvalid = true + return false + } + } + + // It's valid. + return true +} + +// Key implements Iterator. +func (itr *rocksDBIterator) Key() []byte { + itr.assertIsValid() + return moveSliceToBytes(itr.source.Key()) +} + +// Value implements Iterator. +func (itr *rocksDBIterator) Value() []byte { + itr.assertIsValid() + return moveSliceToBytes(itr.source.Value()) +} + +// Next implements Iterator. +func (itr rocksDBIterator) Next() { + itr.assertIsValid() + if itr.isReverse { + itr.source.Prev() + } else { + itr.source.Next() + } +} + +// Error implements Iterator. +func (itr *rocksDBIterator) Error() error { + return itr.source.Err() +} + +// Close implements Iterator. +func (itr *rocksDBIterator) Close() error { + itr.source.Close() + return nil +} + +func (itr *rocksDBIterator) assertIsValid() { + if !itr.Valid() { + panic("iterator is invalid") + } +} + +// moveSliceToBytes will free the slice and copy out a go []byte +// This function can be applied on *Slice returned from Key() and Value() +// of an Iterator, because they are marked as freed. +func moveSliceToBytes(s *grocksdb.Slice) []byte { + defer s.Free() + if !s.Exists() { + return nil + } + v := make([]byte, len(s.Data())) + copy(v, s.Data()) + return v +} diff --git a/rocksdb_test.go b/rocksdb_test.go new file mode 100644 index 0000000..4eddbc5 --- /dev/null +++ b/rocksdb_test.go @@ -0,0 +1,36 @@ +//go:build rocksdb +// +build rocksdb + +package db + +import ( + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRocksDBBackend(t *testing.T) { + name := fmt.Sprintf("test_%x", randStr(12)) + dir := os.TempDir() + db, err := NewDB(name, RocksDBBackend, dir) + require.NoError(t, err) + defer cleanupDBDir(dir, name) + + _, ok := db.(*RocksDB) + assert.True(t, ok) +} + +func TestRocksDBStats(t *testing.T) { + name := fmt.Sprintf("test_%x", randStr(12)) + dir := os.TempDir() + db, err := NewDB(name, RocksDBBackend, dir) + require.NoError(t, err) + defer cleanupDBDir(dir, name) + + assert.NotEmpty(t, db.Stats()) +} + +// TODO: Add tests for rocksdb