Skip to content

Commit

Permalink
[fix] ensure that no same leader is elected for the same round but di…
Browse files Browse the repository at this point in the history
…fferent offset. Also fix issue that arrises when round = 0
  • Loading branch information
akichidis committed Dec 1, 2023
1 parent 3050e2d commit e23f312
Show file tree
Hide file tree
Showing 5 changed files with 95 additions and 57 deletions.
65 changes: 50 additions & 15 deletions mysticeti-core/src/committee.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,30 +129,50 @@ impl Committee {
total_stake
}

pub fn elect_leader(&self, round: u64) -> AuthorityIndex {
pub fn elect_leader(&self, round: u64, offset: u64) -> AuthorityIndex {
cfg_if::cfg_if! {
// TODO: we need to differentiate in tests the leader strategy so for some type of testing (ex sim tests)
// we can use the staked approach.
if #[cfg(test)] {
(round % self.authorities.len() as u64) as AuthorityIndex
((round + offset) % self.authorities.len() as u64) as AuthorityIndex
} else {
let mut seed_bytes = [0u8; 32];
seed_bytes[32 - 8..].copy_from_slice(&round.to_le_bytes());
let mut rng = StdRng::from_seed(seed_bytes);
let choices = self
.authorities
.iter()
.enumerate()
.map(|(index, authority)| (index, authority.stake as f32))
.collect::<Vec<_>>();
choices
.choose_weighted(&mut rng, |item| item.1)
.expect("Weighted choice error: stake values incorrect!")
.0 as AuthorityIndex
self.elect_leader_stake_based(round, offset)
}
}
}

pub fn elect_leader_stake_based(&self, round: u64, offset: u64) -> AuthorityIndex {
assert!((offset as usize) < self.authorities.len());

// if genesis, always return index 0 - TODO: this needs to be removed.
if round == 0 {
return 0;
}

// To ensure that we elect different leaders for the same round (using different offset) we are
// using as seed the round number to shuffle in a weighted way the results, but skip based on the offset
// TODO: use a cache in case this proves to be computationally expensive
let mut seed_bytes = [0u8; 32];
seed_bytes[32 - 8..].copy_from_slice(&(round).to_le_bytes());
let mut rng = StdRng::from_seed(seed_bytes);

let choices = self
.authorities
.iter()
.enumerate()
.map(|(index, authority)| (index as AuthorityIndex, authority.stake as f32))
.collect::<Vec<_>>();

let leader_index = *choices
.choose_multiple_weighted(&mut rng, self.authorities.len(), |item| item.1)
.expect("Weighted choice error: stake values incorrect!")
.skip(offset as usize)
.map(|(index, _)| index)
.collect::<Vec<_>>()[0];

leader_index
}

pub fn random_authority(&self, rng: &mut impl Rng) -> AuthorityIndex {
rng.gen_range(self.authorities())
}
Expand Down Expand Up @@ -500,4 +520,19 @@ mod test {
assert_eq!(Some(4..5), b.add(6));
assert_eq!(Some(6..7), b.finish());
}

#[test]
fn stake_aware_leader_election() {
let authorities_stake = vec![100, 200, 300, 400, 500];
let num_of_authorities = authorities_stake.len();
let committee = Committee::new_test(authorities_stake);

let mut elected_leaders = HashSet::new();
for offset in 0..num_of_authorities {
assert!(
elected_leaders.insert(committee.elect_leader_stake_based(10, offset as u64)),
"Leader already elected for another offset - that shouldn't happen"
);
}
}
}
2 changes: 1 addition & 1 deletion mysticeti-core/src/consensus/base_committer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ impl BaseCommitter {
}

let offset = self.options.leader_offset as RoundNumber;
Some(self.committee.elect_leader(round + offset))
Some(self.committee.elect_leader(round, offset))
}

/// Find which block is supported at (author, round) by the given block.
Expand Down
27 changes: 15 additions & 12 deletions mysticeti-core/src/consensus/tests/base_committer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ fn direct_commit() {

assert_eq!(sequence.len(), 1);
if let LeaderStatus::Commit(ref block) = sequence[0] {
assert_eq!(block.author(), committee.elect_leader(DEFAULT_WAVE_LENGTH))
assert_eq!(
block.author(),
committee.elect_leader(DEFAULT_WAVE_LENGTH, 0)
)
} else {
panic!("Expected a committed leader")
};
Expand Down Expand Up @@ -92,7 +95,7 @@ fn multiple_direct_commit() {

let leader_round = n as u64 * wave_length;
if let LeaderStatus::Commit(ref block) = sequence[0] {
assert_eq!(block.author(), committee.elect_leader(leader_round));
assert_eq!(block.author(), committee.elect_leader(leader_round, 0));
} else {
panic!("Expected a committed leader")
}
Expand Down Expand Up @@ -130,7 +133,7 @@ fn direct_commit_late_call() {
for (i, leader_block) in sequence.iter().enumerate() {
let leader_round = (i as u64 + 1) * wave_length;
if let LeaderStatus::Commit(ref block) = leader_block {
assert_eq!(block.author(), committee.elect_leader(leader_round));
assert_eq!(block.author(), committee.elect_leader(leader_round, 0));
} else {
panic!("Expected a committed leader")
};
Expand Down Expand Up @@ -179,7 +182,7 @@ fn no_leader() {

// Add enough blocks to reach the decision round of the first leader (but without the leader).
let leader_round_1 = wave_length;
let leader_1 = committee.elect_leader(leader_round_1);
let leader_1 = committee.elect_leader(leader_round_1, 0);

let connections = committee
.authorities()
Expand Down Expand Up @@ -233,7 +236,7 @@ fn direct_skip() {
// Filter out that leader.
let references_without_leader_1: Vec<_> = references_1
.into_iter()
.filter(|x| x.authority != committee.elect_leader(leader_round_1))
.filter(|x| x.authority != committee.elect_leader(leader_round_1, 0))
.collect();

// Add enough blocks to reach the decision round of the first leader.
Expand All @@ -260,7 +263,7 @@ fn direct_skip() {

assert_eq!(sequence.len(), 1);
if let LeaderStatus::Skip(leader, round) = sequence[0] {
assert_eq!(leader, committee.elect_leader(leader_round_1));
assert_eq!(leader, committee.elect_leader(leader_round_1, 0));
assert_eq!(round, leader_round_1);
} else {
panic!("Expected to directly skip the leader");
Expand All @@ -284,7 +287,7 @@ fn indirect_commit() {
let references_without_leader_1: Vec<_> = references_1
.iter()
.cloned()
.filter(|x| x.authority != committee.elect_leader(leader_round_1))
.filter(|x| x.authority != committee.elect_leader(leader_round_1, 0))
.collect();

// Only 2f+1 validators vote for the 1st leader.
Expand Down Expand Up @@ -356,7 +359,7 @@ fn indirect_commit() {
assert_eq!(sequence.len(), 2);

let leader_round = wave_length;
let leader = committee.elect_leader(leader_round);
let leader = committee.elect_leader(leader_round, 0);
if let LeaderStatus::Commit(ref block) = sequence[0] {
assert_eq!(block.author(), leader);
} else {
Expand All @@ -378,7 +381,7 @@ fn indirect_skip() {
let references_2 = build_dag(&committee, &mut block_writer, None, leader_round_2);

// Filter out that leader.
let leader_2 = committee.elect_leader(leader_round_2);
let leader_2 = committee.elect_leader(leader_round_2, 0);
let references_without_leader_2: Vec<_> = references_2
.iter()
.cloned()
Expand Down Expand Up @@ -433,7 +436,7 @@ fn indirect_skip() {

// Ensure we commit the leader of wave 1.
let leader_round_1 = wave_length;
let leader_1 = committee.elect_leader(leader_round_1);
let leader_1 = committee.elect_leader(leader_round_1, 0);
if let LeaderStatus::Commit(ref block) = sequence[0] {
assert_eq!(block.author(), leader_1);
} else {
Expand All @@ -451,7 +454,7 @@ fn indirect_skip() {

// Ensure we commit the 3rd leader.
let leader_round_3 = 3 * wave_length;
let leader_3 = committee.elect_leader(leader_round_3);
let leader_3 = committee.elect_leader(leader_round_3, 0);
if let LeaderStatus::Commit(ref block) = sequence[2] {
assert_eq!(block.author(), leader_3);
} else {
Expand All @@ -476,7 +479,7 @@ fn undecided() {
let references_without_leader_1: Vec<_> = references_1
.iter()
.cloned()
.filter(|x| x.authority != committee.elect_leader(leader_round_1))
.filter(|x| x.authority != committee.elect_leader(leader_round_1, 0))
.collect();

// Create a dag layer where only one authority votes for the first leader.
Expand Down
32 changes: 16 additions & 16 deletions mysticeti-core/src/consensus/tests/multi_committer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ fn direct_commit() {
if let LeaderStatus::Commit(block) = leader {
let leader_round = wave_length;
let leader_offset = i as u64;
let expected = committee.elect_leader(leader_round + leader_offset);
let expected = committee.elect_leader(leader_round, leader_offset);
assert_eq!(block.author(), expected);
} else {
panic!("Expected a committed leader")
Expand Down Expand Up @@ -109,7 +109,7 @@ fn multiple_direct_commit() {
for (i, leader) in sequence.iter().enumerate() {
if let LeaderStatus::Commit(block) = leader {
let leader_offset = i as u64;
let expected = committee.elect_leader(leader_round + leader_offset);
let expected = committee.elect_leader(leader_round, leader_offset);
assert_eq!(block.author(), expected);
} else {
panic!("Expected a committed leader")
Expand All @@ -130,7 +130,7 @@ fn direct_commit_partial_round() {
let number_of_leaders = committee.quorum_threshold() as usize;

let first_leader_round = wave_length;
let first_leader = committee.elect_leader(first_leader_round);
let first_leader = committee.elect_leader(first_leader_round, 0);
let last_committed = BlockReference::new_test(first_leader, first_leader_round);

let enough_blocks = 2 * wave_length - 1;
Expand All @@ -153,7 +153,7 @@ fn direct_commit_partial_round() {
for (i, leader) in sequence.iter().enumerate() {
if let LeaderStatus::Commit(block) = leader {
let leader_offset = (i + 1) % committee.len();
let expected = committee.elect_leader(first_leader_round + leader_offset as u64);
let expected = committee.elect_leader(first_leader_round, leader_offset as u64);
assert_eq!(block.author(), expected);
} else {
panic!("Expected a committed leader")
Expand Down Expand Up @@ -193,7 +193,7 @@ fn direct_commit_late_call() {
for (j, leader) in leaders.iter().enumerate() {
if let LeaderStatus::Commit(block) = leader {
let leader_offset = j as u64;
let expected = committee.elect_leader(leader_round + leader_offset);
let expected = committee.elect_leader(leader_round, leader_offset);
assert_eq!(block.author(), expected);
} else {
panic!("Expected a committed leader")
Expand Down Expand Up @@ -247,7 +247,7 @@ fn no_leader() {

// Add enough blocks to reach the decision round of wave 1 (but without its leader).
let leader_round_1 = wave_length;
let leader_1 = committee.elect_leader(leader_round_1);
let leader_1 = committee.elect_leader(leader_round_1, 0);

let connections = committee
.authorities()
Expand Down Expand Up @@ -281,7 +281,7 @@ fn no_leader() {
for (i, leader) in sequence.iter().enumerate() {
let leader_round = wave_length;
let leader_offset = i as u64;
let expected_leader = committee.elect_leader(leader_round + leader_offset);
let expected_leader = committee.elect_leader(leader_round, leader_offset);
if i == 0 {
if let LeaderStatus::Skip(leader, round) = sequence[i] {
assert_eq!(leader, expected_leader);
Expand Down Expand Up @@ -316,7 +316,7 @@ fn direct_skip() {
// Filter out that leader.
let references_without_leader_1: Vec<_> = references_1
.into_iter()
.filter(|x| x.authority != committee.elect_leader(leader_round_1))
.filter(|x| x.authority != committee.elect_leader(leader_round_1, 0))
.collect();

// Add enough blocks to reach the decision round of wave 1.
Expand Down Expand Up @@ -346,7 +346,7 @@ fn direct_skip() {
for (i, leader) in sequence.iter().enumerate() {
let leader_round = wave_length;
let leader_offset = i as u64;
let expected_leader = committee.elect_leader(leader_round + leader_offset);
let expected_leader = committee.elect_leader(leader_round, leader_offset);
if i == 0 {
if let LeaderStatus::Skip(leader, round) = sequence[i] {
assert_eq!(leader, expected_leader);
Expand Down Expand Up @@ -382,7 +382,7 @@ fn indirect_commit() {
let references_without_leader_1: Vec<_> = references_1
.iter()
.cloned()
.filter(|x| x.authority != committee.elect_leader(leader_round_1))
.filter(|x| x.authority != committee.elect_leader(leader_round_1, 0))
.collect();

// Only 2f+1 validators vote for the that leader.
Expand Down Expand Up @@ -455,7 +455,7 @@ fn indirect_commit() {
assert_eq!(sequence.len(), 2 * number_of_leaders);

let leader_round = wave_length;
let leader = committee.elect_leader(leader_round);
let leader = committee.elect_leader(leader_round, 0);
if let LeaderStatus::Commit(ref block) = sequence[0] {
assert_eq!(block.author(), leader);
} else {
Expand All @@ -478,7 +478,7 @@ fn indirect_skip() {
let references_2 = build_dag(&committee, &mut block_writer, None, leader_round_2);

// Filter out the first leader of wave 2.
let leader_2 = committee.elect_leader(leader_round_2);
let leader_2 = committee.elect_leader(leader_round_2, 0);
let references_without_leader_2: Vec<_> = references_2
.iter()
.cloned()
Expand Down Expand Up @@ -536,7 +536,7 @@ fn indirect_skip() {
for n in 0..number_of_leaders {
let leader_round_1 = wave_length;
let leader_offset = n as u64;
let leader_1 = committee.elect_leader(leader_round_1 + leader_offset);
let leader_1 = committee.elect_leader(leader_round_1, leader_offset);
if let LeaderStatus::Commit(ref block) = sequence[n] {
assert_eq!(block.author(), leader_1);
} else {
Expand Down Expand Up @@ -564,7 +564,7 @@ fn indirect_skip() {
panic!("Expected a skipped leader")
}
} else {
let leader_2 = committee.elect_leader(leader_round_2 + leader_offset);
let leader_2 = committee.elect_leader(leader_round_2, leader_offset);
if let LeaderStatus::Commit(ref block) = sequence[number_of_leaders + n] {
assert_eq!(block.author(), leader_2);
} else {
Expand All @@ -577,7 +577,7 @@ fn indirect_skip() {
for n in 0..number_of_leaders {
let leader_round_3 = 3 * wave_length;
let leader_offset = n as u64;
let leader_3 = committee.elect_leader(leader_round_3 + leader_offset);
let leader_3 = committee.elect_leader(leader_round_3, leader_offset);
if let LeaderStatus::Commit(ref block) = sequence[2 * number_of_leaders + n] {
assert_eq!(block.author(), leader_3);
} else {
Expand All @@ -604,7 +604,7 @@ fn undecided() {
let references_1_without_leader: Vec<_> = references_1
.iter()
.cloned()
.filter(|x| x.authority != committee.elect_leader(leader_round_1))
.filter(|x| x.authority != committee.elect_leader(leader_round_1, 0))
.collect();

// Create a dag layer where only one authority votes for that leader.
Expand Down
Loading

0 comments on commit e23f312

Please sign in to comment.