diff --git a/iot_packet_verifier/src/pending.rs b/iot_packet_verifier/src/pending.rs index f28ac8f06..0db62ce63 100644 --- a/iot_packet_verifier/src/pending.rs +++ b/iot_packet_verifier/src/pending.rs @@ -392,6 +392,7 @@ mod test { use super::*; use std::collections::HashSet; + #[derive(Clone)] struct MockConfirmed(HashSet); #[async_trait] diff --git a/iot_packet_verifier/tests/integration_tests.rs b/iot_packet_verifier/tests/integration_tests.rs index c3ca415ac..f4251dda0 100644 --- a/iot_packet_verifier/tests/integration_tests.rs +++ b/iot_packet_verifier/tests/integration_tests.rs @@ -576,15 +576,16 @@ async fn test_end_to_end() { ); } +#[derive(Clone)] struct MockSolanaNetwork { - confirmed: Mutex>, + confirmed: Arc>>, ledger: Arc>>, } impl MockSolanaNetwork { fn new(ledger: HashMap) -> Self { Self { - confirmed: Default::default(), + confirmed: Arc::new(Default::default()), ledger: Arc::new(Mutex::new(ledger)), } } diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index 73d792563..7ab9b362b 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -1,21 +1,33 @@ +use std::time::Duration; + use file_store::file_sink::FileSinkClient; use helium_crypto::PublicKeyBinary; use helium_proto::services::packet_verifier::ValidDataTransferSession; -use solana::burn::SolanaNetwork; +use solana::{burn::SolanaNetwork, GetSignature}; use sqlx::{Pool, Postgres}; +use tracing::Instrument; use crate::pending_burns; pub struct Burner { valid_sessions: FileSinkClient, solana: S, + failed_retry_attempts: usize, + failed_check_interval: Duration, } impl Burner { - pub fn new(valid_sessions: FileSinkClient, solana: S) -> Self { + pub fn new( + valid_sessions: FileSinkClient, + solana: S, + failed_retry_attempts: usize, + failed_check_interval: Duration, + ) -> Self { Self { valid_sessions, solana, + failed_retry_attempts, + failed_check_interval, } } } @@ -33,43 +45,130 @@ where let payer_balance = 
self.solana.payer_balance(&payer).await?; if payer_balance < total_dcs { - tracing::warn!(%payer, %payer_balance, %total_dcs, "Payer does not have enough balance to burn dcs"); + tracing::warn!( + %payer, + %payer_balance, + %total_dcs, + "Payer does not have enough balance to burn dcs" + ); continue; } tracing::info!(%total_dcs, %payer, "Burning DC"); - if self.burn_data_credits(&payer, total_dcs).await.is_err() { - // We have failed to burn data credits: - metrics::counter!("burned", "payer" => payer.to_string(), "success" => "false") - .increment(total_dcs); - continue; + let txn = self.solana.make_burn_transaction(&payer, total_dcs).await?; + match self.solana.submit_transaction(&txn).await { + Ok(()) => { + handle_transaction_success( + pool, + payer, + total_dcs, + sessions, + &self.valid_sessions, + ) + .await?; + } + Err(err) => { + let span = tracing::info_span!( + "txn_confirmation", + signature = %txn.get_signature(), + %payer, + total_dcs, + max_attempts = self.failed_retry_attempts + ); + + // block on confirmation + self.transaction_confirmation_check(pool, err, txn, payer, total_dcs, sessions) + .instrument(span) + .await; + } } + } - // We succesfully managed to burn data credits: + Ok(()) + } - metrics::counter!("burned", "payer" => payer.to_string(), "success" => "true") - .increment(total_dcs); + async fn transaction_confirmation_check( + &self, + pool: &Pool, + err: S::Error, + txn: S::Transaction, + payer: PublicKeyBinary, + total_dcs: u64, + sessions: Vec, + ) { + tracing::warn!(?err, "starting txn confirmation check"); + // We don't know if the txn actually made it, maybe it did - // Delete from the data transfer session and write out to S3 - pending_burns::delete_for_payer(pool, &payer, total_dcs).await?; + let signature = txn.get_signature(); - for session in sessions { - self.valid_sessions - .write(ValidDataTransferSession::from(session), &[]) - .await?; + let mut attempt = 0; + while attempt <= self.failed_retry_attempts { + 
tokio::time::sleep(self.failed_check_interval).await; + match self.solana.confirm_transaction(signature).await { + Ok(true) => { + tracing::debug!("txn confirmed on chain"); + let txn_success = handle_transaction_success( + pool, + payer, + total_dcs, + sessions, + &self.valid_sessions, + ) + .await; + if let Err(err) = txn_success { + tracing::error!(?err, "txn succeeded, something else failed"); + } + + return; + } + Ok(false) => { + tracing::info!(attempt, "txn not confirmed, yet..."); + attempt += 1; + continue; + } + Err(err) => { + // Client errors do not count against retry attempts + tracing::error!(?err, attempt, "failed to confirm txn"); + continue; + } } } - Ok(()) + tracing::warn!("failed to confirm txn"); + + // We have failed to burn data credits: + metrics::counter!( + "burned", + "payer" => payer.to_string(), + "success" => "false" + ) + .increment(total_dcs); } +} - async fn burn_data_credits( - &self, - payer: &PublicKeyBinary, - amount: u64, - ) -> Result<(), S::Error> { - let txn = self.solana.make_burn_transaction(payer, amount).await?; - self.solana.submit_transaction(&txn).await?; - Ok(()) +async fn handle_transaction_success( + pool: &Pool, + payer: PublicKeyBinary, + total_dcs: u64, + sessions: Vec, + valid_sessions: &FileSinkClient, +) -> Result<(), anyhow::Error> { + // We successfully managed to burn data credits: + metrics::counter!( + "burned", + "payer" => payer.to_string(), + "success" => "true" + ) + .increment(total_dcs); + + // Delete from the data transfer session and write out to S3 + pending_burns::delete_for_payer(pool, &payer, total_dcs).await?; + + for session in sessions { + valid_sessions + .write(ValidDataTransferSession::from(session), &[]) + .await?; } + + Ok(()) } diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 685272b91..6049a93d0 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -157,7 +157,12 @@ impl Cmd { ) .await?; - let 
burner = Burner::new(valid_sessions, solana); + let burner = Burner::new( + valid_sessions, + solana, + settings.txn_confirmation_retry_attempts, + settings.txn_confirmation_check_interval, + ); let file_store = FileStore::from_settings(&settings.ingest).await?; diff --git a/mobile_packet_verifier/src/settings.rs b/mobile_packet_verifier/src/settings.rs index 2cd97bf26..08ebd4052 100644 --- a/mobile_packet_verifier/src/settings.rs +++ b/mobile_packet_verifier/src/settings.rs @@ -34,6 +34,17 @@ pub struct Settings { pub purger_interval: Duration, #[serde(with = "humantime_serde", default = "default_purger_max_age")] pub purger_max_age: Duration, + /// When a burn transaction is not a success, how many times should + /// we try to confirm the transaction before considering it a failure? + #[serde(default = "default_txn_confirmation_retry_attempts")] + pub txn_confirmation_retry_attempts: usize, + /// When a burn transaction is not a success, how long should we + /// wait between trying to confirm if the transaction made it to Solana? 
+ #[serde( + with = "humantime_serde", + default = "default_txn_confirmation_check_interval" + )] + pub txn_confirmation_check_interval: Duration, } fn default_purger_interval() -> Duration { @@ -44,6 +55,14 @@ fn default_purger_max_age() -> Duration { humantime::parse_duration("24 hours").unwrap() } +fn default_txn_confirmation_check_interval() -> Duration { + humantime::parse_duration("1 min").unwrap() +} + +fn default_txn_confirmation_retry_attempts() -> usize { + 5 +} + fn default_start_after() -> DateTime { DateTime::UNIX_EPOCH } diff --git a/solana/src/burn.rs b/solana/src/burn.rs index 6f5abc54e..eb84f8127 100644 --- a/solana/src/burn.rs +++ b/solana/src/burn.rs @@ -32,7 +32,7 @@ use std::{ use tokio::sync::Mutex; #[async_trait] -pub trait SolanaNetwork: Send + Sync + 'static { +pub trait SolanaNetwork: Clone + Send + Sync + 'static { type Error: std::error::Error + Send + Sync + 'static; type Transaction: GetSignature + Send + Sync + 'static; @@ -76,8 +76,9 @@ impl Settings { } } +#[derive(Clone)] pub struct SolanaRpc { - provider: RpcClient, + provider: Arc, program_cache: BurnProgramCache, cluster: String, keypair: [u8; 64], @@ -101,7 +102,7 @@ impl SolanaRpc { } Ok(Arc::new(Self { cluster: settings.cluster.clone(), - provider, + provider: Arc::new(provider), program_cache, keypair: keypair.to_bytes(), payers_to_monitor: settings.payers_to_monitor()?, @@ -296,7 +297,7 @@ impl SolanaNetwork for SolanaRpc { } } -#[derive(Default)] +#[derive(Default, Clone)] pub struct PriorityFee { last_estimate: Arc>, } @@ -364,6 +365,7 @@ impl LastEstimate { } /// Cached pubkeys for the burn program +#[derive(Clone)] pub struct BurnProgramCache { pub account_payer: Pubkey, pub data_credits: Pubkey,