From 19f20b051a67e0afdf60c939673acf5eec2c5066 Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Tue, 14 Jan 2025 02:03:59 +0000 Subject: [PATCH 1/8] use elements from helius-sdk for send logic rm helius error rm unused code add get_priority_fee_estimate wip fmt, use forester_epoch_derivation pubkey 10_000 localhost prio fee response remove deadcode --- Cargo.lock | 58 ++++++ Cargo.toml | 1 + forester/Cargo.toml | 7 + forester/README.md | 23 ++- forester/src/helius_types.rs | 107 +++++++++++ forester/src/lib.rs | 2 + forester/src/send_transaction.rs | 217 ++++++++++++++++------ forester/src/smart_transaction.rs | 150 +++++++++++++++ forester/tests/priority_fee_test.rs | 103 ++++++++++ sdk-libs/client/src/rpc/rpc_connection.rs | 12 ++ sdk-libs/client/src/rpc/solana_rpc.rs | 27 ++- sdk-libs/program-test/Cargo.toml | 4 +- sdk-libs/program-test/src/test_rpc.rs | 20 ++ 13 files changed, 660 insertions(+), 71 deletions(-) create mode 100644 forester/src/helius_types.rs create mode 100644 forester/src/smart_transaction.rs create mode 100644 forester/tests/priority_fee_test.rs diff --git a/Cargo.lock b/Cargo.lock index 03f09499fe..b8424c21ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1668,6 +1668,12 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "dotenvy" +version = "0.15.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1aaf95b3e5c8f23aa320147307562d361db0ae0d51242340f558153b4eb2439b" + [[package]] name = "downcast" version = "0.11.0" @@ -1949,10 +1955,13 @@ dependencies = [ "anchor-lang", "anyhow", "async-trait", + "bb8", + "bincode", "borsh 0.10.3", "bs58 0.5.1", "clap 4.5.23", "dashmap 6.1.0", + "dotenvy", "env_logger 0.11.6", "forester-utils", "futures", @@ -1980,11 +1989,14 @@ dependencies = [ "solana-client", "solana-program", "solana-sdk", + "solana-transaction-status", "thiserror 1.0.64", "tokio", + "tokio-tungstenite 0.16.1", "tracing", "tracing-appender", "tracing-subscriber", + "url", "warp", ] @@ -3127,7 +3139,9 @@ dependencies = [ "reqwest", "solana-banks-client", "solana-program-test", + "solana-rpc-client-api", "solana-sdk", + "solana-transaction-status", "tokio", ] @@ -5029,6 +5043,19 @@ dependencies = [ "syn 2.0.96", ] +[[package]] +name = "sha-1" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99cd6713db3cf16b6c84e06321e049a9b9f699826e16096d23bbcc44d15d51a6" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", +] + [[package]] name = "sha1" version = "0.10.6" @@ -7342,6 +7369,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e80b39df6afcc12cdf752398ade96a6b9e99c903dfdc36e53ad10b9c366bca72" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite 0.16.0", +] + [[package]] name = "tokio-tungstenite" version = "0.20.1" @@ -7546,6 +7585,25 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tungstenite" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ad3713a14ae247f22a728a0456a545df14acf3867f905adff84be99e23b3ad1" +dependencies = [ + "base64 0.13.1", + "byteorder", + "bytes", + "http 0.2.12", + "httparse", + "log", + "rand 0.8.5", + "sha-1", + "thiserror 1.0.64", + "url", + "utf-8", +] + [[package]] name = "tungstenite" version = "0.20.1" diff --git a/Cargo.toml b/Cargo.toml index 72413219bb..8bd22e17ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,6 +55,7 @@ solana-cli-output = "=1.18.22" solana-transaction-status = "=1.18.22" solana-account-decoder = "=1.18.22" solana-rpc = "=1.18.22" +solana-rpc-client-api = "=1.18.22" spl-token = "=4.0.0" spl-token-2022 = {version="3.0.5", no-default-features = true, features = ["no-entrypoint"]} diff --git a/forester/Cargo.toml b/forester/Cargo.toml index 5cf562daac..bce6dee24b 100644 --- a/forester/Cargo.toml +++ b/forester/Cargo.toml @@ -23,6 +23,12 @@ light-client = { workspace = true } light-merkle-tree-metadata = { workspace = true } light-sdk = { workspace = true } light-program-test = { workspace = true} +solana-transaction-status = { workspace = true } +bincode = "1.3" +url = "2.2" +tokio-tungstenite = "0.16" +bb8 = { workspace = true } + serde_json = "1.0" serde = { version = "1.0", features = ["derive"] } tokio = { version = "1", features = ["full"] } @@ -48,3 +54,4 @@ serial_test = "3.2.0" light-prover-client = { workspace = true } light-test-utils = { workspace = true } light-program-test = { workspace = true, features = ["devenv"] } +dotenvy = "0.15" \ No newline at end of file diff --git a/forester/README.md b/forester/README.md index b94d73c579..fcfd5ca51f 100644 --- a/forester/README.md +++ b/forester/README.md @@ -8,36 +8,35 @@ It subscribes to the nullifier queue and nullifies merkle tree leaves. ## Configuration Forester requires a configuration file, `forester.toml`, specifying necessary keys: + - `STATE_MERKLE_TREE_PUBKEY`: Address of the State Merkle tree. - `NULLIFIER_QUEUE_PUBKEY`: Address of the State Nullifier queue. - `ADDRESS_MERKLE_TREE_PUBKEY`: Address of the Address Merkle tree. - `ADDRESS_MERKLE_TREE_QUEUE_PUBKEY`: Address of the Address queue. - `REGISTRY_PUBKEY`: Address of the Registry program. +To setup your environment properly, copy `.env.example` to `.env` +and update the `FORESTER_PAYER` field with your appropriate key. -To setup your environment properly, copy `.env.example` to `.env` -and update the `FORESTER_PAYER` field with your appropriate key. - -Alternatively, if you prefer to use a terminal profile file, -add the key to your `~/.zshrc` (zsh) or `~/.bashrc` (bash) +Alternatively, if you prefer to use a terminal profile file, +add the key to your `~/.zshrc` (zsh) or `~/.bashrc` (bash) by including this line: `export FORESTER_PAYER=your_value_here`. -Substitute `your_value_here` with your actual key. +Substitute `your_value_here` with your actual key. Remember to restart your terminal or source your terminal profile for the changes to take effect. ## Usage 1. Run the service: -To subscribe to nullify the state merkle tree, use the following command: -`cargo run -- subscribe` + To subscribe to nullify the state merkle tree, use the following command: + `cargo run -- subscribe` 2. To manually nullify state merkle tree leaves, use the following command: -`cargo run -- nullify-state` + `cargo run -- nullify-state` 3. To manually nullify address merkle tree leaves, use the following command: -`cargo run -- nullify-addresses` -4. To manually nullify state *and* address merkle tree leaves, use the following command: + `cargo run -- nullify-addresses` +4. To manually nullify state _and_ address merkle tree leaves, use the following command: `cargo run -- nullify` - ## TODO 1. Add indexer URL to the configuration file. diff --git a/forester/src/helius_types.rs b/forester/src/helius_types.rs new file mode 100644 index 0000000000..a900c0e17a --- /dev/null +++ b/forester/src/helius_types.rs @@ -0,0 +1,107 @@ +use std::time::Duration; + +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct MicroLamportPriorityFeeLevels { + pub min: f64, + pub low: f64, + pub medium: f64, + pub high: f64, + #[serde(rename = "veryHigh")] + pub very_high: f64, + #[serde(rename = "unsafeMax")] + pub unsafe_max: f64, +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum PriorityLevel { + Min, + Low, + Medium, + High, + VeryHigh, + UnsafeMax, + Default, +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum UiTransactionEncoding { + Binary, + Base64, + Base58, + Json, + JsonParsed, +} +#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] +pub struct RpcRequest { + pub jsonrpc: String, + pub id: String, + pub method: String, + #[serde(rename = "params")] + pub parameters: T, +} + +impl RpcRequest { + pub fn new(method: String, parameters: T) -> Self { + Self { + jsonrpc: "2.0".to_string(), + id: "1".to_string(), + method, + parameters, + } + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)] +pub struct RpcResponse { + pub jsonrpc: String, + pub id: String, + pub result: T, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct GetPriorityFeeEstimateOptions { + pub priority_level: Option, + pub include_all_priority_fee_levels: Option, + pub transaction_encoding: Option, + pub lookback_slots: Option, + pub recommended: Option, + pub include_vote: Option, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct GetPriorityFeeEstimateRequest { + #[serde(skip_serializing_if = "Option::is_none")] + pub transaction: Option, + #[serde(rename = "accountKeys", skip_serializing_if = "Option::is_none")] + pub account_keys: Option>, + pub options: Option, +} + +#[derive(Serialize, Deserialize, Debug, Default)] +pub struct GetPriorityFeeEstimateResponse { + #[serde(rename = "priorityFeeEstimate")] + pub priority_fee_estimate: Option, + // #[serde(rename = "priorityFeeLevels")] + // pub priority_fee_levels: Option, +} + +pub struct Timeout { + pub duration: Duration, +} + +impl Default for Timeout { + fn default() -> Self { + Self { + duration: Duration::from_secs(60), + } + } +} + +impl From for Duration { + fn from(val: Timeout) -> Self { + val.duration + } +} diff --git a/forester/src/lib.rs b/forester/src/lib.rs index 64e9f5fdfe..81f3bb9585 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -6,6 +6,7 @@ pub mod config; pub mod epoch_manager; pub mod errors; pub mod forester_status; +pub mod helius_types; mod indexer_type; pub mod metrics; pub mod pagerduty; @@ -15,6 +16,7 @@ pub mod queue_helpers; pub mod rollover; pub mod send_transaction; mod slot_tracker; +pub mod smart_transaction; pub mod telemetry; pub mod tree_data_sync; pub mod tree_finder; diff --git a/forester/src/send_transaction.rs b/forester/src/send_transaction.rs index ceee29010e..1960db89cc 100644 --- a/forester/src/send_transaction.rs +++ b/forester/src/send_transaction.rs @@ -12,12 +12,16 @@ use light_client::{ rpc::{RetryConfig, RpcConnection}, rpc_pool::SolanaRpcPool, }; -use light_registry::account_compression_cpi::sdk::{ - create_nullify_instruction, create_update_address_merkle_tree_instruction, - CreateNullifyInstructionInputs, UpdateAddressMerkleTreeInstructionInputs, +use light_registry::{ + account_compression_cpi::sdk::{ + create_nullify_instruction, create_update_address_merkle_tree_instruction, + CreateNullifyInstructionInputs, UpdateAddressMerkleTreeInstructionInputs, + }, + utils::get_forester_epoch_pda_from_authority, }; +use solana_client::rpc_config::RpcSendTransactionConfig; use solana_sdk::{ - compute_budget::ComputeBudgetInstruction, + bs58, hash::Hash, instruction::Instruction, pubkey::Pubkey, @@ -30,25 +34,36 @@ use tokio::{ time::{sleep, Instant}, }; use tracing::{debug, warn}; +use url::Url; use crate::{ config::QueueConfig, epoch_manager::{MerkleProofType, WorkItem}, errors::ForesterError, + helius_types::{ + GetPriorityFeeEstimateOptions, GetPriorityFeeEstimateRequest, + GetPriorityFeeEstimateResponse, RpcRequest, RpcResponse, + }, queue_helpers::fetch_queue_item_data, + smart_transaction::{ + create_smart_transaction, send_and_confirm_transaction, CreateSmartTransactionConfig, + }, Result, }; - #[async_trait] +#[allow(clippy::too_many_arguments)] pub trait TransactionBuilder { + fn epoch(&self) -> u64; async fn build_signed_transaction_batch( &self, payer: &Keypair, derivation: &Pubkey, recent_blockhash: &Hash, + last_valid_block_height: u64, + priority_fee: u64, work_items: &[WorkItem], config: BuildTransactionBatchConfig, - ) -> Result>; + ) -> Result<(Vec, u64)>; } // We're assuming that: @@ -60,6 +75,11 @@ const LATENCY: Duration = Duration::from_millis(4 * 500); const TIMEOUT_CHECK_ENABLED: bool = true; +/// Calculate the compute unit price in microLamports based on the target lamports and compute units +pub fn calculate_compute_unit_price(target_lamports: u64, compute_units: u64) -> u64 { + ((target_lamports * 1_000_000) as f64 / compute_units as f64).ceil() as u64 +} + /// Setting: /// 1. We have 1 light slot 15 seconds and a lot of elements in the queue /// 2. we want to send as many elements from the queue as possible @@ -97,6 +117,7 @@ pub async fn send_batched_transactions( let start_time = Instant::now(); let mut rpc = pool.get_connection().await?; + let mut num_batches = 0; let mut num_sent_transactions: usize = 0; // 1. Execute batches until max number of batches is reached or light slot @@ -143,11 +164,34 @@ pub async fn send_batched_transactions( continue; } + // TODO: note that fresh blockhash has higher chance of landing. consider doing per batch. // 4. Fetch recent blockhash. // A recent blockhash is valid for 2 mins we only need one per batch. We // use a new one per batch in case that we want to retry these same // transactions and identical transactions might be dropped. let recent_blockhash = rpc.get_latest_blockhash().await?; + let current_block_height = rpc.get_block_height().await?; + let last_valid_block_height = current_block_height + 150; + + let forester_epoch_pda_pubkey = + get_forester_epoch_pda_from_authority(derivation, transaction_builder.epoch()).0; + // Get the priority fee estimate based on write locked accounts + let account_keys = vec![ + payer.pubkey(), + forester_epoch_pda_pubkey, + tree_accounts.queue, + tree_accounts.merkle_tree, + ]; + + let url = Url::parse(&rpc.get_url()).expect("Failed to parse URL"); + println!("URL HOST_STR: {:?}", url.host_str()); + let priority_fee_recommendation = request_priority_fee_estimate(&url, account_keys).await?; + println!( + "Priority fee recommendation: {:?}", + priority_fee_recommendation + ); + let priority_fee = get_capped_priority_fee(priority_fee_recommendation); + // 5. Iterate over work items in chunks of batch size. for work_items in work_items.chunks(config.build_transaction_batch_config.batch_size as usize) @@ -175,11 +219,13 @@ pub async fn send_batched_transactions( // Minimum time to wait for the next batch of transactions. // Can be used to avoid rate limits. let transaction_build_time_start = Instant::now(); - let transactions: Vec = transaction_builder + let (transactions, _block_height) = transaction_builder .build_signed_transaction_batch( payer, derivation, &recent_blockhash, + last_valid_block_height, + priority_fee, work_items, config.build_transaction_batch_config, ) @@ -202,32 +248,41 @@ pub async fn send_batched_transactions( } } // Asynchronously send all transactions in the batch - let pool_clone = Arc::clone(&pool); - let send_futures = transactions.into_iter().map(move |tx| { - let pool_clone = Arc::clone(&pool_clone); - tokio::spawn(async move { - match pool_clone.get_connection().await { - Ok(mut rpc) => { - let result = rpc.process_transaction(tx).await; - println!("tx result: {:?}", result); - result + let send_futures: Vec<_> = transactions + .into_iter() + .map(|tx| { + let pool_clone = Arc::clone(&pool); + async move { + match pool_clone.get_connection().await { + Ok(mut rpc) => { + send_and_confirm_transaction( + &mut rpc, + &tx, + RpcSendTransactionConfig::default(), + last_valid_block_height, + Some(Duration::from_secs(60)), + ) + .await + } + Err(e) => Err(light_client::rpc::RpcError::CustomError(format!( + "Failed to get RPC connection: {}", + e + ))), } - Err(e) => Err(light_client::rpc::RpcError::CustomError(format!( - "Failed to get RPC connection: {}", - e - ))), } }) - }); + .collect(); let results = join_all(send_futures).await; // Process results for result in results { match result { - Ok(Ok(_)) => num_sent_transactions += 1, - Ok(Err(e)) => warn!("Transaction failed: {:?}", e), - Err(e) => warn!("Task failed: {:?}", e), + Ok(signature) => { + num_sent_transactions += 1; + println!("Transaction sent: {:?}", signature); + } + Err(e) => warn!("Transaction failed: {:?}", e), } } @@ -279,14 +334,20 @@ pub struct EpochManagerTransactions> { #[async_trait] impl> TransactionBuilder for EpochManagerTransactions { + fn epoch(&self) -> u64 { + self.epoch + } + async fn build_signed_transaction_batch( &self, payer: &Keypair, derivation: &Pubkey, recent_blockhash: &Hash, + last_valid_block_height: u64, + priority_fee: u64, work_items: &[WorkItem], config: BuildTransactionBatchConfig, - ) -> Result> { + ) -> Result<(Vec, u64)> { let mut transactions = vec![]; let (_, all_instructions) = fetch_proofs_and_create_instructions( payer.pubkey(), @@ -296,42 +357,22 @@ impl> TransactionBuilder for EpochManagerTransac work_items, ) .await?; + for instruction in all_instructions { - let transaction = build_signed_transaction( - payer, - recent_blockhash, - config.compute_unit_price, - config.compute_unit_limit, - instruction, - ) - .await; + let (transaction, _) = create_smart_transaction(CreateSmartTransactionConfig { + payer: payer.insecure_clone(), + instructions: vec![instruction], + recent_blockhash: *recent_blockhash, + compute_unit_price: config.compute_unit_price, + compute_unit_limit: config.compute_unit_limit, + last_valid_block_hash: last_valid_block_height, + priority_fee, + }) + .await?; transactions.push(transaction); } - Ok(transactions) - } -} - -async fn build_signed_transaction( - payer: &Keypair, - recent_blockhash: &Hash, - compute_unit_price: Option, - compute_unit_limit: Option, - instruction: Instruction, -) -> Transaction { - let mut instructions: Vec = if let Some(price) = compute_unit_price { - vec![ComputeBudgetInstruction::set_compute_unit_price(price)] - } else { - vec![] - }; - if let Some(limit) = compute_unit_limit { - instructions.push(ComputeBudgetInstruction::set_compute_unit_limit(limit)); + Ok((transactions, last_valid_block_height)) } - instructions.push(instruction); - - let mut transaction = - Transaction::new_with_payer(instructions.as_slice(), Some(&payer.pubkey())); - transaction.sign(&[payer], *recent_blockhash); - transaction } /// Work items should be of only one type and tree @@ -453,3 +494,65 @@ pub async fn fetch_proofs_and_create_instructions) -> Result { + if url.host_str() == Some("localhost") { + return Ok(10_000); + } + + let priority_fee_request = GetPriorityFeeEstimateRequest { + transaction: None, + account_keys: Some( + account_keys + .iter() + .map(|pubkey| bs58::encode(pubkey).into_string()) + .collect(), + ), + options: Some(GetPriorityFeeEstimateOptions { + include_all_priority_fee_levels: None, + recommended: Some(true), + include_vote: None, + lookback_slots: None, + priority_level: None, + transaction_encoding: None, + }), + }; + + let rpc_request = RpcRequest::new( + "getPriorityFeeEstimate".to_string(), + serde_json::json!({ + "get_priority_fee_estimate_request": priority_fee_request + }), + ); + + let client = reqwest::Client::new(); + let response = client + .post(url.clone()) + .header("Content-Type", "application/json") + .json(&rpc_request) + .send() + .await?; + + let response_text = response.text().await?; + + let response: RpcResponse = + serde_json::from_str(&response_text)?; + + response + .result + .priority_fee_estimate + .map(|estimate| estimate as u64) + .ok_or( + ForesterError::General { + error: "Priority fee estimate not available".to_string(), + } + .into(), + ) +} + +/// Get capped priority fee for transaction +pub fn get_capped_priority_fee(priority_fee_recommendation: u64) -> u64 { + let priority_fee_cap = calculate_compute_unit_price(10_000, 170_000); + std::cmp::min(priority_fee_recommendation, priority_fee_cap) +} diff --git a/forester/src/smart_transaction.rs b/forester/src/smart_transaction.rs new file mode 100644 index 0000000000..930e24ec1f --- /dev/null +++ b/forester/src/smart_transaction.rs @@ -0,0 +1,150 @@ +// from helius-sdk. adjusted for use in forester +use std::time::{Duration, Instant}; + +use light_client::{rpc::RpcConnection, rpc_pool::SolanaConnectionManager}; +use solana_client::rpc_config::RpcSendTransactionConfig; +use solana_sdk::{ + compute_budget::ComputeBudgetInstruction, + hash::Hash, + instruction::Instruction, + pubkey::Pubkey, + signature::{Signature, Signer}, + signer::keypair::Keypair, + transaction::Transaction, +}; +use solana_transaction_status::TransactionConfirmationStatus; +use tokio::time::sleep; + +pub struct CreateSmartTransactionConfig { + pub payer: Keypair, + pub recent_blockhash: Hash, + pub compute_unit_price: Option, + pub compute_unit_limit: Option, + pub instructions: Vec, + pub last_valid_block_hash: u64, + pub priority_fee: u64, +} + +/// Poll a transaction to check whether it has been confirmed +/// +/// * `txt-sig` - The transaction signature to check +/// +/// # Returns +/// The confirmed transaction signature or an error if the confirmation times out +pub async fn poll_transaction_confirmation<'a, R: RpcConnection>( + connection: &mut bb8::PooledConnection<'a, SolanaConnectionManager>, + txt_sig: Signature, +) -> Result { + // 15 second timeout + let timeout: Duration = Duration::from_secs(15); + // 5 second retry interval + let interval: Duration = Duration::from_secs(5); + let start: Instant = Instant::now(); + + loop { + if start.elapsed() >= timeout { + return Err(light_client::rpc::RpcError::CustomError(format!( + "Transaction {}'s confirmation timed out", + txt_sig + ))); + } + + let status: Vec> = + connection.get_signature_statuses(&[txt_sig]).await?; + + match status[0].clone() { + Some(status) => { + if status.err.is_none() + && (status.confirmation_status + == Some(TransactionConfirmationStatus::Confirmed) + || status.confirmation_status + == Some(TransactionConfirmationStatus::Finalized)) + { + return Ok(txt_sig); + } + if status.err.is_some() { + return Err(light_client::rpc::RpcError::CustomError(format!( + "Transaction {}'s confirmation failed", + txt_sig + ))); + } + } + None => { + sleep(interval).await; + } + } + } +} + +/// Sends a transaction and handles its confirmation status +/// +/// # Arguments +/// * `transaction` - The transaction to be sent, which implements `SerializableTransaction` +/// * `send_transaction_config` - Configuration options for sending the transaction +/// * `last_valid_block_height` - The last block height at which the transaction is valid +/// * `timeout` - Optional duration for polling transaction confirmation, defaults to 60 seconds +/// +/// # Returns +/// The transaction signature, if successful +pub async fn send_and_confirm_transaction<'a, R: RpcConnection>( + connection: &mut bb8::PooledConnection<'a, SolanaConnectionManager>, + transaction: &Transaction, + send_transaction_config: RpcSendTransactionConfig, + last_valid_block_height: u64, + timeout: Option, +) -> Result { + // Retry logic with a timeout + let timeout: Duration = timeout.unwrap_or(Duration::from_secs(60)); + let start_time: Instant = Instant::now(); + + while Instant::now().duration_since(start_time) < timeout + || connection.get_slot().await? <= last_valid_block_height + { + let result = connection.send_transaction_with_config(transaction, send_transaction_config); + + match result.await { + Ok(signature) => { + // Poll for transaction confirmation + match poll_transaction_confirmation(connection, signature).await { + Ok(sig) => return Ok(sig), + // Retry on polling failure + Err(_) => continue, + } + } + // Retry on send failure + Err(_) => continue, + } + } + + Err(light_client::rpc::RpcError::CustomError( + "Transaction failed to confirm in 60s.".to_string(), + )) +} + +/// Creates an optimized transaction based on the provided configuration +/// +/// # Arguments +/// * `config` - The configuration for the smart transaction, which includes the transaction's instructions, signers, and lookup tables, depending on +/// whether it's a legacy or versioned smart transaction. The transaction's send configuration can also be changed, if provided +/// +/// # Returns +/// An optimized `Transaction` and the `last_valid_block_height` +pub async fn create_smart_transaction( + config: CreateSmartTransactionConfig, +) -> Result<(Transaction, u64), light_client::rpc::RpcError> { + let payer_pubkey: Pubkey = config.payer.pubkey(); + let mut final_instructions: Vec = if let Some(price) = config.compute_unit_price { + vec![ComputeBudgetInstruction::set_compute_unit_price(price)] + } else { + vec![] + }; + if let Some(limit) = config.compute_unit_limit { + final_instructions.push(ComputeBudgetInstruction::set_compute_unit_limit(limit)); + } + final_instructions.extend(config.instructions); + + let mut tx = Transaction::new_with_payer(&final_instructions, Some(&payer_pubkey)); + tx.sign(&[&config.payer], config.recent_blockhash); + + Ok((tx, config.last_valid_block_hash)) +} diff --git a/forester/tests/priority_fee_test.rs b/forester/tests/priority_fee_test.rs new file mode 100644 index 0000000000..981c35dff6 --- /dev/null +++ b/forester/tests/priority_fee_test.rs @@ -0,0 +1,103 @@ +use forester::{ + cli::StartArgs, + send_transaction::{ + calculate_compute_unit_price, get_capped_priority_fee, request_priority_fee_estimate, + }, + ForesterConfig, +}; +use light_client::rpc::{RpcConnection, SolanaRpcConnection}; +use solana_sdk::{commitment_config::CommitmentConfig, signature::Signer}; +use url::Url; + +use crate::test_utils::init; + +mod test_utils; + +#[tokio::test] +async fn test_priority_fee_request() { + dotenvy::dotenv().ok(); + + init(None).await; + + let args = StartArgs { + rpc_url: Some( + std::env::var("FORESTER_RPC_URL").expect("FORESTER_RPC_URL must be set in environment"), + ), + push_gateway_url: None, + pagerduty_routing_key: None, + ws_rpc_url: Some( + std::env::var("FORESTER_WS_RPC_URL") + .expect("FORESTER_WS_RPC_URL must be set in environment"), + ), + indexer_url: Some( + std::env::var("FORESTER_INDEXER_URL") + .expect("FORESTER_INDEXER_URL must be set in environment"), + ), + prover_url: Some( + std::env::var("FORESTER_PROVER_URL") + .expect("FORESTER_PROVER_URL must be set in environment"), + ), + payer: Some( + std::env::var("FORESTER_PAYER").expect("FORESTER_PAYER must be set in environment"), + ), + derivation: Some( + std::env::var("FORESTER_DERIVATION_PUBKEY") + .expect("FORESTER_DERIVATION_PUBKEY must be set in environment"), + ), + photon_api_key: Some( + std::env::var("PHOTON_API_KEY").expect("PHOTON_API_KEY must be set in environment"), + ), + indexer_batch_size: 50, + indexer_max_concurrent_batches: 10, + transaction_batch_size: 1, + transaction_max_concurrent_batches: 20, + cu_limit: 1_000_000, + rpc_pool_size: 20, + slot_update_interval_seconds: 10, + tree_discovery_interval_seconds: 5, + max_retries: 3, + retry_delay: 1000, + retry_timeout: 30000, + state_queue_start_index: 0, + state_queue_processing_length: 28807, + address_queue_start_index: 0, + address_queue_processing_length: 28807, + }; + + let config = ForesterConfig::new_for_start(&args).expect("Failed to create config"); + + // Setup RPC connection using config + let mut rpc = SolanaRpcConnection::new( + config.external_services.rpc_url, + Some(CommitmentConfig::confirmed()), + ); + rpc.payer = config.payer_keypair.insecure_clone(); + + let account_keys = vec![config.payer_keypair.pubkey()]; + + let url = Url::parse(&rpc.get_url()).expect("Failed to parse URL"); + println!("URL: {}", url); + let priority_fee = request_priority_fee_estimate(&url, account_keys) + .await + .unwrap(); + + println!("Priority fee: {:?}", priority_fee); + assert!(priority_fee > 0, "Priority fee should be greater than 0"); +} + +#[test] +fn test_capped_priority_fee() { + let test_cases = vec![ + (1000, 1000), // Below cap + (1_000_000, calculate_compute_unit_price(10_000, 170_000)), // Above cap + ]; + + for (input, expected) in test_cases { + let result = get_capped_priority_fee(input); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + input + ); + } +} diff --git a/sdk-libs/client/src/rpc/rpc_connection.rs b/sdk-libs/client/src/rpc/rpc_connection.rs index 08854d230d..34cfd5d338 100644 --- a/sdk-libs/client/src/rpc/rpc_connection.rs +++ b/sdk-libs/client/src/rpc/rpc_connection.rs @@ -2,6 +2,7 @@ use std::fmt::Debug; use async_trait::async_trait; use borsh::BorshDeserialize; +use solana_client::rpc_config::RpcSendTransactionConfig; use solana_program::{clock::Slot, instruction::Instruction}; use solana_sdk::{ account::{Account, AccountSharedData}, @@ -12,6 +13,7 @@ use solana_sdk::{ signature::{Keypair, Signature}, transaction::Transaction, }; +use solana_transaction_status::TransactionStatus; use crate::{rpc::errors::RpcError, transaction_params::TransactionParams}; @@ -91,5 +93,15 @@ pub trait RpcConnection: Send + Sync + Debug + 'static { async fn get_slot(&mut self) -> Result; async fn warp_to_slot(&mut self, slot: Slot) -> Result<(), RpcError>; async fn send_transaction(&self, transaction: &Transaction) -> Result; + async fn send_transaction_with_config( + &self, + transaction: &Transaction, + config: RpcSendTransactionConfig, + ) -> Result; async fn get_transaction_slot(&mut self, signature: &Signature) -> Result; + async fn get_signature_statuses( + &self, + signatures: &[Signature], + ) -> Result>, RpcError>; + async fn get_block_height(&mut self) -> Result; } diff --git a/sdk-libs/client/src/rpc/solana_rpc.rs b/sdk-libs/client/src/rpc/solana_rpc.rs index 2c3cc6b632..95b799ee7b 100644 --- a/sdk-libs/client/src/rpc/solana_rpc.rs +++ b/sdk-libs/client/src/rpc/solana_rpc.rs @@ -22,7 +22,7 @@ use solana_sdk::{ transaction::Transaction, }; use solana_transaction_status::{ - option_serializer::OptionSerializer, UiInstruction, UiTransactionEncoding, + option_serializer::OptionSerializer, TransactionStatus, UiInstruction, UiTransactionEncoding, }; use tokio::time::{sleep, Instant}; @@ -343,6 +343,15 @@ impl RpcConnection for SolanaRpcConnection { let result = parsed_event.map(|e| (e, signature, slot)); Ok(result) } + async fn get_signature_statuses( + &self, + signatures: &[Signature], + ) -> Result>, RpcError> { + self.client + .get_signature_statuses(signatures) + .map(|response| response.value) + .map_err(RpcError::from) + } async fn confirm_transaction(&self, signature: Signature) -> Result { self.retry(|| async { @@ -443,6 +452,18 @@ impl RpcConnection for SolanaRpcConnection { }) .await } + async fn send_transaction_with_config( + &self, + transaction: &Transaction, + config: RpcSendTransactionConfig, + ) -> Result { + self.retry(|| async { + self.client + .send_transaction_with_config(transaction, config) + .map_err(RpcError::from) + }) + .await + } async fn get_transaction_slot(&mut self, signature: &Signature) -> Result { self.retry(|| async { @@ -461,6 +482,10 @@ impl RpcConnection for SolanaRpcConnection { }) .await } + async fn get_block_height(&mut self) -> Result { + self.retry(|| async { self.client.get_block_height().map_err(RpcError::from) }) + .await + } } impl MerkleTreeExt for SolanaRpcConnection {} diff --git a/sdk-libs/program-test/Cargo.toml b/sdk-libs/program-test/Cargo.toml index a9ec2ab938..10cdcc4f1b 100644 --- a/sdk-libs/program-test/Cargo.toml +++ b/sdk-libs/program-test/Cargo.toml @@ -33,4 +33,6 @@ num-traits = { workspace = true } reqwest = { workspace = true } anchor-lang = { workspace = true } light-verifier = { workspace = true } -light-batched-merkle-tree = { workspace = true } \ No newline at end of file +light-batched-merkle-tree = { workspace = true } +solana-transaction-status = { workspace = true } +solana-rpc-client-api = { workspace = true } diff --git a/sdk-libs/program-test/src/test_rpc.rs b/sdk-libs/program-test/src/test_rpc.rs index 8a9bc2d6ab..3864f46d08 100644 --- a/sdk-libs/program-test/src/test_rpc.rs +++ b/sdk-libs/program-test/src/test_rpc.rs @@ -8,6 +8,7 @@ use light_client::{ }; use solana_banks_client::BanksClientError; use solana_program_test::ProgramTestContext; +use solana_rpc_client_api::config::RpcSendTransactionConfig; use solana_sdk::{ account::{Account, AccountSharedData}, clock::Slot, @@ -20,6 +21,7 @@ use solana_sdk::{ system_instruction, transaction::{Transaction, TransactionError}, }; +use solana_transaction_status::TransactionStatus; pub struct ProgramTestRpcConnection { pub context: ProgramTestContext, @@ -308,6 +310,14 @@ impl RpcConnection for ProgramTestRpcConnection { unimplemented!("send transaction is unimplemented for ProgramTestRpcConnection") } + async fn send_transaction_with_config( + &self, + _transaction: &Transaction, + _config: RpcSendTransactionConfig, + ) -> Result { + unimplemented!("send transaction with config is unimplemented for ProgramTestRpcConnection") + } + async fn get_transaction_slot(&mut self, signature: &Signature) -> Result { self.context .banks_client @@ -322,6 +332,16 @@ impl RpcConnection for ProgramTestRpcConnection { .map(|status| status.slot) }) } + async fn get_signature_statuses( + &self, + _signatures: &[Signature], + ) -> Result>, RpcError> { + unimplemented!("get_signature_statuses is unimplemented for ProgramTestRpcConnection") + } + + async fn get_block_height(&mut self) -> Result { + unimplemented!("get_block_height is unimplemented for ProgramTestRpcConnection") + } } impl MerkleTreeExt for ProgramTestRpcConnection {} From ec33cee6e69c5162d1eb7102f01e72d4a4990bec Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Thu, 16 Jan 2025 16:11:09 +0000 Subject: [PATCH 2/8] add debug stack trace to forester tests --- .github/workflows/forester-tests.yml | 36 ++++++++++++++++++++++------ forester/src/smart_transaction.rs | 2 +- 2 files changed, 30 insertions(+), 8 deletions(-) diff --git a/.github/workflows/forester-tests.yml b/.github/workflows/forester-tests.yml index 96c301a1f1..efbd2ea303 100644 --- a/.github/workflows/forester-tests.yml +++ b/.github/workflows/forester-tests.yml @@ -22,18 +22,40 @@ concurrency: cancel-in-progress: true env: + RUST_BACKTRACE: "1" RUSTFLAGS: "--cfg tokio_unstable -D warnings" jobs: test: strategy: matrix: - test-name: [ - {name: "address-batched", command: "test_address_batched", timeout: 60, needs-test-program: true}, - {name: "state-batched", command: "test_state_batched", timeout: 60, needs-test-program: false}, - {name: "2-foresters", command: "test_epoch_monitor_with_2_foresters", timeout: 60, needs-test-program: false}, - {name: "double-registration", command: "test_epoch_double_registration", timeout: 60, needs-test-program: false} - ] + test-name: + [ + { + name: "address-batched", + command: "test_address_batched", + timeout: 60, + needs-test-program: true, + }, + { + name: "state-batched", + command: "test_state_batched", + timeout: 60, + needs-test-program: false, + }, + { + name: "2-foresters", + command: "test_epoch_monitor_with_2_foresters", + timeout: 60, + needs-test-program: false, + }, + { + name: "double-registration", + command: "test_epoch_double_registration", + timeout: 60, + needs-test-program: false, + }, + ] name: test-${{ matrix.test-name.name }} runs-on: ubuntu-latest timeout-minutes: ${{ matrix.test-name.timeout }} @@ -63,4 +85,4 @@ jobs: - name: Run ${{ matrix.test-name.name }} tests run: | source ./scripts/devenv.sh - cargo test --package forester ${{ matrix.test-name.command }} -- --nocapture \ No newline at end of file + cargo test --package forester ${{ matrix.test-name.command }} -- --nocapture diff --git a/forester/src/smart_transaction.rs b/forester/src/smart_transaction.rs index 930e24ec1f..c78e7148c5 100644 --- a/forester/src/smart_transaction.rs +++ b/forester/src/smart_transaction.rs @@ -98,7 +98,7 @@ pub async fn send_and_confirm_transaction<'a, R: RpcConnection>( let start_time: Instant = Instant::now(); while Instant::now().duration_since(start_time) < timeout - || connection.get_slot().await? <= last_valid_block_height + && connection.get_slot().await? <= last_valid_block_height { let result = connection.send_transaction_with_config(transaction, send_transaction_config); From da625b3c0cbef485267fdbac6d30f8598e2c8bc3 Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Thu, 16 Jan 2025 19:14:50 +0000 Subject: [PATCH 3/8] add light slot timeout, optimze configs --- forester/src/epoch_manager.rs | 1 + forester/src/send_transaction.rs | 45 +++++++++---------- forester/src/smart_transaction.rs | 6 +-- .../registry/src/protocol_config/state.rs | 2 +- sdk-libs/client/src/rpc/solana_rpc.rs | 13 +++++- 5 files changed, 36 insertions(+), 31 deletions(-) diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 067a01f041..8575b1d40b 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -851,6 +851,7 @@ impl + IndexerType> EpochManager { ) .await?; + // light slot length in s let light_slot_timeout = { let slot_length_u32 = u32::try_from(epoch_pda.protocol_config.slot_length) .map_err(|_| ConfigurationError::SlotLengthOverflow { diff --git a/forester/src/send_transaction.rs b/forester/src/send_transaction.rs index 1960db89cc..916e576f41 100644 --- a/forester/src/send_transaction.rs +++ b/forester/src/send_transaction.rs @@ -22,6 +22,7 @@ use light_registry::{ use solana_client::rpc_config::RpcSendTransactionConfig; use solana_sdk::{ bs58, + commitment_config::CommitmentLevel, hash::Hash, instruction::Instruction, pubkey::Pubkey, @@ -184,12 +185,8 @@ pub async fn send_batched_transactions( ]; let url = Url::parse(&rpc.get_url()).expect("Failed to parse URL"); - println!("URL HOST_STR: {:?}", url.host_str()); let priority_fee_recommendation = request_priority_fee_estimate(&url, account_keys).await?; - println!( - "Priority fee recommendation: {:?}", - priority_fee_recommendation - ); + let priority_fee = get_capped_priority_fee(priority_fee_recommendation); // 5. Iterate over work items in chunks of batch size. @@ -198,18 +195,8 @@ pub async fn send_batched_transactions( { // 6. Check if we reached the end of the light slot. if TIMEOUT_CHECK_ENABLED { - let remaining_time = match config - .retry_config - .timeout - .checked_sub(start_time.elapsed()) - { - Some(time) => time, - None => { - debug!("Reached end of light slot"); - break; - } - }; - + let remaining_time = + get_remaining_time_in_light_slot(start_time, config.retry_config.timeout); if remaining_time < LATENCY { debug!("Reached end of light slot"); break; @@ -237,16 +224,22 @@ pub async fn send_batched_transactions( let batch_start = Instant::now(); if TIMEOUT_CHECK_ENABLED { - let remaining_time = config - .retry_config - .timeout - .saturating_sub(start_time.elapsed()); - + let remaining_time = + get_remaining_time_in_light_slot(start_time, config.retry_config.timeout); if remaining_time < LATENCY { debug!("Reached end of light slot"); break; } } + + let send_transaction_config = RpcSendTransactionConfig { + // Required for routing through staked connection, see + // https://docs.helius.dev/guides/sending-transactions-on-solana + skip_preflight: true, + max_retries: Some(0), + preflight_commitment: Some(CommitmentLevel::Confirmed), + ..Default::default() + }; // Asynchronously send all transactions in the batch let send_futures: Vec<_> = transactions .into_iter() @@ -258,9 +251,9 @@ pub async fn send_batched_transactions( send_and_confirm_transaction( &mut rpc, &tx, - RpcSendTransactionConfig::default(), + send_transaction_config, last_valid_block_height, - Some(Duration::from_secs(60)), + config.retry_config.timeout, ) .await } @@ -310,6 +303,10 @@ pub async fn send_batched_transactions( Ok(num_sent_transactions) } +fn get_remaining_time_in_light_slot(start_time: Instant, timeout: Duration) -> Duration { + timeout.saturating_sub(start_time.elapsed()) +} + #[derive(Debug, Clone, Copy)] pub struct SendBatchedTransactionsConfig { pub num_batches: u64, diff --git a/forester/src/smart_transaction.rs b/forester/src/smart_transaction.rs index c78e7148c5..f1a14e901c 100644 --- a/forester/src/smart_transaction.rs +++ b/forester/src/smart_transaction.rs @@ -82,7 +82,7 @@ pub async fn poll_transaction_confirmation<'a, R: RpcConnection>( /// * `transaction` - The transaction to be sent, which implements `SerializableTransaction` /// * `send_transaction_config` - Configuration options for sending the transaction /// * `last_valid_block_height` - The last block height at which the transaction is valid -/// * `timeout` - Optional duration for polling transaction confirmation, defaults to 60 seconds +/// * `timeout` - Duration for polling transaction confirmation /// /// # Returns /// The transaction signature, if successful @@ -91,10 +91,8 @@ pub async fn send_and_confirm_transaction<'a, R: RpcConnection>( transaction: &Transaction, send_transaction_config: RpcSendTransactionConfig, last_valid_block_height: u64, - timeout: Option, + timeout: Duration, ) -> Result { - // Retry logic with a timeout - let timeout: Duration = timeout.unwrap_or(Duration::from_secs(60)); let start_time: Instant = Instant::now(); while Instant::now().duration_since(start_time) < timeout diff --git a/programs/registry/src/protocol_config/state.rs b/programs/registry/src/protocol_config/state.rs index 24a6eca469..0a635b3bb9 100644 --- a/programs/registry/src/protocol_config/state.rs +++ b/programs/registry/src/protocol_config/state.rs @@ -25,7 +25,7 @@ pub struct ProtocolConfig { pub genesis_slot: u64, /// Minimum weight required for a forester to register to an epoch. pub min_weight: u64, - /// Light protocol slot length. + /// Light protocol slot length pub slot_length: u64, /// Foresters can register for this phase. pub registration_phase_length: u64, diff --git a/sdk-libs/client/src/rpc/solana_rpc.rs b/sdk-libs/client/src/rpc/solana_rpc.rs index 95b799ee7b..c5461381f2 100644 --- a/sdk-libs/client/src/rpc/solana_rpc.rs +++ b/sdk-libs/client/src/rpc/solana_rpc.rs @@ -56,6 +56,8 @@ impl Display for SolanaRpcUrl { pub struct RetryConfig { pub max_retries: u32, pub retry_delay: Duration, + /// Max Light slot timeout in time based on solana slot length and light + /// slot length. pub timeout: Duration, } @@ -422,8 +424,15 @@ impl RpcConnection for SolanaRpcConnection { } async fn get_latest_blockhash(&mut self) -> Result { - self.retry(|| async { self.client.get_latest_blockhash().map_err(RpcError::from) }) - .await + self.retry(|| async { + self.client + // Confirmed commitments land more reliably than finalized + // https://www.helius.dev/blog/how-to-deal-with-blockhash-errors-on-solana#how-to-deal-with-blockhash-errors + .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed()) + .map(|response| response.0) + .map_err(RpcError::from) + }) + .await } async fn get_slot(&mut self) -> Result { From b62b025b0bec3fc7e69f326476d01718c9e783b3 Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Thu, 16 Jan 2025 19:44:10 +0000 Subject: [PATCH 4/8] adjust intervals --- forester/src/smart_transaction.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/forester/src/smart_transaction.rs b/forester/src/smart_transaction.rs index f1a14e901c..ade364392e 100644 --- a/forester/src/smart_transaction.rs +++ b/forester/src/smart_transaction.rs @@ -35,10 +35,10 @@ pub async fn poll_transaction_confirmation<'a, R: RpcConnection>( connection: &mut bb8::PooledConnection<'a, SolanaConnectionManager>, txt_sig: Signature, ) -> Result { - // 15 second timeout - let timeout: Duration = Duration::from_secs(15); - // 5 second retry interval - let interval: Duration = Duration::from_secs(5); + // 12 second timeout + let timeout: Duration = Duration::from_secs(12); + // 6 second retry interval + let interval: Duration = Duration::from_secs(6); let start: Instant = Instant::now(); loop { @@ -95,6 +95,8 @@ pub async fn send_and_confirm_transaction<'a, R: RpcConnection>( ) -> Result { let start_time: Instant = Instant::now(); + // As is, if timeout=30s, it'll send, poll 2 times within 12s, + // then try once again. while Instant::now().duration_since(start_time) < timeout && connection.get_slot().await? <= last_valid_block_height { @@ -115,7 +117,7 @@ pub async fn send_and_confirm_transaction<'a, R: RpcConnection>( } Err(light_client::rpc::RpcError::CustomError( - "Transaction failed to confirm in 60s.".to_string(), + "Transaction failed to confirm within timeout.".to_string(), )) } From ec647fbfe6b4c24fca479ae50fd4dfb8265696da Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Thu, 16 Jan 2025 20:55:00 +0000 Subject: [PATCH 5/8] extend test for capped-prio-fee --- forester/src/epoch_manager.rs | 1 + forester/src/send_transaction.rs | 50 ++++++++---- forester/src/smart_transaction.rs | 5 +- forester/tests/priority_fee_test.rs | 118 +++++++++++++++++++++++----- 4 files changed, 138 insertions(+), 36 deletions(-) diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 8575b1d40b..189d11a048 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -933,6 +933,7 @@ impl + IndexerType> EpochManager { }; debug!("Sending transactions..."); + // sequential let start_time = Instant::now(); let batch_tx_future = send_batched_transactions( &self.config.payer_keypair, diff --git a/forester/src/send_transaction.rs b/forester/src/send_transaction.rs index 916e576f41..955869b0e4 100644 --- a/forester/src/send_transaction.rs +++ b/forester/src/send_transaction.rs @@ -82,7 +82,7 @@ pub fn calculate_compute_unit_price(target_lamports: u64, compute_units: u64) -> } /// Setting: -/// 1. We have 1 light slot 15 seconds and a lot of elements in the queue +/// 1. We have 1 light slot (n solana slots), and elements in thequeue /// 2. we want to send as many elements from the queue as possible /// /// Strategy: @@ -99,7 +99,6 @@ pub fn calculate_compute_unit_price(target_lamports: u64, compute_units: u64) -> /// /// Questions: /// - How do we make sure that we have send all the transactions? -/// - How can we monitor how many txs have been dropped? /// /// TODO: /// - return number of sent transactions @@ -165,29 +164,32 @@ pub async fn send_batched_transactions( continue; } - // TODO: note that fresh blockhash has higher chance of landing. consider doing per batch. - // 4. Fetch recent blockhash. - // A recent blockhash is valid for 2 mins we only need one per batch. We - // use a new one per batch in case that we want to retry these same - // transactions and identical transactions might be dropped. + // 4. Fetch recent confirmed blockhash. + // A recent blockhash is valid for 150 blocks. let recent_blockhash = rpc.get_latest_blockhash().await?; let current_block_height = rpc.get_block_height().await?; let last_valid_block_height = current_block_height + 150; let forester_epoch_pda_pubkey = get_forester_epoch_pda_from_authority(derivation, transaction_builder.epoch()).0; - // Get the priority fee estimate based on write locked accounts + // Get the priority fee estimate based on write-locked accounts let account_keys = vec![ payer.pubkey(), forester_epoch_pda_pubkey, tree_accounts.queue, tree_accounts.merkle_tree, ]; - let url = Url::parse(&rpc.get_url()).expect("Failed to parse URL"); - let priority_fee_recommendation = request_priority_fee_estimate(&url, account_keys).await?; - - let priority_fee = get_capped_priority_fee(priority_fee_recommendation); + let priority_fee_recommendation: u64 = + request_priority_fee_estimate(&url, account_keys).await?; + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: priority_fee_recommendation, + min_fee_lamports: 10_000, + max_fee_lamports: 100_000, + compute_unit_limit: 180_000, + }; + let priority_fee = get_capped_priority_fee(cap_config); // 5. Iterate over work items in chunks of batch size. for work_items in @@ -303,6 +305,14 @@ pub async fn send_batched_transactions( Ok(num_sent_transactions) } +#[derive(Debug, Clone, Copy)] +pub struct CapConfig { + pub rec_fee_microlamports_per_cu: u64, + pub min_fee_lamports: u64, + pub max_fee_lamports: u64, + pub compute_unit_limit: u64, +} + fn get_remaining_time_in_light_slot(start_time: Instant, timeout: Duration) -> Duration { timeout.saturating_sub(start_time.elapsed()) } @@ -548,8 +558,16 @@ pub async fn request_priority_fee_estimate(url: &Url, account_keys: Vec) ) } -/// Get capped priority fee for transaction -pub fn get_capped_priority_fee(priority_fee_recommendation: u64) -> u64 { - let priority_fee_cap = calculate_compute_unit_price(10_000, 170_000); - std::cmp::min(priority_fee_recommendation, priority_fee_cap) +/// Get capped priority fee for transaction between min and max. +pub fn get_capped_priority_fee(cap_config: CapConfig) -> u64 { + if cap_config.max_fee_lamports < cap_config.min_fee_lamports { + panic!("Max fee is less than min fee"); + } + + let priority_fee_max = + calculate_compute_unit_price(cap_config.max_fee_lamports, cap_config.compute_unit_limit); + let priority_fee_min = + calculate_compute_unit_price(cap_config.min_fee_lamports, cap_config.compute_unit_limit); + let capped_fee = std::cmp::min(cap_config.rec_fee_microlamports_per_cu, priority_fee_max); + std::cmp::max(capped_fee, priority_fee_min) } diff --git a/forester/src/smart_transaction.rs b/forester/src/smart_transaction.rs index ade364392e..d9a41a7edf 100644 --- a/forester/src/smart_transaction.rs +++ b/forester/src/smart_transaction.rs @@ -34,6 +34,7 @@ pub struct CreateSmartTransactionConfig { pub async fn poll_transaction_confirmation<'a, R: RpcConnection>( connection: &mut bb8::PooledConnection<'a, SolanaConnectionManager>, txt_sig: Signature, + abort_timeout: Duration, ) -> Result { // 12 second timeout let timeout: Duration = Duration::from_secs(12); @@ -42,7 +43,7 @@ pub async fn poll_transaction_confirmation<'a, R: RpcConnection>( let start: Instant = Instant::now(); loop { - if start.elapsed() >= timeout { + if start.elapsed() >= timeout || start.elapsed() >= abort_timeout { return Err(light_client::rpc::RpcError::CustomError(format!( "Transaction {}'s confirmation timed out", txt_sig @@ -105,7 +106,7 @@ pub async fn send_and_confirm_transaction<'a, R: RpcConnection>( match result.await { Ok(signature) => { // Poll for transaction confirmation - match poll_transaction_confirmation(connection, signature).await { + match poll_transaction_confirmation(connection, signature, timeout).await { Ok(sig) => return Ok(sig), // Retry on polling failure Err(_) => continue, diff --git a/forester/tests/priority_fee_test.rs b/forester/tests/priority_fee_test.rs index 981c35dff6..ca67446964 100644 --- a/forester/tests/priority_fee_test.rs +++ b/forester/tests/priority_fee_test.rs @@ -1,8 +1,6 @@ use forester::{ cli::StartArgs, - send_transaction::{ - calculate_compute_unit_price, get_capped_priority_fee, request_priority_fee_estimate, - }, + send_transaction::{get_capped_priority_fee, request_priority_fee_estimate, CapConfig}, ForesterConfig, }; use light_client::rpc::{RpcConnection, SolanaRpcConnection}; @@ -10,7 +8,6 @@ use solana_sdk::{commitment_config::CommitmentConfig, signature::Signer}; use url::Url; use crate::test_utils::init; - mod test_utils; #[tokio::test] @@ -84,20 +81,105 @@ async fn test_priority_fee_request() { println!("Priority fee: {:?}", priority_fee); assert!(priority_fee > 0, "Priority fee should be greater than 0"); } - #[test] + fn test_capped_priority_fee() { - let test_cases = vec![ - (1000, 1000), // Below cap - (1_000_000, calculate_compute_unit_price(10_000, 170_000)), // Above cap - ]; - - for (input, expected) in test_cases { - let result = get_capped_priority_fee(input); - assert_eq!( - result, expected, - "Priority fee capping failed for input {}", - input - ); - } + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 50_000, + min_fee_lamports: 10_000, + max_fee_lamports: 100_000, + // 1_000_000 cu x 50_000 microlamports per cu = 50_000 lamports total + compute_unit_limit: 1_000_000, + }; + let expected = 50_000; + + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 10_000, + min_fee_lamports: 10_000, + max_fee_lamports: 100_000, + compute_unit_limit: 1_000_000, + }; + let expected = 10_000; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 100_000, + min_fee_lamports: 10_000, + max_fee_lamports: 100_000, + compute_unit_limit: 1_000_000, + }; + let expected = 100_000; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 10_000, + min_fee_lamports: 20_000, + max_fee_lamports: 100_000, + compute_unit_limit: 1_000_000, + }; + let expected = 20_000; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 200_000, + min_fee_lamports: 10_000, + max_fee_lamports: 100_000, + compute_unit_limit: 1_000_000, + }; + let expected = 100_000; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 10_000, + min_fee_lamports: 0, + max_fee_lamports: 0, + compute_unit_limit: 1_000_000, + }; + let expected = 0; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 10_000, + min_fee_lamports: 10_000, + max_fee_lamports: 0, + compute_unit_limit: 1_000_000, + }; + println!("expecting panic"); + let result = std::panic::catch_unwind(|| get_capped_priority_fee(cap_config)); + assert!( + result.is_err(), + "Expected panic for max fee less than min fee" + ); } From 0a2345da1b6a8f1225520b5947f74dfb6760ae3f Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Thu, 16 Jan 2025 21:29:19 +0000 Subject: [PATCH 6/8] clean --- ..._types.rs => helius_priority_fee_types.rs} | 35 +------------------ forester/src/lib.rs | 2 +- forester/src/send_transaction.rs | 2 +- forester/src/smart_transaction.rs | 3 +- 4 files changed, 5 insertions(+), 37 deletions(-) rename forester/src/{helius_types.rs => helius_priority_fee_types.rs} (72%) diff --git a/forester/src/helius_types.rs b/forester/src/helius_priority_fee_types.rs similarity index 72% rename from forester/src/helius_types.rs rename to forester/src/helius_priority_fee_types.rs index a900c0e17a..d9d22b2fa1 100644 --- a/forester/src/helius_types.rs +++ b/forester/src/helius_priority_fee_types.rs @@ -1,19 +1,6 @@ -use std::time::Duration; - +// adapted from https://github.com/helius-labs/helius-rust-sdk/blob/dev/src/types/types.rs use serde::{Deserialize, Serialize}; -#[derive(Serialize, Deserialize, Debug, Default)] -pub struct MicroLamportPriorityFeeLevels { - pub min: f64, - pub low: f64, - pub medium: f64, - pub high: f64, - #[serde(rename = "veryHigh")] - pub very_high: f64, - #[serde(rename = "unsafeMax")] - pub unsafe_max: f64, -} - #[derive(Serialize, Deserialize, Debug)] pub enum PriorityLevel { Min, @@ -84,24 +71,4 @@ pub struct GetPriorityFeeEstimateRequest { pub struct GetPriorityFeeEstimateResponse { #[serde(rename = "priorityFeeEstimate")] pub priority_fee_estimate: Option, - // #[serde(rename = "priorityFeeLevels")] - // pub priority_fee_levels: Option, -} - -pub struct Timeout { - pub duration: Duration, -} - -impl Default for Timeout { - fn default() -> Self { - Self { - duration: Duration::from_secs(60), - } - } -} - -impl From for Duration { - fn from(val: Timeout) -> Self { - val.duration - } } diff --git a/forester/src/lib.rs b/forester/src/lib.rs index 81f3bb9585..25ea7e6c0e 100644 --- a/forester/src/lib.rs +++ b/forester/src/lib.rs @@ -6,7 +6,7 @@ pub mod config; pub mod epoch_manager; pub mod errors; pub mod forester_status; -pub mod helius_types; +pub mod helius_priority_fee_types; mod indexer_type; pub mod metrics; pub mod pagerduty; diff --git a/forester/src/send_transaction.rs b/forester/src/send_transaction.rs index 955869b0e4..7a06cd40e0 100644 --- a/forester/src/send_transaction.rs +++ b/forester/src/send_transaction.rs @@ -41,7 +41,7 @@ use crate::{ config::QueueConfig, epoch_manager::{MerkleProofType, WorkItem}, errors::ForesterError, - helius_types::{ + helius_priority_fee_types::{ GetPriorityFeeEstimateOptions, GetPriorityFeeEstimateRequest, GetPriorityFeeEstimateResponse, RpcRequest, RpcResponse, }, diff --git a/forester/src/smart_transaction.rs b/forester/src/smart_transaction.rs index d9a41a7edf..3dc333f692 100644 --- a/forester/src/smart_transaction.rs +++ b/forester/src/smart_transaction.rs @@ -1,4 +1,5 @@ -// from helius-sdk. adjusted for use in forester +// adapted from https://github.com/helius-labs/helius-rust-sdk/blob/dev/src/optimized_transaction.rs +// optimized for forester client use std::time::{Duration, Instant}; use light_client::{rpc::RpcConnection, rpc_pool::SolanaConnectionManager}; From ede18bc1521bc0d9d6a12892e7581e4cae017180 Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Thu, 16 Jan 2025 23:44:08 +0000 Subject: [PATCH 7/8] add doc --- forester/src/send_transaction.rs | 33 +++++++++++++++++++---------- forester/src/smart_transaction.rs | 16 ++------------ forester/tests/priority_fee_test.rs | 28 ++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 25 deletions(-) diff --git a/forester/src/send_transaction.rs b/forester/src/send_transaction.rs index 7a06cd40e0..a610586339 100644 --- a/forester/src/send_transaction.rs +++ b/forester/src/send_transaction.rs @@ -165,7 +165,7 @@ pub async fn send_batched_transactions( } // 4. Fetch recent confirmed blockhash. - // A recent blockhash is valid for 150 blocks. + // A blockhash is valid for 150 blocks. let recent_blockhash = rpc.get_latest_blockhash().await?; let current_block_height = rpc.get_block_height().await?; let last_valid_block_height = current_block_height + 150; @@ -183,11 +183,21 @@ pub async fn send_batched_transactions( let priority_fee_recommendation: u64 = request_priority_fee_estimate(&url, account_keys).await?; + // Cap the priority fee and CU usage with buffer. let cap_config = CapConfig { rec_fee_microlamports_per_cu: priority_fee_recommendation, - min_fee_lamports: 10_000, - max_fee_lamports: 100_000, - compute_unit_limit: 180_000, + min_fee_lamports: config + .build_transaction_batch_config + .compute_unit_price + .unwrap_or(10_000), + max_fee_lamports: config + .build_transaction_batch_config + .compute_unit_price + .unwrap_or(100_000), + compute_unit_limit: config + .build_transaction_batch_config + .compute_unit_limit + .unwrap_or(200_000) as u64, }; let priority_fee = get_capped_priority_fee(cap_config); @@ -205,8 +215,10 @@ pub async fn send_batched_transactions( } } - // Minimum time to wait for the next batch of transactions. - // Can be used to avoid rate limits. + // Minimum time to wait for the next batch of transactions. Can be + // used to avoid rate limits. TODO(swen): check max feasible batch + // size and latency for large tx batches. TODO: add global rate + // limit across our instances and queues: max 100 RPS global. let transaction_build_time_start = Instant::now(); let (transactions, _block_height) = transaction_builder .build_signed_transaction_batch( @@ -235,14 +247,14 @@ pub async fn send_batched_transactions( } let send_transaction_config = RpcSendTransactionConfig { - // Required for routing through staked connection, see + // Use required settings for routing through staked connection: // https://docs.helius.dev/guides/sending-transactions-on-solana skip_preflight: true, max_retries: Some(0), preflight_commitment: Some(CommitmentLevel::Confirmed), ..Default::default() }; - // Asynchronously send all transactions in the batch + // Send and confirm all transactions in the batch non-blocking. let send_futures: Vec<_> = transactions .into_iter() .map(|tx| { @@ -270,7 +282,7 @@ pub async fn send_batched_transactions( let results = join_all(send_futures).await; - // Process results + // Evaluate results for result in results { match result { Ok(signature) => { @@ -370,10 +382,9 @@ impl> TransactionBuilder for EpochManagerTransac payer: payer.insecure_clone(), instructions: vec![instruction], recent_blockhash: *recent_blockhash, - compute_unit_price: config.compute_unit_price, + compute_unit_price: Some(priority_fee), compute_unit_limit: config.compute_unit_limit, last_valid_block_hash: last_valid_block_height, - priority_fee, }) .await?; transactions.push(transaction); diff --git a/forester/src/smart_transaction.rs b/forester/src/smart_transaction.rs index 3dc333f692..16bb84a7af 100644 --- a/forester/src/smart_transaction.rs +++ b/forester/src/smart_transaction.rs @@ -23,7 +23,6 @@ pub struct CreateSmartTransactionConfig { pub compute_unit_limit: Option, pub instructions: Vec, pub last_valid_block_hash: u64, - pub priority_fee: u64, } /// Poll a transaction to check whether it has been confirmed @@ -37,7 +36,7 @@ pub async fn poll_transaction_confirmation<'a, R: RpcConnection>( txt_sig: Signature, abort_timeout: Duration, ) -> Result { - // 12 second timeout + // 12 second total timeout before exiting let timeout: Duration = Duration::from_secs(12); // 6 second retry interval let interval: Duration = Duration::from_secs(6); @@ -78,16 +77,7 @@ pub async fn poll_transaction_confirmation<'a, R: RpcConnection>( } } -/// Sends a transaction and handles its confirmation status -/// -/// # Arguments -/// * `transaction` - The transaction to be sent, which implements `SerializableTransaction` -/// * `send_transaction_config` - Configuration options for sending the transaction -/// * `last_valid_block_height` - The last block height at which the transaction is valid -/// * `timeout` - Duration for polling transaction confirmation -/// -/// # Returns -/// The transaction signature, if successful +// Sends a transaction and handles its confirmation. Retries until timeout or last_valid_block_height is reached. pub async fn send_and_confirm_transaction<'a, R: RpcConnection>( connection: &mut bb8::PooledConnection<'a, SolanaConnectionManager>, transaction: &Transaction, @@ -97,8 +87,6 @@ pub async fn send_and_confirm_transaction<'a, R: RpcConnection>( ) -> Result { let start_time: Instant = Instant::now(); - // As is, if timeout=30s, it'll send, poll 2 times within 12s, - // then try once again. while Instant::now().duration_since(start_time) < timeout && connection.get_slot().await? <= last_valid_block_height { diff --git a/forester/tests/priority_fee_test.rs b/forester/tests/priority_fee_test.rs index ca67446964..5591c64b3e 100644 --- a/forester/tests/priority_fee_test.rs +++ b/forester/tests/priority_fee_test.rs @@ -182,4 +182,32 @@ fn test_capped_priority_fee() { result.is_err(), "Expected panic for max fee less than min fee" ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 10_000, + min_fee_lamports: 50_000, + max_fee_lamports: 50_000, + compute_unit_limit: 1_000_000, + }; + let expected = 50_000; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); + + let cap_config = CapConfig { + rec_fee_microlamports_per_cu: 100_000, + min_fee_lamports: 50_000, + max_fee_lamports: 50_000, + compute_unit_limit: 1_000_000, + }; + let expected = 50_000; + let result = get_capped_priority_fee(cap_config); + assert_eq!( + result, expected, + "Priority fee capping failed for input {}", + cap_config.rec_fee_microlamports_per_cu + ); } From 00508acf02131f475a7a993aa1f85841d3d0bf5b Mon Sep 17 00:00:00 2001 From: Swenschaeferjohann Date: Thu, 16 Jan 2025 23:47:58 +0000 Subject: [PATCH 8/8] add doc --- forester/src/epoch_manager.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/forester/src/epoch_manager.rs b/forester/src/epoch_manager.rs index 189d11a048..9e9933afc4 100644 --- a/forester/src/epoch_manager.rs +++ b/forester/src/epoch_manager.rs @@ -908,14 +908,21 @@ impl + IndexerType> EpochManager { } } } else { - // TODO: measure accuracy - // Optional replace with shutdown signal for all child processes + // TODO: measure accuracy Optional replace with shutdown + // signal for all child processes + // + // Note: as of now, this executes all batches sequentially: + // a single batch must fully complete before the next batch + // is sent. We can either limit num_batches to 1 and + // increase batch_size (quick fix) and require another + // rate-limiting mechanism (with more control). Or rework + // the send logic to not await confirmations. let batched_tx_config = SendBatchedTransactionsConfig { num_batches: 10, build_transaction_batch_config: BuildTransactionBatchConfig { batch_size: 50, // TODO: make batch size configurable and or dynamic based on queue usage - compute_unit_price: None, // Make dynamic based on queue usage - compute_unit_limit: Some(1_000_000), + compute_unit_price: Some(10_000), // Is dynamic. Sets max. + compute_unit_limit: Some(180_000), }, queue_config: self.config.queue_config, retry_config: RetryConfig {