Skip to content

Commit

Permalink
refactor: use tokio::watch for escrow_accounts (#413)
Browse files Browse the repository at this point in the history
  • Loading branch information
taslimmuhammed authored Nov 4, 2024
1 parent be1a25b commit 5fa85ac
Show file tree
Hide file tree
Showing 16 changed files with 191 additions and 220 deletions.
38 changes: 14 additions & 24 deletions common/src/escrow_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ use std::{

use alloy::primitives::{Address, U256};
use anyhow::{anyhow, Result};
use eventuals::{timer, Eventual, EventualExt};
use graphql_client::GraphQLQuery;
use thiserror::Error;
use tokio::time::sleep;
use tokio::sync::watch::Receiver;
use tracing::{error, warn};

use crate::prelude::SubgraphClient;
use crate::{prelude::SubgraphClient, watcher};

#[derive(Error, Debug)]
pub enum EscrowAccountsError {
Expand Down Expand Up @@ -99,27 +98,16 @@ type BigInt = String;
)]
pub struct EscrowAccountQuery;

pub fn escrow_accounts(
pub async fn escrow_accounts(
escrow_subgraph: &'static SubgraphClient,
indexer_address: Address,
interval: Duration,
reject_thawing_signers: bool,
) -> Eventual<EscrowAccounts> {
timer(interval).map_with_retry(
move |_| async move {
get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers)
.await
.map_err(|e| e.to_string())
},
move |err: String| {
error!(
"Failed to fetch escrow accounts for indexer {:?}: {}",
indexer_address, err
);

sleep(interval.div_f32(2.0))
},
)
) -> Result<Receiver<EscrowAccounts>, anyhow::Error> {
watcher::new_watcher(interval, move || {
get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers)
})
.await
}

async fn get_escrow_accounts(
Expand Down Expand Up @@ -238,15 +226,17 @@ mod tests {
);
mock_server.register(mock).await;

let accounts = escrow_accounts(
let mut accounts = escrow_accounts(
escrow_subgraph,
*test_vectors::INDEXER_ADDRESS,
Duration::from_secs(60),
true,
);

)
.await
.unwrap();
accounts.changed().await.unwrap();
assert_eq!(
accounts.value().await.unwrap(),
accounts.borrow().clone(),
EscrowAccounts::new(
test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(),
test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(),
Expand Down
11 changes: 4 additions & 7 deletions common/src/indexer_service/http/indexer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use axum::{
};
use axum::{serve, ServiceExt};
use build_info::BuildInfo;
use eventuals::Eventual;
use prometheus::TextEncoder;
use reqwest::StatusCode;
use serde::{de::DeserializeOwned, Serialize};
Expand Down Expand Up @@ -83,8 +82,6 @@ where
{
#[error("Issues with provided receipt: {0}")]
ReceiptError(tap_core::Error),
#[error("Service is not ready yet, try again in a moment")]
ServiceNotReady,
#[error("No attestation signer found for allocation `{0}`")]
NoSignerForAllocation(Address),
#[error("Invalid request body: {0}")]
Expand Down Expand Up @@ -120,8 +117,6 @@ where
}

let status = match self {
ServiceNotReady => StatusCode::SERVICE_UNAVAILABLE,

Unauthorized => StatusCode::UNAUTHORIZED,

NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR,
Expand Down Expand Up @@ -188,7 +183,7 @@ where
pub service_impl: Arc<I>,

// tap
pub escrow_accounts: Eventual<EscrowAccounts>,
pub escrow_accounts: Receiver<EscrowAccounts>,
pub domain_separator: Eip712Domain,
}

Expand Down Expand Up @@ -311,7 +306,9 @@ impl IndexerService {
options.config.indexer.indexer_address,
options.config.subgraphs.escrow.config.syncing_interval_secs,
true, // Reject thawing signers eagerly
);
)
.await
.expect("Error creating escrow_accounts channel");

// Establish Database connection necessary for serving indexer management
// requests with defined schema
Expand Down
10 changes: 3 additions & 7 deletions common/src/indexer_service/http/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ where
});

// recover the signer address
// get escrow accounts from eventual
// get escrow accounts from channel
// return sender from signer
//
// TODO: We are currently doing this process twice.
Expand All @@ -141,13 +141,9 @@ where
let signer = receipt
.recover_signer(&state.domain_separator)
.map_err(IndexerServiceError::CouldNotDecodeSigner)?;

let escrow_accounts = state
let sender = state
.escrow_accounts
.value_immediate()
.ok_or(IndexerServiceError::ServiceNotReady)?;

let sender = escrow_accounts
.borrow()
.get_sender_for_signer(&signer)
.map_err(IndexerServiceError::EscrowAccount)?;

Expand Down
3 changes: 1 addition & 2 deletions common/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::tap::checks::value_check::MinimumValue;
use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation};
use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::Address;
use eventuals::Eventual;
use receipt_store::{DatabaseReceipt, InnerContext};
use sqlx::PgPool;
use std::fmt::Debug;
Expand Down Expand Up @@ -44,7 +43,7 @@ impl IndexerTapContext {
pub async fn get_checks(
pgpool: PgPool,
indexer_allocations: Receiver<HashMap<Address, Allocation>>,
escrow_accounts: Eventual<EscrowAccounts>,
escrow_accounts: Receiver<EscrowAccounts>,
domain_separator: Eip712Domain,
timestamp_error_tolerance: Duration,
receipt_max_value: u128,
Expand Down
16 changes: 9 additions & 7 deletions common/src/tap/checks/deny_list_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use crate::escrow_accounts::EscrowAccounts;
use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::Address;
use eventuals::Eventual;
use sqlx::postgres::PgListener;
use sqlx::PgPool;
use std::collections::HashSet;
Expand All @@ -16,10 +15,11 @@ use tap_core::receipt::{
state::Checking,
ReceiptWithState,
};
use tokio::sync::watch::Receiver;
use tracing::error;

pub struct DenyListCheck {
escrow_accounts: Eventual<EscrowAccounts>,
escrow_accounts: Receiver<EscrowAccounts>,
domain_separator: Eip712Domain,
sender_denylist: Arc<RwLock<HashSet<Address>>>,
_sender_denylist_watcher_handle: Arc<tokio::task::JoinHandle<()>>,
Expand All @@ -29,7 +29,7 @@ pub struct DenyListCheck {
impl DenyListCheck {
pub async fn new(
pgpool: PgPool,
escrow_accounts: Eventual<EscrowAccounts>,
escrow_accounts: Receiver<EscrowAccounts>,
domain_separator: Eip712Domain,
) -> Self {
// Listen to pg_notify events. We start it before updating the sender_denylist so that we
Expand Down Expand Up @@ -163,7 +163,7 @@ impl Check for DenyListCheck {
anyhow::anyhow!(e)
})
.map_err(CheckError::Failed)?;
let escrow_accounts_snapshot = self.escrow_accounts.value_immediate().unwrap_or_default();
let escrow_accounts_snapshot = self.escrow_accounts.borrow();

let receipt_sender = escrow_accounts_snapshot
.get_sender_for_signer(&receipt_signer)
Expand Down Expand Up @@ -200,6 +200,7 @@ mod tests {

use alloy::hex::ToHexExt;
use tap_core::receipt::{Context, ReceiptWithState};
use tokio::sync::watch;

use crate::test_vectors::{self, create_signed_receipt, TAP_SENDER};

Expand All @@ -209,14 +210,15 @@ mod tests {

async fn new_deny_list_check(pgpool: PgPool) -> DenyListCheck {
// Mock escrow accounts
let escrow_accounts = Eventual::from_value(EscrowAccounts::new(
let escrow_accounts_rx = watch::channel(EscrowAccounts::new(
test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(),
test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(),
));
))
.1;

DenyListCheck::new(
pgpool,
escrow_accounts,
escrow_accounts_rx,
test_vectors::TAP_EIP712_DOMAIN.to_owned(),
)
.await
Expand Down
8 changes: 4 additions & 4 deletions common/src/tap/checks/sender_balance_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@ use crate::escrow_accounts::EscrowAccounts;
use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::U256;
use anyhow::anyhow;
use eventuals::Eventual;
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
};
use tokio::sync::watch::Receiver;
use tracing::error;

pub struct SenderBalanceCheck {
escrow_accounts: Eventual<EscrowAccounts>,
escrow_accounts: Receiver<EscrowAccounts>,

domain_separator: Eip712Domain,
}

impl SenderBalanceCheck {
pub fn new(escrow_accounts: Eventual<EscrowAccounts>, domain_separator: Eip712Domain) -> Self {
pub fn new(escrow_accounts: Receiver<EscrowAccounts>, domain_separator: Eip712Domain) -> Self {
Self {
escrow_accounts,
domain_separator,
Expand All @@ -35,7 +35,7 @@ impl Check for SenderBalanceCheck {
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
) -> CheckResult {
let escrow_accounts_snapshot = self.escrow_accounts.value_immediate().unwrap_or_default();
let escrow_accounts_snapshot = self.escrow_accounts.borrow();

let receipt_signer = receipt
.signed_receipt()
Expand Down
4 changes: 3 additions & 1 deletion tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandl
*indexer_address,
*escrow_sync_interval,
false,
);
)
.await
.expect("Error creating escrow_accounts channel");

let config = Box::leak(Box::new(SenderAccountConfig::from_config(&CONFIG)));

Expand Down
Loading

0 comments on commit 5fa85ac

Please sign in to comment.