From 46327f3a0799c4ec1efccf26932d7f2eb067f81e Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Wed, 29 Apr 2020 16:21:24 +0200 Subject: [PATCH 01/11] Added first support for executing Futures --- executors/Cargo.toml | 6 +- executors/src/common.rs | 2 +- executors/src/crossbeam_channel_pool.rs | 2 +- executors/src/crossbeam_workstealing_pool.rs | 2 +- executors/src/futures_executor.rs | 178 +++++++++++++++++++ executors/src/lib.rs | 5 +- executors/src/parker.rs | 2 +- executors/src/run_now.rs | 2 +- executors/src/threadpool_executor.rs | 2 +- 9 files changed, 192 insertions(+), 9 deletions(-) create mode 100644 executors/src/futures_executor.rs diff --git a/executors/Cargo.toml b/executors/Cargo.toml index 49de729..30e4a17 100644 --- a/executors/Cargo.toml +++ b/executors/Cargo.toml @@ -16,7 +16,7 @@ categories = ["concurrency", "asynchronous"] license = "MIT" [features] -default = ["threadpool-exec", "cb-channel-exec", "workstealing-exec", "ws-timed-fairness", "defaults"] +default = ["threadpool-exec", "cb-channel-exec", "workstealing-exec", "ws-timed-fairness", "defaults", "futures-support"] threadpool-exec = ["threadpool"] cb-channel-exec = ["crossbeam-channel"] @@ -32,6 +32,8 @@ ws-no-park = [] thread-pinning = ["core_affinity"] +futures-support = ["futures"] + [dependencies] log = "0.4" @@ -41,10 +43,10 @@ crossbeam-channel = {version = "0.4", optional = true} threadpool = {version = "1.7", optional = true} crossbeam-utils = {version = "0.7", optional = true} crossbeam-deque = {version = "0.7", optional = true} -#time = {version = "0.2", optional = true, default-featurs = false, features = ["std", ""]} rand = {version = "0.7", optional = true} num_cpus = {version = "1.12", optional = true} core_affinity = {version = "0.5", optional = true} +futures = {version = "0.3", optional = true} [dev-dependencies] env_logger = "0.7" diff --git a/executors/src/common.rs b/executors/src/common.rs index e91bd2e..13d1afe 100644 --- a/executors/src/common.rs +++ b/executors/src/common.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Lars Kroll. See the LICENSE +// Copyright 2017-2020 Lars Kroll. See the LICENSE // file at the top-level directory of this distribution. // // Licensed under the MIT license diff --git a/executors/src/crossbeam_channel_pool.rs b/executors/src/crossbeam_channel_pool.rs index 5d5c389..6ffe489 100644 --- a/executors/src/crossbeam_channel_pool.rs +++ b/executors/src/crossbeam_channel_pool.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Lars Kroll. See the LICENSE +// Copyright 2017-2020 Lars Kroll. See the LICENSE // file at the top-level directory of this distribution. // // Licensed under the MIT license diff --git a/executors/src/crossbeam_workstealing_pool.rs b/executors/src/crossbeam_workstealing_pool.rs index a74acd9..3216384 100644 --- a/executors/src/crossbeam_workstealing_pool.rs +++ b/executors/src/crossbeam_workstealing_pool.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Lars Kroll. See the LICENSE +// Copyright 2017-2020 Lars Kroll. See the LICENSE // file at the top-level directory of this distribution. // // Licensed under the MIT license diff --git a/executors/src/futures_executor.rs b/executors/src/futures_executor.rs new file mode 100644 index 0000000..09583f0 --- /dev/null +++ b/executors/src/futures_executor.rs @@ -0,0 +1,178 @@ +// Copyright 2017-2020 Lars Kroll. See the LICENSE +// file at the top-level directory of this distribution. +// +// Licensed under the MIT license +// . +// This file may not be copied, modified, or distributed +// except according to those terms. 
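+
+//! Support for executing `futures::Future`s on this crate's thread pools.
+//!
+//! A minimal usage sketch (an illustration, assuming the default
+//! `futures-support` feature; it uses only the `FuturesExecutor::spawn`
+//! trait defined below plus the existing pool constructors):
+//!
+//! ```
+//! use executors::*;
+//! use executors::futures_executor::FuturesExecutor;
+//! use executors::crossbeam_channel_pool::ThreadPool;
+//!
+//! let pool = ThreadPool::new(2);
+//! // spawn an async block onto one of the pool's worker threads
+//! pool.spawn(async move { println!("ran as a future!") });
+//! pool.shutdown().expect("shutdown");
+//! ```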
+ +use super::*; +use futures::{ + future::{BoxFuture, FutureExt}, + task::{waker_ref, ArcWake}, +}; +use std::{ + cell::UnsafeCell, + future::Future, + sync::Arc, + task::{Context, Poll}, +}; + +pub trait FuturesExecutor: Executor + Sync + 'static { + fn spawn(&self, future: impl Future + 'static + Send) -> (); +} +impl FuturesExecutor for E +where + E: Executor + Sync + 'static, +{ + fn spawn(&self, future: impl Future + 'static + Send) -> () { + let future = future.boxed(); + let task = FunTask { + future: UnsafeCell::new(Some(future)), + executor: self.clone(), + }; + let task = Arc::new(task); + self.execute(move || FunTask::run(task)); + } +} + +struct FunTask +where + E: Executor + Sync + 'static, +{ + future: UnsafeCell>>, + executor: E, +} + +impl FunTask +where + E: Executor + Sync + 'static, +{ + fn run(task: Arc) -> () { + let res = unsafe { + let src = task.future.get(); + src.as_mut().map(|opt| opt.take()).flatten() + }; + if let Some(mut f) = res { + let waker = waker_ref(&task); + let context = &mut Context::from_waker(&*waker); + // `BoxFuture` is a type alias for + // `Pin + Send + 'static>>`. + // We can get a `Pin<&mut dyn Future + Send + 'static>` + // from it by calling the `Pin::as_mut` method. + if let Poll::Pending = f.as_mut().poll(context) { + // We're not done processing the future, so put it + // back in its task to be run again in the future. + unsafe { + let dst = task.future.get(); + if let Some(slot) = dst.as_mut() { + *slot = Some(f); + } + } + } + } // else the future is already done with + } +} + +impl ArcWake for FunTask +where + E: Executor + Sync + 'static, +{ + fn wake_by_ref(arc_self: &Arc) { + // Implement `wake` by sending this task back onto the task channel + // so that it will be polled again by the executor. + let cloned = arc_self.clone(); + arc_self.executor.execute(move || FunTask::run(cloned)); + } +} + +// I'm making sure only one thread at a time has access to the contents here +unsafe impl Sync for FunTask where E: Executor + Sync + 'static {} + +#[cfg(test)] +mod tests { + use super::*; + + use futures::channel::oneshot::*; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::thread; + use std::time::Duration; + + async fn just_succeed(barrier: Arc) -> () { + let res = barrier.compare_and_swap(false, true, Ordering::SeqCst); + assert!(!res); // i.e. assert that the old value was false + } + + #[test] + fn test_async_ready_ccp() { + let exec = crate::crossbeam_channel_pool::ThreadPool::new(2); + test_async_ready_executor(&exec); + exec.shutdown().expect("shutdown"); + } + + #[test] + fn test_async_ready_cwp() { + let exec = crate::crossbeam_workstealing_pool::small_pool(2); + test_async_ready_executor(&exec); + exec.shutdown().expect("shutdown"); + } + + fn test_async_ready_executor(exec: &E) + where + E: FuturesExecutor, + { + let barrier = Arc::new(AtomicBool::new(false)); + let f = just_succeed(barrier.clone()); + exec.spawn(f); + let mut done = false; + while !done { + thread::sleep(Duration::from_millis(100)); + done = barrier.load(Ordering::SeqCst); + } + } + + async fn wait_for_channel(receiver: Receiver<()>, barrier: Arc) -> () { + let _ok = receiver.await.expect("message"); + let res = barrier.compare_and_swap(false, true, Ordering::SeqCst); + assert!(!res); // i.e. 
assert that the old value was false + } + + // Does not implement Sync + // #[test] + // fn test_async_pending_tpe() { + // let exec = crate::threadpool_executor::ThreadPoolExecutor::new(2); + // test_async_pending_executor(&exec); + // exec.shutdown().expect("shutdown"); + // } + + #[test] + fn test_async_pending_ccp() { + let exec = crate::crossbeam_channel_pool::ThreadPool::new(2); + test_async_pending_executor(&exec); + exec.shutdown().expect("shutdown"); + } + + #[test] + fn test_async_pending_cwp() { + let exec = crate::crossbeam_workstealing_pool::small_pool(2); + test_async_pending_executor(&exec); + exec.shutdown().expect("shutdown"); + } + + fn test_async_pending_executor(exec: &E) + where + E: FuturesExecutor, + { + let barrier = Arc::new(AtomicBool::new(false)); + let (tx, rx) = channel(); + let f = wait_for_channel(rx, barrier.clone()); + exec.spawn(f); + thread::sleep(Duration::from_millis(100)); + tx.send(()).expect("sent"); + let mut done = false; + while !done { + thread::sleep(Duration::from_millis(100)); + done = barrier.load(Ordering::SeqCst); + } + } +} diff --git a/executors/src/lib.rs b/executors/src/lib.rs index 1d878d1..493fb67 100644 --- a/executors/src/lib.rs +++ b/executors/src/lib.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Lars Kroll. See the LICENSE +// Copyright 2017-2020 Lars Kroll. See the LICENSE // file at the top-level directory of this distribution. // // Licensed under the MIT license @@ -33,6 +33,9 @@ mod timeconstants; use crate::common::ignore; pub use crate::common::Executor; +#[cfg(feature = "futures-support")] +pub mod futures_executor; + //use bichannel::*; use synchronoise::CountdownEvent; diff --git a/executors/src/parker.rs b/executors/src/parker.rs index ee3ca80..6359b6d 100644 --- a/executors/src/parker.rs +++ b/executors/src/parker.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Lars Kroll. See the LICENSE +// Copyright 2017-2020 Lars Kroll. See the LICENSE // file at the top-level directory of this distribution. // // Licensed under the MIT license diff --git a/executors/src/run_now.rs b/executors/src/run_now.rs index e454d21..519e131 100644 --- a/executors/src/run_now.rs +++ b/executors/src/run_now.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Lars Kroll. See the LICENSE +// Copyright 2017-2020 Lars Kroll. See the LICENSE // file at the top-level directory of this distribution. // // Licensed under the MIT license diff --git a/executors/src/threadpool_executor.rs b/executors/src/threadpool_executor.rs index 88eb101..8271120 100644 --- a/executors/src/threadpool_executor.rs +++ b/executors/src/threadpool_executor.rs @@ -1,4 +1,4 @@ -// Copyright 2017 Lars Kroll. See the LICENSE +// Copyright 2017-2020 Lars Kroll. See the LICENSE // file at the top-level directory of this distribution. 
// // Licensed under the MIT license From 322386533c4ef42dcd133b43137aec44f2a06fc2 Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Wed, 3 Jun 2020 16:38:17 +0200 Subject: [PATCH 02/11] Reduce number of Arcs to clone in crossbeam_workstealing_pool handles --- executors/Cargo.toml | 6 + executors/benches/scheduler.rs | 213 ++++++++++ executors/src/crossbeam_workstealing_pool.rs | 394 ++++++++++--------- executors/src/futures_executor.rs | 2 +- executors/src/parker.rs | 2 +- 5 files changed, 431 insertions(+), 186 deletions(-) create mode 100644 executors/benches/scheduler.rs diff --git a/executors/Cargo.toml b/executors/Cargo.toml index 30e4a17..df7dd74 100644 --- a/executors/Cargo.toml +++ b/executors/Cargo.toml @@ -51,6 +51,7 @@ futures = {version = "0.3", optional = true} [dev-dependencies] env_logger = "0.7" version-sync = "0.8" +bencher = "0.1.5" [badges] @@ -60,3 +61,8 @@ version-sync = "0.8" maintenance = { status = "actively-developed" } travis-ci = { repository = "Bathtor/rust-executors", branch = "master" } + +[[bench]] +name = "scheduler" +path = "benches/scheduler.rs" +harness = false diff --git a/executors/benches/scheduler.rs b/executors/benches/scheduler.rs new file mode 100644 index 0000000..be0e5ad --- /dev/null +++ b/executors/benches/scheduler.rs @@ -0,0 +1,213 @@ +//! Benchmark implementation details of the theaded scheduler. These benches are +//! intended to be used as a form of regression testing and not as a general +//! purpose benchmark demonstrating real-world performance. + +use executors::crossbeam_workstealing_pool; +use executors::futures_executor::*; +use executors::Executor; +use futures::channel::oneshot; + +use bencher::{benchmark_group, benchmark_main, Bencher}; +use std::sync::atomic::AtomicUsize; +use std::sync::atomic::Ordering::Relaxed; +use std::sync::{mpsc, Arc}; + +fn spawn_many(b: &mut Bencher) { + const NUM_SPAWN: usize = 10_000; + + let rt = rt(); + + let (tx, rx) = mpsc::sync_channel(1); + let rem = Arc::new(AtomicUsize::new(0)); + + b.iter(|| { + rem.store(NUM_SPAWN, Relaxed); + + //futures::executor::block_on(async { + for _ in 0..NUM_SPAWN { + let tx = tx.clone(); + let rem = rem.clone(); + + rt.spawn(async move { + if 1 == rem.fetch_sub(1, Relaxed) { + tx.send(()).unwrap(); + } + }); + } + + let _ = rx.recv().unwrap(); + //}); + }); + + rt.shutdown().expect("shutdown"); +} + +// fn yield_many(b: &mut Bencher) { +// const NUM_YIELD: usize = 1_000; +// const TASKS: usize = 200; + +// let rt = rt(); + +// let (tx, rx) = mpsc::sync_channel(TASKS); + +// b.iter(move || { +// for _ in 0..TASKS { +// let tx = tx.clone(); + +// rt.spawn(async move { +// for _ in 0..NUM_YIELD { +// tokio::task::yield_now().await; +// } + +// tx.send(()).unwrap(); +// }); +// } + +// for _ in 0..TASKS { +// let _ = rx.recv().unwrap(); +// } +// }); +// } + +fn ping_pong(b: &mut Bencher) { + const NUM_PINGS: usize = 1_000; + + let rt = rt(); + + let (done_tx, done_rx) = mpsc::sync_channel(1000); + let rem = Arc::new(AtomicUsize::new(0)); + + b.iter(|| { + let done_tx = done_tx.clone(); + let rem = rem.clone(); + rem.store(NUM_PINGS, Relaxed); + + futures::executor::block_on(async { + let rt_new = rt.clone(); + rt.spawn(async move { + for _ in 0..NUM_PINGS { + let rem = rem.clone(); + let done_tx = done_tx.clone(); + let rt_new2 = rt_new.clone(); + rt_new.spawn(async move { + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + + rt_new2.spawn(async move { + rx1.await.unwrap(); + tx2.send(()).unwrap(); + }); + + tx1.send(()).unwrap(); + 
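+                        // ping sent; wait for the pong from the partner
+                        // task before counting this round as done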
rx2.await.unwrap(); + + if 1 == rem.fetch_sub(1, Relaxed) { + done_tx.send(()).unwrap(); + } + }); + } + }); + + done_rx.recv().unwrap(); + }); + }); + rt.shutdown().expect("shutdown"); +} + +fn chained_spawn(b: &mut Bencher) { + const ITER: usize = 1_000; + + let rt = rt(); + + fn iter(done_tx: mpsc::SyncSender<()>, n: usize) { + if n == 0 { + done_tx.send(()).unwrap(); + } else { + let _ = crossbeam_workstealing_pool::execute_locally(move || { + iter(done_tx, n - 1); + }); + } + } + + let (done_tx, done_rx) = mpsc::sync_channel(1000); + let rt_inner = rt.clone(); + b.iter(move || { + let done_tx = done_tx.clone(); + rt_inner.execute(move || { + iter(done_tx, ITER); + }); + + done_rx.recv().unwrap(); + }); + rt.shutdown().expect("shutdown"); +} + +fn chained_spawn_no_local(b: &mut Bencher) { + const ITER: usize = 1_000; + + let rt = rt(); + + fn iter(rt: impl FuturesExecutor, done_tx: mpsc::SyncSender<()>, n: usize) { + if n == 0 { + done_tx.send(()).unwrap(); + } else { + let rt_new = rt.clone(); + rt.execute(move || { + iter(rt_new, done_tx, n - 1); + }); + } + } + + let (done_tx, done_rx) = mpsc::sync_channel(1); + let rt_inner = rt.clone(); + b.iter(move || { + let done_tx = done_tx.clone(); + let rt_new = rt_inner.clone(); + rt_inner.execute(move || { + iter(rt_new, done_tx, ITER); + }); + + done_rx.recv().unwrap(); + }); + rt.shutdown().expect("shutdown"); +} + +fn chained_spawn_async(b: &mut Bencher) { + const ITER: usize = 1_000; + + let rt = rt(); + + fn iter(rt: impl FuturesExecutor, done_tx: mpsc::SyncSender<()>, n: usize) { + if n == 0 { + done_tx.send(()).unwrap(); + } else { + let rt_new = rt.clone(); + rt.spawn(async move { + iter(rt_new, done_tx, n - 1); + }); + } + } + + let (done_tx, done_rx) = mpsc::sync_channel(1); + let rt_inner = rt.clone(); + b.iter(move || { + let done_tx = done_tx.clone(); + //futures::executor::block_on(async { + let rt_new = rt_inner.clone(); + rt_inner.spawn(async move { + iter(rt_new, done_tx, ITER); + }); + + done_rx.recv().unwrap(); + //}); + }); + rt.shutdown().expect("shutdown"); +} + +fn rt() -> impl FuturesExecutor { + crossbeam_workstealing_pool::small_pool(4) +} + +benchmark_group!(scheduler, chained_spawn, chained_spawn_no_local, chained_spawn_async); + +benchmark_main!(scheduler); diff --git a/executors/src/crossbeam_workstealing_pool.rs b/executors/src/crossbeam_workstealing_pool.rs index 3216384..1877a71 100644 --- a/executors/src/crossbeam_workstealing_pool.rs +++ b/executors/src/crossbeam_workstealing_pool.rs @@ -156,18 +156,53 @@ thread_local!( static LOCAL_JOB_QUEUE: UnsafeCell>> = UnsafeCell::new(Option::None); ); +/// Try to append the job to the thread-local job queue +/// +/// This only work if called from a thread that is part of the pool. +/// Otherwise the job will be returned in an `Err` variant. +pub fn execute_locally(job: F) -> Result<(), F> +where + F: FnOnce() + Send + 'static, +{ + LOCAL_JOB_QUEUE.with(|qo| unsafe { + match *qo.get() { + Some(ref q) => { + let msg = Job(Box::new(job)); + q.push(msg); + Ok(()) + } + None => Err(job), + } + }) +} + +/// A handle associated with the thread pool structure #[derive(Clone, Debug)] pub struct ThreadPool
<P>
where P: Parker + Clone + 'static, { - core: Arc>>, - global_sender: Arc>, + inner: Arc>, +} + +#[derive(Debug)] +struct ThreadPoolInner
<P>
+where + P: Parker + Clone + 'static, +{ + core: Mutex, + global_sender: deque::Injector, threads: usize, - shutdown: Arc, + shutdown: AtomicBool, parker: P, } +#[derive(Debug)] +struct ThreadPoolCore { + ids: usize, + workers: BTreeMap, +} + impl
<P>
ThreadPool
<P>
where P: Parker + Clone + 'static, @@ -202,35 +237,38 @@ where if let Some(max) = parker.max_threads() { assert!(threads <= max); } - let injector = Arc::new(deque::Injector::new()); - let shutdown = Arc::new(AtomicBool::new(false)); - let core = ThreadPoolCore::new(injector.clone(), shutdown.clone(), threads, parker.clone()); - let pool = ThreadPool { - core: Arc::new(Mutex::new(core)), - global_sender: injector, + let core = ThreadPoolCore::new(); + let inner = ThreadPoolInner { + core: Mutex::new(core), + global_sender: deque::Injector::new(), threads, - shutdown, + shutdown: AtomicBool::new(false), parker, }; + let pool = ThreadPool { + inner: Arc::new(inner), + }; #[cfg(not(feature = "thread-pinning"))] { - let mut guard = pool.core.lock().unwrap(); + let mut guard = pool.inner.core.lock().unwrap(); for _ in 0..threads { - guard.spawn_worker(pool.core.clone(), None); + pool.inner + .spawn_worker(&mut guard, pool.inner.clone(), None); } } #[cfg(feature = "thread-pinning")] { - let mut guard = pool.core.lock().unwrap(); + let mut guard = pool.core.inner.lock().unwrap(); let cores = core_affinity::get_core_ids().expect("core ids"); let num_pinned = cores.len().min(threads); for i in 0..num_pinned { - guard.spawn_worker_pinned(pool.core.clone(), None, cores[i]); + guard.spawn_worker_pinned(pool.inner.clone(), None, cores[i]); } if num_pinned < threads { let num_unpinned = threads - num_pinned; for _ in 0..num_unpinned { - guard.spawn_worker(pool.core.clone(), None); + pool.inner + .spawn_worker(&mut guard, pool.inner.clone(), None); } } } @@ -256,28 +294,24 @@ where if let Some(max) = parker.max_threads() { assert!(total_threads <= max); } - let injector = Arc::new(deque::Injector::new()); - let shutdown = Arc::new(AtomicBool::new(false)); - let core = ThreadPoolCore::new( - injector.clone(), - shutdown.clone(), - total_threads, - parker.clone(), - ); - let pool = ThreadPool { - core: Arc::new(Mutex::new(core)), - global_sender: injector.clone(), - threads: total_threads, - shutdown, + let core = ThreadPoolCore::new(); + let inner = ThreadPoolInner { + core: Mutex::new(core), + global_sender: deque::Injector::new(), + threads, + shutdown: AtomicBool::new(false), parker, }; + let pool = ThreadPool { + inner: Arc::new(inner), + }; { - let mut guard = pool.core.lock().unwrap(); + let mut guard = pool.inner.core.lock().unwrap(); cores.iter().for_each(|core_id| { - guard.spawn_worker_pinned(pool.core.clone(), None, *core_id); + guard.spawn_worker_pinned(pool.inner.clone(), None, *core_id); }); for _ in 0..floating { - guard.spawn_worker(pool.core.clone(), None); + guard.spawn_worker(pool.inner.clone(), None); } } pool @@ -305,7 +339,7 @@ where where F: FnOnce() + Send + 'static, { - if !self.shutdown.load(Ordering::Relaxed) { + if !self.inner.shutdown.load(Ordering::Relaxed) { LOCAL_JOB_QUEUE.with(|qo| unsafe { let msg = Job(Box::new(job)); match *qo.get() { @@ -314,9 +348,9 @@ where } None => { debug!("Scheduling on global pool."); - self.global_sender.push(msg); + self.inner.global_sender.push(msg); #[cfg(not(feature = "ws-no-park"))] - self.parker.unpark_one(); + self.inner.parker.unpark_one(); } } }) @@ -335,13 +369,14 @@ where fn shutdown_borrowed(&self) -> Result<(), String> { if !self + .inner .shutdown .compare_and_swap(false, true, Ordering::SeqCst) { - let latch = Arc::new(CountdownEvent::new(self.threads)); - debug!("Shutting down {} threads", self.threads); + let latch = Arc::new(CountdownEvent::new(self.inner.threads)); + debug!("Shutting down {} threads", self.inner.threads); { - 
let guard = self.core.lock().unwrap(); + let guard = self.inner.core.lock().unwrap(); for worker in guard.workers.values() { worker .control @@ -350,7 +385,7 @@ where } } #[cfg(not(feature = "ws-no-park"))] - self.parker.unpark_all(); // gotta wake up all threads so they can get shut down + self.inner.parker.unpark_all(); // gotta wake up all threads so they can get shut down let remaining = latch.wait_timeout(Duration::from_millis(MAX_WAIT_SHUTDOWN_MS)); if remaining == 0 { debug!("All threads shut down"); @@ -367,54 +402,88 @@ where } } -//fn spawn_worker(core: Arc>) { -// let id = { -// let mut guard = core.lock().unwrap(); -// guard.new_worker_id() -// }; -// let core2 = core.clone(); -// thread::spawn(move || { -// let mut worker = ThreadPoolWorker::new(id, core2); -// worker.run() -// }); -//} - -#[derive(Debug)] -struct WorkerEntry { - control: channel::Sender, - stealer: Option, -} - -#[derive(Debug)] -struct ThreadPoolCore
<P>
+impl
<P>
ThreadPoolInner
<P>
where P: Parker + Clone + 'static, { - global_injector: Arc>, - shutdown: Arc, - _threads: usize, - ids: usize, - workers: BTreeMap, - parker: P, + fn spawn_worker( + &self, + guard: &mut std::sync::MutexGuard<'_, ThreadPoolCore>, + inner: Arc, + old_id: Option, + ) { + //let guard = self.core.lock().unwrap(); + let id = old_id.unwrap_or_else(|| guard.new_worker_id()); + let (tx_control, rx_control) = channel::unbounded(); + let worker = WorkerEntry { + control: tx_control, + stealer: Option::None, + }; + guard.workers.insert(id, worker); + //drop(guard); + thread::Builder::new() + .name(format!("cb-ws-pool-worker-{}", id)) + .spawn(move || { + let mut worker = ThreadPoolWorker::new(id, rx_control, &inner); + drop(inner); + worker.run() + }) + .expect("Could not create worker thread!"); + } + + #[cfg(feature = "thread-pinning")] + fn spawn_worker_pinned( + &self, + guard: &mut std::sync::MutexGuard<'_, ThreadPoolCore>, + inner: Arc, + old_id: Option, + core_id: core_affinity::CoreId, + ) { + //let guard = self.core.lock().unwrap(); + let id = old_id.unwrap_or_else(|| guard.new_worker_id()); + let (tx_control, rx_control) = channel::unbounded(); + let worker = WorkerEntry { + control: tx_control, + stealer: Option::None, + }; + guard.workers.insert(id, worker); + //drop(guard); + thread::Builder::new() + .name(format!("cb-ws-pool-worker-{}", id)) + .spawn(move || { + core_affinity::set_for_current(core_id); + let mut worker = ThreadPoolWorker::new(id, rx_control, &inner); + drop(inner); + worker.run() + }) + .expect("Could not create worker thread!"); + } } -impl
<P>
ThreadPoolCore
<P>
+impl
<P>
Drop for ThreadPoolInner
<P>
where P: Parker + Clone + 'static, { - fn new( - global_injector: Arc>, - shutdown: Arc, - threads: usize, - parker: P, - ) -> ThreadPoolCore
<P>
{ + fn drop(&mut self) { + if !self.shutdown.load(Ordering::SeqCst) { + warn!("Threadpools should be shut down before deallocating."); + // the threads won't leak, as they'll get errors on their control queues next time they check + self.parker.unpark_all(); // must wake them all up so they don't idle forever + } + } +} + +#[derive(Debug)] +struct WorkerEntry { + control: channel::Sender, + stealer: Option, +} + +impl ThreadPoolCore { + fn new() -> Self { ThreadPoolCore { - global_injector, - shutdown, - _threads: threads, ids: 0, workers: BTreeMap::new(), - parker, } } @@ -452,7 +521,7 @@ where .map(|w| w.stealer.clone().unwrap()) .collect(); for (wid, worker) in self.workers.iter() { - let l: LinkedList = stealers + let l: Vec = stealers .iter() .filter(|s| s.id() != *wid) .cloned() @@ -463,64 +532,6 @@ where .unwrap_or_else(|e| error!("Error submitting Stealer msg: {:?}", e)); } } - - fn spawn_worker(&mut self, core: Arc>>, old_id: Option) { - let id = old_id.unwrap_or_else(|| self.new_worker_id()); - let (tx_control, rx_control) = channel::unbounded(); - let worker = WorkerEntry { - control: tx_control, - stealer: Option::None, - }; - let recv = self.global_injector.clone(); - self.workers.insert(id, worker); - let p = self.parker.clone(); - thread::Builder::new() - .name(format!("cb-ws-pool-worker-{}", id)) - .spawn(move || { - let mut worker = ThreadPoolWorker::new(id, recv, rx_control, core, p); - worker.run() - }) - .expect("Could not create worker thread!"); - } - - #[cfg(feature = "thread-pinning")] - fn spawn_worker_pinned( - &mut self, - core: Arc>>, - old_id: Option, - core_id: core_affinity::CoreId, - ) { - let id = old_id.unwrap_or_else(|| self.new_worker_id()); - let (tx_control, rx_control) = channel::unbounded(); - let worker = WorkerEntry { - control: tx_control, - stealer: Option::None, - }; - let recv = self.global_injector.clone(); - self.workers.insert(id, worker); - let p = self.parker.clone(); - thread::Builder::new() - .name(format!("cb-ws-pool-worker-{}", id)) - .spawn(move || { - core_affinity::set_for_current(core_id); - let mut worker = ThreadPoolWorker::new(id, recv, rx_control, core, p); - worker.run() - }) - .expect("Could not create worker thread!"); - } -} - -impl
<P>
Drop for ThreadPoolCore
<P>
-where - P: Parker + Clone + 'static, -{ - fn drop(&mut self) { - if !self.shutdown.load(Ordering::SeqCst) { - warn!("Threadpools should be shut down before deallocating."); - // the threads won't leak, as they'll get errors on their control queues next time they check - self.parker.unpark_all(); // must wake them all up so they don't idle forever - } - } } #[derive(Debug)] @@ -529,12 +540,10 @@ where P: Parker + Clone + 'static, { id: usize, - core: Weak>>, - global_injector: Arc>, + core: Weak>, control: channel::Receiver, stealers: Vec, random: ThreadRng, - parker: P, } impl
<P>
ThreadPoolWorker
<P>
@@ -543,19 +552,15 @@ where { fn new( id: usize, - global_injector: Arc>, control: channel::Receiver, - core: Arc>>, - parker: P, + core: &Arc>, ) -> ThreadPoolWorker
<P>
{ ThreadPoolWorker { id, - core: Arc::downgrade(&core), - global_injector, + core: Arc::downgrade(core), control, stealers: Vec::new(), random: rand::thread_rng(), - parker, } } @@ -565,13 +570,19 @@ where } #[inline(always)] - fn abort_sleep(&self, backoff: &Backoff, snoozing: &mut bool, parking: &mut bool) -> () { + fn abort_sleep( + &self, + core: &Arc>, + backoff: &Backoff, + snoozing: &mut bool, + parking: &mut bool, + ) -> () { if *parking { #[cfg(feature = "ws-no-park")] unreachable!("parking should never be true in ws-no-park!"); #[cfg(not(feature = "ws-no-park"))] { - self.parker.abort_park(*self.id()); + core.parker.abort_park(*self.id()); *parking = false; } } @@ -597,14 +608,19 @@ where *q.get() = Some(worker); stealer }); + let core = self.core.upgrade().expect("Core shut down already!"); + //info!("Setting up new worker {}. strong={}, weak={}", self.id(), Arc::strong_count(&core), Arc::weak_count(&core)); let local_stealer = JobStealer::new(local_stealer_raw, self.id); - self.register_stealer(local_stealer.clone()); - self.parker.init(*self.id()); + self.register_stealer(&core, local_stealer); + core.parker.init(*self.id()); + drop(core); let sentinel = Sentinel::new(self.core.clone(), self.id); let backoff = Backoff::new(); let mut snoozing = false; let mut parking = false; + let mut stop_latch: Option> = None; 'main: loop { + //info!("Worker {} starting main loop", self.id()); let mut fairness_check = false; #[cfg(feature = "ws-timed-fairness")] let next_global_check = Instant::now() + Duration::new(0, CHECK_GLOBAL_INTERVAL_NS); @@ -642,21 +658,25 @@ where panic!("Queue should have been initialised!"); } }); + + let core = self.core.upgrade().expect("Core shut down already!"); + //info!("Worker {} taking new core handle. strong={}, weak={}", self.id(), Arc::strong_count(&core), Arc::weak_count(&core)); // drain the control queue 'ctrl: loop { match self.control.try_recv() { Ok(msg) => match msg { ControlMsg::Stealers(l) => { self.update_stealers(l); - self.abort_sleep(&backoff, &mut snoozing, &mut parking); + self.abort_sleep(&core, &backoff, &mut snoozing, &mut parking); } ControlMsg::Stop(latch) => { - latch.decrement().expect("stop latch decrements"); + stop_latch = Some(latch); break 'main; } }, Err(channel::TryRecvError::Empty) => break 'ctrl, Err(channel::TryRecvError::Disconnected) => { + drop(core); warn!("Worker {} self-terminating.", self.id()); sentinel.cancel(); panic!("Threadpool wasn't shut down properly!"); @@ -666,7 +686,7 @@ where // sometimes try the global queue let glob_res = LOCAL_JOB_QUEUE.with(|q| unsafe { if let Some(ref local_queue) = *q.get() { - self.global_injector.steal_batch_and_pop(local_queue) + core.global_sender.steal_batch_and_pop(local_queue) } else { panic!("Queue should have been initialised!"); } @@ -674,13 +694,13 @@ where if let deque::Steal::Success(msg) = glob_res { let Job(f) = msg; f.call_box(); - self.abort_sleep(&backoff, &mut snoozing, &mut parking); + self.abort_sleep(&core, &backoff, &mut snoozing, &mut parking); continue 'main; } // only go on if there was no work left on the local queue if fairness_check { #[cfg(not(feature = "ws-no-park"))] - self.parker.unpark_one(); // wake up more threads if there is more work to do + core.parker.unpark_one(); // wake up more threads if there is more work to do continue 'main; } // try to steal something! 
@@ -688,7 +708,7 @@ where if let deque::Steal::Success(msg) = stealer.steal() { let Job(f) = msg; f.call_box(); - self.abort_sleep(&backoff, &mut snoozing, &mut parking); + self.abort_sleep(&core, &backoff, &mut snoozing, &mut parking); continue 'main; // only steal once before checking locally again } } @@ -696,51 +716,48 @@ where if parking { #[cfg(feature = "ws-no-park")] unreachable!("parking should never be true in ws-no-park!"); - #[cfg(not(feature = "ws-no-park"))] - match self.parker.park(*self.id()) { + //#[cfg(not(feature = "ws-no-park"))] + let parker = core.parker.clone(); + drop(core); + match parker.park(*self.id()) { ParkResult::Retry => (), // just start over ParkResult::Abort | ParkResult::Woken => { self.stop_sleep(&backoff, &mut snoozing, &mut parking); } } } else if backoff.is_completed() && cfg!(not(feature = "ws-no-park")) { - self.parker.prepare_park(*self.id()); + core.parker.prepare_park(*self.id()); parking = true; } else { + drop(core); backoff.snooze(); } // aaaaand starting over with 'local } sentinel.cancel(); - self.unregister_stealer(local_stealer); + let core = self.core.upgrade().expect("Core shut down already!"); + self.unregister_stealer(&core); LOCAL_JOB_QUEUE.with(|q| unsafe { *q.get() = None; }); debug!("Worker {} shutting down", self.id()); + drop(core); + if let Some(latch) = stop_latch.take() { + latch.decrement().expect("stop latch decrements"); + } } - fn register_stealer(&mut self, stealer: JobStealer) { - match self.core.upgrade() { - Some(core) => { - let mut guard = core.lock().unwrap(); - guard.add_stealer(stealer); - } - None => panic!("Core was already shut down!"), - } + fn register_stealer(&mut self, core: &Arc>, stealer: JobStealer) { + let mut guard = core.core.lock().unwrap(); + guard.add_stealer(stealer); } - fn unregister_stealer(&mut self, _stealer: JobStealer) { - match self.core.upgrade() { - Some(core) => { - let mut guard = core.lock().unwrap(); - guard.drop_worker(self.id); - } - None => panic!("Core was already shut down!"), - } + fn unregister_stealer(&mut self, core: &Arc>) { + let mut guard = core.core.lock().unwrap(); + guard.drop_worker(self.id); } - fn update_stealers(&mut self, l: LinkedList) { - let mut v = Vec::from_iter(l.into_iter()); + fn update_stealers(&mut self, mut v: Vec) { v.shuffle(&mut self.random); self.stealers = v; } @@ -795,7 +812,7 @@ impl Debug for JobStealer { struct Job(Box); enum ControlMsg { - Stealers(LinkedList), + Stealers(Vec), Stop(Arc), } @@ -812,7 +829,7 @@ struct Sentinel
<P>
where P: Parker + Clone + 'static, { - core: Weak>>, + core: Weak>, id: usize, active: bool, } @@ -821,7 +838,7 @@ impl
<P>
Sentinel
<P>
where P: Parker + Clone + 'static, { - fn new(core: Weak>>, id: usize) -> Sentinel
<P>
{ + fn new(core: Weak>, id: usize) -> Sentinel
<P>
{ Sentinel { core, id, @@ -856,16 +873,15 @@ where } jobs }); - let mut guard = core.lock().unwrap(); - let gsend = guard.global_injector.clone(); + let mut guard = core.core.lock().unwrap(); // cleanup guard.drop_worker(self.id); // restart // make sure the new thread starts with the same worker id, so the parker doesn't run out of slots - guard.spawn_worker(core.clone(), Some(self.id)); + core.spawn_worker(&mut guard, core.clone(), Some(self.id)); drop(guard); for job in jobs.into_iter() { - gsend.push(job); + core.global_sender.push(job); } } None => warn!("Could not restart worker, as pool has been deallocated!"), @@ -1082,11 +1098,21 @@ mod tests { }); let res = latch.wait_timeout(Duration::from_secs(1)); assert_eq!(res, 0); - let core = Arc::downgrade(&pool.core); + let core_weak = Arc::downgrade(&pool.inner); // sleep before drop, to ensure that even parked threads shut down std::thread::sleep(std::time::Duration::from_secs(1)); drop(pool); std::thread::sleep(std::time::Duration::from_secs(1)); - assert!(core.upgrade().is_none()); + match core_weak.upgrade() { + Some(core) => { + println!( + "Outstanding references: strong={}, weak={}", + Arc::strong_count(&core), + Arc::weak_count(&core) + ); + panic!("Pool should have been dropped!"); + } + None => (), // ok + } } } diff --git a/executors/src/futures_executor.rs b/executors/src/futures_executor.rs index 09583f0..00dd1ee 100644 --- a/executors/src/futures_executor.rs +++ b/executors/src/futures_executor.rs @@ -36,7 +36,7 @@ where } } -struct FunTask +pub(crate) struct FunTask where E: Executor + Sync + 'static, { diff --git a/executors/src/parker.rs b/executors/src/parker.rs index 6359b6d..3b92684 100644 --- a/executors/src/parker.rs +++ b/executors/src/parker.rs @@ -39,7 +39,7 @@ pub enum ParkResult { Woken, } -pub trait Parker: Send + Clone { +pub trait Parker: Sync + Send + Clone { /// Maximum number of threads supported by this parker implementation. 
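     ///
     /// Returns `None` if the implementation imposes no fixed thread limit
     /// (the pool constructors only assert their size against `Some(max)`).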
fn max_threads(&self) -> Option; From 4c00215e2c6b389c530ec4696465b73a14405412 Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Wed, 3 Jun 2020 16:43:57 +0200 Subject: [PATCH 03/11] make clippy happy --- executors/benches/scheduler.rs | 7 ++++++- executors/src/crossbeam_workstealing_pool.rs | 1 - 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/executors/benches/scheduler.rs b/executors/benches/scheduler.rs index be0e5ad..6b8a3b6 100644 --- a/executors/benches/scheduler.rs +++ b/executors/benches/scheduler.rs @@ -208,6 +208,11 @@ fn rt() -> impl FuturesExecutor { crossbeam_workstealing_pool::small_pool(4) } -benchmark_group!(scheduler, chained_spawn, chained_spawn_no_local, chained_spawn_async); +benchmark_group!( + scheduler, + chained_spawn, + chained_spawn_no_local, + chained_spawn_async +); benchmark_main!(scheduler); diff --git a/executors/src/crossbeam_workstealing_pool.rs b/executors/src/crossbeam_workstealing_pool.rs index 1877a71..cc880b0 100644 --- a/executors/src/crossbeam_workstealing_pool.rs +++ b/executors/src/crossbeam_workstealing_pool.rs @@ -74,7 +74,6 @@ use crate::parker::{DynParker, ParkResult, Parker}; use rand::prelude::*; use std::cell::UnsafeCell; use std::collections::BTreeMap; -use std::collections::LinkedList; use std::fmt::{Debug, Formatter}; use std::iter::{FromIterator, IntoIterator}; use std::ops::Deref; From c5a5e308377d8fc210fb43e1720c9770bf88c427 Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Wed, 3 Jun 2020 17:02:21 +0200 Subject: [PATCH 04/11] Actually make the new layout work with all features --- executors/src/crossbeam_workstealing_pool.rs | 33 +++++++++++--------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/executors/src/crossbeam_workstealing_pool.rs b/executors/src/crossbeam_workstealing_pool.rs index cc880b0..e53a8a9 100644 --- a/executors/src/crossbeam_workstealing_pool.rs +++ b/executors/src/crossbeam_workstealing_pool.rs @@ -75,7 +75,7 @@ use rand::prelude::*; use std::cell::UnsafeCell; use std::collections::BTreeMap; use std::fmt::{Debug, Formatter}; -use std::iter::{FromIterator, IntoIterator}; +//use std::iter::{FromIterator, IntoIterator}; use std::ops::Deref; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, Weak}; @@ -257,11 +257,12 @@ where } #[cfg(feature = "thread-pinning")] { - let mut guard = pool.core.inner.lock().unwrap(); + let mut guard = pool.inner.core.lock().unwrap(); let cores = core_affinity::get_core_ids().expect("core ids"); let num_pinned = cores.len().min(threads); for i in 0..num_pinned { - guard.spawn_worker_pinned(pool.inner.clone(), None, cores[i]); + pool.inner + .spawn_worker_pinned(&mut guard, pool.inner.clone(), None, cores[i]); } if num_pinned < threads { let num_unpinned = threads - num_pinned; @@ -297,7 +298,7 @@ where let inner = ThreadPoolInner { core: Mutex::new(core), global_sender: deque::Injector::new(), - threads, + threads: total_threads, shutdown: AtomicBool::new(false), parker, }; @@ -307,10 +308,12 @@ where { let mut guard = pool.inner.core.lock().unwrap(); cores.iter().for_each(|core_id| { - guard.spawn_worker_pinned(pool.inner.clone(), None, *core_id); + pool.inner + .spawn_worker_pinned(&mut guard, pool.inner.clone(), None, *core_id); }); for _ in 0..floating { - guard.spawn_worker(pool.inner.clone(), None); + pool.inner + .spawn_worker(&mut guard, pool.inner.clone(), None); } } pool @@ -617,7 +620,7 @@ where let backoff = Backoff::new(); let mut snoozing = false; let mut parking = false; - let mut stop_latch: Option> = None; + let mut 
stop_latch: Option>; 'main: loop { //info!("Worker {} starting main loop", self.id()); let mut fairness_check = false; @@ -715,13 +718,15 @@ where if parking { #[cfg(feature = "ws-no-park")] unreachable!("parking should never be true in ws-no-park!"); - //#[cfg(not(feature = "ws-no-park"))] - let parker = core.parker.clone(); - drop(core); - match parker.park(*self.id()) { - ParkResult::Retry => (), // just start over - ParkResult::Abort | ParkResult::Woken => { - self.stop_sleep(&backoff, &mut snoozing, &mut parking); + #[cfg(not(feature = "ws-no-park"))] + { + let parker = core.parker.clone(); + drop(core); + match parker.park(*self.id()) { + ParkResult::Retry => (), // just start over + ParkResult::Abort | ParkResult::Woken => { + self.stop_sleep(&backoff, &mut snoozing, &mut parking); + } } } } else if backoff.is_completed() && cfg!(not(feature = "ws-no-park")) { From b0abb55ff0b61804e3f83980364a885dded47178 Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Thu, 4 Jun 2020 16:59:25 +0200 Subject: [PATCH 05/11] Switch to criterion for micro benchmarks and fix a concurrency bug in the futures api --- executors/Cargo.toml | 2 +- executors/benches/scheduler.rs | 455 +++++++++++++++++++++--------- executors/src/futures_executor.rs | 165 ++++++++++- 3 files changed, 475 insertions(+), 147 deletions(-) diff --git a/executors/Cargo.toml b/executors/Cargo.toml index df7dd74..93a42c7 100644 --- a/executors/Cargo.toml +++ b/executors/Cargo.toml @@ -51,7 +51,7 @@ futures = {version = "0.3", optional = true} [dev-dependencies] env_logger = "0.7" version-sync = "0.8" -bencher = "0.1.5" +criterion = "0.3" [badges] diff --git a/executors/benches/scheduler.rs b/executors/benches/scheduler.rs index 6b8a3b6..d7e0ecf 100644 --- a/executors/benches/scheduler.rs +++ b/executors/benches/scheduler.rs @@ -2,44 +2,258 @@ //! intended to be used as a form of regression testing and not as a general //! purpose benchmark demonstrating real-world performance. 
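 //!
 //! With the `[[bench]]` target declared in `Cargo.toml`, these can be run
 //! via `cargo bench --bench scheduler`.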
-use executors::crossbeam_workstealing_pool; use executors::futures_executor::*; -use executors::Executor; -use futures::channel::oneshot; -use bencher::{benchmark_group, benchmark_main, Bencher}; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::Relaxed; -use std::sync::{mpsc, Arc}; +use criterion::{criterion_group, criterion_main, Criterion, Throughput}; + +const CHAINING_DEPTH: usize = 1000; +const SPAWN_NUM: usize = 1000; +const PINGS_NUM: usize = 1000; + +pub fn chained_spawn_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("Chained Spawn"); + group.throughput(Throughput::Elements(CHAINING_DEPTH as u64)); + // CBWP + group.bench_function("Local Function CBWP", chained_spawn::local_function_cbwp); + group.bench_function( + "No-Local Function CBWP", + chained_spawn::no_local_function_cbwp, + ); + group.bench_function("Async CBWP", chained_spawn::async_cbwp); + // CBCP + group.bench_function( + "No-Local Function CBCP", + chained_spawn::no_local_function_cbcp, + ); + group.bench_function("Async CBCP", chained_spawn::async_cbcp); + group.finish(); +} -fn spawn_many(b: &mut Bencher) { - const NUM_SPAWN: usize = 10_000; +pub fn spawn_many_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("Spawn Many"); + group.throughput(Throughput::Elements(SPAWN_NUM as u64)); + // CBWP + group.bench_function("Local Function CBWP", spawn_many::function_cbwp); + group.bench_function("Async CBWP", spawn_many::async_cbwp); + // CBCP + group.bench_function("Local Function CBCP", spawn_many::function_cbcp); + group.bench_function("Async CBCP", spawn_many::async_cbcp); + group.finish(); +} - let rt = rt(); +pub fn ping_pong_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("Ping Pong"); + group.throughput(Throughput::Elements(PINGS_NUM as u64)); + // CBWP + //group.bench_function("Local Function CBWP", spawn_many::function_cbwp); + group.bench_function("Async CBWP", ping_pong::async_cbwp); + // CBCP + //group.bench_function("Local Function CBCP", spawn_many::function_cbcp); + group.bench_function("Async CBCP", ping_pong::async_cbcp); + group.finish(); +} - let (tx, rx) = mpsc::sync_channel(1); - let rem = Arc::new(AtomicUsize::new(0)); +mod chained_spawn { + use super::*; + use criterion::Bencher; + use std::sync::mpsc; - b.iter(|| { - rem.store(NUM_SPAWN, Relaxed); + fn cbwp_rt() -> impl FuturesExecutor { + executors::crossbeam_workstealing_pool::small_pool(4) + } - //futures::executor::block_on(async { - for _ in 0..NUM_SPAWN { - let tx = tx.clone(); - let rem = rem.clone(); + fn cbcp_rt() -> impl FuturesExecutor { + executors::crossbeam_channel_pool::ThreadPool::new(4) + } - rt.spawn(async move { - if 1 == rem.fetch_sub(1, Relaxed) { - tx.send(()).unwrap(); - } + pub fn local_function_cbwp(b: &mut Bencher) { + let rt = cbwp_rt(); + chained_spawn_fun(b, rt); + } + + pub fn no_local_function_cbwp(b: &mut Bencher) { + let rt = cbwp_rt(); + chained_spawn_no_local(b, rt); + } + + pub fn async_cbwp(b: &mut Bencher) { + let rt = cbwp_rt(); + chained_spawn_async(b, rt); + } + + pub fn no_local_function_cbcp(b: &mut Bencher) { + let rt = cbcp_rt(); + chained_spawn_no_local(b, rt); + } + + pub fn async_cbcp(b: &mut Bencher) { + let rt = cbcp_rt(); + chained_spawn_async(b, rt); + } + + fn chained_spawn_fun(b: &mut Bencher, rt: impl FuturesExecutor) { + fn iter(done_tx: mpsc::SyncSender<()>, n: usize) { + if n == 0 { + done_tx.send(()).unwrap(); + } else { + let _ = executors::crossbeam_workstealing_pool::execute_locally(move || { + iter(done_tx, n 
- 1); + }); + } + } + + let (done_tx, done_rx) = mpsc::sync_channel(1); + let rt_inner = rt.clone(); + b.iter(move || { + let done_tx = done_tx.clone(); + rt_inner.execute(move || { + iter(done_tx, CHAINING_DEPTH); }); + + done_rx.recv().unwrap(); + }); + rt.shutdown().expect("shutdown"); + } + + fn chained_spawn_no_local(b: &mut Bencher, rt: impl FuturesExecutor) { + fn iter(rt: impl FuturesExecutor, done_tx: mpsc::SyncSender<()>, n: usize) { + if n == 0 { + done_tx.send(()).unwrap(); + } else { + let rt_new = rt.clone(); + rt.execute(move || { + iter(rt_new, done_tx, n - 1); + }); + } + } + + let (done_tx, done_rx) = mpsc::sync_channel(1); + let rt_inner = rt.clone(); + b.iter(move || { + let done_tx = done_tx.clone(); + let rt_new = rt_inner.clone(); + rt_inner.execute(move || { + iter(rt_new, done_tx, CHAINING_DEPTH); + }); + + done_rx.recv().unwrap(); + }); + rt.shutdown().expect("shutdown"); + } + + fn chained_spawn_async(b: &mut Bencher, rt: impl FuturesExecutor) { + fn iter(rt: impl FuturesExecutor, done_tx: mpsc::SyncSender<()>, n: usize) { + if n == 0 { + done_tx.send(()).unwrap(); + } else { + let rt_new = rt.clone(); + rt.spawn(async move { + iter(rt_new, done_tx, n - 1); + }); + } } - let _ = rx.recv().unwrap(); - //}); - }); + let (done_tx, done_rx) = mpsc::sync_channel(1); + let rt_inner = rt.clone(); + b.iter(move || { + let done_tx = done_tx.clone(); + futures::executor::block_on(async { + let rt_new = rt_inner.clone(); + rt_inner.spawn(async move { + iter(rt_new, done_tx, CHAINING_DEPTH); + }); + + done_rx.recv().unwrap(); + }); + }); + rt.shutdown().expect("shutdown"); + } +} + +mod spawn_many { + use super::*; + use criterion::Bencher; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering::Relaxed; + use std::sync::{mpsc, Arc}; + + fn cbwp_rt() -> impl FuturesExecutor { + executors::crossbeam_workstealing_pool::small_pool(8) + } + + fn cbcp_rt() -> impl FuturesExecutor { + executors::crossbeam_channel_pool::ThreadPool::new(8) + } + + pub fn function_cbwp(b: &mut Bencher) { + let rt = cbwp_rt(); + spawn_many_function(b, rt); + } + + pub fn async_cbwp(b: &mut Bencher) { + let rt = cbwp_rt(); + spawn_many_async(b, rt); + } + + pub fn function_cbcp(b: &mut Bencher) { + let rt = cbcp_rt(); + spawn_many_function(b, rt); + } + + pub fn async_cbcp(b: &mut Bencher) { + let rt = cbcp_rt(); + spawn_many_async(b, rt); + } + + fn spawn_many_function(b: &mut Bencher, rt: impl FuturesExecutor) { + let (tx, rx) = mpsc::sync_channel(1); + let rem = Arc::new(AtomicUsize::new(0)); + + b.iter(|| { + rem.store(SPAWN_NUM, Relaxed); + + for _ in 0..SPAWN_NUM { + let tx = tx.clone(); + let rem = rem.clone(); + + rt.execute(move || { + if 1 == rem.fetch_sub(1, Relaxed) { + tx.send(()).unwrap(); + } + }); + } + + let _ = rx.recv().unwrap(); + }); + + rt.shutdown().expect("shutdown"); + } + + fn spawn_many_async(b: &mut Bencher, rt: impl FuturesExecutor) { + let (tx, rx) = mpsc::sync_channel(1); + let rem = Arc::new(AtomicUsize::new(0)); + + b.iter(|| { + rem.store(SPAWN_NUM, Relaxed); - rt.shutdown().expect("shutdown"); + futures::executor::block_on(async { + for _ in 0..SPAWN_NUM { + let tx = tx.clone(); + let rem = rem.clone(); + + rt.spawn(async move { + if 1 == rem.fetch_sub(1, Relaxed) { + tx.send(()).unwrap(); + } + }); + } + + let _ = rx.recv().unwrap(); + }); + }); + + rt.shutdown().expect("shutdown"); + } } // fn yield_many(b: &mut Bencher) { @@ -69,23 +283,66 @@ fn spawn_many(b: &mut Bencher) { // }); // } -fn ping_pong(b: &mut Bencher) { - const NUM_PINGS: usize = 
1_000; +mod ping_pong { + + use super::*; + use criterion::Bencher; + use futures::channel::oneshot; + use std::sync::atomic::AtomicUsize; + use std::sync::atomic::Ordering::SeqCst; + use std::sync::{mpsc, Arc}; + use std::time::Duration; + + fn cbwp_rt() -> impl FuturesExecutor { + executors::crossbeam_workstealing_pool::small_pool(4) + } + + fn cbcp_rt() -> impl FuturesExecutor { + executors::crossbeam_channel_pool::ThreadPool::new(4) + } + + // pub fn local_function_cbwp(b: &mut Bencher) { + // let rt = cbwp_rt(); + // chained_spawn_fun(b, rt); + // } + + // pub fn no_local_function_cbwp(b: &mut Bencher) { + // let rt = cbwp_rt(); + // chained_spawn_no_local(b, rt); + // } + + pub fn async_cbwp(b: &mut Bencher) { + let rt = cbwp_rt(); + ping_pong_async(b, rt); + } + + // pub fn no_local_function_cbcp(b: &mut Bencher) { + // let rt = cbcp_rt(); + // chained_spawn_no_local(b, rt); + // } - let rt = rt(); + pub fn async_cbcp(b: &mut Bencher) { + let rt = cbcp_rt(); + ping_pong_async(b, rt); + } - let (done_tx, done_rx) = mpsc::sync_channel(1000); - let rem = Arc::new(AtomicUsize::new(0)); + fn ping_pong_async(b: &mut Bencher, rt: impl FuturesExecutor) { + let (done_tx, done_rx) = mpsc::sync_channel(1); + let rem = Arc::new(AtomicUsize::new(0)); - b.iter(|| { - let done_tx = done_tx.clone(); - let rem = rem.clone(); - rem.store(NUM_PINGS, Relaxed); + let mut count = 0u64; - futures::executor::block_on(async { + b.iter(|| { + let done_tx = done_tx.clone(); + let rem = rem.clone(); + let outer_rem = rem.clone(); + rem.store(PINGS_NUM, SeqCst); + count += 1u64; + //println!("Iteration #{}", count); + //futures::executor::block_on(async { let rt_new = rt.clone(); rt.spawn(async move { - for _ in 0..NUM_PINGS { + for _i in 0..PINGS_NUM { let rem = rem.clone(); let done_tx = done_tx.clone(); let rt_new2 = rt_new.clone(); @@ -101,118 +358,34 @@ fn ping_pong(b: &mut Bencher) { tx1.send(()).unwrap(); rx2.await.unwrap(); - if 1 == rem.fetch_sub(1, Relaxed) { - done_tx.send(()).unwrap(); + let res = rem.fetch_sub(1, SeqCst); + if 1 == res { + done_tx.try_send(()).expect("done should have sent"); } + // else { + // println!("Pinger {} is done, but {} remaining.", i, res); + // } }); } }); - done_rx.recv().unwrap(); - }); - }); - rt.shutdown().expect("shutdown"); -} - -fn chained_spawn(b: &mut Bencher) { - const ITER: usize = 1_000; - - let rt = rt(); - - fn iter(done_tx: mpsc::SyncSender<()>, n: usize) { - if n == 0 { - done_tx.send(()).unwrap(); - } else { - let _ = crossbeam_workstealing_pool::execute_locally(move || { - iter(done_tx, n - 1); - }); - } - } - - let (done_tx, done_rx) = mpsc::sync_channel(1000); - let rt_inner = rt.clone(); - b.iter(move || { - let done_tx = done_tx.clone(); - rt_inner.execute(move || { - iter(done_tx, ITER); - }); - - done_rx.recv().unwrap(); - }); - rt.shutdown().expect("shutdown"); -} - -fn chained_spawn_no_local(b: &mut Bencher) { - const ITER: usize = 1_000; - - let rt = rt(); - - fn iter(rt: impl FuturesExecutor, done_tx: mpsc::SyncSender<()>, n: usize) { - if n == 0 { - done_tx.send(()).unwrap(); - } else { - let rt_new = rt.clone(); - rt.execute(move || { - iter(rt_new, done_tx, n - 1); - }); - } - } - - let (done_tx, done_rx) = mpsc::sync_channel(1); - let rt_inner = rt.clone(); - b.iter(move || { - let done_tx = done_tx.clone(); - let rt_new = rt_inner.clone(); - rt_inner.execute(move || { - iter(rt_new, done_tx, ITER); + let res = done_rx.recv_timeout(Duration::from_millis(5000)); //.expect("should have gotten a done with 5s"); + if res.is_err() { + panic!( 
+ "done_rx timeouted within 5s. Remaining={}", + outer_rem.load(SeqCst) + ); + } + //}); }); - - done_rx.recv().unwrap(); - }); - rt.shutdown().expect("shutdown"); -} - -fn chained_spawn_async(b: &mut Bencher) { - const ITER: usize = 1_000; - - let rt = rt(); - - fn iter(rt: impl FuturesExecutor, done_tx: mpsc::SyncSender<()>, n: usize) { - if n == 0 { - done_tx.send(()).unwrap(); - } else { - let rt_new = rt.clone(); - rt.spawn(async move { - iter(rt_new, done_tx, n - 1); - }); - } + rt.shutdown().expect("shutdown"); } - - let (done_tx, done_rx) = mpsc::sync_channel(1); - let rt_inner = rt.clone(); - b.iter(move || { - let done_tx = done_tx.clone(); - //futures::executor::block_on(async { - let rt_new = rt_inner.clone(); - rt_inner.spawn(async move { - iter(rt_new, done_tx, ITER); - }); - - done_rx.recv().unwrap(); - //}); - }); - rt.shutdown().expect("shutdown"); } -fn rt() -> impl FuturesExecutor { - crossbeam_workstealing_pool::small_pool(4) -} - -benchmark_group!( +criterion_group!( scheduler, - chained_spawn, - chained_spawn_no_local, - chained_spawn_async + chained_spawn_benchmark, + spawn_many_benchmark, + ping_pong_benchmark ); - -benchmark_main!(scheduler); +criterion_main!(scheduler); diff --git a/executors/src/futures_executor.rs b/executors/src/futures_executor.rs index 00dd1ee..c0077d6 100644 --- a/executors/src/futures_executor.rs +++ b/executors/src/futures_executor.rs @@ -14,10 +14,19 @@ use futures::{ use std::{ cell::UnsafeCell, future::Future, - sync::Arc, + sync::{ + atomic::{AtomicBool, AtomicUsize, Ordering}, + Arc, Mutex, + }, task::{Context, Poll}, }; +mod task_state { + pub const WAITING: usize = 0; + pub const SCHEDULED: usize = 1; + pub const DONE: usize = 2; +} + pub trait FuturesExecutor: Executor + Sync + 'static { fn spawn(&self, future: impl Future + 'static + Send) -> (); } @@ -29,6 +38,8 @@ where let future = future.boxed(); let task = FunTask { future: UnsafeCell::new(Some(future)), + //future: Mutex::new(Some(future)), + state: AtomicUsize::new(task_state::SCHEDULED), executor: self.clone(), }; let task = Arc::new(task); @@ -36,11 +47,126 @@ where } } +// pub(crate) struct FunTask +// where +// E: Executor + Sync + 'static, +// { +// future: Mutex>>, +// executor: E, +// } + +// impl FunTask +// where +// E: Executor + Sync + 'static, +// { +// fn run(task: Arc) -> () { +// // let res = unsafe { +// // let src = task.future.get(); +// // src.as_mut().map(|opt| opt.take()).flatten() +// // }; +// let mut guard = task.future.lock().unwrap(); +// if let Some(mut f) = guard.take() { +// let waker = waker_ref(&task); +// let context = &mut Context::from_waker(&*waker); +// // `BoxFuture` is a type alias for +// // `Pin + Send + 'static>>`. +// // We can get a `Pin<&mut dyn Future + Send + 'static>` +// // from it by calling the `Pin::as_mut` method. +// if let Poll::Pending = f.as_mut().poll(context) { +// // We're not done processing the future, so put it +// // back in its task to be run again in the future. +// // unsafe { +// // let dst = task.future.get(); +// // if let Some(slot) = dst.as_mut() { +// // *slot = Some(f); +// // } +// // } +// *guard = Some(f); +// } +// } // else the future is already done with +// } +// } + +// impl ArcWake for FunTask +// where +// E: Executor + Sync + 'static, +// { +// fn wake_by_ref(arc_self: &Arc) { +// // Implement `wake` by sending this task back onto the task channel +// // so that it will be polled again by the executor. 
+// let cloned = arc_self.clone(); +// arc_self.executor.execute(move || FunTask::run(cloned)); +// } +// } + +// pub(crate) struct FunTask +// where +// E: Executor + Sync + 'static, +// { +// future: UnsafeCell>>, +// scheduled: AtomicBool, +// executor: E, +// } + +// impl FunTask +// where +// E: Executor + Sync + 'static, +// { +// fn run(task: Arc) -> () { +// let res = unsafe { +// let src = task.future.get(); +// src.as_mut().map(|opt| opt.take()).flatten() +// }; +// if let Some(mut f) = res { +// let waker = waker_ref(&task); +// let context = &mut Context::from_waker(&*waker); +// // `BoxFuture` is a type alias for +// // `Pin + Send + 'static>>`. +// // We can get a `Pin<&mut dyn Future + Send + 'static>` +// // from it by calling the `Pin::as_mut` method. +// if let Poll::Pending = f.as_mut().poll(context) { +// // We're not done processing the future, so put it +// // back in its task to be run again in the future. +// unsafe { +// let dst = task.future.get(); +// if let Some(slot) = dst.as_mut() { +// *slot = Some(f); +// } +// } +// } +// } else { +// // else the future is already done with +// //unreachable!("Shouldn't get here!"); +// //eprintln!("Future got schedulled despite being done!"); +// } +// task.scheduled.store(false, Ordering::Release); +// } +// } + +// impl ArcWake for FunTask +// where +// E: Executor + Sync + 'static, +// { +// fn wake_by_ref(arc_self: &Arc) { +// // Implement `wake` by sending this task back onto the task channel +// // so that it will be polled again by the executor. +// while arc_self +// .scheduled +// .compare_and_swap(false, true, Ordering::Acquire) +// { +// // hot wait here, since this only coveres the range between pool and the end of the function, where nothing expensive happens +// } +// let cloned = arc_self.clone(); +// arc_self.executor.execute(move || FunTask::run(cloned)); +// } +// } + pub(crate) struct FunTask where E: Executor + Sync + 'static, { future: UnsafeCell>>, + state: AtomicUsize, executor: E, } @@ -69,8 +195,16 @@ where *slot = Some(f); } } + task.state.store(task_state::WAITING, Ordering::Release); + } else { + task.state.store(task_state::DONE, Ordering::Release); } - } // else the future is already done with + } else { + // else the future is already done with + //unreachable!("Shouldn't get here!"); + //eprintln!("Future got schedulled despite being done!"); + task.state.store(task_state::DONE, Ordering::Release); + } } } @@ -81,12 +215,33 @@ where fn wake_by_ref(arc_self: &Arc) { // Implement `wake` by sending this task back onto the task channel // so that it will be polled again by the executor. 
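         // A small state machine keeps `wake` from racing with `run`: we only
         // transition WAITING -> SCHEDULED here, spin while the task is still
         // SCHEDULED, and do nothing once it is DONE.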
- let cloned = arc_self.clone(); - arc_self.executor.execute(move || FunTask::run(cloned)); + + loop { + // hot wait here, since this only coveres the range between pool and the end of the function, where nothing expensive happens + let state = arc_self.state.compare_and_swap( + task_state::WAITING, + task_state::SCHEDULED, + Ordering::Acquire, + ); + match state { + task_state::WAITING => { + let cloned = arc_self.clone(); + arc_self.executor.execute(move || FunTask::run(cloned)); + return; + } + task_state::SCHEDULED => { + continue; + } + task_state::DONE => { + return; + } + _ => unreachable!("Invalid State!"), + } + } } } -// I'm making sure only one thread at a time has access to the contents here +//I'm making sure only one thread at a time has access to the contents here unsafe impl Sync for FunTask where E: Executor + Sync + 'static {} #[cfg(test)] From 39370d7367578613ed902d6990a80e5b35920d48 Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Thu, 4 Jun 2020 17:29:54 +0200 Subject: [PATCH 06/11] Remove unneeded imports and commented out stuff --- executors/src/futures_executor.rs | 118 +----------------------------- 1 file changed, 2 insertions(+), 116 deletions(-) diff --git a/executors/src/futures_executor.rs b/executors/src/futures_executor.rs index c0077d6..13e65da 100644 --- a/executors/src/futures_executor.rs +++ b/executors/src/futures_executor.rs @@ -15,8 +15,8 @@ use std::{ cell::UnsafeCell, future::Future, sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, + atomic::{AtomicUsize, Ordering}, + Arc, }, task::{Context, Poll}, }; @@ -47,120 +47,6 @@ where } } -// pub(crate) struct FunTask -// where -// E: Executor + Sync + 'static, -// { -// future: Mutex>>, -// executor: E, -// } - -// impl FunTask -// where -// E: Executor + Sync + 'static, -// { -// fn run(task: Arc) -> () { -// // let res = unsafe { -// // let src = task.future.get(); -// // src.as_mut().map(|opt| opt.take()).flatten() -// // }; -// let mut guard = task.future.lock().unwrap(); -// if let Some(mut f) = guard.take() { -// let waker = waker_ref(&task); -// let context = &mut Context::from_waker(&*waker); -// // `BoxFuture` is a type alias for -// // `Pin + Send + 'static>>`. -// // We can get a `Pin<&mut dyn Future + Send + 'static>` -// // from it by calling the `Pin::as_mut` method. -// if let Poll::Pending = f.as_mut().poll(context) { -// // We're not done processing the future, so put it -// // back in its task to be run again in the future. -// // unsafe { -// // let dst = task.future.get(); -// // if let Some(slot) = dst.as_mut() { -// // *slot = Some(f); -// // } -// // } -// *guard = Some(f); -// } -// } // else the future is already done with -// } -// } - -// impl ArcWake for FunTask -// where -// E: Executor + Sync + 'static, -// { -// fn wake_by_ref(arc_self: &Arc) { -// // Implement `wake` by sending this task back onto the task channel -// // so that it will be polled again by the executor. 
-// let cloned = arc_self.clone(); -// arc_self.executor.execute(move || FunTask::run(cloned)); -// } -// } - -// pub(crate) struct FunTask -// where -// E: Executor + Sync + 'static, -// { -// future: UnsafeCell>>, -// scheduled: AtomicBool, -// executor: E, -// } - -// impl FunTask -// where -// E: Executor + Sync + 'static, -// { -// fn run(task: Arc) -> () { -// let res = unsafe { -// let src = task.future.get(); -// src.as_mut().map(|opt| opt.take()).flatten() -// }; -// if let Some(mut f) = res { -// let waker = waker_ref(&task); -// let context = &mut Context::from_waker(&*waker); -// // `BoxFuture` is a type alias for -// // `Pin + Send + 'static>>`. -// // We can get a `Pin<&mut dyn Future + Send + 'static>` -// // from it by calling the `Pin::as_mut` method. -// if let Poll::Pending = f.as_mut().poll(context) { -// // We're not done processing the future, so put it -// // back in its task to be run again in the future. -// unsafe { -// let dst = task.future.get(); -// if let Some(slot) = dst.as_mut() { -// *slot = Some(f); -// } -// } -// } -// } else { -// // else the future is already done with -// //unreachable!("Shouldn't get here!"); -// //eprintln!("Future got schedulled despite being done!"); -// } -// task.scheduled.store(false, Ordering::Release); -// } -// } - -// impl ArcWake for FunTask -// where -// E: Executor + Sync + 'static, -// { -// fn wake_by_ref(arc_self: &Arc) { -// // Implement `wake` by sending this task back onto the task channel -// // so that it will be polled again by the executor. -// while arc_self -// .scheduled -// .compare_and_swap(false, true, Ordering::Acquire) -// { -// // hot wait here, since this only coveres the range between pool and the end of the function, where nothing expensive happens -// } -// let cloned = arc_self.clone(); -// arc_self.executor.execute(move || FunTask::run(cloned)); -// } -// } - pub(crate) struct FunTask where E: Executor + Sync + 'static, From c00e49a5d6ecd7d804b669b4e4656f3c361f2390 Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Thu, 18 Jun 2020 15:08:34 +0200 Subject: [PATCH 07/11] add thread local execute variant to most executors --- executors/benches/scheduler.rs | 12 +++- executors/src/common.rs | 17 +++++- executors/src/crossbeam_channel_pool.rs | 55 +++++++++++------ executors/src/crossbeam_workstealing_pool.rs | 63 ++++++++++++++------ executors/src/lib.rs | 60 ++++++++++++++++++- executors/src/locals.rs | 59 ++++++++++++++++++ executors/src/run_now.rs | 35 ++++++++++- executors/src/threadpool_executor.rs | 21 ++++++- 8 files changed, 276 insertions(+), 46 deletions(-) create mode 100644 executors/src/locals.rs diff --git a/executors/benches/scheduler.rs b/executors/benches/scheduler.rs index d7e0ecf..0e58a3f 100644 --- a/executors/benches/scheduler.rs +++ b/executors/benches/scheduler.rs @@ -21,6 +21,7 @@ pub fn chained_spawn_benchmark(c: &mut Criterion) { ); group.bench_function("Async CBWP", chained_spawn::async_cbwp); // CBCP + group.bench_function("Local Function CBCP", chained_spawn::local_function_cbcp); group.bench_function( "No-Local Function CBCP", chained_spawn::no_local_function_cbcp, @@ -33,10 +34,10 @@ pub fn spawn_many_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("Spawn Many"); group.throughput(Throughput::Elements(SPAWN_NUM as u64)); // CBWP - group.bench_function("Local Function CBWP", spawn_many::function_cbwp); + group.bench_function("Function CBWP", spawn_many::function_cbwp); group.bench_function("Async CBWP", spawn_many::async_cbwp); // CBCP - 
group.bench_function("Local Function CBCP", spawn_many::function_cbcp); + group.bench_function("Function CBCP", spawn_many::function_cbcp); group.bench_function("Async CBCP", spawn_many::async_cbcp); group.finish(); } @@ -81,6 +82,11 @@ mod chained_spawn { chained_spawn_async(b, rt); } + pub fn local_function_cbcp(b: &mut Bencher) { + let rt = cbcp_rt(); + chained_spawn_fun(b, rt); + } + pub fn no_local_function_cbcp(b: &mut Bencher) { let rt = cbcp_rt(); chained_spawn_no_local(b, rt); @@ -96,7 +102,7 @@ mod chained_spawn { if n == 0 { done_tx.send(()).unwrap(); } else { - let _ = executors::crossbeam_workstealing_pool::execute_locally(move || { + let _ = executors::try_execute_locally(move || { iter(done_tx, n - 1); }); } diff --git a/executors/src/common.rs b/executors/src/common.rs index 13d1afe..fcc207a 100644 --- a/executors/src/common.rs +++ b/executors/src/common.rs @@ -8,11 +8,21 @@ use std::fmt::Debug; +/// A minimal trait for task executors. +/// +/// This is mostly useful as a narrowed view for dynamic executor references. +pub trait CanExecute { + /// Executes the function `job` on the `Executor`. + /// + /// This is the same as [execute](Executor::execute), but already boxed up. + fn execute_job(&self, job: Box); +} + /// A common trait for task executors. /// /// All implementations need to allow cloning to create new handles to /// the same executor, and they need to be safe to pass to threads. -pub trait Executor: Clone + Send { +pub trait Executor: CanExecute + Clone + Send { /// Executes the function `job` on the `Executor`. /// /// # Examples @@ -33,7 +43,10 @@ pub trait Executor: Clone + Send { /// ``` fn execute(&self, job: F) where - F: FnOnce() + Send + 'static; + F: FnOnce() + Send + 'static, + { + self.execute_job(Box::new(job)); + } /// Shutdown the `Executor` without waiting. /// diff --git a/executors/src/crossbeam_channel_pool.rs b/executors/src/crossbeam_channel_pool.rs index 6ffe489..78569f2 100644 --- a/executors/src/crossbeam_channel_pool.rs +++ b/executors/src/crossbeam_channel_pool.rs @@ -170,21 +170,20 @@ impl Default for ThreadPool { } } -impl Executor for ThreadPool { - fn execute(&self, job: F) - where - F: FnOnce() + Send + 'static, - { +impl CanExecute for ThreadPool { + fn execute_job(&self, job: Box) { // NOTE: This check costs about 150k schedulings/s in a 2 by 2 experiment over 20 runs. 
if !self.shutdown.load(Ordering::SeqCst) { self.sender - .send(JobMsg::Job(Box::new(job))) + .send(JobMsg::Job(job)) .unwrap_or_else(|e| error!("Error submitting job: {:?}", e)); } else { warn!("Ignoring job as pool is shutting down."); } } +} +impl Executor for ThreadPool { fn shutdown_async(&self) { if !self .shutdown @@ -299,6 +298,15 @@ impl Drop for ThreadPoolCore { } } +struct ThreadLocalExecute(channel::Sender); +impl CanExecute for ThreadLocalExecute { + fn execute_job(&self, job: Box) { + self.0 + .send(JobMsg::Job(job)) + .unwrap_or_else(|e| error!("Error submitting Stop msg: {:?}", e)); + } +} + struct ThreadPoolWorker { id: usize, core: Weak>, @@ -322,33 +330,40 @@ impl ThreadPoolWorker { } fn run(&mut self) { debug!("CrossbeamWorker {} starting", self.id()); + let sender = { + let core = self.core.upgrade().expect("Core already shut down!"); + let guard = core.lock().expect("Mutex poisoned!"); + guard.sender.clone() + }; let sentinel = Sentinel::new(self.core.clone(), self.id); + set_local_executor(ThreadLocalExecute(sender)); while let Ok(msg) = self.recv.recv() { match msg { - JobMsg::Job(f) => f.call_box(), + JobMsg::Job(f) => f(), JobMsg::Stop(latch) => { ignore(latch.decrement()); break; } } } + unset_local_executor(); sentinel.cancel(); debug!("CrossbeamWorker {} shutting down", self.id()); } } -trait FnBox { - fn call_box(self: Box); -} +// trait FnBox { +// fn call_box(self: Box); +// } -impl FnBox for F { - fn call_box(self: Box) { - (*self)() - } -} +// impl FnBox for F { +// fn call_box(self: Box) { +// (*self)() +// } +// } enum JobMsg { - Job(Box), + Job(Box), Stop(Arc), } @@ -391,7 +406,7 @@ mod tests { use super::*; - const LABEL: &'static str = "Crossbeam Channel Pool"; + const LABEL: &str = "Crossbeam Channel Pool"; #[test] fn test_debug() { @@ -412,6 +427,12 @@ mod tests { crate::tests::test_defaults::(LABEL); } + #[test] + fn test_local() { + let exec = ThreadPool::default(); + crate::tests::test_local(exec, LABEL); + } + #[cfg(feature = "thread-pinning")] #[test] fn test_custom_affinity() { diff --git a/executors/src/crossbeam_workstealing_pool.rs b/executors/src/crossbeam_workstealing_pool.rs index e53a8a9..5eae9ed 100644 --- a/executors/src/crossbeam_workstealing_pool.rs +++ b/executors/src/crossbeam_workstealing_pool.rs @@ -333,17 +333,14 @@ impl Default for ThreadPool { //impl !Sync for ThreadPool {} -impl
<P> Executor for ThreadPool<P>
+impl<P> CanExecute for ThreadPool<P>
where P: Parker + Clone + 'static, { - fn execute(&self, job: F) - where - F: FnOnce() + Send + 'static, - { + fn execute_job(&self, job: Box) { if !self.inner.shutdown.load(Ordering::Relaxed) { LOCAL_JOB_QUEUE.with(|qo| unsafe { - let msg = Job(Box::new(job)); + let msg = Job(job); match *qo.get() { Some(ref q) => { q.push(msg); @@ -360,7 +357,12 @@ where warn!("Ignoring job as pool is shutting down.") } } +} +impl
<P> Executor for ThreadPool<P>
+where + P: Parker + Clone + 'static, +{ fn shutdown_async(&self) { let shutdown_pool = self.clone(); // Need to make sure that pool is shut down before drop is called. @@ -536,6 +538,23 @@ impl ThreadPoolCore { } } +struct ThreadLocalExecute; +impl CanExecute for ThreadLocalExecute { + fn execute_job(&self, job: Box) { + LOCAL_JOB_QUEUE.with(|qo| unsafe { + let msg = Job(job); + match *qo.get() { + Some(ref q) => { + q.push(msg); + } + None => { + unreachable!("Local Executor was set but local job queue was not!"); + } + } + }) + } +} + #[derive(Debug)] struct ThreadPoolWorker
<P>
where @@ -610,6 +629,7 @@ where *q.get() = Some(worker); stealer }); + set_local_executor(ThreadLocalExecute); let core = self.core.upgrade().expect("Core shut down already!"); //info!("Setting up new worker {}. strong={}, weak={}", self.id(), Arc::strong_count(&core), Arc::weak_count(&core)); let local_stealer = JobStealer::new(local_stealer_raw, self.id); @@ -635,7 +655,7 @@ where match local_queue.pop() { Some(d) => { let Job(f) = d; - f.call_box(); + f(); } None => break 'local, } @@ -695,7 +715,7 @@ where }); if let deque::Steal::Success(msg) = glob_res { let Job(f) = msg; - f.call_box(); + f(); self.abort_sleep(&core, &backoff, &mut snoozing, &mut parking); continue 'main; } @@ -709,7 +729,7 @@ where for stealer in self.stealers.iter() { if let deque::Steal::Success(msg) = stealer.steal() { let Job(f) = msg; - f.call_box(); + f(); self.abort_sleep(&core, &backoff, &mut snoozing, &mut parking); continue 'main; // only steal once before checking locally again } @@ -741,6 +761,7 @@ where sentinel.cancel(); let core = self.core.upgrade().expect("Core shut down already!"); self.unregister_stealer(&core); + unset_local_executor(); LOCAL_JOB_QUEUE.with(|q| unsafe { *q.get() = None; }); @@ -767,15 +788,15 @@ where } } -trait FnBox { - fn call_box(self: Box); -} +// trait FnBox { +// fn call_box(self: Box); +// } -impl FnBox for F { - fn call_box(self: Box) { - (*self)() - } -} +// impl FnBox for F { +// fn call_box(self: Box) { +// (*self)() +// } +// } #[derive(Clone)] struct JobStealer { @@ -813,7 +834,7 @@ impl Debug for JobStealer { } } -struct Job(Box); +struct Job(Box); enum ControlMsg { Stealers(Vec), @@ -934,6 +955,12 @@ mod tests { crate::tests::test_custom(exec, LABEL); } + #[test] + fn test_local() { + let exec = ThreadPool::default(); + crate::tests::test_local(exec, LABEL); + } + #[cfg(feature = "thread-pinning")] #[test] fn test_custom_affinity() { diff --git a/executors/src/lib.rs b/executors/src/lib.rs index 493fb67..5ca2be4 100644 --- a/executors/src/lib.rs +++ b/executors/src/lib.rs @@ -31,7 +31,7 @@ pub mod threadpool_executor; mod timeconstants; use crate::common::ignore; -pub use crate::common::Executor; +pub use crate::common::{CanExecute, Executor}; #[cfg(feature = "futures-support")] pub mod futures_executor; @@ -39,12 +39,14 @@ pub mod futures_executor; //use bichannel::*; use synchronoise::CountdownEvent; -// TODO add default implementation for abstract executor impl with associated type executable +mod locals; +pub use locals::*; #[cfg(test)] pub(crate) mod tests { use super::*; use std::fmt::Debug; + use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -122,6 +124,31 @@ pub(crate) mod tests { assert_eq!(res, 0); } + pub fn test_local(exec: E, label: &str) + where + E: Executor + Debug + 'static, + { + let pool = exec; + + let latch = Arc::new(CountdownEvent::new(N_DEPTH * N_WIDTH)); + let failed = Arc::new(AtomicBool::new(false)); + for _ in 0..N_WIDTH { + let latch2 = latch.clone(); + let failed2 = failed.clone(); + pool.execute(move || { + do_step_local(latch2, failed2, N_DEPTH); + }); + } + let res = latch.wait_timeout(Duration::from_secs(30)); + assert_eq!(res, 0); + assert!( + !failed.load(Ordering::SeqCst), + "Executor does not support local scheduling!" 
+        );
+        pool.shutdown()
+            .unwrap_or_else(|e| error!("Error during pool shutdown {:?} at {}", e, label));
+    }
+
     fn do_step<E>(latch: Arc<CountdownEvent>, pool: E, depth: usize)
     where
         E: Executor + Debug + 'static,
@@ -133,4 +160,33 @@
             pool.execute(move || do_step(latch, pool2, new_depth))
         }
     }
+
+    fn do_step_local(latch: Arc<CountdownEvent>, failed: Arc<AtomicBool>, depth: usize) {
+        let new_depth = depth - 1;
+        match latch.decrement() {
+            Ok(_) => {
+                if new_depth > 0 {
+                    let failed2 = failed.clone();
+                    let latch2 = latch.clone();
+                    let res =
+                        try_execute_locally(move || do_step_local(latch2, failed2, new_depth));
+                    if res.is_err() {
+                        error!("do_step_local should have executed locally!");
+                        failed.store(true, Ordering::SeqCst);
+                        while latch.decrement().is_ok() {
+                            () // do nothing, just keep draining, so the main thread wakes up
+                        }
+                    }
+                }
+            }
+            Err(e) => {
+                if failed.load(Ordering::SeqCst) {
+                    warn!("Aborting test as it failed");
+                    // and simply return here
+                } else {
+                    panic!("Latch didn't decrement! Error: {:?}", e);
+                }
+            }
+        }
+    }
 }
diff --git a/executors/src/locals.rs b/executors/src/locals.rs
new file mode 100644
index 0000000..a0570b4
--- /dev/null
+++ b/executors/src/locals.rs
@@ -0,0 +1,59 @@
+use super::*;
+use std::cell::UnsafeCell;
+
+// UnsafeCell has 10x the performance of RefCell
+// and the scoping guarantees that the borrows are exclusive
+thread_local!(
+    static LOCAL_EXECUTOR: UnsafeCell<Option<Box<dyn CanExecute>>> = UnsafeCell::new(Option::None);
+);
+
+pub(crate) fn set_local_executor<E>(executor: E)
+where
+    E: CanExecute + 'static,
+{
+    LOCAL_EXECUTOR.with(move |e| unsafe {
+        *e.get() = Some(Box::new(executor));
+    });
+}
+
+pub(crate) fn unset_local_executor() {
+    LOCAL_EXECUTOR.with(move |e| unsafe {
+        *e.get() = None;
+    });
+}
+
+/// Tries to run the job on the same executor that spawned the thread running the job.
+///
+/// This is not supported on all executor implementations,
+/// and will return the `Err` variant with the original job if it can't be scheduled.
+pub fn try_execute_locally<F>(f: F) -> Result<(), F>
+where
+    F: FnOnce() + Send + 'static,
+{
+    LOCAL_EXECUTOR.with(move |e| {
+        if let Some(Some(ref boxed)) = unsafe { e.get().as_ref() } {
+            boxed.execute_job(Box::new(f));
+            Ok(())
+        } else {
+            Err(f)
+        }
+    })
+}
+
+// #[cfg(feature = "futures-support")]
+// mod futures_locals {
+//     use super::*;
+//     use crate::futures_executor::FuturesExecutor;
+
+//     // UnsafeCell has 10x the performance of RefCell
+//     // and the scoping guarantees that the borrows are exclusive
+//     thread_local!(
+//         static LOCAL_FUTURES_EXECUTOR: UnsafeCell<Option<Box<dyn FuturesExecutor>>> = UnsafeCell::new(Option::None);
+//     );
+
+//     pub fn set_local_futures_executor<E>(executor: E) where E: FuturesExecutor {
+//         LOCAL_FUTURES_EXECUTOR.with(move |e| unsafe {
+//             *e.get() = Some(Box::new(executor));
+//         });
+//     }
+// }
diff --git a/executors/src/run_now.rs b/executors/src/run_now.rs
index 519e131..45ef390 100644
--- a/executors/src/run_now.rs
+++ b/executors/src/run_now.rs
@@ -23,7 +23,12 @@
 //! exec.execute(|| println!("bar"));
 //! exec.shutdown();
 //! ```
-
+//!
+//! # Note
+//!
+//! If you use [try_execute_locally](try_execute_locally) from within a job closure,
+//! it will be the same as running recursively, so you may run out of stack space, eventually.
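A minimal usage sketch for the new `try_execute_locally` function (an illustration, not part of the patch; it uses the crossbeam channel pool, but any pool that installs a thread-local executor behaves the same):

```rust
use executors::*;
use executors::crossbeam_channel_pool::ThreadPool;
use std::time::Duration;

fn main() {
    let pool = ThreadPool::new(2);
    pool.execute(|| {
        // Pool workers install a thread-local executor on startup, so a job
        // can reschedule follow-up work on its own pool without a pool handle.
        try_execute_locally(|| println!("follow-up ran on the same pool"))
            .unwrap_or_else(|_job| println!("local scheduling not supported"));
    });
    // Threads outside any pool have no local executor: the job is handed back.
    assert!(try_execute_locally(|| ()).is_err());
    std::thread::sleep(Duration::from_millis(100));
    pool.shutdown().expect("shutdown");
}
```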
+use super::*; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -32,7 +37,12 @@ pub struct RunNowExecutor { active: Arc, } -use super::*; +struct ThreadLocalRunNow; +impl CanExecute for ThreadLocalRunNow { + fn execute_job(&self, job: Box) { + job(); + } +} impl RunNowExecutor { pub fn new() -> RunNowExecutor { @@ -50,13 +60,26 @@ impl Default for RunNowExecutor { } } +impl CanExecute for RunNowExecutor { + fn execute_job(&self, job: Box) { + if self.active.load(Ordering::SeqCst) { + job(); + } else { + warn!("Ignoring job as pool is shutting down."); + } + } +} + impl Executor for RunNowExecutor { + // override default implementation to avoid boxing at all fn execute(&self, job: F) where F: FnOnce() + Send + 'static, { if self.active.load(Ordering::SeqCst) { + set_local_executor(ThreadLocalRunNow); job(); + unset_local_executor(); } else { warn!("Ignoring job as pool is shutting down."); } @@ -100,6 +123,14 @@ mod tests { crate::tests::test_defaults::(LABEL); } + // Will stack overflow. + // #[test] + // #[should_panic] + // fn test_local() { + // let exec = RunNowExecutor::new(); + // crate::tests::test_local(exec, LABEL); + // } + #[test] fn run_tasks() { let _ = env_logger::try_init(); diff --git a/executors/src/threadpool_executor.rs b/executors/src/threadpool_executor.rs index 8271120..85f501e 100644 --- a/executors/src/threadpool_executor.rs +++ b/executors/src/threadpool_executor.rs @@ -85,7 +85,18 @@ impl Default for ThreadPoolExecutor { } } +impl CanExecute for ThreadPoolExecutor { + fn execute_job(&self, job: Box) { + if self.active.load(Ordering::SeqCst) { + self.pool.execute(job); + } else { + warn!("Ignoring job as pool is shutting down."); + } + } +} + impl Executor for ThreadPoolExecutor { + // override this here instead of using the default implementation to avoid double boxing fn execute(&self, job: F) where F: FnOnce() + Send + 'static, @@ -119,12 +130,11 @@ impl Executor for ThreadPoolExecutor { #[cfg(test)] mod tests { - use env_logger; use super::*; use std::time::Duration; - const LABEL: &'static str = "Threadpool"; + const LABEL: &str = "Threadpool"; #[test] fn test_debug() { @@ -145,6 +155,13 @@ mod tests { crate::tests::test_defaults::(LABEL); } + #[test] + #[should_panic] // this executor does not actually support local scheduling + fn test_local() { + let exec = ThreadPoolExecutor::default(); + crate::tests::test_local(exec, LABEL); + } + #[test] fn run_with_two_threads() { let _ = env_logger::try_init(); From 95aa32f58ac95f0781d0898174a603728ec9837f Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Mon, 22 Jun 2020 16:56:09 +0200 Subject: [PATCH 08/11] Switching to using async-task crate internally --- executors/Cargo.toml | 12 +- executors/src/crossbeam_channel_pool.rs | 30 +++++ executors/src/crossbeam_workstealing_pool.rs | 79 +++++++++--- executors/src/futures_executor.rs | 120 +------------------ executors/src/lib.rs | 4 +- 5 files changed, 104 insertions(+), 141 deletions(-) diff --git a/executors/Cargo.toml b/executors/Cargo.toml index 93a42c7..26cb0b4 100644 --- a/executors/Cargo.toml +++ b/executors/Cargo.toml @@ -16,11 +16,11 @@ categories = ["concurrency", "asynchronous"] license = "MIT" [features] -default = ["threadpool-exec", "cb-channel-exec", "workstealing-exec", "ws-timed-fairness", "defaults", "futures-support"] +default = ["threadpool-exec", "cb-channel-exec", "workstealing-exec", "ws-timed-fairness", "defaults"] threadpool-exec = ["threadpool"] -cb-channel-exec = ["crossbeam-channel"] -workstealing-exec = 
["crossbeam-channel", "crossbeam-deque", "rand", "crossbeam-utils"] +cb-channel-exec = ["crossbeam-channel", "async-task"] +workstealing-exec = ["crossbeam-channel", "crossbeam-deque", "rand", "crossbeam-utils", "async-task"] defaults = ["num_cpus"] # In the workstealing executor, check the global queues every 1ms @@ -32,8 +32,6 @@ ws-no-park = [] thread-pinning = ["core_affinity"] -futures-support = ["futures"] - [dependencies] log = "0.4" @@ -46,12 +44,14 @@ crossbeam-deque = {version = "0.7", optional = true} rand = {version = "0.7", optional = true} num_cpus = {version = "1.12", optional = true} core_affinity = {version = "0.5", optional = true} -futures = {version = "0.3", optional = true} +#futures = {version = "0.3", optional = true} +async-task = {version = "3.0", optional = true} [dev-dependencies] env_logger = "0.7" version-sync = "0.8" criterion = "0.3" +futures = "0.3" [badges] diff --git a/executors/src/crossbeam_channel_pool.rs b/executors/src/crossbeam_channel_pool.rs index 78569f2..ceb16c3 100644 --- a/executors/src/crossbeam_channel_pool.rs +++ b/executors/src/crossbeam_channel_pool.rs @@ -49,6 +49,7 @@ use super::*; use crossbeam_channel as channel; +use std::future::Future; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, Weak}; use std::thread; @@ -158,6 +159,17 @@ impl ThreadPool { } pool } + + fn schedule_task(&self, task: async_task::Task<()>) -> () { + // NOTE: This check costs about 150k schedulings/s in a 2 by 2 experiment over 20 runs. + if !self.shutdown.load(Ordering::SeqCst) { + self.sender + .send(JobMsg::Task(task)) + .unwrap_or_else(|e| error!("Error submitting job: {:?}", e)); + } else { + warn!("Ignoring job as pool is shutting down."); + } + } } /// Create a thread pool with one thread per CPU. @@ -230,6 +242,20 @@ impl Executor for ThreadPool { } } +impl FuturesExecutor for ThreadPool { + fn spawn(&self, future: impl Future + 'static + Send) -> () { + let exec = self.clone(); + let (task, _handle) = async_task::spawn( + future, + move |task| { + exec.schedule_task(task); + }, + (), + ); + task.schedule(); + } +} + fn spawn_worker(core: Arc>) { let id = { let mut guard = core.lock().unwrap(); @@ -340,6 +366,9 @@ impl ThreadPoolWorker { while let Ok(msg) = self.recv.recv() { match msg { JobMsg::Job(f) => f(), + JobMsg::Task(t) => { + let _ = t.run(); + } JobMsg::Stop(latch) => { ignore(latch.decrement()); break; @@ -364,6 +393,7 @@ impl ThreadPoolWorker { enum JobMsg { Job(Box), + Task(async_task::Task<()>), Stop(Arc), } diff --git a/executors/src/crossbeam_workstealing_pool.rs b/executors/src/crossbeam_workstealing_pool.rs index 5eae9ed..ecf152c 100644 --- a/executors/src/crossbeam_workstealing_pool.rs +++ b/executors/src/crossbeam_workstealing_pool.rs @@ -64,18 +64,17 @@ //! 
``` use super::*; +use crate::futures_executor::FuturesExecutor; +use crate::parker; +use crate::parker::{DynParker, ParkResult, Parker}; use crossbeam_channel as channel; use crossbeam_deque as deque; use crossbeam_utils::Backoff; -//use std::sync::mpsc; -use crate::parker; -use crate::parker::{DynParker, ParkResult, Parker}; -//use num_cpus; use rand::prelude::*; use std::cell::UnsafeCell; use std::collections::BTreeMap; use std::fmt::{Debug, Formatter}; -//use std::iter::{FromIterator, IntoIterator}; +use std::future::Future; use std::ops::Deref; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::{Arc, Mutex, Weak}; @@ -83,7 +82,6 @@ use std::thread; use std::time::Duration; #[cfg(feature = "ws-timed-fairness")] use std::time::Instant; -// use time; use std::vec::Vec; /// Creates a thread pool with support for up to 32 threads. @@ -166,7 +164,7 @@ where LOCAL_JOB_QUEUE.with(|qo| unsafe { match *qo.get() { Some(ref q) => { - let msg = Job(Box::new(job)); + let msg = Job::Function(Box::new(job)); q.push(msg); Ok(()) } @@ -318,6 +316,27 @@ where } pool } + + fn schedule_task(&self, task: async_task::Task<()>) -> () { + if !self.inner.shutdown.load(Ordering::Relaxed) { + LOCAL_JOB_QUEUE.with(|qo| unsafe { + let msg = Job::Task(task); + match *qo.get() { + Some(ref q) => { + q.push(msg); + } + None => { + debug!("Scheduling on global pool."); + self.inner.global_sender.push(msg); + #[cfg(not(feature = "ws-no-park"))] + self.inner.parker.unpark_one(); + } + } + }) + } else { + warn!("Ignoring job as pool is shutting down.") + } + } } /// Create a thread pool with one thread per CPU. @@ -340,7 +359,7 @@ where fn execute_job(&self, job: Box) { if !self.inner.shutdown.load(Ordering::Relaxed) { LOCAL_JOB_QUEUE.with(|qo| unsafe { - let msg = Job(job); + let msg = Job::Function(job); match *qo.get() { Some(ref q) => { q.push(msg); @@ -406,6 +425,23 @@ where } } +impl
<P> FuturesExecutor for ThreadPool<P>
+where + P: Parker + Clone + 'static, +{ + fn spawn(&self, future: impl Future + 'static + Send) -> () { + let exec = self.clone(); + let (task, _handle) = async_task::spawn( + future, + move |task| { + exec.schedule_task(task); + }, + (), + ); + task.schedule(); + } +} + impl
<P> ThreadPoolInner<P>
where P: Parker + Clone + 'static, @@ -542,7 +578,7 @@ struct ThreadLocalExecute; impl CanExecute for ThreadLocalExecute { fn execute_job(&self, job: Box) { LOCAL_JOB_QUEUE.with(|qo| unsafe { - let msg = Job(job); + let msg = Job::Function(job); match *qo.get() { Some(ref q) => { q.push(msg); @@ -654,8 +690,7 @@ where 'local: loop { match local_queue.pop() { Some(d) => { - let Job(f) = d; - f(); + d.run(); } None => break 'local, } @@ -714,8 +749,7 @@ where } }); if let deque::Steal::Success(msg) = glob_res { - let Job(f) = msg; - f(); + msg.run(); self.abort_sleep(&core, &backoff, &mut snoozing, &mut parking); continue 'main; } @@ -728,8 +762,7 @@ where // try to steal something! for stealer in self.stealers.iter() { if let deque::Steal::Success(msg) = stealer.steal() { - let Job(f) = msg; - f(); + msg.run(); self.abort_sleep(&core, &backoff, &mut snoozing, &mut parking); continue 'main; // only steal once before checking locally again } @@ -834,7 +867,21 @@ impl Debug for JobStealer { } } -struct Job(Box); +//struct Job(Box); +enum Job { + Function(Box), + Task(async_task::Task<()>), +} +impl Job { + fn run(self) { + match self { + Job::Function(f) => f(), + Job::Task(t) => { + let _ = t.run(); + } + } + } +} enum ControlMsg { Stealers(Vec), diff --git a/executors/src/futures_executor.rs b/executors/src/futures_executor.rs index 13e65da..61bf295 100644 --- a/executors/src/futures_executor.rs +++ b/executors/src/futures_executor.rs @@ -7,128 +7,11 @@ // except according to those terms. use super::*; -use futures::{ - future::{BoxFuture, FutureExt}, - task::{waker_ref, ArcWake}, -}; -use std::{ - cell::UnsafeCell, - future::Future, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, - task::{Context, Poll}, -}; - -mod task_state { - pub const WAITING: usize = 0; - pub const SCHEDULED: usize = 1; - pub const DONE: usize = 2; -} +use std::future::Future; pub trait FuturesExecutor: Executor + Sync + 'static { fn spawn(&self, future: impl Future + 'static + Send) -> (); } -impl FuturesExecutor for E -where - E: Executor + Sync + 'static, -{ - fn spawn(&self, future: impl Future + 'static + Send) -> () { - let future = future.boxed(); - let task = FunTask { - future: UnsafeCell::new(Some(future)), - //future: Mutex::new(Some(future)), - state: AtomicUsize::new(task_state::SCHEDULED), - executor: self.clone(), - }; - let task = Arc::new(task); - self.execute(move || FunTask::run(task)); - } -} - -pub(crate) struct FunTask -where - E: Executor + Sync + 'static, -{ - future: UnsafeCell>>, - state: AtomicUsize, - executor: E, -} - -impl FunTask -where - E: Executor + Sync + 'static, -{ - fn run(task: Arc) -> () { - let res = unsafe { - let src = task.future.get(); - src.as_mut().map(|opt| opt.take()).flatten() - }; - if let Some(mut f) = res { - let waker = waker_ref(&task); - let context = &mut Context::from_waker(&*waker); - // `BoxFuture` is a type alias for - // `Pin + Send + 'static>>`. - // We can get a `Pin<&mut dyn Future + Send + 'static>` - // from it by calling the `Pin::as_mut` method. - if let Poll::Pending = f.as_mut().poll(context) { - // We're not done processing the future, so put it - // back in its task to be run again in the future. 
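The deletions here remove the hand-rolled poll loop; with the `async-task` 3.0 API adopted in this patch, waker plumbing lives inside the task and an executor only supplies a `schedule` callback. A condensed sketch of that pattern (not verbatim from the patch):

```rust
use std::future::Future;

// `E` stands in for any executor handle, like the pools in this crate.
fn spawn_sketch<E, F>(exec: E, future: F)
where
    E: executors::Executor + Sync + 'static,
    F: Future<Output = ()> + Send + 'static,
{
    let (task, _handle) = async_task::spawn(
        future,
        // Called for the initial `task.schedule()` and again on every wake-up.
        move |task: async_task::Task<()>| {
            exec.execute(move || {
                let _ = task.run(); // polls the future exactly once
            });
        },
        (), // tag
    );
    task.schedule(); // trigger the first poll
}
```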
- unsafe { - let dst = task.future.get(); - if let Some(slot) = dst.as_mut() { - *slot = Some(f); - } - } - task.state.store(task_state::WAITING, Ordering::Release); - } else { - task.state.store(task_state::DONE, Ordering::Release); - } - } else { - // else the future is already done with - //unreachable!("Shouldn't get here!"); - //eprintln!("Future got schedulled despite being done!"); - task.state.store(task_state::DONE, Ordering::Release); - } - } -} - -impl ArcWake for FunTask -where - E: Executor + Sync + 'static, -{ - fn wake_by_ref(arc_self: &Arc) { - // Implement `wake` by sending this task back onto the task channel - // so that it will be polled again by the executor. - - loop { - // hot wait here, since this only coveres the range between pool and the end of the function, where nothing expensive happens - let state = arc_self.state.compare_and_swap( - task_state::WAITING, - task_state::SCHEDULED, - Ordering::Acquire, - ); - match state { - task_state::WAITING => { - let cloned = arc_self.clone(); - arc_self.executor.execute(move || FunTask::run(cloned)); - return; - } - task_state::SCHEDULED => { - continue; - } - task_state::DONE => { - return; - } - _ => unreachable!("Invalid State!"), - } - } - } -} - -//I'm making sure only one thread at a time has access to the contents here -unsafe impl Sync for FunTask where E: Executor + Sync + 'static {} #[cfg(test)] mod tests { @@ -136,6 +19,7 @@ mod tests { use futures::channel::oneshot::*; use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; use std::thread; use std::time::Duration; diff --git a/executors/src/lib.rs b/executors/src/lib.rs index 5ca2be4..4eb1050 100644 --- a/executors/src/lib.rs +++ b/executors/src/lib.rs @@ -33,8 +33,10 @@ mod timeconstants; use crate::common::ignore; pub use crate::common::{CanExecute, Executor}; -#[cfg(feature = "futures-support")] +#[cfg(any(feature = "cb-channel-exec", feature = "workstealing-exec"))] pub mod futures_executor; +#[cfg(any(feature = "cb-channel-exec", feature = "workstealing-exec"))] +pub use crate::futures_executor::FuturesExecutor; //use bichannel::*; use synchronoise::CountdownEvent; From cf769702b84106c77020d2a263d90d7a44a23fb0 Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Mon, 22 Jun 2020 17:05:11 +0200 Subject: [PATCH 09/11] prevent unsafe drops in schedule_task --- executors/src/crossbeam_channel_pool.rs | 14 ++++----- executors/src/crossbeam_workstealing_pool.rs | 32 +++++++++----------- 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/executors/src/crossbeam_channel_pool.rs b/executors/src/crossbeam_channel_pool.rs index ceb16c3..cbb023e 100644 --- a/executors/src/crossbeam_channel_pool.rs +++ b/executors/src/crossbeam_channel_pool.rs @@ -161,14 +161,12 @@ impl ThreadPool { } fn schedule_task(&self, task: async_task::Task<()>) -> () { - // NOTE: This check costs about 150k schedulings/s in a 2 by 2 experiment over 20 runs. 
- if !self.shutdown.load(Ordering::SeqCst) { - self.sender - .send(JobMsg::Task(task)) - .unwrap_or_else(|e| error!("Error submitting job: {:?}", e)); - } else { - warn!("Ignoring job as pool is shutting down."); - } + // schedule tasks always, even if the pool is already stopped, since it's unsafe to drop from the schedule function + // this might lead to some "memory leaks" if an executor remains stopped but allocated for a long time + + self.sender + .send(JobMsg::Task(task)) + .unwrap_or_else(|e| error!("Error submitting job: {:?}", e)); } } diff --git a/executors/src/crossbeam_workstealing_pool.rs b/executors/src/crossbeam_workstealing_pool.rs index ecf152c..28e8f62 100644 --- a/executors/src/crossbeam_workstealing_pool.rs +++ b/executors/src/crossbeam_workstealing_pool.rs @@ -318,24 +318,22 @@ where } fn schedule_task(&self, task: async_task::Task<()>) -> () { - if !self.inner.shutdown.load(Ordering::Relaxed) { - LOCAL_JOB_QUEUE.with(|qo| unsafe { - let msg = Job::Task(task); - match *qo.get() { - Some(ref q) => { - q.push(msg); - } - None => { - debug!("Scheduling on global pool."); - self.inner.global_sender.push(msg); - #[cfg(not(feature = "ws-no-park"))] - self.inner.parker.unpark_one(); - } + // schedule tasks always, even if the pool is already stopped, since it's unsafe to drop from the schedule function + // this might lead to some "memory leaks" if an executor remains stopped but allocated for a long time + LOCAL_JOB_QUEUE.with(|qo| unsafe { + let msg = Job::Task(task); + match *qo.get() { + Some(ref q) => { + q.push(msg); } - }) - } else { - warn!("Ignoring job as pool is shutting down.") - } + None => { + debug!("Scheduling on global pool."); + self.inner.global_sender.push(msg); + #[cfg(not(feature = "ws-no-park"))] + self.inner.parker.unpark_one(); + } + } + }) } } From 7edd216dbb00b18d49f13ca2857f48bdc18b5d0e Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Tue, 30 Jun 2020 13:03:22 +0200 Subject: [PATCH 10/11] Use JoinHandles for spawned results --- executors/src/crossbeam_channel_pool.rs | 8 +++- executors/src/crossbeam_workstealing_pool.rs | 8 +++- executors/src/futures_executor.rs | 39 +++++++++++++++++++- executors/src/lib.rs | 2 +- 4 files changed, 51 insertions(+), 6 deletions(-) diff --git a/executors/src/crossbeam_channel_pool.rs b/executors/src/crossbeam_channel_pool.rs index cbb023e..d5b6e0e 100644 --- a/executors/src/crossbeam_channel_pool.rs +++ b/executors/src/crossbeam_channel_pool.rs @@ -241,9 +241,12 @@ impl Executor for ThreadPool { } impl FuturesExecutor for ThreadPool { - fn spawn(&self, future: impl Future + 'static + Send) -> () { + fn spawn( + &self, + future: impl Future + 'static + Send, + ) -> JoinHandle { let exec = self.clone(); - let (task, _handle) = async_task::spawn( + let (task, handle) = async_task::spawn( future, move |task| { exec.schedule_task(task); @@ -251,6 +254,7 @@ impl FuturesExecutor for ThreadPool { (), ); task.schedule(); + handle } } diff --git a/executors/src/crossbeam_workstealing_pool.rs b/executors/src/crossbeam_workstealing_pool.rs index 28e8f62..241d482 100644 --- a/executors/src/crossbeam_workstealing_pool.rs +++ b/executors/src/crossbeam_workstealing_pool.rs @@ -427,9 +427,12 @@ impl
<P> FuturesExecutor for ThreadPool<P>
where P: Parker + Clone + 'static, { - fn spawn(&self, future: impl Future + 'static + Send) -> () { + fn spawn( + &self, + future: impl Future + 'static + Send, + ) -> JoinHandle { let exec = self.clone(); - let (task, _handle) = async_task::spawn( + let (task, handle) = async_task::spawn( future, move |task| { exec.schedule_task(task); @@ -437,6 +440,7 @@ where (), ); task.schedule(); + handle } } diff --git a/executors/src/futures_executor.rs b/executors/src/futures_executor.rs index 61bf295..7986fcd 100644 --- a/executors/src/futures_executor.rs +++ b/executors/src/futures_executor.rs @@ -9,8 +9,18 @@ use super::*; use std::future::Future; +/// A future that can be used to await the result of a spawned future +pub type JoinHandle = async_task::JoinHandle; + +/// A trait for spawning futures on an Executor pub trait FuturesExecutor: Executor + Sync + 'static { - fn spawn(&self, future: impl Future + 'static + Send) -> (); + /// Spawn `future` on the pool and return a handle to the result + /// + /// Handles can be awaited like any other future. + fn spawn( + &self, + future: impl Future + 'static + Send, + ) -> JoinHandle; } #[cfg(test)] @@ -100,4 +110,31 @@ mod tests { done = barrier.load(Ordering::SeqCst); } } + + #[test] + fn test_async_result_ccp() { + let exec = crate::crossbeam_channel_pool::ThreadPool::new(2); + test_async_result_executor(&exec); + exec.shutdown().expect("shutdown"); + } + + #[test] + fn test_async_result_cwp() { + let exec = crate::crossbeam_workstealing_pool::small_pool(2); + test_async_result_executor(&exec); + exec.shutdown().expect("shutdown"); + } + + fn test_async_result_executor(exec: &E) + where + E: FuturesExecutor, + { + let test_string = "test".to_string(); + let (tx, rx) = channel::(); + let handle = exec.spawn(async move { rx.await.expect("message") }); + thread::sleep(Duration::from_millis(100)); + tx.send(test_string.clone()).expect("sent"); + let res = futures::executor::block_on(handle); + assert_eq!(res, Some(test_string)) + } } diff --git a/executors/src/lib.rs b/executors/src/lib.rs index 4eb1050..c7e30b2 100644 --- a/executors/src/lib.rs +++ b/executors/src/lib.rs @@ -36,7 +36,7 @@ pub use crate::common::{CanExecute, Executor}; #[cfg(any(feature = "cb-channel-exec", feature = "workstealing-exec"))] pub mod futures_executor; #[cfg(any(feature = "cb-channel-exec", feature = "workstealing-exec"))] -pub use crate::futures_executor::FuturesExecutor; +pub use crate::futures_executor::{FuturesExecutor, JoinHandle}; //use bichannel::*; use synchronoise::CountdownEvent; From 9c260d2b3fd09e6e48553fc64c70997a33e57dcb Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Mon, 13 Jul 2020 14:22:55 +0200 Subject: [PATCH 11/11] Update docs and prepare for 0.7.0 release --- README.md | 6 ++-- executors/Cargo.toml | 2 +- executors/src/bichannel.rs | 32 ++++++++++++++++--- executors/src/common.rs | 2 ++ executors/src/crossbeam_channel_pool.rs | 3 ++ executors/src/futures_executor.rs | 21 +++++++++++++ executors/src/lib.rs | 3 +- executors/src/parker.rs | 42 +++++++++++++++++++++---- executors/src/run_now.rs | 9 +++++- executors/src/threadpool_executor.rs | 3 ++ 10 files changed, 107 insertions(+), 16 deletions(-) diff --git a/README.md b/README.md index 24cf7e3..a75c69e 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ Add this to your `Cargo.toml`: ```toml [dependencies] -executors = "0.6" +executors = "0.7" ``` You can use, for example, the [crossbeam_workstealing_pool](https://docs.rs/executors/latest/executors/crossbeam_workstealing_pool/index.html) to 
schedule a number `n_jobs` of jobs over `n_workers` threads, and collect the results via an `mpsc::channel`.
@@ -59,7 +59,7 @@ You can enable support for pinning pool threads to particular CPU cores via the
 
 ## Some Numbers
 
-The following are some example result from my desktop machine (Intel i7-4770 @ 3.40Ghz Quad-Core with HT (8 logical cores) with 16GB of RAM).
+The following are some example results from my desktop machine (Intel i7-4770 @ 3.40Ghz Quad-Core with HT (8 logical cores) with 16GB of RAM).
 *Note* that they are all from a single run and thus not particularly scientific and subject to whatever else was going on on my system during the run.
 
 Implementation abbreviations:
@@ -170,6 +170,6 @@ This corresponds to a very large internal workload in response to every external
 
 ## License
 
-Licensed under the terms of MIT license.
+Licensed under the terms of the MIT license.
 
 See [LICENSE](LICENSE) for details.
diff --git a/executors/Cargo.toml b/executors/Cargo.toml
index 26cb0b4..89be1e4 100644
--- a/executors/Cargo.toml
+++ b/executors/Cargo.toml
@@ -3,7 +3,7 @@ name = "executors"
 # NB: When modifying, also modify:
 # 1. html_root_url in lib.rs
 # 2. number in readme (for breaking changes)
-version = "0.6.1"
+version = "0.7.0"
 authors = ["Lars Kroll "]
 edition = "2018"
 description = "A collection of high-performance task executors."
diff --git a/executors/src/bichannel.rs b/executors/src/bichannel.rs
index a2cd2d0..aea0cc3 100644
--- a/executors/src/bichannel.rs
+++ b/executors/src/bichannel.rs
@@ -37,6 +37,10 @@ pub fn bichannel() -> (Endpoint, Endpoint
     (endpoint_left, endpoint_right)
 }
 
+/// One end of a bichannel
+///
+/// This end can send the `Out` type to the other end
+/// and receive the `In` type from the other end.
 pub struct Endpoint<In, Out> {
     sender: Sender<Out>,
     receiver: Receiver<In>,
 }
@@ -46,32 +50,52 @@ impl<In, Out> Endpoint<In, Out> {
     fn new(sender: Sender<Out>, receiver: Receiver<In>) -> Endpoint<In, Out> {
         Endpoint { sender, receiver }
     }
+
+    /// Send `t` to the other end
+    ///
+    /// This function works just like [mpsc send](std::sync::mpsc::Sender::send).
     pub fn send(&self, t: Out) -> Result<(), SendError<Out>> {
         self.sender.send(t)
     }
+
+    /// Receive something from the channel without blocking
+    ///
+    /// This function works just like [mpsc try_recv](std::sync::mpsc::Receiver::try_recv).
     pub fn try_recv(&self) -> Result<In, TryRecvError> {
         self.receiver.try_recv()
     }
+
+    /// Receive something from the channel, blocking until something is available
+    ///
+    /// This function works just like [mpsc recv](std::sync::mpsc::Receiver::recv).
     pub fn recv(&self) -> Result<In, RecvError> {
         self.receiver.recv()
     }
+
+    /// Receive something from the channel, blocking until something is available or the timeout expires
+    ///
+    /// This function works just like [mpsc recv_timeout](std::sync::mpsc::Receiver::recv_timeout).
     pub fn recv_timeout(&self, timeout: Duration) -> Result<In, RecvTimeoutError> {
         self.receiver.recv_timeout(timeout)
     }
+
+    /// Iterate over incoming data
+    ///
+    /// This function works just like [mpsc iter](std::sync::mpsc::Receiver::iter).
     pub fn iter(&self) -> Iter<'_, In> {
         self.receiver.iter()
     }
+
+    /// Iterate over incoming data
+    ///
+    /// This function works just like [mpsc try_iter](std::sync::mpsc::Receiver::try_iter).
pub fn try_iter(&self) -> TryIter<'_, In> { self.receiver.try_iter() } } -//impl !Sync for Endpoint {} -unsafe impl Send for Endpoint {} - #[cfg(test)] mod tests { - use env_logger; use super::*; use crate::common::ignore; diff --git a/executors/src/common.rs b/executors/src/common.rs index fcc207a..1392690 100644 --- a/executors/src/common.rs +++ b/executors/src/common.rs @@ -6,6 +6,8 @@ // This file may not be copied, modified, or distributed // except according to those terms. +//! The core traits and reusable functions of this crate. + use std::fmt::Debug; /// A minimal trait for task executors. diff --git a/executors/src/crossbeam_channel_pool.rs b/executors/src/crossbeam_channel_pool.rs index d5b6e0e..82777c8 100644 --- a/executors/src/crossbeam_channel_pool.rs +++ b/executors/src/crossbeam_channel_pool.rs @@ -55,6 +55,9 @@ use std::sync::{Arc, Mutex, Weak}; use std::thread; use std::time::Duration; +/// A handle to a [crossbeam_channel_pool](crossbeam_channel_pool) +/// +/// See module level documentation for usage information. #[derive(Clone, Debug)] pub struct ThreadPool { core: Arc>, diff --git a/executors/src/futures_executor.rs b/executors/src/futures_executor.rs index 7986fcd..ad9b5e4 100644 --- a/executors/src/futures_executor.rs +++ b/executors/src/futures_executor.rs @@ -6,6 +6,11 @@ // This file may not be copied, modified, or distributed // except according to those terms. +//! Support for Rust's futures and async/await APIs +//! +//! This functionality is provided via the [FuturesExecutor](FuturesExecutor) +//! trait, which is implemented for all executors in this crate that can efficiently support it. + use super::*; use std::future::Future; @@ -17,6 +22,22 @@ pub trait FuturesExecutor: Executor + Sync + 'static { /// Spawn `future` on the pool and return a handle to the result /// /// Handles can be awaited like any other future. + /// + /// # Examples + /// + /// Execute an "expensive" computation on the pool and + /// block the main thread waiting for the result to become available. + /// + /// ``` + /// use executors::*; + /// use futures::executor::block_on; + /// # use executors::crossbeam_channel_pool::ThreadPool; + /// // initialise some executor + /// # let executor = ThreadPool::new(2); + /// let handle = executor.spawn(async move { 2*2 }); + /// let result = block_on(handle).expect("result"); + /// assert_eq!(4, result); + /// ``` fn spawn( &self, future: impl Future + 'static + Send, diff --git a/executors/src/lib.rs b/executors/src/lib.rs index c7e30b2..9ba61cf 100644 --- a/executors/src/lib.rs +++ b/executors/src/lib.rs @@ -5,7 +5,8 @@ // . // This file may not be copied, modified, or distributed // except according to those terms. -#![doc(html_root_url = "https://docs.rs/executors/0.6.1")] +#![doc(html_root_url = "https://docs.rs/executors/0.7.0")] +#![deny(missing_docs)] #![allow(unused_parens)] #![allow(clippy::unused_unit)] diff --git a/executors/src/parker.rs b/executors/src/parker.rs index 3b92684..8386881 100644 --- a/executors/src/parker.rs +++ b/executors/src/parker.rs @@ -6,6 +6,11 @@ // This file may not be copied, modified, or distributed // except according to those terms. +//! A reusable thread-pool-parking mechanism. +//! +//! This module is mostly meant for internal use within this crate, +//! but could be used to build custom thread-pools as well. 
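The parking protocol documented below has a strict call order; a pool worker is expected to drive a `Parker` roughly like this (a sketch only; it assumes `abort_park` and `park` take the same `thread_id` argument as `prepare_park`, and `has_work` stands in for the pool's own resource check):

```rust
use executors::parker::{ParkResult, Parker};

fn idle<P: Parker>(parker: &P, thread_id: usize, has_work: impl Fn() -> bool) {
    parker.prepare_park(thread_id);
    if has_work() {
        // The resource became available after preparing: abort instead of sleeping.
        parker.abort_park(thread_id);
        return;
    }
    match parker.park(thread_id) {
        ParkResult::Retry => (), // parker avoided blocking on a lock; try again later
        ParkResult::Abort => (), // recheck the managed resource, then prepare again
        ParkResult::Woken => (), // unpark_one or unpark_all woke this thread
    }
}
```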
+ use arr_macro::arr; use std::collections::BTreeMap; use std::fmt; @@ -14,12 +19,17 @@ use std::sync::{Arc, Mutex}; use std::thread; use std::thread::Thread; +/// Get a parker for up to 32 threads pub fn small() -> StaticParker { StaticParker::new(SmallThreadData::new()) } + +/// Get a parker for up to 64 threads pub fn large() -> StaticParker { StaticParker::new(LargeThreadData::new()) } + +/// Get a parker for an unbounded number of threads pub fn dynamic() -> StaticParker { StaticParker::new(DynamicThreadData::new()) } @@ -29,16 +39,17 @@ pub fn dynamic() -> StaticParker { pub enum ParkResult { /// Simply retry parking later. /// Usually, indicates that implementation wanted avoid blocking on a lock. - /// The parker was left in the "prepared" state (s. [`prepare_park`]). + /// The parker was left in the "prepared" state (s. [prepare_park](Parker::prepare_park)). Retry, /// Recheck managed resource before retrying. - /// The parker was moved out of the "prepared" state (s. [`abort_park`]). + /// The parker was moved out of the "prepared" state (s. [abort_park](Parker::abort_park)). Abort, /// Thread parked and then was awoken via `unpark`. - /// The parker was moved out of the "prepared" state (s. [`abort_park`]). + /// The parker was moved out of the "prepared" state (s. [unpark_one](Parker::unpark_one) and [unpark_all](Parker::unpark_all)). Woken, } +/// The core trait that every parker implementation must provide pub trait Parker: Sync + Send + Clone { /// Maximum number of threads supported by this parker implementation. fn max_threads(&self) -> Option; @@ -50,15 +61,15 @@ pub trait Parker: Sync + Send + Clone { /// Prepare to go to sleep. /// You *must* call this before doing one last check on the resource being managed. - /// If the resource is available afterwards, you *must* call [`abort_park`]. - /// If it is not available afterwards, then call [`park`]. + /// If the resource is available afterwards, you *must* call [abort_park](Parker::abort_park). + /// If it is not available afterwards, then call [park](Parker::park). /// /// The provided `thread_id` must fit into the underlying parker implementation, or a panic will be issued! /// fn prepare_park(&self, thread_id: usize) -> (); /// Abort attempt at going to sleep. - /// You *must* call this if you have called [`prepare_park`], but then the resource became available. + /// You *must* call this if you have called [prepare_park](Parker::prepare_park), but then the resource became available. /// /// The provided `thread_id` must fit into the underlying parker implementation, or a panic will be issued! /// @@ -80,6 +91,7 @@ pub trait Parker: Sync + Send + Clone { fn unpark_all(&self) -> (); } +/// A trait-object-like wrapper around a concrete parker instance #[derive(Clone, Debug)] pub struct DynParker { inner: Arc, @@ -132,6 +144,9 @@ impl Parker for DynParker { } } +/// A concrete parker instance +/// +/// The generic parameter `T` implements the internal state management of the parker. #[derive(Debug)] pub struct StaticParker where @@ -214,6 +229,7 @@ where } } +/// A trait that manages the internal state of a parker implementation pub trait ThreadData: std::fmt::Debug { /// Maximum number of threads supported by this parker implementation. 
fn max_threads(&self) -> Option; @@ -249,12 +265,14 @@ enum ParkState { Waking, } +/// Parker state large enough for up to 32 threads #[derive(Debug)] pub struct SmallThreadData { sleep_set: AtomicU32, sleeping: Mutex<[ParkState; 32]>, } impl SmallThreadData { + /// The maximum number of threads supported by this parker pub const MAX_THREADS: usize = 32; fn new() -> SmallThreadData { @@ -386,11 +404,13 @@ impl ThreadData for SmallThreadData { } } +/// Parker state for up to 64 threads pub struct LargeThreadData { sleep_set: AtomicU64, sleeping: Mutex<[ParkState; 64]>, } impl LargeThreadData { + /// The maximum number of threads supported by this parker pub const MAX_THREADS: usize = 64; fn new() -> LargeThreadData { @@ -545,6 +565,16 @@ impl InnerData { } } +/// Park state for an unbounded number of threads +/// +/// Uses a dynamic datastructure internally +/// to keep track of the state for each thread. +/// +/// # Note +/// +/// Technically, since thread-ids are `usize`, +/// there still is an upper bound for the maximum +/// number of threads in a pool, i.e. `usize::MAX`. #[derive(Debug)] pub struct DynamicThreadData { sleep_count: AtomicUsize, diff --git a/executors/src/run_now.rs b/executors/src/run_now.rs index 45ef390..a8b6651 100644 --- a/executors/src/run_now.rs +++ b/executors/src/run_now.rs @@ -6,7 +6,10 @@ // This file may not be copied, modified, or distributed // except according to those terms. -//! A simple `Executor` that simply runs tasks on the current thread. +//! A simple `Executor` that simply runs tasks immediately on the current thread. +//! +//! This is mostly useful to work with APIs that require an `Executor`, +//! even if "normal" stack-based execution is actually desired. //! //! # Examples //! @@ -32,6 +35,9 @@ use super::*; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +/// A handle to the [run_now](run_now) executor +/// +/// See module level documentation for usage information. #[derive(Clone, Debug)] pub struct RunNowExecutor { active: Arc, @@ -45,6 +51,7 @@ impl CanExecute for ThreadLocalRunNow { } impl RunNowExecutor { + /// Create a new [run_now](run_now) executor pub fn new() -> RunNowExecutor { RunNowExecutor { active: Arc::new(AtomicBool::new(true)), diff --git a/executors/src/threadpool_executor.rs b/executors/src/threadpool_executor.rs index 85f501e..e6ad06e 100644 --- a/executors/src/threadpool_executor.rs +++ b/executors/src/threadpool_executor.rs @@ -43,6 +43,9 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use threadpool::ThreadPool; +/// A handle to a [threadpool_executor](threadpool_executor) +/// +/// See module level documentation for usage information. #[derive(Clone, Debug)] pub struct ThreadPoolExecutor { pool: ThreadPool,