From e96384eef77999ba6ba7a7d0dab5e84b4bd85c4e Mon Sep 17 00:00:00 2001 From: Arjan Bal Date: Sun, 12 Jan 2025 23:32:52 +0530 Subject: [PATCH] Add test --- .github/workflows/CI.yml | 12 +++ Cargo.toml | 2 +- benchmark/Cargo.toml | 8 ++ benchmark/src/main.rs | 156 --------------------------------------- benchmark/src/server.rs | 17 +---- benchmark/test.sh | 34 +++++++++ 6 files changed, 58 insertions(+), 171 deletions(-) delete mode 100644 benchmark/src/main.rs create mode 100755 benchmark/test.sh diff --git a/.github/workflows/CI.yml b/.github/workflows/CI.yml index d7ce9c95c..fe02dfee5 100644 --- a/.github/workflows/CI.yml +++ b/.github/workflows/CI.yml @@ -135,6 +135,18 @@ jobs: run: ./interop/test.sh --use_tls tls_rustls shell: bash + benchmark: + name: Benchmark Smoke Test + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: hecrj/setup-rust-action@v2 + - uses: taiki-e/install-action@protoc + - uses: Swatinem/rust-cache@v2 + - name: Run smoke test + run: ./benchmark/test.sh + shell: bash + semver: runs-on: ubuntu-latest steps: diff --git a/Cargo.toml b/Cargo.toml index ebdf3fa79..b025b1a18 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,6 @@ members = [ "tests/default_stubs", "tests/deprecated_methods", "tests/skip_debug", - "benchmark", + "benchmark", # Tests ] resolver = "2" diff --git a/benchmark/Cargo.toml b/benchmark/Cargo.toml index 0d2c4dcec..fab5ec9d1 100644 --- a/benchmark/Cargo.toml +++ b/benchmark/Cargo.toml @@ -5,6 +5,14 @@ edition = "2021" publish = false license = "MIT/Apache-2.0" +[[bin]] +name = "worker" +path = "src/bin/worker.rs" + +[[bin]] +name = "tester" +path = "src/bin/tester.rs" + [dependencies] clap = { version = "4.5.21", features = ["derive"] } num_cpus = "1.16.0" diff --git a/benchmark/src/main.rs b/benchmark/src/main.rs deleted file mode 100644 index a3aca5a78..000000000 --- a/benchmark/src/main.rs +++ /dev/null @@ -1,156 +0,0 @@ -#![recursion_limit = "1024"] - -use std::{pin::Pin, time::Duration}; - -use benchmark::server::BenchmarkServer; -use benchmark::worker::{ - server_args, worker_service_server::WorkerService, worker_service_server::WorkerServiceServer, - ClientArgs, ClientStatus, CoreRequest, CoreResponse, ServerArgs, ServerStatus, Void, -}; -use clap::Parser; -use tokio::{sync::mpsc, time}; -use tokio_stream::{Stream, StreamExt}; -use tonic::{transport::Server, Response, Status}; - -#[derive(Parser, Debug)] -struct Args { - /// Port to start load servers on, if not specified by the server config - #[arg(long = "server_port")] - server_port: Option, - /// Port to expose grpc.testing.WorkerService, Used by driver to initiate work. - #[arg(long = "driver_port")] - driver_port: u16, -} - -#[derive(Debug)] -struct DriverService { - shutdown_channel: mpsc::Sender<()>, -} - -#[tonic::async_trait] -impl WorkerService for DriverService { - // Server streaming response type for the RunServer method. - type RunServerStream = - Pin> + Send + 'static>>; - - async fn run_server( - &self, - request: tonic::Request>, - ) -> std::result::Result, Status> { - println!("Handling server stream."); - let mut stream = request.into_inner(); - - let output = async_stream::try_stream! { - let mut benchmark_server: Option = None; - while let Some(request) = stream.next().await { - let request = request?; - let mut reset_stats = false; - let argtype = request.argtype - .ok_or(Status::invalid_argument("missing request.argtype"))?; - match argtype { - server_args::Argtype::Setup(server_config) => { - println!("Server creation requested."); - if let Some(mut server) = benchmark_server.take() { - println!("server setup received when server already exists, shutting down the existing server"); - } - match BenchmarkServer::start(server_config) { - Ok(server) => { - benchmark_server = Some(server); - }, - Err(status) => { - println!("Error while creating server: {:?}", status); - Err(status)?; - } - } - }, - server_args::Argtype::Mark(mark) => { - println!("Server stats requested."); - benchmark_server.as_ref() - .ok_or(Status::invalid_argument("server does not exist when mark received"))?; - reset_stats = mark.reset; - } - }; - let server = benchmark_server.as_mut().unwrap(); - let stats = server.get_stats(reset_stats)?; - yield ServerStatus { - stats: Some(stats), - cores: num_cpus::get() as i32, - port: server.port as i32, - }; - } - }; - - Ok(Response::new(Box::pin(output) as Self::RunServerStream)) - } - - type RunClientStream = - Pin> + Send + 'static>>; - - async fn run_client( - &self, - _request: tonic::Request>, - ) -> std::result::Result, Status> { - println!("Handling client stream."); - todo!() - } - - async fn core_count( - &self, - _request: tonic::Request, - ) -> std::result::Result, Status> { - return Ok(Response::new(CoreResponse { - cores: num_cpus::get() as i32, - })); - } - - async fn quit_worker( - &self, - _request: tonic::Request, - ) -> std::result::Result, Status> { - match self.shutdown_channel.send(()).await { - Ok(()) => Ok(Response::new(Void {})), - Err(err) => Err(Status::internal(format!("failed to stop server: {}", err))), - } - } -} - -async fn run_worker() -> Result<(), Box> { - let args = Args::parse(); - println!("{:?}", args); - - let addr = format!("0.0.0.0:{}", args.driver_port).parse().unwrap(); - let (tx, mut rx) = mpsc::channel(1); - - let svc = WorkerServiceServer::new(DriverService { - shutdown_channel: tx, - }); - - Server::builder() - .add_service(svc) - .serve_with_shutdown(addr, async { - rx.recv().await; - // Wait for the quit_worker response to be sent. - time::sleep(Duration::from_secs(1)).await; - }) - .await?; - - Ok(()) -} - -fn main() -> Result<(), Box> { - // TODO: Figure out a way to set thread count based on client/server - // configs, possibly by using separate runtimes for the worker and - // client/server. - // Tests run on k8s use specific machine sizes and don't depend on the - // clients/servers to restrict their resource usage. - let core_count = num_cpus::get(); - println!("Creating a runtime with {} threads", core_count); - let runtime = tokio::runtime::Builder::new_multi_thread() - .thread_name("worker-pool") - .worker_threads(core_count) - .enable_all() - .build()?; - - runtime.block_on(run_worker())?; - Ok(()) -} diff --git a/benchmark/src/server.rs b/benchmark/src/server.rs index 5377dbb75..020796ced 100644 --- a/benchmark/src/server.rs +++ b/benchmark/src/server.rs @@ -119,10 +119,7 @@ impl BenchmarkServer { time_elapsed: wall_time_elapsed.as_nanos() as f64 / 1e9, time_user: user_time.num_nanoseconds() as f64 / 1e9, time_system: system_time.num_nanoseconds() as f64 / 1e9, - // The following fields are not set by Java and Go. - idle_cpu_time: 0, - cq_poll_count: 0, - total_cpu_time: 0, + ..Default::default() }); } } @@ -141,11 +138,7 @@ impl crate::protobuf_benchmark_service::benchmark_service_server::BenchmarkServi r#type: PayloadType::Compressable as i32, body: vec![0; request.into_inner().response_size as usize], }), - username: String::new(), - oauth_scope: String::new(), - server_id: String::new(), - grpclb_route_type: 0, - hostname: String::new(), + ..Default::default() })) } @@ -166,11 +159,7 @@ impl crate::protobuf_benchmark_service::benchmark_service_server::BenchmarkServi r#type: PayloadType::Compressable as i32, body: vec![0; request.response_size as usize], }), - username: String::new(), - oauth_scope: String::new(), - server_id: String::new(), - grpclb_route_type: 0, - hostname: String::new(), + ..Default::default() }; } }; diff --git a/benchmark/test.sh b/benchmark/test.sh new file mode 100755 index 000000000..e27d4cb57 --- /dev/null +++ b/benchmark/test.sh @@ -0,0 +1,34 @@ +#!/usr/bin/env bash + +set -eu +set -o pipefail + +set -x + +echo "Running for OS: ${OSTYPE}" + +case "$OSTYPE" in + darwin*) OS="darwin"; EXT="" ;; + linux*) OS="linux"; EXT="" ;; + msys*) OS="windows"; EXT=".exe" ;; + *) exit 2 ;; +esac + +(cd benchmark && cargo build --bins) + +WORKER_PORT=50056 + +# run the worker. +./target/debug/worker --driver_port="${WORKER_PORT}" & +WORKER_PID=$! +echo ":; started worker on port ${WORKER_PORT}." + +# trap exits to make sure we kill the worker process when the script exits, +# regardless of why (errors, SIGTERM, etc). +trap 'echo ":; killing worker"; kill ${WORKER_PID};' EXIT + +sleep 1 + +# run the tester. +echo ":; starting tester." +./target/debug/tester --worker_port="${WORKER_PORT}"