Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use tokio::watch for escrow_accounts #413

Merged
merged 12 commits into from
Nov 4, 2024
53 changes: 32 additions & 21 deletions common/src/escrow_accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ use std::{

use alloy::primitives::{Address, U256};
use anyhow::{anyhow, Result};
use eventuals::{timer, Eventual, EventualExt};
use graphql_client::GraphQLQuery;
use thiserror::Error;
use tokio::time::sleep;
use tokio::{
sync::watch::{self, Receiver},
time::{self, sleep},
};
use tracing::{error, warn};

use crate::prelude::SubgraphClient;
Expand Down Expand Up @@ -104,22 +106,31 @@ pub fn escrow_accounts(
indexer_address: Address,
interval: Duration,
reject_thawing_signers: bool,
) -> Eventual<EscrowAccounts> {
timer(interval).map_with_retry(
move |_| async move {
get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers)
.await
.map_err(|e| e.to_string())
},
move |err: String| {
error!(
"Failed to fetch escrow accounts for indexer {:?}: {}",
indexer_address, err
);

sleep(interval.div_f32(2.0))
},
)
) -> Receiver<EscrowAccounts> {
let (tx, rx) = watch::channel(EscrowAccounts::default());
tokio::spawn(async move {
let mut time_interval = time::interval(interval);
time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
loop {
time_interval.tick().await;
let result =
get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers).await;
match result {
Ok(accounts) => tx
.send(accounts)
.expect("Failed to update escrow_accounts channel"),
Err(err) => {
error!(
"Failed to fetch escrow accounts for indexer {:?}: {}",
indexer_address, err
);
// Sleep for a bit before we retry
sleep(interval.div_f32(2.0)).await;
}
}
}
});
rx
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
}

async fn get_escrow_accounts(
Expand Down Expand Up @@ -235,15 +246,15 @@ mod tests {
);
mock_server.register(mock).await;

let accounts = escrow_accounts(
let mut accounts = escrow_accounts(
escrow_subgraph,
*test_vectors::INDEXER_ADDRESS,
Duration::from_secs(60),
true,
);

accounts.changed().await.unwrap();
assert_eq!(
accounts.value().await.unwrap(),
accounts.borrow().clone(),
EscrowAccounts::new(
test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(),
test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(),
Expand Down
10 changes: 4 additions & 6 deletions common/src/indexer_service/http/indexer_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ use axum::{
};
use axum::{serve, ServiceExt};
use build_info::BuildInfo;
use eventuals::Eventual;
use prometheus::TextEncoder;
use reqwest::StatusCode;
use serde::{de::DeserializeOwned, Serialize};
Expand Down Expand Up @@ -83,8 +82,8 @@ where
{
#[error("Issues with provided receipt: {0}")]
ReceiptError(tap_core::Error),
#[error("Service is not ready yet, try again in a moment")]
ServiceNotReady,
// #[error("Service is not ready yet, try again in a moment")]
// ServiceNotReady,
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
#[error("No attestation signer found for allocation `{0}`")]
NoSignerForAllocation(Address),
#[error("Invalid request body: {0}")]
Expand Down Expand Up @@ -120,8 +119,7 @@ where
}

let status = match self {
ServiceNotReady => StatusCode::SERVICE_UNAVAILABLE,

//ServiceNotReady => StatusCode::SERVICE_UNAVAILABLE,
gusinacio marked this conversation as resolved.
Show resolved Hide resolved
Unauthorized => StatusCode::UNAUTHORIZED,

NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR,
Expand Down Expand Up @@ -188,7 +186,7 @@ where
pub service_impl: Arc<I>,

// tap
pub escrow_accounts: Eventual<EscrowAccounts>,
pub escrow_accounts: Receiver<EscrowAccounts>,
pub domain_separator: Eip712Domain,
}

Expand Down
10 changes: 3 additions & 7 deletions common/src/indexer_service/http/request_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ where
let allocation_id = receipt.message.allocation_id;

// recover the signer address
// get escrow accounts from eventual
// get escrow accounts from channel
// return sender from signer
//
// TODO: We are currently doing this process twice.
Expand All @@ -120,13 +120,9 @@ where
let signer = receipt
.recover_signer(&state.domain_separator)
.map_err(IndexerServiceError::CouldNotDecodeSigner)?;

let escrow_accounts = state
.escrow_accounts
.value_immediate()
.ok_or(IndexerServiceError::ServiceNotReady)?;

let escrow_accounts = state.escrow_accounts.clone();
let sender = escrow_accounts
.borrow()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you don't need to clone here.

.get_sender_for_signer(&signer)
.map_err(IndexerServiceError::EscrowAccount)?;

Expand Down
3 changes: 1 addition & 2 deletions common/src/tap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use crate::tap::checks::timestamp_check::TimestampCheck;
use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation};
use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::Address;
use eventuals::Eventual;
use receipt_store::{DatabaseReceipt, InnerContext};
use sqlx::PgPool;
use std::fmt::Debug;
Expand Down Expand Up @@ -40,7 +39,7 @@ impl IndexerTapContext {
pub async fn get_checks(
pgpool: PgPool,
indexer_allocations: Receiver<HashMap<Address, Allocation>>,
escrow_accounts: Eventual<EscrowAccounts>,
escrow_accounts: Receiver<EscrowAccounts>,
domain_separator: Eip712Domain,
timestamp_error_tolerance: Duration,
receipt_max_value: u128,
Expand Down
16 changes: 9 additions & 7 deletions common/src/tap/checks/deny_list_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
use crate::escrow_accounts::EscrowAccounts;
use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::Address;
use eventuals::Eventual;
use sqlx::postgres::PgListener;
use sqlx::PgPool;
use std::collections::HashSet;
Expand All @@ -16,10 +15,11 @@ use tap_core::receipt::{
state::Checking,
ReceiptWithState,
};
use tokio::sync::watch::Receiver;
use tracing::error;

pub struct DenyListCheck {
escrow_accounts: Eventual<EscrowAccounts>,
escrow_accounts: Receiver<EscrowAccounts>,
domain_separator: Eip712Domain,
sender_denylist: Arc<RwLock<HashSet<Address>>>,
_sender_denylist_watcher_handle: Arc<tokio::task::JoinHandle<()>>,
Expand All @@ -29,7 +29,7 @@ pub struct DenyListCheck {
impl DenyListCheck {
pub async fn new(
pgpool: PgPool,
escrow_accounts: Eventual<EscrowAccounts>,
escrow_accounts: Receiver<EscrowAccounts>,
domain_separator: Eip712Domain,
) -> Self {
// Listen to pg_notify events. We start it before updating the sender_denylist so that we
Expand Down Expand Up @@ -159,7 +159,7 @@ impl Check for DenyListCheck {
anyhow::anyhow!(e)
})
.map_err(CheckError::Failed)?;
let escrow_accounts_snapshot = self.escrow_accounts.value_immediate().unwrap_or_default();
let escrow_accounts_snapshot = self.escrow_accounts.borrow();

let receipt_sender = escrow_accounts_snapshot
.get_sender_for_signer(&receipt_signer)
Expand Down Expand Up @@ -196,6 +196,7 @@ mod tests {

use alloy::hex::ToHexExt;
use tap_core::receipt::ReceiptWithState;
use tokio::sync::watch;

use crate::test_vectors::{self, create_signed_receipt, TAP_SENDER};

Expand All @@ -205,14 +206,15 @@ mod tests {

async fn new_deny_list_check(pgpool: PgPool) -> DenyListCheck {
// Mock escrow accounts
let escrow_accounts = Eventual::from_value(EscrowAccounts::new(
let escrow_accounts_rx = watch::channel(EscrowAccounts::new(
test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(),
test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(),
));
))
.1;

DenyListCheck::new(
pgpool,
escrow_accounts,
escrow_accounts_rx,
test_vectors::TAP_EIP712_DOMAIN.to_owned(),
)
.await
Expand Down
9 changes: 4 additions & 5 deletions common/src/tap/checks/sender_balance_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,22 +5,22 @@ use crate::escrow_accounts::EscrowAccounts;
use alloy::dyn_abi::Eip712Domain;
use alloy::primitives::U256;
use anyhow::anyhow;
use eventuals::Eventual;
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
};
use tokio::sync::watch::Receiver;
use tracing::error;

pub struct SenderBalanceCheck {
escrow_accounts: Eventual<EscrowAccounts>,
escrow_accounts: Receiver<EscrowAccounts>,

domain_separator: Eip712Domain,
}

impl SenderBalanceCheck {
pub fn new(escrow_accounts: Eventual<EscrowAccounts>, domain_separator: Eip712Domain) -> Self {
pub fn new(escrow_accounts: Receiver<EscrowAccounts>, domain_separator: Eip712Domain) -> Self {
Self {
escrow_accounts,
domain_separator,
Expand All @@ -31,8 +31,7 @@ impl SenderBalanceCheck {
#[async_trait::async_trait]
impl Check for SenderBalanceCheck {
async fn check(&self, receipt: &ReceiptWithState<Checking>) -> CheckResult {
let escrow_accounts_snapshot = self.escrow_accounts.value_immediate().unwrap_or_default();

let escrow_accounts_snapshot = self.escrow_accounts.borrow();
let receipt_signer = receipt
.signed_receipt()
.recover_signer(&self.domain_separator)
Expand Down
Loading
Loading