diff --git a/Cargo.lock b/Cargo.lock index 1e84cabf85de..82054e4559c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7203,6 +7203,7 @@ dependencies = [ name = "reth-evm" version = "1.0.0" dependencies = [ + "alloy-rpc-types-eth", "auto_impl", "futures-util", "parking_lot 0.12.3", @@ -9793,6 +9794,7 @@ version = "1.0.0" dependencies = [ "alloy-eips", "alloy-sol-types", + "flate2", "reth-chainspec", "reth-consensus", "reth-evm", @@ -9800,6 +9802,7 @@ dependencies = [ "reth-primitives", "reth-prune-types", "reth-revm", + "reth-rpc-types-compat", "reth-testing-utils", "revm-primitives", "secp256k1", @@ -9880,7 +9883,6 @@ name = "taiko-reth-proposer-consensus" version = "1.0.0" dependencies = [ "alloy-rlp", - "flate2", "futures-util", "reth-chainspec", "reth-consensus", @@ -9890,8 +9892,6 @@ dependencies = [ "reth-primitives", "reth-provider", "reth-revm", - "reth-rpc-types", - "reth-rpc-types-compat", "reth-transaction-pool", "tokio", "tracing", diff --git a/crates/ethereum/evm/src/execute.rs b/crates/ethereum/evm/src/execute.rs index 00256e5bfb77..83acf649ce20 100644 --- a/crates/ethereum/evm/src/execute.rs +++ b/crates/ethereum/evm/src/execute.rs @@ -379,7 +379,7 @@ where // NOTE: we need to merge keep the reverts for the bundle retention self.state.merge_transitions(BundleRetention::Reverts); - Ok(BlockExecutionOutput { state: self.state.take_bundle(), receipts, requests, gas_used }) + Ok(BlockExecutionOutput { state: self.state.take_bundle(), receipts, requests, gas_used, target_list: vec![] }) } } diff --git a/crates/evm/Cargo.toml b/crates/evm/Cargo.toml index 7fa52726648c..a425a569ee20 100644 --- a/crates/evm/Cargo.toml +++ b/crates/evm/Cargo.toml @@ -19,6 +19,7 @@ revm-primitives.workspace = true reth-prune-types.workspace = true reth-storage-errors.workspace = true reth-execution-types.workspace = true +alloy-rpc-types-eth.workspace = true revm.workspace = true diff --git a/crates/evm/src/execute.rs b/crates/evm/src/execute.rs index 5ede6c78ff8e..568978a45f37 100644 --- a/crates/evm/src/execute.rs +++ b/crates/evm/src/execute.rs @@ -1,5 +1,6 @@ //! Traits for execution. +use alloy_rpc_types_eth::transaction::Transaction; use reth_execution_types::ExecutionOutcome; use reth_primitives::{BlockNumber, BlockWithSenders, Receipt, Request, U256}; use reth_prune_types::PruneModes; @@ -90,6 +91,17 @@ pub trait BatchExecutor { fn size_hint(&self) -> Option; } +/// Result of the trigger +#[derive(Debug, Clone)] +pub struct TaskResult { + /// Transactions + pub txs: Vec, + /// Estimated gas used + pub estimated_gas_used: u64, + /// Bytes length + pub bytes_length: u64, +} + /// The output of an ethereum block. /// /// Contains the state changes, transaction receipts, and total gas used in the block. @@ -105,6 +117,8 @@ pub struct BlockExecutionOutput { pub requests: Vec, /// The total gas used by the block. pub gas_used: u64, + /// The target list. + pub target_list: Vec, } /// A helper type for ethereum block inputs that consists of a block and the total difficulty. @@ -118,12 +132,26 @@ pub struct BlockExecutionInput<'a, Block> { pub enable_anchor: bool, /// Enable skip invalid transaction. pub enable_skip: bool, + /// Enable build transaction lists. + pub enable_build: bool, + /// Max compressed bytes. + pub max_bytes_per_tx_list: u64, + /// Max length of transactions list. + pub max_transactions_lists: u64, } impl<'a, Block> BlockExecutionInput<'a, Block> { /// Creates a new input. pub fn new(block: &'a mut Block, total_difficulty: U256) -> Self { - Self { block, total_difficulty, enable_anchor: true, enable_skip: true } + Self { + block, + total_difficulty, + enable_anchor: true, + enable_skip: true, + enable_build: false, + max_bytes_per_tx_list: 0, + max_transactions_lists: 0, + } } } @@ -135,7 +163,15 @@ impl<'a, Block> From<(&'a mut Block, U256)> for BlockExecutionInput<'a, Block> { impl<'a, Block> From<(&'a mut Block, U256, bool)> for BlockExecutionInput<'a, Block> { fn from((block, total_difficulty, enable_anchor): (&'a mut Block, U256, bool)) -> Self { - Self { block, total_difficulty, enable_anchor, enable_skip: true } + Self { + block, + total_difficulty, + enable_anchor, + enable_skip: true, + enable_build: false, + max_bytes_per_tx_list: 0, + max_transactions_lists: 0, + } } } @@ -143,7 +179,15 @@ impl<'a, Block> From<(&'a mut Block, U256, bool, bool)> for BlockExecutionInput< fn from( (block, total_difficulty, enable_anchor, enable_skip): (&'a mut Block, U256, bool, bool), ) -> Self { - Self { block, total_difficulty, enable_anchor, enable_skip } + Self { + block, + total_difficulty, + enable_anchor, + enable_skip, + enable_build: false, + max_bytes_per_tx_list: 0, + max_transactions_lists: 0, + } } } diff --git a/crates/taiko/consensus/proposer/Cargo.toml b/crates/taiko/consensus/proposer/Cargo.toml index 18602236d600..628876beaf51 100644 --- a/crates/taiko/consensus/proposer/Cargo.toml +++ b/crates/taiko/consensus/proposer/Cargo.toml @@ -21,8 +21,6 @@ reth-revm.workspace = true reth-transaction-pool.workspace = true reth-evm.workspace = true reth-consensus.workspace = true -reth-rpc-types.workspace = true -reth-rpc-types-compat.workspace = true reth-errors.workspace = true @@ -32,5 +30,4 @@ tokio = { workspace = true, features = ["sync", "time"] } tracing.workspace = true # misc -flate2.workspace = true alloy-rlp.workspace = true diff --git a/crates/taiko/consensus/proposer/src/lib.rs b/crates/taiko/consensus/proposer/src/lib.rs index 7476607876f8..990497145a1a 100644 --- a/crates/taiko/consensus/proposer/src/lib.rs +++ b/crates/taiko/consensus/proposer/src/lib.rs @@ -12,26 +12,19 @@ html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" )] -#![cfg_attr(not(test), warn(unused_crate_dependencies))] -#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -use flate2::write::ZlibEncoder; -use flate2::Compression; use reth_chainspec::ChainSpec; use reth_consensus::{Consensus, ConsensusError, PostExecutionInput}; use reth_errors::RethError; use reth_execution_errors::{BlockExecutionError, BlockValidationError}; use reth_primitives::{ - eip4844::calculate_excess_blob_gas, proofs, transaction::TransactionSignedList, Address, Block, - BlockWithSenders, Header, Requests, SealedBlock, SealedHeader, TransactionSigned, Withdrawals, - U256, + eip4844::calculate_excess_blob_gas, proofs, Address, Block, BlockWithSenders, Header, Requests, + SealedBlock, SealedHeader, TransactionSigned, Withdrawals, U256, }; use reth_provider::{BlockReaderIdExt, StateProviderFactory}; use reth_revm::database::StateProviderDatabase; -use reth_rpc_types::Transaction; use reth_transaction_pool::TransactionPool; use std::{ - io::{self, Write}, sync::Arc, time::{SystemTime, UNIX_EPOCH}, }; @@ -42,7 +35,9 @@ mod client; mod task; pub use crate::client::ProposerClient; -use reth_evm::execute::{BlockExecutionOutput, BlockExecutorProvider, Executor}; +use reth_evm::execute::{ + BlockExecutionInput, BlockExecutionOutput, BlockExecutorProvider, Executor, TaskResult, +}; pub use task::ProposerTask; /// A consensus implementation intended for local development and testing purposes. @@ -157,17 +152,6 @@ pub struct TaskArgs { tx: oneshot::Sender, RethError>>, } -/// Result of the trigger -#[derive(Debug)] -pub struct TaskResult { - /// Transactions - pub txs: Vec, - /// Estimated gas used - pub estimated_gas_used: u64, - /// Bytes length - pub bytes_length: u64, -} - #[derive(Debug, Clone, Default)] struct Storage; @@ -303,70 +287,18 @@ impl Storage { ); // execute the block - let BlockExecutionOutput { receipts, .. } = - executor.executor(&mut db).execute((&mut block, U256::ZERO, false).into())?; - let Block { body, .. } = block.block; - - debug!(target: "taiko::proposer", transactions=?body, "after executing transactions"); - - let mut tx_lists = vec![]; - let mut chunk_start = 0; - let mut last_compressed_buf = None; - let mut gas_used_start = 0; - for idx in 0..body.len() { - if let Some((txs_range, estimated_gas_used, compressed_buf)) = { - let compressed_buf = encode_and_compress_tx_list(&body[chunk_start..=idx]) - .map_err(BlockExecutionError::other)?; - - if compressed_buf.len() > max_bytes_per_tx_list as usize { - // the first transaction in chunk is too large, so we need to skip it - if idx == chunk_start { - gas_used_start = receipts[idx].cumulative_gas_used; - chunk_start += 1; - // the first transaction in chunk is too large, so we need to skip it - None - } else { - // current chunk reaches the max_transactions_lists or max_bytes_per_tx_list - // and use previous transaction's data - let estimated_gas_used = - receipts[idx - 1].cumulative_gas_used - gas_used_start; - gas_used_start = receipts[idx - 1].cumulative_gas_used; - let range = chunk_start..idx; - chunk_start = idx; - Some((range, estimated_gas_used, last_compressed_buf.clone())) - } - } - // reach the limitation of max_transactions_lists or max_bytes_per_tx_list - else if idx - chunk_start + 1 == max_transactions_lists as usize { - let estimated_gas_used = receipts[idx].cumulative_gas_used - gas_used_start; - gas_used_start = receipts[idx].cumulative_gas_used; - let range = chunk_start..idx + 1; - chunk_start = idx + 1; - Some((range, estimated_gas_used, Some(compressed_buf))) - } else { - last_compressed_buf = Some(compressed_buf); - None - } - } { - tx_lists.push(TaskResult { - txs: body[txs_range] - .iter() - .cloned() - .map(|tx| reth_rpc_types_compat::transaction::from_signed(tx).unwrap()) - .collect(), - estimated_gas_used, - bytes_length: compressed_buf.map_or(0, |b| b.len() as u64), - }); - } - } + let block_input = BlockExecutionInput { + block: &mut block, + total_difficulty: U256::ZERO, + enable_anchor: false, + enable_skip: false, + enable_build: true, + max_bytes_per_tx_list, + max_transactions_lists, + }; + let BlockExecutionOutput { target_list, .. } = + executor.executor(&mut db).execute(block_input.into())?; - Ok(tx_lists) + Ok(target_list) } } - -fn encode_and_compress_tx_list(txs: &[TransactionSigned]) -> io::Result> { - let encoded_buf = alloy_rlp::encode(TransactionSignedList(txs)); - let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default()); - encoder.write_all(&encoded_buf)?; - encoder.finish() -} diff --git a/crates/taiko/consensus/proposer/src/task.rs b/crates/taiko/consensus/proposer/src/task.rs index 759f1046a189..e262b07e94f5 100644 --- a/crates/taiko/consensus/proposer/src/task.rs +++ b/crates/taiko/consensus/proposer/src/task.rs @@ -96,6 +96,7 @@ where }); local_txs.extend(remote_txs); debug!(target: "taiko::proposer", txs = ?local_txs.len(), "Proposer filter best transactions"); + // miner returned a set of transaction that we feed to the producer this.queued.push_back((trigger_args, local_txs)); }; @@ -111,7 +112,6 @@ where let client = this.provider.clone(); let chain_spec = Arc::clone(&this.chain_spec); - let pool = this.pool.clone(); let executor = this.block_executor.clone(); // Create the mining future that creates a block, notifies the engine that drives @@ -144,10 +144,6 @@ where max_transactions_lists, base_fee, ); - if res.is_ok() { - // clear all transactions from pool - pool.remove_transactions(txs.iter().map(|tx| tx.hash()).collect()); - } let _ = tx.send(res); })); } diff --git a/crates/taiko/evm/Cargo.toml b/crates/taiko/evm/Cargo.toml index 92fbc1b74409..f48b1343c713 100644 --- a/crates/taiko/evm/Cargo.toml +++ b/crates/taiko/evm/Cargo.toml @@ -19,6 +19,7 @@ reth-revm = { workspace = true, features = ["taiko"] } reth-prune-types.workspace = true reth-execution-types.workspace = true reth-consensus.workspace = true +reth-rpc-types-compat.workspace = true # Taiko taiko-reth-beacon-consensus.workspace = true @@ -32,6 +33,7 @@ alloy-sol-types.workspace = true # Misc tracing.workspace = true +flate2.workspace = true [dev-dependencies] reth-testing-utils.workspace = true diff --git a/crates/taiko/evm/src/execute.rs b/crates/taiko/evm/src/execute.rs index 27df7e39f1b6..ef13df84e916 100644 --- a/crates/taiko/evm/src/execute.rs +++ b/crates/taiko/evm/src/execute.rs @@ -4,6 +4,8 @@ use crate::{ dao_fork::{DAO_HARDFORK_BENEFICIARY, DAO_HARDKFORK_ACCOUNTS}, TaikoEvmConfig, }; +use flate2::write::ZlibEncoder; +use flate2::Compression; use reth_chainspec::{ChainSpec, TAIKO_HEKLA, TAIKO_MAINNET}; use reth_consensus::ConsensusError; use reth_evm::{ @@ -15,7 +17,8 @@ use reth_evm::{ }; use reth_execution_types::ExecutionOutcome; use reth_primitives::{ - BlockNumber, BlockWithSenders, Hardfork, Header, Receipt, Request, Withdrawals, U256, + BlockNumber, BlockWithSenders, Hardfork, Header, Receipt, Request, TransactionSigned, + Withdrawals, U256, }; use reth_prune_types::PruneModes; use reth_revm::{ @@ -37,8 +40,14 @@ use taiko_reth_beacon_consensus::{ #[cfg(not(feature = "std"))] use alloc::{sync::Arc, vec, vec::Vec}; +use std::io; +use std::io::Write; use tracing::debug; +use reth_evm::execute::TaskResult; +use reth_primitives::transaction::TransactionSignedList; +use reth_revm::interpreter::Host; +use revm_primitives::alloy_primitives::private::alloy_rlp; #[cfg(feature = "std")] use std::sync::Arc; @@ -341,6 +350,13 @@ impl TaikoBlockExecutor { } } +fn encode_and_compress_tx_list(txs: &[TransactionSigned]) -> io::Result> { + let encoded_buf = alloy_rlp::encode(TransactionSignedList(txs)); + let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default()); + encoder.write_all(&encoded_buf)?; + encoder.finish() +} + impl TaikoBlockExecutor where EvmConfig: ConfigureEvm, @@ -394,6 +410,94 @@ where Ok(output) } + fn build_transaction_list( + &mut self, + block: &BlockWithSenders, + max_bytes_per_tx_list: u64, + max_transactions_lists: u64, + ) -> Result, BlockExecutionError> { + let env = self.evm_env_for_block(&block.header, U256::ZERO); + let mut evm = self.executor.evm_config.evm_with_env(&mut self.state, env); + // 2. configure the evm and execute + // apply pre execution changes + apply_beacon_root_contract_call( + &self.executor.chain_spec, + block.timestamp, + block.number, + block.parent_beacon_block_root, + &mut evm, + )?; + + apply_blockhashes_update( + evm.db_mut(), + &self.executor.chain_spec, + block.timestamp, + block.number, + block.parent_hash, + )?; + + let mut target_list: Vec = vec![]; + // get previous env + let previous_env = Box::new(evm.context.env().clone()); + + for _ in 0..max_transactions_lists { + // evm.context.evm.db.commit(state); + // re-set the previous env + evm.context.evm.env = previous_env.clone(); + + let mut cumulative_gas_used = 0; + let mut tx_list: Vec = vec![]; + let mut buf_len: u64 = 0; + + for i in 0..block.body.len() { + let transaction = block.body.get(i).unwrap(); + let sender = block.senders.get(i).unwrap(); + let block_available_gas = block.header.gas_limit - cumulative_gas_used; + if transaction.gas_limit() > block_available_gas { + break; + } + + EvmConfig::fill_tx_env(evm.tx_mut(), transaction, *sender); + + // Execute transaction. + let ResultAndState { result, state } = match evm.transact() { + Ok(res) => res, + Err(_) => continue, + }; + tx_list.push(transaction.clone()); + + let compressed_buf = + encode_and_compress_tx_list(&tx_list).map_err(BlockExecutionError::other)?; + if compressed_buf.len() > max_bytes_per_tx_list as usize { + tx_list.pop(); + break; + } + + buf_len = compressed_buf.len() as u64; + // append gas used + cumulative_gas_used += result.gas_used(); + + // collect executed transaction state + evm.db_mut().commit(state); + } + + if tx_list.is_empty() { + break; + } + target_list.push(TaskResult { + txs: tx_list[..] + .iter() + .cloned() + .map(|tx| reth_rpc_types_compat::transaction::from_signed(tx).unwrap()) + .collect(), + estimated_gas_used: cumulative_gas_used, + bytes_length: buf_len, + }); + } + + Ok(target_list) + } + /// Apply settings before a new block is executed. pub(crate) fn on_new_block(&mut self, header: &Header) { // Set state clear flag if the block is after the Spurious Dragon hardfork. @@ -458,14 +562,44 @@ where /// /// State changes are committed to the database. fn execute(mut self, input: Self::Input<'_>) -> Result { - let BlockExecutionInput { block, total_difficulty, enable_anchor, enable_skip } = input; - let TaikoExecuteOutput { receipts, requests, gas_used } = - self.execute_without_verification(block, total_difficulty, enable_anchor, enable_skip)?; - - // NOTE: we need to merge keep the reverts for the bundle retention - self.state.merge_transitions(BundleRetention::Reverts); - - Ok(BlockExecutionOutput { state: self.state.take_bundle(), receipts, requests, gas_used }) + let BlockExecutionInput { + block, + total_difficulty, + enable_anchor, + enable_skip, + enable_build, + max_bytes_per_tx_list, + max_transactions_lists, + } = input; + if enable_build { + let target_list = + self.build_transaction_list(block, max_bytes_per_tx_list, max_transactions_lists)?; + Ok(BlockExecutionOutput { + state: Default::default(), + receipts: vec![], + requests: vec![], + gas_used: 0, + target_list, + }) + } else { + let TaikoExecuteOutput { receipts, requests, gas_used } = self + .execute_without_verification( + block, + total_difficulty, + enable_anchor, + enable_skip, + )?; + + // NOTE: we need to merge keep the reverts for the bundle retention + self.state.merge_transitions(BundleRetention::Reverts); + Ok(BlockExecutionOutput { + state: self.state.take_bundle(), + receipts, + requests, + gas_used, + target_list: vec![], + }) + } } } @@ -501,7 +635,7 @@ where type Error = BlockExecutionError; fn execute_and_verify_one(&mut self, input: Self::Input<'_>) -> Result<(), Self::Error> { - let BlockExecutionInput { block, total_difficulty, enable_anchor, enable_skip } = input; + let BlockExecutionInput { block, total_difficulty, enable_anchor, enable_skip, .. } = input; let TaikoExecuteOutput { receipts, requests, gas_used: _ } = self .executor .execute_without_verification(block, total_difficulty, enable_anchor, enable_skip)?; diff --git a/crates/taiko/payload/builder/src/builder.rs b/crates/taiko/payload/builder/src/builder.rs index 3eb04eb8adea..6846c3ebdc7d 100644 --- a/crates/taiko/payload/builder/src/builder.rs +++ b/crates/taiko/payload/builder/src/builder.rs @@ -167,7 +167,7 @@ where .ok_or(BlockExecutionError::Validation(BlockValidationError::SenderRecoveryError))?; // execute the block - let BlockExecutionOutput { state, receipts, requests, gas_used } = + let BlockExecutionOutput { state, receipts, requests, gas_used, .. } = executor.executor(&mut db).execute((&mut block, U256::ZERO).into())?; let execution_outcome = diff --git a/crates/taiko/payload/builder/src/lib.rs b/crates/taiko/payload/builder/src/lib.rs index 777d1d76f884..f103ff8cdbfe 100644 --- a/crates/taiko/payload/builder/src/lib.rs +++ b/crates/taiko/payload/builder/src/lib.rs @@ -5,9 +5,6 @@ html_favicon_url = "https://avatars0.githubusercontent.com/u/97369466?s=256", issue_tracker_base_url = "https://github.com/paradigmxyz/reth/issues/" )] -#![cfg_attr(not(test), warn(unused_crate_dependencies))] -#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))] -#![allow(clippy::useless_let_if_seq)] pub mod builder; pub use builder::*; pub mod error;