Merge pull request #4 from Bathtor/pinning
Thread Pinning fixes #3
Bathtor authored Feb 20, 2020
2 parents a4714e6 + fdc2f14 commit 82f463f
Showing 11 changed files with 233 additions and 36 deletions.
6 changes: 5 additions & 1 deletion README.md
@@ -13,7 +13,7 @@ Add this to your `Cargo.toml`:

```toml
[dependencies]
executors = "0.5"
executors = "0.6"
```

You can use, for example, the [crossbeam_workstealing_pool](https://docs.rs/executors/latest/executors/crossbeam_workstealing_pool/index.html) to schedule a number of jobs, `n_jobs`, over a number of worker threads, `n_workers`, and collect the results via an `mpsc::channel`.
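
A minimal sketch of that pattern (assuming the 0.6 API, where `ThreadPool::new(n_workers)` constructs the pool and `execute` comes from the `Executor` trait in the crate root):

```rust
use executors::*;
use executors::crossbeam_workstealing_pool::ThreadPool;
use std::sync::mpsc::channel;

fn main() {
    let n_workers = 4;
    let n_jobs = 8;
    let pool = ThreadPool::new(n_workers);

    let (tx, rx) = channel();
    for _ in 0..n_jobs {
        let tx = tx.clone();
        pool.execute(move || {
            // Each job just reports back a single unit of work.
            tx.send(1).expect("channel should still be open");
        });
    }

    // Collect exactly one result per job.
    let total: i32 = rx.iter().take(n_jobs).sum();
    assert_eq!(total, 8);
}
```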
@@ -53,6 +53,10 @@ If you don't know what hardware your code is going to run on, use the [crossbeam

If you *absolutely need* low response times for bursty workloads, you can compile the crate with the `ws-no-park` feature, which prevents the workers in the [crossbeam_workstealing_pool](https://docs.rs/executors/latest/executors/crossbeam_workstealing_pool/index.html) from parking their threads when all task queues are temporarily empty. This will, of course, not play well with other tasks running on the same system, although the threads are still yielded to the OS scheduler between queue checks. See the latency results below to get an idea of the performance impact of this feature.
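
For example, a sketch of the corresponding `Cargo.toml` entry (on top of the default features):

```toml
[dependencies.executors]
version = "0.6"
features = ["ws-no-park"]
```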

## Core Affinity

You can enable support for pinning pool threads to particular CPU cores via the `"thread-pinning"` feature. By default, the pool will then pin as many threads as there are core ids; if you request more threads than that, the rest are created unpinned. You can also assign only a subset of your core ids to a thread pool by using `with_affinity(...)` instead of the `new(...)` function, as in the sketch below.
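
A minimal sketch of the subset case (assuming the `thread-pinning` feature is enabled and the `core_affinity` crate is a direct dependency, since `with_affinity` takes `core_affinity::CoreId` values):

```rust
use executors::*;
use executors::crossbeam_channel_pool::ThreadPool;

fn main() {
    // Query the available core ids; panics if they cannot be determined.
    let cores = core_affinity::get_core_ids().expect("core ids");
    // Pin one worker to each of the first two cores and add two floating workers.
    // (Assumes the machine has at least two cores.)
    let pool = ThreadPool::with_affinity(&cores[0..2], 2);
    pool.execute(|| println!("running on a pinned pool"));
}
```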

## Some Numbers

The following are some example results from my desktop machine (Intel i7-4770 @ 3.40GHz quad-core with HT (8 logical cores) and 16GB of RAM).
9 changes: 5 additions & 4 deletions executor-performance/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "executor-performance"
version = "0.1.1"
version = "0.1.2"
authors = ["Lars Kroll <[email protected]>"]
edition = "2018"

@@ -10,8 +10,9 @@ time = "0.1"

[dependencies.rustc-test]
version = "0.3"
#features = ["asm_black_box"] # uncomment if you want to use --pre or --post
# features = ["asm_black_box"] # uncomment if you want to use --pre or --post
# otherwise the compiler will just optimise out the useless work they do
# doesn't work on travis (stable, beta), though

[dependencies.clap]
version = "2"
@@ -20,6 +21,6 @@ features = ["color"]
[dependencies.executors]
path = "../executors/"
default-features = true # to use job counting instead of timed global check set this to false and uncomment next line
# features = ["threadpool-exec", "cb-channel-exec", "workstealing-exec"]
# features = ["threadpool-exec", "cb-channel-exec", "workstealing-exec", "thread-pinning"]
# or to use timed and no parking uncomment next line
# features = ["threadpool-exec", "cb-channel-exec", "workstealing-exec", "ws-no-park", "ws-timed-fairness"]
# features = ["threadpool-exec", "cb-channel-exec", "workstealing-exec", "ws-no-park", "ws-timed-fairness", "thread-pinning"]
3 changes: 1 addition & 2 deletions executor-performance/src/main.rs
@@ -28,7 +28,6 @@ use executors::crossbeam_channel_pool::ThreadPool as CCExecutor;
use executors::crossbeam_workstealing_pool::ThreadPool as CWSExecutor;
use executors::threadpool_executor::ThreadPoolExecutor as TPExecutor;
use executors::*;
use std::error::Error;
use std::fs::File;
use std::fs::OpenOptions;
use std::io::prelude::*;
@@ -108,7 +107,7 @@ fn main() {
let path = Path::new(opts.value_of("csv-file").unwrap());
let display = path.display();
let file = match OpenOptions::new().append(true).create(true).open(&path) {
Err(why) => panic!("couldn't open {}: {}", display, why.description()),
Err(why) => panic!("couldn't open {}: {}", display, why),
Ok(file) => file,
};
println!("Results will be added to {}", display);
2 changes: 1 addition & 1 deletion executor-performance/threadinc.sh
@@ -10,6 +10,6 @@ OUT_LOC="out.csv"
rm $OUT_LOC;
for i in $(seq $MIN_THREADS $MAX_THREADS); do
echo "Run $i starting";
$BIN_LOC -t $i -p $INPAR -m $MSGS -a $AMP -o $OUT_LOC --pre 10000 --post 10000 --skip-tpe;
$BIN_LOC -t $i -p $INPAR -m $MSGS -a $AMP -o $OUT_LOC --skip-tpe throughput --pre 10000 --post 10000;
echo "Run $i finished";
done
17 changes: 10 additions & 7 deletions executors/Cargo.toml
@@ -3,7 +3,7 @@ name = "executors"
# NB: When modifying, also modify:
# 1. html_root_url in lib.rs
# 2. number in readme (for breaking changes)
version = "0.5.3"
version = "0.6.0"
authors = ["Lars Kroll <[email protected]>"]
edition = "2018"
description = "A collection of high-performance task executors."
@@ -24,27 +24,30 @@ workstealing-exec = ["crossbeam-channel", "crossbeam-deque", "rand", "crossbeam-
defaults = ["num_cpus"]

# In the workstealing executor, check the global queues every 1ms
ws-timed-fairness = ["time"]
ws-timed-fairness = [] #["time"]
# Otherwise check the global queues every 100 jobs

# In the workstealing executor, never park worker threads
ws-no-park = []

thread-pinning = ["core_affinity"]


[dependencies]
log = "0.4"
synchronoise = "1.0"
arr_macro = "0.1.2"
crossbeam-channel = {version = "0.3", optional = true}
crossbeam-channel = {version = "0.4", optional = true}
threadpool = {version = "1.7", optional = true}
crossbeam-utils = {version = "0.6", optional = true}
crossbeam-utils = {version = "0.7", optional = true}
crossbeam-deque = {version = "0.7", optional = true}
time = {version = "0.1", optional = true}
#time = {version = "0.2", optional = true, default-features = false, features = ["std"]}
rand = {version = "0.7", optional = true}
num_cpus = {version = "1.10", optional = true}
num_cpus = {version = "1.12", optional = true}
core_affinity = {version = "0.5", optional = true}

[dev-dependencies]
env_logger = "0.6"
env_logger = "0.7"
version-sync = "0.8"


83 changes: 82 additions & 1 deletion executors/src/crossbeam_channel_pool.rs
@@ -69,6 +69,12 @@ impl ThreadPool {
///
/// This function will panic if `threads` is 0.
///
/// # Core Affinity
///
/// If compiled with `thread-pinning` it will assign a worker to each core,
/// until it runs out of cores or workers. If there are more workers than cores,
/// the extra workers will be "floating", i.e. not pinned.
///
/// # Examples
///
/// Create a new thread pool capable of executing four jobs concurrently:
@@ -95,7 +101,59 @@ impl ThreadPool {
threads,
shutdown,
};
for _ in 0..threads {
#[cfg(not(feature = "thread-pinning"))]
{
for _ in 0..threads {
spawn_worker(pool.core.clone());
}
}
#[cfg(feature = "thread-pinning")]
{
let cores = core_affinity::get_core_ids().expect("core ids");
let num_pinned = cores.len().min(threads);
for i in 0..num_pinned {
spawn_worker_pinned(pool.core.clone(), cores[i]);
}
if num_pinned < threads {
let num_unpinned = threads - num_pinned;
for _ in 0..num_unpinned {
spawn_worker(pool.core.clone());
}
}
}
pool
}

/// Creates a new thread pool capable of executing `threads` number of jobs concurrently with a particular core affinity.
///
/// For each core id in the `cores` slice, it will spawn a single thread pinned to that id.
/// Additionally, it will create `floating` number of unpinned threads.
///
/// # Panics
///
/// This function will panic if `cores.len() + floating` is 0.
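    ///
    /// # Examples
    ///
    /// A hypothetical sketch (not part of the original doc comment); requires the
    /// `thread-pinning` feature and the `core_affinity` crate. Pin one worker to
    /// the first core and add one floating worker:
    ///
    /// ```ignore
    /// let cores = core_affinity::get_core_ids().expect("core ids");
    /// let pool = ThreadPool::with_affinity(&cores[0..1], 1);
    /// ```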
#[cfg(feature = "thread-pinning")]
pub fn with_affinity(cores: &[core_affinity::CoreId], floating: usize) -> ThreadPool {
let total_threads = cores.len() + floating;
assert!(total_threads > 0);
let (tx, rx) = channel::unbounded();
let shutdown = Arc::new(AtomicBool::new(false));
let pool = ThreadPool {
core: Arc::new(Mutex::new(ThreadPoolCore {
sender: tx.clone(),
receiver: rx.clone(),
shutdown: shutdown.clone(),
threads: total_threads,
ids: 0,
})),
sender: tx.clone(),
threads: total_threads,
shutdown,
};
cores.iter().for_each(|core_id| {
spawn_worker_pinned(pool.core.clone(), *core_id);
});
for _ in 0..floating {
spawn_worker(pool.core.clone());
}
pool
@@ -183,6 +241,19 @@ fn spawn_worker(core: Arc<Mutex<ThreadPoolCore>>) {
thread::spawn(move || worker.run());
}

#[cfg(feature = "thread-pinning")]
fn spawn_worker_pinned(core: Arc<Mutex<ThreadPoolCore>>, core_id: core_affinity::CoreId) {
let id = {
let mut guard = core.lock().unwrap();
guard.new_worker_id()
};
let mut worker = ThreadPoolWorker::new(id, core.clone());
thread::spawn(move || {
core_affinity::set_for_current(core_id);
worker.run()
});
}

#[derive(Debug)]
struct ThreadPoolCore {
sender: channel::Sender<JobMsg>,
@@ -342,6 +413,16 @@ mod tests {
crate::tests::test_defaults::<ThreadPool>(LABEL);
}

#[cfg(feature = "thread-pinning")]
#[test]
fn test_custom_affinity() {
let cores = core_affinity::get_core_ids().expect("core ids");
// travis doesn't have enough cores for this
//let exec = ThreadPool::with_affinity(&cores[0..4], 4);
let exec = ThreadPool::with_affinity(&cores[0..1], 1);
crate::tests::test_custom(exec, LABEL);
}

#[test]
fn run_with_two_threads() {
let _ = env_logger::try_init();
