diff --git a/src/client.rs b/src/client.rs index 96c0a048..9208ec97 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,6 +5,7 @@ use hyper_util::rt::{TokioExecutor, TokioIo}; use rand::prelude::*; use std::{ borrow::Cow, + io::Write, sync::{ atomic::{AtomicBool, Ordering::Relaxed}, Arc, @@ -965,15 +966,14 @@ fn set_start_latency_correction( } } -/// Run n tasks by m workers -pub async fn work_debug(client: Arc) -> Result<(), ClientError> { +pub async fn work_debug(w: &mut W, client: Arc) -> Result<(), ClientError> { let mut rng = StdRng::from_entropy(); let url = client.url_generator.generate(&mut rng)?; - println!("URL: {}", url); + writeln!(w, "URL: {}", url)?; let request = client.request(&url)?; - println!("{:#?}", request); + writeln!(w, "{:#?}", request)?; let response = if client.is_work_http2() { let (_, mut client_state) = client.connect_http2(&url, &mut rng).await?; @@ -989,7 +989,7 @@ pub async fn work_debug(client: Arc) -> Result<(), ClientError> { let response = http::Response::from_parts(parts, body); - println!("{:#?}", response); + writeln!(w, "{:#?}", response)?; Ok(()) } diff --git a/src/main.rs b/src/main.rs index cb4353f7..4fc32f29 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,7 +8,7 @@ use hyper::http::{ self, header::{HeaderName, HeaderValue}, }; -use printer::PrintMode; +use printer::{PrintConfig, PrintMode}; use rand::prelude::*; use rand_regex::Regex; use ratatui::crossterm; @@ -17,7 +17,7 @@ use std::{ env, fs::File, io::{BufRead, Read}, - path::Path, + path::{Path, PathBuf}, pin::Pin, str::FromStr, sync::Arc, @@ -248,6 +248,12 @@ Note: If qps is specified, burst will be ignored", help = "Perform a single request and dump the request and response" )] debug: bool, + #[arg( + help = "Output file to write the results to. If not specified, results are written to stdout.", + long, + short + )] + output: Option, } /// An entry specified by `connect-to` to override DNS resolution and default @@ -505,12 +511,6 @@ async fn run() -> anyhow::Result<()> { _ => None, }; - let print_mode = if opts.json { - PrintMode::Json - } else { - PrintMode::Text - }; - let ip_strategy = match (opts.ipv4, opts.ipv6) { (false, false) => Default::default(), (true, false) => hickory_resolver::config::LookupIpStrategy::Ipv4Only, @@ -561,6 +561,30 @@ async fn run() -> anyhow::Result<()> { let no_tui = opts.no_tui || !std::io::stdout().is_tty() || opts.debug; + let print_config = { + let mode = if opts.json { + PrintMode::Json + } else { + PrintMode::Text + }; + + let disable_style = + opts.disable_color || !std::io::stdout().is_tty() || opts.output.is_some(); + + let output: Box = if let Some(output) = opts.output { + Box::new(File::create(output)?) + } else { + Box::new(std::io::stdout()) + }; + + PrintConfig { + mode, + output, + disable_style, + stats_success_breakdown: opts.stats_success_breakdown, + } + }; + // When panics, reset terminal mode and exit immediately. std::panic::set_hook(Box::new(move |info| { if !no_tui { @@ -575,190 +599,207 @@ async fn run() -> anyhow::Result<()> { let start = std::time::Instant::now(); - let res = match work_mode { - WorkMode::FixedNumber { - n_requests, - n_connections, - n_http2_parallel, - query_limit: None, - latency_correction: _, - } if no_tui => { - // Use optimized worker of no_tui mode. - let (result_tx, result_rx) = flume::unbounded(); - - client::fast::work( - client.clone(), - result_tx, + let data_collect_future: Pin>> = + match work_mode { + WorkMode::Debug => { + let mut print_config = print_config; + if let Err(e) = client::work_debug(&mut print_config.output, client).await { + eprintln!("{e}"); + } + std::process::exit(libc::EXIT_SUCCESS) + } + WorkMode::FixedNumber { n_requests, n_connections, n_http2_parallel, - ) - .await; + query_limit: None, + latency_correction: _, + } if no_tui => { + // Use optimized worker of no_tui mode. + let (result_tx, result_rx) = flume::unbounded(); + + client::fast::work( + client.clone(), + result_tx, + n_requests, + n_connections, + n_http2_parallel, + ) + .await; - Box::pin(async move { - let mut res = ResultData::default(); - while let Ok(r) = result_rx.recv() { - res.merge(r); - } - res - }) as Pin>> - } - WorkMode::Until { - duration, - n_connections, - n_http2_parallel, - query_limit: None, - latency_correction: _, - wait_ongoing_requests_after_deadline, - } if no_tui => { - // Use optimized worker of no_tui mode. - let (result_tx, result_rx) = flume::unbounded(); - - client::fast::work_until( - client.clone(), - result_tx, - start + duration, + Box::pin(async move { + let mut res = ResultData::default(); + while let Ok(r) = result_rx.recv() { + res.merge(r); + } + (res, print_config) + }) + } + WorkMode::Until { + duration, n_connections, n_http2_parallel, + query_limit: None, + latency_correction: _, wait_ongoing_requests_after_deadline, - ) - .await; - - Box::pin(async move { - let mut res = ResultData::default(); - while let Ok(r) = result_rx.recv() { - res.merge(r); - } - res - }) as Pin>> - } - mode => { - let (result_tx, result_rx) = flume::unbounded(); - let data_collector = if no_tui { - // When `--no-tui` is enabled, just collect all data. - - let result_rx_ctrl_c = result_rx.clone(); - tokio::spawn(async move { - let _ = tokio::signal::ctrl_c().await; - let mut all: ResultData = Default::default(); - for report in result_rx_ctrl_c.drain() { - all.push(report); - } - let _ = printer::print_result( - &mut std::io::stdout(), - print_mode, - start, - &all, - start.elapsed(), - opts.disable_color, - opts.stats_success_breakdown, - ); - std::process::exit(libc::EXIT_SUCCESS); - }); + } if no_tui => { + // Use optimized worker of no_tui mode. + let (result_tx, result_rx) = flume::unbounded(); + + client::fast::work_until( + client.clone(), + result_tx, + start + duration, + n_connections, + n_http2_parallel, + wait_ongoing_requests_after_deadline, + ) + .await; Box::pin(async move { - let mut all = ResultData::default(); - while let Ok(res) = result_rx.recv() { - all.push(res); - } - all - }) as Pin>> - } else { - // Spawn monitor future which draws realtime tui - let join_handle = tokio::spawn( - monitor::Monitor { - print_mode, - end_line: opts - .duration - .map(|d| monitor::EndLine::Duration(d.into())) - .unwrap_or(monitor::EndLine::NumQuery(opts.n_requests)), - report_receiver: result_rx, - start, - fps: opts.fps, - disable_color: opts.disable_color, - stats_success_breakdown: opts.stats_success_breakdown, + let mut res = ResultData::default(); + while let Ok(r) = result_rx.recv() { + res.merge(r); } - .monitor(), - ); + (res, print_config) + }) + } + mode => { + let (result_tx, result_rx) = flume::unbounded(); + let data_collector = if no_tui { + // When `--no-tui` is enabled, just collect all data. + + let token = tokio_util::sync::CancellationToken::new(); + let result_rx_ctrl_c = result_rx.clone(); + let token_ctrl_c = token.clone(); + let ctrl_c = tokio::spawn(async move { + tokio::select! { + _ = tokio::signal::ctrl_c() => { + let mut all: ResultData = Default::default(); + for report in result_rx_ctrl_c.drain() { + all.push(report); + } + let _ = printer::print_result(print_config, start, &all, start.elapsed()); + std::process::exit(libc::EXIT_SUCCESS); + } + _ = token_ctrl_c.cancelled() => { + print_config + } - Box::pin(async { join_handle.await.unwrap().unwrap() }) - as Pin>> - }; + } + }); + + Box::pin(async move { + token.cancel(); + let config = ctrl_c.await.unwrap(); + let mut all = ResultData::default(); + while let Ok(res) = result_rx.recv() { + all.push(res); + } + (all, config) + }) + as Pin>> + } else { + // Spawn monitor future which draws realtime tui + let join_handle = tokio::spawn( + monitor::Monitor { + print_config, + end_line: opts + .duration + .map(|d| monitor::EndLine::Duration(d.into())) + .unwrap_or(monitor::EndLine::NumQuery(opts.n_requests)), + report_receiver: result_rx, + start, + fps: opts.fps, + disable_color: opts.disable_color, + } + .monitor(), + ); - match mode { - WorkMode::Debug => { - if let Err(e) = client::work_debug(client).await { - eprintln!("{e}"); - } - std::process::exit(libc::EXIT_SUCCESS) - } - WorkMode::FixedNumber { - n_requests, - n_connections, - n_http2_parallel, - query_limit, - latency_correction, - } => { - if let Some(query_limit) = query_limit { - if latency_correction { - client::work_with_qps( - client.clone(), - result_tx, - query_limit, - n_requests, - n_connections, - n_http2_parallel, - ) - .await; + Box::pin(async { join_handle.await.unwrap().unwrap() }) + as Pin>> + }; + + match mode { + WorkMode::Debug => unreachable!("Must be already handled"), + WorkMode::FixedNumber { + n_requests, + n_connections, + n_http2_parallel, + query_limit, + latency_correction, + } => { + if let Some(query_limit) = query_limit { + if latency_correction { + client::work_with_qps( + client.clone(), + result_tx, + query_limit, + n_requests, + n_connections, + n_http2_parallel, + ) + .await; + } else { + client::work_with_qps_latency_correction( + client.clone(), + result_tx, + query_limit, + n_requests, + n_connections, + n_http2_parallel, + ) + .await; + } } else { - client::work_with_qps_latency_correction( + client::work( client.clone(), result_tx, - query_limit, n_requests, n_connections, n_http2_parallel, ) .await; } - } else { - client::work( - client.clone(), - result_tx, - n_requests, - n_connections, - n_http2_parallel, - ) - .await; } - } - WorkMode::Until { - duration, - n_connections, - n_http2_parallel, - query_limit, - latency_correction, - wait_ongoing_requests_after_deadline, - } => { - if let Some(query_limit) = query_limit { - if latency_correction { - client::work_until_with_qps_latency_correction( - client.clone(), - result_tx, - query_limit, - start, - start + duration, - n_connections, - n_http2_parallel, - wait_ongoing_requests_after_deadline, - ) - .await; + WorkMode::Until { + duration, + n_connections, + n_http2_parallel, + query_limit, + latency_correction, + wait_ongoing_requests_after_deadline, + } => { + if let Some(query_limit) = query_limit { + if latency_correction { + client::work_until_with_qps_latency_correction( + client.clone(), + result_tx, + query_limit, + start, + start + duration, + n_connections, + n_http2_parallel, + wait_ongoing_requests_after_deadline, + ) + .await; + } else { + client::work_until_with_qps( + client.clone(), + result_tx, + query_limit, + start, + start + duration, + n_connections, + n_http2_parallel, + wait_ongoing_requests_after_deadline, + ) + .await; + } } else { - client::work_until_with_qps( + client::work_until( client.clone(), result_tx, - query_limit, - start, start + duration, n_connections, n_http2_parallel, @@ -766,36 +807,17 @@ async fn run() -> anyhow::Result<()> { ) .await; } - } else { - client::work_until( - client.clone(), - result_tx, - start + duration, - n_connections, - n_http2_parallel, - wait_ongoing_requests_after_deadline, - ) - .await; } } - } - data_collector - } - }; + data_collector + } + }; let duration = start.elapsed(); - let res = res.await; - - printer::print_result( - &mut std::io::stdout(), - print_mode, - start, - &res, - duration, - opts.disable_color, - opts.stats_success_breakdown, - )?; + let (res, print_config) = data_collect_future.await; + + printer::print_result(print_config, start, &res, duration)?; if let Some(db_url) = opts.db_url { eprintln!("Storing results to {db_url}"); diff --git a/src/monitor.rs b/src/monitor.rs index 79501a5f..b046267b 100644 --- a/src/monitor.rs +++ b/src/monitor.rs @@ -17,7 +17,7 @@ use std::{collections::BTreeMap, io}; use crate::{ client::{ClientError, RequestResult}, - printer::PrintMode, + printer::PrintConfig, result_data::{MinMaxMean, ResultData}, timescale::{TimeLabel, TimeScale}, }; @@ -53,7 +53,7 @@ impl ColorScheme { } pub struct Monitor { - pub print_mode: PrintMode, + pub print_config: PrintConfig, pub end_line: EndLine, /// All workers sends each result to this channel pub report_receiver: flume::Receiver>, @@ -62,7 +62,6 @@ pub struct Monitor { // Frame per second of TUI pub fps: usize, pub disable_color: bool, - pub stats_success_breakdown: bool, } struct IntoRawMode; @@ -85,7 +84,7 @@ impl Drop for IntoRawMode { } impl Monitor { - pub async fn monitor(self) -> Result { + pub async fn monitor(self) -> Result<(ResultData, PrintConfig), std::io::Error> { let raw_mode = IntoRawMode::new()?; let mut terminal = { @@ -434,13 +433,10 @@ impl Monitor { }) => { drop(raw_mode); let _ = crate::printer::print_result( - &mut std::io::stdout(), - self.print_mode, + self.print_config, self.start, &all, now - self.start, - self.disable_color, - self.stats_success_breakdown, ); std::process::exit(libc::EXIT_SUCCESS); } @@ -454,6 +450,6 @@ impl Monitor { tokio::time::sleep(per_frame - elapsed).await; } } - Ok(all) + Ok((all, self.print_config)) } } diff --git a/src/printer.rs b/src/printer.rs index a5bc9c45..19b73ae1 100644 --- a/src/printer.rs +++ b/src/printer.rs @@ -12,17 +12,21 @@ use std::{ #[derive(Clone, Copy)] struct StyleScheme { - color_enabled: bool, + style_enabled: bool, } impl StyleScheme { - fn no_color(self, text: &str) -> StyledContent<&str> { - text.reset() + fn no_style(self, text: &str) -> StyledContent<&str> { + StyledContent::new(crossterm::style::ContentStyle::new(), text) } fn heading(self, text: &str) -> StyledContent<&str> { - text.bold().underlined() + if self.style_enabled { + text.bold().underlined() + } else { + self.no_style(text) + } } fn success_rate(self, text: &str, success_rate: f64) -> StyledContent<&str> { - if self.color_enabled { + if self.style_enabled { if success_rate >= 100.0 { text.green().bold() } else if success_rate >= 99.0 { @@ -31,28 +35,28 @@ impl StyleScheme { text.red().bold() } } else { - self.no_color(text).bold() + self.no_style(text) } } fn fastest(self, text: &str) -> StyledContent<&str> { - if self.color_enabled { + if self.style_enabled { text.green() } else { - self.no_color(text) + self.no_style(text) } } fn slowest(self, text: &str) -> StyledContent<&str> { - if self.color_enabled { + if self.style_enabled { text.yellow() } else { - self.no_color(text) + self.no_style(text) } } fn average(self, text: &str) -> StyledContent<&str> { - if self.color_enabled { + if self.style_enabled { text.cyan() } else { - self.no_color(text) + self.no_style(text) } } @@ -61,7 +65,7 @@ impl StyleScheme { const LATENCY_YELLOW_THRESHOLD: f64 = 0.1; const LATENCY_RED_THRESHOLD: f64 = 0.4; - if self.color_enabled { + if self.style_enabled { if label <= LATENCY_YELLOW_THRESHOLD { text.green() } else if label <= LATENCY_RED_THRESHOLD { @@ -70,12 +74,12 @@ impl StyleScheme { text.red() } } else { - self.no_color(text) + self.no_style(text) } } fn status_distribution(self, text: &str, status: StatusCode) -> StyledContent<&str> { - if self.color_enabled { + if self.style_enabled { if status.is_success() { text.green() } else if status.is_client_error() { @@ -86,35 +90,45 @@ impl StyleScheme { text.white() } } else { - self.no_color(text) + self.no_style(text) } } } -#[derive(Clone, Copy)] +#[derive(Clone, Copy, Debug)] pub enum PrintMode { Text, Json, } -pub fn print_result( - w: &mut W, - mode: PrintMode, +pub struct PrintConfig { + pub output: Box, + pub mode: PrintMode, + pub disable_style: bool, + pub stats_success_breakdown: bool, +} + +pub fn print_result( + mut config: PrintConfig, start: Instant, res: &ResultData, total_duration: Duration, - disable_color: bool, - stats_success_breakdown: bool, ) -> anyhow::Result<()> { - match mode { + match config.mode { PrintMode::Text => print_summary( - w, + &mut config.output, + res, + total_duration, + config.disable_style, + config.stats_success_breakdown, + )?, + PrintMode::Json => print_json( + &mut config.output, + start, res, total_duration, - disable_color, - stats_success_breakdown, + config.stats_success_breakdown, )?, - PrintMode::Json => print_json(w, start, res, total_duration, stats_success_breakdown)?, } Ok(()) } @@ -364,11 +378,11 @@ fn print_summary( w: &mut W, res: &ResultData, total_duration: Duration, - disable_color: bool, + disable_style: bool, stats_success_breakdown: bool, ) -> std::io::Result<()> { let style = StyleScheme { - color_enabled: !disable_color, + style_enabled: !disable_style, }; writeln!(w, "{}", style.heading("Summary:"))?; let success_rate = 100.0 * res.success_rate();