Skip to content

Commit

Permalink
Make Solana Errors concrete types (#924)
Browse files Browse the repository at this point in the history
* reify SolanaNetwork::Error to SolanaRpcError

* Debiter::Error to SolanaRpcError

* ConfigServerError::OrgError to ClientError

* VerificationError::ConfigError to ConfigServerError

* VerificationError::WriteError to file_store::Error

* simplify Verifier::verify signature

* ConfirmPendingError:SolanaError to SolanaRpcError

* format tokio::select!

* allow auto from error type for big errors

* Add file path to keypair error

* remove unneccessary import

* MonitorError::Config/Solana to SolanaRpcError and ConfigServerError

* use auto error conversion for verifier

* boosting SolanaNetwork::Error to SolanaRpcError

* use std::time for macro

may not be imported where the macro is used

* Add temp error for failing boost_manager test

When the rest of this code starts to use helium-lib proper, this error type will be removed in lieu of mocking a tonic::Status error
  • Loading branch information
michaeldjeffrey authored Dec 20, 2024
1 parent b756ace commit bad8f0d
Show file tree
Hide file tree
Showing 12 changed files with 157 additions and 205 deletions.
18 changes: 6 additions & 12 deletions boost_manager/tests/integrations/updater_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use async_trait::async_trait;
use boost_manager::{db, updater::Updater, OnChainStatus};
use chrono::{DateTime, Utc};
use file_store::hex_boost::BoostedHexActivation;
use solana::{start_boost::SolanaNetwork, GetSignature};
use solana::{start_boost::SolanaNetwork, GetSignature, SolanaRpcError};
use solana_sdk::signature::Signature;
use sqlx::{PgPool, Postgres, Transaction};
use std::{string::ToString, sync::Mutex, time::Duration};
Expand All @@ -23,12 +23,6 @@ pub struct MockSolanaConnection {
error: Option<String>,
}

#[derive(thiserror::Error, Debug)]
pub enum Error {
#[error("not found")]
SubmitError(String),
}

impl MockSolanaConnection {
fn ok() -> Self {
Self {
Expand All @@ -47,28 +41,28 @@ impl MockSolanaConnection {

#[async_trait]
impl SolanaNetwork for MockSolanaConnection {
type Error = Error;
type Transaction = MockTransaction;

async fn make_start_boost_transaction(
&self,
batch: &[BoostedHexActivation],
) -> Result<Self::Transaction, Self::Error> {
) -> Result<Self::Transaction, SolanaRpcError> {
Ok(MockTransaction {
signature: Signature::new_unique(),
_activations: batch.to_owned(),
})
}

async fn submit_transaction(&self, txn: &Self::Transaction) -> Result<(), Self::Error> {
async fn submit_transaction(&self, txn: &Self::Transaction) -> Result<(), SolanaRpcError> {
self.submitted.lock().unwrap().push(txn.clone());

self.error
.as_ref()
.map(|str| Err(Error::SubmitError(str.clone())))
.map(|err| Err(SolanaRpcError::Test(err.to_owned())))
.unwrap_or(Ok(()))
}

async fn confirm_transaction(&self, _id: &str) -> Result<bool, Self::Error> {
async fn confirm_transaction(&self, _id: &str) -> Result<bool, SolanaRpcError> {
Ok(true)
}
}
Expand Down
12 changes: 4 additions & 8 deletions iot_config/src/client/org_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@ use helium_proto::services::iot_config::{

#[async_trait]
pub trait Orgs: Send + Sync + 'static {
type Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static;

async fn get(&mut self, oui: u64) -> Result<OrgResV1, Self::Error>;
async fn list(&mut self) -> Result<Vec<OrgV1>, Self::Error>;
async fn enable(&mut self, oui: u64) -> Result<(), Self::Error>;
async fn disable(&mut self, oui: u64) -> Result<(), Self::Error>;
async fn get(&mut self, oui: u64) -> Result<OrgResV1, ClientError>;
async fn list(&mut self) -> Result<Vec<OrgV1>, ClientError>;
async fn enable(&mut self, oui: u64) -> Result<(), ClientError>;
async fn disable(&mut self, oui: u64) -> Result<(), ClientError>;
}

#[derive(Clone)]
Expand All @@ -42,8 +40,6 @@ impl OrgClient {

#[async_trait]
impl Orgs for OrgClient {
type Error = ClientError;

async fn get(&mut self, oui: u64) -> Result<OrgResV1, ClientError> {
tracing::debug!(%oui, "retrieving org");

Expand Down
6 changes: 2 additions & 4 deletions iot_packet_verifier/src/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
verifier::Debiter,
};
use helium_crypto::PublicKeyBinary;
use solana::burn::SolanaNetwork;
use solana::{burn::SolanaNetwork, SolanaRpcError};
use std::{
collections::{hash_map::Entry, HashMap},
sync::Arc,
Expand Down Expand Up @@ -62,16 +62,14 @@ impl<S> Debiter for BalanceCache<S>
where
S: SolanaNetwork,
{
type Error = S::Error;

/// Debits the balance from the cache, returning the remaining balance as an
/// option if there was enough and none otherwise.
async fn debit_if_sufficient(
&self,
payer: &PublicKeyBinary,
amount: u64,
trigger_balance_check_threshold: u64,
) -> Result<Option<u64>, S::Error> {
) -> Result<Option<u64>, SolanaRpcError> {
let mut payer_accounts = self.payer_accounts.lock().await;

// Fetch the balance if we haven't seen the payer before
Expand Down
35 changes: 17 additions & 18 deletions iot_packet_verifier/src/burner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
},
};
use futures::{future::LocalBoxFuture, TryFutureExt};
use solana::{burn::SolanaNetwork, GetSignature};
use solana::{burn::SolanaNetwork, GetSignature, SolanaRpcError};
use std::time::Duration;
use task_manager::ManagedTask;
use tokio::time::{self, MissedTickBehavior};
Expand Down Expand Up @@ -37,15 +37,15 @@ where
}

#[derive(thiserror::Error, Debug)]
pub enum BurnError<S> {
pub enum BurnError {
#[error("Join error: {0}")]
JoinError(#[from] tokio::task::JoinError),
#[error("Sql error: {0}")]
SqlError(#[from] sqlx::Error),
#[error("Solana error: {0}")]
SolanaError(S),
SolanaError(#[from] SolanaRpcError),
#[error("Confirm pending transaction error: {0}")]
ConfirmPendingError(#[from] ConfirmPendingError<S>),
ConfirmPendingError(#[from] ConfirmPendingError),
}

impl<P, S> Burner<P, S> {
Expand All @@ -69,32 +69,31 @@ where
P: PendingTables + Send + Sync + 'static,
S: SolanaNetwork,
{
pub async fn run(mut self, shutdown: triggered::Listener) -> Result<(), BurnError<S::Error>> {
pub async fn run(mut self, shutdown: triggered::Listener) -> Result<(), BurnError> {
tracing::info!("Starting burner");
let mut burn_timer = time::interval(self.burn_period);
burn_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);

loop {
#[rustfmt::skip]
tokio::select! {
biased;
_ = shutdown.clone() => break,
_ = burn_timer.tick() => {
match self.burn().await {
Ok(()) => continue,
Err(err) => {
tracing::error!("Error while burning data credits: {err}");
confirm_pending_txns(&self.pending_tables, &self.solana, &self.balances).await?;
}
}
}
biased;
_ = shutdown.clone() => break,
_ = burn_timer.tick() => {
match self.burn().await {
Ok(()) => continue,
Err(err) => {
tracing::error!("Error while burning data credits: {err}");
confirm_pending_txns(&self.pending_tables, &self.solana, &self.balances).await?;
}
}
}
}
}
tracing::info!("Stopping burner");
Ok(())
}

pub async fn burn(&mut self) -> Result<(), BurnError<S::Error>> {
pub async fn burn(&mut self) -> Result<(), BurnError> {
// Fetch the next payer and amount that should be burn. If no such burn
// exists, perform no action.
let Some(Burn { payer, amount }) = self.pending_tables.fetch_next_burn().await? else {
Expand Down
4 changes: 2 additions & 2 deletions iot_packet_verifier/src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ where
self.minimum_allowed_balance,
&mut transaction,
reports,
&self.valid_packets,
&self.invalid_packets,
&mut self.valid_packets,
&mut self.invalid_packets,
)
.await?;
transaction.commit().await?;
Expand Down
20 changes: 10 additions & 10 deletions iot_packet_verifier/src/pending.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use async_trait::async_trait;
use chrono::{DateTime, Duration, Utc};
use helium_crypto::PublicKeyBinary;
use solana::burn::SolanaNetwork;
use solana::{burn::SolanaNetwork, SolanaRpcError};
use solana_sdk::signature::Signature;
use sqlx::{postgres::PgRow, FromRow, PgPool, Postgres, Row, Transaction};
use std::{collections::HashMap, sync::Arc};
Expand Down Expand Up @@ -45,20 +45,20 @@ pub trait PendingTables {
}

#[derive(thiserror::Error, Debug)]
pub enum ConfirmPendingError<S> {
pub enum ConfirmPendingError {
#[error("Sqlx error: {0}")]
SqlxError(#[from] sqlx::Error),
#[error("Chrono error: {0}")]
ChronoError(#[from] chrono::OutOfRangeError),
#[error("Solana error: {0}")]
SolanaError(S),
SolanaError(#[from] SolanaRpcError),
}

pub async fn confirm_pending_txns<S>(
pending_tables: &impl PendingTables,
solana: &S,
balances: &BalanceStore,
) -> Result<(), ConfirmPendingError<S::Error>>
) -> Result<(), ConfirmPendingError>
where
S: SolanaNetwork,
{
Expand Down Expand Up @@ -165,7 +165,7 @@ impl PendingTables for PgPool {
}

#[async_trait]
impl AddPendingBurn for &'_ mut Transaction<'_, Postgres> {
impl AddPendingBurn for Transaction<'_, Postgres> {
async fn add_burned_amount(
&mut self,
payer: &PublicKeyBinary,
Expand Down Expand Up @@ -387,6 +387,7 @@ impl<'a> PendingTablesTransaction<'a> for &'a MockPendingTables {

#[cfg(test)]
mod test {

use crate::balances::PayerAccount;

use super::*;
Expand All @@ -397,11 +398,10 @@ mod test {

#[async_trait]
impl SolanaNetwork for MockConfirmed {
type Error = std::convert::Infallible;
type Transaction = Signature;

#[allow(clippy::diverging_sub_expression)]
async fn payer_balance(&self, _payer: &PublicKeyBinary) -> Result<u64, Self::Error> {
async fn payer_balance(&self, _payer: &PublicKeyBinary) -> Result<u64, SolanaRpcError> {
unreachable!()
}

Expand All @@ -410,19 +410,19 @@ mod test {
&self,
_payer: &PublicKeyBinary,
_amount: u64,
) -> Result<Self::Transaction, Self::Error> {
) -> Result<Self::Transaction, SolanaRpcError> {
unreachable!()
}

#[allow(clippy::diverging_sub_expression)]
async fn submit_transaction(
&self,
_transaction: &Self::Transaction,
) -> Result<(), Self::Error> {
) -> Result<(), SolanaRpcError> {
unreachable!()
}

async fn confirm_transaction(&self, txn: &Signature) -> Result<bool, Self::Error> {
async fn confirm_transaction(&self, txn: &Signature) -> Result<bool, SolanaRpcError> {
Ok(self.0.contains(txn))
}
}
Expand Down
Loading

0 comments on commit bad8f0d

Please sign in to comment.