BFT-466: LeaderPrepare wait for previous block deadlock (#128)
## What ❔

- [x] Create a test that artificially produces a situation in which some
nodes have the proposal payload but not the CommitQC that would finalize
it and allow it to be gossiped, while others have the HighQC (received as
part of a new proposal) but not the previous payload, and wait for it to
be gossiped to them, creating a deadlock and stalling consensus. A sketch
of one way to set up such a scenario with the new `PortRouter::Custom`
hook follows this list.
- [x] The solution is to time out while waiting for the block to appear
in the store, move on to the next view, and broadcast the HighQC as part
of the ReplicaPrepare, which should allow the others to finalize the
block. A consolidated sketch of the replica-side change appears after the
Why section.
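A minimal sketch of the kind of routing rule the new `PortRouter::Custom` hook (added in `node/actors/bft/src/testonly/run.rs` below) makes possible. The port value and the drop rule are hypothetical illustrations, not the schedule the actual test uses:

```rust
use zksync_consensus_roles::validator::ConsensusMsg;

// Hypothetical port of the replica we starve of the CommitQC.
let isolated: Port = 3000;

let network = Network::Twins(PortRouter::Custom(Box::new(move |msg, _from, to| {
    match msg {
        // Drop LeaderCommit towards the isolated replica: it keeps seeing
        // proposals, but never the CommitQC that would let it finalize and
        // gossip the block.
        ConsensusMsg::LeaderCommit(_) if to == isolated => Some(false),
        // Everything else flows normally; returning `None` instead would make
        // the test treat the routing schedule as exhausted and bail out.
        _ => Some(true),
    }
})));
```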

## Why ❔

To deal with the loss of liveness caused by multiple leaders who owned
both the payload and the CommitQC going down, after which the remaining
replicas stop participating in consensus until they receive the previous
payload, which never arrives.
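For reference, a consolidated sketch of the replica-side change, assembled from the interleaved old/new lines of the `replica/leader_prepare.rs` hunk below (the surrounding `process_leader_prepare` code is elided):

```rust
// Inside the replica's leader-prepare handling, when the proposal builds on a
// previous block `prev` that is not yet in our store:
self.config
    .block_store
    // Bound the wait with the replica's view-timeout deadline instead of
    // blocking indefinitely on a payload that gossip may never deliver.
    .wait_until_persisted(&ctx.with_deadline(self.timeout_deadline), prev)
    .await
    // On timeout, return a recoverable error: the replica moves on to the next
    // view and broadcasts its HighQC in ReplicaPrepare, so peers that do hold
    // the payload can finalize the block.
    .map_err(|_| Error::MissingPreviousPayload { prev_number: prev })?;
```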
aakoshh authored Jun 14, 2024
1 parent 3b7e779 commit a1636b8
Showing 5 changed files with 275 additions and 75 deletions.
node/actors/bft/src/leader/state_machine.rs (7 changes: 6 additions & 1 deletion)
@@ -94,11 +94,16 @@ impl StateMachine {
Err(err) => {
match err {
super::replica_prepare::Error::Internal(e) => {
tracing::error!(
"process_replica_prepare: internal error: {e:#}"
);

return Err(e);
}
super::replica_prepare::Error::Old { .. }
| super::replica_prepare::Error::NotLeaderInView => {
tracing::info!("process_replica_prepare: {err:#}");
// It's broadcasted now, so everyone gets it.
tracing::debug!("process_replica_prepare: {err:#}");
}
_ => {
tracing::warn!("process_replica_prepare: {err:#}");
node/actors/bft/src/replica/leader_prepare.rs (24 changes: 16 additions & 8 deletions)
@@ -2,7 +2,7 @@
use super::StateMachine;
use zksync_concurrency::{ctx, error::Wrap};
use zksync_consensus_network::io::{ConsensusInputMessage, Target};
use zksync_consensus_roles::validator;
use zksync_consensus_roles::validator::{self, BlockNumber};

/// Errors that can occur when processing a "leader prepare" message.
#[derive(Debug, thiserror::Error)]
@@ -45,6 +45,12 @@ pub(crate) enum Error {
/// Invalid payload.
#[error("invalid payload: {0:#}")]
ProposalInvalidPayload(#[source] anyhow::Error),
/// Previous payload missing.
#[error("previous block proposal payload missing from store (block number: {prev_number})")]
MissingPreviousPayload {
/// The number of the missing block
prev_number: BlockNumber,
},
/// Internal error. Unlike other error types, this one isn't supposed to be easily recoverable.
#[error(transparent)]
Internal(#[from] ctx::Error),
@@ -107,8 +113,15 @@ impl StateMachine {
message
.verify(self.config.genesis())
.map_err(Error::InvalidMessage)?;

let high_qc = message.justification.high_qc();

if let Some(high_qc) = high_qc {
// Try to create a finalized block with this CommitQC and our block proposal cache.
// This gives us another chance to finalize a block that we may have missed before.
self.save_block(ctx, high_qc).await.wrap("save_block()")?;
}

// Check that the payload doesn't exceed the maximum size.
if let Some(payload) = &message.proposal_payload {
if payload.0.len() > self.config.max_payload_size {
@@ -121,9 +134,9 @@
// Defensively assume that PayloadManager cannot verify proposal until the previous block is stored.
self.config
.block_store
.wait_until_persisted(ctx, prev)
.wait_until_persisted(&ctx.with_deadline(self.timeout_deadline), prev)
.await
.map_err(ctx::Error::Canceled)?;
.map_err(|_| Error::MissingPreviousPayload { prev_number: prev })?;
}
if let Err(err) = self
.config
@@ -150,11 +163,6 @@
self.view = message.view().number;
self.phase = validator::Phase::Commit;
self.high_vote = Some(commit_vote.clone());
if let Some(high_qc) = high_qc {
// Try to create a finalized block with this CommitQC and our block proposal cache.
// This gives us another chance to finalize a block that we may have missed before.
self.save_block(ctx, high_qc).await.wrap("save_block()")?;
}
// If we received a new block proposal, store it in our cache.
if let Some(payload) = &message.proposal_payload {
self.block_proposal_cache
node/actors/bft/src/replica/state_machine.rs (9 changes: 8 additions & 1 deletion)
@@ -116,10 +116,13 @@ impl StateMachine {
Err(err) => {
match err {
super::replica_prepare::Error::Internal(e) => {
tracing::error!(
"process_replica_prepare: internal error: {e:#}"
);
return Err(e);
}
super::replica_prepare::Error::Old { .. } => {
tracing::info!("process_replica_prepare: {err:#}");
tracing::debug!("process_replica_prepare: {err:#}");
}
_ => {
tracing::warn!("process_replica_prepare: {err:#}");
@@ -140,6 +143,9 @@
Err(err) => {
match err {
super::leader_prepare::Error::Internal(e) => {
tracing::error!(
"process_leader_prepare: internal error: {e:#}"
);
return Err(e);
}
super::leader_prepare::Error::Old { .. } => {
@@ -164,6 +170,7 @@
Err(err) => {
match err {
super::leader_commit::Error::Internal(e) => {
tracing::error!("process_leader_commit: internal error: {e:#}");
return Err(e);
}
super::leader_commit::Error::Old { .. } => {
node/actors/bft/src/testonly/run.rs (127 changes: 65 additions & 62 deletions)
@@ -19,11 +19,10 @@ use zksync_consensus_roles::validator;
use zksync_consensus_storage::{testonly::new_store, BlockStore};
use zksync_consensus_utils::pipe;

#[derive(Clone)]
pub(crate) enum Network {
Real,
Mock,
Twins(PortSplitSchedule),
Twins(PortRouter),
}

/// Number of phases within a view for which we consider different partitions.
@@ -46,9 +45,46 @@ pub(crate) type PortPartition = HashSet<Port>;
pub(crate) type PortSplit = Vec<PortPartition>;
/// A schedule contains a list of splits (one for each phase) for every view.
pub(crate) type PortSplitSchedule = Vec<[PortSplit; NUM_PHASES]>;
/// Function to decide whether a message can go from a source to a target port.
pub(crate) type PortRouterFn = dyn Fn(&validator::ConsensusMsg, Port, Port) -> Option<bool> + Sync;

/// A predicate to govern who can send a given message to whom.
pub(crate) enum PortRouter {
/// List of port splits for each view/phase, where ports in the same partition can send any message to each other.
Splits(PortSplitSchedule),
/// Custom routing function which can take closer control of which message can be sent in which direction,
/// in order to reenact particular edge cases.
Custom(Box<PortRouterFn>),
}

impl PortRouter {
/// Decide whether a message can be sent from a sender to a target in the given view/phase.
///
    /// Returning `None` means that there was no more routing data and the test can decide to
/// allow all communication or to abort a runaway test.
fn can_send(&self, msg: &validator::ConsensusMsg, from: Port, to: Port) -> Option<bool> {
match self {
PortRouter::Splits(splits) => {
// Here we assume that all instances start from view 0 in the tests.
// If the view is higher than what we have planned for, assume no partitions.
// Every node is guaranteed to be present in only one partition.
let view_number = msg.view().number.0 as usize;
let phase_number = msg_phase_number(msg);
splits
.get(view_number)
.and_then(|ps| ps.get(phase_number))
.map(|partitions| {
partitions
.iter()
.any(|p| p.contains(&from) && p.contains(&to))
})
}
PortRouter::Custom(f) => f(msg, from, to),
}
}
}

/// Config for the test. Determines the parameters to run the test with.
#[derive(Clone)]
pub(crate) struct Test {
pub(crate) network: Network,
pub(crate) nodes: Vec<(Behavior, u64)>,
@@ -124,7 +160,7 @@ async fn run_nodes(ctx: &ctx::Ctx, network: &Network, specs: &[Node]) -> anyhow:
match network {
Network::Real => run_nodes_real(ctx, specs).await,
Network::Mock => run_nodes_mock(ctx, specs).await,
Network::Twins(splits) => run_nodes_twins(ctx, specs, splits).await,
Network::Twins(router) => run_nodes_twins(ctx, specs, router).await,
}
}

@@ -210,7 +246,7 @@ async fn run_nodes_mock(ctx: &ctx::Ctx, specs: &[Node]) -> anyhow::Result<()> {
async fn run_nodes_twins(
ctx: &ctx::Ctx,
specs: &[Node],
splits: &PortSplitSchedule,
router: &PortRouter,
) -> anyhow::Result<()> {
scope::run!(ctx, |ctx, s| async {
// All known network ports of a validator, so that we can tell if any of
@@ -282,7 +318,7 @@ async fn run_nodes_twins(
s.spawn(async move {
twins_receive_loop(
ctx,
splits,
router,
validator_ports,
sends,
TwinsGossipConfig {
@@ -312,7 +348,7 @@
/// and won't be able to finalize the block, and won't participate further in the consensus.
async fn twins_receive_loop(
ctx: &ctx::Ctx,
splits: &PortSplitSchedule,
router: &PortRouter,
validator_ports: &HashMap<validator::PublicKey, Vec<Port>>,
sends: &HashMap<Port, UnboundedSender<io::OutputMessage>>,
gossip: TwinsGossipConfig<'_>,
@@ -396,19 +432,8 @@ async fn twins_receive_loop(
};

while let Ok(io::InputMessage::Consensus(message)) = recv.recv(ctx).await {
let view_number = message.message.msg.view().number.0 as usize;
let phase_number = input_msg_phase_number(&message);
let view = message.message.msg.view().number.0 as usize;
let kind = message.message.msg.label();
// Here we assume that all instances start from view 0 in the tests.
// If the view is higher than what we have planned for, assume no partitions.
// Every node is guaranteed to be present in only one partition.
let partitions_opt = splits.get(view_number).and_then(|ps| ps.get(phase_number));

if partitions_opt.is_none() {
bail!(
"ran out of scheduled rounds; most likely cannot finalize blocks even if we go on"
);
}

let msg = || {
io::OutputMessage::Consensus(io::ConsensusReq {
Expand All @@ -417,49 +442,27 @@ async fn twins_receive_loop(
})
};

let can_send = |to| {
match router.can_send(&message.message.msg, port, to) {
Some(can_send) => Ok(can_send),
None => bail!("ran out of port schedule; we probably wouldn't finalize blocks even if we continued")
}
};

match message.recipient {
io::Target::Broadcast => match partitions_opt {
None => {
tracing::info!(
"broadcasting view={view_number} from={port} target=all kind={kind}"
);
for target_port in sends.keys() {
send_or_stash(true, *target_port, msg());
}
}
Some(ps) => {
for p in ps {
let can_send = p.contains(&port);
tracing::info!("broadcasting view={view_number} from={port} target={p:?} kind={kind} can_send={can_send}");
for target_port in p {
send_or_stash(can_send, *target_port, msg());
}
}
io::Target::Broadcast => {
tracing::info!("broadcasting view={view} from={port} kind={kind}");
for target_port in sends.keys() {
send_or_stash(can_send(*target_port)?, *target_port, msg());
}
},
io::Target::Validator(v) => {
let target_ports = &validator_ports[&v];

match partitions_opt {
None => {
for target_port in target_ports {
tracing::info!(
"unicasting view={view_number} from={port} target={target_port} kind={kind}"
);
send_or_stash(true, *target_port, msg());
}
}
Some(ps) => {
for p in ps {
let can_send = p.contains(&port);
for target_port in target_ports {
if p.contains(target_port) {
tracing::info!("unicasting view={view_number} from={port} target={target_port } kind={kind} can_send={can_send}");
send_or_stash(can_send, *target_port, msg());
}
}
}
}
}
io::Target::Validator(ref v) => {
let target_ports = &validator_ports[v];
tracing::info!(
"unicasting view={view} from={port} target={target_ports:?} kind={kind}"
);
for target_port in target_ports {
send_or_stash(can_send(*target_port)?, *target_port, msg());
}
}
}
@@ -554,9 +557,9 @@ fn output_msg_commit_qc(msg: &io::OutputMessage) -> Option<&validator::CommitQC>
}

/// Index of the phase in which the message appears, to decide which partitioning to apply.
fn input_msg_phase_number(msg: &io::ConsensusInputMessage) -> usize {
fn msg_phase_number(msg: &validator::ConsensusMsg) -> usize {
use validator::ConsensusMsg;
let phase = match msg.message.msg {
let phase = match msg {
ConsensusMsg::ReplicaPrepare(_) => 0,
ConsensusMsg::LeaderPrepare(_) => 0,
ConsensusMsg::ReplicaCommit(_) => 0,