Skip to content

Commit

Permalink
feat(driver): Turn get_value into an asynchronous step within the s…
Browse files Browse the repository at this point in the history
…tate machine (#65)

* feat(driver): Turn `get_value` into an asynchronous step within the state machine

* chore(driver): Remove unused error variant `NoValueToPropose`, remove `Env` trait

* Use valid value if present else  get_value_and_schedule_timeout

* Remove redundant height from `RoundData`

* Include proposer for the current round in the `RoundData`

* Rename `RoundData` to `Info`

* Replace nil proposal with propose timeout

* Differentiate between propose timeout when we are proposer or not

* Check that proposer is in validator set in `Driver::get_proposer`

* Remove `NewRoundProposer` event in favor of check within the state machine (#79)

---------

Co-authored-by: Anca Zamfir <[email protected]>
Co-authored-by: Anca Zamfir <[email protected]>
  • Loading branch information
3 people authored Nov 24, 2023
1 parent c6c695e commit a755b7b
Show file tree
Hide file tree
Showing 15 changed files with 353 additions and 216 deletions.
98 changes: 45 additions & 53 deletions Code/driver/src/driver.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use malachite_round::state_machine::RoundData;
use malachite_round::state_machine::Info;

use malachite_common::{
Context, Proposal, Round, SignedVote, Timeout, TimeoutStep, Validator, ValidatorSet, Value,
Expand All @@ -12,7 +12,6 @@ use malachite_vote::keeper::VoteKeeper;
use malachite_vote::Threshold;
use malachite_vote::ThresholdParams;

use crate::env::Env as DriverEnv;
use crate::event::Event;
use crate::message::Message;
use crate::Error;
Expand All @@ -21,14 +20,12 @@ use crate::Validity;

/// Driver for the state machine of the Malachite consensus engine at a given height.
#[derive(Clone, Debug)]
pub struct Driver<Ctx, Env, PSel>
pub struct Driver<Ctx, PSel>
where
Ctx: Context,
Env: DriverEnv<Ctx>,
PSel: ProposerSelector<Ctx>,
{
pub ctx: Ctx,
pub env: Env,
pub proposer_selector: PSel,

pub address: Ctx::Address,
Expand All @@ -38,15 +35,13 @@ where
pub round_state: RoundState<Ctx>,
}

impl<Ctx, Env, PSel> Driver<Ctx, Env, PSel>
impl<Ctx, PSel> Driver<Ctx, PSel>
where
Ctx: Context,
Env: DriverEnv<Ctx>,
PSel: ProposerSelector<Ctx>,
{
pub fn new(
ctx: Ctx,
env: Env,
proposer_selector: PSel,
validator_set: Ctx::ValidatorSet,
address: Ctx::Address,
Expand All @@ -58,7 +53,6 @@ where

Self {
ctx,
env,
proposer_selector,
address,
validator_set,
Expand All @@ -75,10 +69,17 @@ where
self.round_state.round
}

async fn get_value(&self) -> Option<Ctx::Value> {
self.env
.get_value(self.height().clone(), self.round())
.await
pub fn get_proposer(&self, round: Round) -> Result<&Ctx::Validator, Error<Ctx>> {
let address = self
.proposer_selector
.select_proposer(round, &self.validator_set);

let proposer = self
.validator_set
.get_by_address(&address)
.ok_or_else(|| Error::ProposerNotFound(address))?;

Ok(proposer)
}

pub async fn execute(&mut self, msg: Event<Ctx>) -> Result<Option<Message<Ctx>>, Error<Ctx>> {
Expand All @@ -102,6 +103,10 @@ where

RoundMessage::ScheduleTimeout(timeout) => Message::ScheduleTimeout(timeout),

RoundMessage::GetValueAndScheduleTimeout(round, timeout) => {
Message::GetValueAndScheduleTimeout(round, timeout)
}

RoundMessage::Decision(value) => {
// TODO: update the state
Message::Decide(value.round, value.value)
Expand All @@ -114,14 +119,10 @@ where
async fn apply(&mut self, event: Event<Ctx>) -> Result<Option<RoundMessage<Ctx>>, Error<Ctx>> {
match event {
Event::NewRound(height, round) => self.apply_new_round(height, round).await,

Event::Proposal(proposal, validity) => {
Ok(self.apply_proposal(proposal, validity).await)
}

Event::ProposeValue(round, value) => self.apply_propose_value(round, value).await,
Event::Proposal(proposal, validity) => self.apply_proposal(proposal, validity).await,
Event::Vote(signed_vote) => self.apply_vote(signed_vote),

Event::TimeoutElapsed(timeout) => Ok(self.apply_timeout(timeout)),
Event::TimeoutElapsed(timeout) => self.apply_timeout(timeout),
}
}

Expand All @@ -132,56 +133,42 @@ where
) -> Result<Option<RoundMessage<Ctx>>, Error<Ctx>> {
self.round_state = RoundState::new(height, round);

let proposer_address = self
.proposer_selector
.select_proposer(round, &self.validator_set);

let proposer = self
.validator_set
.get_by_address(&proposer_address)
.ok_or_else(|| Error::ProposerNotFound(proposer_address.clone()))?;

let event = if proposer.address() == &self.address {
// We are the proposer
// TODO: Schedule propose timeout

let Some(value) = self.get_value().await else {
return Err(Error::NoValueToPropose);
};

RoundEvent::NewRoundProposer(value)
} else {
RoundEvent::NewRound
};
self.apply_event(round, RoundEvent::NewRound)
}

Ok(self.apply_event(round, event))
async fn apply_propose_value(
&mut self,
round: Round,
value: Ctx::Value,
) -> Result<Option<RoundMessage<Ctx>>, Error<Ctx>> {
self.apply_event(round, RoundEvent::ProposeValue(value))
}

async fn apply_proposal(
&mut self,
proposal: Ctx::Proposal,
validity: Validity,
) -> Option<RoundMessage<Ctx>> {
) -> Result<Option<RoundMessage<Ctx>>, Error<Ctx>> {
// Check that there is an ongoing round
if self.round_state.round == Round::NIL {
return None;
return Ok(None);
}

// Only process the proposal if there is no other proposal
if self.round_state.proposal.is_some() {
return None;
return Ok(None);
}

// Check that the proposal is for the current height and round
if self.round_state.height != proposal.height()
|| self.round_state.round != proposal.round()
{
return None;
return Ok(None);
}

// TODO: Document
if proposal.pol_round().is_defined() && proposal.pol_round() >= self.round_state.round {
return None;
return Ok(None);
}

// TODO: Verify proposal signature (make some of these checks part of message validation)
Expand Down Expand Up @@ -215,7 +202,7 @@ where

self.apply_event(round, event)
}
_ => None,
_ => Ok(None),
}
}

Expand Down Expand Up @@ -257,10 +244,10 @@ where
VoteMessage::SkipRound(r) => RoundEvent::SkipRound(r),
};

Ok(self.apply_event(vote_round, round_event))
self.apply_event(vote_round, round_event)
}

fn apply_timeout(&mut self, timeout: Timeout) -> Option<RoundMessage<Ctx>> {
fn apply_timeout(&mut self, timeout: Timeout) -> Result<Option<RoundMessage<Ctx>>, Error<Ctx>> {
let event = match timeout.step {
TimeoutStep::Propose => RoundEvent::TimeoutPropose,
TimeoutStep::Prevote => RoundEvent::TimeoutPrevote,
Expand All @@ -271,10 +258,15 @@ where
}

/// Apply the event, update the state.
fn apply_event(&mut self, round: Round, event: RoundEvent<Ctx>) -> Option<RoundMessage<Ctx>> {
fn apply_event(
&mut self,
event_round: Round,
event: RoundEvent<Ctx>,
) -> Result<Option<RoundMessage<Ctx>>, Error<Ctx>> {
let round_state = core::mem::take(&mut self.round_state);
let proposer = self.get_proposer(round_state.round)?;

let data = RoundData::new(round, round_state.height.clone(), &self.address);
let data = Info::new(event_round, &self.address, proposer.address());

// Multiplex the event with the round state.
let mux_event = match event {
Expand All @@ -301,6 +293,6 @@ where
self.round_state = transition.next_state;

// Return message, if any
transition.message
Ok(transition.message)
}
}
19 changes: 0 additions & 19 deletions Code/driver/src/env.rs

This file was deleted.

5 changes: 0 additions & 5 deletions Code/driver/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ pub enum Error<Ctx>
where
Ctx: Context,
{
/// No value to propose
NoValueToPropose,

/// Proposer not found
ProposerNotFound(Ctx::Address),

Expand All @@ -27,7 +24,6 @@ where
#[cfg_attr(coverage_nightly, coverage(off))]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Error::NoValueToPropose => write!(f, "No value to propose"),
Error::ProposerNotFound(addr) => write!(f, "Proposer not found: {addr}"),
Error::ValidatorNotFound(addr) => write!(f, "Validator not found: {addr}"),
Error::InvalidVoteSignature(vote, validator) => write!(
Expand All @@ -46,7 +42,6 @@ where
#[cfg_attr(coverage_nightly, coverage(off))]
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Error::NoValueToPropose, Error::NoValueToPropose) => true,
(Error::ProposerNotFound(addr1), Error::ProposerNotFound(addr2)) => addr1 == addr2,
(Error::ValidatorNotFound(addr1), Error::ValidatorNotFound(addr2)) => addr1 == addr2,
(
Expand Down
10 changes: 10 additions & 0 deletions Code/driver/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,18 @@ pub enum Event<Ctx>
where
Ctx: Context,
{
/// Start a new round
NewRound(Ctx::Height, Round),

/// Propose a value for the given round
ProposeValue(Round, Ctx::Value),

/// Receive a proposal, of the given validity
Proposal(Ctx::Proposal, Validity),

/// Receive a signed vote
Vote(SignedVote<Ctx>),

/// Receive a timeout
TimeoutElapsed(Timeout),
}
2 changes: 0 additions & 2 deletions Code/driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
extern crate alloc;

mod driver;
mod env;
mod error;
mod event;
mod message;
mod proposer;
mod util;

pub use driver::Driver;
pub use env::Env;
pub use error::Error;
pub use event::Event;
pub use message::Message;
Expand Down
34 changes: 28 additions & 6 deletions Code/driver/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,23 @@ pub enum Message<Ctx>
where
Ctx: Context,
{
/// Start a new round
NewRound(Ctx::Height, Round),

/// Broadcast a proposal
Propose(Ctx::Proposal),

/// Broadcast a vote for a value
Vote(SignedVote<Ctx>),

/// Decide on a value
Decide(Round, Ctx::Value),

/// Schedule a timeout
ScheduleTimeout(Timeout),
NewRound(Ctx::Height, Round),

/// Ask for a value to propose and schedule a timeout
GetValueAndScheduleTimeout(Round, Timeout),
}

// NOTE: We have to derive these instances manually, otherwise
Expand All @@ -22,11 +34,14 @@ impl<Ctx: Context> Clone for Message<Ctx> {
#[cfg_attr(coverage_nightly, coverage(off))]
fn clone(&self) -> Self {
match self {
Message::NewRound(height, round) => Message::NewRound(height.clone(), *round),
Message::Propose(proposal) => Message::Propose(proposal.clone()),
Message::Vote(signed_vote) => Message::Vote(signed_vote.clone()),
Message::Decide(round, value) => Message::Decide(*round, value.clone()),
Message::ScheduleTimeout(timeout) => Message::ScheduleTimeout(*timeout),
Message::NewRound(height, round) => Message::NewRound(height.clone(), *round),
Message::GetValueAndScheduleTimeout(round, timeout) => {
Message::GetValueAndScheduleTimeout(*round, *timeout)
}
}
}
}
Expand All @@ -35,11 +50,14 @@ impl<Ctx: Context> fmt::Debug for Message<Ctx> {
#[cfg_attr(coverage_nightly, coverage(off))]
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Message::NewRound(height, round) => write!(f, "NewRound({:?}, {:?})", height, round),
Message::Propose(proposal) => write!(f, "Propose({:?})", proposal),
Message::Vote(signed_vote) => write!(f, "Vote({:?})", signed_vote),
Message::Decide(round, value) => write!(f, "Decide({:?}, {:?})", round, value),
Message::ScheduleTimeout(timeout) => write!(f, "ScheduleTimeout({:?})", timeout),
Message::NewRound(height, round) => write!(f, "NewRound({:?}, {:?})", height, round),
Message::GetValueAndScheduleTimeout(round, timeout) => {
write!(f, "GetValueAndScheduleTimeout({:?}, {:?})", round, timeout)
}
}
}
}
Expand All @@ -48,6 +66,9 @@ impl<Ctx: Context> PartialEq for Message<Ctx> {
#[cfg_attr(coverage_nightly, coverage(off))]
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Message::NewRound(height, round), Message::NewRound(other_height, other_round)) => {
height == other_height && round == other_round
}
(Message::Propose(proposal), Message::Propose(other_proposal)) => {
proposal == other_proposal
}
Expand All @@ -60,9 +81,10 @@ impl<Ctx: Context> PartialEq for Message<Ctx> {
(Message::ScheduleTimeout(timeout), Message::ScheduleTimeout(other_timeout)) => {
timeout == other_timeout
}
(Message::NewRound(height, round), Message::NewRound(other_height, other_round)) => {
height == other_height && round == other_round
}
(
Message::GetValueAndScheduleTimeout(round, timeout),
Message::GetValueAndScheduleTimeout(other_round, other_timeout),
) => round == other_round && timeout == other_timeout,
_ => false,
}
}
Expand Down
Loading

0 comments on commit a755b7b

Please sign in to comment.