Skip to content

Commit

Permalink
BFT-465: Nits
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed Jun 10, 2024
1 parent c13bcbf commit 2394350
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 51 deletions.
113 changes: 69 additions & 44 deletions node/actors/bft/src/testonly/run.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{Behavior, Node};
use anyhow::bail;
use network::{io, Config};
use rand::prelude::SliceRandom;
use rand::seq::SliceRandom;
use std::{
collections::{HashMap, HashSet},
sync::Arc,
Expand All @@ -15,9 +15,7 @@ use zksync_concurrency::{
oneshot, scope,
};
use zksync_consensus_network as network;
use zksync_consensus_roles::validator::{
self, BlockNumber, CommitQC, ConsensusMsg, Genesis, PublicKey,
};
use zksync_consensus_roles::validator;
use zksync_consensus_storage::{testonly::new_store, BlockStore};
use zksync_consensus_utils::pipe;

Expand Down Expand Up @@ -61,7 +59,7 @@ impl Test {
&self,
ctx: &ctx::Ctx,
nets: Vec<Config>,
genesis: &Genesis,
genesis: &validator::Genesis,
) -> anyhow::Result<()> {
let mut nodes = vec![];
let mut honest = vec![];
Expand Down Expand Up @@ -92,8 +90,7 @@ impl Test {
// Check that the stored blocks are consistent.
for i in 0..self.blocks_to_finalize as u64 {
let i = first + i;
// Only comparing the payload; the signatories might be different,
// at least with the simulated gossip of the twins network.
// Only comparing the payload; the justification might be different.
let want = honest[0]
.block(ctx, i)
.await?
Expand Down Expand Up @@ -275,8 +272,10 @@ async fn run_nodes_twins(
splits,
validator_ports,
sends,
&gossip_targets[&port],
gossip_send,
TwinsGossipConfig {
targets: &gossip_targets[&port],
send: gossip_send,
},
port,
recv,
)
Expand All @@ -297,14 +296,12 @@ async fn run_nodes_twins(
/// We have to simulate the gossip layer which isn't instantiated by these tests.
/// If we don't, then if a replica misses a LeaderPrepare message it won't ever get the payload
/// and won't be able to finalize the block, and won't participate further in the consensus.
#[allow(clippy::too_many_arguments)]
async fn twins_receive_loop(
ctx: &ctx::Ctx,
splits: &PortSplitSchedule,
validator_ports: &HashMap<PublicKey, Vec<Port>>,
validator_ports: &HashMap<validator::PublicKey, Vec<Port>>,
sends: &HashMap<Port, UnboundedSender<io::OutputMessage>>,
gossip_targets: &HashSet<Port>,
gossip_send: UnboundedSender<(Port, Port, BlockNumber)>,
gossip: TwinsGossipConfig<'_>,
port: Port,
mut recv: UnboundedReceiver<io::InputMessage>,
) -> anyhow::Result<()> {
Expand All @@ -313,7 +310,7 @@ async fn twins_receive_loop(

// Finalized block number iff this node can gossip to the target and the message contains a QC.
let block_to_gossip = |target_port: Port, msg: &io::OutputMessage| {
if !gossip_targets.contains(&target_port) {
if !gossip.targets.contains(&target_port) {
return None;
}
output_msg_commit_qc(msg).map(|qc| qc.header().number)
Expand Down Expand Up @@ -347,32 +344,41 @@ async fn twins_receive_loop(
let view = output_msg_view_number(&msg);
let kind = output_msg_label(&msg);

if can_send {
let s = &sends[&target_port];
// Remove any previously stashed message of the same kind, because the network will only
// try to send the last one of each, not all pending messages.
stash.retain(|stashed| output_msg_label(stashed) != kind);

// Send after taking note of potentially gossipable blocks.
let send = |msg| {
if let Some(bn) = block_to_gossip(target_port, &msg) {
gossip_send.send((port, target_port, bn));
}
s.send(msg);
};
if !can_send {
tracing::info!(" VVV stashed view={view} from={port} to={target_port} kind={kind}");
stash.push(msg);
return;
}

// Messages can be delivered in arbitrary order.
stash.shuffle(rng);
let s = &sends[&target_port];

for unstashed in stash.drain(0..) {
let view = output_msg_view_number(&unstashed);
let kind = output_msg_label(&unstashed);
eprintln!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}");
send(unstashed);
// Send after taking note of potentially gossipable blocks.
let send = |msg| {
if let Some(number) = block_to_gossip(target_port, &msg) {
gossip.send.send(TwinsGossipMessage {
from: port,
to: target_port,
number,
});
}
eprintln!(" >>> sending view={view} from={port} to={target_port} kind={kind}");
send(msg);
} else {
eprintln!(" VVV stashed view={view} from={port} to={target_port} kind={kind}");
stash.push(msg);
s.send(msg);
};

// Messages can be delivered in arbitrary order.
stash.shuffle(rng);

for unstashed in stash.drain(0..) {
let view = output_msg_view_number(&unstashed);
let kind = output_msg_label(&unstashed);
tracing::info!(" ^^^ unstashed view={view} from={port} to={target_port} kind={kind}");
send(unstashed);
}
tracing::info!(" >>> sending view={view} from={port} to={target_port} kind={kind}");
send(msg);
};

while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await {
Expand All @@ -398,15 +404,15 @@ async fn twins_receive_loop(
match message.recipient {
io::Target::Broadcast => match partitions_opt {
None => {
eprintln!("broadcasting view={view_number} from={port} target=all");
tracing::info!("broadcasting view={view_number} from={port} target=all");
for target_port in sends.keys() {
send_or_stash(true, *target_port, msg());
}
}
Some(ps) => {
for p in ps {
let can_send = p.contains(&port);
eprintln!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_secs());
tracing::info!("broadcasting view={view_number} from={port} target={p:?} can_send={can_send} t={}", start.elapsed().as_secs());
for target_port in p {
send_or_stash(can_send, *target_port, msg());
}
Expand All @@ -419,7 +425,7 @@ async fn twins_receive_loop(
match partitions_opt {
None => {
for target_port in target_ports {
eprintln!(
tracing::info!(
"unicasting view={view_number} from={port} target={target_port}"
);
send_or_stash(true, *target_port, msg());
Expand All @@ -430,7 +436,7 @@ async fn twins_receive_loop(
let can_send = p.contains(&port);
for target_port in target_ports {
if p.contains(target_port) {
eprintln!("unicasting view={view_number} from={port} target={target_port } can_send={can_send} t={}", start.elapsed().as_secs());
tracing::info!("unicasting view={view_number} from={port} target={target_port } can_send={can_send} t={}", start.elapsed().as_secs());
send_or_stash(can_send, *target_port, msg());
}
}
Expand All @@ -454,10 +460,15 @@ async fn twins_receive_loop(
async fn twins_gossip_loop(
ctx: &ctx::Ctx,
stores: &HashMap<Port, Arc<BlockStore>>,
mut recv: UnboundedReceiver<(Port, Port, BlockNumber)>,
mut recv: UnboundedReceiver<TwinsGossipMessage>,
) -> anyhow::Result<()> {
scope::run!(ctx, |ctx, s| async move {
while let Ok((from, to, mut number)) = recv.recv(ctx).await {
while let Ok(TwinsGossipMessage {
from,
to,
mut number,
}) = recv.recv(ctx).await
{
let local_store = &stores[&from];
let remote_store = &stores[&to];
let first_needed = remote_store.queued().next();
Expand Down Expand Up @@ -492,9 +503,9 @@ async fn twins_gossip_loop(
.await
}

fn output_msg_view_number(msg: &io::OutputMessage) -> u64 {
fn output_msg_view_number(msg: &io::OutputMessage) -> validator::ViewNumber {
match msg {
io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number.0,
io::OutputMessage::Consensus(cr) => cr.msg.msg.view().number,
}
}

Expand All @@ -504,7 +515,8 @@ fn output_msg_label(msg: &io::OutputMessage) -> &str {
}
}

fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&CommitQC> {
fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&validator::CommitQC> {
use validator::ConsensusMsg;
match msg {
io::OutputMessage::Consensus(cr) => match &cr.msg.msg {
ConsensusMsg::ReplicaPrepare(rp) => rp.high_qc.as_ref(),
Expand All @@ -514,3 +526,16 @@ fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&CommitQC> {
},
}
}

struct TwinsGossipMessage {
from: Port,
to: Port,
number: validator::BlockNumber,
}

struct TwinsGossipConfig<'a> {
/// Ports to which this node should gossip to.
targets: &'a HashSet<Port>,
/// Channel over which to send gossip messages.
send: UnboundedSender<TwinsGossipMessage>,
}
4 changes: 2 additions & 2 deletions node/actors/bft/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ async fn run_twins(
})
.collect();

eprintln!(
tracing::info!(
"num_replicas={num_replicas} num_twins={num_twins} num_nodes={} scenario={i}",
cluster.num_nodes()
);
Expand All @@ -387,7 +387,7 @@ async fn run_twins(
.iter()
.all(|s| *s < cluster.quorum_size());

eprintln!("round={r} partitions={partitions:?} leaders={leader_ports:?} leader_partition_sizes={leader_partition_sizes:?} leader_isolated={leader_isolated}");
tracing::debug!("round={r} partitions={partitions:?} leaders={leader_ports:?} leader_partition_sizes={leader_partition_sizes:?} leader_isolated={leader_isolated}");
}

Test {
Expand Down
7 changes: 2 additions & 5 deletions node/actors/network/src/testonly.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,7 @@ use std::{
sync::Arc,
};
use zksync_concurrency::{ctx, ctx::channel, io, limiter, net, scope, sync};
use zksync_consensus_roles::{
node,
validator::{self, SecretKey},
};
use zksync_consensus_roles::{node, validator};
use zksync_consensus_storage::BlockStore;
use zksync_consensus_utils::pipe;

Expand Down Expand Up @@ -92,7 +89,7 @@ pub fn new_configs_for_validators<'a, I>(
gossip_peers: usize,
) -> Vec<Config>
where
I: Iterator<Item = &'a SecretKey>,
I: Iterator<Item = &'a validator::SecretKey>,
{
let configs = validator_keys.map(|validator_key| {
let addr = net::tcp::testonly::reserve_listener();
Expand Down

0 comments on commit 2394350

Please sign in to comment.