From 20720881e9862d5c842b2a91b77a382796823f7e Mon Sep 17 00:00:00 2001 From: Santiago Pittella <87827390+SantiagoPittella@users.noreply.github.com> Date: Thu, 2 Jan 2025 14:47:31 -0300 Subject: [PATCH] feat: add error for tx prover proxy (#1038) --- Cargo.lock | 1 + bin/tx-prover/Cargo.toml | 1 + bin/tx-prover/src/commands/proxy.rs | 16 +++++++++++++--- bin/tx-prover/src/error.rs | 23 +++++++++++++++++++++++ bin/tx-prover/src/main.rs | 1 + bin/tx-prover/src/proxy/mod.rs | 15 ++++++++++----- bin/tx-prover/src/proxy/worker.rs | 9 +++++++-- bin/tx-prover/src/utils.rs | 17 +++++++++++------ 8 files changed, 67 insertions(+), 16 deletions(-) create mode 100644 bin/tx-prover/src/error.rs diff --git a/Cargo.lock b/Cargo.lock index 6483e17ae..b04af2dc0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2043,6 +2043,7 @@ dependencies = [ "reqwest", "serde", "serde_qs", + "thiserror 2.0.3", "tokio", "tokio-stream", "toml", diff --git a/bin/tx-prover/Cargo.toml b/bin/tx-prover/Cargo.toml index 174b3c8eb..1a02075bb 100644 --- a/bin/tx-prover/Cargo.toml +++ b/bin/tx-prover/Cargo.toml @@ -61,6 +61,7 @@ serde_qs = { version = "0.13" } tokio = { version = "1.38", optional = true, features = ["full"] } tokio-stream = { version = "0.1", optional = true, features = [ "net" ]} toml = { version = "0.8" } +thiserror = { workspace = true } tonic-health = { version = "0.12" } tonic-web = { version = "0.12", optional = true } tracing = { version = "0.1", optional = true } diff --git a/bin/tx-prover/src/commands/proxy.rs b/bin/tx-prover/src/commands/proxy.rs index b0a9edd31..12c06c87a 100644 --- a/bin/tx-prover/src/commands/proxy.rs +++ b/bin/tx-prover/src/commands/proxy.rs @@ -9,6 +9,7 @@ use pingora_proxy::http_proxy_service; use tracing::warn; use crate::{ + error::TxProverServiceError, proxy::{LoadBalancer, LoadBalancerState}, utils::MIDEN_TX_PROVER, }; @@ -30,6 +31,13 @@ impl StartProxy { /// /// This method will first read the config file to get the parameters for the proxy. It will /// then start a proxy with each worker passed as command argument as a backend. + /// + /// # Errors + /// Returns an error in the following cases: + /// - The config file cannot be read. + /// - The backend cannot be created. + /// - The Pingora configuration fails. + /// - The server cannot be started. #[tracing::instrument(target = MIDEN_TX_PROVER, name = "proxy:execute")] pub async fn execute(&self) -> Result<(), String> { let mut server = Server::new(Some(Opt::default())).map_err(|err| err.to_string())?; @@ -40,8 +48,8 @@ impl StartProxy { let workers = self .workers .iter() - .map(|worker| Backend::new(worker).map_err(|err| err.to_string())) - .collect::, String>>()?; + .map(|worker| Backend::new(worker).map_err(TxProverServiceError::BackendCreationFailed)) + .collect::, TxProverServiceError>>()?; if workers.is_empty() { warn!("Starting the proxy without any workers"); @@ -58,7 +66,9 @@ impl StartProxy { let proxy_host = proxy_config.host; let proxy_port = proxy_config.port.to_string(); lb.add_tcp(format!("{}:{}", proxy_host, proxy_port).as_str()); - let logic = lb.app_logic_mut().ok_or("Failed to get app logic")?; + let logic = lb + .app_logic_mut() + .ok_or(TxProverServiceError::PingoraConfigFailed("app logic not found".to_string()))?; let mut http_server_options = HttpServerOptions::default(); // Enable HTTP/2 for plaintext diff --git a/bin/tx-prover/src/error.rs b/bin/tx-prover/src/error.rs new file mode 100644 index 000000000..d69136213 --- /dev/null +++ b/bin/tx-prover/src/error.rs @@ -0,0 +1,23 @@ +use axum::http::uri::InvalidUri; +use thiserror::Error; + +// TX PROVER SERVICE ERROR +// ================================================================================================ + +#[derive(Debug, Error)] +pub enum TxProverServiceError { + #[error("invalid uri {1}")] + InvalidURI(#[source] InvalidUri, String), + #[error("failed to connect to worker {1}")] + ConnectionFailed(#[source] tonic::transport::Error, String), + #[error("failed to create backend for worker")] + BackendCreationFailed(#[source] Box), + #[error("failed to setup pingora: {0}")] + PingoraConfigFailed(String), +} + +impl From for String { + fn from(err: TxProverServiceError) -> Self { + err.to_string() + } +} diff --git a/bin/tx-prover/src/main.rs b/bin/tx-prover/src/main.rs index a9ba6a088..9f560d62e 100644 --- a/bin/tx-prover/src/main.rs +++ b/bin/tx-prover/src/main.rs @@ -1,5 +1,6 @@ pub mod api; pub mod commands; +pub mod error; mod proxy; mod utils; use commands::Cli; diff --git a/bin/tx-prover/src/proxy/mod.rs b/bin/tx-prover/src/proxy/mod.rs index 49ec0d079..73b654588 100644 --- a/bin/tx-prover/src/proxy/mod.rs +++ b/bin/tx-prover/src/proxy/mod.rs @@ -25,6 +25,7 @@ use crate::{ update_workers::{Action, UpdateWorkers}, ProxyConfig, }, + error::TxProverServiceError, utils::{ create_queue_full_response, create_response_with_error_message, create_too_many_requests_response, create_workers_updated_response, MIDEN_TX_PROVER, @@ -36,7 +37,7 @@ mod worker; /// Localhost address const LOCALHOST_ADDR: &str = "127.0.0.1"; -// LOAD BALANCER +// LOAD BALANCER STATE // ================================================================================================ /// Load balancer that uses a round robin strategy @@ -54,11 +55,15 @@ pub struct LoadBalancerState { impl LoadBalancerState { /// Create a new load balancer + /// + /// # Errors + /// Returns an error if: + /// - The worker cannot be created. #[tracing::instrument(name = "proxy:new_load_balancer", skip(initial_workers))] pub async fn new( initial_workers: Vec, config: &ProxyConfig, - ) -> core::result::Result { + ) -> core::result::Result { let mut workers: Vec = Vec::with_capacity(initial_workers.len()); let connection_timeout = Duration::from_secs(config.connection_timeout_secs); @@ -120,7 +125,7 @@ impl LoadBalancerState { pub async fn update_workers( &self, update_workers: UpdateWorkers, - ) -> std::result::Result<(), String> { + ) -> std::result::Result<(), TxProverServiceError> { let mut workers = self.workers.write().await; info!("Current workers: {:?}", workers); @@ -129,7 +134,7 @@ impl LoadBalancerState { .iter() .map(|worker| Backend::new(worker)) .collect::, _>>() - .map_err(|err| format!("Failed to create backend: {}", err))?; + .map_err(TxProverServiceError::BackendCreationFailed)?; let mut native_workers = Vec::new(); @@ -326,7 +331,7 @@ impl RequestContext { } } -// LOAD BALANCER WRAPPER +// LOAD BALANCER // ================================================================================================ /// Wrapper around the load balancer that implements the ProxyHttp trait diff --git a/bin/tx-prover/src/proxy/worker.rs b/bin/tx-prover/src/proxy/worker.rs index f42466df4..0d8cf3e4b 100644 --- a/bin/tx-prover/src/proxy/worker.rs +++ b/bin/tx-prover/src/proxy/worker.rs @@ -7,7 +7,7 @@ use tonic_health::pb::{ }; use tracing::error; -use crate::utils::create_health_check_client; +use crate::{error::TxProverServiceError, utils::create_health_check_client}; // WORKER // ================================================================================================ @@ -24,11 +24,16 @@ pub struct Worker { } impl Worker { + /// Creates a new worker and a gRPC health check client for the given worker address. + /// + /// # Errors + /// - Returns [TxProverServiceError::InvalidURI] if the worker address is invalid. + /// - Returns [TxProverServiceError::ConnectionFailed] if the connection to the worker fails. pub async fn new( worker: Backend, connection_timeout: Duration, total_timeout: Duration, - ) -> Result { + ) -> Result { let health_check_client = create_health_check_client(worker.addr.to_string(), connection_timeout, total_timeout) .await?; diff --git a/bin/tx-prover/src/utils.rs b/bin/tx-prover/src/utils.rs index 5e6c3e2bc..6cfdc2ac8 100644 --- a/bin/tx-prover/src/utils.rs +++ b/bin/tx-prover/src/utils.rs @@ -16,6 +16,8 @@ use tonic::transport::Channel; use tonic_health::pb::health_client::HealthClient; use tracing_subscriber::{layer::SubscriberExt, Registry}; +use crate::error::TxProverServiceError; + pub const MIDEN_TX_PROVER: &str = "miden-tx-prover"; const RESOURCE_EXHAUSTED_CODE: u16 = 8; @@ -158,18 +160,21 @@ pub async fn create_response_with_error_message( /// Create a gRPC [HealthClient] for the given worker address. /// -/// It will panic if the worker URI is invalid. +/// # Errors +/// - [TxProverServiceError::InvalidURI] if the worker address is invalid. +/// - [TxProverServiceError::ConnectionFailed] if the connection to the worker fails. pub async fn create_health_check_client( address: String, connection_timeout: Duration, total_timeout: Duration, -) -> Result, String> { - Channel::from_shared(format!("http://{}", address)) - .map_err(|err| format!("Invalid format for worker URI: {}", err))? +) -> Result, TxProverServiceError> { + let channel = Channel::from_shared(format!("http://{}", address)) + .map_err(|err| TxProverServiceError::InvalidURI(err, address.clone()))? .connect_timeout(connection_timeout) .timeout(total_timeout) .connect() .await - .map(HealthClient::new) - .map_err(|err| format!("Failed to create health check client for worker: {}", err)) + .map_err(|err| TxProverServiceError::ConnectionFailed(err, address))?; + + Ok(HealthClient::new(channel)) }