Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use tokio::watch for escrow_accounts #413

Merged
merged 12 commits into from
Nov 4, 2024
38 changes: 14 additions & 24 deletions common/src/escrow_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ use std::{

use alloy::primitives::{Address, U256};
use anyhow::{anyhow, Result};
use eventuals::{timer, Eventual, EventualExt};
use graphql_client::GraphQLQuery;
use thiserror::Error;
use tokio::time::sleep;
use tokio::sync::watch::Receiver;
use tracing::{error, warn};

use crate::prelude::SubgraphClient;
use crate::{prelude::SubgraphClient, watcher};

#[derive(Error, Debug)]
pub enum EscrowAccountsError {
Expand Down Expand Up @@ -99,27 +98,16 @@ type BigInt = String;
)]
pub struct EscrowAccountQuery;

pub fn escrow_accounts(
pub async fn escrow_accounts(
escrow_subgraph: &'static SubgraphClient,
indexer_address: Address,
interval: Duration,
reject_thawing_signers: bool,
) -> Eventual<EscrowAccounts> {
timer(interval).map_with_retry(
move |_| async move {
get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers)
.await
.map_err(|e| e.to_string())
},
move |err: String| {
error!(
"Failed to fetch escrow accounts for indexer {:?}: {}",
indexer_address, err
);

sleep(interval.div_f32(2.0))
},
)
) -> Result<Receiver<EscrowAccounts>, anyhow::Error> {
watcher::new_watcher(interval, move || {
get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers)
})
.await
}

async fn get_escrow_accounts(
Expand Down Expand Up @@ -238,15 +226,17 @@ mod tests {
);
mock_server.register(mock).await;

let accounts = escrow_accounts(
let mut accounts = escrow_accounts(
escrow_subgraph,
*test_vectors::INDEXER_ADDRESS,
Duration::from_secs(60),
true,
);

)
.await
.unwrap();
accounts.changed().await.unwrap();
assert_eq!(
accounts.value().await.unwrap(),
accounts.borrow().clone(),
EscrowAccounts::new(
test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(),
test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(),
Expand Down
11 changes: 4 additions & 7 deletions common/src/indexer_service/http/indexer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use axum::{
};
use axum::{serve, ServiceExt};
use build_info::BuildInfo;
use eventuals::Eventual;
use prometheus::TextEncoder;
use reqwest::StatusCode;
use serde::{de::DeserializeOwned, Serialize};
Expand Down Expand Up @@ -83,8 +82,6 @@ where
{
#[error("Issues with provided receipt: {0}")]
ReceiptError(tap_core::Error),
#[error("Service is not ready yet, try again in a moment")]
ServiceNotReady,
#[error("No attestation signer found for allocation `{0}`")]
NoSignerForAllocation(Address),
#[error("Invalid request body: {0}")]
Expand Down Expand Up @@ -120,8 +117,6 @@ where
}

let status = match self {
ServiceNotReady => StatusCode::SERVICE_UNAVAILABLE,

Unauthorized => StatusCode::UNAUTHORIZED,

NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR,
Expand Down Expand Up @@ -188,7 +183,7 @@ where
pub service_impl: Arc<I>,

// tap
pub escrow_accounts: Eventual<EscrowAccounts>,
pub escrow_accounts: Receiver<EscrowAccounts>,
pub domain_separator: Eip712Domain,
}

Expand Down Expand Up @@ -311,7 +306,9 @@ impl IndexerService {
options.config.indexer.indexer_address,
options.config.subgraphs.escrow.config.syncing_interval_secs,
true, // Reject thawing signers eagerly
);
)
.await
.expect("Error creating escrow_accounts channel");

// Establish Database connection necessary for serving indexer management
// requests with defined schema
Expand Down
10 changes: 3 additions & 7 deletions common/src/indexer_service/http/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ where
});

// recover the signer address
// get escrow accounts from eventual
// get escrow accounts from channel
// return sender from signer
//
// TODO: We are currently doing this process twice.
Expand All @@ -141,13 +141,9 @@ where
let signer = receipt
.recover_signer(&state.domain_separator)
.map_err(IndexerServiceError::CouldNotDecodeSigner)?;

let escrow_accounts = state
let sender = state
.escrow_accounts
.value_immediate()
.ok_or(IndexerServiceError::ServiceNotReady)?;

let sender = escrow_accounts
.borrow()
.get_sender_for_signer(&signer)
.map_err(IndexerServiceError::EscrowAccount)?;

Expand Down
3 changes: 1 addition & 2 deletions common/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use crate::tap::checks::value_check::MinimumValue;
use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation};
use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::Address;
use eventuals::Eventual;
use receipt_store::{DatabaseReceipt, InnerContext};
use sqlx::PgPool;
use std::fmt::Debug;
Expand Down Expand Up @@ -44,7 +43,7 @@ impl IndexerTapContext {
pub async fn get_checks(
pgpool: PgPool,
indexer_allocations: Receiver<HashMap<Address, Allocation>>,
escrow_accounts: Eventual<EscrowAccounts>,
escrow_accounts: Receiver<EscrowAccounts>,
domain_separator: Eip712Domain,
timestamp_error_tolerance: Duration,
receipt_max_value: u128,
Expand Down
16 changes: 9 additions & 7 deletions common/src/tap/checks/deny_list_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use crate::escrow_accounts::EscrowAccounts;
use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::Address;
use eventuals::Eventual;
use sqlx::postgres::PgListener;
use sqlx::PgPool;
use std::collections::HashSet;
Expand All @@ -16,10 +15,11 @@ use tap_core::receipt::{
state::Checking,
ReceiptWithState,
};
use tokio::sync::watch::Receiver;
use tracing::error;

pub struct DenyListCheck {
escrow_accounts: Eventual<EscrowAccounts>,
escrow_accounts: Receiver<EscrowAccounts>,
domain_separator: Eip712Domain,
sender_denylist: Arc<RwLock<HashSet<Address>>>,
_sender_denylist_watcher_handle: Arc<tokio::task::JoinHandle<()>>,
Expand All @@ -29,7 +29,7 @@ pub struct DenyListCheck {
impl DenyListCheck {
pub async fn new(
pgpool: PgPool,
escrow_accounts: Eventual<EscrowAccounts>,
escrow_accounts: Receiver<EscrowAccounts>,
domain_separator: Eip712Domain,
) -> Self {
// Listen to pg_notify events. We start it before updating the sender_denylist so that we
Expand Down Expand Up @@ -163,7 +163,7 @@ impl Check for DenyListCheck {
anyhow::anyhow!(e)
})
.map_err(CheckError::Failed)?;
let escrow_accounts_snapshot = self.escrow_accounts.value_immediate().unwrap_or_default();
let escrow_accounts_snapshot = self.escrow_accounts.borrow();

let receipt_sender = escrow_accounts_snapshot
.get_sender_for_signer(&receipt_signer)
Expand Down Expand Up @@ -200,6 +200,7 @@ mod tests {

use alloy::hex::ToHexExt;
use tap_core::receipt::{Context, ReceiptWithState};
use tokio::sync::watch;

use crate::test_vectors::{self, create_signed_receipt, TAP_SENDER};

Expand All @@ -209,14 +210,15 @@ mod tests {

async fn new_deny_list_check(pgpool: PgPool) -> DenyListCheck {
// Mock escrow accounts
let escrow_accounts = Eventual::from_value(EscrowAccounts::new(
let escrow_accounts_rx = watch::channel(EscrowAccounts::new(
test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(),
test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(),
));
))
.1;

DenyListCheck::new(
pgpool,
escrow_accounts,
escrow_accounts_rx,
test_vectors::TAP_EIP712_DOMAIN.to_owned(),
)
.await
Expand Down
8 changes: 4 additions & 4 deletions common/src/tap/checks/sender_balance_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@ use crate::escrow_accounts::EscrowAccounts;
use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::U256;
use anyhow::anyhow;
use eventuals::Eventual;
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
};
use tokio::sync::watch::Receiver;
use tracing::error;

pub struct SenderBalanceCheck {
escrow_accounts: Eventual<EscrowAccounts>,
escrow_accounts: Receiver<EscrowAccounts>,

domain_separator: Eip712Domain,
}

impl SenderBalanceCheck {
pub fn new(escrow_accounts: Eventual<EscrowAccounts>, domain_separator: Eip712Domain) -> Self {
pub fn new(escrow_accounts: Receiver<EscrowAccounts>, domain_separator: Eip712Domain) -> Self {
Self {
escrow_accounts,
domain_separator,
Expand All @@ -35,7 +35,7 @@ impl Check for SenderBalanceCheck {
_: &tap_core::receipt::Context,
receipt: &ReceiptWithState<Checking>,
) -> CheckResult {
let escrow_accounts_snapshot = self.escrow_accounts.value_immediate().unwrap_or_default();
let escrow_accounts_snapshot = self.escrow_accounts.borrow();

let receipt_signer = receipt
.signed_receipt()
Expand Down
4 changes: 3 additions & 1 deletion tap-agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,9 @@ pub async fn start_agent() -> (ActorRef<SenderAccountsManagerMessage>, JoinHandl
*indexer_address,
*escrow_sync_interval,
false,
);
)
.await
.expect("Error creating escrow_accounts channel");

let config = Box::leak(Box::new(SenderAccountConfig::from_config(&CONFIG)));

Expand Down
Loading
Loading