Skip to content

Commit

Permalink
add reputation scores
Browse files Browse the repository at this point in the history
  • Loading branch information
akichidis committed Dec 4, 2023
1 parent 63974e4 commit a797328
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 5 deletions.
4 changes: 4 additions & 0 deletions mysticeti-core/src/block_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::commit_observer::CommitObserverRecoveredState;
use crate::consensus::reputation_scores::ReputationScores;
use crate::metrics::{Metrics, UtilizationTimerExt};
use crate::state::{CoreRecoveredState, RecoveredStateBuilder};
use crate::types::{AuthorityIndex, BlockDigest, BlockReference, RoundNumber, StatementBlock};
Expand Down Expand Up @@ -556,6 +557,8 @@ pub struct CommitData {
pub sub_dag: Vec<BlockReference>,
// Height of the commit, corresponds to CommittedSubDag::height
pub height: u64,
// the reputation scores
pub reputation_scores: ReputationScores,
}

impl From<&CommittedSubDag> for CommitData {
Expand All @@ -565,6 +568,7 @@ impl From<&CommittedSubDag> for CommitData {
leader: value.anchor,
sub_dag,
height: value.height,
reputation_scores: value.reputation_scores.clone(),
}
}
}
Expand Down
5 changes: 3 additions & 2 deletions mysticeti-core/src/commit_observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl<H: ProcessedTransactionHandler<TransactionLocator>> TestCommitObserver<H> {
) -> Self {
let consensus_only = env::var("CONSENSUS_ONLY").is_ok();
let mut observer = Self {
commit_interpreter: Linearizer::new(block_store),
commit_interpreter: Linearizer::new(block_store, committee.clone()),
transaction_votes: TransactionAggregator::with_handler(handler),
committee,
committed_leaders: vec![],
Expand Down Expand Up @@ -218,10 +218,11 @@ impl SimpleCommitObserver {
last_sent_height: u64,
recover_state: CommitObserverRecoveredState,
metrics: Arc<Metrics>,
committee: Arc<Committee>,
) -> Self {
let mut observer = Self {
block_store: block_store.clone(),
commit_interpreter: Linearizer::new(block_store),
commit_interpreter: Linearizer::new(block_store, committee),
sender,
metrics,
};
Expand Down
62 changes: 59 additions & 3 deletions mysticeti-core/src/consensus/linearizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@

use crate::block_store::{BlockStore, CommitData};
use crate::commit_observer::CommitObserverRecoveredState;
use crate::committee::Committee;
use crate::consensus::reputation_scores::ReputationScores;
use crate::{
data::Data,
types::{BlockReference, StatementBlock},
};
use std::collections::HashSet;
use std::fmt;
use std::fmt::{Display, Formatter};
use std::sync::Arc;

/// The output of consensus is an ordered list of [`CommittedSubDag`]. The application can arbitrarily
/// sort the blocks within each sub-dag (but using a deterministic algorithm).
Expand All @@ -24,6 +27,8 @@ pub struct CommittedSubDag {
/// Height of the commit.
/// First commit after genesis has a height of 1, then every next commit has a height incremented by 1.
pub height: u64,
/// The reputation scores
pub reputation_scores: ReputationScores,
}

impl CommittedSubDag {
Expand All @@ -33,12 +38,14 @@ impl CommittedSubDag {
blocks: Vec<Data<StatementBlock>>,
timestamp_ms: u64,
height: u64,
reputation_scores: ReputationScores,
) -> Self {
Self {
anchor,
blocks,
timestamp_ms,
height,
reputation_scores,
}
}

Expand All @@ -61,7 +68,13 @@ impl CommittedSubDag {
let leader_block_idx = leader_block_idx.expect("Leader block must be in the sub-dag");
let leader_block_ref = blocks[leader_block_idx].reference();
let timestamp_ms = blocks[leader_block_idx].meta_creation_time_ms();
CommittedSubDag::new(*leader_block_ref, blocks, timestamp_ms, commit_data.height)
CommittedSubDag::new(
*leader_block_ref,
blocks,
timestamp_ms,
commit_data.height,
commit_data.reputation_scores,
)
}

/// Sort the blocks of the sub-dag by round number. Any deterministic algorithm works.
Expand Down Expand Up @@ -94,14 +107,22 @@ pub struct Linearizer {
committed: HashSet<BlockReference>,
/// Keep track of the height of last linearized commit
last_height: u64,
/// Keeps the last committed leader
last_committed_leader: BlockReference,
/// the latest reputation scores
reputation_scores: ReputationScores,
committee: Arc<Committee>,
}

impl Linearizer {
pub fn new(block_store: BlockStore) -> Self {
pub fn new(block_store: BlockStore, committee: Arc<Committee>) -> Self {
Self {
block_store,
committed: Default::default(),
last_height: Default::default(),
last_committed_leader: BlockReference::default(),
reputation_scores: ReputationScores::new(&committee),
committee,
}
}

Expand Down Expand Up @@ -146,7 +167,17 @@ impl Linearizer {
}
}
self.last_height += 1;
CommittedSubDag::new(leader_block_ref, to_commit, timestamp_ms, self.last_height)

// update the reputation scores
self.update_reputation_scores(self.last_height, &to_commit);

CommittedSubDag::new(
leader_block_ref,
to_commit,
timestamp_ms,
self.last_height,
self.reputation_scores.clone(),
)
}

pub fn handle_commit(
Expand All @@ -155,15 +186,40 @@ impl Linearizer {
) -> Vec<CommittedSubDag> {
let mut committed = vec![];
for leader_block in committed_leaders {
let leader_ref = *leader_block.reference();
// Collect the sub-dag generated using each of these leaders as anchor.
let mut sub_dag = self.collect_sub_dag(leader_block);

// [Optional] sort the sub-dag using a deterministic algorithm.
sub_dag.sort();
committed.push(sub_dag);

self.last_committed_leader = leader_ref;
}
committed
}

fn update_reputation_scores(&mut self, height: u64, to_commit: &Vec<Data<StatementBlock>>) {
static NUM_SUB_DAGS_PER_SCHEDULE: u64 = 200;

// always reset when it is the first for the new window
if height % NUM_SUB_DAGS_PER_SCHEDULE == 1 {
self.reputation_scores = ReputationScores::new(&self.committee);
}

for block in to_commit {
//for include in block.includes() {
// if self.last_committed_leader == *include {
// We give one point to the author of every committed block. This could be good enough for now
self.reputation_scores.add_score(block.author(), 1);
// break;
// }
//}
}

// mark the reputation scores as final
self.reputation_scores.final_of_schedule = height % NUM_SUB_DAGS_PER_SCHEDULE == 0;
}
}

impl fmt::Debug for CommittedSubDag {
Expand Down
1 change: 1 addition & 0 deletions mysticeti-core/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub mod base_committer;
pub mod linearizer;
pub mod universal_committer;

pub mod reputation_scores;
#[cfg(test)]
mod tests;

Expand Down
71 changes: 71 additions & 0 deletions mysticeti-core/src/consensus/reputation_scores.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use crate::committee::Committee;
use crate::types::AuthorityIndex;
use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Serialize, Deserialize, Clone, Debug, Default, Eq, PartialEq)]
pub struct ReputationScores {
/// Holds the score for every authority. If an authority is not amongst
/// the records of the map then we assume that its score is zero.
pub scores_per_authority: HashMap<AuthorityIndex, u64>,
/// When true it notifies us that those scores will be the last updated scores of the
/// current schedule before they get reset for the next schedule and start
/// scoring from the beginning. In practice we can leverage this information to
/// use the scores during the next schedule until the next final ones are calculated.
pub final_of_schedule: bool,
}

impl ReputationScores {
/// Creating a new ReputationScores instance pre-populating the authorities entries with
/// zero score value.
pub fn new(committee: &Arc<Committee>) -> Self {
let scores_per_authority = committee
.authorities()
.map(|index| (index, 0_u64))
.collect();

Self {
scores_per_authority,
..Default::default()
}
}
/// Adds the provided `score` to the existing score for the provided `authority`
pub fn add_score(&mut self, authority: AuthorityIndex, score: u64) {
self.scores_per_authority
.entry(authority)
.and_modify(|value| *value += score)
.or_insert(score);
}

pub fn total_authorities(&self) -> u64 {
self.scores_per_authority.len() as u64
}

pub fn all_zero(&self) -> bool {
!self.scores_per_authority.values().any(|e| *e > 0)
}

// Returns the authorities in score descending order.
pub fn authorities_by_score_desc(&self) -> Vec<(AuthorityIndex, u64)> {
let mut authorities: Vec<_> = self
.scores_per_authority
.iter()
.map(|(authority, score)| (*authority, *score))
.collect();

authorities.sort_by(|a1, a2| {
match a2.1.cmp(&a1.1) {
Ordering::Equal => {
// we resolve the score equality deterministically by ordering in authority
// identifier order descending.
a2.0.cmp(&a1.0)
}
result => result,
}
});

authorities
}
}
1 change: 1 addition & 0 deletions mysticeti-core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ impl Validator<SimpleBlockHandler> {
consumer.last_sent_height,
commit_observer_recovered,
metrics.clone(),
committee.clone(),
);

let validator = Validator::start_internal(
Expand Down

0 comments on commit a797328

Please sign in to comment.