diff --git a/core/blockchain.go b/core/blockchain.go
index 825dce4e0..d6f9340fd 100644
--- a/core/blockchain.go
+++ b/core/blockchain.go
@@ -195,6 +195,7 @@ func (c *CacheConfig) triedbConfig(keepFunc pathdb.NotifyKeepFunc) *triedb.Confi
 			NotifyKeep:      keepFunc,
 			JournalFilePath: c.JournalFilePath,
 			JournalFile:     c.JournalFile,
+			UseBase:         c.UseBase,
 		}
 	}
 	return config
@@ -216,6 +217,7 @@ var defaultCacheConfig = &CacheConfig{
 func DefaultCacheConfigWithScheme(scheme string) *CacheConfig {
 	config := *defaultCacheConfig
 	config.StateScheme = scheme
+	config.UseBase = true
 	return &config
 }
@@ -2083,7 +2085,7 @@ func (bc *BlockChain) insertChain(chain types.Blocks, setHead bool) (int, error)
 		if bc.snaps != nil && !minerMode {
 			snapDiffItems, snapBufItems = bc.snaps.Size()
 		}
-
+
 		var trieDiffNodes, trieBufNodes, trieImmutableBufNodes common.StorageSize
 		if !minerMode {
 			trieDiffNodes, trieBufNodes, trieImmutableBufNodes, _ = bc.triedb.Size()
diff --git a/core/blockchain_snapshot_test.go b/core/blockchain_snapshot_test.go
index 348cc3f47..e2a5a2c9c 100644
--- a/core/blockchain_snapshot_test.go
+++ b/core/blockchain_snapshot_test.go
@@ -81,7 +81,9 @@ func (basic *snapshotTestBasic) prepare(t *testing.T) (*BlockChain, []*types.Blo
 		}
 		engine = ethash.NewFullFaker()
 	)
-	chain, err := NewBlockChain(db, DefaultCacheConfigWithScheme(basic.scheme), gspec, nil, engine, vm.Config{}, nil, nil)
+	cacheConfig := DefaultCacheConfigWithScheme(basic.scheme)
+	cacheConfig.UseBase = true
+	chain, err := NewBlockChain(db, cacheConfig, gspec, nil, engine, vm.Config{}, nil, nil)
 	if err != nil {
 		t.Fatalf("Failed to create chain: %v", err)
 	}
@@ -180,11 +182,11 @@ func (basic *snapshotTestBasic) dump() string {
 	}
 	fmt.Fprint(buffer, "\n")
 
-	//if crash {
+	// if crash {
 	//	fmt.Fprintf(buffer, "\nCRASH\n\n")
-	//} else {
+	// } else {
 	//	fmt.Fprintf(buffer, "\nSetHead(%d)\n\n", basic.setHead)
-	//}
+	// }
 	fmt.Fprintf(buffer, "------------------------------\n\n")
 	fmt.Fprint(buffer, "Expected in leveldb:\n   G")
@@ -228,7 +230,10 @@ func (snaptest *snapshotTest) test(t *testing.T) {
 	// Restart the chain normally
 	chain.Stop()
-	newchain, err := NewBlockChain(snaptest.db, DefaultCacheConfigWithScheme(snaptest.scheme), snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
+
+	cacheConfig := DefaultCacheConfigWithScheme(snaptest.scheme)
+	cacheConfig.UseBase = true
+	newchain, err := NewBlockChain(snaptest.db, cacheConfig, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
 	if err != nil {
 		t.Fatalf("Failed to recreate chain: %v", err)
 	}
@@ -313,6 +318,7 @@ func (snaptest *gappedSnapshotTest) test(t *testing.T) {
 		SnapshotLimit: 0,
 		StateScheme:   snaptest.scheme,
 	}
+	cacheConfig.UseBase = true
 	newchain, err := NewBlockChain(snaptest.db, cacheConfig, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
 	if err != nil {
 		t.Fatalf("Failed to recreate chain: %v", err)
 	}
@@ -321,7 +327,9 @@ func (snaptest *gappedSnapshotTest) test(t *testing.T) {
 	newchain.Stop()
 
 	// Restart the chain with enabling the snapshot
-	newchain, err = NewBlockChain(snaptest.db, DefaultCacheConfigWithScheme(snaptest.scheme), snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
+	config := DefaultCacheConfigWithScheme(snaptest.scheme)
+	config.UseBase = true
+	newchain, err = NewBlockChain(snaptest.db, config, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
 	if err != nil {
 		t.Fatalf("Failed to recreate chain: %v", err)
 	}
@@ -349,7 +357,9 @@ func (snaptest *setHeadSnapshotTest) test(t *testing.T) {
 	chain.SetHead(snaptest.setHead)
 	chain.Stop()
-	newchain, err := NewBlockChain(snaptest.db, DefaultCacheConfigWithScheme(snaptest.scheme), snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
+	cacheConfig := DefaultCacheConfigWithScheme(snaptest.scheme)
+	cacheConfig.UseBase = true
+	newchain, err := NewBlockChain(snaptest.db, cacheConfig, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
 	if err != nil {
 		t.Fatalf("Failed to recreate chain: %v", err)
 	}
@@ -385,6 +395,7 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) {
 		SnapshotLimit: 0,
 		StateScheme:   snaptest.scheme,
 	}
+	config.UseBase = true
 	newchain, err := NewBlockChain(snaptest.db, config, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
 	if err != nil {
 		t.Fatalf("Failed to recreate chain: %v", err)
 	}
@@ -402,6 +413,7 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) {
 		SnapshotWait:  false, // Don't wait rebuild
 		StateScheme:   snaptest.scheme,
 	}
+	config.UseBase = true
 	tmp, err := NewBlockChain(snaptest.db, config, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
 	if err != nil {
 		t.Fatalf("Failed to recreate chain: %v", err)
 	}
@@ -411,7 +423,9 @@ func (snaptest *wipeCrashSnapshotTest) test(t *testing.T) {
 	tmp.triedb.Close()
 	tmp.stopWithoutSaving()
-	newchain, err = NewBlockChain(snaptest.db, DefaultCacheConfigWithScheme(snaptest.scheme), snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
+	cacheConfig := DefaultCacheConfigWithScheme(snaptest.scheme)
+	cacheConfig.UseBase = true
+	newchain, err = NewBlockChain(snaptest.db, cacheConfig, snaptest.gspec, nil, snaptest.engine, vm.Config{}, nil, nil)
 	if err != nil {
 		t.Fatalf("Failed to recreate chain: %v", err)
 	}
diff --git a/core/rawdb/freezer.go b/core/rawdb/freezer.go
index 54b911f1c..ed0d58238 100644
--- a/core/rawdb/freezer.go
+++ b/core/rawdb/freezer.go
@@ -127,7 +127,7 @@ func NewFreezer(datadir string, namespace string, readonly, writeTrieNode bool,
 	// Create the tables.
 	for name, disableSnappy := range tables {
 		if name == stateHistoryTrieNodesData && !writeTrieNode {
-			log.Info("Not create trie node data")
+			log.Info("Skip creating trie node data table in freezer db")
 			continue
 		}
 		table, err := newTable(datadir, name, readMeter, writeMeter, sizeGauge, maxTableSize, disableSnappy, readonly)
diff --git a/eth/backend.go b/eth/backend.go
index 380498c52..28fc33d51 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -134,6 +134,17 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 		config.Miner.GasPrice = new(big.Int).Set(ethconfig.Defaults.Miner.GasPrice)
 	}
 
+	// Assemble the Ethereum object
+	chainDb, err := stack.OpenAndMergeDatabase("chaindata", ChainDBNamespace, false, config.DatabaseCache, config.DatabaseHandles,
+		config.DatabaseFreezer)
+	if err != nil {
+		return nil, err
+	}
+	config.StateScheme, err = rawdb.ParseStateScheme(config.StateScheme, chainDb)
+	if err != nil {
+		return nil, err
+	}
+
 	if config.StateScheme == rawdb.HashScheme && config.NoPruning && config.TrieDirtyCache > 0 {
 		if config.SnapshotCache > 0 {
 			config.TrieCleanCache += config.TrieDirtyCache * 3 / 5
@@ -153,21 +164,11 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 		config.TrieCleanCache += config.TrieDirtyCache - pathdb.MaxBufferSize/1024/1024
 		config.TrieDirtyCache = pathdb.MaxBufferSize / 1024 / 1024
 	}
-	log.Info("Allocated memory caches",
-		"state_scheme", config.StateScheme,
+	log.Info("Allocated memory caches", "state_scheme", config.StateScheme,
 		"trie_clean_cache", common.StorageSize(config.TrieCleanCache)*1024*1024,
 		"trie_dirty_cache", common.StorageSize(config.TrieDirtyCache)*1024*1024,
 		"snapshot_cache", common.StorageSize(config.SnapshotCache)*1024*1024)
-	// Assemble the Ethereum object
-	chainDb, err := stack.OpenAndMergeDatabase("chaindata", ChainDBNamespace, false, config.DatabaseCache, config.DatabaseHandles,
-		config.DatabaseFreezer)
-	if err != nil {
-		return nil, err
-	}
-	config.StateScheme, err = rawdb.ParseStateScheme(config.StateScheme, chainDb)
-	if err != nil {
-		return nil, err
-	}
+
 	// Try to recover offline state pruning only in hash-based.
 	if config.StateScheme == rawdb.HashScheme {
 		if err := pruner.RecoverPruning(stack.ResolvePath(""), chainDb); err != nil {
diff --git a/triedb/pathdb/database.go b/triedb/pathdb/database.go
index d2cd0b997..e9d5c8230 100644
--- a/triedb/pathdb/database.go
+++ b/triedb/pathdb/database.go
@@ -368,7 +368,7 @@ func (db *Database) Enable(root common.Hash) error {
 	// Re-construct a new disk layer backed by persistent state
 	// with **empty clean cache and node buffer**.
 	nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0, db.config.ProposeBlockInterval,
-		db.config.NotifyKeep, nil, false, false)
+		db.config.NotifyKeep, db.freezer, false, false)
 	if err != nil {
 		log.Error("Failed to new trie node buffer", "error", err)
 		return err
diff --git a/triedb/pathdb/journal.go b/triedb/pathdb/journal.go
index 586e2800e..fab4495d8 100644
--- a/triedb/pathdb/journal.go
+++ b/triedb/pathdb/journal.go
@@ -271,9 +271,6 @@ func (db *Database) loadLayers() layer {
 	if (errors.Is(err, errMissJournal) || errors.Is(err, errUnmatchedJournal)) && db.fastRecovery &&
 		db.config.TrieNodeBufferType == NodeBufferList && !db.useBase {
 		start := time.Now()
-		if db.freezer == nil {
-			log.Crit("Use unopened freezer db to recover node buffer list")
-		}
 		log.Info("Recover node buffer list from ancient db")
 
 		nb, err = NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nil, 0,
@@ -333,23 +330,13 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp
 	if stored > id {
 		return nil, fmt.Errorf("invalid state id: stored %d resolved %d", stored, id)
 	}
+
+	// Resolve nodes cached in node buffer
 	var encoded []journalNodes
 	if err := journalBuf.Decode(&encoded); err != nil {
 		return nil, fmt.Errorf("failed to load disk nodes: %v", err)
 	}
-	nodes := make(map[common.Hash]map[string]*trienode.Node)
-	for _, entry := range encoded {
-		subset := make(map[string]*trienode.Node)
-		for _, n := range entry.Nodes {
-			if len(n.Blob) > 0 {
-				subset[string(n.Path)] = trienode.New(crypto.Keccak256Hash(n.Blob), n.Blob)
-			} else {
-				subset[string(n.Path)] = trienode.NewDeleted()
-			}
-		}
-		nodes[entry.Owner] = subset
-	}
+	nodes := flattenTrieNodes(encoded)
 
 	if journalTypeForReader == JournalFileType {
 		var shaSum [32]byte
@@ -365,11 +352,24 @@ func (db *Database) loadDiskLayer(r *rlp.Stream, journalTypeForReader JournalTyp
 	// Calculate the internal state transitions by id difference.
 	nb, err := NewTrieNodeBuffer(db.diskdb, db.config.TrieNodeBufferType, db.bufferSize, nodes, id-stored, db.config.ProposeBlockInterval,
-		db.config.NotifyKeep, nil, false, db.useBase)
+		db.config.NotifyKeep, db.freezer, db.fastRecovery, db.useBase)
 	if err != nil {
 		log.Error("Failed to new trie node buffer", "error", err)
 		return nil, err
 	}
+
+	if db.config.TrieNodeBufferType == NodeBufferList && !db.useBase && db.fastRecovery {
+		recoveredRoot, recoveredStateID, _ := nb.getLatestStatus()
+		if recoveredRoot != root || recoveredStateID != id {
+			log.Error("Recovered state root and state id are different from the recorded ones",
+				"recovered_root", recoveredRoot, "root", root, "recovered_state_id", recoveredStateID, "id", id)
+			return nil, errors.New("recovered root and state id do not match the recorded ones")
+		}
+
+		log.Info("Disk layer finished recovering node buffer list", "latest_root", recoveredRoot,
+			"latest_state_id", recoveredStateID)
+	}
+
 	base := newDiskLayer(root, id, db, nil, nb)
 	nb.setClean(base.cleans)
 	return base, nil
@@ -486,14 +486,7 @@ func (dl *diskLayer) journal(w io.Writer, journalType JournalType) error {
 	}
 	// Step three, write all unwritten nodes into the journal
 	bufferNodes := dl.buffer.getAllNodes()
-	nodes := make([]journalNodes, 0, len(bufferNodes))
-	for owner, subset := range bufferNodes {
-		entry := journalNodes{Owner: owner}
-		for path, node := range subset {
-			entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob})
-		}
-		nodes = append(nodes, entry)
-	}
+	nodes := compressTrieNodes(bufferNodes)
 	if err := rlp.Encode(journalBuf, nodes); err != nil {
 		return err
 	}
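Note: loadDiskLayer and journal now delegate the map/slice conversions to flattenTrieNodes and compressTrieNodes, whose definitions are not part of this diff. The sketch below reconstructs them from the inline loops removed above and is only illustrative; the local journalNode/journalNodes shapes are assumptions mirroring pathdb's journal types, and the real helpers may differ.

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/crypto"
	"github.com/ethereum/go-ethereum/trie/trienode"
)

// Assumed shapes, mirroring pathdb's journal types.
type journalNode struct {
	Path []byte // path of the node inside the owning trie
	Blob []byte // RLP-encoded node blob; empty means the node is deleted
}

type journalNodes struct {
	Owner common.Hash
	Nodes []journalNode
}

// compressTrieNodes packs the owner->path->node map into the flat slice
// form used by the journal encoding (mirrors the loop removed from journal()).
func compressTrieNodes(nodes map[common.Hash]map[string]*trienode.Node) []journalNodes {
	out := make([]journalNodes, 0, len(nodes))
	for owner, subset := range nodes {
		entry := journalNodes{Owner: owner}
		for path, node := range subset {
			entry.Nodes = append(entry.Nodes, journalNode{Path: []byte(path), Blob: node.Blob})
		}
		out = append(out, entry)
	}
	return out
}

// flattenTrieNodes is the inverse (mirrors the loop removed from
// loadDiskLayer): an empty blob is rebuilt as a deletion marker.
func flattenTrieNodes(encoded []journalNodes) map[common.Hash]map[string]*trienode.Node {
	nodes := make(map[common.Hash]map[string]*trienode.Node)
	for _, entry := range encoded {
		subset := make(map[string]*trienode.Node, len(entry.Nodes))
		for _, n := range entry.Nodes {
			if len(n.Blob) > 0 {
				subset[string(n.Path)] = trienode.New(crypto.Keccak256Hash(n.Blob), n.Blob)
			} else {
				subset[string(n.Path)] = trienode.NewDeleted()
			}
		}
		nodes[entry.Owner] = subset
	}
	return nodes
}

func main() {
	in := map[common.Hash]map[string]*trienode.Node{
		common.Hash{0x1}: {"ab": trienode.New(crypto.Keccak256Hash([]byte{0xc0}), []byte{0xc0})},
	}
	// The round-trip preserves the node set, which is what the journal relies on.
	fmt.Println(len(flattenTrieNodes(compressTrieNodes(in)))) // 1
}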
diff --git a/triedb/pathdb/nodebufferlist.go b/triedb/pathdb/nodebufferlist.go
index 212889222..c57fd7e0b 100644
--- a/triedb/pathdb/nodebufferlist.go
+++ b/triedb/pathdb/nodebufferlist.go
@@ -30,6 +30,9 @@ const (
 	// DefaultReserveMultiDifflayerNumber defines the default reserve number of multiDifflayer in nodebufferlist.
 	DefaultReserveMultiDifflayerNumber = 3
+
+	// Pebble rejects write batches larger than 4GB, so cap the node buffer list at 3GB to stay safely below that limit.
+	maxNodeBufferListSize = 3221225472
 )
 
 type KeepRecord struct {
@@ -108,113 +111,103 @@ func newNodeBufferList(
 		dlInMd = wpBlocks
 	}
 
-	if nodes == nil {
-		nodes = make(map[common.Hash]map[string]*trienode.Node)
-	}
-	var size uint64
-	for _, subset := range nodes {
-		for path, n := range subset {
-			size += uint64(len(n.Blob) + len(path))
+	var base *multiDifflayer
+	if nodes != nil && useBase {
+		// With fast recovery, the node buffer list is rebuilt from the ancient db after both
+		// force kills and graceful kills, so this branch is currently only exercised by unit tests.
+		var size uint64
+		for _, subset := range nodes {
+			for path, n := range subset {
+				size += uint64(len(n.Blob) + len(path))
+			}
 		}
+		base = newMultiDifflayer(limit, size, common.Hash{}, nodes, layers)
+	} else {
+		base = newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0)
 	}
-	base := newMultiDifflayer(limit, size, common.Hash{}, nodes, layers)
-	var (
-		nf  *nodebufferlist
-		err error
-	)
+
+	nf := &nodebufferlist{
+		db:              db,
+		wpBlocks:        wpBlocks,
+		rsevMdNum:       rsevMdNum,
+		dlInMd:          dlInMd,
+		limit:           limit,
+		base:            base,
+		persistID:       rawdb.ReadPersistentStateID(db),
+		stopCh:          make(chan struct{}),
+		waitStopCh:      make(chan struct{}),
+		forceKeepCh:     make(chan struct{}),
+		waitForceKeepCh: make(chan struct{}),
+		keepFunc:        keepFunc,
+	}
+	nf.useBase.Store(useBase)
+
 	if !useBase && fastRecovery {
-		nf, err = recoverNodeBufferList(db, freezer, base, limit, wpBlocks, rsevMdNum, dlInMd)
-		if err != nil {
+		if freezer == nil {
+			log.Crit("Freezer db is not opened, cannot recover node buffer list")
+		}
+
+		if err := nf.recoverNodeBufferList(freezer); err != nil {
 			log.Error("Failed to recover node buffer list", "error", err)
 			return nil, err
 		}
 	} else {
 		ele := newMultiDifflayer(limit, 0, common.Hash{}, make(map[common.Hash]map[string]*trienode.Node), 0)
-		nf = &nodebufferlist{
-			db:              db,
-			wpBlocks:        wpBlocks,
-			rsevMdNum:       rsevMdNum,
-			dlInMd:          dlInMd,
-			limit:           limit,
-			base:            base,
-			head:            ele,
-			tail:            ele,
-			count:           1,
-			persistID:       rawdb.ReadPersistentStateID(db),
-			stopCh:          make(chan struct{}),
-			waitStopCh:      make(chan struct{}),
-			forceKeepCh:     make(chan struct{}),
-			waitForceKeepCh: make(chan struct{}),
-			keepFunc:        keepFunc,
-		}
-		nf.useBase.Store(useBase)
+		nf.head = ele
+		nf.tail = ele
+		nf.count = 1
 	}
 	go nf.loop()
 
 	log.Info("new node buffer list", "proposed block interval", nf.wpBlocks,
-		"reserve multi difflayers", nf.rsevMdNum, "difflayers in multidifflayer", nf.dlInMd,
-		"limit", common.StorageSize(limit), "layers", layers, "persist id", nf.persistID, "base_size", size)
+		"reserve_multi_difflayers", nf.rsevMdNum, "difflayers_in_multidifflayer", nf.dlInMd,
+		"limit", common.StorageSize(limit), "layers", layers, "persist_id", nf.persistID, "base_size", nf.base.size)
 	return nf, nil
 }
 
 // recoverNodeBufferList recovers node buffer list
-func recoverNodeBufferList(db ethdb.Database, freezer *rawdb.ResettableFreezer, base *multiDifflayer,
-	limit, wpBlocks, rsevMdNum, dlInMd uint64) (*nodebufferlist, error) {
-	nbl := &nodebufferlist{
-		db:              db,
-		wpBlocks:        wpBlocks,
-		rsevMdNum:       rsevMdNum,
-		dlInMd:          dlInMd,
-		limit:           limit,
-		base:            base,
-		persistID:       rawdb.ReadPersistentStateID(db),
-		stopCh:          make(chan struct{}),
-		waitStopCh:      make(chan struct{}),
-		forceKeepCh:     make(chan struct{}),
-		waitForceKeepCh: make(chan struct{}),
-	}
+func (nf *nodebufferlist) recoverNodeBufferList(freezer *rawdb.ResettableFreezer) error {
 	head, err := freezer.Ancients()
 	if err != nil {
 		log.Error("Failed to get freezer ancients", "error", err)
-		return nil, err
+		return err
 	}
 	tail, err := freezer.Tail()
 	if err != nil {
 		log.Error("Failed to get freezer tail", "error", err)
-		return nil, err
+		return err
 	}
-	log.Info("Ancient db meta info", "persistent_state_id", nbl.persistID, "head_state_id", head,
-		"tail_state_id", tail, "waiting_recover_num", head-nbl.persistID)
+	log.Info("Ancient db meta info", "persistent_state_id", nf.persistID, "head_state_id", head,
+		"tail_state_id", tail, "waiting_recover_num", head-nf.persistID)
 
-	startStateID := nbl.persistID + 1
+	startStateID := nf.persistID + 1
 	startBlock, err := readBlockNumber(freezer, startStateID)
 	if err != nil {
 		log.Error("Failed to read start block number", "error", err, "tail_state_id", startStateID)
-		return nil, err
+		return err
 	}
 	endBlock, err := readBlockNumber(freezer, head)
 	if err != nil {
 		log.Error("Failed to read end block number", "error", err, "head_state_id", head)
-		return nil, err
+		return err
 	}
-	blockIntervals := nbl.createBlockInterval(startBlock, endBlock)
-	stateIntervals, err := nbl.createStateInterval(freezer, startStateID, head, blockIntervals)
+	blockIntervals := nf.createBlockInterval(startBlock, endBlock)
+	stateIntervals, err := nf.createStateInterval(freezer, startStateID, head, blockIntervals)
 	if err != nil {
-		return nil, err
+		return err
 	}
-	log.Info("block intervals info", "blockIntervals", blockIntervals, "stateIntervals", stateIntervals,
-		"startBlock", startBlock, "endBlock", endBlock)
+	log.Info("block intervals info", "block_intervals", blockIntervals, "state_intervals", stateIntervals,
+		"start_block", startBlock, "end_block", endBlock)
 
 	var eg errgroup.Group
-	nbl.linkMultiDiffLayers(len(blockIntervals))
-	for current, i := nbl.head, 0; current != nil; current, i = current.next, i+1 {
+	nf.linkMultiDiffLayers(len(blockIntervals))
+	for current, i := nf.head, 0; current != nil; current, i = current.next, i+1 {
 		index := i
 		mdl := current
 		eg.Go(func() error {
 			for j := stateIntervals[index][0]; j <= stateIntervals[index][1]; j++ {
-				h, err := nbl.readStateHistory(freezer, j)
+				h, err := nf.readStateHistory(freezer, j)
 				if err != nil {
 					log.Error("Failed to read state history", "error", err)
 					return err
@@ -228,18 +221,32 @@ func recoverNodeBufferList(db ethdb.Database, freezer *rawdb.ResettableFreezer,
 		})
 	}
 	if err = eg.Wait(); err != nil {
-		return nil, err
+		return err
 	}
 
-	for current, i := nbl.head, 0; current != nil; current, i = current.next, i+1 {
-		nbl.size += current.size
-		nbl.layers += current.layers
+	for current, i := nf.head, 0; current != nil; current, i = current.next, i+1 {
+		nf.size += current.size
+		nf.layers += current.layers
+	}
+
+	log.Info("Before diffToBase", "base_size", nf.base.size, "tail_state_id", nf.tail.id, "head_state_id", nf.head.id,
+		"nbl_layers", nf.layers, "base_layers", nf.base.layers, "nf_count", nf.count, "node_buffer_size", nf.size)
+
+	if nf.size >= maxNodeBufferListSize && nf.layers == DefaultReserveMultiDifflayerNumber {
+		// force flush the buffer into base so the merged batch stays under pebble's max batch size
+		log.Info("Node buffer list size exceeds 3GB, forcing flush to base", "node_buffer_size", nf.size)
+		nf.diffToBase(true)
+	} else {
+		nf.diffToBase(false)
 	}
-	nbl.diffToBase()
 
-	log.Info("Succeed to add diff layer", "base_size", nbl.base.size, "tail_state_id", nbl.tail.id,
-		"head_state_id", nbl.head.id, "nbl_layers", nbl.layers, "base_layers", nbl.base.layers)
-	return nbl, nil
+	log.Info("After diffToBase", "base_size", nf.base.size, "tail_state_id", nf.tail.id,
+		"head_state_id", nf.head.id, "nbl_layers", nf.layers, "base_layers", nf.base.layers, "nf_count", nf.count, "node_buffer_size", nf.size)
+	nf.backgroundFlush()
+
+	log.Info("Successfully recovered node buffer list", "base_size", nf.base.size, "tail_state_id", nf.tail.id,
+		"head_state_id", nf.head.id, "nbl_layers", nf.layers, "base_layers", nf.base.layers, "nf_count", nf.count, "node_buffer_size", nf.size)
+	return nil
 }
 
 // linkMultiDiffLayers links specified amount of multiDiffLayers for recovering
@@ -687,15 +694,18 @@ func (nf *nodebufferlist) traverseReverse(cb func(*multiDifflayer) bool) {
 // diffToBase calls traverseReverse and merges the multiDifflayer's nodes to
 // base node buffer, if up to limit size and flush to disk. It is called
 // periodically in the background
-func (nf *nodebufferlist) diffToBase() {
+func (nf *nodebufferlist) diffToBase(skipCountCheck bool) {
 	commitFunc := func(buffer *multiDifflayer) bool {
 		if nf.base.size >= nf.base.limit {
 			log.Debug("base node buffer need write disk immediately")
 			return false
 		}
-		if nf.count <= nf.rsevMdNum {
-			log.Debug("node buffer list less, waiting more difflayer to be committed")
-			return false
+		if !skipCountCheck {
+			// fast recovery passes skipCountCheck=true to force the merge into base and stay under pebble's batch size limit
+			if nf.count <= nf.rsevMdNum {
+				log.Debug("node buffer list less, waiting more difflayer to be committed")
+				return false
+			}
 		}
 		if buffer.block%nf.dlInMd != 0 {
 			log.Crit("committed block number misaligned", "block", buffer.block)
@@ -815,7 +825,7 @@ func (nf *nodebufferlist) loop() {
 			if nf.isFlushing.Swap(true) {
 				continue
 			}
-			nf.diffToBase()
+			nf.diffToBase(false)
 			if nf.base.size >= nf.base.limit {
 				nf.backgroundFlush()
 			}
@@ -861,8 +871,8 @@ func (nf *nodebufferlist) proposedBlockReader(blockRoot common.Hash) (layer, err
 func (nf *nodebufferlist) report() {
 	context := []interface{}{
 		"number", nf.block, "count", nf.count, "layers", nf.layers,
-		"stateid", nf.stateId, "persist", nf.persistID, "size", common.StorageSize(nf.size),
-		"basesize", common.StorageSize(nf.base.size), "baselayers", nf.base.layers,
+		"state_id", nf.stateId, "persist", nf.persistID, "size", common.StorageSize(nf.size),
+		"base_size", common.StorageSize(nf.base.size), "base_layers", nf.base.layers,
 	}
 	log.Info("node buffer list info", context...)
 }
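Note: the new maxNodeBufferListSize cap and the diffToBase(skipCountCheck) parameter work together. The background loop keeps passing false, so the reserve-count gate still applies; recovery passes true once the rebuilt list crosses 3GB, keeping the eventual merge under pebble's 4GB batch limit. A minimal, self-contained sketch of the two decisions as written in this diff (constant and function names here are illustrative, not from the codebase):

package main

import "fmt"

const (
	reserveMdNum        = 3       // mirrors DefaultReserveMultiDifflayerNumber
	maxListSize  uint64 = 3 << 30 // 3GB cap, kept below pebble's 4GB batch limit
)

// mergeAllowed mirrors the count gate in diffToBase: a merge into base
// normally waits until more than reserveMdNum difflayers exist, but fast
// recovery passes skip=true to force the merge regardless of the count.
func mergeAllowed(count uint64, skip bool) bool {
	return skip || count > reserveMdNum
}

// forceFlush mirrors the post-recovery check in this diff: the merge is
// forced only when the rebuilt list already crosses the 3GB cap while the
// layer counter equals the reserve number.
func forceFlush(size, layers uint64) bool {
	return size >= maxListSize && layers == reserveMdNum
}

func main() {
	fmt.Println(mergeAllowed(2, false)) // false: reserved difflayers stay resident
	fmt.Println(mergeAllowed(2, true))  // true: forced during fast recovery
	fmt.Println(forceFlush(4<<30, 3))   // true: oversized list triggers diffToBase(true)
}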