
Commit

Fixed parking related logic issue in crossbeam_workstealing_pool and added script to test different feature sets on travis

Bathtor committed Sep 9, 2019
1 parent 26ce052 commit 8e1e3d9
Showing 8 changed files with 148 additions and 15 deletions.
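The core of the parking fix: every `Parker` implementation owns a bounded set of per-thread slots, indexed by worker id (32 for the small parker, 64 for the large one, as the assertions in `parker.rs` below show). Before this commit, a worker respawned after a panic drew a fresh id, so each panic permanently leaked a slot until the `thread_id < N` assertion tripped. A minimal sketch of that failure mode, using a hypothetical `SlotParker` (not the crate's code):

```rust
// Hypothetical illustration of the leak this commit fixes: worker ids grow
// monotonically across panic-restarts, so a fixed slot table runs dry.
struct SlotParker {
    slots: [bool; 32], // one park-state slot per worker id
}

impl SlotParker {
    fn prepare_park(&mut self, thread_id: usize) {
        assert!(thread_id < 32); // trips once 32 distinct ids have been used
        self.slots[thread_id] = true;
    }
}

fn main() {
    let mut parker = SlotParker { slots: [false; 32] };
    let mut next_id = 0;
    for restart in 0..40 {
        // Old behaviour: allocate a fresh id on every restart.
        let id = next_id;
        next_id += 1;
        // With the fix, a respawned worker passes Some(old_id) instead,
        // so the id stays stable and the assertion can never fire.
        println!("restart {} parks as worker {}", restart, id);
        parker.prepare_park(id); // panics on the 33rd distinct id
    }
}
```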
6 changes: 5 additions & 1 deletion .travis.yml
@@ -3,7 +3,11 @@ rust:
- nightly
- beta
- stable
script:
- cargo test
- cd executors
- ./test-features.sh

matrix:
allow_failures:
- rust: nightly
- rust: nightly
2 changes: 1 addition & 1 deletion executors/Cargo.toml
@@ -3,7 +3,7 @@ name = "executors"
# NB: When modifying, also modify:
# 1. html_root_url in lib.rs
# 2. number in readme (for breaking changes)
version = "0.5.1"
version = "0.5.2"
authors = ["Lars Kroll <[email protected]>"]
edition = "2018"
description = "A collection of high-performance task executors."
8 changes: 8 additions & 0 deletions executors/src/crossbeam_channel_pool.rs
@@ -327,6 +327,14 @@ mod tests {
fn test_debug() {
let exec = ThreadPool::new(2);
crate::tests::test_debug(&exec, LABEL);
exec.shutdown().expect("Pool didn't shut down!");
}

#[test]
fn test_sleepy() {
let exec = ThreadPool::new(4);
crate::tests::test_sleepy(&exec, LABEL);
exec.shutdown().expect("Pool didn't shut down!");
}

#[test]
51 changes: 40 additions & 11 deletions executors/src/crossbeam_workstealing_pool.rs
@@ -211,7 +211,7 @@ where
{
let mut guard = pool.core.lock().unwrap();
for _ in 0..threads {
guard.spawn_worker(pool.core.clone());
guard.spawn_worker(pool.core.clone(), None);
}
}
pool
@@ -399,8 +399,8 @@
}
}

fn spawn_worker(&mut self, core: Arc<Mutex<ThreadPoolCore<P>>>) {
let id = self.new_worker_id();
fn spawn_worker(&mut self, core: Arc<Mutex<ThreadPoolCore<P>>>, old_id: Option<usize>) {
let id = old_id.unwrap_or_else(|| self.new_worker_id());
let (tx_control, rx_control) = channel::unbounded();
let worker = WorkerEntry {
control: tx_control,
@@ -409,10 +409,13 @@
let recv = self.global_injector.clone();
self.workers.insert(id, worker);
let p = self.parker.clone();
thread::spawn(move || {
let mut worker = ThreadPoolWorker::new(id, recv, rx_control, core, p);
worker.run()
});
thread::Builder::new()
.name(format!("cb-ws-pool-worker-{}", id))
.spawn(move || {
let mut worker = ThreadPoolWorker::new(id, recv, rx_control, core, p);
worker.run()
})
.expect("Could not create worker thread!");
}
}
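Switching from `thread::spawn` to `thread::Builder` gives every worker a stable, greppable name and surfaces thread-creation failure as a `Result` instead of an opaque panic inside std. A standalone illustration of what the name buys you (plain std, not the pool's code):

```rust
use std::thread;

fn main() {
    let handle = thread::Builder::new()
        .name("cb-ws-pool-worker-0".to_string())
        .spawn(|| {
            // Panic messages and debugger views now carry the worker's name.
            println!("running on {:?}", thread::current().name());
        })
        .expect("Could not create worker thread!");
    handle.join().unwrap();
}
```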

@@ -424,6 +427,7 @@
if !self.shutdown.load(Ordering::SeqCst) {
warn!("Threadpools should be shut down before deallocating.");
// the threads won't leak, as they'll get errors on their control queues next time they check
self.parker.unpark_all(); // must wake them all up so they don't idle forever
}
}
}
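The new `unpark_all` call closes a shutdown gap: a parked worker is blocked inside the parker and never polls its control channel, so a pool that is merely dropped would leave it asleep forever. Waking everyone lets each worker observe the disconnected channel and exit. A minimal sketch of the same interplay, built on std's park/unpark tokens rather than this crate's `Parker`:

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (control_tx, control_rx) = mpsc::channel::<()>();
    let worker = thread::spawn(move || loop {
        match control_rx.try_recv() {
            Ok(()) => {} // handle a control message
            // The pool handle was dropped: time to self-terminate.
            Err(mpsc::TryRecvError::Disconnected) => return,
            // No messages and nothing to do: park until woken.
            Err(mpsc::TryRecvError::Empty) => thread::park(),
        }
    });
    drop(control_tx); // the "Drop" step: disconnects the control channel...
    worker.thread().unpark(); // ...and the unpark_all analogue wakes the worker
    worker.join().expect("Worker should exit cleanly");
}
```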
@@ -463,11 +467,14 @@
parker,
}
}

#[inline(always)]
fn id(&self) -> &usize {
&self.id
}

fn abort_sleep(&self, backoff: &Backoff, parking: &mut bool, snoozing: &mut bool) -> () {
#[inline(always)]
fn abort_sleep(&self, backoff: &Backoff, snoozing: &mut bool, parking: &mut bool) -> () {
if *parking {
#[cfg(feature = "ws-no-park")]
unreachable!("parking should never be true in ws-no-park!");
@@ -483,8 +490,9 @@
}
}

#[inline(always)]
#[cfg(not(feature = "ws-no-park"))]
fn stop_sleep(&self, backoff: &Backoff, parking: &mut bool, snoozing: &mut bool) -> () {
fn stop_sleep(&self, backoff: &Backoff, snoozing: &mut bool, parking: &mut bool) -> () {
backoff.reset();
*snoozing = false;
*parking = false;
@@ -500,6 +508,7 @@
});
let local_stealer = JobStealer::new(local_stealer_raw, self.id);
self.register_stealer(local_stealer.clone());
self.parker.init(*self.id());
let sentinel = Sentinel::new(self.core.clone(), self.id);
let backoff = Backoff::new();
let mut snoozing = false;
@@ -557,7 +566,7 @@
},
Err(channel::TryRecvError::Empty) => break 'ctrl,
Err(channel::TryRecvError::Disconnected) => {
debug!("Worker {} self-terminating.", self.id());
warn!("Worker {} self-terminating.", self.id());
sentinel.cancel();
panic!("Threadpool wasn't shut down properly!");
}
@@ -768,7 +777,8 @@
// cleanup
guard.drop_worker(self.id);
// restart
guard.spawn_worker(core.clone());
// make sure the new thread starts with the same worker id, so the parker doesn't run out of slots
guard.spawn_worker(core.clone(), Some(self.id));
drop(guard);
for job in jobs.into_iter() {
gsend.push(job);
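This restart path runs inside the worker's `Sentinel`, a guard whose `Drop` fires while the thread unwinds from a panic; passing `Some(self.id)` through the new `spawn_worker` parameter is what keeps the parker's slot table stable across restarts. A condensed sketch of the sentinel pattern, with a hypothetical `respawn` standing in for `guard.spawn_worker(core.clone(), Some(id))`:

```rust
use std::thread;

struct Sentinel {
    id: usize,
    active: bool,
}

impl Sentinel {
    fn cancel(&mut self) {
        // Called on orderly shutdown so Drop won't respawn the worker.
        self.active = false;
    }
}

impl Drop for Sentinel {
    fn drop(&mut self) {
        if self.active && thread::panicking() {
            // Reuse the old worker id so its parker slot is recycled
            // instead of leaked on every panic.
            respawn(self.id);
        }
    }
}

fn respawn(id: usize) {
    println!("respawning worker {}", id); // stand-in for spawn_worker(core, Some(id))
}

fn main() {
    let result = thread::spawn(|| {
        let _guard = Sentinel { id: 3, active: true };
        panic!("simulated worker panic"); // Drop fires during unwind -> respawn
    })
    .join();
    assert!(result.is_err()); // the panic stayed contained in the worker thread

    // Orderly shutdown: a cancelled sentinel does nothing on Drop.
    let mut s = Sentinel { id: 4, active: true };
    s.cancel();
}
```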
@@ -792,6 +802,14 @@ mod tests {
fn test_debug() {
let exec = ThreadPool::new(2, parker::small());
crate::tests::test_debug(&exec, LABEL);
exec.shutdown().expect("Pool didn't shut down!");
}

#[test]
fn test_sleepy() {
let exec = ThreadPool::new(4, parker::small());
crate::tests::test_sleepy(&exec, LABEL);
exec.shutdown().expect("Pool didn't shut down!");
}

#[test]
@@ -938,12 +956,23 @@ mod tests {
assert_eq!(res, 1);
}

/// This test may (and should) cause panics on the worker threads, e.g. with
/// "Threadpool wasn't shut down properly!", since the pool is deallocated by
/// dropping its handle rather than by an explicit shutdown.
#[test]
fn dealloc_on_handle_drop() {
let _ = env_logger::try_init();

let pool = ThreadPool::new(1, parker::small());
let latch = Arc::new(CountdownEvent::new(1));
let latch2 = latch.clone();
pool.execute(move || {
latch2.decrement().expect("Latch should have decremented!");
});
let res = latch.wait_timeout(Duration::from_secs(1));
assert_eq!(res, 0);
let core = Arc::downgrade(&pool.core);
// sleep before drop, to ensure that even parked threads shut down
std::thread::sleep(std::time::Duration::from_secs(1));
drop(pool);
std::thread::sleep(std::time::Duration::from_secs(1));
assert!(core.upgrade().is_none());
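The deallocation check works because `Weak` does not keep the core alive: once every worker thread has dropped its `Arc<Mutex<ThreadPoolCore>>`, the upgrade fails. The same mechanism in isolation:

```rust
use std::sync::Arc;

fn main() {
    let core = Arc::new("pool core");
    let weak = Arc::downgrade(&core);
    assert!(weak.upgrade().is_some()); // a strong reference still exists
    drop(core); // last strong reference gone, allocation freed...
    assert!(weak.upgrade().is_none()); // ...so the weak handle can't revive it
}
```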
29 changes: 27 additions & 2 deletions executors/src/lib.rs
@@ -5,7 +5,7 @@
// <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed
// except according to those terms.
#![doc(html_root_url = "https://docs.rs/executors/0.5.1")]
#![doc(html_root_url = "https://docs.rs/executors/0.5.2")]
#![allow(unused_parens)]

//! This crate provides a number of task executors all implementing the
@@ -35,6 +35,8 @@ pub use crate::common::Executor;
//use bichannel::*;
use synchronoise::CountdownEvent;

// TODO add default implementation for abstract executor impl with associated type executable

#[cfg(test)]
pub(crate) mod tests {
use super::*;
@@ -71,12 +73,35 @@
.unwrap_or_else(|e| error!("Error during pool shutdown {:?} at {}", e, label));
}

pub fn test_sleepy<E>(pool: &E, label: &str)
where
E: Executor + 'static,
{
info!("Running sleepy test for {}", label);
let latch = Arc::new(CountdownEvent::new(2 * N_WIDTH));
for _ in 0..N_WIDTH {
let latch2 = latch.clone();
pool.execute(move || {
latch2.decrement().expect("Latch didn't decrement!");
});
}
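// Let the pool go idle so the workers park before the second batch.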
std::thread::sleep(Duration::from_secs(1));
for _ in 0..N_WIDTH {
let latch2 = latch.clone();
pool.execute(move || {
latch2.decrement().expect("Latch didn't decrement!");
});
}
let res = latch.wait_timeout(Duration::from_secs(5));
assert_eq!(res, 0);
}

fn do_step<E>(latch: Arc<CountdownEvent>, pool: E, depth: usize)
where
E: Executor + 'static,
{
let new_depth = depth - 1;
ignore(latch.decrement());
latch.decrement().expect("Latch didn't decrement!");
if (new_depth > 0) {
let pool2 = pool.clone();
pool.execute(move || do_step(latch, pool2, new_depth))
45 changes: 45 additions & 0 deletions executors/src/parker.rs
@@ -43,6 +43,11 @@ pub trait Parker: Send + Clone {
/// Maximum number of threads supported by this parker implementation.
fn max_threads(&self) -> Option<usize>;

/// Set thread state to `Awake`, no matter what the current state is.
///
/// This is mostly meant for cleanup after a thread panic, where the state is unknown.
fn init(&self, thread_id: usize) -> ();

/// Prepare to go to sleep.
/// You *must* call this before doing one last check on the resource being managed.
/// If the resource is available afterwards, you *must* call [`abort_park`].
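Taken together, the trait now describes a per-thread protocol with `init` up front. A sketch of the expected calling sequence from a worker's point of view, against a minimal stand-in trait; the method names mirror this diff, but the exact signatures are assumptions:

```rust
// Minimal stand-in for the Parker protocol, to show where init() fits.
trait Parker {
    fn init(&self, thread_id: usize);
    fn prepare_park(&self, thread_id: usize);
    fn abort_park(&self, thread_id: usize);
    fn park(&self, thread_id: usize);
}

fn worker_loop<P: Parker>(parker: &P, thread_id: usize, has_work: impl Fn() -> bool) {
    // After a panic-restart the slot's previous state is unknown,
    // so reset it to Awake before doing anything else.
    parker.init(thread_id);
    loop {
        if has_work() {
            continue; // ...pop and execute jobs...
        }
        parker.prepare_park(thread_id);
        if has_work() {
            parker.abort_park(thread_id); // work arrived during the final check
        } else {
            parker.park(thread_id); // block until a producer unparks us
        }
    }
}
```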
@@ -92,6 +97,11 @@ impl Parker for DynParker {
self.inner.max_threads()
}

#[inline(always)]
fn init(&self, thread_id: usize) -> () {
self.inner.init(thread_id);
}

#[inline(always)]
fn prepare_park(&self, thread_id: usize) -> () {
self.inner.prepare_park(thread_id);
@@ -169,6 +179,11 @@
self.inner.max_threads()
}

#[inline(always)]
fn init(&self, thread_id: usize) -> () {
self.inner.init(thread_id);
}

#[inline(always)]
fn prepare_park(&self, thread_id: usize) -> () {
self.inner.prepare_park(thread_id);
@@ -203,6 +218,9 @@ pub trait ThreadData: std::fmt::Debug {
/// Maximum number of threads supported by this parker implementation.
fn max_threads(&self) -> Option<usize>;

/// Initialise thread at `thread_id`.
fn init(&self, thread_id: usize) -> ();

/// Prepare to go to sleep.
fn prepare_park(&self, thread_id: usize) -> ();

@@ -251,6 +269,15 @@ impl ThreadData for SmallThreadData {
Some(SmallThreadData::MAX_THREADS)
}

fn init(&self, thread_id: usize) -> () {
assert!(thread_id < 32);
if let Ok(mut guard) = self.sleeping.lock() {
guard[thread_id] = ParkState::Awake;
} else {
panic!("Mutex is poisoned!");
}
}

fn prepare_park(&self, thread_id: usize) -> () {
assert!(thread_id < 32);
self.sleep_set.set_at(thread_id);
@@ -387,6 +414,15 @@ impl ThreadData for LargeThreadData {
Some(LargeThreadData::MAX_THREADS)
}

fn init(&self, thread_id: usize) -> () {
assert!(thread_id < 64);
if let Ok(mut guard) = self.sleeping.lock() {
guard[thread_id] = ParkState::Awake;
} else {
panic!("Mutex is poisoned!");
}
}

fn prepare_park(&self, thread_id: usize) -> () {
assert!(thread_id < 64);
self.sleep_set.set_at(thread_id);
@@ -527,6 +563,15 @@ impl ThreadData for DynamicThreadData {
None
}

fn init(&self, thread_id: usize) -> () {
if let Ok(mut guard) = self.data.lock() {
// Doesn't matter if it was there or not
let _ = guard.sleeping.remove(&thread_id);
} else {
panic!("Mutex is poisoned!");
}
}

fn prepare_park(&self, _thread_id: usize) -> () {
self.sleep_count.fetch_add(1usize, Ordering::SeqCst);
}
8 changes: 8 additions & 0 deletions executors/src/threadpool_executor.rs
@@ -130,6 +130,14 @@ mod tests {
fn test_debug() {
let exec = ThreadPoolExecutor::new(2);
crate::tests::test_debug(&exec, LABEL);
exec.shutdown().expect("Pool didn't shut down!");
}

#[test]
fn test_sleepy() {
let exec = ThreadPoolExecutor::new(4);
crate::tests::test_sleepy(&exec, LABEL);
exec.shutdown().expect("Pool didn't shut down!");
}

#[test]
14 changes: 14 additions & 0 deletions executors/test-features.sh
@@ -0,0 +1,14 @@
#!/bin/bash
set -e

echo "%%%%%% Testing default features %%%%%%"
cargo test
echo "%%%%%% Finished testing default features %%%%%%"

echo "%%%%%% Testing feature ws-no-park %%%%%%"
cargo test --features ws-no-park
echo "%%%%%% Finished testing feature ws-no-park %%%%%%"

echo "%%%%%% Testing feature !ws-timed-fairness %%%%%%"
cargo test --no-default-features --features "threadpool-exec,cb-channel-exec,workstealing-exec,defaults"
echo "%%%%%% Finished testing feature !ws-timed-fairness %%%%%%"
