Skip to content

Commit

Permalink
Merge #4895
Browse files Browse the repository at this point in the history
4895: Add a WAL to Highway r=EdHastingsCasperAssociation a=fizyk20

This will enable validators in a network running Highway to resume validating after a crash without having to download the protocol state from other nodes. The WAL (Write-Ahead Log) will contain every unit added to the protocol state during the node's operation; combined with the information from the stored blocks, this will make it possible to restore the protocol state to the point from before a crash.

Since Zug already uses a WAL, this will close the only remaining hole in the protocol state persistence story.

Closes #3904 


Co-authored-by: Bartłomiej Kamiński <[email protected]>
  • Loading branch information
casperlabs-bors-ng[bot] and fizyk20 authored Sep 24, 2024
2 parents 0dd04ac + 93791bb commit ea5e6fa
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 349 deletions.
12 changes: 7 additions & 5 deletions node/src/components/consensus/era_supervisor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ impl EraSupervisor {
our_id.clone(),
);
let instance_id = self.era(era_id).consensus.instance_id();
let unit_hash_file = self.unit_file(instance_id);
let unit_hash_file = self.protocol_state_file(instance_id);
self.era_mut(era_id).consensus.activate_validator(
our_id,
secret,
Expand Down Expand Up @@ -510,6 +510,7 @@ impl EraSupervisor {
.collect();

// Create and insert the new era instance.
let protocol_state_file = self.protocol_state_file(&instance_id);
let (consensus, mut outcomes) = match self.chainspec.core_config.consensus_protocol {
ConsensusProtocolName::Highway => HighwayProtocol::new_boxed(
instance_id,
Expand All @@ -522,6 +523,7 @@ impl EraSupervisor {
start_time,
seed,
now,
Some(protocol_state_file),
),
ConsensusProtocolName::Zug => Zug::new_boxed(
instance_id,
Expand All @@ -534,7 +536,7 @@ impl EraSupervisor {
start_time,
seed,
now,
self.unit_file(&instance_id),
protocol_state_file,
),
};

Expand Down Expand Up @@ -579,7 +581,7 @@ impl EraSupervisor {
self.validator_matrix.secret_signing_key().clone(),
our_id.clone(),
);
let unit_hash_file = self.unit_file(&instance_id);
let unit_hash_file = self.protocol_state_file(&instance_id);
outcomes.extend(self.era_mut(era_id).consensus.activate_validator(
our_id,
secret,
Expand Down Expand Up @@ -624,7 +626,7 @@ impl EraSupervisor {
}
});
for instance_id in removed_instance_ids {
if let Err(err) = fs::remove_file(self.unit_file(&instance_id)) {
if let Err(err) = fs::remove_file(self.protocol_state_file(&instance_id)) {
match err.kind() {
io::ErrorKind::NotFound => {}
err => warn!(?err, "could not delete unit hash file"),
Expand All @@ -637,7 +639,7 @@ impl EraSupervisor {
}

/// Returns the path to the era's unit file.
fn unit_file(&self, instance_id: &Digest) -> PathBuf {
fn protocol_state_file(&self, instance_id: &Digest) -> PathBuf {
self.unit_files_folder.join(format!(
"unit_{:?}_{}.dat",
instance_id,
Expand Down
213 changes: 0 additions & 213 deletions node/src/components/consensus/highway_core/active_validator.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::{
fmt::{self, Debug},
fs::{self, File},
io::{self, Read, Write},
iter,
path::{Path, PathBuf},
};

use datasize::DataSize;
Expand Down Expand Up @@ -73,10 +70,6 @@ where
next_timer: Timestamp,
/// Panorama and context for a block we are about to propose when we get a consensus value.
next_proposal: Option<(BlockContext<C>, Panorama<C>)>,
/// The path to the file storing the hash of our latest known unit (if any).
unit_file: Option<PathBuf>,
/// The last known unit created by us.
own_last_unit: Option<SignedWireUnit<C>>,
/// The target fault tolerance threshold. The validator pauses (i.e. doesn't create new units)
/// if not enough validators are online to finalize values at this FTT.
target_ftt: Weight,
Expand Down Expand Up @@ -104,28 +97,15 @@ impl<C: Context> ActiveValidator<C> {
current_time: Timestamp,
start_time: Timestamp,
state: &State<C>,
unit_file: Option<PathBuf>,
target_ftt: Weight,
instance_id: C::InstanceId,
) -> (Self, Vec<Effect<C>>) {
let own_last_unit = unit_file
.as_ref()
.map(read_last_unit)
.transpose()
.map_err(|err| match err.kind() {
io::ErrorKind::NotFound => (),
_ => panic!("got an error reading unit file {:?}: {:?}", unit_file, err),
})
.ok()
.flatten();
let mut av = ActiveValidator {
vidx,
secret,
next_round_len: state.params().init_round_len(),
next_timer: state.params().start_timestamp(),
next_proposal: None,
unit_file,
own_last_unit,
target_ftt,
paused: false,
};
Expand All @@ -134,34 +114,6 @@ impl<C: Context> ActiveValidator<C> {
(av, effects)
}

/// Returns whether validator's protocol state is fully synchronized and it's safe to start
/// creating units.
///
/// If validator restarted within an era, it most likely had created units before that event. It
/// cannot start creating new units until its state is fully synchronized, otherwise it will
/// most likely equivocate.
fn can_vote(&self, state: &State<C>) -> bool {
self.own_last_unit
.as_ref()
.map_or(true, |swunit| state.has_unit(&swunit.hash()))
}

/// Returns whether validator's protocol state is synchronized up until the panorama of its own
/// last unit.
pub(crate) fn is_own_last_unit_panorama_sync(&self, state: &State<C>) -> bool {
self.own_last_unit.as_ref().map_or(true, |swunit| {
swunit
.wire_unit()
.panorama
.iter_correct_hashes()
.all(|hash| state.has_unit(hash))
})
}

pub(crate) fn take_own_last_unit(&mut self) -> Option<SignedWireUnit<C>> {
self.own_last_unit.take()
}

/// Sets the next round length to the new value.
pub(crate) fn set_round_len(&mut self, new_round_len: TimeDiff) {
self.next_round_len = new_round_len;
Expand Down Expand Up @@ -430,10 +382,6 @@ impl<C: Context> ActiveValidator<C> {
if value.is_none() && !panorama.has_correct() {
return None; // Wait for the first proposal before creating a unit without a value.
}
if !self.can_vote(state) {
info!(?self.own_last_unit, "not voting - last own unit unknown");
return None;
}
if let Some((prop_context, _)) = self.next_proposal.take() {
warn!(?prop_context, "canceling proposal due to unit");
}
Expand Down Expand Up @@ -470,12 +418,6 @@ impl<C: Context> ActiveValidator<C> {
}
.into_hashed();
let swunit = SignedWireUnit::new(hwunit, &self.secret);
write_last_unit(&self.unit_file, swunit.clone()).unwrap_or_else(|err| {
panic!(
"should successfully write unit's hash to {:?}, got {:?}",
self.unit_file, err
)
});
Some(swunit)
}

Expand Down Expand Up @@ -607,9 +549,6 @@ impl<C: Context> ActiveValidator<C> {
/// Returns whether the incoming vertex was signed by our key even though we don't have it yet.
/// This can only happen if another node is running with the same signing key.
pub(crate) fn is_doppelganger_vertex(&self, vertex: &Vertex<C>, state: &State<C>) -> bool {
if !self.can_vote(state) {
return false;
}
match vertex {
Vertex::Unit(swunit) => {
// If we already have the unit in our local state,
Expand Down Expand Up @@ -641,45 +580,10 @@ impl<C: Context> ActiveValidator<C> {
}
}

pub(crate) fn read_last_unit<C, P>(path: P) -> io::Result<SignedWireUnit<C>>
where
C: Context,
P: AsRef<Path>,
{
let mut file = File::open(path)?;
let mut bytes = Vec::new();
file.read_to_end(&mut bytes)?;
Ok(serde_json::from_slice(&bytes)?)
}

/// Serializes `swunit` to JSON and stores it at `unit_file`, creating any missing parent
/// directories. If `unit_file` is `None`, this is a no-op and returns `Ok(())`.
pub(crate) fn write_last_unit<C: Context>(
    unit_file: &Option<PathBuf>,
    swunit: SignedWireUnit<C>,
) -> io::Result<()> {
    // No file configured: nothing to persist.
    let unit_file = match unit_file {
        Some(file) => file,
        None => return Ok(()),
    };

    // Make sure the directory the file lives in exists before writing.
    if let Some(parent_directory) = unit_file.parent() {
        fs::create_dir_all(parent_directory)?;
    }

    // Serialize first, then create and fill the file in a single call.
    let bytes = serde_json::to_vec(&swunit)?;
    fs::write(unit_file, &bytes)
}

#[cfg(test)]
#[allow(clippy::arithmetic_side_effects)] // Overflows in tests panic anyway.
mod tests {
use std::{collections::BTreeSet, fmt::Debug};
use tempfile::tempdir;

use crate::components::consensus::{
highway_core::highway_testing::TEST_INSTANCE_ID,
Expand All @@ -704,14 +608,6 @@ mod tests {
panic!("Unexpected effect: {:?}", self);
}
}

// Extracts the timestamp from a `ScheduleTimer` effect; panics on any other effect.
fn unwrap_timer(self) -> Timestamp {
    match self {
        Eff::ScheduleTimer(timestamp) => timestamp,
        other => panic!("expected `ScheduleTimer`, got: {:?}", other),
    }
}
}

struct TestState {
Expand Down Expand Up @@ -747,7 +643,6 @@ mod tests {
start_time,
start_time,
&state,
None,
target_ftt,
TEST_INSTANCE_ID,
);
Expand Down Expand Up @@ -983,7 +878,6 @@ mod tests {
410.into(),
410.into(),
&state,
None,
Weight(2),
TEST_INSTANCE_ID,
);
Expand All @@ -1006,7 +900,6 @@ mod tests {
410.into(),
410.into(),
&state,
None,
Weight(2),
TEST_INSTANCE_ID,
);
Expand All @@ -1021,110 +914,4 @@ mod tests {
state.add_ping(ALICE, 500.into());
assert!(!active_validator.is_doppelganger_vertex(&ping, &state));
}

// Verifies that a validator that restarted with a stored "last own unit" does not create any
// new units until its protocol state contains that unit again, and that the first unit it then
// creates correctly follows (cites and increments the sequence number of) the stored one.
#[test]
fn waits_until_synchronized() -> Result<(), AddUnitError<TestContext>> {
    let instance_id = TEST_INSTANCE_ID;
    // Single-validator state; ALICE holds all the weight.
    let mut state = State::new_test(&[Weight(3)], 0);
    // Build a chain of three of Alice's units: a0 <- a1 <- a2.
    let a0 = {
        let a0 = add_unit!(state, ALICE, 0xB0; N)?;
        state.wire_unit(&a0, instance_id).unwrap()
    };
    let a1 = {
        let a1 = add_unit!(state, ALICE, None; a0.hash())?;
        state.wire_unit(&a1, instance_id).unwrap()
    };
    let a2 = {
        let a2 = add_unit!(state, ALICE, None; a1.hash())?;
        state.wire_unit(&a2, instance_id).unwrap()
    };
    // Clean state. We want Alice to synchronize first.
    state.retain_evidence_only();

    // Path to the unit file in a fresh temporary directory.
    // NOTE(review): `tmp_dir` is dropped at the end of this inner block, which deletes the
    // directory — presumably `write_last_unit` below recreates it via `create_dir_all`; confirm.
    let unit_file = {
        let tmp_dir = tempdir().unwrap();
        let unit_files_folder = tmp_dir.path().to_path_buf();
        Some(unit_files_folder.join(format!("unit_{:?}.dat", instance_id)))
    };

    // Store `a2` unit as the Alice's last unit.
    write_last_unit(&unit_file, a2.clone()).expect("storing unit should succeed");

    // Alice's last unit is `a2` but `State` is empty. She must synchronize first.
    let (mut alice, alice_init_effects) = ActiveValidator::new(
        ALICE,
        TestSecret(ALICE.0),
        410.into(),
        410.into(),
        &state,
        unit_file,
        Weight(2),
        TEST_INSTANCE_ID,
    );

    // On startup she only schedules a timer (at 416) and pings — no proposal yet.
    let mut next_proposal_timer = match &*alice_init_effects {
        &[Effect::ScheduleTimer(timestamp), Effect::NewVertex(ValidVertex(Vertex::Ping(_)))]
            if timestamp == 416.into() =>
        {
            timestamp
        }
        other => panic!("unexpected effects {:?}", other),
    };

    // Alice has to synchronize up until `a2` (including) before she starts proposing.
    for unit in vec![a0, a1, a2.clone()] {
        // Each timer firing before sync completes must produce no proposal.
        next_proposal_timer =
            assert_no_proposal(&mut alice, &state, instance_id, next_proposal_timer);
        state.add_unit(unit)?;
    }

    // After synchronizing the protocol state up until `last_own_unit`, Alice can now propose a
    // new block.
    let bctx = match &*alice.handle_timer(next_proposal_timer, &state, instance_id) {
        [Eff::ScheduleTimer(_), Eff::RequestNewBlock(bctx, _)] => bctx.clone(),
        effects => panic!("unexpected effects {:?}", effects),
    };

    // The proposed unit must extend the restored last unit `a2`.
    let proposal_wunit =
        unwrap_single(&alice.propose(0xC0FFEE, bctx, &state, instance_id)).unwrap_unit();
    assert_eq!(
        proposal_wunit.wire_unit().seq_number,
        a2.wire_unit().seq_number + 1,
        "new unit should have correct seq_number"
    );
    assert_eq!(
        proposal_wunit.wire_unit().panorama,
        panorama!(a2.hash()),
        "new unit should cite the latest unit"
    );

    Ok(())
}

// Fires `validator`'s proposal timer and verifies that no block actually gets proposed.
// The timer must yield a witness timer plus a block request; the subsequent `propose` call
// must produce no effects while the validator is still synchronizing. Firing the witness
// timer then yields — and this function returns — the timestamp of the next proposal timer.
fn assert_no_proposal(
    validator: &mut ActiveValidator<TestContext>,
    state: &State<TestContext>,
    instance_id: u64,
    proposal_timer: Timestamp,
) -> Timestamp {
    let timer_effects = validator.handle_timer(proposal_timer, state, instance_id);
    let (witness_time, block_context) = match &*timer_effects {
        [Eff::ScheduleTimer(witness_time), Eff::RequestNewBlock(block_context, _)] => {
            (*witness_time, block_context.clone())
        }
        effects => panic!("unexpected effects {:?}", effects),
    };

    // A validator that hasn't caught up with its own last unit must not propose anything.
    let proposal_effects = validator.propose(0xC0FFEE, block_context, state, instance_id);
    assert!(
        proposal_effects.is_empty(),
        "should not propose blocks until its dependencies are synchronized: {:?}",
        proposal_effects
    );

    unwrap_single(&validator.handle_timer(witness_time, state, instance_id)).unwrap_timer()
}
}
Loading

0 comments on commit ea5e6fa

Please sign in to comment.