Commit 243842b

Merge pull request #2503 from eqlabs/chris/flaky-sync-test
flaky p2p sync test
CHr15F0x authored Jan 21, 2025
2 parents fa95959 + 75b371f commit 243842b
Showing 8 changed files with 125 additions and 42 deletions.
8 changes: 8 additions & 0 deletions crates/common/src/state_update.rs
@@ -644,6 +644,14 @@ impl DeclaredClasses {
}
}

#[derive(Debug, thiserror::Error)]
pub enum StateUpdateError {
#[error("Contract class hash missing for contract {0}")]
ContractClassHashMissing(ContractAddress),
#[error(transparent)]
StorageError(#[from] anyhow::Error),
}

#[cfg(test)]
mod tests {
use super::*;
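
The new StateUpdateError gives callers a variant they can match on for the missing-class-hash case, while `#[error(transparent)]` plus `#[from]` keeps every other storage failure convertible from anyhow::Error with `?`. A minimal, self-contained sketch of the same pattern — the ContractAddress stand-in and the load_class_hash helper are simplified placeholders, not the pathfinder types, and the thiserror and anyhow crates are assumed as dependencies:

```rust
use thiserror::Error;

// Simplified stand-in for pathfinder's ContractAddress (a Felt newtype).
#[derive(Debug, Clone, Copy)]
struct ContractAddress(u64);

impl std::fmt::Display for ContractAddress {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:#x}", self.0)
    }
}

#[derive(Debug, Error)]
enum StateUpdateError {
    #[error("Contract class hash missing for contract {0}")]
    ContractClassHashMissing(ContractAddress),
    #[error(transparent)]
    StorageError(#[from] anyhow::Error),
}

// A lookup that previously produced a stringly-typed anyhow context now
// yields a typed, matchable error variant.
fn load_class_hash(found: Option<u64>, addr: ContractAddress) -> Result<u64, StateUpdateError> {
    found.ok_or(StateUpdateError::ContractClassHashMissing(addr))
}

fn main() {
    let err = load_class_hash(None, ContractAddress(0xabc)).unwrap_err();
    assert!(matches!(err, StateUpdateError::ContractClassHashMissing(_)));
    println!("{err}"); // Contract class hash missing for contract 0xabc
}
```
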
11 changes: 3 additions & 8 deletions crates/merkle-tree/src/contract_state.rs
@@ -1,5 +1,5 @@
use anyhow::Context;
use pathfinder_common::state_update::{ReverseContractUpdate, StorageRef};
use pathfinder_common::state_update::{ReverseContractUpdate, StateUpdateError, StorageRef};
use pathfinder_common::{
BlockNumber,
ClassHash,
@@ -55,7 +55,7 @@ pub fn update_contract_state(
transaction: &Transaction<'_>,
verify_hashes: bool,
block: BlockNumber,
) -> anyhow::Result<ContractStateUpdateResult> {
) -> Result<ContractStateUpdateResult, StateUpdateError> {
// Load the contract tree and insert the updates.
let (new_root, trie_update) = if !updates.is_empty() {
let mut contract_tree = match block.parent() {
@@ -95,12 +95,7 @@ pub fn update_contract_state(
transaction
.contract_class_hash(block.into(), contract_address)
.context("Querying contract's class hash")?
.with_context(|| {
format!(
"Contract's class hash is missing, block: {block}, contract_address: \
{contract_address}"
)
})?
.ok_or(StateUpdateError::ContractClassHashMissing(contract_address))?
};

let nonce = if let Some(nonce) = new_nonce {
15 changes: 9 additions & 6 deletions crates/merkle-tree/src/starknet_state.rs
@@ -1,5 +1,5 @@
use anyhow::Context;
use pathfinder_common::state_update::StateUpdateRef;
use pathfinder_common::state_update::{StateUpdateError, StateUpdateRef};
use pathfinder_common::{BlockNumber, ClassCommitment, StorageCommitment};
use pathfinder_storage::{Storage, Transaction};

@@ -14,7 +14,7 @@ pub fn update_starknet_state(
// we need this so that we can create extra read-only transactions for
// parallel contract state updates
storage: Storage,
) -> anyhow::Result<(StorageCommitment, ClassCommitment)> {
) -> Result<(StorageCommitment, ClassCommitment), StateUpdateError> {
use rayon::prelude::*;

let mut storage_commitment_tree = match block.parent() {
@@ -36,10 +36,13 @@ pub fn update_starknet_state(
|connection, (contract_address, update)| {
let connection = match connection {
Ok(connection) => connection,
Err(e) => anyhow::bail!(
"Failed to create database connection in rayon thread: {}",
e
),
Err(e) => {
return Err(anyhow::anyhow!(
"Failed to create database connection in rayon thread: {}",
e
)
.into())
}
};
let transaction = connection.transaction()?;
update_contract_state(
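
With the closure inside the rayon fold now returning Result<_, StateUpdateError>, `anyhow::bail!` no longer type-checks, since it early-returns an anyhow::Error; building the error with `anyhow::anyhow!` and calling `.into()` routes it through the `#[from]` conversion instead. A small sketch of that conversion under a simplified error type — StorageError, connect and per_contract_update here are placeholders, not the pathfinder items:

```rust
use anyhow::anyhow;

// Placeholder for an error type that, like StateUpdateError, implements
// From<anyhow::Error>.
#[derive(Debug)]
struct StorageError(anyhow::Error);

impl From<anyhow::Error> for StorageError {
    fn from(e: anyhow::Error) -> Self {
        StorageError(e)
    }
}

fn connect() -> Result<(), anyhow::Error> {
    Err(anyhow!("no database connection"))
}

fn per_contract_update() -> Result<(), StorageError> {
    let _connection = match connect() {
        Ok(c) => c,
        // `anyhow::bail!` would `return Err(anyhow::Error)`, which no longer
        // matches the declared error type; `.into()` performs the explicit
        // conversion through the From impl.
        Err(e) => {
            return Err(
                anyhow!("Failed to create database connection in rayon thread: {}", e).into(),
            )
        }
    };
    Ok(())
}

fn main() {
    assert!(per_contract_update().is_err());
}
```
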
115 changes: 90 additions & 25 deletions crates/pathfinder/src/sync.rs
@@ -147,7 +147,7 @@
continue_from
}
Err(SyncError::Fatal(mut error)) => {
tracing::error!(%error, "Stopping checkpoint sync");
tracing::error!(?error, "Stopping checkpoint sync");
return Err(error.take_or_deep_clone());
}
Err(error) => {
@@ -204,7 +204,7 @@
match result {
Ok(_) => tracing::debug!("Restarting track sync: unexpected end of Block stream"),
Err(SyncError::Fatal(mut error)) => {
tracing::error!(%error, "Stopping track sync");
tracing::error!(?error, "Stopping track sync");
return Err(error.take_or_deep_clone());
}
Err(error) => {
@@ -315,7 +315,7 @@ mod tests {
use p2p::libp2p::PeerId;
use pathfinder_common::event::Event;
use pathfinder_common::receipt::Receipt;
use pathfinder_common::state_update::StateUpdateData;
use pathfinder_common::state_update::{self, StateUpdateData};
use pathfinder_common::transaction::Transaction;
use pathfinder_common::{
BlockHeader,
@@ -376,27 +376,49 @@ mod tests {
(public_key, blocks)
}

async fn sync_done_watch(storage: Storage, expected_last: BlockNumber) {
async fn sync_done_watch(
mut last_event_rx: tokio::sync::mpsc::Receiver<()>,
storage: Storage,
expected_last: BlockNumber,
) {
// Don't poll the DB until the last event is emitted from the fake P2P client
last_event_rx.recv().await.unwrap();

let mut interval = tokio::time::interval_at(
// Give sync some slack to process the last event and commit the last block
tokio::time::Instant::now() + Duration::from_millis(500),
Duration::from_millis(200),
);

let mut start = std::time::Instant::now();
tokio::task::spawn_blocking(move || loop {
std::thread::sleep(Duration::from_millis(200));
let mut db = storage.connection().unwrap();
let db = db.transaction().unwrap();
let header = db.block_header(expected_last.into()).unwrap();
if let Some(header) = header {
let after = start.elapsed();
if after > TIMEOUT {
break;
}

if header.number == expected_last {
tracing::info!(?after, "Sync done");
break;
loop {
interval.tick().await;
let storage = storage.clone();

let done = tokio::task::spawn_blocking(move || {
let mut db = storage.connection().unwrap();
let db = db.transaction().unwrap();
// We don't have to query the entire block, as tracking sync commits entire
// blocks to the DB, so if the header is there, the block is there
let header = db.block_header(expected_last.into()).unwrap();
if let Some(header) = header {
if header.number == expected_last {
let after = start.elapsed();
tracing::info!(?after, "Sync done");
return true;
}
}

false
})
.await
.unwrap();

if done {
break;
}
})
.await
.unwrap();
}
}

#[derive(Copy, Clone, Debug)]
@@ -449,6 +471,8 @@ mod tests {
})]
#[test_log::test(tokio::test)]
async fn sync(#[case] error_setup: ErrorSetup) {
use futures::FutureExt;

let (public_key, blocks) = generate_fake_blocks(ALL_BLOCKS as usize);
let last_header = &blocks.last().unwrap().header.header;
let last_checkpoint_header = &blocks[LAST_IN_CHECKPOINT.get() as usize].header.header;
@@ -458,13 +482,15 @@ mod tests {
let expect_fully_synced_blocks = error_setup.expected_last_synced.is_full();

let error_trigger = ErrorTrigger::new(error_setup.fatal_at);
let (last_event_tx, mut last_event_rx) = tokio::sync::mpsc::channel(1);

let sync = Sync {
storage: storage.clone(),
p2p: FakeP2PClient {
blocks: blocks.clone(),
error_trigger: error_trigger.clone(),
storage: storage.clone(),
last_event_tx,
},
// We use `l1_checkpoint_override` instead
eth_client: EthereumClient::new("https://unused.com").unwrap(),
@@ -483,13 +509,42 @@ mod tests {
block_hash_db: None,
};

tokio::select! {
let sync_done = if error_setup.fatal_at.is_some() {
// Sync will either bail on fatal error or time out
std::future::pending().boxed()
} else {
// Successful sync never ends
sync_done_watch(last_event_rx, storage.clone(), expected_last_synced_block).boxed()
};

let bail_early = tokio::select! {
result = tokio::time::timeout(TIMEOUT, sync.run()) => match result {
Ok(Ok(())) => unreachable!("Sync does not exit upon success, sync_done_watch should have been triggered"),
Ok(Err(e)) => tracing::debug!(%e, "Sync failed with a fatal error"),
Err(_) => tracing::debug!("Test timed out"),
Ok(Err(error)) => {
let unexpected_fatal = error_setup.fatal_at.is_none();
if unexpected_fatal {
tracing::debug!(?error, "Sync failed with an unexpected fatal error");
} else {
tracing::debug!(?error, "Sync failed with a fatal error");
}
unexpected_fatal
},
Err(_) => {
tracing::debug!("Test timed out");
true
},
},
_ = sync_done_watch(storage.clone(), expected_last_synced_block) => tracing::debug!("Sync completion detected"),
_ = sync_done => {
tracing::debug!("Sync completion detected");
false
},
};

if bail_early {
blocks.iter().for_each(|b| {
tracing::error!(block=%b.header.header.number, state_update=?b.state_update.as_ref().unwrap());
});
return;
}

assert!(error_trigger.all_errors_triggered());
@@ -590,6 +645,7 @@ mod tests {
pub blocks: Vec<Block>,
pub error_trigger: ErrorTrigger,
pub storage: Storage,
pub last_event_tx: tokio::sync::mpsc::Sender<()>,
}

#[derive(Clone)]
@@ -606,7 +662,11 @@ mod tests {
(0..=4)
.map(|_| AtomicU64::new((0..CHECKPOINT_BLOCKS).fake()))
.chain(
(5..=9).map(|_| AtomicU64::new((CHECKPOINT_BLOCKS..ALL_BLOCKS).fake())),
// The last block is always error free to ease checking for sync
// completion
(5..=9).map(|_| {
AtomicU64::new((CHECKPOINT_BLOCKS..ALL_BLOCKS - 1).fake())
}),
)
.collect(),
)),
@@ -1002,6 +1062,11 @@ mod tests {
e.push(Ok(Faker.fake()));
}

if block == LAST_IN_TRACK {
let last_event_tx = self.last_event_tx.clone();
last_event_tx.send(()).await.unwrap();
}

Some((PeerId::random(), stream::iter(e)))
}
}
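
The de-flaked test replaces polling that started immediately on a fixed sleep with an explicit handshake: the fake P2P client signals over an mpsc channel once it has emitted the last event, and only then does the watcher start polling storage on the blocking pool. A stripped-down sketch of that pattern — the Arc<Mutex<Option<u64>>> store stands in for the real Storage handle and block-header query, and tokio with the full feature set is assumed:

```rust
use std::sync::{Arc, Mutex};
use std::time::Duration;

// Watcher: wait for the producer's "last event sent" signal, then poll the
// shared store on the blocking pool until the expected value shows up.
async fn sync_done_watch(
    mut last_event_rx: tokio::sync::mpsc::Receiver<()>,
    store: Arc<Mutex<Option<u64>>>,
    expected_last: u64,
) {
    // Don't poll at all until the last event has been emitted.
    last_event_rx.recv().await.unwrap();

    // Give the consumer some slack before the first check, then poll steadily.
    let mut interval = tokio::time::interval_at(
        tokio::time::Instant::now() + Duration::from_millis(500),
        Duration::from_millis(200),
    );

    loop {
        interval.tick().await;
        let store = store.clone();
        // Blocking work (a SQLite query in the real test) belongs on the
        // blocking pool so it cannot stall the async runtime.
        let done = tokio::task::spawn_blocking(move || {
            matches!(*store.lock().unwrap(), Some(last) if last == expected_last)
        })
        .await
        .unwrap();
        if done {
            break;
        }
    }
}

#[tokio::main]
async fn main() {
    let (last_event_tx, last_event_rx) = tokio::sync::mpsc::channel(1);
    let store = Arc::new(Mutex::new(None));
    let watcher = tokio::spawn(sync_done_watch(last_event_rx, store.clone(), 10));

    // Producer: commit the final "block", then signal that the last event
    // has been sent.
    *store.lock().unwrap() = Some(10);
    last_event_tx.send(()).await.unwrap();
    watcher.await.unwrap();
}
```
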
2 changes: 2 additions & 0 deletions crates/pathfinder/src/sync/error.rs
@@ -26,6 +26,8 @@ pub(super) enum SyncError {
ClassDefinitionsDeclarationsMismatch(PeerId),
#[error("Class hash computation failed")]
ClassHashComputationError(PeerId),
#[error("Contract's class is missing")]
ContractClassMissing(PeerId),
#[error("Discontinuity in header chain")]
Discontinuity(PeerId),
#[error("Event commitment mismatch")]
11 changes: 10 additions & 1 deletion crates/pathfinder/src/sync/state_updates.rs
@@ -10,6 +10,7 @@ use pathfinder_common::state_update::{
ContractClassUpdate,
ContractUpdate,
StateUpdateData,
StateUpdateError,
StateUpdateRef,
SystemContractUpdate,
};
@@ -285,7 +286,15 @@ pub async fn batch_update_starknet_state(
tail,
storage.clone(),
)
.context("Updating Starknet state")?;
.map_err(|error| match error {
StateUpdateError::ContractClassHashMissing(for_contract) => {
tracing::debug!(%for_contract, "Contract class hash is missing");
SyncError::ContractClassMissing(peer)
}
StateUpdateError::StorageError(error) => SyncError::Fatal(Arc::new(
error.context(format!("Updating Starknet state, tail {tail}")),
)),
})?;
let state_commitment = StateCommitment::calculate(storage_commitment, class_commitment);
let expected_state_commitment = db
.state_commitment(tail.into())
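
This map_err is the policy point of the change: a missing class hash is treated as bad data from the serving peer and surfaces as the new SyncError::ContractClassMissing(peer), alongside the other recoverable, peer-attributed variants, while genuine storage failures stay fatal and carry the block number ("tail") as context. A toy sketch of that classification with simplified stand-in enums and a classify helper (the real SyncError and peer IDs live in pathfinder's sync module):

```rust
use std::sync::Arc;

#[derive(Debug)]
enum StateUpdateError {
    ContractClassHashMissing(u64), // contract address, simplified
    StorageError(anyhow::Error),
}

#[derive(Debug)]
enum SyncError {
    ContractClassMissing(&'static str), // peer id, simplified
    Fatal(Arc<anyhow::Error>),
}

// Recoverable errors are pinned on the peer that served the data; anything
// else aborts sync with added context about which block failed.
fn classify(error: StateUpdateError, peer: &'static str, tail: u64) -> SyncError {
    match error {
        StateUpdateError::ContractClassHashMissing(for_contract) => {
            eprintln!("Contract class hash is missing for {for_contract:#x}");
            SyncError::ContractClassMissing(peer)
        }
        StateUpdateError::StorageError(error) => SyncError::Fatal(Arc::new(
            error.context(format!("Updating Starknet state, tail {tail}")),
        )),
    }
}

fn main() {
    let recoverable = classify(StateUpdateError::ContractClassHashMissing(0xabc), "peer-1", 42);
    assert!(matches!(recoverable, SyncError::ContractClassMissing(_)));

    let fatal = classify(
        StateUpdateError::StorageError(anyhow::anyhow!("disk full")),
        "peer-1",
        42,
    );
    assert!(matches!(fatal, SyncError::Fatal(_)));
}
```
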
2 changes: 1 addition & 1 deletion crates/pathfinder/src/sync/track.rs
@@ -827,7 +827,7 @@ impl ProcessStage for StoreBlock {
block_number,
self.storage.clone(),
)
.context("Updating Starknet state")?;
.with_context(|| format!("Updating Starknet state, block_number {block_number}"))?;

// Ensure that roots match.
let state_commitment = StateCommitment::calculate(storage_commitment, class_commitment);
3 changes: 2 additions & 1 deletion crates/storage/src/fake.rs
@@ -8,6 +8,7 @@ use pathfinder_common::receipt::Receipt;
use pathfinder_common::state_update::{
ContractClassUpdate,
ContractUpdate,
StateUpdateError,
StateUpdateRef,
SystemContractUpdate,
};
@@ -70,7 +71,7 @@ pub type UpdateTriesFn = Box<
bool,
BlockNumber,
Storage,
) -> anyhow::Result<(StorageCommitment, ClassCommitment)>,
) -> Result<(StorageCommitment, ClassCommitment), StateUpdateError>,
>;

pub struct Config {
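
In the storage fakes only the alias's error type changed, but the shape is worth noting: the trie-update step is injected as a boxed closure, so a test can supply either a stub or a real trie-update routine. A condensed sketch of the idea with placeholder types and names — the real alias takes a Transaction, the state update ref, a verify flag, the block number and a Storage handle, and returns Result<(StorageCommitment, ClassCommitment), StateUpdateError>:

```rust
// Placeholder commitment type standing in for (StorageCommitment, ClassCommitment).
type Commitments = (u64, u64);

// Boxed closure alias: anything with this shape can be injected into the
// fake storage configuration.
type UpdateTriesFn = Box<dyn Fn(u64, bool) -> Result<Commitments, String>>;

struct Config {
    update_tries: UpdateTriesFn,
}

fn run(config: &Config, block: u64) -> Result<Commitments, String> {
    (config.update_tries)(block, true)
}

fn main() {
    // Tests pick the behaviour they need: a no-op here, a deliberately
    // failing closure (or the real update) elsewhere.
    let noop: UpdateTriesFn = Box::new(|_block, _verify_hashes| Ok((0, 0)));
    let config = Config { update_tries: noop };
    assert_eq!(run(&config, 7), Ok((0, 0)));
}
```
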
