Skip to content

Commit

Permalink
refactor transaction handling
Browse files Browse the repository at this point in the history
  • Loading branch information
sergeytimoshin committed Jan 22, 2025
1 parent 76276f3 commit 07da7aa
Showing 1 changed file with 37 additions and 27 deletions.
64 changes: 37 additions & 27 deletions forester/src/send_transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use tokio::{
sync::Mutex,
time::{sleep, Instant},
};
use tracing::log::info;
use tracing::{debug, warn};
use url::Url;

Expand Down Expand Up @@ -229,7 +230,7 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
config.build_transaction_batch_config,
)
.await?;
debug!(
info!(
"build transaction time {:?}",
transaction_build_time_start.elapsed()
);
Expand All @@ -252,49 +253,58 @@ pub async fn send_batched_transactions<T: TransactionBuilder, R: RpcConnection>(
preflight_commitment: Some(CommitmentLevel::Confirmed),
..Default::default()
};
// Asynchronously send all transactions in the batch
let pool_clone = Arc::clone(&pool);
let send_futures = transactions.into_iter().map(move |tx| {
let pool_clone = Arc::clone(&pool_clone);

let (tx_sender, mut tx_receiver) = tokio::sync::mpsc::channel(120);
for tx in transactions {
let tx_sender = tx_sender.clone();
let pool_clone = Arc::clone(&pool);
let config = send_transaction_config;

tokio::spawn(async move {
match pool_clone.get_connection().await {
Ok(mut rpc) => {
let result = rpc
.process_transaction_with_config(tx, send_transaction_config)
.await;
println!("tx result: {:?}", result);
result
let result = rpc.process_transaction_with_config(tx, config).await;
let _ = tx_sender.send(result).await;
}
Err(e) => {
warn!("Failed to get RPC connection: {}", e);
}
Err(e) => Err(light_client::rpc::RpcError::CustomError(format!(
"Failed to get RPC connection: {}",
e
))),
}
})
});
let results = join_all(send_futures).await;

// Process results
for result in results {
});
}
drop(tx_sender);

while let Some(result) = tx_receiver.recv().await {
match result {
Ok(Ok(_)) => num_sent_transactions += 1,
Ok(Err(e)) => warn!("Transaction failed: {:?}", e),
Err(e) => warn!("Task failed: {:?}", e),
Ok(signature) => {
num_sent_transactions += 1;
// debug!("Transaction sent: {:?}", signature);
info!(
"tree {} / queue {} / tx {:?}",
tree_accounts.merkle_tree.to_string(),
tree_accounts.queue.to_string(),
signature
);
}
Err(e) => warn!("Transaction failed: {:?}", e),
}
}

num_batches += 1;
let batch_duration = batch_start.elapsed();
debug!("Batch duration: {:?}", batch_duration);
info!("Batch duration: {:?}", batch_duration);

// 8. Await minimum batch time.
if start_time.elapsed() + config.retry_config.retry_delay < config.retry_config.timeout
{
sleep(config.retry_config.retry_delay).await;
} else {
break;
}

if num_batches >= config.num_batches {
debug!("Reached max number of batches");
break;
}

// 9. Check if we reached max number of batches.
if num_batches >= config.num_batches {
debug!("Reached max number of batches");
Expand Down Expand Up @@ -505,7 +515,7 @@ pub async fn fetch_proofs_and_create_instructions<R: RpcConnection, I: Indexer<R

/// Request priority fee estimate from Helius RPC endpoint
pub async fn request_priority_fee_estimate(url: &Url, account_keys: Vec<Pubkey>) -> Result<u64> {
if url.host_str() == Some("localhost") {
if url.host_str() != Some("mainnet") {
return Ok(10_000);
}

Expand Down

0 comments on commit 07da7aa

Please sign in to comment.