From eeff4660e03a3a52f6f4b93a7ed76095d0b9b3e7 Mon Sep 17 00:00:00 2001 From: Lars Kroll Date: Wed, 10 Apr 2019 16:40:19 +0200 Subject: [PATCH] Implemented/derived Debug and Default for all provided executor implementations. Fixes #1 --- README.md | 25 ++++++++-- executors/Cargo.toml | 6 ++- executors/src/crossbeam_channel_pool.rs | 27 ++++++++++- executors/src/crossbeam_workstealing_pool.rs | 33 +++++++++++-- executors/src/lib.rs | 50 +++++++++++++++----- executors/src/run_now.rs | 25 +++++++++- executors/src/threadpool_executor.rs | 27 ++++++++++- 7 files changed, 167 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 2b8327c..28b232f 100644 --- a/README.md +++ b/README.md @@ -16,21 +16,40 @@ Add this to your `Cargo.toml`: executors = "0.4" ``` -and this to your crate root: +You can use, for example, the [crossbeam_workstealing_pool](https://docs.rs/executors/latest/executors/crossbeam_workstealing_pool/index.html) to schedule a number `n_jobs` over a number `n_workers` threads, and collect the results via an `mpsc::channel`. ```rust -extern crate executors; +use executors::*; +use executors::crossbeam_workstealing_pool::ThreadPool; +use std::sync::mpsc::channel; + +let n_workers = 4; +let n_jobs = 8; +let pool = ThreadPool::new(n_workers); + +let (tx, rx) = channel(); +for _ in 0..n_jobs { + let tx = tx.clone(); + pool.execute(move|| { + tx.send(1).expect("channel will be there waiting for the pool"); + }); +} + +assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8); ``` ## Rust Version Requires at least Rust `1.26`, due to [crossbeam-channel](https://github.com/crossbeam-rs/crossbeam-channel) requirements. + ## Deciding on an Implementation To select an `Executor` implementation, it is best to test the exact requirements on the target hardware. The crate `executor-performance` provides a performance testing suite for the provided implementations. To use it *clone* this repository, run `cargo build --release`, and then check `target/release/executor-performance --help` to see the available options. There is also a small [script](executor-performance/threadinc.sh) to test thread-scaling with some reasonable default options. -If you don't know what hardware your code is going to run on, use the [crossbeam_workstealing_pool](https://docs.rs/executors/0.4.1/executors/crossbeam_workstealing_pool/index.html). It tends to perform best on all the hardware I have tested (which is pretty much Intel processors like i7 and Xeon). +In general, [crossbeam_workstealing_pool](https://docs.rs/executors/latest/executors/crossbeam_workstealing_pool/index.html) works best for workloads where the tasks on the worker threads spawn more and more tasks. If all tasks a spawned from a single thread that isn't part of the threadpool, then the [threadpool_executor](https://docs.rs/executors/latest/executors/threadpool_executor/index.html) tends perform best with single worker and [crossbeam_channel_pool](https://docs.rs/executors/latest/executors/crossbeam_channel_pool/index.html) performs best for a larger number of workers. + +If you don't know what hardware your code is going to run on, use the [crossbeam_workstealing_pool](https://docs.rs/executors/latest/executors/crossbeam_workstealing_pool/index.html). It tends to perform best on all the hardware I have tested (which is pretty much Intel processors like i7 and Xeon). ## License diff --git a/executors/Cargo.toml b/executors/Cargo.toml index e85ebe5..6fe7535 100644 --- a/executors/Cargo.toml +++ b/executors/Cargo.toml @@ -3,7 +3,7 @@ name = "executors" # NB: When modifying, also modify: # 1. html_root_url in lib.rs # 2. number in readme (for breaking changes) -version = "0.4.2" +version = "0.4.3" authors = ["Lars Kroll "] edition = "2018" description = "A collection of high-performance task executors." @@ -16,11 +16,12 @@ categories = ["concurrency", "asynchronous"] license = "MIT" [features] -default = ["threadpool-exec", "cb-channel-exec", "workstealing-exec", "ws-timed-fairness"] +default = ["threadpool-exec", "cb-channel-exec", "workstealing-exec", "ws-timed-fairness", "defaults"] threadpool-exec = ["threadpool"] cb-channel-exec = ["crossbeam-channel"] workstealing-exec = ["crossbeam-channel", "crossbeam-deque", "rand"] +defaults = ["num_cpus"] # In the workstealing executor, check the global queues every 1ms ws-timed-fairness = ["time"] @@ -35,6 +36,7 @@ threadpool = {version = "1.7", optional = true} crossbeam-deque = {version = "0.7", optional = true} time = {version = "0.1", optional = true} rand = {version = "0.6", optional = true} +num_cpus = {version = "1.10", optional = true} [dev-dependencies] env_logger = "0.6" diff --git a/executors/src/crossbeam_channel_pool.rs b/executors/src/crossbeam_channel_pool.rs index 516d6d4..6eff782 100644 --- a/executors/src/crossbeam_channel_pool.rs +++ b/executors/src/crossbeam_channel_pool.rs @@ -54,7 +54,7 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use std::thread; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ThreadPool { core: Arc>, sender: channel::Sender, @@ -102,7 +102,15 @@ impl ThreadPool { } } -//impl !Sync for ThreadPool {} +/// Create a thread pool with one thread per CPU. +/// On machines with hyperthreading, +/// this will create one thread per hyperthread. +#[cfg(feature = "defaults")] +impl Default for ThreadPool { + fn default() -> Self { + ThreadPool::new(num_cpus::get()) + } +} impl Executor for ThreadPool { fn execute(&self, job: F) @@ -174,6 +182,7 @@ fn spawn_worker(core: Arc>) { thread::spawn(move || worker.run()); } +#[derive(Debug)] struct ThreadPoolCore { sender: channel::Sender, receiver: channel::Receiver, @@ -312,6 +321,20 @@ mod tests { use super::*; + + const LABEL: &'static str = "Crossbeam Channel Pool"; + + #[test] + fn test_debug() { + let exec = ThreadPool::new(2); + crate::tests::test_debug(&exec, LABEL); + } + + #[test] + fn test_defaults() { + crate::tests::test_defaults::(LABEL); + } + #[test] fn run_with_two_threads() { let _ = env_logger::try_init(); diff --git a/executors/src/crossbeam_workstealing_pool.rs b/executors/src/crossbeam_workstealing_pool.rs index cef5283..2b78242 100644 --- a/executors/src/crossbeam_workstealing_pool.rs +++ b/executors/src/crossbeam_workstealing_pool.rs @@ -18,8 +18,8 @@ //! standard [threadpool](https://crates.io/crates/threadpool), but the //! implementation runs faster, especially with multiple workers. //! -//! Uses a per-thread local queues -//! (based on [croosbeam-deque](https://crates.io/crates/crossbeam-deque)) +//! Uses per-thread local queues +//! (based on [crossbeam-deque](https://crates.io/crates/crossbeam-deque)) //! for internal scheduling (like a fork-join-pool) and a global queue //! (based on [crossbeam-channel](https://crates.io/crates/crossbeam-channel)) //! for scheduling from external (non-worker) threads. @@ -79,6 +79,7 @@ use std::sync::{Arc, Mutex, Weak}; use std::thread; use std::time::Duration; use std::vec::Vec; +use num_cpus; #[cfg(feature = "ws-timed-fairness")] use time; @@ -95,7 +96,7 @@ thread_local!( static LOCAL_JOB_QUEUE: UnsafeCell>> = UnsafeCell::new(Option::None); ); -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ThreadPool { core: Arc>, global_sender: Arc>, @@ -141,6 +142,16 @@ impl ThreadPool { } } +/// Create a thread pool with one thread per CPU. +/// On machines with hyperthreading, +/// this will create one thread per hyperthread. +#[cfg(feature = "defaults")] +impl Default for ThreadPool { + fn default() -> Self { + ThreadPool::new(num_cpus::get()) + } +} + //impl !Sync for ThreadPool {} impl Executor for ThreadPool { @@ -216,11 +227,13 @@ impl Executor for ThreadPool { // }); //} +#[derive(Debug)] struct WorkerEntry { control: channel::Sender, stealer: Option, } +#[derive(Debug)] struct ThreadPoolCore { global_injector: Arc>, shutdown: Arc, @@ -312,6 +325,7 @@ impl Drop for ThreadPoolCore { } } +#[derive(Debug)] struct ThreadPoolWorker { id: u64, core: Weak>, @@ -604,6 +618,19 @@ mod tests { use super::*; + const LABEL: &'static str = "Workstealing Pool"; + + #[test] + fn test_debug() { + let exec = ThreadPool::new(2); + crate::tests::test_debug(&exec, LABEL); + } + + #[test] + fn test_defaults() { + crate::tests::test_defaults::(LABEL); + } + #[test] fn run_with_two_threads() { let _ = env_logger::try_init(); diff --git a/executors/src/lib.rs b/executors/src/lib.rs index 41f196a..bf0bd0c 100644 --- a/executors/src/lib.rs +++ b/executors/src/lib.rs @@ -5,7 +5,7 @@ // . // This file may not be copied, modified, or distributed // except according to those terms. -#![doc(html_root_url = "https://docs.rs/executors/0.4.2")] +#![doc(html_root_url = "https://docs.rs/executors/0.4.3")] #![allow(unused_parens)] //! This crate provides a number of task executors all implementing the @@ -14,17 +14,6 @@ //! General examples can be found in the [`Executor`](common/trait.Executor.html) trait //! documentation, and implementation specific examples with each implementation module. -#[cfg(any(feature = "cb-channel-exec", feature = "workstealing-exec"))] -extern crate crossbeam_channel; -#[cfg(feature = "workstealing-exec")] -extern crate crossbeam_deque; - -#[cfg(feature = "threadpool-exec")] -extern crate threadpool; -#[cfg(feature = "ws-timed-fairness")] -extern crate time; -#[cfg(feature = "workstealing-exec")] -extern crate rand; #[macro_use] extern crate log; @@ -46,5 +35,40 @@ use crate::common::ignore; use synchronoise::CountdownEvent; #[cfg(test)] -mod tests { +pub(crate) mod tests { + use super::*; + use std::sync::Arc; + use std::time::Duration; + + pub const N_DEPTH: usize = 32; + pub const N_WIDTH: usize = 8; + + pub fn test_debug(exec: &E, label: &str) where E: Executor+std::fmt::Debug { + println!("Debug output for {}: {:?}", label, exec); + } + + pub fn test_defaults(label: &str) where E: Executor+std::default::Default+'static { + let pool = E::default(); + + let latch = Arc::new(CountdownEvent::new(N_DEPTH * N_WIDTH)); + for _ in 0..N_WIDTH { + let pool2 = pool.clone(); + let latch2 = latch.clone(); + pool.execute(move || { + do_step(latch2, pool2, N_DEPTH); + }); + } + let res = latch.wait_timeout(Duration::from_secs(5)); + assert_eq!(res, 0); + pool.shutdown().unwrap_or_else(|e| error!("Error during pool shutdown {:?} at {}",e, label)); + } + + fn do_step(latch: Arc, pool: E, depth: usize) where E: Executor+'static { + let new_depth = depth - 1; + ignore(latch.decrement()); + if (new_depth > 0) { + let pool2 = pool.clone(); + pool.execute(move || {do_step(latch, pool2, new_depth)}) + } + } } diff --git a/executors/src/run_now.rs b/executors/src/run_now.rs index ad81b72..816601d 100644 --- a/executors/src/run_now.rs +++ b/executors/src/run_now.rs @@ -27,7 +27,7 @@ use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct RunNowExecutor { active: Arc, } @@ -40,6 +40,14 @@ impl RunNowExecutor { } } +/// Create an executor running tasks on the invoking thread. +#[cfg(feature = "defaults")] +impl Default for RunNowExecutor { + fn default() -> Self { + RunNowExecutor::new() + } +} + impl Executor for RunNowExecutor { fn execute(&self, job: F) where @@ -77,6 +85,21 @@ mod tests { use super::*; use std::time::Duration; + const LABEL: &'static str = "Run Now"; + + + #[test] + fn test_debug() { + let exec = RunNowExecutor::new(); + crate::tests::test_debug(&exec, LABEL); + } + + #[test] + fn test_defaults() { + crate::tests::test_defaults::(LABEL); + } + + #[test] fn run_tasks() { let _ = env_logger::try_init(); diff --git a/executors/src/threadpool_executor.rs b/executors/src/threadpool_executor.rs index 7b33515..ee77f69 100644 --- a/executors/src/threadpool_executor.rs +++ b/executors/src/threadpool_executor.rs @@ -43,7 +43,7 @@ use threadpool::ThreadPool; use std::sync::Arc; use std::sync::atomic::{AtomicBool, Ordering}; -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct ThreadPoolExecutor { pool: ThreadPool, active: Arc, @@ -75,6 +75,16 @@ impl ThreadPoolExecutor { } } +/// Create a thread pool with one thread per CPU. +/// On machines with hyperthreading, +/// this will create one thread per hyperthread. +#[cfg(feature = "defaults")] +impl Default for ThreadPoolExecutor { + fn default() -> Self { + ThreadPoolExecutor::new(num_cpus::get()) + } +} + impl Executor for ThreadPoolExecutor { fn execute(&self, job: F) where @@ -114,6 +124,19 @@ mod tests { use super::*; use std::time::Duration; + const LABEL: &'static str = "Threadpool"; + + #[test] + fn test_debug() { + let exec = ThreadPoolExecutor::new(2); + crate::tests::test_debug(&exec, LABEL); + } + + #[test] + fn test_defaults() { + crate::tests::test_defaults::(LABEL); + } + #[test] fn run_with_two_threads() { let _ = env_logger::try_init(); @@ -181,4 +204,4 @@ mod tests { let res = latch.wait_timeout(Duration::from_secs(1)); assert_eq!(res, 1); } -} \ No newline at end of file +}