Skip to content

Commit

Permalink
Merge pull request #1493 from microsoft/enhancement-testing-clean-up
Browse files Browse the repository at this point in the history
[testing] Enhancement: Removing duplicate code
  • Loading branch information
iyzhang authored Feb 5, 2025
2 parents 8ceb916 + b488936 commit 8636d46
Showing 1 changed file with 51 additions and 94 deletions.
145 changes: 51 additions & 94 deletions examples/tcp-echo/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,32 +80,15 @@ impl TcpEchoClient {
});
}

/// Runs the target TCP echo client.
pub fn run_sequential(
pub fn run_main_loop(
&mut self,
log_interval: Option<u64>,
nclients: usize,
nrequests: Option<usize>,
connect_handler: fn(&mut TcpEchoClient, &demi_qresult_t) -> Result<()>,
) -> Result<()> {
let mut last_log: Instant = Instant::now();

// Open all connections.
for _ in 0..nclients {
let sockqd: QDesc = self.libos.socket(AF_INET, SOCK_STREAM, 0)?;

self.clients.insert(sockqd, (vec![0; self.bufsize], 0));
let qt: QToken = self.libos.connect(sockqd, self.remote)?;
let qr: demi_qresult_t = self.libos.wait(qt, Some(TIMEOUT_SECONDS))?;
if qr.qr_opcode != demi_opcode_t::DEMI_OPC_CONNECT {
anyhow::bail!("failed to connect to server")
}

println!("INFO: {} clients connected", self.clients.len());

// Push first request.
self.issue_push(sockqd)?;
}

loop {
// Stop: enough packets were echoed.
if let Some(nrequests) = nrequests {
Expand Down Expand Up @@ -152,10 +135,10 @@ impl TcpEchoClient {
demi_opcode_t::DEMI_OPC_PUSH => self.handle_push(&qr)?,
demi_opcode_t::DEMI_OPC_POP => self.handle_pop(&qr)?,
demi_opcode_t::DEMI_OPC_FAILED => self.handle_fail(&qr)?,
demi_opcode_t::DEMI_OPC_INVALID => self.handle_unexpected("invalid", &qr)?,
demi_opcode_t::DEMI_OPC_CLOSE => self.handle_unexpected("close", &qr)?,
demi_opcode_t::DEMI_OPC_CONNECT => self.handle_unexpected("connect", &qr)?,
demi_opcode_t::DEMI_OPC_ACCEPT => self.handle_unexpected("accept", &qr)?,
demi_opcode_t::DEMI_OPC_INVALID => Self::handle_unexpected("invalid", &qr)?,
demi_opcode_t::DEMI_OPC_CLOSE => Self::handle_unexpected("close", &qr)?,
demi_opcode_t::DEMI_OPC_CONNECT => connect_handler(self, &qr)?,
demi_opcode_t::DEMI_OPC_ACCEPT => Self::handle_unexpected("accept", &qr)?,
}
}

Expand All @@ -169,14 +152,44 @@ impl TcpEchoClient {
}

/// Runs the target TCP echo client.
pub fn run_concurrent(
pub fn run_sequential(
&mut self,
log_interval: Option<u64>,
nclients: usize,
nrequests: Option<usize>,
) -> Result<()> {
let mut last_log: Instant = Instant::now();
// Open all connections.
for _ in 0..nclients {
let sockqd: QDesc = self.libos.socket(AF_INET, SOCK_STREAM, 0)?;

self.clients.insert(sockqd, (vec![0; self.bufsize], 0));
let qt: QToken = self.libos.connect(sockqd, self.remote)?;
let qr: demi_qresult_t = self.libos.wait(qt, Some(TIMEOUT_SECONDS))?;
if qr.qr_opcode != demi_opcode_t::DEMI_OPC_CONNECT {
anyhow::bail!("failed to connect to server")
}

println!("INFO: {} clients connected", self.clients.len());

// Push first request.
self.issue_push(sockqd)?;
}

self.run_main_loop(
log_interval,
nclients,
nrequests,
|_: &mut TcpEchoClient, qr: &demi_qresult_t| -> Result<()> { Self::handle_unexpected("connect", qr) },
)
}

/// Runs the target TCP echo client.
pub fn run_concurrent(
&mut self,
log_interval: Option<u64>,
nclients: usize,
nrequests: Option<usize>,
) -> Result<()> {
// Open several connections.
for i in 0..nclients {
let qd: QDesc = self.libos.socket(AF_INET, SOCK_STREAM, 0)?;
Expand Down Expand Up @@ -205,74 +218,7 @@ impl TcpEchoClient {
}
}

loop {
// Stop: enough packets were echoed.
if let Some(nrequests) = nrequests {
if self.nbytes >= nclients * self.bufsize * nrequests {
println!("INFO: stopping, {} bytes transferred", self.nbytes);
break;
}
}

// Stop: all clients were disconnected.
if self.clients.len() == 0 {
println!(
"INFO: stopping, all clients disconnected {} bytes transferred",
self.nbytes
);
break;
}

// Dump statistics.
if let Some(log_interval) = log_interval {
if last_log.elapsed() > Duration::from_secs(log_interval) {
let time_elapsed: f64 = (Instant::now() - last_log).as_secs() as f64;
let rps: f64 = self.nechoed as f64 / time_elapsed;
println!(
"INFO: {:?} requests, {:2?} rps, p50 {:?} ns, p99 {:?} ns",
self.nechoed,
rps,
self.stats.percentile(0.50)?.unwrap().start(),
self.stats.percentile(0.99)?.unwrap().start(),
);
last_log = Instant::now();
self.nechoed = 0;
}
}

let qr: demi_qresult_t = {
let (index, qr): (usize, demi_qresult_t) = self.libos.wait_any(&self.qts, Some(TIMEOUT_SECONDS))?;
self.qts.remove(index);
qr
};

// Parse result.
match qr.qr_opcode {
demi_opcode_t::DEMI_OPC_CONNECT => {
// Register client.
let qd: QDesc = qr.qr_qd.into();
self.clients.insert(qd, (vec![0; self.bufsize], 0));
println!("INFO: {} clients connected", self.clients.len());

// Push first request.
self.issue_push(qd)?;
},
demi_opcode_t::DEMI_OPC_PUSH => self.handle_push(&qr)?,
demi_opcode_t::DEMI_OPC_POP => self.handle_pop(&qr)?,
demi_opcode_t::DEMI_OPC_FAILED => self.handle_fail(&qr)?,
demi_opcode_t::DEMI_OPC_INVALID => self.handle_unexpected("invalid", &qr)?,
demi_opcode_t::DEMI_OPC_CLOSE => self.handle_unexpected("close", &qr)?,
demi_opcode_t::DEMI_OPC_ACCEPT => self.handle_unexpected("accept", &qr)?,
}
}

// Close all connections.
for (qd, _) in self.clients.drain().collect::<Vec<_>>() {
self.libos.close(qd)?;
println!("INFO: {} clients connected", self.clients.len());
}

Ok(())
self.run_main_loop(log_interval, nclients, nrequests, Self::handle_connect)
}

/// Creates a scatter-gather-array.
Expand All @@ -287,6 +233,17 @@ impl TcpEchoClient {
Ok(sga)
}

fn handle_connect(&mut self, qr: &demi_qresult_t) -> Result<()> {
// Register client.
let qd: QDesc = qr.qr_qd.into();
self.clients.insert(qd, (vec![0; self.bufsize], 0));
println!("INFO: {} clients connected", self.clients.len());

// Push first request.
self.issue_push(qd)?;
Ok(())
}

/// Handles the completion of a pop operation.
fn handle_pop(&mut self, qr: &demi_qresult_t) -> Result<()> {
let qd: QDesc = qr.qr_qd.into();
Expand Down Expand Up @@ -355,7 +312,7 @@ impl TcpEchoClient {
}

/// Handles the completion of an unexpected operation.
fn handle_unexpected(&mut self, op_name: &str, qr: &demi_qresult_t) -> Result<()> {
fn handle_unexpected(op_name: &str, qr: &demi_qresult_t) -> Result<()> {
let qd: QDesc = qr.qr_qd.into();
let qt: QToken = qr.qr_qt.into();

Expand Down

0 comments on commit 8636d46

Please sign in to comment.