Skip to content

Commit

Permalink
Add configurable block replayer (sigp#2863)
Browse files Browse the repository at this point in the history
## Issue Addressed

Successor to sigp#2431

## Proposed Changes

* Add a `BlockReplayer` struct to abstract over the intricacies of calling `per_slot_processing` and `per_block_processing` while avoiding unnecessary tree hashing.
* Add a variant of the forwards state root iterator that does not require an `end_state`.
* Use the `BlockReplayer` when reconstructing states in the database. Use the efficient forwards iterator for frozen states.
* Refactor the iterators to remove `Arc<HotColdDB>` (this seems to be neater than making _everything_ an `Arc<HotColdDB>` as I did in sigp#2431).

Supplying the state roots allow us to avoid building a tree hash cache at all when reconstructing historic states, which saves around 1 second flat (regardless of `slots-per-restore-point`). This is a small percentage of worst-case state load times with 200K validators and SPRP=2048 (~15s vs ~16s) but a significant speed-up for more frequent restore points: state loads with SPRP=32 should be now consistently <500ms instead of 1.5s (a ~3x speedup).

## Additional Info

Required by sigp#2628
  • Loading branch information
michaelsproul committed Dec 21, 2021
1 parent 56d596e commit a290a3c
Show file tree
Hide file tree
Showing 25 changed files with 955 additions and 443 deletions.
160 changes: 116 additions & 44 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ use state_processing::{
per_block_processing::{errors::AttestationValidationError, is_merge_transition_complete},
per_slot_processing,
state_advance::{complete_state_advance, partial_state_advance},
BlockSignatureStrategy, SigVerifiedOp,
BlockSignatureStrategy, SigVerifiedOp, VerifyBlockRoot,
};
use std::borrow::Cow;
use std::cmp::Ordering;
Expand Down Expand Up @@ -488,7 +488,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn forwards_iter_block_roots(
&self,
start_slot: Slot,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
let oldest_block_slot = self.store.get_oldest_block_slot();
if start_slot < oldest_block_slot {
return Err(Error::HistoricalBlockError(
Expand All @@ -501,8 +501,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let local_head = self.head()?;

let iter = HotColdDB::forwards_block_roots_iterator(
self.store.clone(),
let iter = self.store.forwards_block_roots_iterator(
start_slot,
local_head.beacon_state,
local_head.beacon_block_root,
Expand All @@ -512,6 +511,43 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(iter.map(|result| result.map_err(Into::into)))
}

/// Even more efficient variant of `forwards_iter_block_roots` that will avoid cloning the head
/// state if it isn't required for the requested range of blocks.
pub fn forwards_iter_block_roots_until(
&self,
start_slot: Slot,
end_slot: Slot,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
let oldest_block_slot = self.store.get_oldest_block_slot();
if start_slot < oldest_block_slot {
return Err(Error::HistoricalBlockError(
HistoricalBlockError::BlockOutOfRange {
slot: start_slot,
oldest_block_slot,
},
));
}

self.with_head(move |head| {
let iter = self.store.forwards_block_roots_iterator_until(
start_slot,
end_slot,
|| {
(
head.beacon_state.clone_with_only_committee_caches(),
head.beacon_block_root,
)
},
&self.spec,
)?;
Ok(iter
.map(|result| result.map_err(Into::into))
.take_while(move |result| {
result.as_ref().map_or(true, |(_, slot)| *slot <= end_slot)
}))
})
}

/// Traverse backwards from `block_root` to find the block roots of its ancestors.
///
/// ## Notes
Expand All @@ -524,14 +560,14 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn rev_iter_block_roots_from(
&self,
block_root: Hash256,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
let block = self
.get_block(&block_root)?
.ok_or(Error::MissingBeaconBlock(block_root))?;
let state = self
.get_state(&block.state_root(), Some(block.slot()))?
.ok_or_else(|| Error::MissingBeaconState(block.state_root()))?;
let iter = BlockRootsIterator::owned(self.store.clone(), state);
let iter = BlockRootsIterator::owned(&self.store, state);
Ok(std::iter::once(Ok((block_root, block.slot())))
.chain(iter)
.map(|result| result.map_err(|e| e.into())))
Expand Down Expand Up @@ -618,12 +654,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// - As this iterator starts at the `head` of the chain (viz., the best block), the first slot
/// returned may be earlier than the wall-clock slot.
pub fn rev_iter_state_roots_from<'a>(
&self,
&'a self,
state_root: Hash256,
state: &'a BeaconState<T::EthSpec>,
) -> impl Iterator<Item = Result<(Hash256, Slot), Error>> + 'a {
std::iter::once(Ok((state_root, state.slot())))
.chain(StateRootsIterator::new(self.store.clone(), state))
.chain(StateRootsIterator::new(&self.store, state))
.map(|result| result.map_err(Into::into))
}

Expand All @@ -637,11 +673,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn forwards_iter_state_roots(
&self,
start_slot: Slot,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
let local_head = self.head()?;

let iter = HotColdDB::forwards_state_roots_iterator(
self.store.clone(),
let iter = self.store.forwards_state_roots_iterator(
start_slot,
local_head.beacon_state_root(),
local_head.beacon_state,
Expand All @@ -651,6 +686,36 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(iter.map(|result| result.map_err(Into::into)))
}

/// Super-efficient forwards state roots iterator that avoids cloning the head if the state
/// roots lie entirely within the freezer database.
///
/// The iterator returned will include roots for `start_slot..=end_slot`, i.e. it
/// is endpoint inclusive.
pub fn forwards_iter_state_roots_until(
&self,
start_slot: Slot,
end_slot: Slot,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>> + '_, Error> {
self.with_head(move |head| {
let iter = self.store.forwards_state_roots_iterator_until(
start_slot,
end_slot,
|| {
(
head.beacon_state.clone_with_only_committee_caches(),
head.beacon_state_root(),
)
},
&self.spec,
)?;
Ok(iter
.map(|result| result.map_err(Into::into))
.take_while(move |result| {
result.as_ref().map_or(true, |(_, slot)| *slot <= end_slot)
}))
})
}

/// Returns the block at the given slot, if any. Only returns blocks in the canonical chain.
///
/// Use the `skips` parameter to define the behaviour when `request_slot` is a skipped slot.
Expand Down Expand Up @@ -708,18 +773,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(Some(root));
}

process_results(self.forwards_iter_state_roots(request_slot)?, |mut iter| {
if let Some((root, slot)) = iter.next() {
if slot == request_slot {
Ok(Some(root))
process_results(
self.forwards_iter_state_roots_until(request_slot, request_slot)?,
|mut iter| {
if let Some((root, slot)) = iter.next() {
if slot == request_slot {
Ok(Some(root))
} else {
// Sanity check.
Err(Error::InconsistentForwardsIter { request_slot, slot })
}
} else {
// Sanity check.
Err(Error::InconsistentForwardsIter { request_slot, slot })
Ok(None)
}
} else {
Ok(None)
}
})?
},
)?
}

/// Returns the block root at the given slot, if any. Only returns roots in the canonical chain.
Expand Down Expand Up @@ -790,11 +858,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(root_opt);
}

if let Some(((prev_root, _), (curr_root, curr_slot))) =
process_results(self.forwards_iter_block_roots(prev_slot)?, |iter| {
iter.tuple_windows().next()
})?
{
if let Some(((prev_root, _), (curr_root, curr_slot))) = process_results(
self.forwards_iter_block_roots_until(prev_slot, request_slot)?,
|iter| iter.tuple_windows().next(),
)? {
// Sanity check.
if curr_slot != request_slot {
return Err(Error::InconsistentForwardsIter {
Expand Down Expand Up @@ -842,18 +909,21 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
return Ok(Some(root));
}

process_results(self.forwards_iter_block_roots(request_slot)?, |mut iter| {
if let Some((root, slot)) = iter.next() {
if slot == request_slot {
Ok(Some(root))
process_results(
self.forwards_iter_block_roots_until(request_slot, request_slot)?,
|mut iter| {
if let Some((root, slot)) = iter.next() {
if slot == request_slot {
Ok(Some(root))
} else {
// Sanity check.
Err(Error::InconsistentForwardsIter { request_slot, slot })
}
} else {
// Sanity check.
Err(Error::InconsistentForwardsIter { request_slot, slot })
Ok(None)
}
} else {
Ok(None)
}
})?
},
)?
}

/// Returns the block at the given root, if any.
Expand Down Expand Up @@ -1112,12 +1182,13 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(state)
}
Ordering::Less => {
let state_root = process_results(self.forwards_iter_state_roots(slot)?, |iter| {
iter.take_while(|(_, current_slot)| *current_slot >= slot)
.find(|(_, current_slot)| *current_slot == slot)
.map(|(root, _slot)| root)
})?
.ok_or(Error::NoStateForSlot(slot))?;
let state_root =
process_results(self.forwards_iter_state_roots_until(slot, slot)?, |iter| {
iter.take_while(|(_, current_slot)| *current_slot >= slot)
.find(|(_, current_slot)| *current_slot == slot)
.map(|(root, _slot)| root)
})?
.ok_or(Error::NoStateForSlot(slot))?;

Ok(self
.get_state(&state_root, Some(slot))?
Expand Down Expand Up @@ -1256,7 +1327,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
beacon_block_root: Hash256,
state: &BeaconState<T::EthSpec>,
) -> Result<Option<Hash256>, Error> {
let iter = BlockRootsIterator::new(self.store.clone(), state);
let iter = BlockRootsIterator::new(&self.store, state);
let iter_with_head = std::iter::once(Ok((beacon_block_root, state.slot())))
.chain(iter)
.map(|result| result.map_err(|e| e.into()));
Expand Down Expand Up @@ -2983,6 +3054,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&block,
None,
BlockSignatureStrategy::VerifyRandao,
VerifyBlockRoot::True,
&self.spec,
)?;
drop(process_timer);
Expand Down Expand Up @@ -3324,7 +3396,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
.epoch
.start_slot(T::EthSpec::slots_per_epoch());
let new_finalized_state_root = process_results(
StateRootsIterator::new(self.store.clone(), &head.beacon_state),
StateRootsIterator::new(&self.store, &head.beacon_state),
|mut iter| {
iter.find_map(|(state_root, slot)| {
if slot == new_finalized_slot {
Expand Down
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use state_processing::{
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
per_block_processing, per_slot_processing,
state_advance::partial_state_advance,
BlockProcessingError, BlockSignatureStrategy, SlotProcessingError,
BlockProcessingError, BlockSignatureStrategy, SlotProcessingError, VerifyBlockRoot,
};
use std::borrow::Cow;
use std::fs;
Expand Down Expand Up @@ -1185,6 +1185,7 @@ impl<'a, T: BeaconChainTypes> FullyVerifiedBlock<'a, T> {
Some(block_root),
// Signatures were verified earlier in this function.
BlockSignatureStrategy::NoVerification,
VerifyBlockRoot::True,
&chain.spec,
) {
match err {
Expand Down
4 changes: 3 additions & 1 deletion beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use state_processing::{
},
signature_sets::Error as SignatureSetError,
state_advance::Error as StateAdvanceError,
BlockProcessingError, SlotProcessingError,
BlockProcessingError, BlockReplayError, SlotProcessingError,
};
use std::time::Duration;
use task_executor::ShutdownReason;
Expand Down Expand Up @@ -86,6 +86,7 @@ pub enum BeaconChainError {
ValidatorPubkeyCacheIncomplete(usize),
SignatureSetError(SignatureSetError),
BlockSignatureVerifierError(state_processing::block_signature_verifier::Error),
BlockReplayError(BlockReplayError),
DuplicateValidatorPublicKey,
ValidatorPubkeyCacheFileError(String),
ValidatorIndexUnknown(usize),
Expand Down Expand Up @@ -160,6 +161,7 @@ easy_from_to!(ArithError, BeaconChainError);
easy_from_to!(ForkChoiceStoreError, BeaconChainError);
easy_from_to!(HistoricalBlockError, BeaconChainError);
easy_from_to!(StateAdvanceError, BeaconChainError);
easy_from_to!(BlockReplayError, BeaconChainError);

#[derive(Debug)]
pub enum BlockProductionError {
Expand Down
5 changes: 4 additions & 1 deletion beacon_node/beacon_chain/src/fork_revert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use fork_choice::{ForkChoice, PayloadVerificationStatus};
use itertools::process_results;
use slog::{info, warn, Logger};
use state_processing::state_advance::complete_state_advance;
use state_processing::{per_block_processing, per_block_processing::BlockSignatureStrategy};
use state_processing::{
per_block_processing, per_block_processing::BlockSignatureStrategy, VerifyBlockRoot,
};
use std::sync::Arc;
use std::time::Duration;
use store::{iter::ParentRootBlockIterator, HotColdDB, ItemStore};
Expand Down Expand Up @@ -161,6 +163,7 @@ pub fn reset_fork_choice_to_finalization<E: EthSpec, Hot: ItemStore<E>, Cold: It
&block,
None,
BlockSignatureStrategy::NoVerification,
VerifyBlockRoot::True,
spec,
)
.map_err(|e| format!("Error replaying block: {:?}", e))?;
Expand Down
14 changes: 6 additions & 8 deletions beacon_node/beacon_chain/src/migrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,13 +360,11 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho
new_finalized_slot,
(new_finalized_block_hash, new_finalized_state_hash),
)))
.chain(
RootsIterator::new(store.clone(), new_finalized_state).map(|res| {
res.map(|(block_root, state_root, slot)| {
(slot, (block_root.into(), state_root.into()))
})
}),
)
.chain(RootsIterator::new(&store, new_finalized_state).map(|res| {
res.map(|(block_root, state_root, slot)| {
(slot, (block_root.into(), state_root.into()))
})
}))
.take_while(|res| {
res.as_ref()
.map_or(true, |(slot, _)| *slot >= old_finalized_slot)
Expand Down Expand Up @@ -416,7 +414,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> BackgroundMigrator<E, Ho

// Iterate backwards from this head, staging blocks and states for deletion.
let iter = std::iter::once(Ok((head_hash, head_state_root, head_slot)))
.chain(RootsIterator::from_block(store.clone(), head_hash)?);
.chain(RootsIterator::from_block(&store, head_hash)?);

for maybe_tuple in iter {
let (block_root, state_root, slot) = maybe_tuple?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ fn map_relevant_epochs_to_roots<T: BeaconChainTypes>(

// Iterate backwards from the given `head_root` and `head_slot` and find the block root at each epoch.
let mut iter = std::iter::once(Ok((head_root, head_slot)))
.chain(BlockRootsIterator::from_block(db, head_root).map_err(|e| format!("{:?}", e))?);
.chain(BlockRootsIterator::from_block(&db, head_root).map_err(|e| format!("{:?}", e))?);
let mut roots_by_epoch = HashMap::new();
for epoch in relevant_epochs {
let start_slot = epoch.start_slot(T::EthSpec::slots_per_epoch());
Expand Down
Loading

0 comments on commit a290a3c

Please sign in to comment.