Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Mobile Verifier Burner Txn Error check #906

Merged
merged 7 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions iot_packet_verifier/src/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,7 @@ mod test {
use super::*;
use std::collections::HashSet;

// Test double: wraps the set of transaction signatures the mock treats
// as confirmed.
// NOTE(review): the `Clone` derive appears to be required by the new
// `Clone` bound on the trait this type implements (see the
// `SolanaNetwork: Clone + Send + Sync` change in solana/src/burn.rs) —
// confirm against the impl below.
#[derive(Clone)]
struct MockConfirmed(HashSet<Signature>);

#[async_trait]
Expand Down
5 changes: 3 additions & 2 deletions iot_packet_verifier/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -576,15 +576,16 @@ async fn test_end_to_end() {
);
}

#[derive(Clone)]
struct MockSolanaNetwork {
confirmed: Mutex<HashSet<Signature>>,
confirmed: Arc<Mutex<HashSet<Signature>>>,
ledger: Arc<Mutex<HashMap<PublicKeyBinary, u64>>>,
}

impl MockSolanaNetwork {
fn new(ledger: HashMap<PublicKeyBinary, u64>) -> Self {
Self {
confirmed: Default::default(),
confirmed: Arc::new(Default::default()),
ledger: Arc::new(Mutex::new(ledger)),
}
}
Expand Down
151 changes: 125 additions & 26 deletions mobile_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
use std::time::Duration;

use file_store::file_sink::FileSinkClient;
use helium_crypto::PublicKeyBinary;
use helium_proto::services::packet_verifier::ValidDataTransferSession;
use solana::burn::SolanaNetwork;
use solana::{burn::SolanaNetwork, GetSignature};
use sqlx::{Pool, Postgres};
use tracing::Instrument;

use crate::pending_burns;

/// Burns data credits (DC) on Solana for verified data-transfer sessions.
pub struct Burner<S> {
    /// Sink used to write out sessions whose DC burn succeeded.
    valid_sessions: FileSinkClient<ValidDataTransferSession>,
    /// Solana network client; generic so tests can substitute a mock.
    solana: S,
    /// Maximum number of confirmation attempts after a failed
    /// transaction submission before the burn is considered failed.
    failed_retry_attempts: usize,
    /// How long to wait between successive confirmation attempts.
    failed_check_interval: Duration,
}

impl<S> Burner<S> {
pub fn new(valid_sessions: FileSinkClient<ValidDataTransferSession>, solana: S) -> Self {
pub fn new(
valid_sessions: FileSinkClient<ValidDataTransferSession>,
solana: S,
failed_retry_attempts: usize,
failed_check_interval: Duration,
) -> Self {
Self {
valid_sessions,
solana,
failed_retry_attempts,
failed_check_interval,
}
}
}
Expand All @@ -33,43 +45,130 @@ where
let payer_balance = self.solana.payer_balance(&payer).await?;

if payer_balance < total_dcs {
tracing::warn!(%payer, %payer_balance, %total_dcs, "Payer does not have enough balance to burn dcs");
tracing::warn!(
%payer,
%payer_balance,
%total_dcs,
"Payer does not have enough balance to burn dcs"
);
continue;
}

tracing::info!(%total_dcs, %payer, "Burning DC");
if self.burn_data_credits(&payer, total_dcs).await.is_err() {
// We have failed to burn data credits:
metrics::counter!("burned", "payer" => payer.to_string(), "success" => "false")
.increment(total_dcs);
continue;
let txn = self.solana.make_burn_transaction(&payer, total_dcs).await?;
match self.solana.submit_transaction(&txn).await {
Ok(()) => {
handle_transaction_success(
pool,
payer,
total_dcs,
sessions,
&self.valid_sessions,
)
.await?;
}
Err(err) => {
let span = tracing::info_span!(
"txn_confirmation",
signature = %txn.get_signature(),
%payer,
total_dcs,
max_attempts = self.failed_retry_attempts
);

// block on confirmation
self.transaction_confirmation_check(pool, err, txn, payer, total_dcs, sessions)
.instrument(span)
.await;
}
}
}

// We successfully managed to burn data credits:
Ok(())
}

metrics::counter!("burned", "payer" => payer.to_string(), "success" => "true")
.increment(total_dcs);
async fn transaction_confirmation_check(
&self,
pool: &Pool<Postgres>,
err: S::Error,
txn: S::Transaction,
payer: PublicKeyBinary,
total_dcs: u64,
sessions: Vec<pending_burns::DataTransferSession>,
) {
tracing::warn!(?err, "starting txn confirmation check");
// We don't know if the txn actually made it, maybe it did

// Delete from the data transfer session and write out to S3
pending_burns::delete_for_payer(pool, &payer, total_dcs).await?;
let signature = txn.get_signature();

for session in sessions {
self.valid_sessions
.write(ValidDataTransferSession::from(session), &[])
.await?;
let mut attempt = 0;
while attempt <= self.failed_retry_attempts {
tokio::time::sleep(self.failed_check_interval).await;
match self.solana.confirm_transaction(signature).await {
Ok(true) => {
tracing::debug!("txn confirmed on chain");
let txn_success = handle_transaction_success(
pool,
payer,
total_dcs,
sessions,
&self.valid_sessions,
)
.await;
if let Err(err) = txn_success {
tracing::error!(?err, "txn succeeded, something else failed");
}

return;
}
Ok(false) => {
tracing::info!(attempt, "txn not confirmed, yet...");
attempt += 1;
continue;
}
Err(err) => {
// Client errors do not count against retry attempts
tracing::error!(?err, attempt, "failed to confirm txn");
continue;
}
}
}

Ok(())
tracing::warn!("failed to confirm txn");

// We have failed to burn data credits:
metrics::counter!(
"burned",
"payer" => payer.to_string(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to include the payer? this is either just the one service provider or a (potentially) unbounded list of payers that balloons the metrics cost by increasing the breadth of the monitored dataset, right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we already include payer in this metric today. There could be more wallets in the future that pay for this, but we have some alerts that require payer be in this metric

"success" => "false"
)
.increment(total_dcs);
}
}

async fn burn_data_credits(
&self,
payer: &PublicKeyBinary,
amount: u64,
) -> Result<(), S::Error> {
let txn = self.solana.make_burn_transaction(payer, amount).await?;
self.solana.submit_transaction(&txn).await?;
Ok(())
/// Post-burn bookkeeping once a burn transaction is known to have
/// succeeded: records the success metric, deletes the burned sessions
/// from the pending-burns table, and writes each one out as a
/// `ValidDataTransferSession`.
///
/// Returns an error if the database delete or any sink write fails —
/// note the burn itself has already happened at that point.
async fn handle_transaction_success(
    pool: &Pool<Postgres>,
    payer: PublicKeyBinary,
    total_dcs: u64,
    sessions: Vec<pending_burns::DataTransferSession>,
    valid_sessions: &FileSinkClient<ValidDataTransferSession>,
) -> Result<(), anyhow::Error> {
    // We successfully managed to burn data credits:
    metrics::counter!(
        "burned",
        "payer" => payer.to_string(),
        "success" => "true"
    )
    .increment(total_dcs);

    // Delete from the data transfer session and write out to S3
    pending_burns::delete_for_payer(pool, &payer, total_dcs).await?;

    for session in sessions {
        valid_sessions
            .write(ValidDataTransferSession::from(session), &[])
            .await?;
    }

    Ok(())
}
7 changes: 6 additions & 1 deletion mobile_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,12 @@ impl Cmd {
)
.await?;

let burner = Burner::new(valid_sessions, solana);
let burner = Burner::new(
valid_sessions,
solana,
settings.txn_confirmation_retry_attempts,
settings.txn_confirmation_check_interval,
);

let file_store = FileStore::from_settings(&settings.ingest).await?;

Expand Down
19 changes: 19 additions & 0 deletions mobile_packet_verifier/src/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ pub struct Settings {
pub purger_interval: Duration,
#[serde(with = "humantime_serde", default = "default_purger_max_age")]
pub purger_max_age: Duration,
/// When a burn transaction is not a success, how many times should
/// we try to confirm the transaction before considering it a failure?
#[serde(default = "default_txn_confirmation_retry_attempts")]
pub txn_confirmation_retry_attempts: usize,
/// When a burn transaction is not a success, how long should we
michaeldjeffrey marked this conversation as resolved.
Show resolved Hide resolved
/// wait between trying to confirm if the transaction made it to Solana?
#[serde(
with = "humantime_serde",
default = "default_txn_confirmation_check_interval"
)]
pub txn_confirmation_check_interval: Duration,
}

fn default_purger_interval() -> Duration {
Expand All @@ -44,6 +55,14 @@ fn default_purger_max_age() -> Duration {
humantime::parse_duration("24 hours").unwrap()
}

/// Default wait between attempts to confirm a previously submitted burn
/// transaction (see `Settings::txn_confirmation_check_interval`): 1 minute.
fn default_txn_confirmation_check_interval() -> Duration {
    // Equivalent to `humantime::parse_duration("1 min").unwrap()`, but
    // avoids a fallible runtime parse (and its unwrap/panic path) for a
    // compile-time constant.
    Duration::from_secs(60)
}

/// Default number of confirmation retries for a burn transaction whose
/// submission did not immediately succeed
/// (see `Settings::txn_confirmation_retry_attempts`).
fn default_txn_confirmation_retry_attempts() -> usize {
    const DEFAULT_ATTEMPTS: usize = 5;
    DEFAULT_ATTEMPTS
}

/// Default `start_after` timestamp: the Unix epoch.
/// NOTE(review): presumably this means "no lower bound — process
/// everything"; confirm against the consumer of `start_after`.
fn default_start_after() -> DateTime<Utc> {
    DateTime::<Utc>::UNIX_EPOCH
}
Expand Down
10 changes: 6 additions & 4 deletions solana/src/burn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use std::{
use tokio::sync::Mutex;

#[async_trait]
pub trait SolanaNetwork: Send + Sync + 'static {
pub trait SolanaNetwork: Clone + Send + Sync + 'static {
type Error: std::error::Error + Send + Sync + 'static;
type Transaction: GetSignature + Send + Sync + 'static;

Expand Down Expand Up @@ -76,8 +76,9 @@ impl Settings {
}
}

#[derive(Clone)]
pub struct SolanaRpc {
provider: RpcClient,
provider: Arc<RpcClient>,
program_cache: BurnProgramCache,
cluster: String,
keypair: [u8; 64],
Expand All @@ -101,7 +102,7 @@ impl SolanaRpc {
}
Ok(Arc::new(Self {
cluster: settings.cluster.clone(),
provider,
provider: Arc::new(provider),
program_cache,
keypair: keypair.to_bytes(),
payers_to_monitor: settings.payers_to_monitor()?,
Expand Down Expand Up @@ -296,7 +297,7 @@ impl SolanaNetwork for SolanaRpc {
}
}

#[derive(Default)]
#[derive(Default, Clone)]
pub struct PriorityFee {
    /// Most recently computed fee estimate. Wrapped in `Arc<Mutex<…>>`
    /// so cloning `PriorityFee` is cheap and all clones share (and see
    /// updates to) the same cached estimate.
    last_estimate: Arc<Mutex<LastEstimate>>,
}
Expand Down Expand Up @@ -364,6 +365,7 @@ impl LastEstimate {
}

/// Cached pubkeys for the burn program
#[derive(Clone)]
pub struct BurnProgramCache {
pub account_payer: Pubkey,
pub data_credits: Pubkey,
Expand Down