diff --git a/src/client.rs b/src/client.rs index cd9a5b7e..780b74c0 100644 --- a/src/client.rs +++ b/src/client.rs @@ -759,8 +759,8 @@ pub async fn work( let client = Arc::new(client); - let futures = if client.is_http2() { - (0..n_connections) + if client.is_http2() { + let futures = (0..n_connections) .map(|_| { let report_tx = report_tx.clone(); let counter = counter.clone(); @@ -786,35 +786,55 @@ pub async fn work( set_connection_time(&mut res, connection_time); report_tx.send_async(res).await.unwrap(); if is_cancel || is_reconnect { - return is_cancel; + return (false, is_cancel); } } - true + (true, true) }) }) .collect::>(); + let mut connection_gone = false; for f in futures { - if f.await.unwrap_or(true) { - return; + match f.await { + Ok((true, _)) => { + // All works done + return true; + } + Ok((_, is_cancel)) => { + connection_gone |= is_cancel; + } + Err(_) => { + // Unexpected + return false; + } } } + + if connection_gone { + return false; + } } Err(err) => { report_tx.send_async(Err(err)).await.unwrap(); if !(counter.fetch_add(1, Ordering::Relaxed) < n_tasks) { - break; + return true; } } } } }) }) - .collect::>() + .collect::>(); + for f in futures { + if matches!(f.await, Ok(true)) { + break; + } + } } else { - (0..n_connections) + let futures = (0..n_connections) .map(|_| { let report_tx = report_tx.clone(); let counter = counter.clone(); @@ -826,17 +846,19 @@ pub async fn work( let is_cancel = is_too_many_open_files(&res); report_tx.send_async(res).await.unwrap(); if is_cancel { - break; + return false; } } + true }) }) - .collect::>() + .collect::>(); + for f in futures { + if matches!(f.await, Ok(true)) { + break; + } + } }; - - for f in futures { - let _ = f.await; - } } /// n tasks by m workers limit to qps works in a second