Mobile Verifier Burner Txn Error check (#906)
* Make SolanaNetwork implement Clone so we can get a copy for a spawned task

* Add retry settings to mobile verifier burner

* Fallback to confirming transactions on all errors

If a submitted transaction does not come back as a de facto success, we spawn a process that checks, by signature, whether the txn made it to the chain. By default we retry 5 times, a minute apart, before considering the txn an actual failure to be retried later.

If during these checks we find the txn on chain, we apply the same logic as if it had succeeded the first time.

* consistent txn in logs

* handle success case first

* prefix with failed so we know what we're retrying

* block on txn confirmation

The mobile packet verifier does not handle many burn txns, so we can block on confirmation before attempting to process the next txn.

Txn tracking by signature and block-height will come in a later update.
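
In sketch form, the retry logic condenses to the loop below. This is a hypothetical, synchronous stand-in for illustration only: the names Confirm and confirm_with_retries are invented here, and the real async implementation is in the burner.rs diff further down.

use std::{thread, time::Duration};

// Outcome of checking a signature on chain (stand-in for
// SolanaNetwork::confirm_transaction).
enum Confirm {
    OnChain,
    NotYet,
    ClientError,
}

// Mirrors the burner's confirmation check: sleep, then poll.
// "Not yet" responses consume a retry attempt; client errors do not.
fn confirm_with_retries(
    max_attempts: usize,
    check_interval: Duration,
    mut check: impl FnMut() -> Confirm,
) -> bool {
    let mut attempt = 0;
    while attempt <= max_attempts {
        thread::sleep(check_interval);
        match check() {
            Confirm::OnChain => return true, // proceed as if the txn succeeded
            Confirm::NotYet => attempt += 1, // counts against the retry budget
            Confirm::ClientError => continue, // retried without spending an attempt
        }
    }
    false // treated as a failed burn, to be retried later
}

fn main() {
    // Pretend the txn lands on the third poll.
    let mut polls = 0;
    let confirmed = confirm_with_retries(5, Duration::from_millis(10), || {
        polls += 1;
        if polls < 3 { Confirm::NotYet } else { Confirm::OnChain }
    });
    assert!(confirmed);
}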
michaeldjeffrey authored Dec 4, 2024
1 parent 44b26c5 commit 2582122
Showing 6 changed files with 160 additions and 33 deletions.
1 change: 1 addition & 0 deletions iot_packet_verifier/src/pending.rs
@@ -392,6 +392,7 @@ mod test {
     use super::*;
     use std::collections::HashSet;
 
+    #[derive(Clone)]
     struct MockConfirmed(HashSet<Signature>);
 
     #[async_trait]
5 changes: 3 additions & 2 deletions iot_packet_verifier/tests/integration_tests.rs
@@ -576,15 +576,16 @@ async fn test_end_to_end() {
     );
 }
 
+#[derive(Clone)]
 struct MockSolanaNetwork {
-    confirmed: Mutex<HashSet<Signature>>,
+    confirmed: Arc<Mutex<HashSet<Signature>>>,
     ledger: Arc<Mutex<HashMap<PublicKeyBinary, u64>>>,
 }
 
 impl MockSolanaNetwork {
     fn new(ledger: HashMap<PublicKeyBinary, u64>) -> Self {
         Self {
-            confirmed: Default::default(),
+            confirmed: Arc::new(Default::default()),
             ledger: Arc::new(Mutex::new(ledger)),
         }
     }
151 changes: 125 additions & 26 deletions mobile_packet_verifier/src/burner.rs
@@ -1,21 +1,33 @@
+use std::time::Duration;
+
 use file_store::file_sink::FileSinkClient;
 use helium_crypto::PublicKeyBinary;
 use helium_proto::services::packet_verifier::ValidDataTransferSession;
-use solana::burn::SolanaNetwork;
+use solana::{burn::SolanaNetwork, GetSignature};
 use sqlx::{Pool, Postgres};
+use tracing::Instrument;
 
 use crate::pending_burns;
 
 pub struct Burner<S> {
     valid_sessions: FileSinkClient<ValidDataTransferSession>,
     solana: S,
+    failed_retry_attempts: usize,
+    failed_check_interval: Duration,
 }
 
 impl<S> Burner<S> {
-    pub fn new(valid_sessions: FileSinkClient<ValidDataTransferSession>, solana: S) -> Self {
+    pub fn new(
+        valid_sessions: FileSinkClient<ValidDataTransferSession>,
+        solana: S,
+        failed_retry_attempts: usize,
+        failed_check_interval: Duration,
+    ) -> Self {
         Self {
             valid_sessions,
             solana,
+            failed_retry_attempts,
+            failed_check_interval,
         }
     }
 }
@@ -33,43 +45,130 @@ where
             let payer_balance = self.solana.payer_balance(&payer).await?;
 
             if payer_balance < total_dcs {
-                tracing::warn!(%payer, %payer_balance, %total_dcs, "Payer does not have enough balance to burn dcs");
+                tracing::warn!(
+                    %payer,
+                    %payer_balance,
+                    %total_dcs,
+                    "Payer does not have enough balance to burn dcs"
+                );
                 continue;
             }
 
             tracing::info!(%total_dcs, %payer, "Burning DC");
-            if self.burn_data_credits(&payer, total_dcs).await.is_err() {
-                // We have failed to burn data credits:
-                metrics::counter!("burned", "payer" => payer.to_string(), "success" => "false")
-                    .increment(total_dcs);
-                continue;
+            let txn = self.solana.make_burn_transaction(&payer, total_dcs).await?;
+            match self.solana.submit_transaction(&txn).await {
+                Ok(()) => {
+                    handle_transaction_success(
+                        pool,
+                        payer,
+                        total_dcs,
+                        sessions,
+                        &self.valid_sessions,
+                    )
+                    .await?;
+                }
+                Err(err) => {
+                    let span = tracing::info_span!(
+                        "txn_confirmation",
+                        signature = %txn.get_signature(),
+                        %payer,
+                        total_dcs,
+                        max_attempts = self.failed_retry_attempts
+                    );
+
+                    // block on confirmation
+                    self.transaction_confirmation_check(pool, err, txn, payer, total_dcs, sessions)
+                        .instrument(span)
+                        .await;
+                }
             }
-
-            // We succesfully managed to burn data credits:
-            metrics::counter!("burned", "payer" => payer.to_string(), "success" => "true")
-                .increment(total_dcs);
-
-            // Delete from the data transfer session and write out to S3
-            pending_burns::delete_for_payer(pool, &payer, total_dcs).await?;
-
-            for session in sessions {
-                self.valid_sessions
-                    .write(ValidDataTransferSession::from(session), &[])
-                    .await?;
-            }
         }
 
         Ok(())
     }
 
-    async fn burn_data_credits(
+    async fn transaction_confirmation_check(
         &self,
-        payer: &PublicKeyBinary,
-        amount: u64,
-    ) -> Result<(), S::Error> {
-        let txn = self.solana.make_burn_transaction(payer, amount).await?;
-        self.solana.submit_transaction(&txn).await?;
-        Ok(())
+        pool: &Pool<Postgres>,
+        err: S::Error,
+        txn: S::Transaction,
+        payer: PublicKeyBinary,
+        total_dcs: u64,
+        sessions: Vec<pending_burns::DataTransferSession>,
+    ) {
+        tracing::warn!(?err, "starting txn confirmation check");
+        // We don't know if the txn actually made it, maybe it did
+
+        let signature = txn.get_signature();
+
+        let mut attempt = 0;
+        while attempt <= self.failed_retry_attempts {
+            tokio::time::sleep(self.failed_check_interval).await;
+            match self.solana.confirm_transaction(signature).await {
+                Ok(true) => {
+                    tracing::debug!("txn confirmed on chain");
+                    let txn_success = handle_transaction_success(
+                        pool,
+                        payer,
+                        total_dcs,
+                        sessions,
+                        &self.valid_sessions,
+                    )
+                    .await;
+                    if let Err(err) = txn_success {
+                        tracing::error!(?err, "txn succeeded, something else failed");
+                    }
+
+                    return;
+                }
+                Ok(false) => {
+                    tracing::info!(attempt, "txn not confirmed, yet...");
+                    attempt += 1;
+                    continue;
+                }
+                Err(err) => {
+                    // Client errors do not count against retry attempts
+                    tracing::error!(?err, attempt, "failed to confirm txn");
+                    continue;
+                }
+            }
+        }
+
+        tracing::warn!("failed to confirm txn");
+
+        // We have failed to burn data credits:
+        metrics::counter!(
+            "burned",
+            "payer" => payer.to_string(),
+            "success" => "false"
+        )
+        .increment(total_dcs);
     }
 }
+
+async fn handle_transaction_success(
+    pool: &Pool<Postgres>,
+    payer: PublicKeyBinary,
+    total_dcs: u64,
+    sessions: Vec<pending_burns::DataTransferSession>,
+    valid_sessions: &FileSinkClient<ValidDataTransferSession>,
+) -> Result<(), anyhow::Error> {
+    // We successfully managed to burn data credits:
+    metrics::counter!(
+        "burned",
+        "payer" => payer.to_string(),
+        "success" => "true"
+    )
+    .increment(total_dcs);
+
+    // Delete from the data transfer session and write out to S3
+    pending_burns::delete_for_payer(pool, &payer, total_dcs).await?;
+
+    for session in sessions {
+        valid_sessions
+            .write(ValidDataTransferSession::from(session), &[])
+            .await?;
+    }
+
+    Ok(())
+}
7 changes: 6 additions & 1 deletion mobile_packet_verifier/src/daemon.rs
@@ -157,7 +157,12 @@ impl Cmd {
         )
         .await?;
 
-        let burner = Burner::new(valid_sessions, solana);
+        let burner = Burner::new(
+            valid_sessions,
+            solana,
+            settings.txn_confirmation_retry_attempts,
+            settings.txn_confirmation_check_interval,
+        );
 
         let file_store = FileStore::from_settings(&settings.ingest).await?;
19 changes: 19 additions & 0 deletions mobile_packet_verifier/src/settings.rs
@@ -34,6 +34,17 @@ pub struct Settings {
     pub purger_interval: Duration,
     #[serde(with = "humantime_serde", default = "default_purger_max_age")]
     pub purger_max_age: Duration,
+    /// When a burn transaction is not a success, how many times should
+    /// we try to confirm the transaction before considering it a failure?
+    #[serde(default = "default_txn_confirmation_retry_attempts")]
+    pub txn_confirmation_retry_attempts: usize,
+    /// When a burn transaction is not a success, how long should we
+    /// wait between trying to confirm if the transaction made it to Solana?
+    #[serde(
+        with = "humantime_serde",
+        default = "default_txn_confirmation_check_interval"
+    )]
+    pub txn_confirmation_check_interval: Duration,
 }
 
 fn default_purger_interval() -> Duration {
@@ -44,6 +55,14 @@ fn default_purger_max_age() -> Duration {
     humantime::parse_duration("24 hours").unwrap()
 }
 
+fn default_txn_confirmation_check_interval() -> Duration {
+    humantime::parse_duration("1 min").unwrap()
+}
+
+fn default_txn_confirmation_retry_attempts() -> usize {
+    5
+}
+
 fn default_start_after() -> DateTime<Utc> {
     DateTime::UNIX_EPOCH
 }
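
For reference, a hypothetical settings snippet exercising the two new fields, assuming the verifier reads TOML with humantime-style durations as the humantime_serde attributes suggest (the values shown are the defaults):

# excerpt from a mobile_packet_verifier settings file (hypothetical)
txn_confirmation_retry_attempts = 5
txn_confirmation_check_interval = "1 min"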
10 changes: 6 additions & 4 deletions solana/src/burn.rs
@@ -32,7 +32,7 @@ use std::{
 use tokio::sync::Mutex;
 
 #[async_trait]
-pub trait SolanaNetwork: Send + Sync + 'static {
+pub trait SolanaNetwork: Clone + Send + Sync + 'static {
     type Error: std::error::Error + Send + Sync + 'static;
     type Transaction: GetSignature + Send + Sync + 'static;
 
@@ -76,8 +76,9 @@ impl Settings {
     }
 }
 
+#[derive(Clone)]
 pub struct SolanaRpc {
-    provider: RpcClient,
+    provider: Arc<RpcClient>,
     program_cache: BurnProgramCache,
     cluster: String,
     keypair: [u8; 64],
@@ -101,7 +102,7 @@ impl SolanaRpc {
         }
         Ok(Arc::new(Self {
             cluster: settings.cluster.clone(),
-            provider,
+            provider: Arc::new(provider),
             program_cache,
             keypair: keypair.to_bytes(),
             payers_to_monitor: settings.payers_to_monitor()?,
@@ -296,7 +297,7 @@ impl SolanaNetwork for SolanaRpc {
     }
 }
 
-#[derive(Default)]
+#[derive(Default, Clone)]
 pub struct PriorityFee {
     last_estimate: Arc<Mutex<LastEstimate>>,
 }
@@ -364,6 +365,7 @@ impl LastEstimate {
 }
 
 /// Cached pubkeys for the burn program
+#[derive(Clone)]
 pub struct BurnProgramCache {
     pub account_payer: Pubkey,
     pub data_credits: Pubkey,
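
The Arc wrapping above is what makes the new Clone derives cheap: cloning SolanaRpc copies a pointer to the shared RpcClient rather than the client itself, so a copy can be handed to a spawned task. A minimal sketch of the pattern, with a hypothetical Client type standing in for SolanaRpc:

use std::sync::Arc;

#[derive(Clone)]
struct Client {
    // Arc makes the derived Clone a refcount bump, not a deep copy.
    provider: Arc<String>, // stands in for Arc<RpcClient>
}

#[tokio::main]
async fn main() {
    let client = Client { provider: Arc::new("rpc-handle".to_string()) };
    let for_task = client.clone(); // cheap copy for the spawned task
    tokio::spawn(async move {
        // The task owns its own handle to the same underlying provider.
        assert_eq!(for_task.provider.as_str(), "rpc-handle");
    })
    .await
    .unwrap();
}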
