Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(root): chunk state updates in state root task #14500

Draft
wants to merge 25 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
813fe72
feat(evm, root): pass state change source to state hook
shekhirin Feb 14, 2025
2d859c7
some comments
shekhirin Feb 14, 2025
31f5b48
log on multiproof
shekhirin Feb 14, 2025
440e870
perf(root): chunk state updates in state root task
shekhirin Feb 14, 2025
e93d465
add todo for prefetch
shekhirin Feb 14, 2025
1ceeb1e
chunk storage slots by 5 as well
shekhirin Feb 14, 2025
5b12022
at least one element
shekhirin Feb 14, 2025
600e6ee
Merge remote-tracking branch 'origin/main' into alexey/evm-state-hook…
shekhirin Feb 14, 2025
f41000e
updates accounting
shekhirin Feb 14, 2025
f40ce0c
fix comment
shekhirin Feb 14, 2025
976d5a2
revertme: print max
shekhirin Feb 14, 2025
a50c462
Merge remote-tracking branch 'origin/alexey/evm-state-hook-source' in…
shekhirin Feb 14, 2025
1b297fe
Merge remote-tracking branch 'origin/main' into alexey/state-root-tas…
shekhirin Feb 14, 2025
ca1e115
revertme: more than 500
shekhirin Feb 14, 2025
c20669f
revert print
shekhirin Feb 14, 2025
89b61de
use btreemap instead
shekhirin Feb 14, 2025
cf1bc7b
use vec again but nicer indexing in it
shekhirin Feb 14, 2025
dbef921
Revert "use vec again but nicer indexing in it"
shekhirin Feb 14, 2025
8af60fb
Revert "use btreemap instead"
shekhirin Feb 14, 2025
e630c80
collect into one vec first
shekhirin Feb 14, 2025
a1115c3
Merge remote-tracking branch 'origin/main' into alexey/state-root-tas…
shekhirin Feb 18, 2025
ea68fa0
Merge remote-tracking branch 'origin/main' into alexey/state-root-tas…
shekhirin Feb 19, 2025
c6f1fcb
comments
shekhirin Feb 21, 2025
898fe1d
iterate all together
shekhirin Feb 21, 2025
ed5ae5d
use separate iterator struct
shekhirin Feb 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions crates/engine/tree/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ metrics.workspace = true
reth-metrics = { workspace = true, features = ["common"] }

# misc
schnellru.workspace = true
derive_more.workspace = true
itertools.workspace = true
rayon.workspace = true
schnellru.workspace = true
tracing.workspace = true
derive_more.workspace = true

# optional deps for test-utils
reth-prune-types = { workspace = true, optional = true }
Expand Down
166 changes: 132 additions & 34 deletions crates/engine/tree/src/tree/root.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
//! State root task related functionality.

use alloy_primitives::map::HashSet;
use alloy_primitives::{
keccak256,
map::{B256Map, B256Set, HashSet},
B256,
};
use derive_more::derive::Deref;
use itertools::Itertools;
use metrics::Histogram;
use rayon::iter::{ParallelBridge, ParallelIterator};
use reth_errors::{ProviderError, ProviderResult};
Expand All @@ -28,9 +33,8 @@ use reth_trie_sparse::{
errors::{SparseStateTrieResult, SparseTrieErrorKind},
SparseStateTrie,
};
use revm_primitives::{keccak256, B256};
use std::{
collections::{BTreeMap, VecDeque},
collections::{hash_map::Entry, BTreeMap, VecDeque},
sync::{
mpsc::{self, channel, Receiver, Sender},
Arc,
Expand All @@ -42,6 +46,11 @@ use tracing::{debug, error, trace, trace_span};
/// The level below which the sparse trie hashes are calculated in [`update_sparse_trie`].
const SPARSE_TRIE_INCREMENTAL_LEVEL: usize = 2;

/// Maximum number of account targets in a multiproof.
const MULTIPROOF_ACCOUNTS_CHUNK_SIZE: usize = 5;
/// Maximum number of storage slots targets per account in a multiproof.
const MULTIPROOF_STORAGES_CHUNK_SIZE: usize = 5;

/// Determines the size of the rayon thread pool to be used in [`StateRootTask`].
///
/// The value is determined as `max(NUM_THREADS - 2, 3)`:
Expand Down Expand Up @@ -620,14 +629,16 @@ where
let proof_targets = self.get_prefetch_proof_targets(targets);
extend_multi_proof_targets_ref(&mut self.fetched_proof_targets, &proof_targets);

self.multiproof_manager.spawn_or_queue(MultiproofInput {
config: self.config.clone(),
source: None,
hashed_state_update: Default::default(),
proof_targets,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
});
for chunk in ChunkedProofTargets::new(proof_targets).flatten() {
self.multiproof_manager.spawn_or_queue(MultiproofInput {
config: self.config.clone(),
source: None,
hashed_state_update: HashedPostState::default(),
proof_targets: chunk,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
});
}
}

/// Calls `get_proof_targets` with existing proof targets for prefetching.
Expand Down Expand Up @@ -680,27 +691,68 @@ where
targets
}

/// Handles state updates.
/// Handles state update.
///
/// Returns proof targets derived from the state update.
fn on_state_update(
&mut self,
source: StateChangeSource,
update: EvmState,
proof_sequence_number: u64,
) {
let hashed_state_update = evm_state_to_hashed_post_state(update);
let proof_targets = get_proof_targets(&hashed_state_update, &self.fetched_proof_targets);
/// Chunks into multiple state updates, so that each has at most
/// `MULTIPROOF_ACCOUNTS_CHUNK_SIZE` accounts and `MULTIPROOF_STORAGES_CHUNK_SIZE` storage
/// slots per account.
///
/// After chunking, [`MultiproofManager::spawn_or_queue`] is called for each state update.
///
/// Returns number of new updates generated by one state update.
fn on_state_update(&mut self, source: StateChangeSource, update: EvmState) -> u64 {
let mut state_update = evm_state_to_hashed_post_state(update);
let proof_targets = get_proof_targets(&state_update, &self.fetched_proof_targets);
extend_multi_proof_targets_ref(&mut self.fetched_proof_targets, &proof_targets);

self.multiproof_manager.spawn_or_queue(MultiproofInput {
config: self.config.clone(),
source: Some(source),
hashed_state_update,
proof_targets,
proof_sequence_number,
state_root_message_sender: self.tx.clone(),
});
let mut total_updates = 0;

for chunk in ChunkedProofTargets::new(proof_targets).flatten() {
total_updates += 1;

let mut accounts = B256Map::with_capacity_and_hasher(chunk.len(), Default::default());
let mut storages = B256Map::with_capacity_and_hasher(chunk.len(), Default::default());

for (&address, storage_slots) in &chunk {
if let Some(account) = state_update.accounts.remove(&address) {
accounts.insert(address, account);
}

if !storage_slots.is_empty() {
let state_storage = state_update.storages.entry(address);
let mut hashed_storage = HashedStorage::default();
match state_storage {
Entry::Occupied(mut entry) => {
for storage_slot in storage_slots {
let value = entry
.get_mut()
.storage
.remove(storage_slot)
.expect("storage slot should be present");
hashed_storage.storage.insert(*storage_slot, value);
}

if entry.get_mut().storage.is_empty() {
entry.remove();
}
}
Entry::Vacant(_) => unreachable!(),
}
storages.insert(address, hashed_storage);
}
}

self.multiproof_manager.spawn_or_queue(MultiproofInput {
config: self.config.clone(),
source: Some(source),
hashed_state_update: HashedPostState { accounts, storages },
proof_targets: chunk,
proof_sequence_number: self.proof_sequencer.next_sequence(),
state_root_message_sender: self.tx.clone(),
});
}

total_updates as u64
}

/// Handler for new proof calculated, aggregates all the existing sequential proofs.
Expand Down Expand Up @@ -801,16 +853,16 @@ where
}
last_update_time = Some(Instant::now());

updates_received += 1;
let update_size = update.len();
let new_updates = self.on_state_update(source, update);
updates_received += new_updates;
debug!(
target: "engine::root",
?source,
len = update.len(),
update_size,
new_updates,
total_updates = updates_received,
"Received new state update"
);
let next_sequence = self.proof_sequencer.next_sequence();
self.on_state_update(source, update, next_sequence);
}
StateRootMessage::FinishedStateUpdates => {
trace!(target: "engine::root", "processing StateRootMessage::FinishedStateUpdates");
Expand Down Expand Up @@ -1103,6 +1155,52 @@ fn get_proof_targets(
targets
}

/// Splits [`MultiProofTargets`] into bounded-size chunks for multiproof calculation.
///
/// Each yielded item is a [`Vec<B256Map<B256Set>>`] in which every map holds at most
/// [`MULTIPROOF_ACCOUNTS_CHUNK_SIZE`] accounts, and every account maps to at most
/// [`MULTIPROOF_STORAGES_CHUNK_SIZE`] storage slots.
struct ChunkedProofTargets {
    // Remaining targets; entries are consumed as chunks are yielded.
    proof_targets: MultiProofTargets,
}

impl ChunkedProofTargets {
    /// Wraps the given targets for chunked iteration.
    fn new(proof_targets: MultiProofTargets) -> Self {
        ChunkedProofTargets { proof_targets }
    }
}

impl Iterator for ChunkedProofTargets {
type Item = Vec<B256Map<B256Set>>;

fn next(&mut self) -> Option<Self::Item> {
if self.proof_targets.is_empty() {
return None;
}

let mut chunks = vec![B256Map::<B256Set>::default(); 1];

let accounts_chunk: B256Map<B256Set> =
self.proof_targets.drain().take(MULTIPROOF_ACCOUNTS_CHUNK_SIZE).collect();

for (address, storage_slots) in accounts_chunk {
let storage_chunks = storage_slots.into_iter().chunks(MULTIPROOF_STORAGES_CHUNK_SIZE);

for (i, chunk) in storage_chunks.into_iter().enumerate() {
if i >= chunks.len() {
chunks.push(B256Map::default());
}
chunks[i].entry(address).or_default().extend(chunk);
}
}

Some(chunks)
}
}

/// Calculate multiproof for the targets.
#[inline]
fn calculate_multiproof<Factory>(
Expand Down
Loading