Update docs and prepare for 0.7.0 release
Bathtor committed Jul 13, 2020
1 parent 7edd216 commit 9c260d2
Showing 10 changed files with 107 additions and 16 deletions.
6 changes: 3 additions & 3 deletions README.md
@@ -13,7 +13,7 @@ Add this to your `Cargo.toml`:

```toml
[dependencies]
executors = "0.6"
executors = "0.7"
```

You can use, for example, the [crossbeam_workstealing_pool](https://docs.rs/executors/latest/executors/crossbeam_workstealing_pool/index.html) to schedule a number `n_jobs` over a number `n_workers` threads, and collect the results via an `mpsc::channel`.
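
For illustration, a minimal sketch of that pattern (assuming the 0.7 `small_pool` constructor and the `Executor::execute` method; see the module documentation for the exact names):

```rust
use executors::*;
use executors::crossbeam_workstealing_pool;
use std::sync::mpsc::channel;

let n_workers = 4;
let n_jobs = 8;
// Assumed constructor: a work-stealing pool sized for up to 32 threads.
let pool = crossbeam_workstealing_pool::small_pool(n_workers);

let (tx, rx) = channel();
for _ in 0..n_jobs {
    let tx = tx.clone();
    pool.execute(move || {
        tx.send(1u64).expect("channel should still be open");
    });
}

// Collect one result per job from the channel.
let sum: u64 = rx.iter().take(n_jobs).sum();
assert_eq!(n_jobs as u64, sum);
```
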
@@ -59,7 +59,7 @@ You can enable support for pinning pool threads to particular CPU cores via the

## Some Numbers

The following are some example result from my desktop machine (Intel i7-4770 @ 3.40Ghz Quad-Core with HT (8 logical cores) with 16GB of RAM).
The following are some example results from my desktop machine (Intel i7-4770 @ 3.40Ghz Quad-Core with HT (8 logical cores) with 16GB of RAM).
*Note* that they are all from a single run and thus not particularly scientific and subject to whatever else was going on on my system during the run.

Implementation abbreviations:
@@ -170,6 +170,6 @@ This corresponds to a very large internal workload in response to every external

## License

Licensed under the terms of MIT license.
Licensed under the terms of the MIT license.

See [LICENSE](LICENSE) for details.
2 changes: 1 addition & 1 deletion executors/Cargo.toml
@@ -3,7 +3,7 @@ name = "executors"
# NB: When modifying, also modify:
# 1. html_root_url in lib.rs
# 2. number in readme (for breaking changes)
version = "0.6.1"
version = "0.7.0"
authors = ["Lars Kroll <[email protected]>"]
edition = "2018"
description = "A collection of high-performance task executors."
32 changes: 28 additions & 4 deletions executors/src/bichannel.rs
@@ -37,6 +37,10 @@ pub fn bichannel<Left, Right>() -> (Endpoint<Right, Left>, Endpoint<Left, Right>
(endpoint_left, endpoint_right)
}

/// One end of a bichannel
///
/// This end can send the `Out` type to the other end
/// and receive the `In` type from the other end.
pub struct Endpoint<In, Out> {
sender: Sender<Out>,
receiver: Receiver<In>,
@@ -46,32 +50,52 @@ impl<In, Out> Endpoint<In, Out> {
fn new(sender: Sender<Out>, receiver: Receiver<In>) -> Endpoint<In, Out> {
Endpoint { sender, receiver }
}

/// Send `t` to the other end
///
/// This function works just like [mpsc send](std::sync::mpsc::Sender::send).
pub fn send(&self, t: Out) -> Result<(), SendError<Out>> {
self.sender.send(t)
}

/// Receive something from the channel without blocking
///
/// This function works just like [mpsc try_recv](std::sync::mpsc::Receiver::try_recv).
pub fn try_recv(&self) -> Result<In, TryRecvError> {
self.receiver.try_recv()
}

/// Receive something from the channel, blocking until something is available
///
/// This function works just like [mpsc recv](std::sync::mpsc::Receiver::recv).
pub fn recv(&self) -> Result<In, RecvError> {
self.receiver.recv()
}

/// Receive something from the channel, blocking until something is available or the timeout expires
///
/// This function works just like [mpsc recv_timeout](std::sync::mpsc::Receiver::recv_timeout).
pub fn recv_timeout(&self, timeout: Duration) -> Result<In, RecvTimeoutError> {
self.receiver.recv_timeout(timeout)
}

/// Iterate over incoming data
///
/// This function works just like [mpsc iter](std::sync::mpsc::Receiver::iter).
pub fn iter(&self) -> Iter<'_, In> {
self.receiver.iter()
}

/// Iterate over incoming data without blocking
///
/// This function works just like [mpsc try_iter](std::sync::mpsc::Receiver::try_iter).
pub fn try_iter(&self) -> TryIter<'_, In> {
self.receiver.try_iter()
}
}
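
For illustration, a round trip over the endpoint API above might look like the following sketch (assuming the module's contents, including `bichannel`, are exported under `executors::bichannel`):

```rust
use executors::bichannel::*;

// `Left = u64` flows from the left endpoint to the right one;
// `Right = String` flows the other way.
let (left, right) = bichannel::<u64, String>();

left.send(42u64).expect("send");
let num = right.recv().expect("recv");
assert_eq!(42u64, num);

right.send(num.to_string()).expect("send");
assert_eq!("42", left.recv().expect("recv"));
```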

//impl<In, Out> !Sync for Endpoint<In, Out> {}
unsafe impl<In: Send, Out: Send> Send for Endpoint<In, Out> {}

#[cfg(test)]
mod tests {
use env_logger;

use super::*;
use crate::common::ignore;
2 changes: 2 additions & 0 deletions executors/src/common.rs
@@ -6,6 +6,8 @@
// This file may not be copied, modified, or distributed
// except according to those terms.

//! The core traits and reusable functions of this crate.
use std::fmt::Debug;

/// A minimal trait for task executors.
3 changes: 3 additions & 0 deletions executors/src/crossbeam_channel_pool.rs
@@ -55,6 +55,9 @@ use std::sync::{Arc, Mutex, Weak};
use std::thread;
use std::time::Duration;

/// A handle to a [crossbeam_channel_pool](crossbeam_channel_pool)
///
/// See module level documentation for usage information.
#[derive(Clone, Debug)]
pub struct ThreadPool {
core: Arc<Mutex<ThreadPoolCore>>,
21 changes: 21 additions & 0 deletions executors/src/futures_executor.rs
@@ -6,6 +6,11 @@
// This file may not be copied, modified, or distributed
// except according to those terms.

//! Support for Rust's futures and async/await APIs
//!
//! This functionality is provided via the [FuturesExecutor](FuturesExecutor)
//! trait, which is implemented for all executors in this crate that can efficiently support it.
use super::*;
use std::future::Future;

@@ -17,6 +22,22 @@ pub trait FuturesExecutor: Executor + Sync + 'static {
/// Spawn `future` on the pool and return a handle to the result
///
/// Handles can be awaited like any other future.
///
/// # Examples
///
/// Execute an "expensive" computation on the pool and
/// block the main thread waiting for the result to become available.
///
/// ```
/// use executors::*;
/// use futures::executor::block_on;
/// # use executors::crossbeam_channel_pool::ThreadPool;
/// // initialise some executor
/// # let executor = ThreadPool::new(2);
/// let handle = executor.spawn(async move { 2*2 });
/// let result = block_on(handle).expect("result");
/// assert_eq!(4, result);
/// ```
fn spawn<R: Send + 'static>(
&self,
future: impl Future<Output = R> + 'static + Send,
3 changes: 2 additions & 1 deletion executors/src/lib.rs
@@ -5,7 +5,8 @@
// <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed
// except according to those terms.
#![doc(html_root_url = "https://docs.rs/executors/0.6.1")]
#![doc(html_root_url = "https://docs.rs/executors/0.7.0")]
#![deny(missing_docs)]
#![allow(unused_parens)]
#![allow(clippy::unused_unit)]

42 changes: 36 additions & 6 deletions executors/src/parker.rs
@@ -6,6 +6,11 @@
// This file may not be copied, modified, or distributed
// except according to those terms.

//! A reusable thread-pool-parking mechanism.
//!
//! This module is mostly meant for internal use within this crate,
//! but could be used to build custom thread-pools as well.
use arr_macro::arr;
use std::collections::BTreeMap;
use std::fmt;
@@ -14,12 +19,17 @@ use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::Thread;

/// Get a parker for up to 32 threads
pub fn small() -> StaticParker<SmallThreadData> {
StaticParker::new(SmallThreadData::new())
}

/// Get a parker for up to 64 threads
pub fn large() -> StaticParker<LargeThreadData> {
StaticParker::new(LargeThreadData::new())
}

/// Get a parker for an unbounded number of threads
pub fn dynamic() -> StaticParker<DynamicThreadData> {
StaticParker::new(DynamicThreadData::new())
}
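
For a quick illustration of these constructors (a sketch, assuming `StaticParker` implements the `Parker` trait defined below and that the dynamic parker reports no upper bound):

```rust
use executors::parker::{self, Parker};

// A pool with at most 32 worker threads can use the fixed-size small parker.
let p = parker::small();
assert_eq!(Some(32), p.max_threads());

// Pools without a static bound on the number of workers use the dynamic parker.
let d = parker::dynamic();
assert_eq!(None, d.max_threads());
```
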
@@ -29,16 +39,17 @@ pub fn dynamic() -> StaticParker<DynamicThreadData> {
pub enum ParkResult {
/// Simply retry parking later.
/// Usually, this indicates that the implementation wanted to avoid blocking on a lock.
/// The parker was left in the "prepared" state (s. [`prepare_park`]).
/// The parker was left in the "prepared" state (s. [prepare_park](Parker::prepare_park)).
Retry,
/// Recheck managed resource before retrying.
/// The parker was moved out of the "prepared" state (s. [`abort_park`]).
/// The parker was moved out of the "prepared" state (s. [abort_park](Parker::abort_park)).
Abort,
/// Thread parked and then was awoken via `unpark`.
/// The parker was moved out of the "prepared" state (s. [`abort_park`]).
/// The parker was moved out of the "prepared" state (s. [unpark_one](Parker::unpark_one) and [unpark_all](Parker::unpark_all)).
Woken,
}

/// The core trait that every parker implementation must provide
pub trait Parker: Sync + Send + Clone {
/// Maximum number of threads supported by this parker implementation.
fn max_threads(&self) -> Option<usize>;
@@ -50,15 +61,15 @@ pub trait Parker: Sync + Send + Clone {

/// Prepare to go to sleep.
/// You *must* call this before doing one last check on the resource being managed.
/// If the resource is available afterwards, you *must* call [`abort_park`].
/// If it is not available afterwards, then call [`park`].
/// If the resource is available afterwards, you *must* call [abort_park](Parker::abort_park).
/// If it is not available afterwards, then call [park](Parker::park).
///
/// The provided `thread_id` must fit into the underlying parker implementation, or a panic will be issued!
///
fn prepare_park(&self, thread_id: usize) -> ();

/// Abort attempt at going to sleep.
/// You *must* call this if you have called [`prepare_park`], but then the resource became available.
/// You *must* call this if you have called [prepare_park](Parker::prepare_park), but then the resource became available.
///
/// The provided `thread_id` must fit into the underlying parker implementation, or a panic will be issued!
///
@@ -80,6 +91,7 @@ pub trait Parker: Sync + Send + Clone {
fn unpark_all(&self) -> ();
}
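
The prepare/abort/park protocol documented on the trait roughly plays out as in the following sketch (assuming `abort_park` and `park` take the same `thread_id` argument as `prepare_park`, that `park` returns a `ParkResult`, and with `has_work` standing in for the pool's actual resource check):

```rust
use executors::parker::{ParkResult, Parker};

fn maybe_sleep<P: Parker>(parker: &P, thread_id: usize, has_work: impl Fn() -> bool) {
    // Announce the intention to sleep before the final check on the managed resource.
    parker.prepare_park(thread_id);
    if has_work() {
        // The resource became available after preparing: back out instead of sleeping.
        parker.abort_park(thread_id);
        return;
    }
    match parker.park(thread_id) {
        ParkResult::Retry => { /* the parker avoided blocking; try again later */ }
        ParkResult::Abort | ParkResult::Woken => { /* recheck the resource */ }
    }
}
```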

/// A trait-object-like wrapper around a concrete parker instance
#[derive(Clone, Debug)]
pub struct DynParker {
inner: Arc<dyn ThreadData + Sync + Send>,
@@ -132,6 +144,9 @@ impl Parker for DynParker {
}
}

/// A concrete parker instance
///
/// The generic parameter `T` implements the internal state management of the parker.
#[derive(Debug)]
pub struct StaticParker<T>
where
@@ -214,6 +229,7 @@ where
}
}

/// A trait that manages the internal state of a parker implementation
pub trait ThreadData: std::fmt::Debug {
/// Maximum number of threads supported by this parker implementation.
fn max_threads(&self) -> Option<usize>;
@@ -249,12 +265,14 @@ enum ParkState {
Waking,
}

/// Parker state large enough for up to 32 threads
#[derive(Debug)]
pub struct SmallThreadData {
sleep_set: AtomicU32,
sleeping: Mutex<[ParkState; 32]>,
}
impl SmallThreadData {
/// The maximum number of threads supported by this parker
pub const MAX_THREADS: usize = 32;

fn new() -> SmallThreadData {
@@ -386,11 +404,13 @@ impl ThreadData for SmallThreadData {
}
}

/// Parker state for up to 64 threads
pub struct LargeThreadData {
sleep_set: AtomicU64,
sleeping: Mutex<[ParkState; 64]>,
}
impl LargeThreadData {
/// The maximum number of threads supported by this parker
pub const MAX_THREADS: usize = 64;

fn new() -> LargeThreadData {
@@ -545,6 +565,16 @@ impl InnerData {
}
}

/// Park state for an unbounded number of threads
///
/// Uses a dynamic data structure internally
/// to keep track of the state for each thread.
///
/// # Note
///
/// Technically, since thread-ids are `usize`,
/// there still is an upper bound for the maximum
/// number of threads in a pool, i.e. `usize::MAX`.
#[derive(Debug)]
pub struct DynamicThreadData {
sleep_count: AtomicUsize,
9 changes: 8 additions & 1 deletion executors/src/run_now.rs
@@ -6,7 +6,10 @@
// This file may not be copied, modified, or distributed
// except according to those terms.

//! A simple `Executor` that simply runs tasks on the current thread.
//! A simple `Executor` that simply runs tasks immediately on the current thread.
//!
//! This is mostly useful to work with APIs that require an `Executor`,
//! even if "normal" stack-based execution is actually desired.
//!
//! # Examples
//!
@@ -32,6 +35,9 @@ use super::*;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// A handle to the [run_now](run_now) executor
///
/// See module level documentation for usage information.
#[derive(Clone, Debug)]
pub struct RunNowExecutor {
active: Arc<AtomicBool>,
@@ -45,6 +51,7 @@ impl CanExecute for ThreadLocalRunNow {
}

impl RunNowExecutor {
/// Create a new [run_now](run_now) executor
pub fn new() -> RunNowExecutor {
RunNowExecutor {
active: Arc::new(AtomicBool::new(true)),
3 changes: 3 additions & 0 deletions executors/src/threadpool_executor.rs
@@ -43,6 +43,9 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use threadpool::ThreadPool;

/// A handle to a [threadpool_executor](threadpool_executor)
///
/// See module level documentation for usage information.
#[derive(Clone, Debug)]
pub struct ThreadPoolExecutor {
pool: ThreadPool,
