Skip to content

Commit

Permalink
feat: add error for tx prover proxy (#1038)
Browse files Browse the repository at this point in the history
  • Loading branch information
SantiagoPittella authored Jan 2, 2025
1 parent 487e8b9 commit 2072088
Show file tree
Hide file tree
Showing 8 changed files with 67 additions and 16 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bin/tx-prover/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ serde_qs = { version = "0.13" }
tokio = { version = "1.38", optional = true, features = ["full"] }
tokio-stream = { version = "0.1", optional = true, features = [ "net" ]}
toml = { version = "0.8" }
thiserror = { workspace = true }
tonic-health = { version = "0.12" }
tonic-web = { version = "0.12", optional = true }
tracing = { version = "0.1", optional = true }
Expand Down
16 changes: 13 additions & 3 deletions bin/tx-prover/src/commands/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use pingora_proxy::http_proxy_service;
use tracing::warn;

use crate::{
error::TxProverServiceError,
proxy::{LoadBalancer, LoadBalancerState},
utils::MIDEN_TX_PROVER,
};
Expand All @@ -30,6 +31,13 @@ impl StartProxy {
///
/// This method will first read the config file to get the parameters for the proxy. It will
/// then start a proxy with each worker passed as command argument as a backend.
///
/// # Errors
/// Returns an error in the following cases:
/// - The config file cannot be read.
/// - The backend cannot be created.
/// - The Pingora configuration fails.
/// - The server cannot be started.
#[tracing::instrument(target = MIDEN_TX_PROVER, name = "proxy:execute")]
pub async fn execute(&self) -> Result<(), String> {
let mut server = Server::new(Some(Opt::default())).map_err(|err| err.to_string())?;
Expand All @@ -40,8 +48,8 @@ impl StartProxy {
let workers = self
.workers
.iter()
.map(|worker| Backend::new(worker).map_err(|err| err.to_string()))
.collect::<Result<Vec<Backend>, String>>()?;
.map(|worker| Backend::new(worker).map_err(TxProverServiceError::BackendCreationFailed))
.collect::<Result<Vec<Backend>, TxProverServiceError>>()?;

if workers.is_empty() {
warn!("Starting the proxy without any workers");
Expand All @@ -58,7 +66,9 @@ impl StartProxy {
let proxy_host = proxy_config.host;
let proxy_port = proxy_config.port.to_string();
lb.add_tcp(format!("{}:{}", proxy_host, proxy_port).as_str());
let logic = lb.app_logic_mut().ok_or("Failed to get app logic")?;
let logic = lb
.app_logic_mut()
.ok_or(TxProverServiceError::PingoraConfigFailed("app logic not found".to_string()))?;
let mut http_server_options = HttpServerOptions::default();

// Enable HTTP/2 for plaintext
Expand Down
23 changes: 23 additions & 0 deletions bin/tx-prover/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
use axum::http::uri::InvalidUri;
use thiserror::Error;

// TX PROVER SERVICE ERROR
// ================================================================================================

#[derive(Debug, Error)]
pub enum TxProverServiceError {
#[error("invalid uri {1}")]
InvalidURI(#[source] InvalidUri, String),
#[error("failed to connect to worker {1}")]
ConnectionFailed(#[source] tonic::transport::Error, String),
#[error("failed to create backend for worker")]
BackendCreationFailed(#[source] Box<pingora::Error>),
#[error("failed to setup pingora: {0}")]
PingoraConfigFailed(String),
}

impl From<TxProverServiceError> for String {
fn from(err: TxProverServiceError) -> Self {
err.to_string()
}
}
1 change: 1 addition & 0 deletions bin/tx-prover/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod api;
pub mod commands;
pub mod error;
mod proxy;
mod utils;
use commands::Cli;
Expand Down
15 changes: 10 additions & 5 deletions bin/tx-prover/src/proxy/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
update_workers::{Action, UpdateWorkers},
ProxyConfig,
},
error::TxProverServiceError,
utils::{
create_queue_full_response, create_response_with_error_message,
create_too_many_requests_response, create_workers_updated_response, MIDEN_TX_PROVER,
Expand All @@ -36,7 +37,7 @@ mod worker;
/// Localhost address
const LOCALHOST_ADDR: &str = "127.0.0.1";

// LOAD BALANCER
// LOAD BALANCER STATE
// ================================================================================================

/// Load balancer that uses a round robin strategy
Expand All @@ -54,11 +55,15 @@ pub struct LoadBalancerState {

impl LoadBalancerState {
/// Create a new load balancer
///
/// # Errors
/// Returns an error if:
/// - The worker cannot be created.
#[tracing::instrument(name = "proxy:new_load_balancer", skip(initial_workers))]
pub async fn new(
initial_workers: Vec<Backend>,
config: &ProxyConfig,
) -> core::result::Result<Self, String> {
) -> core::result::Result<Self, TxProverServiceError> {
let mut workers: Vec<Worker> = Vec::with_capacity(initial_workers.len());

let connection_timeout = Duration::from_secs(config.connection_timeout_secs);
Expand Down Expand Up @@ -120,7 +125,7 @@ impl LoadBalancerState {
pub async fn update_workers(
&self,
update_workers: UpdateWorkers,
) -> std::result::Result<(), String> {
) -> std::result::Result<(), TxProverServiceError> {
let mut workers = self.workers.write().await;
info!("Current workers: {:?}", workers);

Expand All @@ -129,7 +134,7 @@ impl LoadBalancerState {
.iter()
.map(|worker| Backend::new(worker))
.collect::<Result<Vec<Backend>, _>>()
.map_err(|err| format!("Failed to create backend: {}", err))?;
.map_err(TxProverServiceError::BackendCreationFailed)?;

let mut native_workers = Vec::new();

Expand Down Expand Up @@ -326,7 +331,7 @@ impl RequestContext {
}
}

// LOAD BALANCER WRAPPER
// LOAD BALANCER
// ================================================================================================

/// Wrapper around the load balancer that implements the ProxyHttp trait
Expand Down
9 changes: 7 additions & 2 deletions bin/tx-prover/src/proxy/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use tonic_health::pb::{
};
use tracing::error;

use crate::utils::create_health_check_client;
use crate::{error::TxProverServiceError, utils::create_health_check_client};

// WORKER
// ================================================================================================
Expand All @@ -24,11 +24,16 @@ pub struct Worker {
}

impl Worker {
/// Creates a new worker and a gRPC health check client for the given worker address.
///
/// # Errors
/// - Returns [TxProverServiceError::InvalidURI] if the worker address is invalid.
/// - Returns [TxProverServiceError::ConnectionFailed] if the connection to the worker fails.
pub async fn new(
worker: Backend,
connection_timeout: Duration,
total_timeout: Duration,
) -> Result<Self, String> {
) -> Result<Self, TxProverServiceError> {
let health_check_client =
create_health_check_client(worker.addr.to_string(), connection_timeout, total_timeout)
.await?;
Expand Down
17 changes: 11 additions & 6 deletions bin/tx-prover/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ use tonic::transport::Channel;
use tonic_health::pb::health_client::HealthClient;
use tracing_subscriber::{layer::SubscriberExt, Registry};

use crate::error::TxProverServiceError;

pub const MIDEN_TX_PROVER: &str = "miden-tx-prover";

const RESOURCE_EXHAUSTED_CODE: u16 = 8;
Expand Down Expand Up @@ -158,18 +160,21 @@ pub async fn create_response_with_error_message(

/// Create a gRPC [HealthClient] for the given worker address.
///
/// It will panic if the worker URI is invalid.
/// # Errors
/// - [TxProverServiceError::InvalidURI] if the worker address is invalid.
/// - [TxProverServiceError::ConnectionFailed] if the connection to the worker fails.
pub async fn create_health_check_client(
address: String,
connection_timeout: Duration,
total_timeout: Duration,
) -> Result<HealthClient<Channel>, String> {
Channel::from_shared(format!("http://{}", address))
.map_err(|err| format!("Invalid format for worker URI: {}", err))?
) -> Result<HealthClient<Channel>, TxProverServiceError> {
let channel = Channel::from_shared(format!("http://{}", address))
.map_err(|err| TxProverServiceError::InvalidURI(err, address.clone()))?
.connect_timeout(connection_timeout)
.timeout(total_timeout)
.connect()
.await
.map(HealthClient::new)
.map_err(|err| format!("Failed to create health check client for worker: {}", err))
.map_err(|err| TxProverServiceError::ConnectionFailed(err, address))?;

Ok(HealthClient::new(channel))
}

0 comments on commit 2072088

Please sign in to comment.