Skip to content

Commit

Permalink
A src/list/raxos/protocal/src/commonly_used/history/generic.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Oct 2, 2024
1 parent 0ffe9d1 commit c19e28e
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 30 deletions.
84 changes: 84 additions & 0 deletions src/list/raxos/protocal/src/commonly_used/history/generic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::collections::BTreeMap;
use std::collections::HashMap;

use crate::apaxos::history::History;
use crate::Types;

#[derive(Clone, Debug)]
pub struct GenericHistory<T>
where T: Types<History = Self>
{
maximals: HashMap<T::Time, ()>,
time_events: HashMap<T::Time, T::Event>,
}

impl<T> Default for GenericHistory<T>
where T: Types<History = Self>
{
fn default() -> Self {
Self {
maximals: HashMap::new(),
time_events: HashMap::new(),
}
}
}

impl<T: Types> History<T> for GenericHistory<T>
where T: Types<History = Self>
{
fn do_append(&mut self, time: T::Time, event: T::Event) {
self.time_events.insert(time, event);
for max in self.maximals.keys().copied().collect::<Vec<_>>() {
if max <= time {
self.maximals.remove(&max);
}
}
}

fn get(&self, time: &T::Time) -> Option<&T::Event> {
self.time_events.get(time)
}

fn lower_bounds(&self, time: T::Time) -> Self {
let time_events = self
.time_events
.iter()
.filter(|(t, _ev)| t <= &time)
.map(|(t, ev)| (*t, ev.clone()))
.collect::<HashMap<_, _>>();

Self {
maximals: build_maximals(&time_events),
time_events,
}
}

fn maximals(&self) -> impl Iterator<Item = (T::Time, T::Event)> {
self.maximals.keys().copied().map(move |t| (t, self.time_events[&t].clone()))
}

fn do_merge(&mut self, other: Self) {
for (time, event) in other.time_events {
self.time_events.insert(time, event);
}

for (time, _) in other.maximals {
self.maximals.insert(time, ());
}
}
}

/// Build a map of **maximal** times from a map of time events.
fn build_maximals<T: Types>(time_events: &HashMap<T::Time, T::Event>) -> HashMap<T::Time, ()> {
let mut maximals = HashMap::new();
for time in time_events.keys() {
for max in maximals.keys().copied().collect::<Vec<_>>() {
if time > &max {
maximals.remove(&max);
}
}

maximals.insert(*time, ());
}
maximals
}
1 change: 1 addition & 0 deletions src/list/raxos/protocal/src/commonly_used/history/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
pub mod linear;
pub mod generic;
2 changes: 2 additions & 0 deletions src/list/raxos/protocal/src/commonly_used/time.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub mod vec_2d;

pub struct BallotNumber {
pub round: u64,
pub leader: u64,
Expand Down
6 changes: 6 additions & 0 deletions src/list/raxos/protocal/src/commonly_used/time/vec_2d.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#[derive(Clone, Debug, Copy)]
#[derive(PartialOrd, PartialEq, Eq, Hash)]
pub struct Vec2DTime {
pub x: u64,
pub y: u64,
}
61 changes: 32 additions & 29 deletions src/list/raxos/protocal/src/implementations/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,49 +42,52 @@ mod tests {

#[test]
fn test_paxos() -> anyhow::Result<()> {
//

let ex = Arc::new(Executor::new());

let fu = do_test(ex.clone());
let fu = async move {
let acceptor_ids = [1, 2, 3];

futures_lite::future::block_on(ex.run(fu))?;
Ok(())
let mut acceptors = BTreeMap::new();
for id in acceptor_ids {
acceptors.insert(id, Acceptor::default());
}

// TODO: rebuild from previous value
}
let quorum_set = Majority::new(acceptor_ids);
let transport = DirectCall::new(acceptors.clone());

async fn do_test(ex: Arc<Executor<'_>>) -> anyhow::Result<()> {
ex.spawn(async {
println!("Inner task");
})
.detach();
let mut apaxos = APaxos::<Paxos>::new(acceptor_ids, quorum_set, transport);

let acceptor_ids = [1, 2, 3];
let mut proposer = Proposer::new(&mut apaxos, 5, "hello".to_string());

let mut acceptors = BTreeMap::new();
for id in acceptor_ids {
acceptors.insert(id, Acceptor::default());
}
let committed = proposer.run().await?;

let quorum_set = Majority::new(acceptor_ids);
let transport = DirectCall::new(acceptors.clone());
assert_eq!(committed.latest_time(), Some(5));
assert_eq!(committed.latest_value(), Some(s("hello")));

let mut apaxos = APaxos::<Paxos>::new(acceptor_ids, quorum_set, transport);
println!("Done");

let mut proposer = Proposer::new(&mut apaxos, 5, "hello".to_string());
let committed = proposer.run().await?;
Ok::<(), anyhow::Error>(())
};

assert_eq!(committed.latest_time(), Some(5));
assert_eq!(committed.latest_value(), Some(s("hello")));
// let mut proposer = Proposer::new(&mut apaxos, 6, "world".to_string());
// let committed = proposer.run().await?;
//
// assert_eq!(committed.latest_time(), Some(6));
// assert_eq!(committed.latest_value(), Some(s("hello")));

ex.spawn(fu).detach();

let mut proposer = Proposer::new(&mut apaxos, 6, "world".to_string());
let committed = proposer.run().await?;
futures_lite::future::block_on(ex.tick());
Ok(())

assert_eq!(committed.latest_time(), Some(6));
assert_eq!(committed.latest_value(), Some(s("hello")));
// TODO: rebuild from previous value
}

println!("Done");
async fn do_test(ex: Arc<Executor<'_>>) -> anyhow::Result<()> {
ex.spawn(async {
println!("Inner task");
})
.detach();

Ok(())
}
Expand Down
1 change: 0 additions & 1 deletion src/list/raxos/protocal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ where Self: Default + Debug + Clone + Sized + 'static
/// - In Paxos, it is ballot number, which is `(round, proposer_id)`.
/// - In Raft, it is `(term, Option<voted_for>)`.
/// - In 2PC, it is mainly a vector of related data entry name.
// TODO: explain 2pc time.
type Time: Time;

/// The value to propose and to commit
Expand Down

0 comments on commit c19e28e

Please sign in to comment.