diff --git a/Cargo.lock b/Cargo.lock index 24a09f476d..bc4a48de64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1966,7 +1966,6 @@ dependencies = [ "forester-utils", "futures", "itertools 0.14.0", - "lazy_static", "light-batched-merkle-tree", "light-client", "light-hash-set", @@ -1980,7 +1979,6 @@ dependencies = [ "light-system-program", "light-test-utils", "photon-api", - "prometheus", "reqwest", "scopeguard", "serde", @@ -1998,7 +1996,6 @@ dependencies = [ "tracing-appender", "tracing-subscriber", "url", - "warp", ] [[package]] @@ -2008,7 +2005,11 @@ dependencies = [ "account-compression", "anchor-lang", "anchor-spl", + "anyhow", "async-trait", + "bb8", + "borsh 0.10.3", + "lazy_static", "light-batched-merkle-tree", "light-client", "light-compressed-token", @@ -2019,6 +2020,7 @@ dependencies = [ "light-merkle-tree-reference", "light-prover-client", "light-registry", + "light-sdk", "light-system-program", "light-utils 1.1.0", "light-verifier", @@ -2026,14 +2028,18 @@ dependencies = [ "num-bigint 0.4.6", "num-traits", "photon-api", + "prometheus", "reqwest", "solana-client", + "solana-program", "solana-program-test", "solana-sdk", "solana-transaction-status", "spl-token", "thiserror 1.0.64", "tokio", + "tracing", + "warp", ] [[package]] diff --git a/forester-utils/Cargo.toml b/forester-utils/Cargo.toml index 9186a6c078..76f404f1d3 100644 --- a/forester-utils/Cargo.toml +++ b/forester-utils/Cargo.toml @@ -21,6 +21,7 @@ light-system-program = { workspace = true, features = ["cpi"] } light-utils = { workspace = true } light-batched-merkle-tree = { workspace = true } light-verifier = { workspace = true } +light-sdk = { workspace = true } photon-api = { workspace = true } light-client = { workspace = true } @@ -30,6 +31,7 @@ anchor-spl = { workspace = true } # Solana spl-token = { workspace = true, features = ["no-entrypoint"] } +solana-program = { workspace = true } solana-program-test = { workspace = true } solana-sdk = { workspace = true } solana-client = { workspace = true } @@ -51,4 +53,12 @@ num-traits = { workspace = true } # HTTP client reqwest = "0.11.26" +bb8 = { workspace = true } +# Metrics +prometheus = "0.13" +warp = "0.3" +lazy_static = "1.4" +anyhow = "1.0.94" +borsh = "0.10.3" +tracing = "0.1.40" \ No newline at end of file diff --git a/forester-utils/src/lib.rs b/forester-utils/src/lib.rs index c83a36c21b..ea1c6876ca 100644 --- a/forester-utils/src/lib.rs +++ b/forester-utils/src/lib.rs @@ -18,6 +18,10 @@ pub mod address_merkle_tree_config; pub mod forester_epoch; pub mod instructions; pub mod registry; +pub mod metrics; +pub mod rpc_pool; +pub mod solana_rpc; +pub use solana_rpc::{RetryConfig, SolanaRpcConnection}; pub fn create_account_instruction( payer: &Pubkey, diff --git a/forester/src/metrics.rs b/forester-utils/src/metrics/helpers.rs similarity index 62% rename from forester/src/metrics.rs rename to forester-utils/src/metrics/helpers.rs index 6c69d24d47..1522eae6d5 100644 --- a/forester/src/metrics.rs +++ b/forester-utils/src/metrics/helpers.rs @@ -1,15 +1,14 @@ use std::{ sync::Once, - time::{SystemTime, UNIX_EPOCH}, + time::{Instant, SystemTime, UNIX_EPOCH}, }; use lazy_static::lazy_static; -use prometheus::{Encoder, GaugeVec, IntCounterVec, IntGauge, IntGaugeVec, Registry, TextEncoder}; +use prometheus::{Encoder, GaugeVec, HistogramOpts, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec, Registry, TextEncoder}; use reqwest::Client; use tokio::sync::Mutex; use tracing::{debug, error}; - -use crate::Result; +use anyhow::Result; lazy_static! 
{ pub static ref REGISTRY: Registry = Registry::new(); @@ -55,11 +54,71 @@ lazy_static! { &["pubkey"] ) .expect("metric can be created"); + pub static ref REGISTERED_FORESTERS: GaugeVec = GaugeVec::new( prometheus::opts!("registered_foresters", "Foresters registered per epoch"), &["epoch", "authority"] ) .expect("metric can be created"); + + pub static ref RPC_REQUESTS_TOTAL: IntCounterVec = IntCounterVec::new( + prometheus::opts!( + "solana_rpc_requests_total", + "Total number of RPC requests made" + ), + &["method", "status"] + ).expect("metric can be created"); + + pub static ref RPC_REQUEST_DURATION: HistogramVec = { + let opts = HistogramOpts::new( + "solana_rpc_request_duration_seconds", + "RPC request latency by method" + ).buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]); + + HistogramVec::new(opts, &["method"]).expect("metric can be created") + }; + + pub static ref RPC_POOL_CONNECTIONS: IntGaugeVec = IntGaugeVec::new( + prometheus::opts!( + "solana_rpc_pool_connections", + "Number of connections in the RPC pool" + ), + &["state"] // "active", "idle" + ).expect("metric can be created"); + + pub static ref RPC_REQUEST_ERRORS: IntCounterVec = IntCounterVec::new( + prometheus::opts!( + "solana_rpc_request_errors_total", + "Total number of RPC request errors" + ), + &["method", "error_type"] + ).expect("metric can be created"); + + pub static ref RPC_POOL_WAIT_DURATION: HistogramVec = { + let opts = HistogramOpts::new( + "solana_rpc_pool_wait_duration_seconds", + "Time spent waiting for an RPC connection from the pool" + ).buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0]); + + HistogramVec::new(opts, &["pool_id"]).expect("metric can be created") + }; + + pub static ref RPC_POOL_ACQUISITION_TOTAL: IntCounterVec = IntCounterVec::new( + prometheus::opts!( + "solana_rpc_pool_acquisitions_total", + "Total number of RPC connection acquisitions" + ), + &["pool_id", "status"] // status: success, timeout, error + ).expect("metric can be created"); + + pub static ref RPC_POOL_TIMEOUTS: IntCounterVec = IntCounterVec::new( + prometheus::opts!( + "solana_rpc_pool_timeouts_total", + "Total number of RPC pool timeouts" + ), + &["pool_id"] + ).expect("metric can be created"); + static ref METRIC_UPDATES: Mutex> = Mutex::new(Vec::new()); } @@ -88,6 +147,20 @@ pub fn register_metrics() { REGISTRY .register(Box::new(REGISTERED_FORESTERS.clone())) .expect("collector can be registered"); + REGISTRY.register(Box::new(RPC_REQUESTS_TOTAL.clone())).expect("collector can be registered"); + REGISTRY.register(Box::new(RPC_REQUEST_DURATION.clone())).expect("collector can be registered"); + REGISTRY.register(Box::new(RPC_POOL_CONNECTIONS.clone())).expect("collector can be registered"); + REGISTRY.register(Box::new(RPC_REQUEST_ERRORS.clone())).expect("collector can be registered"); + + REGISTRY + .register(Box::new(RPC_POOL_WAIT_DURATION.clone())) + .expect("collector can be registered"); + REGISTRY + .register(Box::new(RPC_POOL_ACQUISITION_TOTAL.clone())) + .expect("collector can be registered"); + REGISTRY + .register(Box::new(RPC_POOL_TIMEOUTS.clone())) + .expect("collector can be registered"); }); } @@ -214,3 +287,26 @@ pub async fn metrics_handler() -> Result { res.push_str(&res_prometheus); Ok(res) } + +pub fn track_rpc_request(method: &str, start_time: Instant, result: &Result<()>) { + let duration = start_time.elapsed().as_secs_f64(); + + match result { + Ok(_) => { + RPC_REQUESTS_TOTAL.with_label_values(&[method, "success"]).inc(); + } + Err(e) 
=> { + RPC_REQUESTS_TOTAL.with_label_values(&[method, "error"]).inc(); + RPC_REQUEST_ERRORS + .with_label_values(&[method, &e.to_string()]) + .inc(); + } + } + + RPC_REQUEST_DURATION.with_label_values(&[method]).observe(duration); +} + +pub fn update_rpc_pool_connections(active: i64, idle: i64) { + RPC_POOL_CONNECTIONS.with_label_values(&["active"]).set(active); + RPC_POOL_CONNECTIONS.with_label_values(&["idle"]).set(idle); +} \ No newline at end of file diff --git a/forester-utils/src/metrics/mod.rs b/forester-utils/src/metrics/mod.rs new file mode 100644 index 0000000000..46bfe90fd0 --- /dev/null +++ b/forester-utils/src/metrics/mod.rs @@ -0,0 +1,12 @@ +pub mod helpers; + +pub use helpers::{ + RPC_REQUESTS_TOTAL, + RPC_REQUEST_DURATION, + RPC_REQUEST_ERRORS, + + RPC_POOL_CONNECTIONS, + RPC_POOL_WAIT_DURATION, + RPC_POOL_ACQUISITION_TOTAL, + RPC_POOL_TIMEOUTS, +}; \ No newline at end of file diff --git a/forester-utils/src/rpc_pool.rs b/forester-utils/src/rpc_pool.rs new file mode 100644 index 0000000000..5e3a234d00 --- /dev/null +++ b/forester-utils/src/rpc_pool.rs @@ -0,0 +1,220 @@ +use std::{ + future::Future, sync::atomic::{AtomicU64, Ordering}, time::{Duration, Instant} +}; +use async_trait::async_trait; +use bb8::{Pool, PooledConnection}; +use light_client::rpc::{RpcConnection, RpcError}; +use solana_sdk::commitment_config::CommitmentConfig; +use thiserror::Error; +use tokio::time::sleep; +use tracing::warn; + +use crate::metrics::{ + RPC_POOL_CONNECTIONS, + RPC_POOL_WAIT_DURATION, + RPC_POOL_ACQUISITION_TOTAL, + RPC_POOL_TIMEOUTS, +}; + +#[derive(Error, Debug)] +pub enum PoolError { + #[error("Failed to create RPC client: {0}")] + ClientCreation(String), + #[error("RPC request failed: {0}")] + RpcRequest(#[from] RpcError), + #[error("Pool error: {0}")] + Pool(String), +} + +pub struct SolanaConnectionManager { + url: String, + commitment: CommitmentConfig, + _phantom: std::marker::PhantomData, +} + +impl SolanaConnectionManager { + pub fn new(url: String, commitment: CommitmentConfig) -> Self { + Self { + url, + commitment, + _phantom: std::marker::PhantomData, + } + } +} + +#[async_trait] +impl bb8::ManageConnection for SolanaConnectionManager { + type Connection = R; + type Error = PoolError; + + async fn connect(&self) -> Result { + Ok(R::new(&self.url, Some(self.commitment))) + } + + async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { + conn.health().await.map_err(PoolError::RpcRequest) + } + + fn has_broken(&self, _conn: &mut Self::Connection) -> bool { + false + } +} + +#[derive(Debug)] +pub struct SolanaRpcPool { + pool: Pool>, + active_connections: AtomicU64, + max_size: u32, +} + +impl SolanaRpcPool { + pub async fn new( + url: String, + commitment: CommitmentConfig, + max_size: u32, + ) -> Result { + let manager = SolanaConnectionManager::new(url, commitment); + let pool = Pool::builder() + .max_size(max_size) + .connection_timeout(Duration::from_secs(15)) + .idle_timeout(Some(Duration::from_secs(60 * 5))) + .build(manager) + .await + .map_err(|e| PoolError::Pool(e.to_string()))?; + + Ok(Self { + pool, + active_connections: AtomicU64::new(0), + max_size, + }) + } + + async fn measure_pool_operation( + &self, + operation_type: &str, + f: impl Future> + ) -> Result { + let start = Instant::now(); + let pool_id = self.max_size.to_string(); + + let result = f.await; + + // Record metrics + let duration = start.elapsed().as_secs_f64(); + RPC_POOL_WAIT_DURATION + .with_label_values(&[&pool_id]) + .observe(duration); + + match &result { + Ok(_) => 
{ + RPC_POOL_ACQUISITION_TOTAL + .with_label_values(&[&pool_id, operation_type]) + .inc(); + } + Err(e) => { + if e.to_string().contains("timeout") { + RPC_POOL_TIMEOUTS + .with_label_values(&[&pool_id]) + .inc(); + RPC_POOL_ACQUISITION_TOTAL + .with_label_values(&[&pool_id, "timeout"]) + .inc(); + } else { + RPC_POOL_ACQUISITION_TOTAL + .with_label_values(&[&pool_id, "error"]) + .inc(); + } + } + } + + result + } +} + +pub struct TrackedConnection<'a, R: RpcConnection> { + conn: PooledConnection<'a, SolanaConnectionManager>, + pool: &'a SolanaRpcPool, +} + +impl std::ops::Deref for TrackedConnection<'_, R> { + type Target = R; + + fn deref(&self) -> &Self::Target { + &self.conn + } +} + +impl std::ops::DerefMut for TrackedConnection<'_, R> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.conn + } +} + +impl Drop for TrackedConnection<'_, R> { + fn drop(&mut self) { + let active = self.pool.active_connections.fetch_sub(1, Ordering::SeqCst) - 1; + let idle = self.pool.pool.state().idle_connections as i64; + + RPC_POOL_CONNECTIONS.with_label_values(&["active"]).set(active as i64); + RPC_POOL_CONNECTIONS.with_label_values(&["idle"]).set(idle); + } +} + +#[async_trait] +pub trait RpcPool: Send + Sync + std::fmt::Debug + 'static { + type Connection<'a>: std::ops::Deref + std::ops::DerefMut + Send + 'a + where + Self: 'a; + + async fn get_connection<'a>(&'a self) -> Result, PoolError>; + + async fn get_connection_with_retry<'a>( + &'a self, + max_retries: u32, + delay: Duration, + ) -> Result, PoolError>; +} + +#[async_trait] +impl RpcPool for SolanaRpcPool { + type Connection<'a> = TrackedConnection<'a, R> where Self: 'a; + + async fn get_connection<'a>(&'a self) -> Result, PoolError> { + self.measure_pool_operation("get_connection", async { + match self.pool.get().await { + Ok(conn) => { + let active = self.active_connections.fetch_add(1, Ordering::SeqCst) + 1; + let idle = self.pool.state().idle_connections as i64; + + RPC_POOL_CONNECTIONS.with_label_values(&["active"]).set(active as i64); + RPC_POOL_CONNECTIONS.with_label_values(&["idle"]).set(idle); + + Ok(TrackedConnection { + conn, + pool: self, + }) + } + Err(e) => Err(PoolError::Pool(e.to_string())), + } + }).await + } + + async fn get_connection_with_retry<'a>( + &'a self, + max_retries: u32, + delay: Duration, + ) -> Result, PoolError> { + let mut retries = 0; + loop { + match self.get_connection().await { + Ok(conn) => return Ok(conn), + Err(e) if retries < max_retries => { + retries += 1; + warn!("Failed to get connection (attempt {}): {:?}", retries, e); + sleep(delay).await; + } + Err(e) => return Err(e), + } + } + } +} \ No newline at end of file diff --git a/sdk-libs/client/src/rpc/solana_rpc.rs b/forester-utils/src/solana_rpc.rs similarity index 83% rename from sdk-libs/client/src/rpc/solana_rpc.rs rename to forester-utils/src/solana_rpc.rs index c5461381f2..a8c3685c64 100644 --- a/sdk-libs/client/src/rpc/solana_rpc.rs +++ b/forester-utils/src/solana_rpc.rs @@ -26,9 +26,15 @@ use solana_transaction_status::{ }; use tokio::time::{sleep, Instant}; -use crate::{ +use light_client::{ rpc::{errors::RpcError, merkle_tree::MerkleTreeExt, rpc_connection::RpcConnection}, - transaction_params::TransactionParams, + transaction_params::TransactionParams +}; + +use crate::metrics::{ + RPC_REQUESTS_TOTAL, + RPC_REQUEST_DURATION, + RPC_REQUEST_ERRORS, }; pub enum SolanaRpcUrl { @@ -75,7 +81,7 @@ impl Default for RetryConfig { pub struct SolanaRpcConnection { pub client: RpcClient, pub payer: Keypair, - retry_config: RetryConfig, 
+ pub retry_config: RetryConfig, } impl Debug for SolanaRpcConnection { @@ -89,42 +95,55 @@ impl Debug for SolanaRpcConnection { } impl SolanaRpcConnection { - pub fn new_with_retry( - url: U, - commitment_config: Option, - retry_config: Option, - ) -> Self { - let payer = Keypair::new(); - let commitment_config = commitment_config.unwrap_or(CommitmentConfig::confirmed()); - let client = RpcClient::new_with_commitment(url.to_string(), commitment_config); - let retry_config = retry_config.unwrap_or_default(); - Self { - client, - payer, - retry_config, + // Add helper method for measuring requests + async fn measure_request(&self, method: &str, f: F) -> Result + where + F: FnOnce() -> Fut, + Fut: std::future::Future>, + { + let start = Instant::now(); + let result = f().await; + let duration = start.elapsed().as_secs_f64(); + + match &result { + Ok(_) => { + RPC_REQUESTS_TOTAL.with_label_values(&[method, "success"]).inc(); + } + Err(e) => { + RPC_REQUESTS_TOTAL.with_label_values(&[method, "error"]).inc(); + RPC_REQUEST_ERRORS + .with_label_values(&[method, &e.to_string()]) + .inc(); + } } + RPC_REQUEST_DURATION.with_label_values(&[method]).observe(duration); + + result } - async fn retry(&self, operation: F) -> Result + // Modify retry logic to use measured requests + pub async fn retry(&self, method: &str, operation: F) -> Result where F: Fn() -> Fut, Fut: std::future::Future>, { let mut attempts = 0; let start_time = Instant::now(); + loop { - match operation().await { + match self.measure_request(method, &operation).await { Ok(result) => return Ok(result), Err(e) => { attempts += 1; - if attempts >= self.retry_config.max_retries - || start_time.elapsed() >= self.retry_config.timeout + if attempts >= self.retry_config.max_retries + || start_time.elapsed() >= self.retry_config.timeout { return Err(e); } warn!( - "Operation failed, retrying in {:?} (attempt {}/{}): {:?}", - self.retry_config.retry_delay, attempts, self.retry_config.max_retries, e + "Operation {} failed, retrying in {:?} (attempt {}/{}): {:?}", + method, self.retry_config.retry_delay, attempts, + self.retry_config.max_retries, e ); sleep(self.retry_config.retry_delay).await; } @@ -133,9 +152,27 @@ impl SolanaRpcConnection { } } +impl SolanaRpcConnection { + pub fn new_with_retry( + url: U, + commitment_config: Option, + retry_config: Option, + ) -> Self { + let payer = Keypair::new(); + let commitment_config = commitment_config.unwrap_or(CommitmentConfig::confirmed()); + let client = RpcClient::new_with_commitment(url.to_string(), commitment_config); + let retry_config = retry_config.unwrap_or_default(); + Self { + client, + payer, + retry_config, + } + } +} + impl SolanaRpcConnection { #[allow(clippy::result_large_err)] - fn parse_inner_instructions( + pub fn parse_inner_instructions( &self, signature: Signature, ) -> Result { @@ -217,17 +254,17 @@ impl RpcConnection for SolanaRpcConnection { } async fn health(&self) -> Result<(), RpcError> { - self.retry(|| async { self.client.get_health().map_err(RpcError::from) }) + self.retry("health", || async { self.client.get_health().map_err(RpcError::from) }) .await } async fn get_block_time(&self, slot: u64) -> Result { - self.retry(|| async { self.client.get_block_time(slot).map_err(RpcError::from) }) + self.retry("get_block_time", || async { self.client.get_block_time(slot).map_err(RpcError::from) }) .await } async fn get_epoch_info(&self) -> Result { - self.retry(|| async { self.client.get_epoch_info().map_err(RpcError::from) }) + self.retry("get_epoch_info", || async { 
self.client.get_epoch_info().map_err(RpcError::from) }) .await } @@ -235,7 +272,7 @@ impl RpcConnection for SolanaRpcConnection { &self, program_id: &Pubkey, ) -> Result, RpcError> { - self.retry(|| async { + self.retry("get_program_accounts", || async { self.client .get_program_accounts(program_id) .map_err(RpcError::from) @@ -247,7 +284,7 @@ impl RpcConnection for SolanaRpcConnection { &mut self, transaction: Transaction, ) -> Result { - self.retry(|| async { + self.retry("process_transaction", || async { self.client .send_and_confirm_transaction(&transaction) .map_err(RpcError::from) @@ -259,7 +296,7 @@ impl RpcConnection for SolanaRpcConnection { &mut self, transaction: Transaction, ) -> Result<(Signature, Slot), RpcError> { - self.retry(|| async { + self.retry("process_transaction_with_context", || async { let signature = self.client.send_and_confirm_transaction(&transaction)?; let sig_info = self.client.get_signature_statuses(&[signature])?; let slot = sig_info @@ -356,7 +393,7 @@ impl RpcConnection for SolanaRpcConnection { } async fn confirm_transaction(&self, signature: Signature) -> Result { - self.retry(|| async { + self.retry("confirm_transaction", || async { self.client .confirm_transaction(&signature) .map_err(RpcError::from) @@ -365,7 +402,7 @@ impl RpcConnection for SolanaRpcConnection { } async fn get_account(&mut self, address: Pubkey) -> Result, RpcError> { - self.retry(|| async { + self.retry("get_account", || async { self.client .get_account_with_commitment(&address, self.client.commitment()) .map(|response| response.value) @@ -382,7 +419,7 @@ impl RpcConnection for SolanaRpcConnection { &mut self, data_len: usize, ) -> Result { - self.retry(|| async { + self.retry("get_minimum_balance_for_rent_exemption", || async { self.client .get_minimum_balance_for_rent_exemption(data_len) .map_err(RpcError::from) @@ -395,12 +432,12 @@ impl RpcConnection for SolanaRpcConnection { to: &Pubkey, lamports: u64, ) -> Result { - self.retry(|| async { + self.retry("request_airdrop", || async { let signature = self .client .request_airdrop(to, lamports) .map_err(RpcError::ClientError)?; - self.retry(|| async { + self.retry("confirm_transaction_with_commitment", || async { if self .client .confirm_transaction_with_commitment(&signature, self.client.commitment())? 
@@ -419,27 +456,30 @@ impl RpcConnection for SolanaRpcConnection { } async fn get_balance(&mut self, pubkey: &Pubkey) -> Result { - self.retry(|| async { self.client.get_balance(pubkey).map_err(RpcError::from) }) - .await + self.retry("get_balance", || async { + self.client.get_balance(pubkey).map_err(RpcError::from) + }).await } + async fn get_latest_blockhash(&mut self) -> Result { - self.retry(|| async { + self.retry("get_latest_blockhash", || async { self.client // Confirmed commitments land more reliably than finalized // https://www.helius.dev/blog/how-to-deal-with-blockhash-errors-on-solana#how-to-deal-with-blockhash-errors .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed()) .map(|response| response.0) .map_err(RpcError::from) - }) - .await + }).await } async fn get_slot(&mut self) -> Result { - self.retry(|| async { self.client.get_slot().map_err(RpcError::from) }) - .await + self.retry("get_slot", || async { + self.client.get_slot().map_err(RpcError::from) + }).await } + async fn warp_to_slot(&mut self, _slot: Slot) -> Result<(), RpcError> { Err(RpcError::CustomError( "Warp to slot is not supported in SolanaRpcConnection".to_string(), @@ -447,7 +487,7 @@ impl RpcConnection for SolanaRpcConnection { } async fn send_transaction(&self, transaction: &Transaction) -> Result { - self.retry(|| async { + self.retry("send_transaction", || async { self.client .send_transaction_with_config( transaction, @@ -466,7 +506,7 @@ impl RpcConnection for SolanaRpcConnection { transaction: &Transaction, config: RpcSendTransactionConfig, ) -> Result { - self.retry(|| async { + self.retry("send_transaction_with_config", || async { self.client .send_transaction_with_config(transaction, config) .map_err(RpcError::from) @@ -475,7 +515,7 @@ impl RpcConnection for SolanaRpcConnection { } async fn get_transaction_slot(&mut self, signature: &Signature) -> Result { - self.retry(|| async { + self.retry("get_transaction_slot", || async { Ok(self .client .get_transaction_with_config( @@ -492,7 +532,7 @@ impl RpcConnection for SolanaRpcConnection { .await } async fn get_block_height(&mut self) -> Result { - self.retry(|| async { self.client.get_block_height().map_err(RpcError::from) }) + self.retry("get_block_height", || async { self.client.get_block_height().map_err(RpcError::from) }) .await } } diff --git a/forester/Cargo.toml b/forester/Cargo.toml index 9a24ea9322..02300eb314 100644 --- a/forester/Cargo.toml +++ b/forester/Cargo.toml @@ -11,6 +11,7 @@ solana-client = { workspace = true } solana-account-decoder = { workspace = true } solana-program = { workspace = true } account-compression = { workspace = true } +light-client = { workspace = true } light-batched-merkle-tree = { workspace = true } light-system-program = { workspace = true, features = ["cpi"] } light-hash-set = { workspace = true, features = ["solana"] } @@ -19,7 +20,6 @@ light-merkle-tree-reference = { workspace = true } light-registry = { workspace = true} photon-api = { workspace = true } forester-utils = { workspace = true } -light-client = { workspace = true } light-merkle-tree-metadata = { workspace = true } light-sdk = { workspace = true } light-program-test = { workspace = true} @@ -35,18 +35,15 @@ tokio = { version = "1", features = ["full"] } reqwest = { version = "0.11", features = ["json", "rustls-tls", "blocking"] } futures = "0.3.31" thiserror = "1" -borsh = "0.10.3" bs58 = "0.5.1" env_logger = "0.11" async-trait = "0.1.81" tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter", 
"json"] } tracing-appender = "0.2.3" -prometheus = "0.13" -lazy_static = "1.4" -warp = "0.3" dashmap = "6.1.0" scopeguard = "1.2.0" +borsh = "0.10.3" anyhow = "1.0.94" itertools = "0.14.0" diff --git a/forester/src/batch_processor/address.rs b/forester/src/batch_processor/address.rs index 339a6abe72..c45e322bfa 100644 --- a/forester/src/batch_processor/address.rs +++ b/forester/src/batch_processor/address.rs @@ -1,11 +1,10 @@ use borsh::BorshSerialize; -use forester_utils::instructions::create_batch_update_address_tree_instruction_data; +use forester_utils::{instructions::create_batch_update_address_tree_instruction_data, rpc_pool::RpcPool}; use light_batched_merkle_tree::event::BatchNullifyEvent; use light_client::{indexer::Indexer, rpc::RpcConnection}; use light_registry::account_compression_cpi::sdk::create_batch_update_address_tree_instruction; use solana_sdk::signer::Signer; use tracing::{info, instrument}; - use super::common::BatchContext; use crate::{ batch_processor::error::{BatchProcessError, Result}, @@ -13,8 +12,8 @@ use crate::{ }; #[instrument(level = "debug", skip(context), fields(tree = %context.merkle_tree))] -pub(crate) async fn process_batch + IndexerType>( - context: &BatchContext, +pub(crate) async fn process_batch + IndexerType, P: RpcPool>( + context: &BatchContext, ) -> Result { info!("Processing address batch operation"); let mut rpc = context.rpc_pool.get_connection().await?; diff --git a/forester/src/batch_processor/common.rs b/forester/src/batch_processor/common.rs index cdc0c424c4..43c2c5c0d3 100644 --- a/forester/src/batch_processor/common.rs +++ b/forester/src/batch_processor/common.rs @@ -1,12 +1,12 @@ use std::sync::Arc; -use forester_utils::forester_epoch::TreeType; +use forester_utils::{forester_epoch::TreeType, rpc_pool::RpcPool}; use light_batched_merkle_tree::{ batch::{Batch, BatchState}, merkle_tree::BatchedMerkleTreeAccount, queue::BatchedQueueAccount, }; -use light_client::{indexer::Indexer, rpc::RpcConnection, rpc_pool::SolanaRpcPool}; +use light_client::{indexer::Indexer, rpc::RpcConnection}; use solana_program::pubkey::Pubkey; use solana_sdk::signature::Keypair; use tokio::sync::Mutex; @@ -16,14 +16,15 @@ use super::{address, error::Result, state, BatchProcessError}; use crate::indexer_type::IndexerType; #[derive(Debug)] -pub struct BatchContext> { - pub rpc_pool: Arc>, +pub struct BatchContext, P: RpcPool> { + pub rpc_pool: Arc
<P>
, pub indexer: Arc>, pub authority: Keypair, pub derivation: Pubkey, pub epoch: u64, pub merkle_tree: Pubkey, pub output_queue: Pubkey, + pub phantom: std::marker::PhantomData, } #[derive(Debug)] @@ -34,13 +35,13 @@ pub enum BatchReadyState { } #[derive(Debug)] -pub struct BatchProcessor + IndexerType> { - context: BatchContext, +pub struct BatchProcessor + IndexerType, P: RpcPool> { + context: BatchContext, tree_type: TreeType, } -impl + IndexerType> BatchProcessor { - pub fn new(context: BatchContext, tree_type: TreeType) -> Self { +impl + IndexerType, P: RpcPool> BatchProcessor { + pub fn new(context: BatchContext, tree_type: TreeType) -> Self { Self { context, tree_type } } diff --git a/forester/src/batch_processor/error.rs b/forester/src/batch_processor/error.rs index 9d031e560c..2eef760357 100644 --- a/forester/src/batch_processor/error.rs +++ b/forester/src/batch_processor/error.rs @@ -1,5 +1,4 @@ -use forester_utils::forester_epoch::TreeType; -use light_client::rpc_pool::PoolError; +use forester_utils::{forester_epoch::TreeType, rpc_pool::PoolError}; use solana_client::rpc_request::RpcError; use thiserror::Error; diff --git a/forester/src/batch_processor/mod.rs b/forester/src/batch_processor/mod.rs index 9e392aee33..a07dcf48bb 100644 --- a/forester/src/batch_processor/mod.rs +++ b/forester/src/batch_processor/mod.rs @@ -5,7 +5,7 @@ mod state; use common::BatchProcessor; use error::Result; -use forester_utils::forester_epoch::TreeType; +use forester_utils::{forester_epoch::TreeType, rpc_pool::RpcPool}; use light_client::rpc::RpcConnection; use tracing::{info, instrument}; @@ -17,8 +17,8 @@ use tracing::{info, instrument}; tree_type = ?tree_type ) )] -pub async fn process_batched_operations + IndexerType>( - context: BatchContext, +pub async fn process_batched_operations + IndexerType, P: RpcPool>( + context: BatchContext, tree_type: TreeType, ) -> Result { info!("process_batched_operations"); diff --git a/forester/src/batch_processor/state.rs b/forester/src/batch_processor/state.rs index d560ae8d32..1a3a549465 100644 --- a/forester/src/batch_processor/state.rs +++ b/forester/src/batch_processor/state.rs @@ -1,5 +1,5 @@ use borsh::BorshSerialize; -use forester_utils::instructions::{create_append_batch_ix_data, create_nullify_batch_ix_data}; +use forester_utils::{instructions::{create_append_batch_ix_data, create_nullify_batch_ix_data}, rpc_pool::RpcPool}; use light_batched_merkle_tree::event::{BatchAppendEvent, BatchNullifyEvent}; use light_client::{indexer::Indexer, rpc::RpcConnection}; use light_registry::account_compression_cpi::sdk::{ @@ -15,8 +15,8 @@ use crate::{ }, }; -pub(crate) async fn perform_append + IndexerType>( - context: &BatchContext, +pub(crate) async fn perform_append + IndexerType, P: RpcPool>( + context: &BatchContext, rpc: &mut R, num_inserted_zkps: u64, ) -> Result<()> { @@ -61,8 +61,8 @@ pub(crate) async fn perform_append + IndexerType Ok(()) } -pub(crate) async fn perform_nullify + IndexerType>( - context: &BatchContext, +pub(crate) async fn perform_nullify + IndexerType, P: RpcPool>( + context: &BatchContext, rpc: &mut R, ) -> Result<()> { let batch_index = get_batch_index(context, rpc).await?; @@ -102,8 +102,8 @@ pub(crate) async fn perform_nullify + IndexerTyp Ok(()) } -async fn get_batch_index>( - context: &BatchContext, +async fn get_batch_index, P: RpcPool>( + context: &BatchContext, rpc: &mut R, ) -> Result { let mut account = rpc.get_account(context.merkle_tree).await?.unwrap(); diff --git a/forester/src/config.rs b/forester/src/config.rs index 
460229f7d6..3ca332e50a 100644 --- a/forester/src/config.rs +++ b/forester/src/config.rs @@ -6,9 +6,9 @@ use account_compression::{ }; use anchor_lang::Id; use forester_utils::forester_epoch::{Epoch, TreeAccounts, TreeForesterSchedule}; -use light_client::rpc::RetryConfig; use light_registry::{EpochPda, ForesterEpochPda}; use solana_sdk::signature::Keypair; +use forester_utils::RetryConfig; use crate::{ cli::{StartArgs, StatusArgs}, diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 9e9933afc4..6459659171 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -1,22 +1,19 @@ use std::{ - collections::HashMap, - sync::{ + collections::HashMap, marker::PhantomData, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, Arc, - }, - time::Duration, + }, time::Duration }; use anyhow::Context; use dashmap::DashMap; -use forester_utils::forester_epoch::{ +use forester_utils::{forester_epoch::{ get_epoch_phases, Epoch, TreeAccounts, TreeForesterSchedule, TreeType, -}; +}, metrics::helpers::{push_metrics, queue_metric_update, update_forester_sol_balance}, rpc_pool::RpcPool, RetryConfig, SolanaRpcConnection}; use futures::future::join_all; use light_client::{ indexer::{Indexer, MerkleProof, NewAddressProofWithContext}, - rpc::{RetryConfig, RpcConnection, RpcError, SolanaRpcConnection}, - rpc_pool::SolanaRpcPool, + rpc::{RpcConnection, RpcError}, }; use light_registry::{ protocol_config::state::ProtocolConfig, @@ -34,24 +31,13 @@ use tokio::{ use tracing::{debug, error, info, info_span, instrument, warn}; use crate::{ - batch_processor::{process_batched_operations, BatchContext}, - errors::{ + batch_processor::{process_batched_operations, BatchContext}, errors::{ ChannelError, ConfigurationError, ForesterError, InitializationError, RegistrationError, WorkReportError, - }, - indexer_type::{rollover_address_merkle_tree, rollover_state_merkle_tree, IndexerType}, - metrics::{push_metrics, queue_metric_update, update_forester_sol_balance}, - pagerduty::send_pagerduty_alert, - queue_helpers::QueueItemData, - rollover::is_tree_ready_for_rollover, - send_transaction::{ + }, indexer_type::{rollover_address_merkle_tree, rollover_state_merkle_tree, IndexerType}, pagerduty::send_pagerduty_alert, queue_helpers::QueueItemData, rollover::is_tree_ready_for_rollover, send_transaction::{ send_batched_transactions, BuildTransactionBatchConfig, EpochManagerTransactions, SendBatchedTransactionsConfig, - }, - slot_tracker::{slot_duration, wait_until_slot_reached, SlotTracker}, - tree_data_sync::fetch_trees, - tree_finder::TreeFinder, - ForesterConfig, ForesterEpochInfo, Result, + }, slot_tracker::{slot_duration, wait_until_slot_reached, SlotTracker}, tree_data_sync::fetch_trees, tree_finder::TreeFinder, ForesterConfig, ForesterEpochInfo, Result }; #[derive(Copy, Clone, Debug)] @@ -83,10 +69,10 @@ pub enum MerkleProofType { } #[derive(Debug)] -pub struct EpochManager> { +pub struct EpochManager, P: RpcPool> { config: Arc, protocol_config: Arc, - rpc_pool: Arc>, + rpc_pool: Arc
<P>
, indexer: Arc>, work_report_sender: mpsc::Sender, processed_items_per_epoch_count: Arc>>, @@ -94,9 +80,10 @@ pub struct EpochManager> { slot_tracker: Arc, processing_epochs: Arc>>, new_tree_sender: broadcast::Sender, + _phantom: PhantomData, } -impl> Clone for EpochManager { +impl, P: RpcPool> Clone for EpochManager { fn clone(&self) -> Self { Self { config: self.config.clone(), @@ -109,16 +96,17 @@ impl> Clone for EpochManager { slot_tracker: self.slot_tracker.clone(), processing_epochs: self.processing_epochs.clone(), new_tree_sender: self.new_tree_sender.clone(), + _phantom: PhantomData, } } } -impl + IndexerType> EpochManager { +impl + IndexerType, P: RpcPool> EpochManager { #[allow(clippy::too_many_arguments)] pub async fn new( config: Arc, protocol_config: Arc, - rpc_pool: Arc>, + rpc_pool: Arc
<P>
, indexer: Arc>, work_report_sender: mpsc::Sender, trees: Vec, @@ -136,6 +124,7 @@ impl + IndexerType> EpochManager { slot_tracker, processing_epochs: Arc::new(DashMap::new()), new_tree_sender, + _phantom: PhantomData }) } @@ -878,6 +867,7 @@ impl + IndexerType> EpochManager { epoch: epoch_info.epoch, merkle_tree: tree.tree_accounts.merkle_tree, output_queue: tree.tree_accounts.queue, + phantom: std::marker::PhantomData::, }; let start_time = Instant::now(); @@ -1160,10 +1150,10 @@ impl + IndexerType> EpochManager { skip(config, protocol_config, rpc_pool, indexer, shutdown, work_report_sender, slot_tracker), fields(forester = %config.payer_keypair.pubkey()) )] -pub async fn run_service + IndexerType>( +pub async fn run_service + IndexerType, P: RpcPool>( config: Arc, protocol_config: Arc, - rpc_pool: Arc>, + rpc_pool: Arc
<P>
, indexer: Arc>, shutdown: oneshot::Receiver<()>, work_report_sender: mpsc::Sender, @@ -1214,7 +1204,7 @@ pub async fn run_service + IndexerType>( .await { Ok(epoch_manager) => { - let epoch_manager: Arc> = Arc::new(epoch_manager); + let epoch_manager: Arc> = Arc::new(epoch_manager); debug!( "Successfully created EpochManager after {} attempts", retry_count + 1 diff --git a/forester/src/errors.rs b/forester/src/errors.rs index 079550527f..b783a399c3 100644 --- a/forester/src/errors.rs +++ b/forester/src/errors.rs @@ -1,6 +1,7 @@ use std::time::Duration; -use light_client::{rpc::errors::RpcError, rpc_pool::PoolError}; +use forester_utils::rpc_pool::PoolError; +use light_client::rpc::errors::RpcError; use light_registry::errors::RegistryError; use photon_api::apis::{default_api::GetCompressedAccountProofPostError, Error as PhotonApiError}; use solana_program::{program_error::ProgramError, pubkey::Pubkey}; diff --git a/forester/src/forester_status.rs b/forester/src/forester_status.rs index 1257155fbe..9049a5b97b 100644 --- a/forester/src/forester_status.rs +++ b/forester/src/forester_status.rs @@ -1,16 +1,15 @@ use std::sync::Arc; use anchor_lang::{AccountDeserialize, Discriminator}; -use forester_utils::forester_epoch::TreeType; +use forester_utils::{forester_epoch::TreeType, metrics::helpers::{push_metrics, register_metrics, update_registered_foresters}, SolanaRpcConnection}; use itertools::Itertools; -use light_client::rpc::{RpcConnection, SolanaRpcConnection}; +use light_client::rpc::RpcConnection; use light_registry::{protocol_config::state::ProtocolConfigPda, EpochPda, ForesterEpochPda}; use solana_sdk::{account::ReadableAccount, commitment_config::CommitmentConfig}; use tracing::{debug, warn}; use crate::{ cli::StatusArgs, - metrics::{push_metrics, register_metrics, update_registered_foresters}, rollover::get_tree_fullness, run_queue_info, tree_data_sync::fetch_trees, diff --git a/forester/src/indexer_type.rs b/forester/src/indexer_type.rs index 68d891d76a..d35cfbd6a4 100644 --- a/forester/src/indexer_type.rs +++ b/forester/src/indexer_type.rs @@ -4,7 +4,7 @@ use async_trait::async_trait; use forester_utils::forester_epoch::TreeAccounts; use light_client::{ indexer::{Indexer, StateMerkleTreeAccounts, StateMerkleTreeBundle}, - rpc::RpcConnection, + rpc::{merkle_tree::MerkleTreeExt, RpcConnection}, }; use light_hasher::Poseidon; use light_merkle_tree_reference::MerkleTree; @@ -74,8 +74,7 @@ pub trait IndexerType: sealed::Sealed { } #[async_trait] -impl IndexerType - for TestIndexer +impl IndexerType for TestIndexer { fn handle_state_bundle( indexer: &mut impl Indexer, diff --git a/forester/src/lib.rs b/forester/src/lib.rs index 25ea7e6c0e..9fe5b97f6e 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -8,7 +8,6 @@ pub mod errors; pub mod forester_status; pub mod helius_priority_fee_types; mod indexer_type; -pub mod metrics; pub mod pagerduty; pub mod photon_indexer; pub mod pubsub_client; @@ -26,12 +25,12 @@ use std::{sync::Arc, time::Duration}; use account_compression::utils::constants::{ADDRESS_QUEUE_VALUES, STATE_NULLIFIER_QUEUE_VALUES}; pub use config::{ForesterConfig, ForesterEpochInfo}; -use forester_utils::forester_epoch::{TreeAccounts, TreeType}; +use forester_utils::{forester_epoch::{TreeAccounts, TreeType}, metrics::helpers::QUEUE_LENGTH, rpc_pool::{RpcPool, SolanaRpcPool}}; use light_client::{ indexer::Indexer, - rpc::{RpcConnection, SolanaRpcConnection}, - rpc_pool::SolanaRpcPool, + rpc::RpcConnection }; +use forester_utils::solana_rpc::SolanaRpcConnection; use 
solana_sdk::commitment_config::CommitmentConfig; use tokio::sync::{mpsc, oneshot, Mutex}; use tracing::debug; @@ -39,7 +38,6 @@ use tracing::debug; use crate::{ epoch_manager::{run_service, WorkReport}, indexer_type::IndexerType, - metrics::QUEUE_LENGTH, queue_helpers::fetch_queue_item_data, slot_tracker::SlotTracker, utils::get_protocol_config, @@ -82,13 +80,17 @@ pub async fn run_queue_info( } } -pub async fn run_pipeline + IndexerType>( +pub async fn run_pipeline( config: Arc, indexer: Arc>, shutdown: oneshot::Receiver<()>, work_report_sender: mpsc::Sender, -) -> Result<()> { - let rpc_pool = SolanaRpcPool::::new( +) -> Result<()> +where + R: RpcConnection, + I: Indexer + IndexerType, +{ + let rpc_pool = SolanaRpcPool::::new( config.external_services.rpc_url.to_string(), CommitmentConfig::confirmed(), config.general_config.rpc_pool_size as u32, diff --git a/forester/src/main.rs b/forester/src/main.rs index 51ae5fa3e7..deccf359c6 100644 --- a/forester/src/main.rs +++ b/forester/src/main.rs @@ -2,16 +2,9 @@ use std::sync::Arc; use clap::Parser; use forester::{ - cli::{Cli, Commands}, - errors::ForesterError, - forester_status, - metrics::register_metrics, - photon_indexer::PhotonIndexer, - run_pipeline, - telemetry::setup_telemetry, - ForesterConfig, + cli::{Cli, Commands}, errors::ForesterError, forester_status, photon_indexer::PhotonIndexer, run_pipeline, telemetry::setup_telemetry, ForesterConfig }; -use light_client::rpc::{RpcConnection, SolanaRpcConnection}; +use forester_utils::{metrics::helpers::register_metrics, SolanaRpcConnection}; use tokio::{ signal::ctrl_c, sync::{mpsc, oneshot}, @@ -48,15 +41,24 @@ async fn main() -> Result<(), ForesterError> { } }); - let indexer_rpc = - SolanaRpcConnection::new(config.external_services.rpc_url.clone(), None); + let indexer_rpc = SolanaRpcConnection::new_with_retry( + config.external_services.rpc_url.clone(), + None, + None, + ); + let indexer = Arc::new(tokio::sync::Mutex::new(PhotonIndexer::new( config.external_services.indexer_url.clone().unwrap(), config.external_services.photon_api_key.clone(), indexer_rpc, ))); - run_pipeline(config, indexer, shutdown_receiver, work_report_sender).await? + run_pipeline::>( + config, + indexer, + shutdown_receiver, + work_report_sender, + ).await? } Commands::Status(args) => { forester_status::fetch_forester_status(args).await; diff --git a/forester/src/send_transaction.rs b/forester/src/send_transaction.rs index a610586339..e21d7c62d8 100644 --- a/forester/src/send_transaction.rs +++ b/forester/src/send_transaction.rs @@ -5,13 +5,13 @@ use account_compression::utils::constants::{ STATE_MERKLE_TREE_CHANGELOG, STATE_NULLIFIER_QUEUE_VALUES, }; use async_trait::async_trait; -use forester_utils::forester_epoch::{TreeAccounts, TreeType}; +use forester_utils::{forester_epoch::{TreeAccounts, TreeType}, rpc_pool::RpcPool}; use futures::future::join_all; use light_client::{ indexer::Indexer, - rpc::{RetryConfig, RpcConnection}, - rpc_pool::SolanaRpcPool, + rpc::RpcConnection, }; +use forester_utils::solana_rpc::RetryConfig; use light_registry::{ account_compression_cpi::sdk::{ create_nullify_instruction, create_update_address_merkle_tree_instruction, @@ -106,10 +106,10 @@ pub fn calculate_compute_unit_price(target_lamports: u64, compute_units: u64) -> /// end of slot /// - consider dynamic batch size based on the number of transactions in the /// queue -pub async fn send_batched_transactions( +pub async fn send_batched_transactions>( payer: &Keypair, derivation: &Pubkey, - pool: Arc>, + pool: Arc
<P>
, config: &SendBatchedTransactionsConfig, tree_accounts: TreeAccounts, transaction_builder: &T, diff --git a/forester/src/smart_transaction.rs b/forester/src/smart_transaction.rs index 16bb84a7af..fe03bee1e6 100644 --- a/forester/src/smart_transaction.rs +++ b/forester/src/smart_transaction.rs @@ -1,8 +1,8 @@ // adapted from https://github.com/helius-labs/helius-rust-sdk/blob/dev/src/optimized_transaction.rs // optimized for forester client -use std::time::{Duration, Instant}; +use std::{ops::{Deref, DerefMut}, time::{Duration, Instant}}; -use light_client::{rpc::RpcConnection, rpc_pool::SolanaConnectionManager}; +use light_client::rpc::RpcConnection; use solana_client::rpc_config::RpcSendTransactionConfig; use solana_sdk::{ compute_budget::ComputeBudgetInstruction, @@ -31,8 +31,8 @@ pub struct CreateSmartTransactionConfig { /// /// # Returns /// The confirmed transaction signature or an error if the confirmation times out -pub async fn poll_transaction_confirmation<'a, R: RpcConnection>( - connection: &mut bb8::PooledConnection<'a, SolanaConnectionManager>, +pub async fn poll_transaction_confirmation<'a, R: RpcConnection, C: Deref + DerefMut>( + connection: &mut C, txt_sig: Signature, abort_timeout: Duration, ) -> Result { @@ -78,8 +78,8 @@ pub async fn poll_transaction_confirmation<'a, R: RpcConnection>( } // Sends a transaction and handles its confirmation. Retries until timeout or last_valid_block_height is reached. -pub async fn send_and_confirm_transaction<'a, R: RpcConnection>( - connection: &mut bb8::PooledConnection<'a, SolanaConnectionManager>, +pub async fn send_and_confirm_transaction<'a, R: RpcConnection, C: Deref + DerefMut>( + connection: &mut C, transaction: &Transaction, send_transaction_config: RpcSendTransactionConfig, last_valid_block_height: u64, diff --git a/forester/src/tree_finder.rs b/forester/src/tree_finder.rs index e19263a8ff..85ba88e2f4 100644 --- a/forester/src/tree_finder.rs +++ b/forester/src/tree_finder.rs @@ -1,7 +1,7 @@ -use std::sync::Arc; +use std::{marker::PhantomData, sync::Arc}; -use forester_utils::forester_epoch::TreeAccounts; -use light_client::{rpc::RpcConnection, rpc_pool::SolanaRpcPool}; +use forester_utils::{forester_epoch::TreeAccounts, rpc_pool::RpcPool}; +use light_client::rpc::RpcConnection; use tokio::{ sync::broadcast, time::{interval, Duration}, @@ -10,16 +10,17 @@ use tracing::{debug, error, info}; use crate::{tree_data_sync::fetch_trees, Result}; -pub struct TreeFinder { - rpc_pool: Arc>, +pub struct TreeFinder> { + rpc_pool: Arc
<P>
, known_trees: Vec, new_tree_sender: broadcast::Sender, check_interval: Duration, + _phantom: std::marker::PhantomData, } -impl TreeFinder { +impl> TreeFinder { pub fn new( - rpc_pool: Arc>, + rpc_pool: Arc
<P>
, initial_trees: Vec, new_tree_sender: broadcast::Sender, check_interval: Duration, @@ -29,6 +30,7 @@ impl TreeFinder { known_trees: initial_trees, new_tree_sender, check_interval, + _phantom: PhantomData, } } diff --git a/forester/tests/batched_address_test.rs b/forester/tests/batched_address_test.rs index d6e570796b..b10e3f6e1f 100644 --- a/forester/tests/batched_address_test.rs +++ b/forester/tests/batched_address_test.rs @@ -1,16 +1,13 @@ use std::{sync::Arc, time::Duration}; use forester::run_pipeline; -use forester_utils::registry::{register_test_forester, update_test_forester}; +use forester_utils::{registry::{register_test_forester, update_test_forester}, rpc_pool::{RpcPool, SolanaRpcPool}, solana_rpc::SolanaRpcUrl, SolanaRpcConnection}; use light_batched_merkle_tree::{ batch::BatchState, initialize_address_tree::InitAddressTreeAccountsInstructionData, merkle_tree::BatchedMerkleTreeAccount, }; -use light_client::{ - indexer::AddressMerkleTreeAccounts, - rpc::{solana_rpc::SolanaRpcUrl, RpcConnection, SolanaRpcConnection}, - rpc_pool::SolanaRpcPool, -}; +use light_client::rpc::RpcConnection; +use light_client::indexer::AddressMerkleTreeAccounts; use light_program_test::{indexer::TestIndexer, test_env::EnvAccounts}; use light_prover_client::gnark::helpers::{LightValidatorConfig, ProverConfig, ProverMode}; use light_test_utils::{ diff --git a/forester/tests/batched_state_test.rs b/forester/tests/batched_state_test.rs index f0d5c48c61..e211e4cd68 100644 --- a/forester/tests/batched_state_test.rs +++ b/forester/tests/batched_state_test.rs @@ -1,15 +1,12 @@ use std::{sync::Arc, time::Duration}; use forester::run_pipeline; -use forester_utils::registry::{register_test_forester, update_test_forester}; +use forester_utils::{registry::{register_test_forester, update_test_forester}, rpc_pool::{RpcPool, SolanaRpcPool}, solana_rpc::SolanaRpcUrl, SolanaRpcConnection}; use light_batched_merkle_tree::{ batch::BatchState, initialize_state_tree::InitStateTreeAccountsInstructionData, merkle_tree::BatchedMerkleTreeAccount, queue::BatchedQueueAccount, }; -use light_client::{ - rpc::{solana_rpc::SolanaRpcUrl, RpcConnection, SolanaRpcConnection}, - rpc_pool::SolanaRpcPool, -}; +use light_client::rpc::RpcConnection; use light_program_test::{indexer::TestIndexer, test_env::EnvAccounts}; use light_prover_client::gnark::helpers::LightValidatorConfig; use light_test_utils::e2e_test_env::{init_program_test_env, E2ETestEnv}; diff --git a/forester/tests/e2e_test.rs b/forester/tests/e2e_test.rs index 3c7e81c41f..dcb72e7f86 100644 --- a/forester/tests/e2e_test.rs +++ b/forester/tests/e2e_test.rs @@ -5,12 +5,9 @@ use account_compression::{ AddressMerkleTreeAccount, }; use forester::{queue_helpers::fetch_queue_item_data, run_pipeline, utils::get_protocol_config}; -use forester_utils::registry::register_test_forester; -use light_client::{ - indexer::{AddressMerkleTreeAccounts, StateMerkleTreeAccounts}, - rpc::{solana_rpc::SolanaRpcUrl, RpcConnection, RpcError, SolanaRpcConnection}, - rpc_pool::SolanaRpcPool, -}; +use forester_utils::{registry::register_test_forester, rpc_pool::{RpcPool, SolanaRpcPool}, solana_rpc::SolanaRpcUrl, SolanaRpcConnection}; +use light_client::indexer::{AddressMerkleTreeAccounts, StateMerkleTreeAccounts}; +use light_client::rpc::{RpcConnection, RpcError}; use light_program_test::{indexer::TestIndexer, test_env::EnvAccounts}; use light_prover_client::gnark::helpers::{ spawn_prover, LightValidatorConfig, ProverConfig, ProverMode, @@ -69,7 +66,7 @@ async fn 
test_epoch_monitor_with_test_indexer_and_1_forester() { .await .unwrap(); - let mut rpc = SolanaRpcConnection::new(SolanaRpcUrl::Localnet, None); + let mut rpc = SolanaRpcConnection::new_with_retry(SolanaRpcUrl::Localnet, None, None); rpc.payer = forester_keypair.insecure_clone(); rpc.airdrop_lamports(&forester_keypair.pubkey(), LAMPORTS_PER_SOL * 100_000) diff --git a/forester/tests/priority_fee_test.rs b/forester/tests/priority_fee_test.rs index 5591c64b3e..fd667dad48 100644 --- a/forester/tests/priority_fee_test.rs +++ b/forester/tests/priority_fee_test.rs @@ -3,7 +3,8 @@ use forester::{ send_transaction::{get_capped_priority_fee, request_priority_fee_estimate, CapConfig}, ForesterConfig, }; -use light_client::rpc::{RpcConnection, SolanaRpcConnection}; +use forester_utils::SolanaRpcConnection; +use light_client::rpc::RpcConnection; use solana_sdk::{commitment_config::CommitmentConfig, signature::Signer}; use url::Url; diff --git a/forester/tests/test_utils.rs b/forester/tests/test_utils.rs index 4cfbbc6ca5..97bb999d93 100644 --- a/forester/tests/test_utils.rs +++ b/forester/tests/test_utils.rs @@ -1,11 +1,11 @@ use account_compression::initialize_address_merkle_tree::Pubkey; use forester::{ config::{ExternalServicesConfig, GeneralConfig}, - metrics::register_metrics, photon_indexer::PhotonIndexer, telemetry::setup_telemetry, ForesterConfig, }; +use forester_utils::metrics::helpers::register_metrics; use light_client::{ indexer::{Indexer, IndexerError, NewAddressProofWithContext}, rpc::RpcConnection, diff --git a/program-tests/utils/src/lib.rs b/program-tests/utils/src/lib.rs index 0d561fd465..f233a0517e 100644 --- a/program-tests/utils/src/lib.rs +++ b/program-tests/utils/src/lib.rs @@ -43,7 +43,7 @@ pub use forester_utils::{ }; pub use light_client::{ rpc::{ - assert_rpc_error, solana_rpc::SolanaRpcUrl, RpcConnection, RpcError, SolanaRpcConnection, + assert_rpc_error, RpcConnection, RpcError, }, transaction_params::{FeeConfig, TransactionParams}, }; diff --git a/sdk-libs/client/src/lib.rs b/sdk-libs/client/src/lib.rs index f7cfda17be..ead4e4fd09 100644 --- a/sdk-libs/client/src/lib.rs +++ b/sdk-libs/client/src/lib.rs @@ -1,5 +1,4 @@ pub mod indexer; pub mod photon_rpc; pub mod rpc; -pub mod rpc_pool; pub mod transaction_params; diff --git a/sdk-libs/client/src/rpc/mod.rs b/sdk-libs/client/src/rpc/mod.rs index 60ef0920ac..df925d264b 100644 --- a/sdk-libs/client/src/rpc/mod.rs +++ b/sdk-libs/client/src/rpc/mod.rs @@ -1,8 +1,6 @@ pub mod errors; pub mod merkle_tree; pub mod rpc_connection; -pub mod solana_rpc; pub use errors::{assert_rpc_error, RpcError}; pub use rpc_connection::RpcConnection; -pub use solana_rpc::{RetryConfig, SolanaRpcConnection}; diff --git a/sdk-libs/client/src/rpc/rpc_connection.rs b/sdk-libs/client/src/rpc/rpc_connection.rs index 34cfd5d338..3a1c7a2ad7 100644 --- a/sdk-libs/client/src/rpc/rpc_connection.rs +++ b/sdk-libs/client/src/rpc/rpc_connection.rs @@ -51,7 +51,7 @@ pub trait RpcConnection: Send + Sync + Debug + 'static { transaction_params: Option, ) -> Result, RpcError> where - T: BorshDeserialize + Send + Debug; + T: BorshDeserialize + Send + Debug + 'static; async fn create_and_send_transaction<'a>( &'a mut self, @@ -75,7 +75,7 @@ pub trait RpcConnection: Send + Sync + Debug + 'static { async fn airdrop_lamports(&mut self, to: &Pubkey, lamports: u64) -> Result; - async fn get_anchor_account( + async fn get_anchor_account( &mut self, pubkey: &Pubkey, ) -> Result, RpcError> { diff --git a/sdk-libs/client/src/rpc_pool.rs b/sdk-libs/client/src/rpc_pool.rs 
deleted file mode 100644 index 23f2ed9152..0000000000 --- a/sdk-libs/client/src/rpc_pool.rs +++ /dev/null @@ -1,105 +0,0 @@ -use std::time::Duration; - -use async_trait::async_trait; -use bb8::{Pool, PooledConnection}; -use solana_sdk::commitment_config::CommitmentConfig; -use thiserror::Error; -use tokio::time::sleep; - -use crate::rpc::{RpcConnection, RpcError}; - -#[derive(Error, Debug)] -pub enum PoolError { - #[error("Failed to create RPC client: {0}")] - ClientCreation(String), - #[error("RPC request failed: {0}")] - RpcRequest(#[from] RpcError), - #[error("Pool error: {0}")] - Pool(String), -} - -pub struct SolanaConnectionManager { - url: String, - commitment: CommitmentConfig, - _phantom: std::marker::PhantomData, -} - -impl SolanaConnectionManager { - pub fn new(url: String, commitment: CommitmentConfig) -> Self { - Self { - url, - commitment, - _phantom: std::marker::PhantomData, - } - } -} - -#[async_trait] -impl bb8::ManageConnection for SolanaConnectionManager { - type Connection = R; - type Error = PoolError; - - async fn connect(&self) -> Result { - Ok(R::new(&self.url, Some(self.commitment))) - } - - async fn is_valid(&self, conn: &mut Self::Connection) -> Result<(), Self::Error> { - conn.health().await.map_err(PoolError::RpcRequest) - } - - fn has_broken(&self, _conn: &mut Self::Connection) -> bool { - false - } -} - -#[derive(Debug)] -pub struct SolanaRpcPool { - pool: Pool>, -} - -impl SolanaRpcPool { - pub async fn new( - url: String, - commitment: CommitmentConfig, - max_size: u32, - ) -> Result { - let manager = SolanaConnectionManager::new(url, commitment); - let pool = Pool::builder() - .max_size(max_size) - .connection_timeout(Duration::from_secs(15)) - .idle_timeout(Some(Duration::from_secs(60 * 5))) - .build(manager) - .await - .map_err(|e| PoolError::Pool(e.to_string()))?; - - Ok(Self { pool }) - } - - pub async fn get_connection( - &self, - ) -> Result>, PoolError> { - self.pool - .get() - .await - .map_err(|e| PoolError::Pool(e.to_string())) - } - - pub async fn get_connection_with_retry( - &self, - max_retries: u32, - delay: Duration, - ) -> Result>, PoolError> { - let mut retries = 0; - loop { - match self.pool.get().await { - Ok(conn) => return Ok(conn), - Err(e) if retries < max_retries => { - retries += 1; - eprintln!("Failed to get connection (attempt {}): {:?}", retries, e); - sleep(delay).await; - } - Err(e) => return Err(PoolError::Pool(e.to_string())), - } - } - } -} diff --git a/sdk-libs/program-test/src/indexer/test_indexer.rs b/sdk-libs/program-test/src/indexer/test_indexer.rs index 93c8a539af..b1f53d0c6f 100644 --- a/sdk-libs/program-test/src/indexer/test_indexer.rs +++ b/sdk-libs/program-test/src/indexer/test_indexer.rs @@ -80,9 +80,9 @@ use crate::{ }; #[derive(Debug)] -pub struct TestIndexer +pub struct TestIndexer where - R: RpcConnection + MerkleTreeExt, + C: RpcConnection + MerkleTreeExt, { pub state_merkle_trees: Vec, pub address_merkle_trees: Vec, @@ -94,13 +94,13 @@ where pub token_nullified_compressed_accounts: Vec, pub events: Vec, pub prover_config: Option, - phantom: PhantomData, + phantom: PhantomData, } #[async_trait] -impl Indexer for TestIndexer +impl Indexer for TestIndexer where - R: RpcConnection + MerkleTreeExt, + C: RpcConnection + MerkleTreeExt, { async fn get_queue_elements( &self, @@ -158,7 +158,7 @@ where state_merkle_tree_pubkeys: Option>, new_addresses: Option<&[[u8; 32]]>, address_merkle_tree_pubkeys: Option>, - rpc: &mut R, + rpc: &mut C, ) -> ProofRpcResult { if compressed_accounts.is_some() && ![1usize, 2usize, 
3usize, 4usize, 8usize] @@ -429,9 +429,9 @@ where } #[async_trait] -impl TestIndexerExtensions for TestIndexer +impl TestIndexerExtensions for TestIndexer where - R: RpcConnection + MerkleTreeExt, + C: RpcConnection + MerkleTreeExt, { fn get_address_merkle_tree( &self, @@ -547,7 +547,7 @@ where state_merkle_tree_pubkeys: Option>, new_addresses: Option<&[[u8; 32]]>, address_merkle_tree_pubkeys: Option>, - rpc: &mut R, + rpc: &mut C, ) -> BatchedTreeProofRpcResult { let mut indices_to_remove = Vec::new(); @@ -745,7 +745,7 @@ where async fn update_test_indexer_after_append( &mut self, - rpc: &mut R, + rpc: &mut C, merkle_tree_pubkey: Pubkey, output_queue_pubkey: Pubkey, num_inserted_zkps: u64, @@ -820,7 +820,7 @@ where async fn update_test_indexer_after_nullification( &mut self, - rpc: &mut R, + rpc: &mut C, merkle_tree_pubkey: Pubkey, batch_index: usize, ) { @@ -859,7 +859,7 @@ where async fn finalize_batched_address_tree_update( &mut self, - rpc: &mut R, + rpc: &mut C, merkle_tree_pubkey: Pubkey, ) { let mut account = rpc.get_account(merkle_tree_pubkey).await.unwrap().unwrap(); @@ -895,9 +895,9 @@ where } } -impl TestIndexer +impl TestIndexer where - R: RpcConnection + MerkleTreeExt, + C: RpcConnection + MerkleTreeExt, { pub async fn init_from_env( payer: &Keypair, @@ -1024,7 +1024,7 @@ where async fn add_address_merkle_tree_v1( &mut self, - rpc: &mut R, + rpc: &mut C, merkle_tree_keypair: &Keypair, queue_keypair: &Keypair, owning_program_id: Option, @@ -1048,7 +1048,7 @@ where async fn add_address_merkle_tree_v2( &mut self, - rpc: &mut R, + rpc: &mut C, merkle_tree_keypair: &Keypair, queue_keypair: &Keypair, owning_program_id: Option, @@ -1077,7 +1077,7 @@ where pub async fn add_address_merkle_tree( &mut self, - rpc: &mut R, + rpc: &mut C, merkle_tree_keypair: &Keypair, queue_keypair: &Keypair, owning_program_id: Option, @@ -1110,7 +1110,7 @@ where #[allow(clippy::too_many_arguments)] pub async fn add_state_merkle_tree( &mut self, - rpc: &mut R, + rpc: &mut C, merkle_tree_keypair: &Keypair, queue_keypair: &Keypair, cpi_context_keypair: &Keypair, @@ -1184,7 +1184,7 @@ where &self, merkle_tree_pubkeys: &[Pubkey], accounts: &[[u8; 32]], - rpc: &mut R, + rpc: &mut C, ) -> ( Option, Option, @@ -1240,7 +1240,7 @@ where let (root_index, root) = if version == 1 { let fetched_merkle_tree = - get_concurrent_merkle_tree::( + get_concurrent_merkle_tree::( rpc, pubkey, ) .await; @@ -1295,7 +1295,7 @@ where &self, address_merkle_tree_pubkeys: &[Pubkey], addresses: &[[u8; 32]], - rpc: &mut R, + rpc: &mut C, ) -> ( Option, Option, @@ -1351,7 +1351,7 @@ where } else { let fetched_address_merkle_tree = get_indexed_merkle_tree::< AddressMerkleTreeAccount, - R, + C, Poseidon, usize, 26, diff --git a/sdk-libs/program-test/src/test_env.rs b/sdk-libs/program-test/src/test_env.rs index 42047c366d..1052f15013 100644 --- a/sdk-libs/program-test/src/test_env.rs +++ b/sdk-libs/program-test/src/test_env.rs @@ -18,9 +18,9 @@ use light_batched_merkle_tree::{ initialize_address_tree::InitAddressTreeAccountsInstructionData, initialize_state_tree::InitStateTreeAccountsInstructionData, }; -use light_client::rpc::{ - errors::RpcError, solana_rpc::SolanaRpcUrl, RpcConnection, SolanaRpcConnection, -}; +use light_client::rpc::{errors::RpcError, RpcConnection}; +use forester_utils::solana_rpc::{SolanaRpcUrl, SolanaRpcConnection}; + use light_registry::{ account_compression_cpi::sdk::get_registered_program_pda, protocol_config::state::ProtocolConfig,
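
Usage sketch (illustrative only, assuming the forester-utils exports introduced in this patch: SolanaRpcPool, the RpcPool trait, and SolanaRpcConnection; the function name `example` is hypothetical). It shows how a caller acquires an instrumented connection from the relocated pool.

use std::{sync::Arc, time::Duration};

use forester_utils::{
    rpc_pool::{RpcPool, SolanaRpcPool},
    SolanaRpcConnection,
};
use light_client::rpc::RpcConnection;
use solana_sdk::commitment_config::CommitmentConfig;

async fn example(url: String) -> anyhow::Result<()> {
    // Construction keeps the previous light-client signature; pool wait time,
    // acquisitions, and timeouts are now recorded in the new Prometheus metrics.
    let pool: Arc<SolanaRpcPool<SolanaRpcConnection>> =
        Arc::new(SolanaRpcPool::new(url, CommitmentConfig::confirmed(), 10).await?);

    // The returned TrackedConnection derefs to SolanaRpcConnection and updates
    // the active/idle connection gauges when it is dropped.
    let mut conn = pool
        .get_connection_with_retry(3, Duration::from_millis(500))
        .await?;
    let slot = conn.get_slot().await?;
    println!("current slot: {slot}");
    Ok(())
}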
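
Illustrative sketch of the now-public, labeled retry helper (the wrapper name `fetch_rent_exempt_minimum` is hypothetical): each attempt is counted under the given method label in solana_rpc_requests_total and solana_rpc_request_duration_seconds, mirroring the pattern the RpcConnection impl above uses internally.

use forester_utils::SolanaRpcConnection;
use light_client::rpc::errors::RpcError;

async fn fetch_rent_exempt_minimum(
    rpc: &SolanaRpcConnection,
    data_len: usize,
) -> Result<u64, RpcError> {
    // Wrap an ad-hoc blocking RpcClient call so it is retried and metered.
    rpc.retry("get_minimum_balance_for_rent_exemption", || async {
        rpc.client
            .get_minimum_balance_for_rent_exemption(data_len)
            .map_err(RpcError::from)
    })
    .await
}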