Implemented/derived Debug and Default for all provided executor implementations.

Fixes #1
Bathtor committed Apr 10, 2019
1 parent e9c5f16 commit eeff466
Showing 7 changed files with 167 additions and 26 deletions.
25 changes: 22 additions & 3 deletions README.md
@@ -16,21 +16,40 @@ Add this to your `Cargo.toml`:
executors = "0.4"
```

and this to your crate root:
You can use, for example, the [crossbeam_workstealing_pool](https://docs.rs/executors/latest/executors/crossbeam_workstealing_pool/index.html) to schedule a number of jobs `n_jobs` across a number of worker threads `n_workers`, and collect the results via an `mpsc::channel`.

```rust
extern crate executors;
use executors::*;
use executors::crossbeam_workstealing_pool::ThreadPool;
use std::sync::mpsc::channel;

let n_workers = 4;
let n_jobs = 8;
let pool = ThreadPool::new(n_workers);

let (tx, rx) = channel();
for _ in 0..n_jobs {
let tx = tx.clone();
pool.execute(move|| {
tx.send(1).expect("channel will be there waiting for the pool");
});
}

assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8);
```

## Rust Version

Requires at least Rust `1.26`, due to [crossbeam-channel](https://github.com/crossbeam-rs/crossbeam-channel) requirements.

## Deciding on an Implementation

To select an `Executor` implementation, it is best to test against your exact requirements on the target hardware.
The crate `executor-performance` provides a performance testing suite for the provided implementations. To use it, *clone* this repository, run `cargo build --release`, and then check `target/release/executor-performance --help` to see the available options. There is also a small [script](executor-performance/threadinc.sh) to test thread-scaling with some reasonable default options.

If you don't know what hardware your code is going to run on, use the [crossbeam_workstealing_pool](https://docs.rs/executors/0.4.1/executors/crossbeam_workstealing_pool/index.html). It tends to perform best on all the hardware I have tested (which is pretty much Intel processors like i7 and Xeon).
In general, [crossbeam_workstealing_pool](https://docs.rs/executors/latest/executors/crossbeam_workstealing_pool/index.html) works best for workloads where the tasks on the worker threads spawn more and more tasks. If all tasks are spawned from a single thread that isn't part of the thread pool, then the [threadpool_executor](https://docs.rs/executors/latest/executors/threadpool_executor/index.html) tends to perform best with a single worker, and the [crossbeam_channel_pool](https://docs.rs/executors/latest/executors/crossbeam_channel_pool/index.html) performs best for a larger number of workers.
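
For the single-external-producer case, a minimal sketch (the job closures and counts here are purely illustrative) might look like this:

```rust
use executors::*;
use executors::threadpool_executor::ThreadPoolExecutor;

// All jobs are submitted from this one external thread,
// so a single worker is a reasonable starting point.
let pool = ThreadPoolExecutor::new(1);
for i in 0..10 {
    pool.execute(move || println!("job {} done", i));
}
pool.shutdown().expect("pool to shut down");
```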

If you don't know what hardware your code is going to run on, use the [crossbeam_workstealing_pool](https://docs.rs/executors/latest/executors/crossbeam_workstealing_pool/index.html). It tends to perform best on all the hardware I have tested (which is pretty much Intel processors like i7 and Xeon).

## License

6 changes: 4 additions & 2 deletions executors/Cargo.toml
@@ -3,7 +3,7 @@ name = "executors"
# NB: When modifying, also modify:
# 1. html_root_url in lib.rs
# 2. number in readme (for breaking changes)
version = "0.4.2"
version = "0.4.3"
authors = ["Lars Kroll <[email protected]>"]
edition = "2018"
description = "A collection of high-performance task executors."
@@ -16,11 +16,12 @@ categories = ["concurrency", "asynchronous"]
license = "MIT"

[features]
default = ["threadpool-exec", "cb-channel-exec", "workstealing-exec", "ws-timed-fairness"]
default = ["threadpool-exec", "cb-channel-exec", "workstealing-exec", "ws-timed-fairness", "defaults"]

threadpool-exec = ["threadpool"]
cb-channel-exec = ["crossbeam-channel"]
workstealing-exec = ["crossbeam-channel", "crossbeam-deque", "rand"]
defaults = ["num_cpus"]

# In the workstealing executor, check the global queues every 1ms
ws-timed-fairness = ["time"]
@@ -35,6 +36,7 @@ threadpool = {version = "1.7", optional = true}
crossbeam-deque = {version = "0.7", optional = true}
time = {version = "0.1", optional = true}
rand = {version = "0.6", optional = true}
num_cpus = {version = "1.10", optional = true}

[dev-dependencies]
env_logger = "0.6"
27 changes: 25 additions & 2 deletions executors/src/crossbeam_channel_pool.rs
@@ -54,7 +54,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use std::thread;

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ThreadPool {
core: Arc<Mutex<ThreadPoolCore>>,
sender: channel::Sender<JobMsg>,
@@ -102,7 +102,15 @@ impl ThreadPool {
}
}

//impl !Sync for ThreadPool {}
/// Create a thread pool with one thread per CPU.
/// On machines with hyperthreading,
/// this will create one thread per hyperthread.
#[cfg(feature = "defaults")]
impl Default for ThreadPool {
fn default() -> Self {
ThreadPool::new(num_cpus::get())
}
}
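
A quick usage sketch of this new impl (assuming the `defaults` feature is enabled; the job closure is made up for illustration):

```rust
use executors::*;
use executors::crossbeam_channel_pool::ThreadPool;

// One worker thread per logical CPU, as reported by num_cpus.
let pool = ThreadPool::default();
pool.execute(|| println!("hello from a worker"));
```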

impl Executor for ThreadPool {
fn execute<F>(&self, job: F)
@@ -174,6 +182,7 @@ fn spawn_worker(core: Arc<Mutex<ThreadPoolCore>>) {
thread::spawn(move || worker.run());
}

#[derive(Debug)]
struct ThreadPoolCore {
sender: channel::Sender<JobMsg>,
receiver: channel::Receiver<JobMsg>,
@@ -312,6 +321,20 @@ mod tests {

use super::*;


const LABEL: &'static str = "Crossbeam Channel Pool";

#[test]
fn test_debug() {
let exec = ThreadPool::new(2);
crate::tests::test_debug(&exec, LABEL);
}

#[test]
fn test_defaults() {
crate::tests::test_defaults::<ThreadPool>(LABEL);
}

#[test]
fn run_with_two_threads() {
let _ = env_logger::try_init();
33 changes: 30 additions & 3 deletions executors/src/crossbeam_workstealing_pool.rs
@@ -18,8 +18,8 @@
//! standard [threadpool](https://crates.io/crates/threadpool), but the
//! implementation runs faster, especially with multiple workers.
//!
//! Uses a per-thread local queues
//! (based on [croosbeam-deque](https://crates.io/crates/crossbeam-deque))
//! Uses per-thread local queues
//! (based on [crossbeam-deque](https://crates.io/crates/crossbeam-deque))
//! for internal scheduling (like a fork-join-pool) and a global queue
//! (based on [crossbeam-channel](https://crates.io/crates/crossbeam-channel))
//! for scheduling from external (non-worker) threads.
@@ -79,6 +79,7 @@ use std::sync::{Arc, Mutex, Weak};
use std::thread;
use std::time::Duration;
use std::vec::Vec;
use num_cpus;
#[cfg(feature = "ws-timed-fairness")]
use time;

@@ -95,7 +96,7 @@ thread_local!(
static LOCAL_JOB_QUEUE: UnsafeCell<Option<deque::Worker<Job>>> = UnsafeCell::new(Option::None);
);

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ThreadPool {
core: Arc<Mutex<ThreadPoolCore>>,
global_sender: Arc<deque::Injector<Job>>,
@@ -141,6 +142,16 @@ impl ThreadPool {
}
}

/// Create a thread pool with one thread per CPU.
/// On machines with hyperthreading,
/// this will create one thread per hyperthread.
#[cfg(feature = "defaults")]
impl Default for ThreadPool {
fn default() -> Self {
ThreadPool::new(num_cpus::get())
}
}
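
A brief sketch pairing the new impl with the kind of workload this pool is recommended for, i.e. tasks that spawn further tasks (the closures are illustrative):

```rust
use executors::*;
use executors::crossbeam_workstealing_pool::ThreadPool;

// One worker per logical CPU; the inner job is spawned from a worker
// thread and thus lands on that worker's local queue.
let pool = ThreadPool::default();
let inner = pool.clone();
pool.execute(move || {
    inner.execute(|| println!("nested job"));
});
```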

//impl !Sync for ThreadPool {}

impl Executor for ThreadPool {
@@ -216,11 +227,13 @@ impl Executor for ThreadPool {
// });
//}

#[derive(Debug)]
struct WorkerEntry {
control: channel::Sender<ControlMsg>,
stealer: Option<JobStealer>,
}

#[derive(Debug)]
struct ThreadPoolCore {
global_injector: Arc<deque::Injector<Job>>,
shutdown: Arc<AtomicBool>,
@@ -312,6 +325,7 @@ impl Drop for ThreadPoolCore {
}
}

#[derive(Debug)]
struct ThreadPoolWorker {
id: u64,
core: Weak<Mutex<ThreadPoolCore>>,
@@ -604,6 +618,19 @@ mod tests {

use super::*;

const LABEL: &'static str = "Workstealing Pool";

#[test]
fn test_debug() {
let exec = ThreadPool::new(2);
crate::tests::test_debug(&exec, LABEL);
}

#[test]
fn test_defaults() {
crate::tests::test_defaults::<ThreadPool>(LABEL);
}

#[test]
fn run_with_two_threads() {
let _ = env_logger::try_init();
50 changes: 37 additions & 13 deletions executors/src/lib.rs
@@ -5,7 +5,7 @@
// <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed
// except according to those terms.
#![doc(html_root_url = "https://docs.rs/executors/0.4.2")]
#![doc(html_root_url = "https://docs.rs/executors/0.4.3")]
#![allow(unused_parens)]

//! This crate provides a number of task executors all implementing the
@@ -14,17 +14,6 @@
//! General examples can be found in the [`Executor`](common/trait.Executor.html) trait
//! documentation, and implementation specific examples with each implementation module.
#[cfg(any(feature = "cb-channel-exec", feature = "workstealing-exec"))]
extern crate crossbeam_channel;
#[cfg(feature = "workstealing-exec")]
extern crate crossbeam_deque;

#[cfg(feature = "threadpool-exec")]
extern crate threadpool;
#[cfg(feature = "ws-timed-fairness")]
extern crate time;
#[cfg(feature = "workstealing-exec")]
extern crate rand;
#[macro_use]
extern crate log;

@@ -46,5 +35,40 @@ use crate::common::ignore;
use synchronoise::CountdownEvent;

#[cfg(test)]
mod tests {
pub(crate) mod tests {
use super::*;
use std::sync::Arc;
use std::time::Duration;

pub const N_DEPTH: usize = 32;
pub const N_WIDTH: usize = 8;

pub fn test_debug<E>(exec: &E, label: &str) where E: Executor+std::fmt::Debug {
println!("Debug output for {}: {:?}", label, exec);
}

pub fn test_defaults<E>(label: &str) where E: Executor+std::default::Default+'static {
let pool = E::default();

let latch = Arc::new(CountdownEvent::new(N_DEPTH * N_WIDTH));
for _ in 0..N_WIDTH {
let pool2 = pool.clone();
let latch2 = latch.clone();
pool.execute(move || {
do_step(latch2, pool2, N_DEPTH);
});
}
let res = latch.wait_timeout(Duration::from_secs(5));
assert_eq!(res, 0);
pool.shutdown().unwrap_or_else(|e| error!("Error during pool shutdown {:?} at {}",e, label));
}

fn do_step<E>(latch: Arc<CountdownEvent>, pool: E, depth: usize) where E: Executor+'static {
let new_depth = depth - 1;
ignore(latch.decrement());
if (new_depth > 0) {
let pool2 = pool.clone();
pool.execute(move || {do_step(latch, pool2, new_depth)})
}
}
}
25 changes: 24 additions & 1 deletion executors/src/run_now.rs
@@ -27,7 +27,7 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct RunNowExecutor {
active: Arc<AtomicBool>,
}
@@ -40,6 +40,14 @@ impl RunNowExecutor {
}
}

/// Create an executor running tasks on the invoking thread.
#[cfg(feature = "defaults")]
impl Default for RunNowExecutor {
fn default() -> Self {
RunNowExecutor::new()
}
}
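
A minimal usage sketch of this impl (assuming the `defaults` feature is enabled; the closure is illustrative):

```rust
use executors::*;
use executors::run_now::RunNowExecutor;

// RunNowExecutor executes jobs synchronously on the calling thread.
let exec = RunNowExecutor::default();
exec.execute(|| println!("runs on the invoking thread"));
```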

impl Executor for RunNowExecutor {
fn execute<F>(&self, job: F)
where
@@ -77,6 +85,21 @@ mod tests {
use super::*;
use std::time::Duration;

const LABEL: &'static str = "Run Now";


#[test]
fn test_debug() {
let exec = RunNowExecutor::new();
crate::tests::test_debug(&exec, LABEL);
}

#[test]
fn test_defaults() {
crate::tests::test_defaults::<RunNowExecutor>(LABEL);
}


#[test]
fn run_tasks() {
let _ = env_logger::try_init();
27 changes: 25 additions & 2 deletions executors/src/threadpool_executor.rs
@@ -43,7 +43,7 @@ use threadpool::ThreadPool;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};

#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct ThreadPoolExecutor {
pool: ThreadPool,
active: Arc<AtomicBool>,
@@ -75,6 +75,16 @@ impl ThreadPoolExecutor {
}
}

/// Create a thread pool with one thread per CPU.
/// On machines with hyperthreading,
/// this will create one thread per hyperthread.
#[cfg(feature = "defaults")]
impl Default for ThreadPoolExecutor {
fn default() -> Self {
ThreadPoolExecutor::new(num_cpus::get())
}
}

impl Executor for ThreadPoolExecutor {
fn execute<F>(&self, job: F)
where
@@ -114,6 +124,19 @@ mod tests {
use super::*;
use std::time::Duration;

const LABEL: &'static str = "Threadpool";

#[test]
fn test_debug() {
let exec = ThreadPoolExecutor::new(2);
crate::tests::test_debug(&exec, LABEL);
}

#[test]
fn test_defaults() {
crate::tests::test_defaults::<ThreadPoolExecutor>(LABEL);
}

#[test]
fn run_with_two_threads() {
let _ = env_logger::try_init();
@@ -181,4 +204,4 @@
let res = latch.wait_timeout(Duration::from_secs(1));
assert_eq!(res, 1);
}
}
}
