Skip to content

Commit

Permalink
update deps and make clippy happy
Browse files Browse the repository at this point in the history
  • Loading branch information
Bathtor committed Jan 11, 2025
1 parent dfa71da commit d08ebff
Show file tree
Hide file tree
Showing 16 changed files with 369 additions and 311 deletions.
6 changes: 2 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
[workspace]
members = [
"executors",
"executor-performance",
]
members = ["executors", "executor-performance"]
resolver = "2"
4 changes: 2 additions & 2 deletions executor-performance/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ synchronoise = "1.0"
quanta = "0.6"
#time = "0.2"

[dependencies.rustc-test]
version = "0.3"
# [dependencies.rustc-test]
# version = "0.3"
# features = ["asm_black_box"] # uncomment if you want to use --pre or --post
# otherwise the compiler will just optimise out the useless work they do
# doesn't work on travis (stable, beta), though
Expand Down
12 changes: 2 additions & 10 deletions executor-performance/src/experiment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,6 @@ use super::*;
use std::{default::Default, sync::Arc, thread};
use synchronoise::CountdownEvent;

#[cfg(feature = "nightly")]
use test::black_box;

#[cfg(not(feature = "nightly"))]
fn black_box<T>(dummy: T) -> T {
dummy
}

#[derive(Clone, Debug)]
pub struct ExperimentSettings {
num_threads: usize,
Expand Down Expand Up @@ -181,7 +173,7 @@ fn do_work(n: u64) -> u64 {

fn amplify<E: Executor + 'static>(exec: E, pre_work: u64, post_work: u64, remaining: u64) {
black_box(do_work(pre_work));
if (remaining > 0) {
if remaining > 0 {
let amp_exec = exec.clone();
exec.execute(move || amplify(amp_exec, pre_work, post_work, remaining - 1));
}
Expand All @@ -196,7 +188,7 @@ fn amplify_finish<E: Executor + 'static>(
remaining: u64,
) {
black_box(do_work(pre_work));
if (remaining > 0) {
if remaining > 0 {
let amp_exec = exec.clone();
exec.execute(move || {
amplify_finish(amp_exec, pre_work, post_work, finisher, remaining - 1)
Expand Down
12 changes: 2 additions & 10 deletions executor-performance/src/latency_experiment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,6 @@ use crate::stats::Stats;
use std::{default::Default, sync::Arc, thread, time::Duration};
use synchronoise::CountdownEvent;

#[cfg(feature = "nightly")]
use test::black_box;

#[cfg(not(feature = "nightly"))]
fn black_box<T>(dummy: T) -> T {
dummy
}

const PRE_WORK: u64 = 10;
const POST_WORK: u64 = 10;

Expand Down Expand Up @@ -185,7 +177,7 @@ fn do_work(n: u64) -> u64 {

fn amplify<E: Executor + 'static>(exec: E, remaining: u64) {
black_box(do_work(POST_WORK));
if (remaining > 0) {
if remaining > 0 {
let amp_exec = exec.clone();
exec.execute(move || amplify(amp_exec, remaining - 1));
}
Expand All @@ -194,7 +186,7 @@ fn amplify<E: Executor + 'static>(exec: E, remaining: u64) {

fn amplify_finish<E: Executor + 'static>(exec: E, finisher: Finisher, remaining: u64) {
black_box(do_work(PRE_WORK));
if (remaining > 0) {
if remaining > 0 {
let amp_exec = exec.clone();
exec.execute(move || amplify_finish(amp_exec, finisher, remaining - 1));
black_box(do_work(POST_WORK));
Expand Down
10 changes: 8 additions & 2 deletions executor-performance/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
// <LICENSE or http://opensource.org/licenses/MIT>.
// This file may not be copied, modified, or distributed
// except according to those terms.
#![cfg_attr(feature = "nightly", feature(test))]
#![allow(unused_parens)]
#![allow(clippy::style)] // not important in the testing app

pub mod experiment;
Expand All @@ -28,6 +26,14 @@ use std::{
path::Path,
};

//#[cfg(feature = "nightly")]
use std::hint::black_box;

//#[cfg(not(feature = "nightly"))]
// fn black_box<T>(dummy: T) -> T {
// dummy
// }

fn main() {
let throughput = SubCommand::with_name("throughput")
.about("Runs a throughput performance evaluation of all Executor implementations.")
Expand Down
2 changes: 1 addition & 1 deletion executor-performance/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub trait Statistics {
}
}

impl<'a> Statistics for &'a Vec<f64> {
impl Statistics for &Vec<f64> {
fn sample_size(&self) -> f64 {
self.len() as f64
}
Expand Down
6 changes: 3 additions & 3 deletions executors/benches/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ mod spawn_many {
});
}

let _ = rx.recv().unwrap();
rx.recv().unwrap();
});

rt.shutdown().expect("shutdown");
Expand All @@ -260,7 +260,7 @@ mod spawn_many {
.detach();
}

let _ = rx.recv().unwrap();
rx.recv().unwrap();
});
});

Expand Down Expand Up @@ -387,7 +387,7 @@ mod ping_pong {

let res = done_rx.recv_timeout(Duration::from_millis(5000));
assert!(
!res.is_err(),
res.is_ok(),
"done_rx timeouted within 5s. Remaining={}",
outer_rem.load(SeqCst)
);
Expand Down
1 change: 1 addition & 0 deletions executors/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ pub trait Executor: CanExecute + Clone + Send {
// res.log_warn("Result was an error");
// ```
// NOTE: Don't generate docs for this, so the test is never run
#[allow(dead_code)]
pub(crate) trait LogErrors {
fn log_error(self, msg: &str);
fn log_warn(self, msg: &str);
Expand Down
24 changes: 16 additions & 8 deletions executors/src/crossbeam_channel_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ impl CanExecute for ThreadPool {
.unwrap_or_else(|e| error!("Error submitting job: {:?}", e));

#[cfg(feature = "produce-metrics")]
increment_gauge!("executors.jobs_queued", 1.0, "executor" => "crossbeam_channel_pool");
gauge!("executors.jobs_queued", "executor" => "crossbeam_channel_pool").increment(1.0);
} else {
warn!("Ignoring job as pool is shutting down.");
}
Expand Down Expand Up @@ -257,8 +257,14 @@ impl Executor for ThreadPool {

#[cfg(feature = "produce-metrics")]
fn register_metrics(&self) {
register_counter!("executors.jobs_executed", "The total number of jobs that were executed", "executor" => "crossbeam_channel_pool");
register_gauge!("executors.jobs_queued", "The number of jobs that are currently waiting to be executed", "executor" => "crossbeam_channel_pool");
describe_counter!(
"executors.jobs_executed",
"The total number of jobs that were executed"
);
describe_gauge!(
"executors.jobs_queued",
"The number of jobs that are currently waiting to be executed"
);
}
}

Expand Down Expand Up @@ -359,7 +365,7 @@ impl CanExecute for ThreadLocalExecute {
.unwrap_or_else(|e| error!("Error submitting Job: {:?}", e));

#[cfg(feature = "produce-metrics")]
increment_gauge!("executors.jobs_queued", 1.0, "executor" => "crossbeam_channel_pool");
gauge!("executors.jobs_queued", "executor" => "crossbeam_channel_pool").increment(1.0);
}
}

Expand Down Expand Up @@ -404,8 +410,9 @@ impl ThreadPoolWorker {
{
executed_since_check += 1u64;
if last_metrics_check.elapsed() > METRICS_INTERVAL {
counter!("executors.jobs_executed", executed_since_check, "executor" => "crossbeam_channel_pool", "thread_id" => format!("{}", self.id));
decrement_gauge!("executors.jobs_queued", executed_since_check as f64, "executor" => "crossbeam_channel_pool");
counter!("executors.jobs_executed", "executor" => "crossbeam_channel_pool", "thread_id" => format!("{}", self.id)).increment(executed_since_check);
gauge!("executors.jobs_queued", "executor" => "crossbeam_channel_pool")
.decrement(executed_since_check as f64);
executed_since_check = 0u64;
last_metrics_check = std::time::Instant::now();
}
Expand All @@ -418,8 +425,9 @@ impl ThreadPoolWorker {
{
executed_since_check += 1u64;
if last_metrics_check.elapsed() > METRICS_INTERVAL {
counter!("executors.jobs_executed", executed_since_check, "executor" => "crossbeam_channel_pool", "thread_id" => format!("{}", self.id));
decrement_gauge!("executors.jobs_queued", executed_since_check as f64, "executor" => "crossbeam_channel_pool");
counter!("executors.jobs_executed", "executor" => "crossbeam_channel_pool", "thread_id" => format!("{}", self.id)).increment(executed_since_check);
gauge!("executors.jobs_queued", "executor" => "crossbeam_channel_pool")
.decrement(executed_since_check as f64);
executed_since_check = 0u64;
last_metrics_check = std::time::Instant::now();
}
Expand Down
60 changes: 36 additions & 24 deletions executors/src/crossbeam_workstealing_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
//! There are two ways of managing fairness between local and global queues:
//!
//! - Timeout based: Check global queue at most every 1ms
//! (activated via the `ws-timed-fairness` feature, which is in the
//! default feature set)
//! (activated via the `ws-timed-fairness` feature, which is in the
//! default feature set)
//! - Job count based: Check global every 100 local jobs
//! (used if the `ws-timed-fairness` feature is disabled)
//! (used if the `ws-timed-fairness` feature is disabled)
//!
//! Timeout based fairness is more predictable, as it is less dependent
//! on the job execution time. But if a platform implementation of
Expand Down Expand Up @@ -166,7 +166,8 @@ const MAX_WAIT_SHUTDOWN_MS: u64 = 5 * (timeconstants::MS_PER_S as u64);
// UnsafeCell has 10x the performance of RefCell
// and the scoping guarantees that the borrows are exclusive
thread_local!(
static LOCAL_JOB_QUEUE: UnsafeCell<Option<deque::Worker<Job>>> = UnsafeCell::new(Option::None);
static LOCAL_JOB_QUEUE: UnsafeCell<Option<deque::Worker<Job>>> =
const { UnsafeCell::new(Option::None) };
);

/// Try to append the job to the thread-local job queue
Expand All @@ -184,7 +185,8 @@ where
q.push(msg);

#[cfg(feature = "produce-metrics")]
increment_gauge!("executors.jobs_queued", 1.0, "executor" => "crossbeam_workstealing_pool");
gauge!("executors.jobs_queued", "executor" => "crossbeam_workstealing_pool")
.increment(1.0);

Ok(())
}
Expand Down Expand Up @@ -413,7 +415,8 @@ where
q.push(msg);

#[cfg(feature = "produce-metrics")]
increment_gauge!("executors.jobs_queued", 1.0, "executor" => "crossbeam_workstealing_pool");
gauge!("executors.jobs_queued", "executor" => "crossbeam_workstealing_pool")
.increment(1.0);
}
None => {
debug!("Scheduling on global pool.");
Expand All @@ -423,7 +426,8 @@ where
self.inner.parker.unpark_one();

#[cfg(feature = "produce-metrics")]
increment_gauge!("executors.jobs_queued", 1.0, "executor" => "crossbeam_workstealing_pool");
gauge!("executors.jobs_queued", "executor" => "crossbeam_workstealing_pool")
.increment(1.0);
}
}
})
Expand Down Expand Up @@ -456,7 +460,7 @@ where
q.push(msg);

#[cfg(feature = "produce-metrics")]
increment_gauge!("executors.jobs_queued", 1.0, "executor" => "crossbeam_workstealing_pool");
gauge!("executors.jobs_queued", "executor" => "crossbeam_workstealing_pool").increment(1.0);
}
None => {
debug!("Scheduling on global pool.");
Expand All @@ -466,7 +470,7 @@ where
self.inner.parker.unpark_one();

#[cfg(feature = "produce-metrics")]
increment_gauge!("executors.jobs_queued", 1.0, "executor" => "crossbeam_workstealing_pool");
gauge!("executors.jobs_queued", "executor" => "crossbeam_workstealing_pool").increment(1.0);
}
}
})
Expand Down Expand Up @@ -525,8 +529,14 @@ where

#[cfg(feature = "produce-metrics")]
fn register_metrics(&self) {
register_counter!("executors.jobs_executed", "The total number of jobs that were executed", "executor" => "crossbeam_workstealing_pool");
register_gauge!("executors.jobs_queued", "The number of jobs that are currently waiting to be executed", "executor" => "crossbeam_workstealing_pool");
describe_counter!(
"executors.jobs_executed",
"The total number of jobs that were executed"
);
describe_gauge!(
"executors.jobs_queued",
"The number of jobs that are currently waiting to be executed"
);
}
}

Expand Down Expand Up @@ -661,8 +671,7 @@ impl ThreadPoolCore {
let stealers: Vec<JobStealer> = self
.workers
.values()
.filter(|w| w.stealer.is_some())
.map(|w| w.stealer.clone().unwrap())
.filter_map(|w| w.stealer.clone())
.collect();
for (wid, worker) in self.workers.iter() {
let l: Vec<JobStealer> = stealers
Expand All @@ -688,7 +697,8 @@ impl CanExecute for ThreadLocalExecute {
q.push(msg);

#[cfg(feature = "produce-metrics")]
increment_gauge!("executors.jobs_queued", 1.0, "executor" => "crossbeam_workstealing_pool");
gauge!("executors.jobs_queued", "executor" => "crossbeam_workstealing_pool")
.increment(1.0);
}
None => {
unreachable!("Local Executor was set but local job queue was not!");
Expand Down Expand Up @@ -850,8 +860,8 @@ where

#[cfg(feature = "produce-metrics")]
{
counter!("executors.jobs_executed", count, "executor" => "crossbeam_workstealing_pool", "thread_id" => format!("{}", self.id()));
decrement_gauge!("executors.jobs_queued", count as f64, "executor" => "crossbeam_workstealing_pool");
counter!("executors.jobs_executed", "executor" => "crossbeam_workstealing_pool", "thread_id" => format!("{}", self.id())).increment(count);
gauge!("executors.jobs_queued", "executor" => "crossbeam_workstealing_pool").decrement(count as f64);
}
} _ => {
panic!("Queue should have been initialised!");
Expand Down Expand Up @@ -897,11 +907,12 @@ where
}
// sometimes try the global queue
let glob_res = LOCAL_JOB_QUEUE.with(|q| unsafe {
match *q.get() { Some(ref local_queue) => {
core.global_sender.steal_batch_and_pop(local_queue)
} _ => {
panic!("Queue should have been initialised!");
}}
match *q.get() {
Some(ref local_queue) => core.global_sender.steal_batch_and_pop(local_queue),
_ => {
panic!("Queue should have been initialised!");
}
}
});
if let deque::Steal::Success(msg) = glob_res {
msg.run();
Expand All @@ -919,8 +930,9 @@ where

#[cfg(feature = "produce-metrics")]
{
increment_counter!("executors.jobs_executed", "executor" => "crossbeam_workstealing_pool", "thread_id" => format!("{}", self.id()));
decrement_gauge!("executors.jobs_queued", 1.0, "executor" => "crossbeam_workstealing_pool");
counter!("executors.jobs_executed", "executor" => "crossbeam_workstealing_pool", "thread_id" => format!("{}", self.id())).increment(1);
gauge!("executors.jobs_queued", "executor" => "crossbeam_workstealing_pool")
.decrement(1.0);
}

continue 'main;
Expand Down Expand Up @@ -1021,7 +1033,7 @@ where
let weight = if let Some(stealer_id) = stealer.core_id() {
distances.distance(my_id, *stealer_id)
} else {
std::i32::MIN
i32::MIN
};
(weight, stealer)
})
Expand Down
2 changes: 1 addition & 1 deletion executors/src/futures_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ mod tests {
}

async fn wait_for_channel(receiver: Receiver<()>, barrier: Arc<AtomicBool>) -> () {
let _ok = receiver.await.expect("message");
receiver.await.expect("message");
let res = barrier.compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst);
assert!(res.is_ok());
}
Expand Down
9 changes: 1 addition & 8 deletions executors/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,7 @@ mod locals;
pub use locals::*;

#[cfg(feature = "produce-metrics")]
use metrics::{
counter,
decrement_gauge,
increment_counter,
increment_gauge,
register_counter,
register_gauge,
};
use metrics::{counter, describe_counter, describe_gauge, gauge};

// #[cfg(feature = "produce-metrics")]
// pub mod metric_keys {
Expand Down
Loading

0 comments on commit d08ebff

Please sign in to comment.