Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix coherency issue with 2 phase commits in iot packet verifier #699

Merged
merged 2 commits into from
Jan 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions iot_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::{
balances::{BalanceCache, BalanceStore},
pending::{Burn, PendingTables, PendingTablesTransaction},
pending::{
confirm_pending_txns, Burn, ConfirmPendingError, PendingTables, PendingTablesTransaction,
},
};
use futures::{future::LocalBoxFuture, TryFutureExt};
use solana::{GetSignature, SolanaNetwork};
Expand Down Expand Up @@ -42,6 +44,8 @@ pub enum BurnError<S> {
SqlError(#[from] sqlx::Error),
#[error("Solana error: {0}")]
SolanaError(S),
#[error("Confirm pending transaction error: {0}")]
ConfirmPendingError(#[from] ConfirmPendingError<S>),
}

impl<P, S> Burner<P, S> {
Expand All @@ -66,10 +70,19 @@ where
burn_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);

loop {
#[rustfmt::skip]
tokio::select! {
biased;
_ = shutdown.clone() => break,
_ = burn_timer.tick() => { let _ = self.burn().await; }
_ = burn_timer.tick() => {
match self.burn().await {
Ok(()) => continue,
Err(err) => {
tracing::error!("Error while burning data credits: {err}");
confirm_pending_txns(&self.pending_tables, &self.solana, &self.balances).await?;
}
}
}
}
}
tracing::info!("Stopping burner");
Expand Down
8 changes: 4 additions & 4 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ impl Cmd {
None
};

// Check if we have any left over pending transactions, and if we
// do check if they have been confirmed:
confirm_pending_txns(&pool, &solana).await?;

// Set up the balance cache:
let balances = BalanceCache::new(&pool, solana.clone()).await?;

// Check if we have any left over pending transactions, and if we
// do check if they have been confirmed:
confirm_pending_txns(&pool, &solana, &balances.balances()).await?;

// Set up the balance burner:
let burner = Burner::new(
pool.clone(),
Expand Down
19 changes: 18 additions & 1 deletion iot_packet_verifier/src/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use sqlx::{postgres::PgRow, FromRow, PgPool, Postgres, Row, Transaction};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

use crate::balances::BalanceStore;

/// To avoid excessive burn transaction (which cost us money), we institute a minimum
/// amount of Data Credits accounted for before we burn from a payer:
const BURN_THRESHOLD: i64 = 10_000;
Expand Down Expand Up @@ -55,6 +57,7 @@ pub enum ConfirmPendingError<S> {
pub async fn confirm_pending_txns<S>(
pending_tables: &impl PendingTables,
solana: &S,
balances: &BalanceStore,
) -> Result<(), ConfirmPendingError<S::Error>>
where
S: SolanaNetwork,
Expand Down Expand Up @@ -82,6 +85,10 @@ where
{
txn.subtract_burned_amount(&pending.payer, pending.amount)
.await?;
let mut balance_lock = balances.lock().await;
let payer_account = balance_lock.get_mut(&pending.payer).unwrap();
payer_account.burned = payer_account.burned.saturating_sub(pending.amount);
payer_account.balance = payer_account.balance.saturating_sub(pending.amount);
}
// Commit our work:
txn.commit().await?;
Expand Down Expand Up @@ -380,6 +387,8 @@ impl<'a> PendingTablesTransaction<'a> for &'a MockPendingTables {

#[cfg(test)]
mod test {
use crate::balances::PayerAccount;

use super::*;
use std::collections::HashSet;

Expand Down Expand Up @@ -443,6 +452,14 @@ mod test {
time_of_submission: Utc::now() - Duration::minutes(1),
},
);
let mut balances = HashMap::new();
balances.insert(
payer.clone(),
PayerAccount {
balance: CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT,
burned: CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT,
},
);
let mut pending_burns = HashMap::new();
pending_burns.insert(
payer.clone(),
Expand All @@ -458,7 +475,7 @@ mod test {
confirmed_txns.insert(confirmed);
let confirmed = MockConfirmed(confirmed_txns);
// Confirm and resolve transactions:
confirm_pending_txns(&pending_tables, &confirmed)
confirm_pending_txns(&pending_tables, &confirmed, &Arc::new(Mutex::new(balances)))
.await
.unwrap();
// The amount left in the pending burns table should only be the unconfirmed
Expand Down
14 changes: 12 additions & 2 deletions iot_packet_verifier/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use helium_proto::{
DataRate, Region,
};
use iot_packet_verifier::{
balances::BalanceCache,
balances::{BalanceCache, PayerAccount},
burner::Burner,
pending::{confirm_pending_txns, AddPendingBurn, Burn, MockPendingTables, PendingTables},
verifier::{payload_size_to_dc, ConfigServer, Org, Verifier, BYTES_PER_DC},
Expand Down Expand Up @@ -543,6 +543,14 @@ async fn test_pending_txns(pool: PgPool) -> anyhow::Result<()> {
payer.clone(),
CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT,
);
let mut cache = HashMap::new();
cache.insert(
payer.clone(),
PayerAccount {
balance: CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT,
burned: CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT,
},
);
let mock_network = MockSolanaNetwork::new(ledger);

// Add both the burn amounts to the pending burns table
Expand Down Expand Up @@ -579,7 +587,9 @@ async fn test_pending_txns(pool: PgPool) -> anyhow::Result<()> {
}

// Confirm pending transactions
confirm_pending_txns(&pool, &mock_network).await.unwrap();
confirm_pending_txns(&pool, &mock_network, &Arc::new(Mutex::new(cache)))
.await
.unwrap();

let pending_burn: Burn = sqlx::query_as("SELECT * FROM pending_burns LIMIT 1")
.fetch_one(&pool)
Expand Down