diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs
index 3effb94f..102b28f4 100644
--- a/node/actors/bft/src/testonly/run.rs
+++ b/node/actors/bft/src/testonly/run.rs
@@ -1,7 +1,7 @@
 use super::{Behavior, Node};
 use anyhow::bail;
 use network::{io, Config};
-use rand::prelude::SliceRandom;
+use rand::seq::SliceRandom;
 use std::{
     collections::{HashMap, HashSet},
     sync::Arc,
@@ -15,9 +15,7 @@ use zksync_concurrency::{
     oneshot, scope,
 };
 use zksync_consensus_network as network;
-use zksync_consensus_roles::validator::{
-    self, BlockNumber, CommitQC, ConsensusMsg, Genesis, PublicKey,
-};
+use zksync_consensus_roles::validator;
 use zksync_consensus_storage::{testonly::new_store, BlockStore};
 use zksync_consensus_utils::pipe;
 
@@ -61,7 +59,7 @@ impl Test {
         &self,
         ctx: &ctx::Ctx,
         nets: Vec<Config>,
-        genesis: &Genesis,
+        genesis: &validator::Genesis,
     ) -> anyhow::Result<()> {
         let mut nodes = vec![];
         let mut honest = vec![];
@@ -92,8 +90,7 @@ impl Test {
         // Check that the stored blocks are consistent.
         for i in 0..self.blocks_to_finalize as u64 {
             let i = first + i;
-            // Only comparing the payload; the signatories might be different,
-            // at least with the simulated gossip of the twins network.
+            // Only comparing the payload; the justification might be different.
             let want = honest[0]
                 .block(ctx, i)
                 .await?
@@ -275,8 +272,10 @@
                     splits,
                     validator_ports,
                     sends,
-                    &gossip_targets[&port],
-                    gossip_send,
+                    TwinsGossipConfig {
+                        targets: &gossip_targets[&port],
+                        send: gossip_send,
+                    },
                     port,
                     recv,
                 )
@@ -297,14 +296,12 @@
 /// We have to simulate the gossip layer which isn't instantiated by these tests.
 /// If we don't, then if a replica misses a LeaderPrepare message it won't ever get the payload
 /// and won't be able to finalize the block, and won't participate further in the consensus.
-#[allow(clippy::too_many_arguments)]
 async fn twins_receive_loop(
     ctx: &ctx::Ctx,
     splits: &PortSplitSchedule,
-    validator_ports: &HashMap<PublicKey, Vec<Port>>,
+    validator_ports: &HashMap<validator::PublicKey, Vec<Port>>,
     sends: &HashMap<Port, UnboundedSender<io::OutputMessage>>,
-    gossip_targets: &HashSet<Port>,
-    gossip_send: UnboundedSender<(Port, Port, BlockNumber)>,
+    gossip: TwinsGossipConfig<'_>,
     port: Port,
     mut recv: UnboundedReceiver<io::InputMessage>,
 ) -> anyhow::Result<()> {
@@ -313,7 +310,7 @@ async fn twins_receive_loop(
 
     // Finalized block number iff this node can gossip to the target and the message contains a QC.
     let block_to_gossip = |target_port: Port, msg: &io::OutputMessage| {
-        if !gossip_targets.contains(&target_port) {
+        if !gossip.targets.contains(&target_port) {
             return None;
         }
         output_msg_commit_qc(msg).map(|qc| qc.header().number)
@@ -347,32 +344,41 @@
         let view = output_msg_view_number(&msg);
         let kind = output_msg_label(&msg);
 
-        if can_send {
-            let s = &sends[&target_port];
+        // Remove any previously stashed message of the same kind, because the network will only
+        // try to send the last one of each, not all pending messages.
+        stash.retain(|stashed| output_msg_label(stashed) != kind);
 
-            // Send after taking note of potentially gossipable blocks.
-            let send = |msg| {
-                if let Some(bn) = block_to_gossip(target_port, &msg) {
-                    gossip_send.send((port, target_port, bn));
-                }
-                s.send(msg);
-            };
+        if !can_send {
+            tracing::info!("   VVV stashed view={view} from={port} to={target_port} kind={kind}");
+            stash.push(msg);
+            return;
+        }
 
-            // Messages can be delivered in arbitrary order.
-            stash.shuffle(rng);
+        let s = &sends[&target_port];
 
-            for unstashed in stash.drain(0..) {
-                let view = output_msg_view_number(&unstashed);
-                let kind = output_msg_label(&unstashed);
-                eprintln!("   ^^^ unstashed view={view} from={port} to={target_port} kind={kind}");
-                send(unstashed);
+        // Send after taking note of potentially gossipable blocks.
+        let send = |msg| {
+            if let Some(number) = block_to_gossip(target_port, &msg) {
+                gossip.send.send(TwinsGossipMessage {
+                    from: port,
+                    to: target_port,
+                    number,
+                });
             }
-            eprintln!("   >>> sending view={view} from={port} to={target_port} kind={kind}");
-            send(msg);
-        } else {
-            eprintln!("   VVV stashed view={view} from={port} to={target_port} kind={kind}");
-            stash.push(msg);
+            s.send(msg);
+        };
+
+        // Messages can be delivered in arbitrary order.
+        stash.shuffle(rng);
+
+        for unstashed in stash.drain(0..) {
+            let view = output_msg_view_number(&unstashed);
+            let kind = output_msg_label(&unstashed);
+            tracing::info!("   ^^^ unstashed view={view} from={port} to={target_port} kind={kind}");
+            send(unstashed);
         }
+        tracing::info!("   >>> sending view={view} from={port} to={target_port} kind={kind}");
+        send(msg);
     };
 
     while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await {
@@ -398,7 +404,7 @@
         match message.recipient {
             io::Target::Broadcast => match partitions_opt {
                 None => {
-                    eprintln!("broadcasting view={view_number} from={port} target=all");
+                    tracing::info!("broadcasting view={view_number} from={port} target=all");
                     for target_port in sends.keys() {
                         send_or_stash(true, *target_port, msg());
                     }
@@ -406,7 +412,7 @@
                 Some(ps) => {
                     for p in ps {
                         let can_send = p.contains(&port);
-                        eprintln!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_secs());
+                        tracing::info!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_secs());
                         for target_port in p {
                             send_or_stash(can_send, *target_port, msg());
                         }
@@ -419,7 +425,7 @@
                 match partitions_opt {
                     None => {
                         for target_port in target_ports {
-                            eprintln!(
+                            tracing::info!(
                                 "unicasting view={view_number} from={port} target={target_port}"
                             );
                             send_or_stash(true, *target_port, msg());
@@ -430,7 +436,7 @@
                         let can_send = p.contains(&port);
                         for target_port in target_ports {
                             if p.contains(target_port) {
-                                eprintln!("unicasting view={view_number} from={port} target={target_port} can_send={can_send} t={}", start.elapsed().as_secs());
+                                tracing::info!("unicasting view={view_number} from={port} target={target_port} can_send={can_send} t={}", start.elapsed().as_secs());
                                 send_or_stash(can_send, *target_port, msg());
                             }
                         }
@@ -454,10 +460,15 @@
 async fn twins_gossip_loop(
     ctx: &ctx::Ctx,
     stores: &HashMap<Port, Arc<BlockStore>>,
-    mut recv: UnboundedReceiver<(Port, Port, BlockNumber)>,
+    mut recv: UnboundedReceiver<TwinsGossipMessage>,
 ) -> anyhow::Result<()> {
     scope::run!(ctx, |ctx, s| async move {
-        while let Ok((from, to, mut number)) = recv.recv(ctx).await {
+        while let Ok(TwinsGossipMessage {
+            from,
+            to,
+            mut number,
+        }) = recv.recv(ctx).await
+        {
             let local_store = &stores[&from];
             let remote_store = &stores[&to];
             let first_needed = remote_store.queued().next();
@@ -492,9 +503,9 @@
         .await
 }
 
-fn output_msg_view_number(msg: &io::OutputMessage) -> u64 {
+fn output_msg_view_number(msg: &io::OutputMessage) -> validator::ViewNumber {
     match msg {
-        io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number.0,
+        io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number,
     }
 }
 
@@ -504,7 +515,8 @@ fn output_msg_label(msg: &io::OutputMessage) -> &str {
     }
 }
 
-fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&CommitQC> {
+fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&validator::CommitQC> {
+    use validator::ConsensusMsg;
     match msg {
         io::OutputMessage::Consensus(cr) => match &cr.msg.msg {
             ConsensusMsg::ReplicaPrepare(rp) => rp.high_qc.as_ref(),
@@ -514,3 +526,16 @@ fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&CommitQC> {
         },
     }
 }
+
+struct TwinsGossipMessage {
+    from: Port,
+    to: Port,
+    number: validator::BlockNumber,
+}
+
+struct TwinsGossipConfig<'a> {
+    /// Ports to which this node should gossip.
+    targets: &'a HashSet<Port>,
+    /// Channel over which to send gossip messages.
+    send: UnboundedSender<TwinsGossipMessage>,
+}
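[Note, not part of the patch: the run.rs changes above replace the loose gossip_targets and gossip_send parameters with a TwinsGossipConfig parameter object, which is why the #[allow(clippy::too_many_arguments)] attribute can be dropped. Below is a minimal, self-contained sketch of the same pattern; it uses std::sync::mpsc in place of zksync_concurrency's unbounded channel, and Port = u16 plus a plain u64 block number are stand-ins for illustration.]

use std::collections::HashSet;
use std::sync::mpsc::{channel, Sender};

// Assumed stand-ins: the real code uses zksync_concurrency channels
// and validator::BlockNumber.
type Port = u16;

struct TwinsGossipMessage {
    from: Port,
    to: Port,
    number: u64,
}

struct TwinsGossipConfig<'a> {
    /// Ports to which this node should gossip.
    targets: &'a HashSet<Port>,
    /// Channel over which to send gossip messages.
    send: Sender<TwinsGossipMessage>,
}

// One parameter object instead of two loose arguments.
fn gossip_all(gossip: &TwinsGossipConfig<'_>, port: Port, number: u64) {
    for target in gossip.targets {
        gossip
            .send
            .send(TwinsGossipMessage { from: port, to: *target, number })
            .expect("receiver alive");
    }
}

fn main() {
    let targets: HashSet<Port> = HashSet::from([4001, 4002]);
    let (send, recv) = channel();
    gossip_all(&TwinsGossipConfig { targets: &targets, send }, 4000, 1);
    for m in recv.try_iter() {
        println!("gossip {} -> {} (block {})", m.from, m.to, m.number);
    }
}

[The point of the struct is that the filter set and the outbound channel always travel together, so signatures and call sites stay short.]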
diff --git a/node/actors/bft/src/tests.rs b/node/actors/bft/src/tests.rs
index 2c09e9d0..36d57fbd 100644
--- a/node/actors/bft/src/tests.rs
+++ b/node/actors/bft/src/tests.rs
@@ -363,7 +363,7 @@
         })
         .collect();
 
-    eprintln!(
+    tracing::info!(
         "num_replicas={num_replicas} num_twins={num_twins} num_nodes={} scenario={i}",
         cluster.num_nodes()
     );
@@ -387,7 +387,7 @@
             .iter()
             .all(|s| *s < cluster.quorum_size());
 
-        eprintln!("round={r} partitions={partitions:?} leaders={leader_ports:?} leader_partition_sizes={leader_partition_sizes:?} leader_isolated={leader_isolated}");
+        tracing::debug!("round={r} partitions={partitions:?} leaders={leader_ports:?} leader_partition_sizes={leader_partition_sizes:?} leader_isolated={leader_isolated}");
     }
 
     Test {
diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs
index e7d452e6..897d1354 100644
--- a/node/actors/network/src/testonly.rs
+++ b/node/actors/network/src/testonly.rs
@@ -13,10 +13,7 @@ use std::{
     sync::Arc,
 };
 use zksync_concurrency::{ctx, ctx::channel, io, limiter, net, scope, sync};
-use zksync_consensus_roles::{
-    node,
-    validator::{self, SecretKey},
-};
+use zksync_consensus_roles::{node, validator};
 use zksync_consensus_storage::BlockStore;
 use zksync_consensus_utils::pipe;
 
@@ -92,7 +89,7 @@ pub fn new_configs_for_validators<'a, I>(
     gossip_peers: usize,
 ) -> Vec<Config>
 where
-    I: Iterator<Item = &'a SecretKey>,
+    I: Iterator<Item = &'a validator::SecretKey>,
 {
     let configs = validator_keys.map(|validator_key| {
        let addr = net::tcp::testonly::reserve_listener();
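[Note, not part of the patch: unlike eprintln!, the tracing::info! and tracing::debug! calls introduced here produce output only when a subscriber is installed. If the test crates don't already set one up, a common arrangement looks like the sketch below; the test name is hypothetical and the snippet assumes the tracing and tracing-subscriber crates as dev-dependencies.]

// Assumed dev-dependencies: tracing = "0.1", tracing-subscriber = "0.3"
// (with the default "fmt" feature).

#[test]
fn twins_network_with_logs() {
    // try_init() returns Err if a global subscriber is already set,
    // so repeated calls across tests are harmless.
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_test_writer() // cooperate with the test harness's output capture
        .try_init()
        .ok();

    tracing::info!("visible at INFO and below");
    tracing::debug!("visible because the max level is DEBUG");
}

[With this in place, per-round diagnostics such as the tracing::debug! partition dump in run_twins can be switched on or off via the subscriber's level filter instead of flooding stderr unconditionally.]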