From 93791bbb94bd536206e3f7ab3f273ce7b538d475 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bart=C5=82omiej=20Kami=C5=84ski?= Date: Thu, 19 Sep 2024 19:02:29 +0300 Subject: [PATCH] Add a WAL to Highway --- .../components/consensus/era_supervisor.rs | 12 +- .../highway_core/active_validator.rs | 213 ------------------ .../consensus/highway_core/highway.rs | 114 +++++++--- .../consensus/highway_core/highway_testing.rs | 3 +- .../highway_core/synchronizer/tests.rs | 15 +- .../components/consensus/protocols/highway.rs | 3 +- .../consensus/protocols/highway/tests.rs | 1 + .../src/components/consensus/protocols/zug.rs | 56 +++-- node/src/components/consensus/utils.rs | 1 + .../consensus/{protocols/zug => utils}/wal.rs | 108 +++------ 10 files changed, 177 insertions(+), 349 deletions(-) rename node/src/components/consensus/{protocols/zug => utils}/wal.rs (66%) diff --git a/node/src/components/consensus/era_supervisor.rs b/node/src/components/consensus/era_supervisor.rs index 29b3febac3..11ff7135ac 100644 --- a/node/src/components/consensus/era_supervisor.rs +++ b/node/src/components/consensus/era_supervisor.rs @@ -389,7 +389,7 @@ impl EraSupervisor { our_id.clone(), ); let instance_id = self.era(era_id).consensus.instance_id(); - let unit_hash_file = self.unit_file(instance_id); + let unit_hash_file = self.protocol_state_file(instance_id); self.era_mut(era_id).consensus.activate_validator( our_id, secret, @@ -510,6 +510,7 @@ impl EraSupervisor { .collect(); // Create and insert the new era instance. + let protocol_state_file = self.protocol_state_file(&instance_id); let (consensus, mut outcomes) = match self.chainspec.core_config.consensus_protocol { ConsensusProtocolName::Highway => HighwayProtocol::new_boxed( instance_id, @@ -522,6 +523,7 @@ impl EraSupervisor { start_time, seed, now, + Some(protocol_state_file), ), ConsensusProtocolName::Zug => Zug::new_boxed( instance_id, @@ -534,7 +536,7 @@ impl EraSupervisor { start_time, seed, now, - self.unit_file(&instance_id), + protocol_state_file, ), }; @@ -579,7 +581,7 @@ impl EraSupervisor { self.validator_matrix.secret_signing_key().clone(), our_id.clone(), ); - let unit_hash_file = self.unit_file(&instance_id); + let unit_hash_file = self.protocol_state_file(&instance_id); outcomes.extend(self.era_mut(era_id).consensus.activate_validator( our_id, secret, @@ -624,7 +626,7 @@ impl EraSupervisor { } }); for instance_id in removed_instance_ids { - if let Err(err) = fs::remove_file(self.unit_file(&instance_id)) { + if let Err(err) = fs::remove_file(self.protocol_state_file(&instance_id)) { match err.kind() { io::ErrorKind::NotFound => {} err => warn!(?err, "could not delete unit hash file"), @@ -637,7 +639,7 @@ impl EraSupervisor { } /// Returns the path to the era's unit file. - fn unit_file(&self, instance_id: &Digest) -> PathBuf { + fn protocol_state_file(&self, instance_id: &Digest) -> PathBuf { self.unit_files_folder.join(format!( "unit_{:?}_{}.dat", instance_id, diff --git a/node/src/components/consensus/highway_core/active_validator.rs b/node/src/components/consensus/highway_core/active_validator.rs index 82919a8148..3943091a01 100644 --- a/node/src/components/consensus/highway_core/active_validator.rs +++ b/node/src/components/consensus/highway_core/active_validator.rs @@ -1,9 +1,6 @@ use std::{ fmt::{self, Debug}, - fs::{self, File}, - io::{self, Read, Write}, iter, - path::{Path, PathBuf}, }; use datasize::DataSize; @@ -73,10 +70,6 @@ where next_timer: Timestamp, /// Panorama and context for a block we are about to propose when we get a consensus value. next_proposal: Option<(BlockContext, Panorama)>, - /// The path to the file storing the hash of our latest known unit (if any). - unit_file: Option, - /// The last known unit created by us. - own_last_unit: Option>, /// The target fault tolerance threshold. The validator pauses (i.e. doesn't create new units) /// if not enough validators are online to finalize values at this FTT. target_ftt: Weight, @@ -104,28 +97,15 @@ impl ActiveValidator { current_time: Timestamp, start_time: Timestamp, state: &State, - unit_file: Option, target_ftt: Weight, instance_id: C::InstanceId, ) -> (Self, Vec>) { - let own_last_unit = unit_file - .as_ref() - .map(read_last_unit) - .transpose() - .map_err(|err| match err.kind() { - io::ErrorKind::NotFound => (), - _ => panic!("got an error reading unit file {:?}: {:?}", unit_file, err), - }) - .ok() - .flatten(); let mut av = ActiveValidator { vidx, secret, next_round_len: state.params().init_round_len(), next_timer: state.params().start_timestamp(), next_proposal: None, - unit_file, - own_last_unit, target_ftt, paused: false, }; @@ -134,34 +114,6 @@ impl ActiveValidator { (av, effects) } - /// Returns whether validator's protocol state is fully synchronized and it's safe to start - /// creating units. - /// - /// If validator restarted within an era, it most likely had created units before that event. It - /// cannot start creating new units until its state is fully synchronized, otherwise it will - /// most likely equivocate. - fn can_vote(&self, state: &State) -> bool { - self.own_last_unit - .as_ref() - .map_or(true, |swunit| state.has_unit(&swunit.hash())) - } - - /// Returns whether validator's protocol state is synchronized up until the panorama of its own - /// last unit. - pub(crate) fn is_own_last_unit_panorama_sync(&self, state: &State) -> bool { - self.own_last_unit.as_ref().map_or(true, |swunit| { - swunit - .wire_unit() - .panorama - .iter_correct_hashes() - .all(|hash| state.has_unit(hash)) - }) - } - - pub(crate) fn take_own_last_unit(&mut self) -> Option> { - self.own_last_unit.take() - } - /// Sets the next round length to the new value. pub(crate) fn set_round_len(&mut self, new_round_len: TimeDiff) { self.next_round_len = new_round_len; @@ -430,10 +382,6 @@ impl ActiveValidator { if value.is_none() && !panorama.has_correct() { return None; // Wait for the first proposal before creating a unit without a value. } - if !self.can_vote(state) { - info!(?self.own_last_unit, "not voting - last own unit unknown"); - return None; - } if let Some((prop_context, _)) = self.next_proposal.take() { warn!(?prop_context, "canceling proposal due to unit"); } @@ -470,12 +418,6 @@ impl ActiveValidator { } .into_hashed(); let swunit = SignedWireUnit::new(hwunit, &self.secret); - write_last_unit(&self.unit_file, swunit.clone()).unwrap_or_else(|err| { - panic!( - "should successfully write unit's hash to {:?}, got {:?}", - self.unit_file, err - ) - }); Some(swunit) } @@ -607,9 +549,6 @@ impl ActiveValidator { /// Returns whether the incoming vertex was signed by our key even though we don't have it yet. /// This can only happen if another node is running with the same signing key. pub(crate) fn is_doppelganger_vertex(&self, vertex: &Vertex, state: &State) -> bool { - if !self.can_vote(state) { - return false; - } match vertex { Vertex::Unit(swunit) => { // If we already have the unit in our local state, @@ -641,45 +580,10 @@ impl ActiveValidator { } } -pub(crate) fn read_last_unit(path: P) -> io::Result> -where - C: Context, - P: AsRef, -{ - let mut file = File::open(path)?; - let mut bytes = Vec::new(); - file.read_to_end(&mut bytes)?; - Ok(serde_json::from_slice(&bytes)?) -} - -pub(crate) fn write_last_unit( - unit_file: &Option, - swunit: SignedWireUnit, -) -> io::Result<()> { - // If there is no unit_file set, do not write to it - let unit_file = if let Some(file) = unit_file.as_ref() { - file - } else { - return Ok(()); - }; - - // Create the file (and its parents) as necessary - if let Some(parent_directory) = unit_file.parent() { - fs::create_dir_all(parent_directory)?; - } - let mut file = File::create(unit_file)?; - - // Finally, write the data to file we created - let bytes = serde_json::to_vec(&swunit)?; - - file.write_all(&bytes) -} - #[cfg(test)] #[allow(clippy::arithmetic_side_effects)] // Overflows in tests panic anyway. mod tests { use std::{collections::BTreeSet, fmt::Debug}; - use tempfile::tempdir; use crate::components::consensus::{ highway_core::highway_testing::TEST_INSTANCE_ID, @@ -704,14 +608,6 @@ mod tests { panic!("Unexpected effect: {:?}", self); } } - - fn unwrap_timer(self) -> Timestamp { - if let Eff::ScheduleTimer(timestamp) = self { - timestamp - } else { - panic!("expected `ScheduleTimer`, got: {:?}", self) - } - } } struct TestState { @@ -747,7 +643,6 @@ mod tests { start_time, start_time, &state, - None, target_ftt, TEST_INSTANCE_ID, ); @@ -983,7 +878,6 @@ mod tests { 410.into(), 410.into(), &state, - None, Weight(2), TEST_INSTANCE_ID, ); @@ -1006,7 +900,6 @@ mod tests { 410.into(), 410.into(), &state, - None, Weight(2), TEST_INSTANCE_ID, ); @@ -1021,110 +914,4 @@ mod tests { state.add_ping(ALICE, 500.into()); assert!(!active_validator.is_doppelganger_vertex(&ping, &state)); } - - #[test] - fn waits_until_synchronized() -> Result<(), AddUnitError> { - let instance_id = TEST_INSTANCE_ID; - let mut state = State::new_test(&[Weight(3)], 0); - let a0 = { - let a0 = add_unit!(state, ALICE, 0xB0; N)?; - state.wire_unit(&a0, instance_id).unwrap() - }; - let a1 = { - let a1 = add_unit!(state, ALICE, None; a0.hash())?; - state.wire_unit(&a1, instance_id).unwrap() - }; - let a2 = { - let a2 = add_unit!(state, ALICE, None; a1.hash())?; - state.wire_unit(&a2, instance_id).unwrap() - }; - // Clean state. We want Alice to synchronize first. - state.retain_evidence_only(); - - let unit_file = { - let tmp_dir = tempdir().unwrap(); - let unit_files_folder = tmp_dir.path().to_path_buf(); - Some(unit_files_folder.join(format!("unit_{:?}.dat", instance_id))) - }; - - // Store `a2` unit as the Alice's last unit. - write_last_unit(&unit_file, a2.clone()).expect("storing unit should succeed"); - - // Alice's last unit is `a2` but `State` is empty. She must synchronize first. - let (mut alice, alice_init_effects) = ActiveValidator::new( - ALICE, - TestSecret(ALICE.0), - 410.into(), - 410.into(), - &state, - unit_file, - Weight(2), - TEST_INSTANCE_ID, - ); - - let mut next_proposal_timer = match &*alice_init_effects { - &[Effect::ScheduleTimer(timestamp), Effect::NewVertex(ValidVertex(Vertex::Ping(_)))] - if timestamp == 416.into() => - { - timestamp - } - other => panic!("unexpected effects {:?}", other), - }; - - // Alice has to synchronize up until `a2` (including) before she starts proposing. - for unit in vec![a0, a1, a2.clone()] { - next_proposal_timer = - assert_no_proposal(&mut alice, &state, instance_id, next_proposal_timer); - state.add_unit(unit)?; - } - - // After synchronizing the protocol state up until `last_own_unit`, Alice can now propose a - // new block. - let bctx = match &*alice.handle_timer(next_proposal_timer, &state, instance_id) { - [Eff::ScheduleTimer(_), Eff::RequestNewBlock(bctx, _)] => bctx.clone(), - effects => panic!("unexpected effects {:?}", effects), - }; - - let proposal_wunit = - unwrap_single(&alice.propose(0xC0FFEE, bctx, &state, instance_id)).unwrap_unit(); - assert_eq!( - proposal_wunit.wire_unit().seq_number, - a2.wire_unit().seq_number + 1, - "new unit should have correct seq_number" - ); - assert_eq!( - proposal_wunit.wire_unit().panorama, - panorama!(a2.hash()), - "new unit should cite the latest unit" - ); - - Ok(()) - } - - // Triggers new proposal by `validator` and verifies that it's empty – no block was proposed. - // Captures the next witness timer and calls the `validator` with that to return the timer for - // the next proposal. - fn assert_no_proposal( - validator: &mut ActiveValidator, - state: &State, - instance_id: u64, - proposal_timer: Timestamp, - ) -> Timestamp { - let (witness_timestamp, bctx) = - match &*validator.handle_timer(proposal_timer, state, instance_id) { - [Eff::ScheduleTimer(witness_timestamp), Eff::RequestNewBlock(bctx, _)] => { - (*witness_timestamp, bctx.clone()) - } - effects => panic!("unexpected effects {:?}", effects), - }; - - let effects = validator.propose(0xC0FFEE, bctx, state, instance_id); - assert!( - effects.is_empty(), - "should not propose blocks until its dependencies are synchronized: {:?}", - effects - ); - - unwrap_single(&validator.handle_timer(witness_timestamp, state, instance_id)).unwrap_timer() - } } diff --git a/node/src/components/consensus/highway_core/highway.rs b/node/src/components/consensus/highway_core/highway.rs index a430678ab2..56624e7805 100644 --- a/node/src/components/consensus/highway_core/highway.rs +++ b/node/src/components/consensus/highway_core/highway.rs @@ -10,6 +10,7 @@ pub use vertex::{ use std::path::PathBuf; use datasize::DataSize; +use serde::{Deserialize, Serialize}; use thiserror::Error; use tracing::{debug, error, info, trace, warn}; @@ -24,7 +25,10 @@ use crate::components::consensus::{ state::{Fault, Observation, State, UnitError}, }, traits::Context, - utils::{Validator, ValidatorIndex, Validators, Weight}, + utils::{ + wal::{ReadWal, WalEntry, WriteWal}, + Validator, ValidatorIndex, Validators, Weight, + }, }; /// If a lot of rounds were skipped between two blocks, log at most this many. @@ -102,7 +106,11 @@ impl From> for Vertex { /// /// Note that this must only be added to the `Highway` instance that created it. Can cause a panic /// or inconsistent state otherwise. -#[derive(Clone, DataSize, Debug, Eq, PartialEq, Hash)] +#[derive(Clone, DataSize, Debug, Eq, PartialEq, Hash, Serialize, Deserialize)] +#[serde(bound( + serialize = "C::Hash: Serialize", + deserialize = "C::Hash: Deserialize<'de>", +))] pub(crate) struct ValidVertex(pub(crate) Vertex) where C: Context; @@ -134,6 +142,18 @@ pub(crate) enum GetDepOutcome { Evidence(C::ValidatorId), } +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(bound( + serialize = "C::Hash: Serialize", + deserialize = "C::Hash: Deserialize<'de>", +))] +struct HighwayWalEntry { + vertex: ValidVertex, + timestamp: Timestamp, +} + +impl WalEntry for HighwayWalEntry {} + /// An instance of the Highway protocol, containing its local state. /// /// Both observers and active validators must instantiate this, pass in all incoming vertices from @@ -152,6 +172,8 @@ where state: State, /// The state of an active validator, who is participating and creating new vertices. active_validator: Option>, + /// The path to the protocol state file. + write_wal: Option>>, } impl Highway { @@ -167,17 +189,65 @@ impl Highway { instance_id: C::InstanceId, validators: Validators, params: Params, + protocol_state_file: Option, ) -> Highway { info!(%validators, instance=%instance_id, "creating Highway instance"); let weights = validators.iter().map(Validator::weight); let banned = validators.iter_banned_idx(); let cannot_propose = validators.iter_cannot_propose_idx(); let state = State::new(weights, params, banned, cannot_propose); - Highway { + let (write_wal, entries) = if let Some(protocol_state_file) = protocol_state_file.as_ref() { + let entries = Self::read_stored_vertices(protocol_state_file); + let write_wal = match WriteWal::>::new(protocol_state_file) { + Ok(wal) => Some(wal), + Err(err) => { + panic!("couldn't open WriteWal: {}", err); + } + }; + (write_wal, entries) + } else { + (None, vec![]) + }; + let mut result = Highway { instance_id, validators, state, active_validator: None, + write_wal, + }; + result.restore_state(entries); + result + } + + fn read_stored_vertices(protocol_state_file: &PathBuf) -> Vec> { + let mut read_wal = match ReadWal::>::new(protocol_state_file) { + Ok(wal) => wal, + Err(err) => { + panic!("couldn't open ReadWal: {}", err); + } + }; + let mut entries = vec![]; + loop { + match read_wal.read_next_entry() { + Ok(Some(entry)) => { + entries.push(entry); + } + Ok(None) => { + break; + } + Err(err) => { + panic!("error while reading ReadWal: {}", err); + } + } + } + entries + } + + fn restore_state(&mut self, entries: Vec>) { + for entry in entries { + // we can safely ignore the effects - they were properly processed when persisting the + // vertex + self.add_valid_vertex(entry.vertex, entry.timestamp); } } @@ -190,7 +260,7 @@ impl Highway { id: C::ValidatorId, secret: C::ValidatorSecret, current_time: Timestamp, - unit_hash_file: Option, + _unit_hash_file: Option, target_ftt: Weight, ) -> Vec> { if self.active_validator.is_some() { @@ -211,7 +281,6 @@ impl Highway { current_time, start_time, &self.state, - unit_hash_file, target_ftt, self.instance_id, ); @@ -292,6 +361,15 @@ impl Highway { now: Timestamp, ) -> Vec> { if !self.has_vertex(&vertex) { + if let Some(ref mut wal) = self.write_wal { + let entry = HighwayWalEntry { + vertex: ValidVertex(vertex.clone()), + timestamp: now, + }; + if let Err(err) = wal.record_entry(&entry) { + error!("error recording entry: {}", err); + } + } match vertex { Vertex::Unit(unit) => self.add_valid_unit(unit, now), Vertex::Evidence(evidence) => self.add_evidence(evidence), @@ -638,30 +716,9 @@ impl Highway { }) .unwrap_or_default(); evidence_effects.extend(self.on_new_unit(&unit_hash, now)); - evidence_effects.extend(self.add_own_last_unit(now)); evidence_effects } - /// If validator's protocol state is synchronized, adds its own last unit (if any) to the - /// protocol state - fn add_own_last_unit(&mut self, now: Timestamp) -> Vec> { - self.map_active_validator( - |av, state| { - if av.is_own_last_unit_panorama_sync(state) { - if let Some(own_last_unit) = av.take_own_last_unit() { - vec![Effect::NewVertex(ValidVertex(Vertex::Unit(own_last_unit)))] - } else { - vec![] - } - } else { - vec![] - } - }, - now, - ) - .unwrap_or_default() - } - /// Adds endorsements to the state. If there are conflicting endorsements, `NewVertex` effects /// are returned containing evidence to prove them faulty. fn add_endorsements(&mut self, endorsements: Endorsements) -> Vec> { @@ -805,6 +862,7 @@ pub(crate) mod tests { validators: test_validators(), state, active_validator: None, + write_wal: None, }; let wunit = WireUnit { panorama: Panorama::new(WEIGHTS.len()), @@ -862,6 +920,7 @@ pub(crate) mod tests { validators: test_validators(), state: State::new_test(WEIGHTS, 0), active_validator: None, + write_wal: None, }; let vertex_end_a = Vertex::Endorsements(end_a); @@ -914,6 +973,7 @@ pub(crate) mod tests { validators: test_validators(), state, active_validator: None, + write_wal: None, }; let validate = |wunit0: &WireUnit, @@ -1012,6 +1072,7 @@ pub(crate) mod tests { validators: test_validators(), state, active_validator: None, + write_wal: None, }; // Ping by validator that is not bonded, with an index that is outside of boundaries of the @@ -1038,6 +1099,7 @@ pub(crate) mod tests { validators: test_validators(), state, active_validator: None, + write_wal: None, }; let _effects = diff --git a/node/src/components/consensus/highway_core/highway_testing.rs b/node/src/components/consensus/highway_core/highway_testing.rs index d41b533199..d67d53e835 100644 --- a/node/src/components/consensus/highway_core/highway_testing.rs +++ b/node/src/components/consensus/highway_core/highway_testing.rs @@ -935,7 +935,8 @@ impl HighwayTestHarnessBuilder { |(vid, secrets): (ValidatorId, &mut HashMap)| { let v_sec = secrets.remove(&vid).expect("Secret key should exist."); - let mut highway = Highway::new(instance_id, validators.clone(), params.clone()); + let mut highway = + Highway::new(instance_id, validators.clone(), params.clone(), None); let effects = highway.activate_validator(vid, v_sec, start_time, None, Weight(ftt)); let finality_detector = FinalityDetector::new(Weight(ftt)); diff --git a/node/src/components/consensus/highway_core/synchronizer/tests.rs b/node/src/components/consensus/highway_core/synchronizer/tests.rs index 694f609f0e..672264f25e 100644 --- a/node/src/components/consensus/highway_core/synchronizer/tests.rs +++ b/node/src/components/consensus/highway_core/synchronizer/tests.rs @@ -38,7 +38,7 @@ fn purge_vertices() { // A Highway instance that's just used to create PreValidatedVertex instances below. let util_highway = - Highway::::new(TEST_INSTANCE_ID, test_validators(), params.clone()); + Highway::::new(TEST_INSTANCE_ID, test_validators(), params.clone(), None); // Returns the WireUnit with the specified hash. let unit = |hash: u64| Vertex::Unit(state.wire_unit(&hash, TEST_INSTANCE_ID).unwrap()); @@ -50,7 +50,8 @@ fn purge_vertices() { // Create a synchronizer with a 0x20 ms timeout, and a Highway instance. let max_requests_for_vertex = 5; let mut sync = Synchronizer::::new(WEIGHTS.len(), TEST_INSTANCE_ID); - let mut highway = Highway::::new(TEST_INSTANCE_ID, test_validators(), params); + let mut highway = + Highway::::new(TEST_INSTANCE_ID, test_validators(), params, None); // At time 0x20, we receive c2, b0 and b1 — the latter ahead of their timestamp. // Since c2 is the first entry in the main queue, processing is scheduled. @@ -126,7 +127,7 @@ fn do_not_download_synchronized_dependencies() { let mut state = State::new(WEIGHTS, params.clone(), vec![], vec![]); let util_highway = - Highway::::new(TEST_INSTANCE_ID, test_validators(), params.clone()); + Highway::::new(TEST_INSTANCE_ID, test_validators(), params.clone(), None); // We use round exponent 0u8, so a round is 0x40 ms. With seed 0, Carol is the first leader. // @@ -153,7 +154,8 @@ fn do_not_download_synchronized_dependencies() { let max_requests_for_vertex = 5; let mut sync = Synchronizer::::new(WEIGHTS.len(), TEST_INSTANCE_ID); - let mut highway = Highway::::new(TEST_INSTANCE_ID, test_validators(), params); + let mut highway = + Highway::::new(TEST_INSTANCE_ID, test_validators(), params, None); let now = 0x20.into(); assert!(matches!( @@ -232,7 +234,7 @@ fn transitive_proposal_dependency() { let mut state = State::new(WEIGHTS, params.clone(), vec![], vec![]); let util_highway = - Highway::::new(TEST_INSTANCE_ID, test_validators(), params.clone()); + Highway::::new(TEST_INSTANCE_ID, test_validators(), params.clone(), None); // Alice a0 — a1 // / \ @@ -257,7 +259,8 @@ fn transitive_proposal_dependency() { let max_requests_for_vertex = 5; let mut sync = Synchronizer::::new(WEIGHTS.len(), TEST_INSTANCE_ID); - let mut highway = Highway::::new(TEST_INSTANCE_ID, test_validators(), params); + let mut highway = + Highway::::new(TEST_INSTANCE_ID, test_validators(), params, None); let now = 0x100.into(); assert!(matches!( diff --git a/node/src/components/consensus/protocols/highway.rs b/node/src/components/consensus/protocols/highway.rs index c6589fc004..eb252806a0 100644 --- a/node/src/components/consensus/protocols/highway.rs +++ b/node/src/components/consensus/protocols/highway.rs @@ -97,6 +97,7 @@ impl HighwayProtocol { era_start_time: Timestamp, seed: u64, now: Timestamp, + protocol_state_file: Option, ) -> (Box>, ProtocolOutcomes) { let validators_count = validator_stakes.len(); let validators = protocols::common::validators::(faulty, inactive, validator_stakes); @@ -170,7 +171,7 @@ impl HighwayProtocol { let outcomes = Self::initialize_timers(now, era_start_time, &config.highway); - let highway = Highway::new(instance_id, validators, params); + let highway = Highway::new(instance_id, validators, params, protocol_state_file); let hw_proto = Box::new(HighwayProtocol { pending_values: HashMap::new(), finality_detector: FinalityDetector::new(ftt), diff --git a/node/src/components/consensus/protocols/highway/tests.rs b/node/src/components/consensus/protocols/highway/tests.rs index 5c06da33bf..83e7e781da 100644 --- a/node/src/components/consensus/protocols/highway/tests.rs +++ b/node/src/components/consensus/protocols/highway/tests.rs @@ -89,6 +89,7 @@ where start_timestamp, 0, start_timestamp, + None, ); // We expect three messages: // * log participation timer, diff --git a/node/src/components/consensus/protocols/zug.rs b/node/src/components/consensus/protocols/zug.rs index 87135cebdb..121169410b 100644 --- a/node/src/components/consensus/protocols/zug.rs +++ b/node/src/components/consensus/protocols/zug.rs @@ -64,7 +64,6 @@ mod proposal; mod round; #[cfg(test)] mod tests; -mod wal; use std::{ any::Any, @@ -93,7 +92,10 @@ use crate::{ era_supervisor::SerializedMessage, protocols, traits::{ConsensusValueT, Context}, - utils::{ValidatorIndex, ValidatorMap, Validators, Weight}, + utils::{ + wal::{ReadWal, WalEntry, WriteWal}, + ValidatorIndex, ValidatorMap, Validators, Weight, + }, ActionId, LeaderSequence, TimerId, }, types::NodeId, @@ -105,7 +107,7 @@ use params::Params; use participation::{Participation, ParticipationStatus}; use proposal::{HashedProposal, Proposal}; use round::Round; -use wal::{Entry, ReadWal, WriteWal}; +use serde::{Deserialize, Serialize}; pub(crate) use message::{Message, SyncRequest}; @@ -126,6 +128,23 @@ pub(crate) type RoundId = u32; type ProposalsAwaitingParent = HashSet<(RoundId, NodeId)>; type ProposalsAwaitingValidation = HashSet<(RoundId, HashedProposal, NodeId)>; +/// An entry in the Write-Ahead Log, storing a message we had added to our protocol state. +#[derive(Deserialize, Serialize, Debug, PartialEq)] +#[serde(bound( + serialize = "C::Hash: Serialize", + deserialize = "C::Hash: Deserialize<'de>", +))] +pub(crate) enum ZugWalEntry { + /// A signed echo or vote. + SignedMessage(SignedMessage), + /// A proposal. + Proposal(Proposal, RoundId), + /// Evidence of a validator double-signing. + Evidence(SignedMessage, Content, C::Signature), +} + +impl WalEntry for ZugWalEntry {} + /// Contains the portion of the state required for an active validator to participate in the /// protocol. #[derive(DataSize)] @@ -200,7 +219,7 @@ where /// `update`. next_scheduled_update: Timestamp, /// The write-ahead log to prevent honest nodes from double-signing upon restart. - write_wal: Option>, + write_wal: Option>>, /// A map of random IDs -> tipmestamp of when it has been created, allowing to /// verify that a response has been asked for. sent_sync_requests: registered_sync::RegisteredSync, @@ -663,7 +682,7 @@ impl Zug { ); // We only return the new message if we are able to record it. If that fails we // wouldn't know about our own message after a restart and risk double-signing. - if self.record_entry(&Entry::SignedMessage(signed_msg.clone())) + if self.record_entry(&ZugWalEntry::SignedMessage(signed_msg.clone())) && self.add_content(signed_msg.clone()) { Some(signed_msg) @@ -708,7 +727,11 @@ impl Zug { signature2: C::Signature, now: Timestamp, ) -> ProtocolOutcomes { - self.record_entry(&Entry::Evidence(signed_msg.clone(), content2, signature2)); + self.record_entry(&ZugWalEntry::Evidence( + signed_msg.clone(), + content2, + signature2, + )); self.handle_fault_no_wal(signed_msg, validator_id, content2, signature2, now) } @@ -1108,7 +1131,7 @@ impl Zug { return Ok(vec![]); } - self.record_entry(&Entry::SignedMessage(signed_msg.clone())); + self.record_entry(&ZugWalEntry::SignedMessage(signed_msg.clone())); if self.add_content(signed_msg) { Ok(self.update(now)) } else { @@ -1329,7 +1352,7 @@ impl Zug { /// Adds a signed message to the WAL such that we can avoid double signing upon recovery if the /// node shuts down. Returns `true` if the message was added successfully. - fn record_entry(&mut self, entry: &Entry) -> bool { + fn record_entry(&mut self, entry: &ZugWalEntry) -> bool { match self.write_wal.as_mut().map(|ww| ww.record_entry(entry)) { None => false, Some(Ok(())) => true, @@ -1353,7 +1376,7 @@ impl Zug { pub(crate) fn open_wal(&mut self, wal_file: PathBuf, now: Timestamp) -> ProtocolOutcomes { let our_idx = self.our_idx(); // Open the file for reading. - let mut read_wal = match ReadWal::::new(&wal_file) { + let mut read_wal = match ReadWal::>::new(&wal_file) { Ok(read_wal) => read_wal, Err(err) => { error!(our_idx, %err, "could not create a ReadWal using this file"); @@ -1367,13 +1390,13 @@ impl Zug { loop { match read_wal.read_next_entry() { Ok(Some(next_entry)) => match next_entry { - Entry::SignedMessage(next_message) => { + ZugWalEntry::SignedMessage(next_message) => { if !self.add_content(next_message) { error!(our_idx, "Could not add content from WAL."); return outcomes; } } - Entry::Proposal(next_proposal, corresponding_round_id) => { + ZugWalEntry::Proposal(next_proposal, corresponding_round_id) => { if self .round(corresponding_round_id) .and_then(Round::proposal) @@ -1421,7 +1444,7 @@ impl Zug { } } } - Entry::Evidence( + ZugWalEntry::Evidence( conflicting_message, conflicting_message_content, conflicting_signature, @@ -1806,7 +1829,7 @@ impl Zug { } else { self.log_proposal(&proposal, round_id, "proposal does not need validation"); if self.round_mut(round_id).insert_proposal(proposal.clone()) { - self.record_entry(&Entry::Proposal(proposal.inner().clone(), round_id)); + self.record_entry(&ZugWalEntry::Proposal(proposal.inner().clone(), round_id)); self.progress_detected = true; self.mark_dirty(round_id); if let Some(block) = proposal.maybe_block().cloned() { @@ -1947,7 +1970,10 @@ impl Zug { instance_id: *self.instance_id(), echo, }; - if !self.record_entry(&Entry::Proposal(hashed_prop.inner().clone(), round_id)) { + if !self.record_entry(&ZugWalEntry::Proposal( + hashed_prop.inner().clone(), + round_id, + )) { error!( our_idx = self.our_idx(), "could not record own proposal in WAL" @@ -2314,7 +2340,7 @@ where for (round_id, proposal, _sender) in rounds_and_node_ids { info!(our_idx = self.our_idx(), %round_id, %proposal, "handling valid proposal"); if self.round_mut(round_id).insert_proposal(proposal.clone()) { - self.record_entry(&Entry::Proposal(proposal.into_inner(), round_id)); + self.record_entry(&ZugWalEntry::Proposal(proposal.into_inner(), round_id)); self.mark_dirty(round_id); self.progress_detected = true; outcomes.push(ProtocolOutcome::HandledProposedBlock( diff --git a/node/src/components/consensus/utils.rs b/node/src/components/consensus/utils.rs index 1e71d7833f..bcbda1b361 100644 --- a/node/src/components/consensus/utils.rs +++ b/node/src/components/consensus/utils.rs @@ -1,6 +1,7 @@ //! Various utilities relevant to consensus. mod validators; +pub(crate) mod wal; mod weight; pub use validators::{Validator, ValidatorIndex, ValidatorMap, Validators}; diff --git a/node/src/components/consensus/protocols/zug/wal.rs b/node/src/components/consensus/utils/wal.rs similarity index 66% rename from node/src/components/consensus/protocols/zug/wal.rs rename to node/src/components/consensus/utils/wal.rs index 70fa606eeb..31c9649a40 100644 --- a/node/src/components/consensus/protocols/zug/wal.rs +++ b/node/src/components/consensus/utils/wal.rs @@ -11,36 +11,16 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; use tracing::warn; -use crate::components::consensus::{ - protocols::zug::{Content, Proposal, SignedMessage}, - traits::Context, -}; - -use super::RoundId; - -/// An entry in the Write-Ahead Log, storing a message we had added to our protocol state. -#[derive(Deserialize, Serialize, Debug, PartialEq)] -#[serde(bound( - serialize = "C::Hash: Serialize", - deserialize = "C::Hash: Deserialize<'de>", -))] -pub(crate) enum Entry { - /// A signed echo or vote. - SignedMessage(SignedMessage), - /// A proposal. - Proposal(Proposal, RoundId), - /// Evidence of a validator double-signing. - Evidence(SignedMessage, Content, C::Signature), -} +pub(crate) trait WalEntry: Serialize + for<'de> Deserialize<'de> {} /// A Write-Ahead Log to store every message on disk when we add it to the protocol state. #[derive(Debug)] -pub(crate) struct WriteWal { +pub(crate) struct WriteWal { writer: BufWriter, - phantom_context: PhantomData, + phantom_context: PhantomData, } -impl DataSize for WriteWal { +impl DataSize for WriteWal { const IS_DYNAMIC: bool = true; const STATIC_HEAP_SIZE: usize = 0; @@ -64,7 +44,7 @@ pub(crate) enum WriteWalError { FileCouldntBeOpened(io::Error), } -impl WriteWal { +impl WriteWal { pub(crate) fn new(wal_path: &PathBuf) -> Result { let file = OpenOptions::new() .append(true) @@ -77,7 +57,7 @@ impl WriteWal { }) } - pub(crate) fn record_entry(&mut self, entry: &Entry) -> Result<(), WriteWalError> { + pub(crate) fn record_entry(&mut self, entry: &E) -> Result<(), WriteWalError> { // First write the size of the entry as a serialized u64. let entry_size = bincode::serialized_size(entry).map_err(WriteWalError::CouldntGetSerializedSize)?; @@ -96,9 +76,9 @@ impl WriteWal { /// A buffer to read a Write-Ahead Log from disk and deserialize its messages. #[derive(Debug)] -pub(crate) struct ReadWal { +pub(crate) struct ReadWal { pub(crate) reader: BufReader, - pub(crate) phantom_context: PhantomData, + pub(crate) phantom_context: PhantomData, } #[derive(Error, Debug)] @@ -111,7 +91,7 @@ pub(crate) enum ReadWalError { CouldNotDeserialize(bincode::Error), } -impl ReadWal { +impl ReadWal { pub(crate) fn new(wal_path: &PathBuf) -> Result { let file = OpenOptions::new() .create(true) @@ -127,10 +107,10 @@ impl ReadWal { } } -impl ReadWal { +impl ReadWal { /// Reads the next entry from the WAL, or returns an error. /// If there are 0 bytes left it returns `Ok(None)`. - pub(crate) fn read_next_entry(&mut self) -> Result>, ReadWalError> { + pub(crate) fn read_next_entry(&mut self) -> Result, ReadWalError> { // Remember the current position: If we encounter an unreadable entry we trim the file at // this point so we can continue appending entries after it. let position = self.reader.stream_position()?; @@ -179,65 +159,29 @@ impl ReadWal { mod tests { use std::iter::from_fn; - use crate::components::consensus::{ - cl_context::{ClContext, Keypair}, - protocols::common, - }; - use casper_types::{PublicKey, SecretKey, Timestamp, U512}; + use casper_types::Timestamp; + use serde::{Deserialize, Serialize}; use tempfile::tempdir; use super::*; - use once_cell::sync::Lazy; - const INSTANCE_ID_DATA: &[u8; 1] = &[123u8; 1]; - const ALICE_WEIGHT: u64 = 1000000; - const ALICE_SECRET_KEY_BYTES: [u8; SecretKey::ED25519_LENGTH] = [3; SecretKey::ED25519_LENGTH]; - static ALICE_SECRET_KEY: Lazy = - Lazy::new(|| SecretKey::ed25519_from_bytes(ALICE_SECRET_KEY_BYTES).unwrap()); - static ALICE_PUBLIC_KEY: Lazy = - Lazy::new(|| PublicKey::from(Lazy::force(&ALICE_SECRET_KEY))); - - fn create_message_fn() -> Box) -> SignedMessage> { - let alice_keypair = Keypair::from(std::sync::Arc::new( - SecretKey::ed25519_from_bytes(ALICE_SECRET_KEY_BYTES).unwrap(), - )); - let weights: Vec<(PublicKey, U512)> = - vec![(ALICE_PUBLIC_KEY.clone(), U512::from(ALICE_WEIGHT))]; - let validators = common::validators::( - &Default::default(), - &Default::default(), - weights.iter().cloned().collect(), - ); - let instance_id = ClContext::hash(INSTANCE_ID_DATA); - Box::new(move |round_id, content: Content| { - let validator_idx = validators.get_index(alice_keypair.public_key()).unwrap(); - SignedMessage::sign_new( - round_id, - instance_id, - content, - validator_idx, - &alice_keypair, - ) - }) + + #[derive(Serialize, Deserialize, Debug, PartialEq)] + enum TestWalEntry { + Variant1(u32), + Variant2(Timestamp), } + impl WalEntry for TestWalEntry {} + #[test] // Tests the functionality of the ReadWal and WriteWal by constructing one and manipulating it. fn test_read_write_wal() { // Create a bunch of test entries - let create_message = create_message_fn(); let mut entries = vec![ - Entry::SignedMessage(create_message(0, Content::Vote(true))), - Entry::SignedMessage(create_message(1, Content::Vote(false))), - Entry::SignedMessage(create_message( - 2, - Content::Echo(ClContext::hash(&[123u8; 1])), - )), - Entry::Proposal(Proposal::dummy(Timestamp::zero(), 0), 0), - Entry::Evidence( - create_message(0, Content::Echo(ClContext::hash(&[23u8; 1]))), - Content::Echo(ClContext::hash(&[52u8; 1])), - create_message(0, Content::Echo(ClContext::hash(&[4u8; 1]))).signature, - ), + TestWalEntry::Variant1(0), + TestWalEntry::Variant1(1), + TestWalEntry::Variant1(2), + TestWalEntry::Variant2(Timestamp::zero()), ]; // Create a temporary directory which will be removed upon dropping the dir variable, @@ -246,14 +190,14 @@ mod tests { let path = dir.path().join("wal"); let read_entries = || { - let mut read_wal: ReadWal = ReadWal::new(&path).unwrap(); + let mut read_wal: ReadWal = ReadWal::new(&path).unwrap(); from_fn(move || read_wal.read_next_entry().unwrap()).collect::>() }; assert_eq!(read_entries(), vec![]); // Record all of the test entries into the WAL file - let mut write_wal: WriteWal = WriteWal::new(&path).unwrap(); + let mut write_wal: WriteWal = WriteWal::new(&path).unwrap(); entries.iter().for_each(move |entry| { write_wal.record_entry(entry).unwrap();