diff --git a/examples/tcp-echo/client.rs b/examples/tcp-echo/client.rs index c50745d13..5ec995f79 100644 --- a/examples/tcp-echo/client.rs +++ b/examples/tcp-echo/client.rs @@ -80,32 +80,15 @@ impl TcpEchoClient { }); } - /// Runs the target TCP echo client. - pub fn run_sequential( + pub fn run_main_loop( &mut self, log_interval: Option, nclients: usize, nrequests: Option, + connect_handler: fn(&mut TcpEchoClient, &demi_qresult_t) -> Result<()>, ) -> Result<()> { let mut last_log: Instant = Instant::now(); - // Open all connections. - for _ in 0..nclients { - let sockqd: QDesc = self.libos.socket(AF_INET, SOCK_STREAM, 0)?; - - self.clients.insert(sockqd, (vec![0; self.bufsize], 0)); - let qt: QToken = self.libos.connect(sockqd, self.remote)?; - let qr: demi_qresult_t = self.libos.wait(qt, Some(TIMEOUT_SECONDS))?; - if qr.qr_opcode != demi_opcode_t::DEMI_OPC_CONNECT { - anyhow::bail!("failed to connect to server") - } - - println!("INFO: {} clients connected", self.clients.len()); - - // Push first request. - self.issue_push(sockqd)?; - } - loop { // Stop: enough packets were echoed. if let Some(nrequests) = nrequests { @@ -152,10 +135,10 @@ impl TcpEchoClient { demi_opcode_t::DEMI_OPC_PUSH => self.handle_push(&qr)?, demi_opcode_t::DEMI_OPC_POP => self.handle_pop(&qr)?, demi_opcode_t::DEMI_OPC_FAILED => self.handle_fail(&qr)?, - demi_opcode_t::DEMI_OPC_INVALID => self.handle_unexpected("invalid", &qr)?, - demi_opcode_t::DEMI_OPC_CLOSE => self.handle_unexpected("close", &qr)?, - demi_opcode_t::DEMI_OPC_CONNECT => self.handle_unexpected("connect", &qr)?, - demi_opcode_t::DEMI_OPC_ACCEPT => self.handle_unexpected("accept", &qr)?, + demi_opcode_t::DEMI_OPC_INVALID => Self::handle_unexpected("invalid", &qr)?, + demi_opcode_t::DEMI_OPC_CLOSE => Self::handle_unexpected("close", &qr)?, + demi_opcode_t::DEMI_OPC_CONNECT => connect_handler(self, &qr)?, + demi_opcode_t::DEMI_OPC_ACCEPT => Self::handle_unexpected("accept", &qr)?, } } @@ -169,14 +152,44 @@ impl TcpEchoClient { } /// Runs the target TCP echo client. - pub fn run_concurrent( + pub fn run_sequential( &mut self, log_interval: Option, nclients: usize, nrequests: Option, ) -> Result<()> { - let mut last_log: Instant = Instant::now(); + // Open all connections. + for _ in 0..nclients { + let sockqd: QDesc = self.libos.socket(AF_INET, SOCK_STREAM, 0)?; + + self.clients.insert(sockqd, (vec![0; self.bufsize], 0)); + let qt: QToken = self.libos.connect(sockqd, self.remote)?; + let qr: demi_qresult_t = self.libos.wait(qt, Some(TIMEOUT_SECONDS))?; + if qr.qr_opcode != demi_opcode_t::DEMI_OPC_CONNECT { + anyhow::bail!("failed to connect to server") + } + + println!("INFO: {} clients connected", self.clients.len()); + // Push first request. + self.issue_push(sockqd)?; + } + + self.run_main_loop( + log_interval, + nclients, + nrequests, + |_: &mut TcpEchoClient, qr: &demi_qresult_t| -> Result<()> { Self::handle_unexpected("connect", qr) }, + ) + } + + /// Runs the target TCP echo client. + pub fn run_concurrent( + &mut self, + log_interval: Option, + nclients: usize, + nrequests: Option, + ) -> Result<()> { // Open several connections. for i in 0..nclients { let qd: QDesc = self.libos.socket(AF_INET, SOCK_STREAM, 0)?; @@ -205,74 +218,7 @@ impl TcpEchoClient { } } - loop { - // Stop: enough packets were echoed. - if let Some(nrequests) = nrequests { - if self.nbytes >= nclients * self.bufsize * nrequests { - println!("INFO: stopping, {} bytes transferred", self.nbytes); - break; - } - } - - // Stop: all clients were disconnected. - if self.clients.len() == 0 { - println!( - "INFO: stopping, all clients disconnected {} bytes transferred", - self.nbytes - ); - break; - } - - // Dump statistics. - if let Some(log_interval) = log_interval { - if last_log.elapsed() > Duration::from_secs(log_interval) { - let time_elapsed: f64 = (Instant::now() - last_log).as_secs() as f64; - let rps: f64 = self.nechoed as f64 / time_elapsed; - println!( - "INFO: {:?} requests, {:2?} rps, p50 {:?} ns, p99 {:?} ns", - self.nechoed, - rps, - self.stats.percentile(0.50)?.unwrap().start(), - self.stats.percentile(0.99)?.unwrap().start(), - ); - last_log = Instant::now(); - self.nechoed = 0; - } - } - - let qr: demi_qresult_t = { - let (index, qr): (usize, demi_qresult_t) = self.libos.wait_any(&self.qts, Some(TIMEOUT_SECONDS))?; - self.qts.remove(index); - qr - }; - - // Parse result. - match qr.qr_opcode { - demi_opcode_t::DEMI_OPC_CONNECT => { - // Register client. - let qd: QDesc = qr.qr_qd.into(); - self.clients.insert(qd, (vec![0; self.bufsize], 0)); - println!("INFO: {} clients connected", self.clients.len()); - - // Push first request. - self.issue_push(qd)?; - }, - demi_opcode_t::DEMI_OPC_PUSH => self.handle_push(&qr)?, - demi_opcode_t::DEMI_OPC_POP => self.handle_pop(&qr)?, - demi_opcode_t::DEMI_OPC_FAILED => self.handle_fail(&qr)?, - demi_opcode_t::DEMI_OPC_INVALID => self.handle_unexpected("invalid", &qr)?, - demi_opcode_t::DEMI_OPC_CLOSE => self.handle_unexpected("close", &qr)?, - demi_opcode_t::DEMI_OPC_ACCEPT => self.handle_unexpected("accept", &qr)?, - } - } - - // Close all connections. - for (qd, _) in self.clients.drain().collect::>() { - self.libos.close(qd)?; - println!("INFO: {} clients connected", self.clients.len()); - } - - Ok(()) + self.run_main_loop(log_interval, nclients, nrequests, Self::handle_connect) } /// Creates a scatter-gather-array. @@ -287,6 +233,17 @@ impl TcpEchoClient { Ok(sga) } + fn handle_connect(&mut self, qr: &demi_qresult_t) -> Result<()> { + // Register client. + let qd: QDesc = qr.qr_qd.into(); + self.clients.insert(qd, (vec![0; self.bufsize], 0)); + println!("INFO: {} clients connected", self.clients.len()); + + // Push first request. + self.issue_push(qd)?; + Ok(()) + } + /// Handles the completion of a pop operation. fn handle_pop(&mut self, qr: &demi_qresult_t) -> Result<()> { let qd: QDesc = qr.qr_qd.into(); @@ -355,7 +312,7 @@ impl TcpEchoClient { } /// Handles the completion of an unexpected operation. - fn handle_unexpected(&mut self, op_name: &str, qr: &demi_qresult_t) -> Result<()> { + fn handle_unexpected(op_name: &str, qr: &demi_qresult_t) -> Result<()> { let qd: QDesc = qr.qr_qd.into(); let qt: QToken = qr.qr_qt.into();