From 741ab83fd15174564705edc27fe9bf0e74b0ebe6 Mon Sep 17 00:00:00 2001
From: Aman Mangal
Date: Tue, 14 Feb 2023 21:21:41 +0530
Subject: [PATCH] opt(stream): add option to directly copy over tables from lower levels (#1700)

Also takes a bug fix from PR #1712, commit 58d0674

This PR adds a FullCopy option to Stream. When this option is set to true,
tables from the last two levels are sent to the writer in their entirety,
copied over directly instead of being iterated key by key. This increases
stream speed while also lowering memory consumption on the DB that is
streaming the KVs. For a 71GB compressed and encrypted DB, we observed a
3x improvement in speed; that DB held ~65GB in the last two levels, with
the rest in the levels above.

To use this option, the following must be set on the Stream (a usage
sketch is appended after the diff):

    stream.KeyToList = nil
    stream.ChooseKey = nil
    stream.SinceTs = 0
    db.managedTxns = true

If a StreamWriter is used to receive the KVs, the encryption mode has to
be the same in the sender and the receiver. This restricts db.StreamDB()
to using the same encryption mode in both the input and output DB. A TODO
has been added for allowing different encryption modes.
---
 badger/cmd/stream.go  |   1 +
 db.go                 |   1 +
 iterator.go           |  20 ++--
 key_registry.go       |  86 ++++++++-------
 level_handler.go      |  36 ++++++-
 levels.go             |  68 ++++++++++--
 levels_test.go        | 107 +++++++++++++++++-
 manifest.go           |   9 +-
 pb/badgerpb4.pb.go    | 154 ++++++++++++++++++--------
 pb/badgerpb4.proto    |   7 ++
 stream.go             | 246 ++++++++++++++++++++++++++++++++++++------
 stream_writer.go      |  81 ++++++++++++--
 stream_writer_test.go |  23 ++++
 table/table.go        |  42 +++++++-
 14 files changed, 723 insertions(+), 158 deletions(-)

diff --git a/badger/cmd/stream.go b/badger/cmd/stream.go
index 98f1efd0c..45846d733 100644
--- a/badger/cmd/stream.go
+++ b/badger/cmd/stream.go
@@ -118,6 +118,7 @@ func stream(cmd *cobra.Command, args []string) error {
 		WithValueDir(so.outDir).
 		WithNumVersionsToKeep(so.numVersions).
 		WithCompression(options.CompressionType(so.compressionType)).
+		WithEncryptionKey(encKey).
 		WithReadOnly(false)
 	err = inDB.StreamDB(outOpt)
diff --git a/db.go b/db.go
index fee0481c8..2a4d57b84 100644
--- a/db.go
+++ b/db.go
@@ -1971,6 +1971,7 @@ func (db *DB) StreamDB(outOptions Options) error {
 	// Stream contents of DB to the output DB.
 	stream := db.NewStreamAt(math.MaxUint64)
 	stream.LogPrefix = fmt.Sprintf("Streaming DB to new DB at %s", outDir)
+	stream.FullCopy = true
 	stream.Send = func(buf *z.Buffer) error {
 		return writer.Write(buf)
diff --git a/iterator.go b/iterator.go
index c56259881..d23517af4 100644
--- a/iterator.go
+++ b/iterator.go
@@ -366,17 +366,17 @@ func (opt *IteratorOptions) pickTable(t table.TableInterface) bool {
 // that the tables are sorted in the right order.
 func (opt *IteratorOptions) pickTables(all []*table.Table) []*table.Table {
 	filterTables := func(tables []*table.Table) []*table.Table {
-		if opt.SinceTs > 0 {
-			tmp := tables[:0]
-			for _, t := range tables {
-				if t.MaxVersion() < opt.SinceTs {
-					continue
-				}
-				tmp = append(tmp, t)
+		if opt.SinceTs == 0 {
+			return tables
+		}
+		out := tables[:0]
+		for _, t := range tables {
+			if t.MaxVersion() < opt.SinceTs {
+				continue
 			}
-			tables = tmp
+			out = append(out, t)
 		}
-		return tables
+		return out
 	}
 
 	if len(opt.Prefix) == 0 {
@@ -489,7 +489,7 @@ func (txn *Txn) NewIterator(opt IteratorOptions) *Iterator {
 	for i := 0; i < len(tables); i++ {
 		iters = append(iters, tables[i].sl.NewUniIterator(opt.Reverse))
 	}
-	iters = txn.db.lc.appendIterators(iters, &opt) // This will increment references.
+	iters = append(iters, txn.db.lc.iterators(&opt)...) // This will increment references.
 	res := &Iterator{
 		txn:    txn,
 		iitr:   table.NewMergeIterator(iters, opt.Reverse),
diff --git a/key_registry.go b/key_registry.go
index a1be0435d..1568e2705 100644
--- a/key_registry.go
+++ b/key_registry.go
@@ -28,6 +28,8 @@ import (
 	"sync"
 	"time"
 
+	"github.com/pkg/errors"
+
 	"github.com/dgraph-io/badger/v4/pb"
 	"github.com/dgraph-io/badger/v4/y"
 )
@@ -264,7 +266,7 @@ func WriteKeyRegistry(reg *KeyRegistry, opt KeyRegistryOptions) error {
 	// Write all the datakeys to the buf.
 	for _, k := range reg.dataKeys {
 		// Writing the datakey to the given buffer.
-		if err := storeDataKey(buf, opt.EncryptionKey, k); err != nil {
+		if err := storeDataKey(buf, opt.EncryptionKey, *k); err != nil {
 			return y.Wrapf(err, "Error while storing datakey in WriteKeyRegistry")
 		}
 	}
@@ -338,8 +340,7 @@ func (kr *KeyRegistry) LatestDataKey() (*pb.DataKey, error) {
 	defer kr.Unlock()
 	// Key might have been generated by another goroutine. So,
 	// checking once again.
-	key, valid = validKey()
-	if valid {
+	if key, valid := validKey(); valid {
 		return key, nil
 	}
 	k := make([]byte, len(kr.opt.EncryptionKey))
 	iv, err := y.GenerateIV()
 	if err != nil {
 		return nil, err
 	}
-	_, err = rand.Read(k)
-	if err != nil {
+
+	if _, err := rand.Read(k); err != nil {
 		return nil, err
 	}
 	// Otherwise increment the KeyID and generate a new datakey.
 	kr.nextKeyID++
-	dk := &pb.DataKey{
+	dk := pb.DataKey{
 		KeyId:     kr.nextKeyID,
 		Data:      k,
 		CreatedAt: time.Now().Unix(),
 		Iv:        iv,
 	}
+	kr.lastCreated = dk.CreatedAt
+	kr.dataKeys[kr.nextKeyID] = &dk
 	// Don't store the datakey on file if badger is running in InMemory mode.
-	if !kr.opt.InMemory {
-		// Store the datekey.
-		buf := &bytes.Buffer{}
-		if err = storeDataKey(buf, kr.opt.EncryptionKey, dk); err != nil {
-			return nil, err
-		}
-		// Persist the datakey to the disk
-		if _, err = kr.fp.Write(buf.Bytes()); err != nil {
-			return nil, err
-		}
+	if kr.opt.InMemory {
+		return &dk, nil
 	}
-	// storeDatakey encrypts the datakey So, placing un-encrypted key in the memory.
-	dk.Data = k
-	kr.lastCreated = dk.CreatedAt
-	kr.dataKeys[kr.nextKeyID] = dk
-	return dk, nil
+
+	// Store the datakey.
+	if err = storeDataKey(kr.fp, kr.opt.EncryptionKey, dk); err != nil {
+		return nil, err
+	}
+	return &dk, nil
+}
+
+func (kr *KeyRegistry) AddKey(dk pb.DataKey) (uint64, error) {
+	// If we don't have an encryption key, we cannot store the datakey.
+	if len(kr.opt.EncryptionKey) == 0 {
+		return 0, errors.New("No encryption key found. Cannot add data key")
+	}
+
+	if _, ok := kr.dataKeys[dk.KeyId]; !ok {
+		// If the KeyId does not exist already, use the next available KeyId to store the data key.
+		kr.nextKeyID++
+		dk.KeyId = kr.nextKeyID
+	}
+	kr.dataKeys[dk.KeyId] = &dk
+
+	if kr.opt.InMemory {
+		return dk.KeyId, nil
+	}
+	// Store the datakey.
+	return dk.KeyId, storeDataKey(kr.fp, kr.opt.EncryptionKey, dk)
 }
 
 // Close closes the key registry.
@@ -387,7 +403,8 @@ func (kr *KeyRegistry) Close() error {
 }
 
 // storeDataKey stores the datakey in an encrypted format in the given buffer, if a storage key is present.
-func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error {
+// DO NOT use a pointer for key. storeDataKey modifies the key.Data field.
+func storeDataKey(w io.Writer, storageKey []byte, key pb.DataKey) error {
 	// xor will encrypt the IV and xor with the given data.
 	// It'll be used for both encryption and decryption.
xor := func() error { @@ -395,30 +412,21 @@ func storeDataKey(buf *bytes.Buffer, storageKey []byte, k *pb.DataKey) error { return nil } var err error - k.Data, err = y.XORBlockAllocate(k.Data, storageKey, k.Iv) + key.Data, err = y.XORBlockAllocate(key.Data, storageKey, key.Iv) return err } // In memory datakey will be plain text so encrypting before storing to the disk. - var err error - if err = xor(); err != nil { + if err := xor(); err != nil { return y.Wrapf(err, "Error while encrypting datakey in storeDataKey") } - var data []byte - if data, err = k.Marshal(); err != nil { - err = y.Wrapf(err, "Error while marshaling datakey in storeDataKey") - var err2 error - // decrypting the datakey back. - if err2 = xor(); err2 != nil { - return y.Wrapf(err, - y.Wrapf(err2, "Error while decrypting datakey in storeDataKey").Error()) - } - return err + data, err := key.Marshal() + if err != nil { + return y.Wrapf(err, "Error while marshaling datakey in storeDataKey") } var lenCrcBuf [8]byte binary.BigEndian.PutUint32(lenCrcBuf[0:4], uint32(len(data))) binary.BigEndian.PutUint32(lenCrcBuf[4:8], crc32.Checksum(data, y.CastagnoliCrcTable)) - y.Check2(buf.Write(lenCrcBuf[:])) - y.Check2(buf.Write(data)) - // Decrypting the datakey back since we're using the pointer. - return xor() + y.Check2(w.Write(lenCrcBuf[:])) + y.Check2(w.Write(data)) + return nil } diff --git a/level_handler.go b/level_handler.go index 31673f15b..6a8e95084 100644 --- a/level_handler.go +++ b/level_handler.go @@ -304,9 +304,9 @@ func (s *levelHandler) get(key []byte) (y.ValueStruct, error) { return maxVs, decr() } -// appendIterators appends iterators to an array of iterators, for merging. +// iterators returns an array of iterators, for merging. // Note: This obtains references for the table handlers. Remember to close these iterators. -func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) []y.Iterator { +func (s *levelHandler) iterators(opt *IteratorOptions) []y.Iterator { s.RLock() defer s.RUnlock() @@ -324,14 +324,40 @@ func (s *levelHandler) appendIterators(iters []y.Iterator, opt *IteratorOptions) out = append(out, t) } } - return appendIteratorsReversed(iters, out, topt) + return iteratorsReversed(out, topt) } tables := opt.pickTables(s.tables) if len(tables) == 0 { - return iters + return nil } - return append(iters, table.NewConcatIterator(tables, topt)) + return []y.Iterator{table.NewConcatIterator(tables, topt)} +} + +func (s *levelHandler) getTables(opt *IteratorOptions) []*table.Table { + if opt.Reverse { + panic("Invalid option for getTables") + } + + s.RLock() + defer s.RUnlock() + + if s.level == 0 { + var out []*table.Table + for _, t := range s.tables { + if opt.pickTable(t) { + t.IncrRef() + out = append(out, t) + } + } + return out + } + + tables := opt.pickTables(s.tables) + for _, t := range tables { + t.IncrRef() + } + return tables } type levelHandlerRLocked struct{} diff --git a/levels.go b/levels.go index 387ee7e3e..8319ce048 100644 --- a/levels.go +++ b/levels.go @@ -33,6 +33,7 @@ import ( "github.com/pkg/errors" otrace "go.opencensus.io/trace" + "github.com/dgraph-io/badger/v4/options" "github.com/dgraph-io/badger/v4/pb" "github.com/dgraph-io/badger/v4/table" "github.com/dgraph-io/badger/v4/y" @@ -895,7 +896,7 @@ func (s *levelsController) compactBuildTables( var iters []y.Iterator switch { case lev == 0: - iters = appendIteratorsReversed(iters, topTables, table.NOCACHE) + iters = append(iters, iteratorsReversed(topTables, table.NOCACHE)...) 
 	case len(topTables) > 0:
 		y.AssertTrue(len(topTables) == 1)
 		iters = []y.Iterator{topTables[0].NewIterator(table.NOCACHE)}
@@ -1609,7 +1610,8 @@ func (s *levelsController) get(key []byte, maxVs y.ValueStruct, startLevel int)
 	return maxVs, nil
 }
 
-func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.Iterator {
+func iteratorsReversed(th []*table.Table, opt int) []y.Iterator {
+	out := make([]y.Iterator, 0, len(th))
 	for i := len(th) - 1; i >= 0; i-- {
 		// This will increment the reference of the table handler.
 		out = append(out, th[i].NewIterator(opt))
@@ -1617,16 +1619,25 @@ func appendIteratorsReversed(out []y.Iterator, th []*table.Table, opt int) []y.I
 	return out
 }
 
-// appendIterators appends iterators to an array of iterators, for merging.
+// getTables returns tables from all levels. It calls IncrRef on all returned tables.
+func (s *levelsController) getTables(opt *IteratorOptions) [][]*table.Table {
+	res := make([][]*table.Table, 0, len(s.levels))
+	for _, level := range s.levels {
+		res = append(res, level.getTables(opt))
+	}
+	return res
+}
+
+// iterators returns an array of iterators, for merging.
 // Note: This obtains references for the table handlers. Remember to close these iterators.
-func (s *levelsController) appendIterators(
-	iters []y.Iterator, opt *IteratorOptions) []y.Iterator {
+func (s *levelsController) iterators(opt *IteratorOptions) []y.Iterator {
 	// Just like with get, it's important we iterate the levels from 0 on upward, to avoid missing
 	// data when there's a compaction.
+	itrs := make([]y.Iterator, 0, len(s.levels))
 	for _, level := range s.levels {
-		iters = level.appendIterators(iters, opt)
+		itrs = append(itrs, level.iterators(opt)...)
 	}
-	return iters
+	return itrs
 }
 
 // TableInfo represents the information about a table.
@@ -1753,3 +1764,46 @@ func (s *levelsController) keySplits(numPerTable int, prefix []byte) []string {
 	sort.Strings(splits)
 	return splits
 }
+
+// AddTable builds a table from the data in kv.Value, using the manifest change passed in kv.Key.
+func (lc *levelsController) AddTable(
+	kv *pb.KV, lev int, dk *pb.DataKey, change *pb.ManifestChange) error {
+	// TODO: Encryption / Decryption might be required for the table, if the sender and receiver
+	// don't have the same encryption mode. See if in-place encryption/decryption can be done.
+	// Tables are sent in sorted order, so no need to sort them here.
+	encrypted := len(lc.kv.opt.EncryptionKey) > 0
+	y.AssertTrue((dk != nil && encrypted) || (dk == nil && !encrypted))
+	// The keyId is zero if there is no encryption.
+	opts := buildTableOptions(lc.kv)
+	opts.Compression = options.CompressionType(change.Compression)
+	opts.DataKey = dk
+
+	fileID := lc.reserveFileID()
+	fname := table.NewFilename(fileID, lc.kv.opt.Dir)
+
+	// kv.Value is owned by the z.Buffer. Ensure that we copy this buffer.
+	var tbl *table.Table
+	var err error
+	if lc.kv.opt.InMemory {
+		if tbl, err = table.OpenInMemoryTable(y.Copy(kv.Value), fileID, &opts); err != nil {
+			return errors.Wrap(err, "while creating in-memory table from buffer")
+		}
+	} else {
+		if tbl, err = table.CreateTableFromBuffer(fname, kv.Value, opts); err != nil {
+			return errors.Wrap(err, "while creating table from buffer")
+		}
+	}
+
+	lc.levels[lev].addTable(tbl)
+	// Release the ref held by OpenTable. addTable would add a reference.
+	_ = tbl.DecrRef()
+
+	change.Id = fileID
+	change.Level = uint32(lev)
+	if dk != nil {
+		change.KeyId = dk.KeyId
+	}
+	// We use the same data KeyId. So, change.KeyId remains the same.
+	y.AssertTrue(change.Op == pb.ManifestChange_CREATE)
+	return lc.kv.manifest.addChanges([]*pb.ManifestChange{change})
+}
diff --git a/levels_test.go b/levels_test.go
index 87ceaee4a..ec29802bc 100644
--- a/levels_test.go
+++ b/levels_test.go
@@ -18,6 +18,7 @@ package badger
 
 import (
 	"fmt"
+	"io/ioutil"
 	"math"
 	"math/rand"
 	"os"
@@ -40,7 +41,15 @@ func createAndOpen(db *DB, td []keyValVersion, level int) {
 		BloomFalsePositive: db.opt.BloomFalsePositive,
 		ChkMode:            options.NoVerification,
 	}
-	b := table.NewTableBuilder(opts)
+	createAndOpenWithOptions(db, td, level, &opts)
+}
+
+func createAndOpenWithOptions(db *DB, td []keyValVersion, level int, opts *table.Options) {
+	if opts == nil {
+		bopts := buildTableOptions(db)
+		opts = &bopts
+	}
+	b := table.NewTableBuilder(*opts)
 	defer b.Close()
 
 	// Add all keys and versions to the table.
@@ -49,13 +58,21 @@ func createAndOpen(db *DB, td []keyValVersion, level int) {
 		val := y.ValueStruct{Value: []byte(item.val), Meta: item.meta}
 		b.Add(key, val, 0)
 	}
-	fname := table.NewFilename(db.lc.reserveFileID(), db.opt.Dir)
-	tab, err := table.CreateTable(fname, b)
+	fileID := db.lc.reserveFileID()
+	var tab *table.Table
+	var err error
+	if db.opt.InMemory {
+		data := b.Finish()
+		tab, err = table.OpenInMemoryTable(data, fileID, opts)
+	} else {
+		fname := table.NewFilename(fileID, db.opt.Dir)
+		tab, err = table.CreateTable(fname, b)
+	}
 	if err != nil {
 		panic(err)
 	}
 	if err := db.manifest.addChanges([]*pb.ManifestChange{
-		newCreateChange(tab.ID(), level, 0, tab.CompressionType()),
+		newCreateChange(tab.ID(), level, tab.KeyID(), tab.CompressionType()),
 	}); err != nil {
 		panic(err)
 	}
@@ -1221,3 +1238,85 @@ func TestStaleDataCleanup(t *testing.T) {
 	})
 }
+
+func TestStreamWithFullCopy(t *testing.T) {
+	dbopts := DefaultOptions("")
+	dbopts.managedTxns = true
+	dbopts.MaxLevels = 7
+	dbopts.NumVersionsToKeep = math.MaxInt32
+
+	encKey := make([]byte, 24)
+	_, err := rand.Read(encKey)
+	require.NoError(t, err)
+
+	test := func(db *DB, outOpts Options) {
+		l4 := []keyValVersion{{"a", "1", 3, bitDelete}, {"d", "4", 3, 0}}
+		l5 := []keyValVersion{{"b", "2", 2, 0}}
+		l6 := []keyValVersion{{"a", "1", 2, 0}, {"c", "3", 1, 0}}
+		createAndOpenWithOptions(db, l4, 4, nil)
+		createAndOpenWithOptions(db, l5, 5, nil)
+		createAndOpenWithOptions(db, l6, 6, nil)
+
+		if !outOpts.InMemory {
+			dir, err := ioutil.TempDir("", "badger-test")
+			require.NoError(t, err)
+			defer removeDir(dir)
+			outOpts.Dir = dir
+			outOpts.ValueDir = dir
+		}
+
+		require.NoError(t, db.StreamDB(outOpts))
+		out, err := Open(outOpts)
+		require.NoError(t, err)
+		defer func() {
+			require.NoError(t, out.Close())
+		}()
+		err = out.View(func(txn *Txn) error {
+			// Key "a" should not be there because we deleted it at a higher version.
+			_, err := txn.Get([]byte("a"))
+			require.Error(t, err)
+			require.Equal(t, err, ErrKeyNotFound)
+			_, err = txn.Get([]byte("b"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("c"))
+			require.NoError(t, err)
+			_, err = txn.Get([]byte("d"))
+			require.NoError(t, err)
+			return nil
+		})
+		require.NoError(t, err)
+	}
+	t.Run("without encryption", func(t *testing.T) {
+		opts := dbopts
+		runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
+			test(db, opts)
+		})
+	})
+	t.Run("with encryption", func(t *testing.T) {
+		opts := dbopts
+		opts.IndexCacheSize = 1 << 20
+		opts.BlockCacheSize = 1 << 20
+		// Set the rotation duration to zero so that we have more than one data key.
+		opts.EncryptionKey = encKey
+		opts.EncryptionKeyRotationDuration = 0
+		runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
+			test(db, opts)
+			require.Greater(t, len(db.registry.dataKeys), 1)
+		})
+	})
+	t.Run("stream from in-memory to persistent", func(t *testing.T) {
+		opts := dbopts
+		opts.IndexCacheSize = 1 << 20
+		opts.BlockCacheSize = 1 << 20
+		opts.InMemory = true
+		// Set the rotation duration to zero so that we have more than one data key.
+		opts.EncryptionKey = encKey
+		opts.EncryptionKeyRotationDuration = 0
+		runBadgerTest(t, &opts, func(t *testing.T, db *DB) {
+			outOpts := opts
+			outOpts.InMemory = false
+			test(db, outOpts)
+			require.Greater(t, len(db.registry.dataKeys), 1)
+		})
+	})
+}
diff --git a/manifest.go b/manifest.go
index 274b98ce1..fdeeac537 100644
--- a/manifest.go
+++ b/manifest.go
@@ -128,7 +128,7 @@ func (m *Manifest) clone() Manifest {
 func openOrCreateManifestFile(opt Options) (
 	ret *manifestFile, result Manifest, err error) {
 	if opt.InMemory {
-		return &manifestFile{inMemory: true}, Manifest{}, nil
+		return &manifestFile{inMemory: true, manifest: createManifest()}, Manifest{}, nil
 	}
 	return helpOpenOrCreateManifestFile(opt.Dir, opt.ReadOnly, opt.ExternalMagicVersion,
 		manifestDeletionsRewriteThreshold)
@@ -206,21 +206,20 @@ func (mf *manifestFile) close() error {
 // this depends on the filesystem -- some might append garbage data if a system crash happens at
 // the wrong time.)
 func (mf *manifestFile) addChanges(changesParam []*pb.ManifestChange) error {
-	if mf.inMemory {
-		return nil
-	}
 	changes := pb.ManifestChangeSet{Changes: changesParam}
 	buf, err := proto.Marshal(&changes)
 	if err != nil {
 		return err
 	}
-
 	// Maybe we could use O_APPEND instead (on certain file systems)
 	mf.appendLock.Lock()
 	defer mf.appendLock.Unlock()
 	if err := applyChangeSet(&mf.manifest, &changes); err != nil {
 		return err
 	}
+	if mf.inMemory {
+		return nil
+	}
 	// Rewrite manifest if it'd shrink by 1/10 and it's big enough to care
 	if mf.manifest.Deletions > mf.deletionsRewriteThreshold &&
 		mf.manifest.Deletions > manifestDeletionsRatio*(mf.manifest.Creations-mf.manifest.Deletions) {
diff --git a/pb/badgerpb4.pb.go b/pb/badgerpb4.pb.go
index 521f99293..ac94cfc4c 100644
--- a/pb/badgerpb4.pb.go
+++ b/pb/badgerpb4.pb.go
@@ -44,6 +44,34 @@ func (EncryptionAlgo) EnumDescriptor() ([]byte, []int) {
 	return fileDescriptor_452c1d780baa15ef, []int{0}
 }
 
+type KV_Kind int32
+
+const (
+	KV_KEY      KV_Kind = 0
+	KV_DATA_KEY KV_Kind = 1
+	KV_FILE     KV_Kind = 2
+)
+
+var KV_Kind_name = map[int32]string{
+	0: "KEY",
+	1: "DATA_KEY",
+	2: "FILE",
+}
+
+var KV_Kind_value = map[string]int32{
+	"KEY":      0,
+	"DATA_KEY": 1,
+	"FILE":     2,
+}
+
+func (x KV_Kind) String() string {
+	return proto.EnumName(KV_Kind_name, int32(x))
+}
+
+func (KV_Kind) EnumDescriptor() ([]byte, []int) {
+	return fileDescriptor_452c1d780baa15ef, []int{0, 0}
+}
+
 type ManifestChange_Operation int32
 
 const (
@@ -104,7 +132,8 @@ type KV struct {
 	// Stream id is used to identify which stream the KV came from.
 	StreamId uint32 `protobuf:"varint,10,opt,name=stream_id,json=streamId,proto3" json:"stream_id,omitempty"`
 	// Stream done is used to indicate end of stream.
- StreamDone bool `protobuf:"varint,11,opt,name=stream_done,json=streamDone,proto3" json:"stream_done,omitempty"` + StreamDone bool `protobuf:"varint,11,opt,name=stream_done,json=streamDone,proto3" json:"stream_done,omitempty"` + Kind KV_Kind `protobuf:"varint,12,opt,name=kind,proto3,enum=badgerpb4.KV_Kind" json:"kind,omitempty"` } func (m *KV) Reset() { *m = KV{} } @@ -196,6 +225,13 @@ func (m *KV) GetStreamDone() bool { return false } +func (m *KV) GetKind() KV_Kind { + if m != nil { + return m.Kind + } + return KV_KEY +} + type KVList struct { Kv []*KV `protobuf:"bytes,1,rep,name=kv,proto3" json:"kv,omitempty"` // alloc_ref used internally for memory management. @@ -552,6 +588,7 @@ func (m *Match) GetIgnoreBytes() string { func init() { proto.RegisterEnum("badgerpb4.EncryptionAlgo", EncryptionAlgo_name, EncryptionAlgo_value) + proto.RegisterEnum("badgerpb4.KV_Kind", KV_Kind_name, KV_Kind_value) proto.RegisterEnum("badgerpb4.ManifestChange_Operation", ManifestChange_Operation_name, ManifestChange_Operation_value) proto.RegisterEnum("badgerpb4.Checksum_Algorithm", Checksum_Algorithm_name, Checksum_Algorithm_value) proto.RegisterType((*KV)(nil), "badgerpb4.KV") @@ -566,48 +603,52 @@ func init() { func init() { proto.RegisterFile("badgerpb4.proto", fileDescriptor_452c1d780baa15ef) } var fileDescriptor_452c1d780baa15ef = []byte{ - // 653 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x54, 0x4f, 0x6b, 0xdb, 0x4e, - 0x10, 0xf5, 0xca, 0xf2, 0xbf, 0x71, 0xe2, 0xf8, 0xb7, 0xfc, 0x5a, 0x14, 0x4a, 0x5c, 0x47, 0xa1, - 0x60, 0x0a, 0xb5, 0x69, 0x1c, 0x7a, 0xe9, 0xc9, 0xff, 0x20, 0xc6, 0x09, 0x81, 0x6d, 0x08, 0xa1, - 0x17, 0xb3, 0x96, 0xc6, 0xb6, 0xb0, 0x2d, 0x89, 0xd5, 0x5a, 0xc4, 0x1f, 0xa2, 0xd0, 0x8f, 0xd5, - 0x63, 0x0e, 0x3d, 0xf4, 0x58, 0x92, 0x2f, 0x52, 0x76, 0xa5, 0xb8, 0xf6, 0xa1, 0xb7, 0x99, 0x37, - 0xa3, 0x79, 0xa3, 0xf7, 0x46, 0x82, 0xa3, 0x09, 0x77, 0x67, 0x28, 0xc2, 0xc9, 0x45, 0x33, 0x14, - 0x81, 0x0c, 0x68, 0x69, 0x0b, 0xd8, 0x3f, 0x09, 0x18, 0xa3, 0x3b, 0x5a, 0x85, 0xec, 0x02, 0x37, - 0x16, 0xa9, 0x93, 0xc6, 0x01, 0x53, 0x21, 0xfd, 0x1f, 0x72, 0x31, 0x5f, 0xae, 0xd1, 0x32, 0x34, - 0x96, 0x24, 0xf4, 0x0d, 0x94, 0xd6, 0x11, 0x8a, 0xf1, 0x0a, 0x25, 0xb7, 0xb2, 0xba, 0x52, 0x54, - 0xc0, 0x35, 0x4a, 0x4e, 0x2d, 0x28, 0xc4, 0x28, 0x22, 0x2f, 0xf0, 0x2d, 0xb3, 0x4e, 0x1a, 0x26, - 0x7b, 0x49, 0xe9, 0x09, 0x00, 0x3e, 0x84, 0x9e, 0xc0, 0x68, 0xcc, 0xa5, 0x95, 0xd3, 0xc5, 0x52, - 0x8a, 0x74, 0x24, 0xa5, 0x60, 0xea, 0x81, 0x79, 0x3d, 0x50, 0xc7, 0x8a, 0x29, 0x92, 0x02, 0xf9, - 0x6a, 0xec, 0xb9, 0x16, 0xd4, 0x49, 0xe3, 0x90, 0x15, 0x13, 0x60, 0xe8, 0xd2, 0xb7, 0x50, 0x4e, - 0x8b, 0x6e, 0xe0, 0xa3, 0x55, 0xae, 0x93, 0x46, 0x91, 0x41, 0x02, 0xf5, 0x03, 0x1f, 0xed, 0x3e, - 0xe4, 0x47, 0x77, 0x57, 0x5e, 0x24, 0xe9, 0x09, 0x18, 0x8b, 0xd8, 0x22, 0xf5, 0x6c, 0xa3, 0x7c, - 0x7e, 0xd8, 0xfc, 0xab, 0xc4, 0xe8, 0x8e, 0x19, 0x8b, 0x58, 0xd1, 0xf0, 0xe5, 0x32, 0x70, 0xc6, - 0x02, 0xa7, 0x9a, 0xc6, 0x64, 0x45, 0x0d, 0x30, 0x9c, 0xda, 0x97, 0xf0, 0xdf, 0x35, 0xf7, 0xbd, - 0x29, 0x46, 0xb2, 0x37, 0xe7, 0xfe, 0x0c, 0xbf, 0xa0, 0xa4, 0x6d, 0x28, 0x38, 0x3a, 0x89, 0xd2, - 0xa9, 0xc7, 0x3b, 0x53, 0xf7, 0xdb, 0xd9, 0x4b, 0xa7, 0xfd, 0xcd, 0x80, 0xca, 0x7e, 0x8d, 0x56, - 0xc0, 0x18, 0xba, 0x5a, 0x71, 0x93, 0x19, 0x43, 0x97, 0xb6, 0xc1, 0xb8, 0x09, 0xb5, 0xda, 0x95, - 0xf3, 0xb3, 0x7f, 0x8e, 0x6c, 0xde, 0x84, 0x28, 0xb8, 0xf4, 0x02, 0x9f, 0x19, 0x37, 0xa1, 0x72, - 0xe9, 0x0a, 0x63, 0x5c, 0x6a, 0x2f, 0x0e, 0x59, 0x92, 0xd0, 0x57, 0x90, 0x5f, 0xe0, 0x46, 0x09, - 
0x97, 0xf8, 0x90, 0x5b, 0xe0, 0x66, 0xe8, 0xd2, 0x2e, 0x1c, 0xa1, 0xef, 0x88, 0x4d, 0xa8, 0x1e, - 0x1f, 0xf3, 0xe5, 0x2c, 0xd0, 0x56, 0x54, 0xf6, 0xde, 0x60, 0xb0, 0xed, 0xe8, 0x2c, 0x67, 0x01, - 0xab, 0xe0, 0x5e, 0x4e, 0xeb, 0x50, 0x76, 0x82, 0x55, 0x28, 0x30, 0xd2, 0x3e, 0xe7, 0x35, 0xed, - 0x2e, 0x64, 0x9f, 0x41, 0x69, 0xbb, 0x23, 0x05, 0xc8, 0xf7, 0xd8, 0xa0, 0x73, 0x3b, 0xa8, 0x66, - 0x54, 0xdc, 0x1f, 0x5c, 0x0d, 0x6e, 0x07, 0x55, 0x62, 0xc7, 0x50, 0xec, 0xcd, 0xd1, 0x59, 0x44, - 0xeb, 0x15, 0xfd, 0x08, 0xa6, 0xde, 0x85, 0xe8, 0x5d, 0x4e, 0x76, 0x76, 0x79, 0x69, 0x69, 0x2a, - 0x6a, 0xe1, 0xc9, 0xf9, 0x8a, 0xe9, 0x56, 0x75, 0xae, 0xd1, 0x7a, 0xa5, 0xc5, 0x32, 0x99, 0x0a, - 0xed, 0x77, 0x50, 0xda, 0x36, 0x25, 0xac, 0xbd, 0xf6, 0x79, 0xaf, 0x9a, 0xa1, 0x07, 0x50, 0xbc, - 0xbf, 0xbf, 0xe4, 0xd1, 0xfc, 0xd3, 0x45, 0x95, 0xd8, 0x0e, 0x14, 0xfa, 0x5c, 0xf2, 0x11, 0x6e, - 0x76, 0x44, 0x22, 0xbb, 0x22, 0x51, 0x30, 0x5d, 0x2e, 0x79, 0x7a, 0xf6, 0x3a, 0x56, 0x56, 0x79, - 0x71, 0x7a, 0xee, 0x86, 0x17, 0xab, 0x73, 0x76, 0x04, 0x72, 0x89, 0xae, 0x3a, 0x67, 0xa5, 0x71, - 0x96, 0x95, 0x52, 0xa4, 0x23, 0xed, 0x2e, 0xe4, 0xae, 0xb9, 0x74, 0xe6, 0xf4, 0x35, 0xe4, 0x43, - 0x81, 0x53, 0xef, 0x21, 0xfd, 0xb0, 0xd2, 0x8c, 0x9e, 0xc2, 0x81, 0x37, 0xf3, 0x03, 0x81, 0xe3, - 0xc9, 0x46, 0x62, 0xa4, 0xb9, 0x4a, 0xac, 0x9c, 0x60, 0x5d, 0x05, 0xbd, 0x3f, 0x86, 0xca, 0xbe, - 0x13, 0xb4, 0x00, 0x59, 0x8e, 0x51, 0x35, 0xd3, 0xfd, 0xfc, 0xe3, 0xa9, 0x46, 0x1e, 0x9f, 0x6a, - 0xe4, 0xf7, 0x53, 0x8d, 0x7c, 0x7f, 0xae, 0x65, 0x1e, 0x9f, 0x6b, 0x99, 0x5f, 0xcf, 0xb5, 0xcc, - 0xd7, 0xd3, 0x99, 0x27, 0xe7, 0xeb, 0x49, 0xd3, 0x09, 0x56, 0x2d, 0x77, 0x26, 0x78, 0x38, 0xff, - 0xe0, 0x05, 0xad, 0x44, 0xcf, 0x56, 0x7c, 0xd1, 0x0a, 0x27, 0x93, 0xbc, 0xfe, 0x03, 0xb4, 0xff, - 0x04, 0x00, 0x00, 0xff, 0xff, 0xec, 0x26, 0x3b, 0x76, 0x14, 0x04, 0x00, 0x00, + // 705 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x54, 0xcd, 0x6e, 0xe2, 0x48, + 0x10, 0xc6, 0xc6, 0xfc, 0x15, 0x84, 0xb0, 0xad, 0xdd, 0x95, 0xa3, 0x55, 0x58, 0xe2, 0x68, 0x77, + 0xd1, 0x4a, 0x0b, 0x5a, 0x88, 0xf6, 0xb2, 0x27, 0x7e, 0x3c, 0x0a, 0x82, 0x28, 0x52, 0x4f, 0x14, + 0x65, 0xe6, 0x82, 0x1a, 0xbb, 0x00, 0x0b, 0xb0, 0xad, 0x76, 0x63, 0x85, 0x87, 0x18, 0x69, 0x5e, + 0x62, 0xde, 0x65, 0x8e, 0x39, 0xce, 0x71, 0x94, 0xbc, 0xc8, 0xa8, 0xdb, 0x0e, 0x03, 0x87, 0xb9, + 0xd5, 0xf7, 0x55, 0xb9, 0xaa, 0x5c, 0x5f, 0x55, 0xc3, 0xe9, 0x8c, 0xb9, 0x0b, 0xe4, 0xe1, 0xec, + 0xaa, 0x15, 0xf2, 0x40, 0x04, 0xa4, 0xb4, 0x27, 0xac, 0x4f, 0x3a, 0xe8, 0xe3, 0x7b, 0x52, 0x83, + 0xec, 0x0a, 0x77, 0xa6, 0xd6, 0xd0, 0x9a, 0x15, 0x2a, 0x4d, 0xf2, 0x33, 0xe4, 0x62, 0xb6, 0xde, + 0xa2, 0xa9, 0x2b, 0x2e, 0x01, 0xe4, 0x37, 0x28, 0x6d, 0x23, 0xe4, 0xd3, 0x0d, 0x0a, 0x66, 0x66, + 0x95, 0xa7, 0x28, 0x89, 0x1b, 0x14, 0x8c, 0x98, 0x50, 0x88, 0x91, 0x47, 0x5e, 0xe0, 0x9b, 0x46, + 0x43, 0x6b, 0x1a, 0xf4, 0x15, 0x92, 0x73, 0x00, 0x7c, 0x0c, 0x3d, 0x8e, 0xd1, 0x94, 0x09, 0x33, + 0xa7, 0x9c, 0xa5, 0x94, 0xe9, 0x09, 0x42, 0xc0, 0x50, 0x09, 0xf3, 0x2a, 0xa1, 0xb2, 0x65, 0xa5, + 0x48, 0x70, 0x64, 0x9b, 0xa9, 0xe7, 0x9a, 0xd0, 0xd0, 0x9a, 0x27, 0xb4, 0x98, 0x10, 0x23, 0x97, + 0xfc, 0x0e, 0xe5, 0xd4, 0xe9, 0x06, 0x3e, 0x9a, 0xe5, 0x86, 0xd6, 0x2c, 0x52, 0x48, 0xa8, 0x61, + 0xe0, 0x23, 0xf9, 0x13, 0x8c, 0x95, 0xe7, 0xbb, 0x66, 0xa5, 0xa1, 0x35, 0xab, 0x1d, 0xd2, 0xfa, + 0x3e, 0x81, 0xf1, 0x7d, 0x6b, 0xec, 0xf9, 0x2e, 0x55, 0x7e, 0xeb, 0x2f, 0x30, 0x24, 0x22, 0x05, + 0xc8, 0x8e, 0xed, 0x77, 0xb5, 0x0c, 0xa9, 0x40, 0x71, 0xd8, 0xbb, 0xeb, 0x4d, 0x25, 0xd2, 
0x48, + 0x11, 0x8c, 0x37, 0xa3, 0x89, 0x5d, 0xd3, 0xad, 0x21, 0xe4, 0xc7, 0xf7, 0x13, 0x2f, 0x12, 0xe4, + 0x1c, 0xf4, 0x55, 0x6c, 0x6a, 0x8d, 0x6c, 0xb3, 0xdc, 0x39, 0x39, 0x4a, 0x4c, 0xf5, 0x55, 0x2c, + 0xfb, 0x66, 0xeb, 0x75, 0xe0, 0x4c, 0x39, 0xce, 0x55, 0xdf, 0x06, 0x2d, 0x2a, 0x82, 0xe2, 0xdc, + 0xba, 0x86, 0x9f, 0x6e, 0x98, 0xef, 0xcd, 0x31, 0x12, 0x83, 0x25, 0xf3, 0x17, 0xf8, 0x16, 0x05, + 0xe9, 0x42, 0xc1, 0x51, 0x20, 0x4a, 0xb3, 0x9e, 0x1d, 0x64, 0x3d, 0x0e, 0xa7, 0xaf, 0x91, 0xd6, + 0x07, 0x1d, 0xaa, 0xc7, 0x3e, 0x52, 0x05, 0x7d, 0xe4, 0x2a, 0x09, 0x0d, 0xaa, 0x8f, 0x5c, 0xd2, + 0x05, 0xfd, 0x36, 0x54, 0xf2, 0x55, 0x3b, 0x97, 0x3f, 0x4c, 0xd9, 0xba, 0x0d, 0x91, 0x33, 0xe1, + 0x05, 0x3e, 0xd5, 0x6f, 0x43, 0x29, 0xfb, 0x04, 0x63, 0x5c, 0x2b, 0x71, 0x4f, 0x68, 0x02, 0xc8, + 0x2f, 0x90, 0x5f, 0xe1, 0x4e, 0x2a, 0x91, 0x08, 0x9b, 0x5b, 0xe1, 0x6e, 0xe4, 0x92, 0x3e, 0x9c, + 0xa2, 0xef, 0xf0, 0x5d, 0x28, 0x3f, 0x9f, 0xb2, 0xf5, 0x22, 0x50, 0xda, 0x56, 0x8f, 0xfe, 0xc0, + 0xde, 0x47, 0xf4, 0xd6, 0x8b, 0x80, 0x56, 0xf1, 0x08, 0x93, 0x06, 0x94, 0x9d, 0x60, 0x13, 0x72, + 0x8c, 0xd4, 0xe2, 0xe4, 0x55, 0xd9, 0x43, 0xca, 0xba, 0x84, 0xd2, 0xbe, 0x47, 0x02, 0x90, 0x1f, + 0x50, 0xbb, 0x77, 0x67, 0xd7, 0x32, 0xd2, 0x1e, 0xda, 0x13, 0xfb, 0xce, 0xae, 0x69, 0x56, 0x0c, + 0xc5, 0xc1, 0x12, 0x9d, 0x55, 0xb4, 0xdd, 0x90, 0x7f, 0xc1, 0x50, 0xbd, 0x68, 0xaa, 0x97, 0xf3, + 0x83, 0x5e, 0x5e, 0x43, 0x5a, 0xb2, 0x34, 0xf7, 0xc4, 0x72, 0x43, 0x55, 0xa8, 0xdc, 0xff, 0x68, + 0xbb, 0x51, 0xc3, 0x32, 0xa8, 0x34, 0xad, 0x3f, 0xa0, 0xb4, 0x0f, 0x4a, 0xaa, 0x0e, 0xba, 0x9d, + 0x41, 0xb2, 0x21, 0x0f, 0x0f, 0xd7, 0x2c, 0x5a, 0xfe, 0x77, 0x55, 0xd3, 0x2c, 0x07, 0x0a, 0x43, + 0x26, 0xd8, 0x18, 0x77, 0x07, 0x43, 0xd2, 0x0e, 0x87, 0x44, 0xc0, 0x70, 0x99, 0x60, 0xe9, 0x1d, + 0x29, 0x5b, 0x4a, 0xe5, 0xc5, 0xe9, 0xfd, 0xe8, 0x5e, 0x2c, 0xef, 0xc3, 0xe1, 0xc8, 0x04, 0xba, + 0xf2, 0x3e, 0xe4, 0x8c, 0xb3, 0xb4, 0x94, 0x32, 0x3d, 0x61, 0xf5, 0x21, 0x77, 0xc3, 0x84, 0xb3, + 0x24, 0xbf, 0x42, 0x3e, 0xe4, 0x38, 0xf7, 0x1e, 0xd3, 0x4b, 0x4d, 0x11, 0xb9, 0x80, 0x8a, 0xb7, + 0xf0, 0x03, 0x8e, 0xd3, 0xd9, 0x4e, 0x60, 0xa4, 0x6a, 0x95, 0x68, 0x39, 0xe1, 0xfa, 0x92, 0xfa, + 0xfb, 0x0c, 0xaa, 0xc7, 0x4a, 0xc8, 0x9d, 0x67, 0x18, 0xd5, 0x32, 0xfd, 0xff, 0x3f, 0x3f, 0xd7, + 0xb5, 0xa7, 0xe7, 0xba, 0xf6, 0xf5, 0xb9, 0xae, 0x7d, 0x7c, 0xa9, 0x67, 0x9e, 0x5e, 0xea, 0x99, + 0x2f, 0x2f, 0xf5, 0xcc, 0xfb, 0x8b, 0x85, 0x27, 0x96, 0xdb, 0x59, 0xcb, 0x09, 0x36, 0x6d, 0x77, + 0xc1, 0x59, 0xb8, 0xfc, 0xc7, 0x0b, 0xda, 0xc9, 0x3c, 0xdb, 0xf1, 0x55, 0x3b, 0x9c, 0xcd, 0xf2, + 0xea, 0x49, 0xe9, 0x7e, 0x0b, 0x00, 0x00, 0xff, 0xff, 0x16, 0xf2, 0x9d, 0x4d, 0x65, 0x04, 0x00, + 0x00, } func (m *KV) Marshal() (dAtA []byte, err error) { @@ -630,6 +671,11 @@ func (m *KV) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.Kind != 0 { + i = encodeVarintBadgerpb4(dAtA, i, uint64(m.Kind)) + i-- + dAtA[i] = 0x60 + } if m.StreamDone { i-- if m.StreamDone { @@ -980,6 +1026,9 @@ func (m *KV) Size() (n int) { if m.StreamDone { n += 2 } + if m.Kind != 0 { + n += 1 + sovBadgerpb4(uint64(m.Kind)) + } return n } @@ -1346,6 +1395,25 @@ func (m *KV) Unmarshal(dAtA []byte) error { } } m.StreamDone = bool(v != 0) + case 12: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Kind", wireType) + } + m.Kind = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowBadgerpb4 + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Kind |= KV_Kind(b&0x7F) << shift + if b < 
0x80 {
+				break
+			}
+		}
 		default:
 			iNdEx = preIndex
 			skippy, err := skipBadgerpb4(dAtA[iNdEx:])
diff --git a/pb/badgerpb4.proto b/pb/badgerpb4.proto
index 079c1cfee..8e6702574 100644
--- a/pb/badgerpb4.proto
+++ b/pb/badgerpb4.proto
@@ -33,6 +33,13 @@ message KV {
 	uint32 stream_id = 10;
 	// Stream done is used to indicate end of stream.
 	bool stream_done = 11;
+
+	enum Kind {
+		KEY = 0;
+		DATA_KEY = 1;
+		FILE = 2;
+	}
+	Kind kind = 12;
 }
 
 message KVList {
diff --git a/stream.go b/stream.go
index 21c5e9926..4c17581b3 100644
--- a/stream.go
+++ b/stream.go
@@ -25,8 +25,10 @@ import (
 	"time"
 
 	humanize "github.com/dustin/go-humanize"
+	"github.com/pkg/errors"
 
 	"github.com/dgraph-io/badger/v4/pb"
+	"github.com/dgraph-io/badger/v4/table"
 	"github.com/dgraph-io/badger/v4/y"
 	"github.com/dgraph-io/ristretto/z"
 )
@@ -83,7 +85,9 @@ type Stream struct {
 	Send func(buf *z.Buffer) error
 
 	// Read data above the sinceTs. All keys with version <= sinceTs will be ignored.
-	SinceTs uint64
+	SinceTs uint64
+	// FullCopy should be set to true only when the encryption mode is the same for sender and receiver.
+	FullCopy bool
 	readTs   uint64
 	db       *DB
 	rangeCh  chan keyRange
@@ -108,9 +112,6 @@ func (st *Stream) ToList(key []byte, itr *Iterator) (*pb.KVList, error) {
 	list := &pb.KVList{}
 	for ; itr.Valid(); itr.Next() {
 		item := itr.Item()
-		if item.IsDeletedOrExpired() {
-			break
-		}
 		if !bytes.Equal(key, item.Key()) {
 			// Break out on the first encounter with another key.
 			break
@@ -128,6 +129,8 @@
 		}
 		kv.Version = item.Version()
 		kv.ExpiresAt = item.ExpiresAt()
+		// As we do a full copy, we only need to transmit whether this is a delete key.
+		kv.Meta = []byte{item.meta & bitDelete}
 		kv.UserMeta = a.Copy([]byte{item.UserMeta()})
 
 		list.Kv = append(list.Kv, kv)
@@ -138,6 +141,12 @@
 		}
 		if item.DiscardEarlierVersions() {
 			break
 		}
+		if item.IsDeletedOrExpired() {
+			// We do a FullCopy in stream. It might happen that tables from L6 contain K(version=1),
+			// while the table at L4 that was not copied contains K(version=2) with a delete mark.
+			// Hence, we need to send the deleted or expired item too.
+			break
+		}
 	}
 	return list, nil
 }
@@ -164,18 +173,10 @@ func (st *Stream) produceRanges(ctx context.Context) {
 }
 
 // produceKVs picks up ranges from rangeCh, generates KV lists and sends them to kvChan.
-func (st *Stream) produceKVs(ctx context.Context, threadId int) error {
+func (st *Stream) produceKVs(ctx context.Context, itr *Iterator) error {
 	st.numProducers.Add(1)
 	defer st.numProducers.Add(-1)
 
-	var txn *Txn
-	if st.readTs > 0 {
-		txn = st.db.NewTransactionAt(st.readTs, false)
-	} else {
-		txn = st.db.NewTransaction(false)
-	}
-	defer txn.Discard()
-
 	// produceKVs runs iterate serially. So, we can define the outList here.
 	outList := z.NewBuffer(2*batchSize, "Stream.ProduceKVs")
 	defer func() {
@@ -185,15 +186,6 @@
 	}()
 
 	iterate := func(kr keyRange) error {
-		iterOpts := DefaultIteratorOptions
-		iterOpts.AllVersions = true
-		iterOpts.Prefix = st.Prefix
-		iterOpts.PrefetchValues = false
-		iterOpts.SinceTs = st.SinceTs
-		itr := txn.NewIterator(iterOpts)
-		itr.ThreadId = threadId
-		defer itr.Close()
-
 		itr.Alloc = z.NewAllocator(1<<20, "Stream.Iterate")
 		defer itr.Alloc.Release()
@@ -377,6 +369,77 @@ outer:
 	return nil
 }
 
+func (st *Stream) copyTablesOver(ctx context.Context, tableMatrix [][]*table.Table) error {
+	// TODO: See if making this concurrent would be helpful. Most likely it won't.
+	// But, if it does work, then most likely <3 goroutines might be sufficient.
+	infof := st.db.opt.Infof
+	// Make a copy of the manifest so that we don't have a race condition.
+	manifest := st.db.manifest.manifest.clone()
+	dataKeys := make(map[uint64]struct{})
+	// Iterate in reverse order so that the receiver gets the bottommost level first.
+	for i := len(tableMatrix) - 1; i >= 0; i-- {
+		level := i
+		tables := tableMatrix[i]
+		for _, t := range tables {
+			// This table can be picked for copying directly.
+			out := z.NewBuffer(int(t.Size())+1024, "Stream.Table")
+			if dk := t.DataKey(); dk != nil {
+				y.AssertTrue(dk.KeyId != 0)
+				// If we have a legit data key, send it over so the table can be decrypted. The same
+				// data key could have been used to encrypt many tables. Avoid sending it
+				// repeatedly.
+				if _, sent := dataKeys[dk.KeyId]; !sent {
+					infof("Sending data key with ID: %d\n", dk.KeyId)
+					val, err := dk.Marshal()
+					y.Check(err)
+
+					// This would go to the key registry in the destination.
+					kv := &pb.KV{
+						Value: val,
+						Kind:  pb.KV_DATA_KEY,
+					}
+					KVToBuffer(kv, out)
+					dataKeys[dk.KeyId] = struct{}{}
+				}
+			}
+
+			infof("Sending table ID: %d at level: %d. Size: %s\n",
+				t.ID(), level, humanize.IBytes(uint64(t.Size())))
+			tableManifest := manifest.Tables[t.ID()]
+			change := pb.ManifestChange{
+				Op:    pb.ManifestChange_CREATE,
+				Level: uint32(level),
+				KeyId: tableManifest.KeyID,
+				// Hard coding it, since we're supporting only AES for now.
+				EncryptionAlgo: pb.EncryptionAlgo_aes,
+				Compression:    uint32(tableManifest.Compression),
+			}
+
+			buf, err := change.Marshal()
+			y.Check(err)
+
+			// We send the table along with the level to the destination, so it'd know where to
+			// place the tables. We'd send all the tables first, before we start streaming. So, the
+			// destination DB would write streamed keys one level above.
+			kv := &pb.KV{
+				// Key can be used for MANIFEST.
+				Key:   buf,
+				Value: t.Data,
+				Kind:  pb.KV_FILE,
+			}
+			KVToBuffer(kv, out)
+
+			select {
+			case st.kvChan <- out:
+			case <-ctx.Done():
+				_ = out.Release()
+				return ctx.Err()
+			}
+		}
+	}
+	return nil
+}
+
 // Orchestrate runs Stream. It picks up ranges from the SSTables, then runs NumGo number of
 // goroutines to iterate over these ranges and batch up KVs in lists. It concurrently runs a single
 // goroutine to pick these lists, batch them up further and send to Output.Send. Orchestrate also
@@ -384,6 +447,11 @@
 // are serial. In case any of these steps encounter an error, Orchestrate would stop execution and
 // return that error. Orchestrate can be called multiple times, but in serial order.
func (st *Stream) Orchestrate(ctx context.Context) error { + if st.FullCopy { + if !st.db.opt.managedTxns || st.SinceTs != 0 || st.ChooseKey != nil && st.KeyToList != nil { + panic("Got invalid stream options when doing full copy") + } + } ctx, cancel := context.WithCancel(ctx) defer cancel() st.rangeCh = make(chan keyRange, 3) // Contains keys for posting lists. @@ -397,7 +465,127 @@ func (st *Stream) Orchestrate(ctx context.Context) error { st.KeyToList = st.ToList } + // Pick up key-values from kvChan and send to stream. + kvErr := make(chan error, 1) + go func() { + // Picks up KV lists from kvChan, and sends them to Output. + err := st.streamKVs(ctx) + if err != nil { + cancel() // Stop all the go routines. + } + kvErr <- err + }() + + // Pick all relevant tables from levels. We'd use this to copy them over, + // or generate iterators from them. + memTables, decr := st.db.getMemTables() + defer decr() + + opts := DefaultIteratorOptions + opts.Prefix = st.Prefix + opts.SinceTs = st.SinceTs + tableMatrix := st.db.lc.getTables(&opts) + defer func() { + for _, tables := range tableMatrix { + for _, t := range tables { + _ = t.DecrRef() + } + } + }() + y.AssertTrue(len(tableMatrix) == st.db.opt.MaxLevels) + + infof := st.db.opt.Infof + copyTables := func() error { + // Figure out which tables we can copy. Only choose from the last 2 levels. + // Say last level has data of size 100. Given a 10x level multiplier and + // assuming the tree is balanced, second last level would have 10, and the + // third last level would have 1. The third last level would only have 1% + // of the data of the last level. It's OK for us to stop there and just + // stream it, instead of trying to copy over those tables too. When we + // copy over tables to Level i, we can't stream any data to level i, i+1, + // and so on. The stream has to create tables at level i-1, so there can be + // overlap between the tables at i-1 and i. + + // Let's pick the tables which can be fully copied over from last level. + threshold := len(tableMatrix) - 2 + toCopy := make([][]*table.Table, len(tableMatrix)) + var numCopy, numStream int + for lev, tables := range tableMatrix { + // We stream only the data in the two bottommost levels. + if lev < threshold { + numStream += len(tables) + continue + } + var rem []*table.Table + cp := tables[:0] + for _, t := range tables { + // We can only copy over those tables that satisfy following conditions: + // - All the keys have version less than st.readTs + // - st.Prefix fully covers the table + if t.MaxVersion() > st.readTs || !t.CoveredByPrefix(st.Prefix) { + rem = append(rem, t) + continue + } + cp = append(cp, t) + } + toCopy[lev] = cp // Pick tables to copy. + tableMatrix[lev] = rem // Keep remaining for streaming. + numCopy += len(cp) + numStream += len(rem) + } + infof("Num tables to copy: %d. Num to stream: %d\n", numCopy, numStream) + + return st.copyTablesOver(ctx, toCopy) + } + + if st.FullCopy { + // As of now, we don't handle the non-zero SinceTs. + if err := copyTables(); err != nil { + return errors.Wrap(err, "while copying tables") + } + } + + var txn *Txn + if st.readTs > 0 { + txn = st.db.NewTransactionAt(st.readTs, false) + } else { + txn = st.db.NewTransaction(false) + } + defer txn.Discard() + + newIterator := func(threadId int) *Iterator { + var itrs []y.Iterator + for _, mt := range memTables { + itrs = append(itrs, mt.sl.NewUniIterator(false)) + } + if tables := tableMatrix[0]; len(tables) > 0 { + itrs = append(itrs, iteratorsReversed(tables, 0)...) 
+ } + for _, tables := range tableMatrix[1:] { + if len(tables) == 0 { + continue + } + itrs = append(itrs, table.NewConcatIterator(tables, 0)) + } + + opt := DefaultIteratorOptions + opt.AllVersions = true + opt.Prefix = st.Prefix + opt.PrefetchValues = false + opt.SinceTs = st.SinceTs + + res := &Iterator{ + txn: txn, + iitr: table.NewMergeIterator(itrs, false), + opt: opt, + readTs: txn.readTs, + ThreadId: threadId, + } + return res + } + // Picks up ranges from Badger, and sends them to rangeCh. + // Just for simplicity, we'd consider all the tables for range production. go st.produceRanges(ctx) errCh := make(chan error, st.NumGo) // Stores error by consumeKeys. @@ -408,7 +596,9 @@ func (st *Stream) Orchestrate(ctx context.Context) error { go func(threadId int) { defer wg.Done() // Picks up ranges from rangeCh, generates KV lists, and sends them to kvChan. - if err := st.produceKVs(ctx, threadId); err != nil { + itr := newIterator(threadId) + defer itr.Close() + if err := st.produceKVs(ctx, itr); err != nil { select { case errCh <- err: default: @@ -417,16 +607,6 @@ func (st *Stream) Orchestrate(ctx context.Context) error { }(i) } - // Pick up key-values from kvChan and send to stream. - kvErr := make(chan error, 1) - go func() { - // Picks up KV lists from kvChan, and sends them to Output. - err := st.streamKVs(ctx) - if err != nil { - cancel() // Stop all the go routines. - } - kvErr <- err - }() wg.Wait() // Wait for produceKVs to be over. close(st.kvChan) // Now we can close kvChan. defer func() { diff --git a/stream_writer.go b/stream_writer.go index 6618b2f7c..c1549abac 100644 --- a/stream_writer.go +++ b/stream_writer.go @@ -22,6 +22,7 @@ import ( "sync" humanize "github.com/dustin/go-humanize" + "github.com/golang/protobuf/proto" "github.com/pkg/errors" "github.com/dgraph-io/badger/v4/pb" @@ -42,12 +43,18 @@ import ( // StreamWriter should not be called on in-use DB instances. It is designed only to bootstrap new // DBs. type StreamWriter struct { - writeLock sync.Mutex - db *DB - done func() - throttle *y.Throttle - maxVersion uint64 - writers map[uint32]*sortedWriter + writeLock sync.Mutex + db *DB + done func() + throttle *y.Throttle + maxVersion uint64 + writers map[uint32]*sortedWriter + prevLevel int + senderPrevLevel int + keyId map[uint64]*pb.DataKey // map stores reader's keyId to data key. + // Writer might receive tables first, and then receive keys. If true, that means we have + // started processing keys. + processingKeys bool } // NewStreamWriter creates a StreamWriter. Right after creating StreamWriter, Prepare must be @@ -61,6 +68,7 @@ func (db *DB) NewStreamWriter() *StreamWriter { // concurrent streams being processed. throttle: y.NewThrottle(16), writers: make(map[uint32]*sortedWriter), + keyId: make(map[uint64]*pb.DataKey), } } @@ -109,7 +117,64 @@ func (sw *StreamWriter) Write(buf *z.Buffer) error { if _, ok := closedStreams[kv.StreamId]; ok { panic(fmt.Sprintf("write performed on closed stream: %d", kv.StreamId)) } + switch kv.Kind { + case pb.KV_DATA_KEY: + y.AssertTrue(len(sw.db.opt.EncryptionKey) > 0) + var dk pb.DataKey + if err := proto.Unmarshal(kv.Value, &dk); err != nil { + return errors.Wrapf(err, "unmarshal failed %s", kv.Value) + } + readerId := dk.KeyId + if _, ok := sw.keyId[readerId]; !ok { + // Insert the data key to the key registry if not already inserted. 
+				id, err := sw.db.registry.AddKey(dk)
+				if err != nil {
+					return errors.Wrap(err, "failed to write data key")
+				}
+				dk.KeyId = id
+				sw.keyId[readerId] = &dk
+			}
+			return nil
+		case pb.KV_FILE:
+			// All tables should be received before any of the keys.
+			if sw.processingKeys {
+				return errors.New("Received pb.KV_FILE after pb.KV_KEY")
+			}
+			var change pb.ManifestChange
+			if err := proto.Unmarshal(kv.Key, &change); err != nil {
+				return errors.Wrap(err, "unable to unmarshal manifest change")
+			}
+			level := int(change.Level)
+			if sw.senderPrevLevel == 0 {
+				// We received the first file, set the sender's and receiver's max levels.
+				sw.senderPrevLevel = level
+				sw.prevLevel = len(sw.db.lc.levels) - 1
+			}
+			// This is based on the assumption that the tables from the last
+			// level will be sent first, followed by the tables from the second
+			// last level. As long as the level in the incoming change matches
+			// senderPrevLevel, we know we're still processing tables from the
+			// same sender level. The last level for this DB can be 8 while the
+			// DB that's sending this could have its last level at 7.
+			if sw.senderPrevLevel != level {
+				// If the previous level and the current level are different, we
+				// must be processing a table from the next last level.
+				sw.senderPrevLevel = level
+				sw.prevLevel--
+			}
+			dk := sw.keyId[change.KeyId]
+			return sw.db.lc.AddTable(&kv, sw.prevLevel, dk, &change)
+		case pb.KV_KEY:
+			// Pass. The following code will handle the keys.
+		}
+
+		sw.processingKeys = true
+		if sw.prevLevel == 0 {
+			// If prevLevel is 0, that means we have not written anything yet.
+			// Equivalently, we were virtually writing to the maxLevel+1.
+			sw.prevLevel = len(sw.db.lc.levels)
+		}
 		var meta, userMeta byte
 		if len(kv.Meta) > 0 {
 			meta = kv.Meta[0]
@@ -285,6 +350,7 @@ type sortedWriter struct {
 	builder  *table.Builder
 	lastKey  []byte
+	level    int
 	streamID uint32
 	reqCh    chan *request
 	// Have separate closer for each writer, as it can be closed at any time.
@@ -304,6 +370,7 @@ func (sw *StreamWriter) newWriter(streamID uint32) (*sortedWriter, error) {
 		builder:  table.NewTableBuilder(bopts),
 		reqCh:    make(chan *request, 3),
 		closer:   z.NewCloser(1),
+		level:    sw.prevLevel - 1, // Write at the level just above the one we were writing to.
 	}
 
 	go w.handleRequests()
@@ -435,7 +502,7 @@ func (w *sortedWriter) createTable(builder *table.Builder) error {
 	}
 	lc := w.db.lc
-	lhandler := lc.levels[len(lc.levels)-1]
+	lhandler := lc.levels[w.level]
 	// Now that table can be opened successfully, let's add this to the MANIFEST.
 	change := &pb.ManifestChange{
 		Id: tbl.ID(),
diff --git a/stream_writer_test.go b/stream_writer_test.go
index 2535fd22f..48221fb02 100644
--- a/stream_writer_test.go
+++ b/stream_writer_test.go
@@ -577,3 +577,26 @@ func TestStreamWriterEncrypted(t *testing.T) {
 
 	require.NoError(t, db.Close())
 }
+
+// Test that stream writer does not crash with large values in managed mode.
+func TestStreamWriterWithLargeValue(t *testing.T) { + opts := DefaultOptions("") + opts.managedTxns = true + runBadgerTest(t, &opts, func(t *testing.T, db *DB) { + buf := z.NewBuffer(10<<20, "test") + defer func() { require.NoError(t, buf.Release()) }() + val := make([]byte, 10<<20) + _, err := rand.Read(val) + require.NoError(t, err) + KVToBuffer(&pb.KV{ + Key: []byte("key"), + Value: val, + Version: 1, + }, buf) + + sw := db.NewStreamWriter() + require.NoError(t, sw.Prepare(), "sw.Prepare() failed") + require.NoError(t, sw.Write(buf), "sw.Write() failed") + require.NoError(t, sw.Flush(), "sw.Flush() failed") + }) +} diff --git a/table/table.go b/table/table.go index 0bbc91089..06c33347a 100644 --- a/table/table.go +++ b/table/table.go @@ -144,10 +144,13 @@ func (t *Table) UncompressedSize() uint32 { return t.cheapIndex().UncompressedSi // KeyCount is the total number of keys in this table. func (t *Table) KeyCount() uint32 { return t.cheapIndex().KeyCount } -// OnDiskSize returns the total size of key-values stored in this table (including the -// disk space occupied on the value log). +// OnDiskSize returns the total size of key-values stored in this table +// (including the disk space occupied on the value log). func (t *Table) OnDiskSize() uint32 { return t.cheapIndex().OnDiskSize } +// DataKey returns the encryption key +func (t *Table) DataKey() *pb.DataKey { return t.opt.DataKey } + // CompressionType returns the compression algorithm used for block compression. func (t *Table) CompressionType() options.CompressionType { return t.opt.Compression @@ -256,7 +259,21 @@ func (b *block) verifyCheckSum() error { func CreateTable(fname string, builder *Builder) (*Table, error) { bd := builder.Done() - mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR|os.O_EXCL, bd.Size) + mf, err := newFile(fname, bd.Size) + if err != nil { + return nil, err + } + + written := bd.Copy(mf.Data) + y.AssertTrue(written == len(mf.Data)) + if err := z.Msync(mf.Data); err != nil { + return nil, y.Wrapf(err, "while calling msync on %s", fname) + } + return OpenTable(mf, *builder.opts) +} + +func newFile(fname string, sz int) (*z.MmapFile, error) { + mf, err := z.OpenMmapFile(fname, os.O_CREATE|os.O_RDWR|os.O_EXCL, sz) if err == z.NewFile { // Expected. } else if err != nil { @@ -264,13 +281,22 @@ func CreateTable(fname string, builder *Builder) (*Table, error) { } else { return nil, errors.Errorf("file already exists: %s", fname) } + return mf, nil +} - written := bd.Copy(mf.Data) +func CreateTableFromBuffer(fname string, buf []byte, opts Options) (*Table, error) { + mf, err := newFile(fname, len(buf)) + if err != nil { + return nil, err + } + + // We cannot use the buf directly here because it is not mmapped. + written := copy(mf.Data, buf) y.AssertTrue(written == len(mf.Data)) if err := z.Msync(mf.Data); err != nil { return nil, y.Wrapf(err, "while calling msync on %s", fname) } - return OpenTable(mf, *builder.opts) + return OpenTable(mf, opts) } // OpenTable assumes file has only one table and opens it. Takes ownership of fd upon function @@ -693,6 +719,12 @@ func (t *Table) DoesNotHave(hash uint32) bool { return !mayContain } +// CoveredByPrefix returns true if all the keys in the table are prefixed by the given prefix. +func (t *Table) CoveredByPrefix(prefix []byte) bool { + return bytes.HasPrefix(y.ParseKey(t.Biggest()), prefix) && + bytes.HasPrefix(y.ParseKey(t.Smallest()), prefix) +} + // readTableIndex reads table index from the sst and returns its pb format. 
func (t *Table) readTableIndex() (*fb.TableIndex, error) { data := t.readNoFail(t.indexStart, t.indexLen)
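--
A minimal usage sketch of the new FullCopy option, mirroring what db.StreamDB
does internally. The badger v4 identifiers are the ones touched by this patch;
srcOpts, dstOpts, and the error handling are illustrative assumptions, not part
of the change. Both DBs must use the same encryption mode, and the sender must
be opened in managed mode:

	package main

	import (
		"context"
		"math"

		badger "github.com/dgraph-io/badger/v4"
	)

	// fullCopy bootstraps the DB at dstOpts from the DB at srcOpts, copying
	// the two bottommost levels over as whole tables and streaming the rest.
	func fullCopy(srcOpts, dstOpts badger.Options) error {
		// FullCopy requires managed transactions on the sending side.
		src, err := badger.OpenManaged(srcOpts)
		if err != nil {
			return err
		}
		defer src.Close()

		// StreamWriter is designed to bootstrap a fresh DB.
		dst, err := badger.OpenManaged(dstOpts)
		if err != nil {
			return err
		}
		defer dst.Close()

		writer := dst.NewStreamWriter()
		if err := writer.Prepare(); err != nil {
			return err
		}

		stream := src.NewStreamAt(math.MaxUint64)
		stream.LogPrefix = "FullCopy"
		// Preconditions checked by Orchestrate when FullCopy is set.
		stream.KeyToList = nil
		stream.ChooseKey = nil
		stream.SinceTs = 0
		stream.FullCopy = true
		stream.Send = writer.Write

		if err := stream.Orchestrate(context.Background()); err != nil {
			return err
		}
		return writer.Flush()
	}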
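The key registry records written by storeDataKey are framed as 4 bytes of
big-endian payload length, 4 bytes of big-endian CRC32 (Castagnoli table) over
the payload, then the marshaled pb.DataKey whose Data field is still XORed with
the storage key. A reading sketch under those assumptions; readDataKeyRecord is
a hypothetical helper, not part of the patch:

	import (
		"encoding/binary"
		"errors"
		"hash/crc32"
		"io"

		"github.com/dgraph-io/badger/v4/pb"
		"github.com/dgraph-io/badger/v4/y"
	)

	// readDataKeyRecord reads back one record in the format produced by
	// storeDataKey: [4B length][4B CRC32][encrypted, marshaled pb.DataKey].
	func readDataKeyRecord(r io.Reader) (*pb.DataKey, error) {
		var lenCrcBuf [8]byte
		if _, err := io.ReadFull(r, lenCrcBuf[:]); err != nil {
			return nil, err
		}
		data := make([]byte, binary.BigEndian.Uint32(lenCrcBuf[0:4]))
		if _, err := io.ReadFull(r, data); err != nil {
			return nil, err
		}
		if crc32.Checksum(data, y.CastagnoliCrcTable) != binary.BigEndian.Uint32(lenCrcBuf[4:8]) {
			return nil, errors.New("checksum mismatch for data key")
		}
		dk := &pb.DataKey{}
		if err := dk.Unmarshal(data); err != nil {
			return nil, err
		}
		// dk.Data is still encrypted here; decrypt it by XORing with the
		// storage key and dk.Iv (see y.XORBlockAllocate) before use.
		return dk, nil
	}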
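How StreamWriter.Write places copied tables: the sender emits its bottommost
level first, so the receiver pins its own bottom level to the first pb.KV_FILE
it sees and moves one level up whenever the sender's level changes. A toy
walk-through with assumed numbers (a 7-level receiver getting tables from the
sender's levels 6 and 5):

	package main

	import "fmt"

	func main() {
		senderLevels := []int{6, 6, 5, 5} // change.Level of each received table
		receiverMax := 6                  // len(lc.levels) - 1 on the receiver
		senderPrev, prev := 0, 0
		for _, lev := range senderLevels {
			if senderPrev == 0 {
				senderPrev, prev = lev, receiverMax
			}
			if senderPrev != lev {
				senderPrev = lev
				prev--
			}
			fmt.Printf("sender L%d -> receiver L%d\n", lev, prev)
		}
		// Streamed (non-FILE) keys then go one level above the last copied
		// level: newWriter uses prevLevel - 1.
	}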