Skip to content

Commit

Permalink
Fix coherency issue with 2 phase commits (#699)
Browse files Browse the repository at this point in the history
  • Loading branch information
Matthew Plant authored Jan 26, 2024
1 parent f9d2b07 commit 513e696
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 9 deletions.
17 changes: 15 additions & 2 deletions iot_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use crate::{
balances::{BalanceCache, BalanceStore},
pending::{Burn, PendingTables, PendingTablesTransaction},
pending::{
confirm_pending_txns, Burn, ConfirmPendingError, PendingTables, PendingTablesTransaction,
},
};
use futures::{future::LocalBoxFuture, TryFutureExt};
use solana::{GetSignature, SolanaNetwork};
Expand Down Expand Up @@ -42,6 +44,8 @@ pub enum BurnError<S> {
SqlError(#[from] sqlx::Error),
#[error("Solana error: {0}")]
SolanaError(S),
#[error("Confirm pending transaction error: {0}")]
ConfirmPendingError(#[from] ConfirmPendingError<S>),
}

impl<P, S> Burner<P, S> {
Expand All @@ -66,10 +70,19 @@ where
burn_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);

loop {
#[rustfmt::skip]
tokio::select! {
biased;
_ = shutdown.clone() => break,
_ = burn_timer.tick() => { let _ = self.burn().await; }
_ = burn_timer.tick() => {
match self.burn().await {
Ok(()) => continue,
Err(err) => {
tracing::error!("Error while burning data credits: {err}");
confirm_pending_txns(&self.pending_tables, &self.solana, &self.balances).await?;
}
}
}
}
}
tracing::info!("Stopping burner");
Expand Down
8 changes: 4 additions & 4 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,13 @@ impl Cmd {
None
};

// Check if we have any left over pending transactions, and if we
// do check if they have been confirmed:
confirm_pending_txns(&pool, &solana).await?;

// Set up the balance cache:
let balances = BalanceCache::new(&pool, solana.clone()).await?;

// Check if we have any left over pending transactions, and if we
// do check if they have been confirmed:
confirm_pending_txns(&pool, &solana, &balances.balances()).await?;

// Set up the balance burner:
let burner = Burner::new(
pool.clone(),
Expand Down
19 changes: 18 additions & 1 deletion iot_packet_verifier/src/pending.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use sqlx::{postgres::PgRow, FromRow, PgPool, Postgres, Row, Transaction};
use std::{collections::HashMap, sync::Arc};
use tokio::sync::Mutex;

use crate::balances::BalanceStore;

/// To avoid excessive burn transaction (which cost us money), we institute a minimum
/// amount of Data Credits accounted for before we burn from a payer:
const BURN_THRESHOLD: i64 = 10_000;
Expand Down Expand Up @@ -55,6 +57,7 @@ pub enum ConfirmPendingError<S> {
pub async fn confirm_pending_txns<S>(
pending_tables: &impl PendingTables,
solana: &S,
balances: &BalanceStore,
) -> Result<(), ConfirmPendingError<S::Error>>
where
S: SolanaNetwork,
Expand Down Expand Up @@ -82,6 +85,10 @@ where
{
txn.subtract_burned_amount(&pending.payer, pending.amount)
.await?;
let mut balance_lock = balances.lock().await;
let payer_account = balance_lock.get_mut(&pending.payer).unwrap();
payer_account.burned = payer_account.burned.saturating_sub(pending.amount);
payer_account.balance = payer_account.balance.saturating_sub(pending.amount);
}
// Commit our work:
txn.commit().await?;
Expand Down Expand Up @@ -380,6 +387,8 @@ impl<'a> PendingTablesTransaction<'a> for &'a MockPendingTables {

#[cfg(test)]
mod test {
use crate::balances::PayerAccount;

use super::*;
use std::collections::HashSet;

Expand Down Expand Up @@ -443,6 +452,14 @@ mod test {
time_of_submission: Utc::now() - Duration::minutes(1),
},
);
let mut balances = HashMap::new();
balances.insert(
payer.clone(),
PayerAccount {
balance: CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT,
burned: CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT,
},
);
let mut pending_burns = HashMap::new();
pending_burns.insert(
payer.clone(),
Expand All @@ -458,7 +475,7 @@ mod test {
confirmed_txns.insert(confirmed);
let confirmed = MockConfirmed(confirmed_txns);
// Confirm and resolve transactions:
confirm_pending_txns(&pending_tables, &confirmed)
confirm_pending_txns(&pending_tables, &confirmed, &Arc::new(Mutex::new(balances)))
.await
.unwrap();
// The amount left in the pending burns table should only be the unconfirmed
Expand Down
14 changes: 12 additions & 2 deletions iot_packet_verifier/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use helium_proto::{
DataRate, Region,
};
use iot_packet_verifier::{
balances::BalanceCache,
balances::{BalanceCache, PayerAccount},
burner::Burner,
pending::{confirm_pending_txns, AddPendingBurn, Burn, MockPendingTables, PendingTables},
verifier::{payload_size_to_dc, ConfigServer, Org, Verifier, BYTES_PER_DC},
Expand Down Expand Up @@ -543,6 +543,14 @@ async fn test_pending_txns(pool: PgPool) -> anyhow::Result<()> {
payer.clone(),
CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT,
);
let mut cache = HashMap::new();
cache.insert(
payer.clone(),
PayerAccount {
balance: CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT,
burned: CONFIRMED_BURN_AMOUNT + UNCONFIRMED_BURN_AMOUNT,
},
);
let mock_network = MockSolanaNetwork::new(ledger);

// Add both the burn amounts to the pending burns table
Expand Down Expand Up @@ -579,7 +587,9 @@ async fn test_pending_txns(pool: PgPool) -> anyhow::Result<()> {
}

// Confirm pending transactions
confirm_pending_txns(&pool, &mock_network).await.unwrap();
confirm_pending_txns(&pool, &mock_network, &Arc::new(Mutex::new(cache)))
.await
.unwrap();

let pending_burn: Burn = sqlx::query_as("SELECT * FROM pending_burns LIMIT 1")
.fetch_one(&pool)
Expand Down

0 comments on commit 513e696

Please sign in to comment.