Skip to content

Commit

Permalink
misc: add flume benchmarks
Browse files Browse the repository at this point in the history
Signed-off-by: Anthony Griffon <[email protected]>
  • Loading branch information
Miaxos committed Jan 18, 2024
1 parent 302e055 commit f5a232d
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 3 deletions.
15 changes: 14 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ thiserror = "1"
cfg-if = "1"
criterion = { version = "0.5", features = ["async", "html_reports"] }
monoio = { version = "0.2", features = ["sync"] }
flume = "0.11"

[[bench]]
name = "sharding_direct"
Expand Down
199 changes: 197 additions & 2 deletions benches/sharding_direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::thread::JoinHandle;
use std::u128;

use criterion::{criterion_group, criterion_main, Criterion};
use flume::{Receiver, Sender};
use futures::StreamExt;
use sharded_thread::mesh::MeshBuilder;

Expand Down Expand Up @@ -66,6 +67,93 @@ fn start_threads<Msg: Send + 'static>(
(handles, mesh)
}

fn start_threads_flume<Msg: Send + 'static>(
) -> (Vec<JoinHandle<()>>, Vec<(Sender<Msg>, Receiver<Msg>)>) {
// 1 -> 2
let (tx1, rx2) = flume::unbounded::<Msg>();
// 2 -> 3
let (tx2, rx3) = flume::unbounded::<Msg>();
// 3 -> 1
let (tx3, rx1) = flume::unbounded::<Msg>();

let senders = vec![
(tx1.clone(), rx1.clone()),
(tx2.clone(), rx2.clone()),
(tx3.clone(), rx3.clone()),
];

let handle_1 = std::thread::spawn(move || {
// We lock the thread for the core
monoio::utils::bind_to_cpu_set(Some(0)).unwrap();
let mut rt = monoio::RuntimeBuilder::<Driver>::new()
.with_entries(1024)
.enable_timer()
.build()
.expect("Cannot build runtime");

rt.block_on(async move {
let handle = monoio::spawn(async move {
let rx = rx1;
while let Ok(val) = rx.recv_async().await {
tx2.send(val).unwrap();
}
});
handle.await
})
});

let handle_2 = std::thread::spawn(move || {
// We lock the thread for the core
monoio::utils::bind_to_cpu_set(Some(1)).unwrap();
let mut rt = monoio::RuntimeBuilder::<Driver>::new()
.with_entries(1024)
.enable_timer()
.build()
.expect("Cannot build runtime");

rt.block_on(async move {
let handle = monoio::spawn(async move {
let rx = rx2;
while let Ok(val) = rx.recv_async().await {
tx3.send(val).unwrap();
}
});
handle.await
})
});

/*
let handle_3 = std::thread::spawn(move || {
// We lock the thread for the core
monoio::utils::bind_to_cpu_set(Some(2)).unwrap();
let mut rt = monoio::RuntimeBuilder::<Driver>::new()
.with_entries(1024)
.enable_timer()
.build()
.expect("Cannot build runtime");
rt.block_on(async move {
let handle = monoio::spawn(async move {
let rx = rx3;
let mut count = 0;
while let Ok(val) = rx.recv_async().await {
if count > count_max {
return;
}
tx1.send(val).unwrap();
count += 1;
}
});
handle.await
})
});
*/

let handles: Vec<std::thread::JoinHandle<()>> = vec![handle_1, handle_2];

(handles, senders)
}

fn execute_round<T: Send + 'static>(
mesh: Arc<MeshBuilder<T>>,
count_max: usize,
Expand Down Expand Up @@ -160,7 +248,7 @@ fn bench_round(c: &mut Criterion) {

c.bench_function("rotate_a_usize_between_3_cpu_10_000", |b| {
b.iter_batched(
|| (Arc::clone(&mesh), execute_round(Arc::clone(&mesh), 1_000)),
|| (Arc::clone(&mesh), execute_round(Arc::clone(&mesh), 10_000)),
|(mesh, handle)| {
mesh.send_to(2, 12345).unwrap();
handle.join().unwrap();
Expand Down Expand Up @@ -267,9 +355,116 @@ fn bench_round_struct(c: &mut Criterion) {
});
}

fn execute_round_flume<T: Send + 'static>(
senders: Vec<(Sender<T>, Receiver<T>)>,
count_max: usize,
) -> JoinHandle<()> {
std::thread::spawn(move || {
// We lock the thread for the core
monoio::utils::bind_to_cpu_set(Some(2)).unwrap();
let mut rt = monoio::RuntimeBuilder::<Driver>::new()
.with_entries(1024)
.enable_timer()
.build()
.expect("Cannot build runtime");

let (tx1, _) = senders.get(0).cloned().unwrap();
let (_, rx3) = senders.get(2).cloned().unwrap();
rt.block_on(async move {
let handle = monoio::spawn(async move {
let rx = rx3;
let mut count = 0;
while let Ok(val) = rx.recv_async().await {
if count > count_max {
return;
}
tx1.send(val).unwrap();
count += 1;
}
});
handle.await
})
})
}

fn bench_round_flume(c: &mut Criterion) {
let (_, senders) = start_threads_flume::<usize>();

c.bench_function("flume_send_a_value_between_two_cpu_1", |b| {
b.iter_batched(
|| (senders.clone(), execute_round_flume(senders.clone(), 0)),
|(senders, handle)| {
senders.get(1).unwrap().0.send(12345).unwrap();
handle.join().unwrap();
},
criterion::BatchSize::PerIteration,
)
});

c.bench_function("flume_rotate_a_usize_between_3_cpu_1", |b| {
b.iter_batched(
|| (senders.clone(), execute_round_flume(senders.clone(), 1)),
|(senders, handle)| {
senders.get(2).unwrap().0.send(12345).unwrap();
handle.join().unwrap();
},
criterion::BatchSize::PerIteration,
)
});

c.bench_function("flume_rotate_a_usize_between_3_cpu_10", |b| {
b.iter_batched(
|| (senders.clone(), execute_round_flume(senders.clone(), 10)),
|(senders, handle)| {
senders.get(2).unwrap().0.send(12345).unwrap();
handle.join().unwrap();
},
criterion::BatchSize::PerIteration,
)
});

c.bench_function("flume_rotate_a_usize_between_3_cpu_100", |b| {
b.iter_batched(
|| (senders.clone(), execute_round_flume(senders.clone(), 100)),
|(senders, handle)| {
senders.get(2).unwrap().0.send(12345).unwrap();
handle.join().unwrap();
},
criterion::BatchSize::PerIteration,
)
});

c.bench_function("flume_rotate_a_usize_between_3_cpu_1000", |b| {
b.iter_batched(
|| (senders.clone(), execute_round_flume(senders.clone(), 1_000)),
|(senders, handle)| {
senders.get(2).unwrap().0.send(12345).unwrap();
handle.join().unwrap();
},
criterion::BatchSize::PerIteration,
)
});

c.bench_function("flume_rotate_a_usize_between_3_cpu_10_000", |b| {
b.iter_batched(
|| {
(
senders.clone(),
execute_round_flume(senders.clone(), 10_000),
)
},
|(senders, handle)| {
senders.get(2).unwrap().0.send(12345).unwrap();
handle.join().unwrap();
},
criterion::BatchSize::PerIteration,
)
});
}

criterion_group! {
name = benches;
config = Criterion::default();
targets = bench_round, bench_round_struct
targets = bench_round, bench_round_struct, bench_round_flume,
}
criterion_main!(benches);

0 comments on commit f5a232d

Please sign in to comment.