diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go
index 5f9fea04941..6d3321c91df 100644
--- a/cmd/utils/flags.go
+++ b/cmd/utils/flags.go
@@ -940,6 +940,18 @@ var (
 		Usage: "set the cors' allow origins",
 		Value: cli.NewStringSlice(),
 	}
+
+	// Witness stage
+	EnableWitnessGenerationFlag = cli.BoolFlag{
+		Name:  "witness",
+		Usage: "Enables the witness generation stage",
+		Value: true,
+	}
+	MaxWitnessLimitFlag = cli.Uint64Flag{
+		Name:  "witness.limit",
+		Usage: "Maximum number of witnesses allowed to be stored at a time",
+		Value: 10_000,
+	}
 )
 
 var MetricFlags = []cli.Flag{&MetricsEnabledFlag, &MetricsHTTPFlag, &MetricsPortFlag}
@@ -1594,6 +1606,11 @@ func setSilkworm(ctx *cli.Context, cfg *ethconfig.Config) {
 	cfg.SilkwormSentry = ctx.Bool(SilkwormSentryFlag.Name)
 }
 
+func setWitness(ctx *cli.Context, cfg *ethconfig.Config) {
+	cfg.EnableWitnessGeneration = ctx.Bool(EnableWitnessGenerationFlag.Name)
+	cfg.MaxWitnessLimit = ctx.Uint64(MaxWitnessLimitFlag.Name)
+}
+
 // CheckExclusive verifies that only a single instance of the provided flags was
 // set by the user. Each flag might optionally be followed by a string type to
 // specialize it further.
@@ -1715,6 +1732,7 @@ func SetEthConfig(ctx *cli.Context, nodeConfig *nodecfg.Config, cfg *ethconfig.C
 		log.Error("Failed to set beacon API", "err", err)
 	}
 	setCaplin(ctx, cfg)
+	setWitness(ctx, cfg)
 
 	cfg.Ethstats = ctx.String(EthStatsURLFlag.Name)
 	cfg.HistoryV3 = ctx.Bool(HistoryV3Flag.Name)
diff --git a/core/state/database.go b/core/state/database.go
index 669ed7d760f..163f159c16e 100644
--- a/core/state/database.go
+++ b/core/state/database.go
@@ -685,7 +685,7 @@ func (tds *TrieDbState) resolveAccountAndStorageTouches(accountTouches common.Ha
 	}
 	if err := tds.t.HookSubTries(subTries, hooks); err != nil {
 		for i, hash := range subTries.Hashes {
-			log.Error("Info for error", "dbPrefix", fmt.Sprintf("%x", dbPrefixes[i]), "fixedbits", fixedbits[i], "hash", hash)
+			log.Error("Info for error", "dbPrefix", fmt.Sprintf("%x", dbPrefixes[i]), "fixedbits", fixedbits[i], "hash", hash, "err", err)
 		}
 		return err
 	}
diff --git a/erigon-lib/kv/tables.go b/erigon-lib/kv/tables.go
index 78aec94f6e6..e8bba3462c2 100644
--- a/erigon-lib/kv/tables.go
+++ b/erigon-lib/kv/tables.go
@@ -232,7 +232,7 @@ Invariants:
 const TrieOfAccounts = "TrieAccount"
 const TrieOfStorage = "TrieStorage"
 const IntermediateTrieHash = "IntermediateTrieHash"
-const Witnesses = "witnesses"
+const Witnesses = "witnesses" // block_num_u64 + "_chunk_" + chunk_num_u64 -> witness
 
 // Mapping [block number] => [Verkle Root]
 const VerkleRoots = "VerkleRoots"
diff --git a/eth/ethconfig/config.go b/eth/ethconfig/config.go
index 0eab8bdc045..b57aa5745c3 100644
--- a/eth/ethconfig/config.go
+++ b/eth/ethconfig/config.go
@@ -230,6 +230,13 @@ type Config struct {
 	// New DB and Snapshots format of history allows: parallel blocks execution, get state as of given transaction without executing whole block.",
 	HistoryV3 bool
 
+	// EnableWitnessGeneration if true enables the witness generation stage post block execution
+	EnableWitnessGeneration bool
+
+	// MaxWitnessLimit denotes the maximum number of witnesses allowed to be stored at a time. As new
+	// witnesses are added, the older ones are pruned to maintain the limit.
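+	// For example, with the default limit of 10_000, once the stage has processed block N only
+	// the witnesses for blocks [N-9_999, N] remain in the Witnesses table.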
+ MaxWitnessLimit uint64 + // URL to connect to Heimdall node HeimdallURL string diff --git a/eth/stagedsync/default_stages.go b/eth/stagedsync/default_stages.go index 08dfdd8430e..207a3994e81 100644 --- a/eth/stagedsync/default_stages.go +++ b/eth/stagedsync/default_stages.go @@ -20,6 +20,7 @@ func DefaultStages(ctx context.Context, bodies BodiesCfg, senders SendersCfg, exec ExecuteBlockCfg, + witness WitnessCfg, hashState HashStateCfg, trieCfg TrieCfg, history HistoryCfg, @@ -130,6 +131,19 @@ func DefaultStages(ctx context.Context, return PruneExecutionStage(p, tx, exec, ctx, firstCycle) }, }, + { + ID: stages.Witness, + Description: "Generate witness of the executed block", + Forward: func(firstCycle bool, badBlockUnwind bool, s *StageState, u Unwinder, txc wrap.TxContainer, logger log.Logger) error { + return SpawnWitnessStage(s, txc.Tx, witness, ctx, logger) + }, + Unwind: func(firstCycle bool, u *UnwindState, s *StageState, txc wrap.TxContainer, logger log.Logger) error { + return UnwindWitnessStage() + }, + Prune: func(firstCycle bool, p *PruneState, tx kv.RwTx, logger log.Logger) error { + return PruneWitnessStage() + }, + }, { ID: stages.HashState, Description: "Hash the key in the state", diff --git a/eth/stagedsync/stage_witness.go b/eth/stagedsync/stage_witness.go new file mode 100644 index 00000000000..7cfeb1370db --- /dev/null +++ b/eth/stagedsync/stage_witness.go @@ -0,0 +1,519 @@ +package stagedsync + +import ( + "bytes" + "context" + "fmt" + + "github.com/holiman/uint256" + "github.com/ledgerwatch/erigon-lib/chain" + libcommon "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/common/datadir" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon-lib/kv/membatchwithdb" + "github.com/ledgerwatch/erigon/consensus" + "github.com/ledgerwatch/erigon/core" + "github.com/ledgerwatch/erigon/core/state" + "github.com/ledgerwatch/erigon/core/types" + "github.com/ledgerwatch/erigon/core/vm" + "github.com/ledgerwatch/erigon/eth/stagedsync/stages" + "github.com/ledgerwatch/erigon/turbo/rpchelper" + "github.com/ledgerwatch/erigon/turbo/services" + "github.com/ledgerwatch/erigon/turbo/trie" + "github.com/ledgerwatch/log/v3" +) + +type WitnessCfg struct { + db kv.RwDB + enableWitnessGeneration bool + maxWitnessLimit uint64 + chainConfig *chain.Config + engine consensus.Engine + blockReader services.FullBlockReader + dirs datadir.Dirs +} + +type WitnessStore struct { + Tds *state.TrieDbState + TrieStateWriter *state.TrieStateWriter + Statedb *state.IntraBlockState + ChainReader *ChainReaderImpl + GetHashFn func(n uint64) libcommon.Hash +} + +func StageWitnessCfg(db kv.RwDB, enableWitnessGeneration bool, maxWitnessLimit uint64, chainConfig *chain.Config, engine consensus.Engine, blockReader services.FullBlockReader, dirs datadir.Dirs) WitnessCfg { + return WitnessCfg{ + db: db, + enableWitnessGeneration: enableWitnessGeneration, + maxWitnessLimit: maxWitnessLimit, + chainConfig: chainConfig, + engine: engine, + blockReader: blockReader, + dirs: dirs, + } +} + +func SpawnWitnessStage(s *StageState, rootTx kv.RwTx, cfg WitnessCfg, ctx context.Context, logger log.Logger) error { + if !cfg.enableWitnessGeneration { + logger.Debug(fmt.Sprintf("[%s] Skipping Witness Generation", s.LogPrefix())) + return nil + } + + useExternalTx := rootTx != nil + if !useExternalTx { + var err error + rootTx, err = cfg.db.BeginRw(context.Background()) + if err != nil { + return err + } + defer rootTx.Rollback() + } + + // We'll need to use `rootTx` to write witness. 
As during rewind
+	// the tx is updated to an in-memory batch, we'll operate on the copy
+	// to keep the `rootTx` as it is.
+	tx := rootTx
+
+	logPrefix := s.LogPrefix()
+	execStageBlock, err := s.ExecutionAt(tx)
+	if err != nil {
+		return err
+	}
+
+	lastWitnessBlock := s.BlockNumber
+	if lastWitnessBlock >= execStageBlock {
+		// We already did witness generation for this block
+		return nil
+	}
+
+	var from, to uint64
+
+	// Skip witness generation for past blocks. This can happen when the node is upgraded
+	// to a version that has the witness stage, or when the witness stage is disabled
+	// and then enabled again.
+	if lastWitnessBlock == 0 {
+		s.Update(tx, execStageBlock-1)
+		return nil
+	}
+
+	// We'll generate witness for all blocks from `lastWitnessBlock+1` until `execStageBlock - 1`
+	to = execStageBlock - 1
+	from = lastWitnessBlock + 1
+	if to <= 0 {
+		return nil
+	}
+
+	// We only need to store the last `maxWitnessLimit` witnesses. Since sync can import
+	// blocks in batches, trim the range down to this limit.
+	if to-from+1 > cfg.maxWitnessLimit {
+		from = to - cfg.maxWitnessLimit + 1
+	}
+
+	rl := trie.NewRetainList(0)
+	batch := membatchwithdb.NewMemoryBatch(tx, "", logger)
+	defer batch.Rollback()
+
+	logger.Info(fmt.Sprintf("[%s] Witness Generation", logPrefix), "from", from, "to", to)
+
+	for blockNr := from; blockNr <= to; blockNr++ {
+		tx := rootTx
+
+		block, err := cfg.blockReader.BlockByNumber(ctx, tx, blockNr)
+		if err != nil {
+			return err
+		}
+		if block == nil {
+			return fmt.Errorf("block %d not found while generating witness", blockNr)
+		}
+
+		prevHeader, err := cfg.blockReader.HeaderByNumber(ctx, tx, blockNr-1)
+		if err != nil {
+			return err
+		}
+
+		batch, rl, err = RewindStagesForWitness(batch, blockNr, &cfg, false, ctx, logger)
+		if err != nil {
+			return err
+		}
+
+		// Update the tx to operate on the in-memory batch
+		tx = batch
+
+		store, err := PrepareForWitness(tx, block, prevHeader.Root, rl, &cfg, ctx, logger)
+		if err != nil {
+			return err
+		}
+
+		w, txTds, err := GenerateWitness(tx, block, prevHeader, true, 0, store.Tds, store.TrieStateWriter, store.Statedb, store.GetHashFn, &cfg, false, ctx, logger)
+		if err != nil {
+			return err
+		}
+		if w == nil {
+			return fmt.Errorf("unable to generate witness for block %d", blockNr)
+		}
+
+		var buf bytes.Buffer
+		_, err = w.WriteInto(&buf)
+		if err != nil {
+			return err
+		}
+
+		_, err = VerifyWitness(tx, block, prevHeader, true, 0, store.ChainReader, store.Tds, txTds, store.GetHashFn, &cfg, &buf, logger)
+		if err != nil {
+			return fmt.Errorf("error verifying witness for block %d: %v", blockNr, err)
+		}
+
+		// Check if we already have a witness for the same block (can happen during a reorg)
+		exist, _ := HasWitness(rootTx, kv.Witnesses, Uint64ToBytes(blockNr))
+		if exist {
+			logger.Debug("Deleting witness chunks for existing block", "block", blockNr)
+			err = DeleteChunks(rootTx, kv.Witnesses, Uint64ToBytes(blockNr))
+			if err != nil {
+				return fmt.Errorf("error deleting witness for block %d: %v", blockNr, err)
+			}
+		}
+
+		// Write the witness buffer against the corresponding block number
+		logger.Debug("Writing witness to db", "block", blockNr)
+		err = WriteChunks(rootTx, kv.Witnesses, Uint64ToBytes(blockNr), buf.Bytes())
+		if err != nil {
+			return fmt.Errorf("error writing witness for block %d: %v", blockNr, err)
+		}
+
+		// Update the stage with the latest block number
+		s.Update(rootTx, blockNr)
+
+		// If we're over the limit, delete the oldest witnesses
+		oldestWitnessBlock, _ := FindOldestWitness(rootTx, kv.Witnesses)
+		if 
blockNr-oldestWitnessBlock+1 > cfg.maxWitnessLimit { + // If the user reduces `witness.limit`, we'll need to delete witnesses more than just oldest + deleteFrom := oldestWitnessBlock + deleteTo := blockNr - cfg.maxWitnessLimit + logger.Debug("Reached max witness limit, deleting oldest witness", "from", deleteFrom, "to", deleteTo) + for i := deleteFrom; i <= deleteTo; i++ { + err = DeleteChunks(rootTx, kv.Witnesses, Uint64ToBytes(i)) + if err != nil { + return fmt.Errorf("error deleting witness for block %d: %v", i, err) + } + } + } + + logger.Info(fmt.Sprintf("[%s] Generated witness", logPrefix), "block", blockNr, "len", len(buf.Bytes())) + } + + logger.Info(fmt.Sprintf("[%s] Done Witness Generation", logPrefix), "until", to) + + return nil +} + +// PrepareForWitness abstracts the process of initialising bunch of necessary things required for witness +// generation and puts them in a WitnessStore. +func PrepareForWitness(tx kv.Tx, block *types.Block, prevRoot libcommon.Hash, rl *trie.RetainList, cfg *WitnessCfg, ctx context.Context, logger log.Logger) (*WitnessStore, error) { + blockNr := block.NumberU64() + reader, err := rpchelper.CreateHistoryStateReader(tx, blockNr, 0, false, cfg.chainConfig.ChainName) + if err != nil { + return nil, err + } + + tds := state.NewTrieDbState(prevRoot, tx, blockNr-1, reader) + tds.SetRetainList(rl) + tds.SetResolveReads(true) + + tds.StartNewBuffer() + trieStateWriter := tds.TrieStateWriter() + + statedb := state.New(tds) + statedb.SetDisableBalanceInc(true) + + chainReader := NewChainReaderImpl(cfg.chainConfig, tx, cfg.blockReader, logger) + if err := core.InitializeBlockExecution(cfg.engine, chainReader, block.Header(), cfg.chainConfig, statedb, trieStateWriter, nil); err != nil { + return nil, err + } + + getHeader := func(hash libcommon.Hash, number uint64) *types.Header { + h, e := cfg.blockReader.Header(ctx, tx, hash, number) + if e != nil { + log.Error("getHeader error", "number", number, "hash", hash, "err", e) + } + return h + } + getHashFn := core.GetHashFn(block.Header(), getHeader) + + return &WitnessStore{ + Tds: tds, + TrieStateWriter: trieStateWriter, + Statedb: statedb, + ChainReader: chainReader, + GetHashFn: getHashFn, + }, nil +} + +// RewindStagesForWitness rewinds the 'HashState' and 'IntermediateHashes' stages to previous block. 
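+// The unwind runs against the supplied in-memory batch, so the underlying database is not
+// modified. The returned RetainList is populated while unwinding the trie and is later set on
+// the TrieDbState so the touched keys end up in the witness. When regenerateHash is true, the
+// trie tables are cleared instead and the trie is rebuilt from scratch while resolving the witness.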
+func RewindStagesForWitness(batch *membatchwithdb.MemoryMutation, blockNr uint64, cfg *WitnessCfg, regenerateHash bool, ctx context.Context, logger log.Logger) (*membatchwithdb.MemoryMutation, *trie.RetainList, error) { + rl := trie.NewRetainList(0) + + // Rewind the 'HashState' and 'IntermediateHashes' stages to previous block + unwindState := &UnwindState{ID: stages.HashState, UnwindPoint: blockNr - 1} + stageState := &StageState{ID: stages.HashState, BlockNumber: blockNr} + + hashStageCfg := StageHashStateCfg(nil, cfg.dirs, false) + if err := UnwindHashStateStage(unwindState, stageState, batch, hashStageCfg, ctx, logger); err != nil { + return nil, nil, err + } + + unwindState = &UnwindState{ID: stages.IntermediateHashes, UnwindPoint: blockNr - 1} + stageState = &StageState{ID: stages.IntermediateHashes, BlockNumber: blockNr} + + if !regenerateHash { + interHashStageCfg := StageTrieCfg(nil, false, false, false, "", cfg.blockReader, nil, false, nil) + err := UnwindIntermediateHashes("eth_getWitness", rl, unwindState, stageState, batch, interHashStageCfg, ctx.Done(), logger) + if err != nil { + return nil, nil, err + } + } else { + _ = batch.ClearBucket(kv.TrieOfAccounts) + _ = batch.ClearBucket(kv.TrieOfStorage) + } + + return batch, rl, nil +} + +// GenerateWitness does the core witness generation part by re-executing transactions in the block. It +// assumes that the 'HashState' and 'IntermediateHashes' stages are already rewinded to the previous block. +func GenerateWitness(tx kv.Tx, block *types.Block, prevHeader *types.Header, fullBlock bool, txIndex uint64, tds *state.TrieDbState, trieStateWriter *state.TrieStateWriter, statedb *state.IntraBlockState, getHashFn func(n uint64) libcommon.Hash, cfg *WitnessCfg, regenerateHash bool, ctx context.Context, logger log.Logger) (*trie.Witness, *state.TrieDbState, error) { + blockNr := block.NumberU64() + usedGas := new(uint64) + usedBlobGas := new(uint64) + gp := new(core.GasPool).AddGas(block.GasLimit()).AddBlobGas(cfg.chainConfig.GetMaxBlobGasPerBlock()) + var receipts types.Receipts + + if len(block.Transactions()) == 0 { + statedb.GetBalance(libcommon.HexToAddress("0x1234")) + } + + vmConfig := vm.Config{} + + loadFunc := func(loader *trie.SubTrieLoader, rl *trie.RetainList, dbPrefixes [][]byte, fixedbits []int, accountNibbles [][]byte) (trie.SubTries, error) { + rl.Rewind() + receiver := trie.NewSubTrieAggregator(nil, nil, false) + receiver.SetRetainList(rl) + pr := trie.NewMultiAccountProofRetainer(rl) + pr.AccHexKeys = accountNibbles + receiver.SetProofRetainer(pr) + + loaderRl := rl + if regenerateHash { + loaderRl = trie.NewRetainList(0) + } + subTrieloader := trie.NewFlatDBTrieLoader[trie.SubTries]("eth_getWitness", loaderRl, nil, nil, false, receiver) + subTries, err := subTrieloader.Result(tx, nil) + + rl.Rewind() + + if err != nil { + return receiver.EmptyResult(), err + } + + err = trie.AttachRequestedCode(tx, loader.CodeRequests()) + + if err != nil { + return receiver.EmptyResult(), err + } + + // Reverse the subTries.Hashes and subTries.roots + for i, j := 0, len(subTries.Hashes)-1; i < j; i, j = i+1, j-1 { + subTries.Hashes[i], subTries.Hashes[j] = subTries.Hashes[j], subTries.Hashes[i] + subTries.Roots()[i], subTries.Roots()[j] = subTries.Roots()[j], subTries.Roots()[i] + } + + return subTries, nil + } + + var txTds *state.TrieDbState + + for i, txn := range block.Transactions() { + statedb.SetTxContext(txn.Hash(), block.Hash(), i) + + // Ensure that the access list is loaded into witness + for _, a := range 
txn.GetAccessList() { + statedb.GetBalance(a.Address) + + for _, k := range a.StorageKeys { + v := uint256.NewInt(0) + statedb.GetState(a.Address, &k, v) + } + } + + receipt, _, err := core.ApplyTransaction(cfg.chainConfig, getHashFn, cfg.engine, nil, gp, statedb, trieStateWriter, block.Header(), txn, usedGas, usedBlobGas, vmConfig) + if err != nil { + return nil, nil, err + } + + if !fullBlock && i == int(txIndex) { + txTds = tds.WithLastBuffer() + break + } + + if !cfg.chainConfig.IsByzantium(block.NumberU64()) || (!fullBlock && i+1 == int(txIndex)) { + tds.StartNewBuffer() + } + + receipts = append(receipts, receipt) + } + + if fullBlock { + if _, _, _, err := cfg.engine.FinalizeAndAssemble(cfg.chainConfig, block.Header(), statedb, block.Transactions(), block.Uncles(), receipts, block.Withdrawals(), nil, nil, nil, nil); err != nil { + fmt.Printf("Finalize of block %d failed: %v\n", blockNr, err) + return nil, nil, err + } + + statedb.FinalizeTx(cfg.chainConfig.Rules(block.NumberU64(), block.Header().Time), trieStateWriter) + } + + triePreroot := tds.LastRoot() + + if fullBlock && !bytes.Equal(prevHeader.Root[:], triePreroot[:]) { + return nil, nil, fmt.Errorf("mismatch in expected state root computed %v vs %v indicates bug in witness implementation", prevHeader.Root, triePreroot) + } + + if err := tds.ResolveStateTrieWithFunc(loadFunc); err != nil { + return nil, nil, err + } + + w, err := tds.ExtractWitness(false, false) + if err != nil { + return nil, nil, err + } + + return w, txTds, nil +} + +// VerifyWitness verifies if the correct state trie can be re-generated by the witness (prepared earlier). +func VerifyWitness(tx kv.Tx, block *types.Block, prevHeader *types.Header, fullBlock bool, txIndex uint64, chainReader *ChainReaderImpl, tds *state.TrieDbState, txTds *state.TrieDbState, getHashFn func(n uint64) libcommon.Hash, cfg *WitnessCfg, buf *bytes.Buffer, logger log.Logger) (*bytes.Buffer, error) { + blockNr := block.NumberU64() + nw, err := trie.NewWitnessFromReader(bytes.NewReader(buf.Bytes()), false) + if err != nil { + return nil, err + } + + s, err := state.NewStateless(prevHeader.Root, nw, blockNr-1, false, false /* is binary */) + if err != nil { + return nil, err + } + ibs := state.New(s) + s.SetBlockNr(blockNr) + + gp := new(core.GasPool).AddGas(block.GasLimit()).AddBlobGas(cfg.chainConfig.GetMaxBlobGasPerBlock()) + usedGas := new(uint64) + usedBlobGas := new(uint64) + receipts := types.Receipts{} + vmConfig := vm.Config{} + + if err := core.InitializeBlockExecution(cfg.engine, chainReader, block.Header(), cfg.chainConfig, ibs, s, nil); err != nil { + return nil, err + } + header := block.Header() + + for i, txn := range block.Transactions() { + if !fullBlock && i == int(txIndex) { + s.Finalize() + break + } + + ibs.SetTxContext(txn.Hash(), block.Hash(), i) + receipt, _, err := core.ApplyTransaction(cfg.chainConfig, getHashFn, cfg.engine, nil, gp, ibs, s, header, txn, usedGas, usedBlobGas, vmConfig) + if err != nil { + return nil, fmt.Errorf("tx %x failed: %v", txn.Hash(), err) + } + receipts = append(receipts, receipt) + } + + if !fullBlock { + err = txTds.ResolveStateTrieWithFunc( + func(loader *trie.SubTrieLoader, rl *trie.RetainList, dbPrefixes [][]byte, fixedbits []int, accountNibbles [][]byte) (trie.SubTries, error) { + return trie.SubTries{}, nil + }, + ) + + if err != nil { + return nil, err + } + + rl := txTds.GetRetainList() + + w, err := s.GetTrie().ExtractWitness(false, rl) + + if err != nil { + return nil, err + } + + var buf bytes.Buffer + _, err = 
w.WriteInto(&buf) + if err != nil { + return nil, err + } + + return &buf, nil + } + + receiptSha := types.DeriveSha(receipts) + if !vmConfig.StatelessExec && cfg.chainConfig.IsByzantium(block.NumberU64()) && !vmConfig.NoReceipts && receiptSha != block.ReceiptHash() { + return nil, fmt.Errorf("mismatched receipt headers for block %d (%s != %s)", block.NumberU64(), receiptSha.Hex(), block.ReceiptHash().Hex()) + } + + if !vmConfig.StatelessExec && *usedGas != header.GasUsed { + return nil, fmt.Errorf("gas used by execution: %d, in header: %d", *usedGas, header.GasUsed) + } + + if header.BlobGasUsed != nil && *usedBlobGas != *header.BlobGasUsed { + return nil, fmt.Errorf("blob gas used by execution: %d, in header: %d", *usedBlobGas, *header.BlobGasUsed) + } + + var bloom types.Bloom + if !vmConfig.NoReceipts { + bloom = types.CreateBloom(receipts) + if !vmConfig.StatelessExec && bloom != header.Bloom { + return nil, fmt.Errorf("bloom computed by execution: %x, in header: %x", bloom, header.Bloom) + } + } + + if !vmConfig.ReadOnly { + _, _, _, err := cfg.engine.FinalizeAndAssemble(cfg.chainConfig, block.Header(), ibs, block.Transactions(), block.Uncles(), receipts, block.Withdrawals(), nil, nil, nil, nil) + if err != nil { + return nil, err + } + + rules := cfg.chainConfig.Rules(block.NumberU64(), header.Time) + + ibs.FinalizeTx(rules, s) + + if err := ibs.CommitBlock(rules, s); err != nil { + return nil, fmt.Errorf("committing block %d failed: %v", block.NumberU64(), err) + } + } + + if err = s.CheckRoot(header.Root); err != nil { + return nil, err + } + + roots, err := tds.UpdateStateTrie() + if err != nil { + return nil, err + } + + if roots[len(roots)-1] != block.Root() { + return nil, fmt.Errorf("mismatch in expected state root computed %v vs %v indicates bug in witness implementation", roots[len(roots)-1], block.Root()) + } + + return buf, nil +} + +// TODO: Implement +func UnwindWitnessStage() error { + return nil +} + +// TODO: Implement +func PruneWitnessStage() error { + return nil +} diff --git a/eth/stagedsync/stagedsynctest/harness.go b/eth/stagedsync/stagedsynctest/harness.go index b8808b05d37..32a1c5b394a 100644 --- a/eth/stagedsync/stagedsynctest/harness.go +++ b/eth/stagedsync/stagedsynctest/harness.go @@ -70,6 +70,7 @@ func InitHarness(ctx context.Context, t *testing.T, cfg HarnessCfg) Harness { stagedsync.BodiesCfg{}, stagedsync.SendersCfg{}, stagedsync.ExecuteBlockCfg{}, + stagedsync.WitnessCfg{}, stagedsync.HashStateCfg{}, stagedsync.TrieCfg{}, stagedsync.HistoryCfg{}, diff --git a/eth/stagedsync/stages/stages.go b/eth/stagedsync/stages/stages.go index c6734f3923e..6ce43f61d35 100644 --- a/eth/stagedsync/stages/stages.go +++ b/eth/stagedsync/stages/stages.go @@ -39,6 +39,7 @@ var ( Execution SyncStage = "Execution" // Executing each block w/o buildinf a trie Translation SyncStage = "Translation" // Translation each marked for translation contract (from EVM to TEVM) VerkleTrie SyncStage = "VerkleTrie" + Witness SyncStage = "Witness" // Generating witness of the executed block IntermediateHashes SyncStage = "IntermediateHashes" // Generate intermediate hashes, calculate the state root hash HashState SyncStage = "HashState" // Apply Keccak256 to all the keys in the state AccountHistoryIndex SyncStage = "AccountHistoryIndex" // Generating history index for accounts diff --git a/eth/stagedsync/witness_util.go b/eth/stagedsync/witness_util.go new file mode 100644 index 00000000000..d842ac792a2 --- /dev/null +++ b/eth/stagedsync/witness_util.go @@ -0,0 +1,247 @@ +package 
stagedsync + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/csv" + "fmt" + "strconv" + + "github.com/ledgerwatch/erigon-lib/common" + "github.com/ledgerwatch/erigon-lib/kv" + "github.com/ledgerwatch/erigon/turbo/trie" +) + +type WitnessDBWriter struct { + storage kv.RwDB + statsWriter *csv.Writer +} + +func NewWitnessDBWriter(storage kv.RwDB, statsWriter *csv.Writer) (*WitnessDBWriter, error) { + err := statsWriter.Write([]string{ + "blockNum", "maxTrieSize", "witnessesSize", + }) + if err != nil { + return nil, err + } + return &WitnessDBWriter{storage, statsWriter}, nil +} + +const chunkSize = 100000 // 100KB + +func WriteChunks(tx kv.RwTx, tableName string, key []byte, valueBytes []byte) error { + // Split the valueBytes into chunks and write each chunk + for i := 0; i < len(valueBytes); i += chunkSize { + end := i + chunkSize + if end > len(valueBytes) { + end = len(valueBytes) + } + chunk := valueBytes[i:end] + chunkKey := append(key, []byte("_chunk_"+strconv.Itoa(i/chunkSize))...) + + // Write each chunk to the KV store + if err := tx.Put(tableName, chunkKey, chunk); err != nil { + return err + } + } + + return nil +} + +func ReadChunks(tx kv.Tx, tableName string, key []byte) ([]byte, error) { + // Initialize a buffer to store the concatenated chunks + var result []byte + + // Retrieve and concatenate each chunk + for i := 0; ; i++ { + chunkKey := append(key, []byte("_chunk_"+strconv.Itoa(i))...) + chunk, err := tx.GetOne(tableName, chunkKey) + if err != nil { + return nil, err + } + + // Check if this is the last chunk + if len(chunk) == 0 { + break + } + + // Append the chunk to the result + result = append(result, chunk...) + } + + return result, nil +} + +// HasWitness returns whether a witness exists for the given key or not +func HasWitness(tx kv.Tx, tableName string, key []byte) (bool, error) { + firstChunkKey := append(key, []byte("_chunk_0")...) + chunk, err := tx.GetOne(tableName, firstChunkKey) + if err != nil { + return false, err + } + + if len(chunk) == 0 { + return false, nil + } + + return true, nil +} + +// DeleteChunks deletes all the chunks present with prefix `key` +// TODO: Try to see if this can be optimised by using tx.ForEach +// and iterate over each element with prefix `key` +func DeleteChunks(tx kv.RwTx, tableName string, key []byte) error { + for i := 0; ; i++ { + chunkKey := append(key, []byte("_chunk_"+strconv.Itoa(i))...) 
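+		// Read the chunk before deleting it so the loop can stop once it runs past the last stored chunk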
+ chunk, err := tx.GetOne(tableName, chunkKey) + if err != nil { + return err + } + + err = tx.Delete(tableName, chunkKey) + if err != nil { + return err + } + + // Check if this is the last chunk + if len(chunk) == 0 { + break + } + } + + return nil +} + +// FindOldestWitness returns the block number of the oldest stored block +func FindOldestWitness(tx kv.Tx, tableName string) (uint64, error) { + cursor, err := tx.Cursor(tableName) + if err != nil { + return 0, err + } + + k, _, err := cursor.First() + if err != nil { + return 0, err + } + + return BytesToUint64(k), nil +} + +func (db *WitnessDBWriter) MustUpsertOneWitness(blockNumber uint64, witness *trie.Witness) { + k := make([]byte, 8) + + binary.LittleEndian.PutUint64(k[:], blockNumber) + + var buf bytes.Buffer + _, err := witness.WriteInto(&buf) + if err != nil { + panic(fmt.Sprintf("error extracting witness for block %d: %v\n", blockNumber, err)) + } + + wb := buf.Bytes() + + tx, err := db.storage.BeginRw(context.Background()) + if err != nil { + panic(fmt.Errorf("error opening tx: %w", err)) + } + + defer tx.Rollback() + + fmt.Printf("Size of witness: %d\n", len(wb)) + + err = WriteChunks(tx, kv.Witnesses, k, common.CopyBytes(wb)) + + tx.Commit() + + if err != nil { + panic(fmt.Errorf("error while upserting witness: %w", err)) + } +} + +func (db *WitnessDBWriter) MustUpsert(blockNumber uint64, maxTrieSize uint32, resolveWitnesses []*trie.Witness) { + key := deriveDbKey(blockNumber, maxTrieSize) + + var buf bytes.Buffer + + for i, witness := range resolveWitnesses { + if _, err := witness.WriteInto(&buf); err != nil { + panic(fmt.Errorf("error while writing witness to a buffer: %w", err)) + } + if i < len(resolveWitnesses)-1 { + buf.WriteByte(byte(trie.OpNewTrie)) + } + } + + bytes := buf.Bytes() + + tx, err := db.storage.BeginRw(context.Background()) + if err != nil { + panic(fmt.Errorf("error opening tx: %w", err)) + } + + defer tx.Rollback() + + err = tx.Put(kv.Witnesses, common.CopyBytes(key), common.CopyBytes(bytes)) + + tx.Commit() + + if err != nil { + panic(fmt.Errorf("error while upserting witness: %w", err)) + } + + err = db.statsWriter.Write([]string{ + fmt.Sprintf("%v", blockNumber), + fmt.Sprintf("%v", maxTrieSize), + fmt.Sprintf("%v", len(bytes)), + }) + + if err != nil { + panic(fmt.Errorf("error while writing stats: %w", err)) + } + + db.statsWriter.Flush() +} + +type WitnessDBReader struct { + db kv.RwDB +} + +func NewWitnessDBReader(db kv.RwDB) *WitnessDBReader { + return &WitnessDBReader{db} +} + +func (db *WitnessDBReader) GetWitnessesForBlock(blockNumber uint64, maxTrieSize uint32) ([]byte, error) { + key := deriveDbKey(blockNumber, maxTrieSize) + + tx, err := db.db.BeginRo(context.Background()) + if err != nil { + panic(fmt.Errorf("error opening tx: %w", err)) + } + + defer tx.Rollback() + + return tx.GetOne(kv.Witnesses, key) +} + +func deriveDbKey(blockNumber uint64, maxTrieSize uint32) []byte { + buffer := make([]byte, 8+4) + + binary.LittleEndian.PutUint64(buffer[:], blockNumber) + binary.LittleEndian.PutUint32(buffer[8:], maxTrieSize) + + return buffer +} + +func BytesToUint64(b []byte) uint64 { + if len(b) < 8 { + return 0 + } + return binary.BigEndian.Uint64(b) +} + +func Uint64ToBytes(i uint64) []byte { + buf := make([]byte, 8) + binary.BigEndian.PutUint64(buf, i) + return buf +} diff --git a/turbo/cli/default_flags.go b/turbo/cli/default_flags.go index 96ea28151af..2645b48d7ee 100644 --- a/turbo/cli/default_flags.go +++ b/turbo/cli/default_flags.go @@ -191,4 +191,7 @@ var DefaultFlags = []cli.Flag{ 
&SyncLoopBlockLimitFlag, &SyncLoopBreakAfterFlag, &SyncLoopPruneLimitFlag, + + &utils.EnableWitnessGenerationFlag, + &utils.MaxWitnessLimitFlag, } diff --git a/turbo/jsonrpc/eth_call.go b/turbo/jsonrpc/eth_call.go index 3469862781a..1e57ae87850 100644 --- a/turbo/jsonrpc/eth_call.go +++ b/turbo/jsonrpc/eth_call.go @@ -425,7 +425,6 @@ func (api *BaseAPI) getWitness(ctx context.Context, db kv.RoDB, blockNrOrHash rp } blockNr, hash, _, err := rpchelper.GetCanonicalBlockNumber(blockNrOrHash, tx, api.filters) // DoCall cannot be executed on non-canonical blocks - if err != nil { return nil, err } @@ -455,11 +454,6 @@ func (api *BaseAPI) getWitness(ctx context.Context, db kv.RoDB, blockNrOrHash rp return nil, fmt.Errorf("transaction index out of bounds") } - prevHeader, err := api._blockReader.HeaderByNumber(ctx, tx, blockNr-1) - if err != nil { - return nil, err - } - latestBlock, err := rpchelper.GetLatestBlockNumber(tx) if err != nil { return nil, err @@ -470,307 +464,74 @@ func (api *BaseAPI) getWitness(ctx context.Context, db kv.RoDB, blockNrOrHash rp return nil, fmt.Errorf("block number is in the future latest=%d requested=%d", latestBlock, blockNr) } - rl := trie.NewRetainList(0) - - regenerate_hash := false - if latestBlock-blockNr > uint64(maxGetProofRewindBlockCount) { - regenerate_hash = true - } - - if blockNr-1 < latestBlock { - batch := membatchwithdb.NewMemoryBatch(tx, api.dirs.Tmp, logger) - defer batch.Rollback() - - unwindState := &stagedsync.UnwindState{UnwindPoint: blockNr - 1} - stageState := &stagedsync.StageState{BlockNumber: latestBlock} - - hashStageCfg := stagedsync.StageHashStateCfg(nil, api.dirs, api.historyV3(batch)) - if err := stagedsync.UnwindHashStateStage(unwindState, stageState, batch, hashStageCfg, ctx, logger); err != nil { - return nil, err - } - - if !regenerate_hash { - interHashStageCfg := stagedsync.StageTrieCfg(nil, false, false, false, api.dirs.Tmp, api._blockReader, nil, api.historyV3(batch), api._agg) - err = stagedsync.UnwindIntermediateHashes("eth_getWitness", rl, unwindState, stageState, batch, interHashStageCfg, ctx.Done(), logger) - if err != nil { - return nil, err - } - } else { - _ = batch.ClearBucket(kv.TrieOfAccounts) - _ = batch.ClearBucket(kv.TrieOfStorage) + // Check if the witness is present in the database. If found, return it. 
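+	// The witness stage only persists witnesses for whole blocks, so the lookup below is done for
+	// full-block requests only; witnesses scoped to a single transaction are always recomputed.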
+ if fullBlock { + wBytes, err := stagedsync.ReadChunks(tx, kv.Witnesses, stagedsync.Uint64ToBytes(blockNr)) + if err == nil && wBytes != nil { + logger.Debug("Returning witness found in db", "blockNr", blockNr) + return wBytes, nil } - tx = batch - } - - chainConfig, err := api.chainConfig(tx) - if err != nil { - return nil, err + logger.Debug("Witness unavailable in db, calculating", "blockNr", blockNr) } - header := block.Header() - - reader, err := rpchelper.CreateHistoryStateReader(tx, blockNr, 0, false, chainConfig.ChainName) + // Compute the witness if it's for a tx or it's not present in db + prevHeader, err := api._blockReader.HeaderByNumber(ctx, tx, blockNr-1) if err != nil { return nil, err } - tds := state.NewTrieDbState(prevHeader.Root, tx, blockNr-1, reader) - tds.SetRetainList(rl) - tds.SetResolveReads(true) - - tds.StartNewBuffer() - trieStateWriter := tds.TrieStateWriter() - - statedb := state.New(tds) - statedb.SetDisableBalanceInc(true) - - getHeader := func(hash libcommon.Hash, number uint64) *types.Header { - h, e := api._blockReader.Header(ctx, tx, hash, number) - if e != nil { - log.Error("getHeader error", "number", number, "hash", hash, "err", e) - } - return h + regenerateHash := false + if latestBlock-blockNr > uint64(maxGetProofRewindBlockCount) { + regenerateHash = true } - getHashFn := core.GetHashFn(block.Header(), getHeader) - - usedGas := new(uint64) - usedBlobGas := new(uint64) - gp := new(core.GasPool).AddGas(block.GasLimit()).AddBlobGas(chainConfig.GetMaxBlobGasPerBlock()) - var receipts types.Receipts + var rl *trie.RetainList + batch := membatchwithdb.NewMemoryBatch(tx, "", logger) + defer batch.Rollback() engine, ok := api.engine().(consensus.Engine) - if !ok { return nil, fmt.Errorf("engine is not consensus.Engine") } - chainReader := stagedsync.NewChainReaderImpl(chainConfig, tx, nil, nil) - - if err := core.InitializeBlockExecution(engine, chainReader, block.Header(), chainConfig, statedb, trieStateWriter, nil); err != nil { - if err != nil { - return nil, err - } - } - - if len(block.Transactions()) == 0 { - statedb.GetBalance(libcommon.HexToAddress("0x1234")) - } - - vmConfig := vm.Config{} - - loadFunc := func(loader *trie.SubTrieLoader, rl *trie.RetainList, dbPrefixes [][]byte, fixedbits []int, accountNibbles [][]byte) (trie.SubTries, error) { - rl.Rewind() - receiver := trie.NewSubTrieAggregator(nil, nil, false) - receiver.SetRetainList(rl) - pr := trie.NewMultiAccountProofRetainer(rl) - pr.AccHexKeys = accountNibbles - receiver.SetProofRetainer(pr) - - loaderRl := rl - if regenerate_hash { - loaderRl = trie.NewRetainList(0) - } - subTrieloader := trie.NewFlatDBTrieLoader[trie.SubTries]("eth_getWitness", loaderRl, nil, nil, false, receiver) - subTries, err := subTrieloader.Result(tx, nil) - - rl.Rewind() - - if err != nil { - return receiver.EmptyResult(), err - } - - err = trie.AttachRequestedCode(tx, loader.CodeRequests()) - - if err != nil { - return receiver.EmptyResult(), err - } - - // Reverse the subTries.Hashes and subTries.roots - for i, j := 0, len(subTries.Hashes)-1; i < j; i, j = i+1, j-1 { - subTries.Hashes[i], subTries.Hashes[j] = subTries.Hashes[j], subTries.Hashes[i] - subTries.Roots()[i], subTries.Roots()[j] = subTries.Roots()[j], subTries.Roots()[i] - } - - return subTries, nil - } - - var txTds *state.TrieDbState - - for i, txn := range block.Transactions() { - statedb.SetTxContext(txn.Hash(), block.Hash(), i) - - // Ensure that the access list is loaded into witness - for _, a := range txn.GetAccessList() { - 
statedb.GetBalance(a.Address) - - for _, k := range a.StorageKeys { - v := uint256.NewInt(0) - statedb.GetState(a.Address, &k, v) - } - } - - receipt, _, err := core.ApplyTransaction(chainConfig, getHashFn, api.engine(), nil, gp, statedb, trieStateWriter, block.Header(), txn, usedGas, usedBlobGas, vmConfig) - if err != nil { - return nil, err - } - - if !fullBlock && i == int(txIndex) { - txTds = tds.WithLastBuffer() - break - } - - if !chainConfig.IsByzantium(block.NumberU64()) || (!fullBlock && i+1 == int(txIndex)) { - tds.StartNewBuffer() - } - - receipts = append(receipts, receipt) - } - - if fullBlock { - if _, _, _, err = engine.FinalizeAndAssemble(chainConfig, block.Header(), statedb, block.Transactions(), block.Uncles(), receipts, block.Withdrawals(), nil, nil, nil, nil); err != nil { - fmt.Printf("Finalize of block %d failed: %v\n", blockNr, err) - return nil, err - } - - statedb.FinalizeTx(chainConfig.Rules(block.NumberU64(), block.Header().Time), trieStateWriter) - } - - triePreroot := tds.LastRoot() - - if fullBlock && !bytes.Equal(prevHeader.Root[:], triePreroot[:]) { - return nil, fmt.Errorf("mismatch in expected state root computed %v vs %v indicates bug in witness implementation", prevHeader.Root, triePreroot) - } - - if err := tds.ResolveStateTrieWithFunc(loadFunc); err != nil { - return nil, err - } - - w, err := tds.ExtractWitness(false, false) - + // Prepare witness config + chainConfig, err := api.chainConfig(tx) if err != nil { - return nil, err + return nil, fmt.Errorf("error loading chain config: %v", err) } - var buf bytes.Buffer - _, err = w.WriteInto(&buf) + cfg := stagedsync.StageWitnessCfg(nil, true, 0, chainConfig, engine, api._blockReader, api.dirs) + batch, rl, err = stagedsync.RewindStagesForWitness(batch, blockNr, &cfg, regenerateHash, ctx, logger) if err != nil { return nil, err } - nw, err := trie.NewWitnessFromReader(bytes.NewReader(buf.Bytes()), false) + // Update the tx to operate on the in-memory batch + tx = batch + store, err := stagedsync.PrepareForWitness(tx, block, prevHeader.Root, rl, &cfg, ctx, logger) if err != nil { return nil, err } - s, err := state.NewStateless(prevHeader.Root, nw, blockNr-1, false, false /* is binary */) + w, txTds, err := stagedsync.GenerateWitness(tx, block, prevHeader, fullBlock, uint64(txIndex), store.Tds, store.TrieStateWriter, store.Statedb, store.GetHashFn, &cfg, regenerateHash, ctx, logger) if err != nil { return nil, err } - ibs := state.New(s) - s.SetBlockNr(blockNr) - - gp = new(core.GasPool).AddGas(block.GasLimit()).AddBlobGas(chainConfig.GetMaxBlobGasPerBlock()) - usedGas = new(uint64) - usedBlobGas = new(uint64) - receipts = types.Receipts{} - - if err := core.InitializeBlockExecution(engine, chainReader, block.Header(), chainConfig, ibs, s, nil); err != nil { - return nil, err - } - - for i, txn := range block.Transactions() { - if !fullBlock && i == int(txIndex) { - s.Finalize() - break - } - - ibs.SetTxContext(txn.Hash(), block.Hash(), i) - receipt, _, err := core.ApplyTransaction(chainConfig, getHashFn, engine, nil, gp, ibs, s, header, txn, usedGas, usedBlobGas, vmConfig) - if err != nil { - return nil, fmt.Errorf("tx %x failed: %v", txn.Hash(), err) - } - receipts = append(receipts, receipt) + if w == nil { + return nil, fmt.Errorf("unable to generate witness for block %d", blockNr) } - if !fullBlock { - err = txTds.ResolveStateTrieWithFunc( - func(loader *trie.SubTrieLoader, rl *trie.RetainList, dbPrefixes [][]byte, fixedbits []int, accountNibbles [][]byte) (trie.SubTries, error) { - return 
trie.SubTries{}, nil - }, - ) - - if err != nil { - return nil, err - } - - rl = txTds.GetRetainList() - - w, err = s.GetTrie().ExtractWitness(false, rl) - - if err != nil { - return nil, err - } - - var buf bytes.Buffer - _, err = w.WriteInto(&buf) - if err != nil { - return nil, err - } - - return buf.Bytes(), nil - } - - receiptSha := types.DeriveSha(receipts) - if !vmConfig.StatelessExec && chainConfig.IsByzantium(header.Number.Uint64()) && !vmConfig.NoReceipts && receiptSha != block.ReceiptHash() { - return nil, fmt.Errorf("mismatched receipt headers for block %d (%s != %s)", block.NumberU64(), receiptSha.Hex(), block.ReceiptHash().Hex()) - } - - if !vmConfig.StatelessExec && *usedGas != header.GasUsed { - return nil, fmt.Errorf("gas used by execution: %d, in header: %d", *usedGas, header.GasUsed) - } - - if header.BlobGasUsed != nil && *usedBlobGas != *header.BlobGasUsed { - return nil, fmt.Errorf("blob gas used by execution: %d, in header: %d", *usedBlobGas, *header.BlobGasUsed) - } - - var bloom types.Bloom - if !vmConfig.NoReceipts { - bloom = types.CreateBloom(receipts) - if !vmConfig.StatelessExec && bloom != header.Bloom { - return nil, fmt.Errorf("bloom computed by execution: %x, in header: %x", bloom, header.Bloom) - } - } - - if !vmConfig.ReadOnly { - _, _, _, err := engine.FinalizeAndAssemble(chainConfig, block.Header(), ibs, block.Transactions(), block.Uncles(), receipts, block.Withdrawals(), nil, nil, nil, nil) - if err != nil { - return nil, err - } - - rules := chainConfig.Rules(block.NumberU64(), header.Time) - - ibs.FinalizeTx(rules, s) - - if err := ibs.CommitBlock(rules, s); err != nil { - return nil, fmt.Errorf("committing block %d failed: %v", block.NumberU64(), err) - } - } - - if err = s.CheckRoot(header.Root); err != nil { - return nil, err - } - - roots, err := tds.UpdateStateTrie() - + var witness bytes.Buffer + _, err = w.WriteInto(&witness) if err != nil { return nil, err } - if roots[len(roots)-1] != block.Root() { - return nil, fmt.Errorf("mismatch in expected state root computed %v vs %v indicates bug in witness implementation", roots[len(roots)-1], block.Root()) + buf, err := stagedsync.VerifyWitness(tx, block, prevHeader, fullBlock, uint64(txIndex), store.ChainReader, store.Tds, txTds, store.GetHashFn, &cfg, &witness, logger) + if err != nil { + return nil, fmt.Errorf("error verifying witness for block %d: %v", blockNr, err) } return buf.Bytes(), nil diff --git a/turbo/stages/mock/mock_sentry.go b/turbo/stages/mock/mock_sentry.go index 921855eadc1..8cd2dfa12b3 100644 --- a/turbo/stages/mock/mock_sentry.go +++ b/turbo/stages/mock/mock_sentry.go @@ -459,6 +459,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK mock.agg, nil, ), + stagedsync.StageWitnessCfg(mock.DB, cfg.EnableWitnessGeneration, cfg.MaxWitnessLimit, mock.ChainConfig, mock.Engine, mock.BlockReader, mock.Dirs), stagedsync.StageHashStateCfg(mock.DB, mock.Dirs, cfg.HistoryV3), stagedsync.StageTrieCfg(mock.DB, checkStateRoot, true, false, dirs.Tmp, mock.BlockReader, mock.sentriesClient.Hd, cfg.HistoryV3, mock.agg), stagedsync.StageHistoryCfg(mock.DB, prune, dirs.Tmp), diff --git a/turbo/stages/stageloop.go b/turbo/stages/stageloop.go index 721e3f942ca..f956c688a9e 100644 --- a/turbo/stages/stageloop.go +++ b/turbo/stages/stageloop.go @@ -542,6 +542,7 @@ func NewDefaultStages(ctx context.Context, agg, silkwormForExecutionStage(silkworm, cfg), ), + stagedsync.StageWitnessCfg(db, cfg.EnableWitnessGeneration, cfg.MaxWitnessLimit, controlServer.ChainConfig, 
controlServer.Engine, blockReader, dirs), stagedsync.StageHashStateCfg(db, dirs, cfg.HistoryV3), stagedsync.StageTrieCfg(db, true, true, false, dirs.Tmp, blockReader, controlServer.Hd, cfg.HistoryV3, agg), stagedsync.StageHistoryCfg(db, cfg.Prune, dirs.Tmp),
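
For reference, here is a minimal sketch (not part of this diff) of how a witness persisted by the new stage could be read back and decoded using the helpers introduced above. The readStoredWitness name and the surrounding wiring are hypothetical; the sketch assumes an already-open kv.RoDB handle and a block for which the stage has stored a witness.

package example

import (
	"bytes"
	"context"
	"fmt"

	"github.com/ledgerwatch/erigon-lib/kv"
	"github.com/ledgerwatch/erigon/eth/stagedsync"
	"github.com/ledgerwatch/erigon/turbo/trie"
)

// readStoredWitness reassembles the chunked value written by WriteChunks
// (keys are block_num_u64 big-endian + "_chunk_" + chunk index) and decodes
// it into a trie.Witness.
func readStoredWitness(ctx context.Context, db kv.RoDB, blockNum uint64) (*trie.Witness, error) {
	tx, err := db.BeginRo(ctx)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	raw, err := stagedsync.ReadChunks(tx, kv.Witnesses, stagedsync.Uint64ToBytes(blockNum))
	if err != nil {
		return nil, err
	}
	if len(raw) == 0 {
		return nil, fmt.Errorf("no witness stored for block %d", blockNum)
	}

	// The stored bytes are exactly what trie.Witness.WriteInto produced in the stage.
	return trie.NewWitnessFromReader(bytes.NewReader(raw), false)
}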