From 25821227c9037b23341f62286f45191e96956545 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Wed, 4 Dec 2024 12:36:36 -0700 Subject: [PATCH] Mobile Verifier Burner Txn Error check (#906) * Make SolanaNetwork implement Clone so we can get a copy for a spawned task * Add retry settings to mobile verifier burner * Fall back to confirming transactions on all errors If a submitted transaction does not come back as a de facto success, we spawn a process that checks that a txn made it to the chain by signature. By default, we will retry 5 times, a minute apart, before considering the txn an actual failure to be tried again later. If during the checking we find the txn on chain, we do the same logic as if it had passed the first time. * consistent txn in logs * handle success case first * prefix with failed so we know what we're retrying * block on txn confirmation mobile packet verifier is not handling many burn txns, so we can block on confirming before attempting to process the next txn. Txn tracking by signature and block-height will come in a later update. 
--- iot_packet_verifier/src/pending.rs | 1 + .../tests/integration_tests.rs | 5 +- mobile_packet_verifier/src/burner.rs | 151 +++++++++++++++--- mobile_packet_verifier/src/daemon.rs | 7 +- mobile_packet_verifier/src/settings.rs | 19 +++ solana/src/burn.rs | 10 +- 6 files changed, 160 insertions(+), 33 deletions(-) diff --git a/iot_packet_verifier/src/pending.rs b/iot_packet_verifier/src/pending.rs index f28ac8f06..0db62ce63 100644 --- a/iot_packet_verifier/src/pending.rs +++ b/iot_packet_verifier/src/pending.rs @@ -392,6 +392,7 @@ mod test { use super::*; use std::collections::HashSet; + #[derive(Clone)] struct MockConfirmed(HashSet); #[async_trait] diff --git a/iot_packet_verifier/tests/integration_tests.rs b/iot_packet_verifier/tests/integration_tests.rs index c3ca415ac..f4251dda0 100644 --- a/iot_packet_verifier/tests/integration_tests.rs +++ b/iot_packet_verifier/tests/integration_tests.rs @@ -576,15 +576,16 @@ async fn test_end_to_end() { ); } +#[derive(Clone)] struct MockSolanaNetwork { - confirmed: Mutex>, + confirmed: Arc>>, ledger: Arc>>, } impl MockSolanaNetwork { fn new(ledger: HashMap) -> Self { Self { - confirmed: Default::default(), + confirmed: Arc::new(Default::default()), ledger: Arc::new(Mutex::new(ledger)), } } diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index 73d792563..7ab9b362b 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -1,21 +1,33 @@ +use std::time::Duration; + use file_store::file_sink::FileSinkClient; use helium_crypto::PublicKeyBinary; use helium_proto::services::packet_verifier::ValidDataTransferSession; -use solana::burn::SolanaNetwork; +use solana::{burn::SolanaNetwork, GetSignature}; use sqlx::{Pool, Postgres}; +use tracing::Instrument; use crate::pending_burns; pub struct Burner { valid_sessions: FileSinkClient, solana: S, + failed_retry_attempts: usize, + failed_check_interval: Duration, } impl Burner { - pub fn new(valid_sessions: 
FileSinkClient, solana: S) -> Self { + pub fn new( + valid_sessions: FileSinkClient, + solana: S, + failed_retry_attempts: usize, + failed_check_interval: Duration, + ) -> Self { Self { valid_sessions, solana, + failed_retry_attempts, + failed_check_interval, } } } @@ -33,43 +45,130 @@ where let payer_balance = self.solana.payer_balance(&payer).await?; if payer_balance < total_dcs { - tracing::warn!(%payer, %payer_balance, %total_dcs, "Payer does not have enough balance to burn dcs"); + tracing::warn!( + %payer, + %payer_balance, + %total_dcs, + "Payer does not have enough balance to burn dcs" + ); continue; } tracing::info!(%total_dcs, %payer, "Burning DC"); - if self.burn_data_credits(&payer, total_dcs).await.is_err() { - // We have failed to burn data credits: - metrics::counter!("burned", "payer" => payer.to_string(), "success" => "false") - .increment(total_dcs); - continue; + let txn = self.solana.make_burn_transaction(&payer, total_dcs).await?; + match self.solana.submit_transaction(&txn).await { + Ok(()) => { + handle_transaction_success( + pool, + payer, + total_dcs, + sessions, + &self.valid_sessions, + ) + .await?; + } + Err(err) => { + let span = tracing::info_span!( + "txn_confirmation", + signature = %txn.get_signature(), + %payer, + total_dcs, + max_attempts = self.failed_retry_attempts + ); + + // block on confirmation + self.transaction_confirmation_check(pool, err, txn, payer, total_dcs, sessions) + .instrument(span) + .await; + } } + } - // We succesfully managed to burn data credits: + Ok(()) + } - metrics::counter!("burned", "payer" => payer.to_string(), "success" => "true") - .increment(total_dcs); + async fn transaction_confirmation_check( + &self, + pool: &Pool, + err: S::Error, + txn: S::Transaction, + payer: PublicKeyBinary, + total_dcs: u64, + sessions: Vec, + ) { + tracing::warn!(?err, "starting txn confirmation check"); + // We don't know if the txn actually made it, maybe it did - // Delete from the data transfer session and write out 
to S3 - pending_burns::delete_for_payer(pool, &payer, total_dcs).await?; + let signature = txn.get_signature(); - for session in sessions { - self.valid_sessions - .write(ValidDataTransferSession::from(session), &[]) - .await?; + let mut attempt = 0; + while attempt <= self.failed_retry_attempts { + tokio::time::sleep(self.failed_check_interval).await; + match self.solana.confirm_transaction(signature).await { + Ok(true) => { + tracing::debug!("txn confirmed on chain"); + let txn_success = handle_transaction_success( + pool, + payer, + total_dcs, + sessions, + &self.valid_sessions, + ) + .await; + if let Err(err) = txn_success { + tracing::error!(?err, "txn succeeded, something else failed"); + } + + return; + } + Ok(false) => { + tracing::info!(attempt, "txn not confirmed, yet..."); + attempt += 1; + continue; + } + Err(err) => { + // Client errors do not count against retry attempts + tracing::error!(?err, attempt, "failed to confirm txn"); + continue; + } } } - Ok(()) + tracing::warn!("failed to confirm txn"); + + // We have failed to burn data credits: + metrics::counter!( + "burned", + "payer" => payer.to_string(), + "success" => "false" + ) + .increment(total_dcs); } +} - async fn burn_data_credits( - &self, - payer: &PublicKeyBinary, - amount: u64, - ) -> Result<(), S::Error> { - let txn = self.solana.make_burn_transaction(payer, amount).await?; - self.solana.submit_transaction(&txn).await?; - Ok(()) +async fn handle_transaction_success( + pool: &Pool, + payer: PublicKeyBinary, + total_dcs: u64, + sessions: Vec, + valid_sessions: &FileSinkClient, +) -> Result<(), anyhow::Error> { + // We succesfully managed to burn data credits: + metrics::counter!( + "burned", + "payer" => payer.to_string(), + "success" => "true" + ) + .increment(total_dcs); + + // Delete from the data transfer session and write out to S3 + pending_burns::delete_for_payer(pool, &payer, total_dcs).await?; + + for session in sessions { + valid_sessions + 
.write(ValidDataTransferSession::from(session), &[]) + .await?; } + + Ok(()) } diff --git a/mobile_packet_verifier/src/daemon.rs b/mobile_packet_verifier/src/daemon.rs index 685272b91..6049a93d0 100644 --- a/mobile_packet_verifier/src/daemon.rs +++ b/mobile_packet_verifier/src/daemon.rs @@ -157,7 +157,12 @@ impl Cmd { ) .await?; - let burner = Burner::new(valid_sessions, solana); + let burner = Burner::new( + valid_sessions, + solana, + settings.txn_confirmation_retry_attempts, + settings.txn_confirmation_check_interval, + ); let file_store = FileStore::from_settings(&settings.ingest).await?; diff --git a/mobile_packet_verifier/src/settings.rs b/mobile_packet_verifier/src/settings.rs index 2cd97bf26..08ebd4052 100644 --- a/mobile_packet_verifier/src/settings.rs +++ b/mobile_packet_verifier/src/settings.rs @@ -34,6 +34,17 @@ pub struct Settings { pub purger_interval: Duration, #[serde(with = "humantime_serde", default = "default_purger_max_age")] pub purger_max_age: Duration, + /// When a burn transaction is not a success, how many times should + /// we try to confirm the transaction before considering it a failure? + #[serde(default = "default_txn_confirmation_retry_attempts")] + pub txn_confirmation_retry_attempts: usize, + /// When a burn transaction is not a success, how long should we + /// wait between trying to confirm if the transaction made it to Solana? 
+ #[serde( + with = "humantime_serde", + default = "default_txn_confirmation_check_interval" + )] + pub txn_confirmation_check_interval: Duration, } fn default_purger_interval() -> Duration { @@ -44,6 +55,14 @@ fn default_purger_max_age() -> Duration { humantime::parse_duration("24 hours").unwrap() } +fn default_txn_confirmation_check_interval() -> Duration { + humantime::parse_duration("1 min").unwrap() +} + +fn default_txn_confirmation_retry_attempts() -> usize { + 5 +} + fn default_start_after() -> DateTime { DateTime::UNIX_EPOCH } diff --git a/solana/src/burn.rs b/solana/src/burn.rs index 6f5abc54e..eb84f8127 100644 --- a/solana/src/burn.rs +++ b/solana/src/burn.rs @@ -32,7 +32,7 @@ use std::{ use tokio::sync::Mutex; #[async_trait] -pub trait SolanaNetwork: Send + Sync + 'static { +pub trait SolanaNetwork: Clone + Send + Sync + 'static { type Error: std::error::Error + Send + Sync + 'static; type Transaction: GetSignature + Send + Sync + 'static; @@ -76,8 +76,9 @@ impl Settings { } } +#[derive(Clone)] pub struct SolanaRpc { - provider: RpcClient, + provider: Arc, program_cache: BurnProgramCache, cluster: String, keypair: [u8; 64], @@ -101,7 +102,7 @@ impl SolanaRpc { } Ok(Arc::new(Self { cluster: settings.cluster.clone(), - provider, + provider: Arc::new(provider), program_cache, keypair: keypair.to_bytes(), payers_to_monitor: settings.payers_to_monitor()?, @@ -296,7 +297,7 @@ impl SolanaNetwork for SolanaRpc { } } -#[derive(Default)] +#[derive(Default, Clone)] pub struct PriorityFee { last_estimate: Arc>, } @@ -364,6 +365,7 @@ impl LastEstimate { } /// Cached pubkeys for the burn program +#[derive(Clone)] pub struct BurnProgramCache { pub account_payer: Pubkey, pub data_credits: Pubkey,