diff --git a/Cargo.lock b/Cargo.lock index 7472f3ac5785..ba7203947047 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7431,8 +7431,10 @@ dependencies = [ "crossbeam-channel", "derive_more 2.0.1", "futures", + "itertools 0.14.0", "metrics", "mini-moka", + "pretty_assertions", "proptest", "rand 0.8.5", "rayon", diff --git a/Cargo.toml b/Cargo.toml index b0ab50f6626d..db2e7a860f11 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -602,13 +602,14 @@ arbitrary = "1.3" assert_matches = "1.5.0" criterion = { package = "codspeed-criterion-compat", version = "2.7" } pprof = "0.14" +pretty_assertions = "1.4" proptest = "1.4" proptest-derive = "0.5" +rstest = "0.24.0" serial_test = { default-features = false, version = "3" } similar-asserts = { version = "1.5.0", features = ["serde"] } tempfile = "3.8" test-fuzz = "7" -rstest = "0.24.0" # allocators tikv-jemalloc-ctl = "0.6" diff --git a/crates/engine/invalid-block-hooks/Cargo.toml b/crates/engine/invalid-block-hooks/Cargo.toml index a7b0153d0d4b..045cccdba354 100644 --- a/crates/engine/invalid-block-hooks/Cargo.toml +++ b/crates/engine/invalid-block-hooks/Cargo.toml @@ -35,6 +35,6 @@ futures.workspace = true # misc eyre.workspace = true jsonrpsee.workspace = true -pretty_assertions = "1.4" +pretty_assertions.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/crates/engine/tree/Cargo.toml b/crates/engine/tree/Cargo.toml index 7f2e84408bda..9fb7516112b1 100644 --- a/crates/engine/tree/Cargo.toml +++ b/crates/engine/tree/Cargo.toml @@ -54,10 +54,11 @@ metrics.workspace = true reth-metrics = { workspace = true, features = ["common"] } # misc -schnellru.workspace = true +derive_more.workspace = true +itertools.workspace = true rayon.workspace = true +schnellru.workspace = true tracing.workspace = true -derive_more.workspace = true # optional deps for test-utils reth-prune-types = { workspace = true, optional = true } @@ -91,6 +92,7 @@ revm-state.workspace = true assert_matches.workspace = true 
criterion.workspace = true crossbeam-channel = "0.5.13" +pretty_assertions.workspace = true proptest.workspace = true rand.workspace = true diff --git a/crates/engine/tree/src/tree/root.rs b/crates/engine/tree/src/tree/root.rs index fceaab9c2f2b..d58eff233952 100644 --- a/crates/engine/tree/src/tree/root.rs +++ b/crates/engine/tree/src/tree/root.rs @@ -1,7 +1,12 @@ //! State root task related functionality. -use alloy_primitives::map::HashSet; +use alloy_primitives::{ + keccak256, + map::{B256Map, HashSet}, + B256, +}; use derive_more::derive::Deref; +use itertools::Itertools; use metrics::Histogram; use rayon::iter::{ParallelBridge, ParallelIterator}; use reth_errors::{ProviderError, ProviderResult}; @@ -28,9 +33,8 @@ use reth_trie_sparse::{ errors::{SparseStateTrieResult, SparseTrieErrorKind}, SparseStateTrie, }; -use revm_primitives::{keccak256, B256}; use std::{ - collections::{BTreeMap, VecDeque}, + collections::{hash_map::Entry, BTreeMap, VecDeque}, sync::{ mpsc::{self, channel, Receiver, Sender}, Arc, @@ -42,6 +46,11 @@ use tracing::{debug, error, trace, trace_span}; /// The level below which the sparse trie hashes are calculated in [`update_sparse_trie`]. const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2; +/// Maximum number of account targets in a multiproof. +const MULTIPROOF_ACCOUNTS_CHUNK_SIZE: usize = 5; +/// Maximum number of storage slots targets per account in a multiproof. +const MULTIPROOF_STORAGES_CHUNK_SIZE: usize = 5; + /// Determines the size of the rayon thread pool to be used in [`StateRootTask`]. 
/// /// The value is determined as `max(NUM_THREADS - 2, 3)`: @@ -686,14 +695,22 @@ where let proof_targets = self.get_prefetch_proof_targets(targets); extend_multi_proof_targets_ref(&mut self.fetched_proof_targets, &proof_targets); - self.multiproof_manager.spawn_or_queue(MultiproofInput { - config: self.config.clone(), - source: None, - hashed_state_update: Default::default(), + for chunk in ChunkedProofTargets::new( proof_targets, - proof_sequence_number: self.proof_sequencer.next_sequence(), - state_root_message_sender: self.tx.clone(), - }); + MULTIPROOF_ACCOUNTS_CHUNK_SIZE, + MULTIPROOF_STORAGES_CHUNK_SIZE, + ) + .flatten() + { + self.multiproof_manager.spawn_or_queue(MultiproofInput { + config: self.config.clone(), + source: None, + hashed_state_update: HashedPostState::default(), + proof_targets: chunk, + proof_sequence_number: self.proof_sequencer.next_sequence(), + state_root_message_sender: self.tx.clone(), + }); + } } /// Calls `get_proof_targets` with existing proof targets for prefetching. @@ -746,27 +763,83 @@ where targets } - /// Handles state updates. + /// Handles state update. /// - /// Returns proof targets derived from the state update. - fn on_state_update( - &mut self, - source: StateChangeSource, - update: EvmState, - proof_sequence_number: u64, - ) { - let hashed_state_update = evm_state_to_hashed_post_state(update); - let proof_targets = get_proof_targets(&hashed_state_update, &self.fetched_proof_targets); + /// Chunks into multiple state updates, so that each has at most + /// `MULTIPROOF_ACCOUNTS_CHUNK_SIZE` accounts and `MULTIPROOF_STORAGES_CHUNK_SIZE` storage + /// slots per account. + /// + /// After chunking, [`MultiproofManager::spawn_or_queue`] is called for each state update. + /// + /// Returns number of new updates generated by one state update. 
+ fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 { + let mut state_update = evm_state_to_hashed_post_state(update); + let proof_targets = get_proof_targets(&state_update, &self.fetched_proof_targets); extend_multi_proof_targets_ref(&mut self.fetched_proof_targets, &proof_targets); - self.multiproof_manager.spawn_or_queue(MultiproofInput { - config: self.config.clone(), - source: Some(source), - hashed_state_update, + debug!(target: "engine::root", ?state_update, "State update before"); + debug!(target: "engine::root", ?proof_targets, "Proof targets"); + + let mut total_updates = 0; + + for chunk in ChunkedProofTargets::new( proof_targets, - proof_sequence_number, - state_root_message_sender: self.tx.clone(), - }); + MULTIPROOF_ACCOUNTS_CHUNK_SIZE, + MULTIPROOF_STORAGES_CHUNK_SIZE, + ) + .flatten() + { + debug!(target: "engine::root", ?chunk, "Chunk"); + + total_updates += 1; + + let mut accounts = B256Map::with_capacity_and_hasher(chunk.len(), Default::default()); + let mut storages = B256Map::with_capacity_and_hasher(chunk.len(), Default::default()); + + for (&address, storage_slots) in &chunk { + if let Some(account) = state_update.accounts.remove(&address) { + accounts.insert(address, account); + } + + if !storage_slots.is_empty() { + let state_storage = state_update.storages.entry(address); + let mut hashed_storage = HashedStorage::default(); + match state_storage { + Entry::Occupied(mut entry) => { + for storage_slot in storage_slots { + let value = entry + .get_mut() + .storage + .remove(storage_slot) + .expect("storage slot should be present"); + hashed_storage.storage.insert(*storage_slot, value); + } + + if entry.get_mut().storage.is_empty() { + entry.remove(); + } + } + Entry::Vacant(_) => unreachable!(), + } + storages.insert(address, hashed_storage); + } + } + + let hashed_state_update = HashedPostState { accounts, storages }; + debug!(target: "engine::tree", ?hashed_state_update, ?chunk, "Spawning multiproof"); + 
self.multiproof_manager.spawn_or_queue(MultiproofInput { + config: self.config.clone(), + source: Some(source), + hashed_state_update, + proof_targets: chunk, + proof_sequence_number: self.proof_sequencer.next_sequence(), + state_root_message_sender: self.tx.clone(), + }); + } + + debug!(target: "engine::tree", ?state_update, "State update after"); + + total_updates as u64 } /// Handler for new proof calculated, aggregates all the existing sequential proofs. @@ -867,16 +940,16 @@ where } last_update_time = Some(Instant::now()); - updates_received += 1; + let update_size = update.len(); + let new_updates = self.on_state_update(source, update); + updates_received += new_updates; debug!( target: "engine::root", - ?source, - len = update.len(), + update_size, + new_updates, total_updates = updates_received, "Received new state update" ); - let next_sequence = self.proof_sequencer.next_sequence(); - self.on_state_update(source, update, next_sequence); } StateRootMessage::FinishedStateUpdates => { trace!(target: "engine::root", "processing StateRootMessage::FinishedStateUpdates"); @@ -1169,6 +1242,64 @@ fn get_proof_targets( targets } +/// Iterator over proof targets chunks. +/// +/// Each chunk will have at most [`MULTIPROOF_ACCOUNTS_CHUNK_SIZE`] accounts and +/// [`MULTIPROOF_STORAGES_CHUNK_SIZE`] storage slots per account. +/// +/// This iterator will yield items of type [`Vec<MultiProofTargets>`], with each mapping having a +/// maximum length of [`MULTIPROOF_ACCOUNTS_CHUNK_SIZE`], and each mapping value having a maximum +/// length of [`MULTIPROOF_STORAGES_CHUNK_SIZE`].
+struct ChunkedProofTargets { + proof_targets: MultiProofTargets, + accounts_chunk_size: usize, + storages_chunk_size: usize, +} + +impl ChunkedProofTargets { + fn new( + proof_targets: MultiProofTargets, + accounts_chunk_size: usize, + storages_chunk_size: usize, + ) -> Self { + Self { proof_targets, accounts_chunk_size, storages_chunk_size } + } +} + +impl Iterator for ChunkedProofTargets { + type Item = Vec<MultiProofTargets>; + + fn next(&mut self) -> Option<Self::Item> { + if self.proof_targets.is_empty() { + return None; + } + + // Every address will be added to at least the first chunk + let mut chunks = vec![MultiProofTargets::default(); 1]; + + let keys: Vec<B256> = + self.proof_targets.keys().take(self.accounts_chunk_size).copied().collect(); + for address in keys { + let storage_slots = self.proof_targets.remove(&address).expect("address not found"); + let storage_chunks = storage_slots.into_iter().chunks(self.storages_chunk_size); + + // Initialize the chunk with an address with no storage slots. We need to do this, + // because the account may have no storage slots in the targets, but we still need to + // add it to the chunk. + chunks[0].entry(address).or_default(); + + for (i, chunk) in storage_chunks.into_iter().enumerate() { + if i >= chunks.len() { + chunks.push(B256Map::default()); + } + chunks[i].entry(address).or_default().extend(chunk); + } + } + + Some(chunks) + } +} + /// Calculate multiproof for the targets.
#[inline] fn calculate_multiproof( @@ -1784,4 +1915,44 @@ mod tests { vec![slot2].into_iter().collect::<B256Set>() ); } + + #[test] + fn test_chunked_proof_targets() { + let address1 = B256::from([1; 32]); + let address2 = B256::from([2; 32]); + let address3 = B256::from([3; 32]); + + let slot1 = B256::from([1; 32]); + let slot2 = B256::from([2; 32]); + let slot3 = B256::from([3; 32]); + + let mut targets = MultiProofTargets::from_iter([ + (address1, vec![slot1, slot2, slot3].into_iter().collect::<B256Set>()), + (address2, vec![slot2, slot3].into_iter().collect::<B256Set>()), + (address3, B256Set::default()), + ]); + let chunks = ChunkedProofTargets::new(targets.clone(), 1, 2); + + for chunk in chunks.flatten() { + // Every chunk should have at most one address + assert_eq!(chunk.len(), 1); + for (address, slots) in chunk { + // Every chunk should have at most two slots per address + assert!(slots.len() <= 2); + + let Entry::Occupied(mut entry) = targets.entry(address) else { + panic!("address not found"); + }; + let entry_mut = entry.get_mut(); + for slot in slots { + entry_mut.remove(&slot); + } + if entry_mut.is_empty() { + entry.remove(); + } + } + } + // Verify that chunked iterator had all targets from the original list + assert!(targets.is_empty()); + } } diff --git a/crates/trie/sparse/Cargo.toml b/crates/trie/sparse/Cargo.toml index 908d03950e9f..a8784a086b34 100644 --- a/crates/trie/sparse/Cargo.toml +++ b/crates/trie/sparse/Cargo.toml @@ -39,7 +39,7 @@ arbitrary.workspace = true assert_matches.workspace = true criterion.workspace = true itertools.workspace = true -pretty_assertions = "1.4" +pretty_assertions.workspace = true proptest-arbitrary-interop.workspace = true proptest.workspace = true rand.workspace = true