From a2f180745761c43ba08e3ec2e26ccbe8721cbef6 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Sat, 26 Oct 2024 07:27:47 +0530 Subject: [PATCH 1/9] axum issue solved --- common/src/escrow_accounts.rs | 50 ++++++++------ .../indexer_service/http/indexer_service.rs | 5 +- .../indexer_service/http/request_handler.rs | 18 ++--- common/src/tap.rs | 3 +- common/src/tap/checks/deny_list_check.rs | 17 ++--- common/src/tap/checks/sender_balance_check.rs | 10 +-- tap-agent/src/agent.rs | 2 +- tap-agent/src/agent/sender_account.rs | 67 +++++++++---------- .../src/agent/sender_accounts_manager.rs | 30 ++++----- tap-agent/src/agent/sender_allocation.rs | 23 +++---- tap-agent/src/tap/context.rs | 8 ++- .../src/tap/context/checks/allocation_id.rs | 1 - tap-agent/src/tap/context/checks/signature.rs | 17 ++--- tap-agent/src/tap/context/rav.rs | 3 +- tap-agent/src/tap/context/receipt.rs | 34 +++++----- tap-agent/src/tap/escrow_adapter.rs | 34 +++++----- tap-agent/src/tap/mod.rs | 10 +-- 17 files changed, 153 insertions(+), 179 deletions(-) diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index ddee549a..516156fe 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -9,10 +9,9 @@ use std::{ use alloy::primitives::{Address, U256}; use anyhow::{anyhow, Result}; -use eventuals::{timer, Eventual, EventualExt}; use graphql_client::GraphQLQuery; use thiserror::Error; -use tokio::time::sleep; +use tokio::{sync::watch::{self, Receiver}, time::{self, sleep}}; use tracing::{error, warn}; use crate::prelude::SubgraphClient; @@ -104,22 +103,31 @@ pub fn escrow_accounts( indexer_address: Address, interval: Duration, reject_thawing_signers: bool, -) -> Eventual { - timer(interval).map_with_retry( - move |_| async move { - get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers) - .await - .map_err(|e| e.to_string()) - }, - move |err: String| { - error!( - "Failed to fetch escrow accounts for indexer {:?}: {}", - indexer_address, err - ); - - sleep(interval.div_f32(2.0)) - }, - ) +) -> Receiver { + let (tx, rx) = watch::channel(EscrowAccounts::default()); + tokio::spawn(async move { + let mut time_interval = time::interval(interval); + time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); + loop { + time_interval.tick().await; + let result = get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers) + .await; + match result{ + Ok(accounts) => tx + .send(accounts) + .expect("Failed to update escrow_accounts channel"), + Err(err) => { + error!( + "Failed to fetch escrow accounts for indexer {:?}: {}", + indexer_address, err + ); + // Sleep for a bit before we retry + sleep(interval.div_f32(2.0)).await; + }, + } + } + }); + rx } async fn get_escrow_accounts( @@ -235,15 +243,15 @@ mod tests { ); mock_server.register(mock).await; - let accounts = escrow_accounts( + let mut accounts = escrow_accounts( escrow_subgraph, *test_vectors::INDEXER_ADDRESS, Duration::from_secs(60), true, ); - + accounts.changed().await; assert_eq!( - accounts.value().await.unwrap(), + accounts.borrow().clone(), EscrowAccounts::new( test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(), test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 06dce764..bfffd57c 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -14,7 +14,6 @@ use axum::{ }; use axum::{serve, ServiceExt}; use build_info::BuildInfo; -use eventuals::Eventual; use prometheus::TextEncoder; use reqwest::StatusCode; use serde::{de::DeserializeOwned, Serialize}; @@ -188,7 +187,7 @@ where pub service_impl: Arc, // tap - pub escrow_accounts: Eventual, + pub escrow_accounts: Receiver, pub domain_separator: Eip712Domain, } @@ -513,4 +512,4 @@ pub async fn shutdown_signal() { } info!("Signal received, starting graceful shutdown"); -} +} \ No newline at end of file diff --git a/common/src/indexer_service/http/request_handler.rs b/common/src/indexer_service/http/request_handler.rs index be84ab4b..2bb8ed81 100644 --- a/common/src/indexer_service/http/request_handler.rs +++ b/common/src/indexer_service/http/request_handler.rs @@ -56,13 +56,7 @@ pub async fn request_handler( where I: IndexerServiceImpl + Sync + Send + 'static, { - _request_handler(manifest_id, typed_header, state, headers, body) - .await - .inspect_err(|_| { - HANDLER_FAILURE - .with_label_values(&[&manifest_id.to_string()]) - .inc() - }) + _request_handler(manifest_id, typed_header, state, headers, body, ).await } async fn _request_handler( @@ -111,7 +105,7 @@ where let allocation_id = receipt.message.allocation_id; // recover the signer address - // get escrow accounts from eventual + // get escrow accounts from reciever // return sender from signer // // TODO: We are currently doing this process twice. @@ -120,13 +114,9 @@ where let signer = receipt .recover_signer(&state.domain_separator) .map_err(IndexerServiceError::CouldNotDecodeSigner)?; - - let escrow_accounts = state - .escrow_accounts - .value_immediate() - .ok_or(IndexerServiceError::ServiceNotReady)?; - + let escrow_accounts = state.escrow_accounts.clone(); let sender = escrow_accounts + .borrow() .get_sender_for_signer(&signer) .map_err(IndexerServiceError::EscrowAccount)?; diff --git a/common/src/tap.rs b/common/src/tap.rs index 2d23ca92..464d55c9 100644 --- a/common/src/tap.rs +++ b/common/src/tap.rs @@ -9,7 +9,6 @@ use crate::tap::checks::timestamp_check::TimestampCheck; use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation}; use alloy::dyn_abi::Eip712Domain; use alloy::primitives::Address; -use eventuals::Eventual; use receipt_store::{DatabaseReceipt, InnerContext}; use sqlx::PgPool; use std::fmt::Debug; @@ -40,7 +39,7 @@ impl IndexerTapContext { pub async fn get_checks( pgpool: PgPool, indexer_allocations: Receiver>, - escrow_accounts: Eventual, + escrow_accounts: Receiver, domain_separator: Eip712Domain, timestamp_error_tolerance: Duration, receipt_max_value: u128, diff --git a/common/src/tap/checks/deny_list_check.rs b/common/src/tap/checks/deny_list_check.rs index 469e6a84..a0e1f0f8 100644 --- a/common/src/tap/checks/deny_list_check.rs +++ b/common/src/tap/checks/deny_list_check.rs @@ -4,9 +4,9 @@ use crate::escrow_accounts::EscrowAccounts; use alloy::dyn_abi::Eip712Domain; use alloy::primitives::Address; -use eventuals::Eventual; use sqlx::postgres::PgListener; use sqlx::PgPool; +use tokio::sync::watch::Receiver; use std::collections::HashSet; use std::sync::RwLock; use std::{str::FromStr, sync::Arc}; @@ -19,7 +19,7 @@ use tap_core::receipt::{ use tracing::error; pub struct DenyListCheck { - escrow_accounts: Eventual, + escrow_accounts: Receiver, domain_separator: Eip712Domain, sender_denylist: Arc>>, _sender_denylist_watcher_handle: Arc>, @@ -29,7 +29,7 @@ pub struct DenyListCheck { impl DenyListCheck { pub async fn new( pgpool: PgPool, - escrow_accounts: Eventual, + escrow_accounts: Receiver, domain_separator: Eip712Domain, ) -> Self { // Listen to pg_notify events. We start it before updating the sender_denylist so that we @@ -159,8 +159,8 @@ impl Check for DenyListCheck { anyhow::anyhow!(e) }) .map_err(CheckError::Failed)?; - let escrow_accounts_snapshot = self.escrow_accounts.value_immediate().unwrap_or_default(); - + let escrow_accounts_snapshot = self.escrow_accounts.borrow(); + let receipt_sender = escrow_accounts_snapshot .get_sender_for_signer(&receipt_signer) .map_err(|e| CheckError::Failed(e.into()))?; @@ -196,6 +196,7 @@ mod tests { use alloy::hex::ToHexExt; use tap_core::receipt::ReceiptWithState; + use tokio::sync::watch; use crate::test_vectors::{self, create_signed_receipt, TAP_SENDER}; @@ -205,14 +206,14 @@ mod tests { async fn new_deny_list_check(pgpool: PgPool) -> DenyListCheck { // Mock escrow accounts - let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + let escrow_accounts_rx = watch::channel(EscrowAccounts::new( test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(), test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), - )); + )).1; DenyListCheck::new( pgpool, - escrow_accounts, + escrow_accounts_rx, test_vectors::TAP_EIP712_DOMAIN.to_owned(), ) .await diff --git a/common/src/tap/checks/sender_balance_check.rs b/common/src/tap/checks/sender_balance_check.rs index b0269e71..327c4ee1 100644 --- a/common/src/tap/checks/sender_balance_check.rs +++ b/common/src/tap/checks/sender_balance_check.rs @@ -1,26 +1,27 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 + use crate::escrow_accounts::EscrowAccounts; use alloy::dyn_abi::Eip712Domain; use alloy::primitives::U256; use anyhow::anyhow; -use eventuals::Eventual; use tap_core::receipt::{ checks::{Check, CheckError, CheckResult}, state::Checking, ReceiptWithState, }; +use tokio::sync::watch::Receiver; use tracing::error; pub struct SenderBalanceCheck { - escrow_accounts: Eventual, + escrow_accounts: Receiver, domain_separator: Eip712Domain, } impl SenderBalanceCheck { - pub fn new(escrow_accounts: Eventual, domain_separator: Eip712Domain) -> Self { + pub fn new(escrow_accounts: Receiver, domain_separator: Eip712Domain) -> Self { Self { escrow_accounts, domain_separator, @@ -31,8 +32,7 @@ impl SenderBalanceCheck { #[async_trait::async_trait] impl Check for SenderBalanceCheck { async fn check(&self, receipt: &ReceiptWithState) -> CheckResult { - let escrow_accounts_snapshot = self.escrow_accounts.value_immediate().unwrap_or_default(); - + let escrow_accounts_snapshot = self.escrow_accounts.borrow(); let receipt_signer = receipt .signed_receipt() .recover_signer(&self.domain_separator) diff --git a/tap-agent/src/agent.rs b/tap-agent/src/agent.rs index d79d02b0..bb4cbdb6 100644 --- a/tap-agent/src/agent.rs +++ b/tap-agent/src/agent.rs @@ -109,7 +109,7 @@ pub async fn start_agent() -> (ActorRef, JoinHandl escrow_subgraph, *indexer_address, Duration::from_millis(*escrow_syncing_interval_ms), - false, + false ); let args = SenderAccountsManagerArgs { diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 596389ad..c38b8c24 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -19,7 +19,6 @@ use tokio::task::JoinHandle; use alloy::dyn_abi::Eip712Domain; use alloy::primitives::Address; use anyhow::Result; -use eventuals::{Eventual, EventualExt, PipeHandle}; use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; use ractor::{Actor, ActorProcessingErr, ActorRef, MessagingErr, SupervisionEvent}; use sqlx::PgPool; @@ -123,7 +122,7 @@ pub struct SenderAccountArgs { pub config: &'static config::Config, pub pgpool: PgPool, pub sender_id: Address, - pub escrow_accounts: Eventual, + pub escrow_accounts: Receiver, pub indexer_allocations: Receiver>, pub escrow_subgraph: &'static SubgraphClient, pub domain_separator: Eip712Domain, @@ -140,7 +139,7 @@ pub struct State { invalid_receipts_tracker: SimpleFeeTracker, allocation_ids: HashSet
, _indexer_allocations_handle: JoinHandle<()>, - _escrow_account_monitor: PipeHandle, + _escrow_account_monitor: JoinHandle<()>, scheduled_rav_request: Option>>>, sender: Address, @@ -153,8 +152,8 @@ pub struct State { // concurrent rav request adaptive_limiter: AdaptiveLimiter, - //Eventuals - escrow_accounts: Eventual, + // Receivers + escrow_accounts: Receiver, escrow_subgraph: &'static SubgraphClient, escrow_adapter: EscrowAdapter, @@ -414,16 +413,19 @@ impl Actor for SenderAccount { let myself_clone = myself.clone(); let pgpool_clone = pgpool.clone(); - let _escrow_account_monitor = escrow_accounts.clone().pipe_async(move |escrow_account| { - let myself = myself_clone.clone(); - let pgpool = pgpool_clone.clone(); - // get balance or default value for sender - // this balance already takes into account thawing - let balance = escrow_account - .get_balance_for_sender(&sender_id) - .unwrap_or_default(); - - async move { + let mut accounts_clone = escrow_accounts.clone(); + let _escrow_account_monitor = tokio::spawn(async move{ + while accounts_clone.changed().await.is_ok(){ + // change let accounts_ref = escrow_accounts.borrow(); + // let escrow_account = accounts_ref.as_ref().unwrap(); + let escrow_account = accounts_clone.borrow().clone().unwrap(); + let myself = myself_clone.clone(); + let pgpool = pgpool_clone.clone(); + // Get balance or default value for sender + // this balance already takes into account thawing + let balance = escrow_account + .get_balance_for_sender(&sender_id) + .unwrap_or_default(); let last_non_final_ravs = sqlx::query!( r#" SELECT allocation_id, value_aggregate @@ -507,9 +509,7 @@ impl Actor for SenderAccount { .expect("Deny status cannot be null"); let sender_balance = escrow_accounts - .value() - .await - .expect("should be able to get escrow accounts") + .borrow() .get_balance_for_sender(&sender_id) .unwrap_or_default(); @@ -931,7 +931,6 @@ pub mod tests { }; use alloy::hex::ToHexExt; use alloy::primitives::{Address, U256}; - use eventuals::{Eventual, EventualWriter}; use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::prelude::{DeploymentDetails, SubgraphClient}; use ractor::concurrency::JoinHandle; @@ -942,7 +941,7 @@ pub mod tests { use std::sync::atomic::AtomicU32; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; - use tokio::sync::watch; + use tokio::sync::watch::{self, Sender}; use wiremock::matchers::{body_string_contains, method}; use wiremock::{Mock, MockServer, ResponseTemplate}; @@ -1006,7 +1005,7 @@ pub mod tests { ActorRef, tokio::task::JoinHandle<()>, String, - EventualWriter, + Sender>, ) { let config = Box::leak(Box::new(config::Config { config: None, @@ -1029,12 +1028,12 @@ pub mod tests { None, DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), ))); - let (mut writer, escrow_accounts_eventual) = Eventual::new(); + let ( escrow_accounts_tx, escrow_accounts_rx) = watch::channel(None); - writer.write(EscrowAccounts::new( + escrow_accounts_tx.send(Some(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); + ))); let prefix = format!( "test-{}", @@ -1045,7 +1044,7 @@ pub mod tests { config, pgpool, sender_id: SENDER.1, - escrow_accounts: escrow_accounts_eventual, + escrow_accounts: escrow_accounts_rx, indexer_allocations: watch::channel(initial_allocation).1, escrow_subgraph, domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), @@ -1059,7 +1058,7 @@ pub mod tests { .await .unwrap(); tokio::time::sleep(Duration::from_millis(10)).await; - (sender, handle, prefix, writer) + (sender, handle, prefix, escrow_accounts_tx) } #[sqlx::test(migrations = "../migrations")] @@ -1849,7 +1848,7 @@ pub mod tests { .await .unwrap(); - let (sender_account, handle, _, mut escrow_writer) = create_sender_account( + let (sender_account, handle, _, escrow_accounts_tx) = create_sender_account( pgpool.clone(), HashSet::new(), TRIGGER_VALUE, @@ -1878,10 +1877,10 @@ pub mod tests { ) .await; // escrow_account updated - escrow_writer.write(EscrowAccounts::new( + escrow_accounts_tx.send(Some(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); + ))); // wait the actor react to the messages tokio::time::sleep(Duration::from_millis(10)).await; @@ -1903,7 +1902,7 @@ pub mod tests { .await .unwrap(); - let (sender_account, handle, _, mut escrow_writer) = create_sender_account( + let (sender_account, handle, _, escrow_accounts_tx) = create_sender_account( pgpool.clone(), HashSet::new(), TRIGGER_VALUE, @@ -1917,10 +1916,10 @@ pub mod tests { assert!(!deny, "should start unblocked"); // update the escrow to a lower value - escrow_writer.write(EscrowAccounts::new( + escrow_accounts_tx.send(Some(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE / 2))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); + ))); tokio::time::sleep(Duration::from_millis(20)).await; @@ -1928,10 +1927,10 @@ pub mod tests { assert!(deny, "should block the sender"); // simulate deposit - escrow_writer.write(EscrowAccounts::new( + escrow_accounts_tx.send(Some(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); + ))); tokio::time::sleep(Duration::from_millis(10)).await; diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index aff314ec..c091745c 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -11,7 +11,6 @@ use alloy::dyn_abi::Eip712Domain; use alloy::primitives::Address; use anyhow::Result; use anyhow::{anyhow, bail}; -use eventuals::{Eventual, EventualExt, PipeHandle}; use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::prelude::{Allocation, SubgraphClient}; use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent}; @@ -57,7 +56,7 @@ pub struct SenderAccountsManagerArgs { pub pgpool: PgPool, pub indexer_allocations: Receiver>, - pub escrow_accounts: Eventual, + pub escrow_accounts: Receiver, pub escrow_subgraph: &'static SubgraphClient, pub sender_aggregator_endpoints: HashMap, @@ -73,7 +72,7 @@ pub struct State { domain_separator: Eip712Domain, pgpool: PgPool, indexer_allocations: Receiver>, - escrow_accounts: Eventual, + escrow_accounts: Receiver, escrow_subgraph: &'static SubgraphClient, sender_aggregator_endpoints: HashMap, prefix: Option, @@ -341,11 +340,7 @@ impl State { } async fn get_pending_sender_allocation_id(&self) -> HashMap> { - let escrow_accounts_snapshot = self - .escrow_accounts - .value() - .await - .expect("Should get escrow accounts from Eventual"); + let escrow_accounts_snapshot = self.escrow_accounts.borrow(); // Gather all outstanding receipts and unfinalized RAVs from the database. // Used to create SenderAccount instances for all senders that have unfinalized allocations @@ -476,7 +471,7 @@ impl State { /// corresponding SenderAccount. async fn new_receipts_watcher( mut pglistener: PgListener, - escrow_accounts: Eventual, + escrow_accounts: Receiver, prefix: Option, ) { loop { @@ -504,7 +499,7 @@ async fn new_receipts_watcher( async fn handle_notification( new_receipt_notification: NewReceiptNotification, - escrow_accounts: &Eventual, + escrow_accounts: Receiver, prefix: Option<&str>, ) -> Result<()> { tracing::trace!( @@ -600,7 +595,6 @@ mod tests { ALLOCATION_ID_1, INDEXER, SENDER, SENDER_2, SENDER_3, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, }; use alloy::hex::ToHexExt; - use eventuals::{Eventual, EventualExt}; use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::prelude::{DeploymentDetails, SubgraphClient}; use ractor::concurrency::JoinHandle; @@ -648,9 +642,9 @@ mod tests { let (_allocations_tx, allocations_rx) = watch::channel(HashMap::new()); let escrow_subgraph = get_subgraph_client(); - let (mut escrow_accounts_writer, escrow_accounts_eventual) = - Eventual::::new(); - escrow_accounts_writer.write(EscrowAccounts::default()); + let (mut escrow_accounts_tx, escrow_accounts_rx) = + watch::channel(EscrowAccounts::default()); + // change escrow_accounts_tx.write(EscrowAccounts::default()); let prefix = format!( "test-{}", @@ -661,7 +655,7 @@ mod tests { domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), pgpool, indexer_allocations: allocations_rx, - escrow_accounts: escrow_accounts_eventual, + escrow_accounts: escrow_accounts_rx, escrow_subgraph, sender_aggregator_endpoints: HashMap::from([ (SENDER.1, String::from("http://localhost:8000")), @@ -897,7 +891,7 @@ mod tests { 'scalar_tap_receipt_notification'", ); - let escrow_accounts_eventual = Eventual::from_value(EscrowAccounts::new( + let escrow_accounts_rx = Eventual::from_value(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), )); @@ -905,7 +899,7 @@ mod tests { // Start the new_receipts_watcher task that will consume from the `pglistener` let new_receipts_watcher_handle = tokio::spawn(new_receipts_watcher( pglistener, - escrow_accounts_eventual, + escrow_accounts_rx, Some(prefix.clone()), )); @@ -973,4 +967,4 @@ mod tests { sender_account.stop_and_wait(None, None).await.unwrap(); join_handle.await.unwrap(); } -} +} \ No newline at end of file diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index b7f6ab32..915bec64 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -10,7 +10,6 @@ use alloy::primitives::Address; use alloy::{dyn_abi::Eip712Domain, hex::ToHexExt}; use anyhow::{anyhow, ensure, Result}; use bigdecimal::{num_bigint::BigInt, ToPrimitive}; -use eventuals::Eventual; use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; use jsonrpsee::{core::client::ClientT, rpc_params}; use prometheus::{register_counter_vec, register_histogram_vec, CounterVec, HistogramVec}; @@ -27,6 +26,7 @@ use tap_core::{ }, signed_message::EIP712SignedMessage, }; +use tokio::sync::watch::Receiver; use tracing::{debug, error, warn}; use crate::{agent::sender_account::ReceiptFees, lazy_static}; @@ -101,7 +101,7 @@ pub struct SenderAllocationState { allocation_id: Address, sender: Address, config: &'static config::Config, - escrow_accounts: Eventual, + escrow_accounts: Receiver, domain_separator: Eip712Domain, sender_account_ref: ActorRef, @@ -113,7 +113,7 @@ pub struct SenderAllocationArgs { pub pgpool: PgPool, pub allocation_id: Address, pub sender: Address, - pub escrow_accounts: Eventual, + pub escrow_accounts: Receiver, pub escrow_subgraph: &'static SubgraphClient, pub escrow_adapter: EscrowAdapter, pub domain_separator: Eip712Domain, @@ -367,7 +367,7 @@ impl SenderAllocationState { tracing::trace!("calculate_unaggregated_fee()"); self.tap_manager.remove_obsolete_receipts().await?; - let signers = signers_trimmed(&self.escrow_accounts, self.sender).await?; + let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender).await?; let res = sqlx::query!( r#" @@ -418,7 +418,7 @@ impl SenderAllocationState { async fn calculate_invalid_receipts_fee(&self) -> Result { tracing::trace!("calculate_invalid_receipts_fee()"); - let signers = signers_trimmed(&self.escrow_accounts, self.sender).await?; + let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender).await?; // TODO: Get `rav.timestamp_ns` from the TAP Manager's RAV storage adapter instead? let res = sqlx::query!( @@ -523,7 +523,7 @@ impl SenderAllocationState { .map(|receipt| receipt.signed_receipt().message.timestamp_ns) .max() .expect("invalid receipts should not be empty"); - let signers = signers_trimmed(&self.escrow_accounts, self.sender).await?; + let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender).await?; sqlx::query!( r#" DELETE FROM scalar_tap_receipts @@ -835,7 +835,6 @@ pub mod tests { }, }, }; - use eventuals::Eventual; use futures::future::join_all; use indexer_common::{ escrow_accounts::EscrowAccounts, @@ -859,7 +858,7 @@ pub mod tests { state::Checking, ReceiptWithState, }; - use tokio::sync::mpsc; + use tokio::sync::{mpsc, watch}; use wiremock::{ matchers::{body_string_contains, method}, Mock, MockServer, Respond, ResponseTemplate, @@ -942,12 +941,12 @@ pub mod tests { DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), ))); - let escrow_accounts_eventual = Eventual::from_value(EscrowAccounts::new( + let escrow_accounts_rx = watch::channel(Some(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); + ))).1; - let escrow_adapter = EscrowAdapter::new(escrow_accounts_eventual.clone(), SENDER.1); + let escrow_adapter = EscrowAdapter::new(escrow_accounts_rx.clone(), SENDER.1); let sender_account_ref = match sender_account { Some(sender) => sender, @@ -962,7 +961,7 @@ pub mod tests { pgpool: pgpool.clone(), allocation_id: *ALLOCATION_ID_0, sender: SENDER.1, - escrow_accounts: escrow_accounts_eventual, + escrow_accounts: escrow_accounts_rx, escrow_subgraph, escrow_adapter, domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), diff --git a/tap-agent/src/tap/context.rs b/tap-agent/src/tap/context.rs index 99c24eed..485766b2 100644 --- a/tap-agent/src/tap/context.rs +++ b/tap-agent/src/tap/context.rs @@ -1,9 +1,11 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 + + use alloy::primitives::Address; -use eventuals::Eventual; use indexer_common::escrow_accounts::EscrowAccounts; use sqlx::PgPool; +use tokio::sync::watch::Receiver; use super::escrow_adapter::EscrowAdapter; @@ -20,7 +22,7 @@ pub struct TapAgentContext { pgpool: PgPool, allocation_id: Address, sender: Address, - escrow_accounts: Eventual, + escrow_accounts: Receiver, escrow_adapter: EscrowAdapter, } @@ -29,7 +31,7 @@ impl TapAgentContext { pgpool: PgPool, allocation_id: Address, sender: Address, - escrow_accounts: Eventual, + escrow_accounts: Receiver, escrow_adapter: EscrowAdapter, ) -> Self { Self { diff --git a/tap-agent/src/tap/context/checks/allocation_id.rs b/tap-agent/src/tap/context/checks/allocation_id.rs index 62d60859..83db75a0 100644 --- a/tap-agent/src/tap/context/checks/allocation_id.rs +++ b/tap-agent/src/tap/context/checks/allocation_id.rs @@ -5,7 +5,6 @@ use std::time::Duration; use alloy::primitives::Address; use anyhow::anyhow; -use eventuals::{Eventual, EventualExt}; use graphql_client::GraphQLQuery; use indexer_common::subgraph_client::SubgraphClient; use tap_core::receipt::{ diff --git a/tap-agent/src/tap/context/checks/signature.rs b/tap-agent/src/tap/context/checks/signature.rs index e4727ef6..0f7c946f 100644 --- a/tap-agent/src/tap/context/checks/signature.rs +++ b/tap-agent/src/tap/context/checks/signature.rs @@ -3,23 +3,21 @@ use alloy::{dyn_abi::Eip712Domain, primitives::U256}; use anyhow::anyhow; -use eventuals::Eventual; use indexer_common::escrow_accounts::EscrowAccounts; use tap_core::receipt::{ checks::{Check, CheckError, CheckResult}, state::Checking, ReceiptWithState, }; - -use crate::tap::context::error::AdapterError; +use tokio::sync::watch::Receiver; pub struct Signature { domain_separator: Eip712Domain, - escrow_accounts: Eventual, + escrow_accounts: Receiver, } impl Signature { - pub fn new(domain_separator: Eip712Domain, escrow_accounts: Eventual) -> Self { + pub fn new(domain_separator: Eip712Domain, escrow_accounts: Receiver) -> Self { Self { domain_separator, escrow_accounts, @@ -34,14 +32,7 @@ impl Check for Signature { .signed_receipt() .recover_signer(&self.domain_separator) .map_err(|e| CheckError::Failed(e.into()))?; - let escrow_accounts = self - .escrow_accounts - .value() - .await - .map_err(|e| AdapterError::ValidationError { - error: format!("Could not get escrow accounts from eventual: {:?}", e), - }) - .map_err(|e| CheckError::Retryable(e.into()))?; + let escrow_accounts = self.escrow_accounts.borrow(); let sender = escrow_accounts .get_sender_for_signer(&signer) diff --git a/tap-agent/src/tap/context/rav.rs b/tap-agent/src/tap/context/rav.rs index 593a0487..fd758f0c 100644 --- a/tap-agent/src/tap/context/rav.rs +++ b/tap-agent/src/tap/context/rav.rs @@ -130,6 +130,7 @@ impl RAVStore for TapAgentContext { mod test { use eventuals::Eventual; use sqlx::PgPool; + use tokio::sync::watch; use super::*; use crate::tap::{ @@ -157,7 +158,7 @@ mod test { pool.clone(), *ALLOCATION_ID_0, SENDER.1, - Eventual::new().1, + watch::channel(None).1, EscrowAdapter::mock(), ); diff --git a/tap-agent/src/tap/context/receipt.rs b/tap-agent/src/tap/context/receipt.rs index b530629a..77637fb7 100644 --- a/tap-agent/src/tap/context/receipt.rs +++ b/tap-agent/src/tap/context/receipt.rs @@ -81,7 +81,7 @@ impl ReceiptRead for TapAgentContext { timestamp_range_ns: R, receipts_limit: Option, ) -> Result>, Self::AdapterError> { - let signers = signers_trimmed(&self.escrow_accounts, self.sender) + let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender) .await .map_err(|e| AdapterError::ReceiptRead { error: format!("{:?}.", e), @@ -168,7 +168,7 @@ impl ReceiptDelete for TapAgentContext { &self, timestamp_ns: R, ) -> Result<(), Self::AdapterError> { - let signers = signers_trimmed(&self.escrow_accounts, self.sender) + let signers = signers_trimmed(self.escrow_accounts.clone(), self.sender) .await .map_err(|e| AdapterError::ReceiptDelete { error: format!("{:?}.", e), @@ -202,10 +202,10 @@ mod test { }; use alloy::{primitives::U256, signers::local::PrivateKeySigner}; use anyhow::Result; - use eventuals::Eventual; use indexer_common::escrow_accounts::EscrowAccounts; use lazy_static::lazy_static; use sqlx::PgPool; + use tokio::sync::watch::{self, Receiver}; use std::collections::HashMap; lazy_static! { @@ -218,10 +218,10 @@ mod test { /// The point here it to test the deserialization of large numbers. #[sqlx::test(migrations = "../migrations")] async fn insert_and_retrieve_single_receipt(pgpool: PgPool) { - let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + let escrow_accounts = watch::channel(Some(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); + ))).1; let storage_adapter = TapAgentContext::new( pgpool, @@ -256,11 +256,11 @@ mod test { /// retrieve_receipts_in_timestamp_range. async fn retrieve_range_and_check + Send>( storage_adapter: &TapAgentContext, - escrow_accounts: &Eventual, + escrow_accounts: Receiver, received_receipt_vec: &[ReceiptWithState], range: R, ) -> Result<()> { - let escrow_accounts_snapshot = escrow_accounts.value().await.unwrap(); + let escrow_accounts_snapshot = self.escrow_accounts.borrow(); // Filtering the received receipts by timestamp range let received_receipt_vec: Vec> = received_receipt_vec @@ -306,11 +306,11 @@ mod test { async fn remove_range_and_check + Send>( storage_adapter: &TapAgentContext, - escrow_accounts: &Eventual, + escrow_accounts: Receiver, received_receipt_vec: &[ReceiptWithState], range: R, ) -> Result<()> { - let escrow_accounts_snapshot = escrow_accounts.value().await.unwrap(); + let escrow_accounts_snapshot = self.escrow_accounts.borrow(); // Storing the receipts let mut received_receipt_id_vec = Vec::new(); @@ -433,10 +433,10 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn retrieve_receipts_with_limit(pgpool: PgPool) { - let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + let escrow_accounts = watch::channel(Some(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); + ))).1; let storage_adapter = TapAgentContext::new( pgpool.clone(), @@ -501,10 +501,10 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn retrieve_receipts_in_timestamp_range(pgpool: PgPool) { - let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + let escrow_accounts =watch::channel(Some(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); + ))).1; let storage_adapter = TapAgentContext::new( pgpool.clone(), @@ -559,7 +559,7 @@ mod test { { $( assert!( - retrieve_range_and_check(&storage_adapter, &escrow_accounts, &received_receipt_vec, $arg) + retrieve_range_and_check(&storage_adapter, escrow_accounts.clone(), &received_receipt_vec, $arg) .await .is_ok()); )+ @@ -629,10 +629,10 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn remove_receipts_in_timestamp_range(pgpool: PgPool) { - let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + let escrow_accounts =watch::channel(Some(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); + ))).1; let storage_adapter = TapAgentContext::new( pgpool, @@ -675,7 +675,7 @@ mod test { { $( assert!( - remove_range_and_check(&storage_adapter, &escrow_accounts, &received_receipt_vec, $arg) + remove_range_and_check(&storage_adapter, escrow_accounts.clone(), &received_receipt_vec, $arg) .await.is_ok() ); ) + diff --git a/tap-agent/src/tap/escrow_adapter.rs b/tap-agent/src/tap/escrow_adapter.rs index 04ac2255..b528f91a 100644 --- a/tap-agent/src/tap/escrow_adapter.rs +++ b/tap-agent/src/tap/escrow_adapter.rs @@ -5,9 +5,9 @@ use std::sync::{Arc, RwLock}; use alloy::primitives::Address; use async_trait::async_trait; -use eventuals::Eventual; -use indexer_common::escrow_accounts::EscrowAccounts; +use indexer_common::escrow_accounts::{self, EscrowAccounts}; use tap_core::manager::adapters::EscrowHandler as EscrowAdapterTrait; +use tokio::sync::watch::Receiver; use super::context::AdapterError; @@ -21,13 +21,13 @@ use super::context::AdapterError; /// receipt checks only when we need to send a RAV request. #[derive(Clone)] pub struct EscrowAdapter { - escrow_accounts: Eventual, + escrow_accounts: Receiver, sender_id: Address, sender_pending_fees: Arc>, } impl EscrowAdapter { - pub fn new(escrow_accounts: Eventual, sender_id: Address) -> Self { + pub fn new(escrow_accounts: Receiver, sender_id: Address) -> Self { Self { escrow_accounts, sender_pending_fees: Arc::new(RwLock::new(0)), @@ -41,7 +41,7 @@ impl EscrowAdapterTrait for EscrowAdapter { type AdapterError = AdapterError; async fn get_available_escrow(&self, signer: Address) -> Result { - let escrow_accounts = self.escrow_accounts.value().await?; + let escrow_accounts = accounts_ref.borrow().unwrap(); let sender = escrow_accounts.get_sender_for_signer(&signer)?; @@ -57,7 +57,8 @@ impl EscrowAdapterTrait for EscrowAdapter { } async fn subtract_escrow(&self, signer: Address, value: u128) -> Result<(), AdapterError> { - let escrow_accounts = self.escrow_accounts.value().await?; + + let escrow_accounts = self.escrow_accounts.borrow(); let current_available_escrow = self.get_available_escrow(signer).await?; @@ -76,13 +77,7 @@ impl EscrowAdapterTrait for EscrowAdapter { } async fn verify_signer(&self, signer: Address) -> Result { - let escrow_account = - self.escrow_accounts - .value() - .await - .map_err(|_| AdapterError::ValidationError { - error: "Could not load escrow_accounts eventual".into(), - })?; + let escrow_accounts = self.escrow_accounts.borrow(); let sender = escrow_account.get_sender_for_signer(&signer).map_err(|_| { AdapterError::ValidationError { error: format!("Could not find the sender for the signer {}", signer), @@ -97,6 +92,7 @@ mod test { use std::{collections::HashMap, vec}; use alloy::primitives::U256; + use tokio::sync::watch; use crate::tap::test_utils::{SENDER, SIGNER}; @@ -104,10 +100,10 @@ mod test { impl super::EscrowAdapter { pub fn mock() -> Self { - let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + let escrow_accounts = watch::channel(Some(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); + ))).1; Self { escrow_accounts, sender_pending_fees: Arc::new(RwLock::new(0)), @@ -118,10 +114,10 @@ mod test { #[tokio::test] async fn test_subtract_escrow() { - let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + let escrow_accounts = watch::channel(Some(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); + ))).1; let sender_pending_fees = Arc::new(RwLock::new(500)); @@ -143,10 +139,10 @@ mod test { #[tokio::test] async fn test_subtract_escrow_overflow() { - let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + let escrow_accounts = watch::channel(Some(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); + ))).1; let sender_pending_fees = Arc::new(RwLock::new(500)); diff --git a/tap-agent/src/tap/mod.rs b/tap-agent/src/tap/mod.rs index 76356ca9..02c81fdd 100644 --- a/tap-agent/src/tap/mod.rs +++ b/tap-agent/src/tap/mod.rs @@ -3,9 +3,8 @@ use alloy::hex::ToHexExt; use alloy::primitives::Address; -use anyhow::anyhow; -use eventuals::Eventual; use indexer_common::escrow_accounts::EscrowAccounts; +use tokio::sync::watch::Receiver; pub mod context; pub mod escrow_adapter; @@ -14,17 +13,14 @@ pub mod escrow_adapter; pub mod test_utils; pub async fn signers_trimmed( - escrow_accounts: &Eventual, + escrow_accounts: Receiver , sender: Address, ) -> Result, anyhow::Error> { + let escrow_accounts = self.escrow_accounts.borrow(); let signers = escrow_accounts - .value() - .await - .map_err(|e| anyhow!("Error while getting escrow accounts: {:?}", e))? .get_signers_for_sender(&sender) .iter() .map(|s| s.encode_hex()) .collect::>(); - Ok(signers) } From 0cc6f22f6f83f1135640e37b99c6f94f6abd5f06 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Sat, 26 Oct 2024 10:01:39 +0530 Subject: [PATCH 2/9] all set temp --- common/src/escrow_accounts.rs | 2 +- tap-agent/src/agent/sender_account.rs | 24 ++++--- .../src/agent/sender_accounts_manager.rs | 66 +++++++++---------- tap-agent/src/agent/sender_allocation.rs | 4 +- .../src/tap/context/checks/allocation_id.rs | 1 + tap-agent/src/tap/context/rav.rs | 4 +- tap-agent/src/tap/context/receipt.rs | 20 +++--- tap-agent/src/tap/escrow_adapter.rs | 22 +++---- tap-agent/src/tap/mod.rs | 2 +- 9 files changed, 71 insertions(+), 74 deletions(-) diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index 516156fe..3f6be9af 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -249,7 +249,7 @@ mod tests { Duration::from_secs(60), true, ); - accounts.changed().await; + accounts.changed().await.unwrap(); assert_eq!( accounts.borrow().clone(), EscrowAccounts::new( diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index c38b8c24..b482b93f 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -416,9 +416,7 @@ impl Actor for SenderAccount { let mut accounts_clone = escrow_accounts.clone(); let _escrow_account_monitor = tokio::spawn(async move{ while accounts_clone.changed().await.is_ok(){ - // change let accounts_ref = escrow_accounts.borrow(); - // let escrow_account = accounts_ref.as_ref().unwrap(); - let escrow_account = accounts_clone.borrow().clone().unwrap(); + let escrow_account = accounts_clone.borrow().clone(); let myself = myself_clone.clone(); let pgpool = pgpool_clone.clone(); // Get balance or default value for sender @@ -1005,7 +1003,7 @@ pub mod tests { ActorRef, tokio::task::JoinHandle<()>, String, - Sender>, + Sender, ) { let config = Box::leak(Box::new(config::Config { config: None, @@ -1028,12 +1026,12 @@ pub mod tests { None, DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), ))); - let ( escrow_accounts_tx, escrow_accounts_rx) = watch::channel(None); + let ( escrow_accounts_tx, escrow_accounts_rx) = watch::channel(EscrowAccounts::default()); - escrow_accounts_tx.send(Some(EscrowAccounts::new( + escrow_accounts_tx.send(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - ))); + )); let prefix = format!( "test-{}", @@ -1877,10 +1875,10 @@ pub mod tests { ) .await; // escrow_account updated - escrow_accounts_tx.send(Some(EscrowAccounts::new( + escrow_accounts_tx.send(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - ))); + )); // wait the actor react to the messages tokio::time::sleep(Duration::from_millis(10)).await; @@ -1916,10 +1914,10 @@ pub mod tests { assert!(!deny, "should start unblocked"); // update the escrow to a lower value - escrow_accounts_tx.send(Some(EscrowAccounts::new( + escrow_accounts_tx.send(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE / 2))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - ))); + )); tokio::time::sleep(Duration::from_millis(20)).await; @@ -1927,10 +1925,10 @@ pub mod tests { assert!(deny, "should block the sender"); // simulate deposit - escrow_accounts_tx.send(Some(EscrowAccounts::new( + escrow_accounts_tx.send(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - ))); + )); tokio::time::sleep(Duration::from_millis(10)).await; diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index c091745c..33a4d084 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -13,6 +13,7 @@ use anyhow::Result; use anyhow::{anyhow, bail}; use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::prelude::{Allocation, SubgraphClient}; +use ractor::concurrency::JoinHandle; use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent}; use serde::Deserialize; use sqlx::{postgres::PgListener, PgPool}; @@ -66,7 +67,7 @@ pub struct SenderAccountsManagerArgs { pub struct State { sender_ids: HashSet
, new_receipts_watcher_handle: Option>, - _eligible_allocations_senders_pipe: PipeHandle, + _eligible_allocations_senders_pipe: JoinHandle<()>, config: &'static config::Config, domain_separator: Eip712Domain, @@ -121,21 +122,19 @@ impl Actor for SenderAccountsManager { "should be able to subscribe to Postgres Notify events on the channel \ 'scalar_tap_receipt_notification'", ); - let clone = myself.clone(); - let _eligible_allocations_senders_pipe = - escrow_accounts.clone().pipe_async(move |escrow_accounts| { - let myself = clone.clone(); - - async move { - myself - .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( - escrow_accounts.get_senders(), - )) - .unwrap_or_else(|e| { - error!("Error while updating sender_accounts: {:?}", e); - }); - } - }); + let myself_clone = myself.clone(); + let mut accounts_clone = escrow_accounts.clone(); + let _eligible_allocations_senders_pipe = tokio::spawn(async move{ + while accounts_clone.changed().await.is_ok(){ + myself_clone + .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( + accounts_clone.borrow().get_senders(), + )) + .unwrap_or_else(|e| { + error!("Error while updating sender_accounts: {:?}", e); + }); + } + }); let mut state = State { config, @@ -340,7 +339,8 @@ impl State { } async fn get_pending_sender_allocation_id(&self) -> HashMap> { - let escrow_accounts_snapshot = self.escrow_accounts.borrow(); + //change cloning + let escrow_accounts_snapshot = self.escrow_accounts.borrow().clone(); // Gather all outstanding receipts and unfinalized RAVs from the database. // Used to create SenderAccount instances for all senders that have unfinalized allocations @@ -448,7 +448,8 @@ impl State { config: self.config, pgpool: self.pgpool.clone(), sender_id: *sender_id, - escrow_accounts: self.escrow_accounts.clone(), + // change self.escrow_accounts.clone() + escrow_accounts: watch::channel(EscrowAccounts::default()).1, indexer_allocations: self.indexer_allocations.clone(), escrow_subgraph: self.escrow_subgraph, domain_separator: self.domain_separator.clone(), @@ -471,7 +472,7 @@ impl State { /// corresponding SenderAccount. async fn new_receipts_watcher( mut pglistener: PgListener, - escrow_accounts: Receiver, + escrow_accounts_rx: Receiver, prefix: Option, ) { loop { @@ -487,7 +488,7 @@ async fn new_receipts_watcher( ); if let Err(e) = handle_notification( new_receipt_notification, - &escrow_accounts, + escrow_accounts_rx.clone(), prefix.as_deref(), ) .await @@ -499,7 +500,7 @@ async fn new_receipts_watcher( async fn handle_notification( new_receipt_notification: NewReceiptNotification, - escrow_accounts: Receiver, + escrow_accounts_rx: Receiver, prefix: Option<&str>, ) -> Result<()> { tracing::trace!( @@ -507,10 +508,8 @@ async fn handle_notification( "New receipt notification detected!" ); - let Ok(sender_address) = escrow_accounts - .value() - .await - .expect("should be able to get escrow accounts") + let Ok(sender_address) = escrow_accounts_rx + .borrow() .get_sender_for_signer(&new_receipt_notification.signer_address) else { // TODO: save the receipt in the failed receipts table? @@ -642,9 +641,9 @@ mod tests { let (_allocations_tx, allocations_rx) = watch::channel(HashMap::new()); let escrow_subgraph = get_subgraph_client(); - let (mut escrow_accounts_tx, escrow_accounts_rx) = + let (_, escrow_accounts_rx) = watch::channel(EscrowAccounts::default()); - // change escrow_accounts_tx.write(EscrowAccounts::default()); + //change escrow_accounts_tx.write(EscrowAccounts::default()); let prefix = format!( "test-{}", @@ -694,11 +693,10 @@ mod tests { domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), sender_ids: HashSet::new(), new_receipts_watcher_handle: None, - _eligible_allocations_senders_pipe: Eventual::from_value(()) - .pipe_async(|_| async {}), + _eligible_allocations_senders_pipe: tokio::spawn(async move{}), pgpool, indexer_allocations: watch::channel(HashSet::new()).1, - escrow_accounts: Eventual::from_value(escrow_accounts), + escrow_accounts: watch::channel(escrow_accounts).1, escrow_subgraph: get_subgraph_client(), sender_aggregator_endpoints: HashMap::from([ (SENDER.1, String::from("http://localhost:8000")), @@ -891,10 +889,10 @@ mod tests { 'scalar_tap_receipt_notification'", ); - let escrow_accounts_rx = Eventual::from_value(EscrowAccounts::new( + let escrow_accounts_rx = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )); + )).1; // Start the new_receipts_watcher task that will consume from the `pglistener` let new_receipts_watcher_handle = tokio::spawn(new_receipts_watcher( @@ -927,7 +925,7 @@ mod tests { async fn test_create_allocation_id() { let senders_to_signers = vec![(SENDER.1, vec![SIGNER.1])].into_iter().collect(); let escrow_accounts = EscrowAccounts::new(HashMap::new(), senders_to_signers); - let escrow_accounts = Eventual::from_value(escrow_accounts); + let escrow_accounts = watch::channel(escrow_accounts).1; let prefix = format!( "test-{}", @@ -954,7 +952,7 @@ mod tests { value: 1, }; - handle_notification(new_receipt_notification, &escrow_accounts, Some(&prefix)) + handle_notification(new_receipt_notification, escrow_accounts, Some(&prefix)) .await .unwrap(); diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 915bec64..a64fe782 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -941,10 +941,10 @@ pub mod tests { DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), ))); - let escrow_accounts_rx = watch::channel(Some(EscrowAccounts::new( + let escrow_accounts_rx = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - ))).1; + )).1; let escrow_adapter = EscrowAdapter::new(escrow_accounts_rx.clone(), SENDER.1); diff --git a/tap-agent/src/tap/context/checks/allocation_id.rs b/tap-agent/src/tap/context/checks/allocation_id.rs index 83db75a0..62d60859 100644 --- a/tap-agent/src/tap/context/checks/allocation_id.rs +++ b/tap-agent/src/tap/context/checks/allocation_id.rs @@ -5,6 +5,7 @@ use std::time::Duration; use alloy::primitives::Address; use anyhow::anyhow; +use eventuals::{Eventual, EventualExt}; use graphql_client::GraphQLQuery; use indexer_common::subgraph_client::SubgraphClient; use tap_core::receipt::{ diff --git a/tap-agent/src/tap/context/rav.rs b/tap-agent/src/tap/context/rav.rs index fd758f0c..0fd1531f 100644 --- a/tap-agent/src/tap/context/rav.rs +++ b/tap-agent/src/tap/context/rav.rs @@ -128,7 +128,7 @@ impl RAVStore for TapAgentContext { #[cfg(test)] mod test { - use eventuals::Eventual; + use indexer_common::escrow_accounts::EscrowAccounts; use sqlx::PgPool; use tokio::sync::watch; @@ -158,7 +158,7 @@ mod test { pool.clone(), *ALLOCATION_ID_0, SENDER.1, - watch::channel(None).1, + watch::channel(EscrowAccounts::default()).1, EscrowAdapter::mock(), ); diff --git a/tap-agent/src/tap/context/receipt.rs b/tap-agent/src/tap/context/receipt.rs index 77637fb7..f375dcf6 100644 --- a/tap-agent/src/tap/context/receipt.rs +++ b/tap-agent/src/tap/context/receipt.rs @@ -218,10 +218,10 @@ mod test { /// The point here it to test the deserialization of large numbers. #[sqlx::test(migrations = "../migrations")] async fn insert_and_retrieve_single_receipt(pgpool: PgPool) { - let escrow_accounts = watch::channel(Some(EscrowAccounts::new( + let escrow_accounts = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - ))).1; + )).1; let storage_adapter = TapAgentContext::new( pgpool, @@ -260,7 +260,7 @@ mod test { received_receipt_vec: &[ReceiptWithState], range: R, ) -> Result<()> { - let escrow_accounts_snapshot = self.escrow_accounts.borrow(); + let escrow_accounts_snapshot = escrow_accounts.borrow(); // Filtering the received receipts by timestamp range let received_receipt_vec: Vec> = received_receipt_vec @@ -310,7 +310,7 @@ mod test { received_receipt_vec: &[ReceiptWithState], range: R, ) -> Result<()> { - let escrow_accounts_snapshot = self.escrow_accounts.borrow(); + let escrow_accounts_snapshot = escrow_accounts.borrow(); // Storing the receipts let mut received_receipt_id_vec = Vec::new(); @@ -433,10 +433,10 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn retrieve_receipts_with_limit(pgpool: PgPool) { - let escrow_accounts = watch::channel(Some(EscrowAccounts::new( + let escrow_accounts = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - ))).1; + )).1; let storage_adapter = TapAgentContext::new( pgpool.clone(), @@ -501,10 +501,10 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn retrieve_receipts_in_timestamp_range(pgpool: PgPool) { - let escrow_accounts =watch::channel(Some(EscrowAccounts::new( + let escrow_accounts =watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - ))).1; + )).1; let storage_adapter = TapAgentContext::new( pgpool.clone(), @@ -629,10 +629,10 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn remove_receipts_in_timestamp_range(pgpool: PgPool) { - let escrow_accounts =watch::channel(Some(EscrowAccounts::new( + let escrow_accounts =watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - ))).1; + )).1; let storage_adapter = TapAgentContext::new( pgpool, diff --git a/tap-agent/src/tap/escrow_adapter.rs b/tap-agent/src/tap/escrow_adapter.rs index b528f91a..2272d5ea 100644 --- a/tap-agent/src/tap/escrow_adapter.rs +++ b/tap-agent/src/tap/escrow_adapter.rs @@ -5,7 +5,7 @@ use std::sync::{Arc, RwLock}; use alloy::primitives::Address; use async_trait::async_trait; -use indexer_common::escrow_accounts::{self, EscrowAccounts}; +use indexer_common::escrow_accounts::EscrowAccounts; use tap_core::manager::adapters::EscrowHandler as EscrowAdapterTrait; use tokio::sync::watch::Receiver; @@ -41,7 +41,7 @@ impl EscrowAdapterTrait for EscrowAdapter { type AdapterError = AdapterError; async fn get_available_escrow(&self, signer: Address) -> Result { - let escrow_accounts = accounts_ref.borrow().unwrap(); + let escrow_accounts = self.escrow_accounts.borrow(); let sender = escrow_accounts.get_sender_for_signer(&signer)?; @@ -57,8 +57,8 @@ impl EscrowAdapterTrait for EscrowAdapter { } async fn subtract_escrow(&self, signer: Address, value: u128) -> Result<(), AdapterError> { - - let escrow_accounts = self.escrow_accounts.borrow(); + // change cloning seems essential + let escrow_accounts = self.escrow_accounts.borrow().clone(); let current_available_escrow = self.get_available_escrow(signer).await?; @@ -78,7 +78,7 @@ impl EscrowAdapterTrait for EscrowAdapter { async fn verify_signer(&self, signer: Address) -> Result { let escrow_accounts = self.escrow_accounts.borrow(); - let sender = escrow_account.get_sender_for_signer(&signer).map_err(|_| { + let sender = escrow_accounts.get_sender_for_signer(&signer).map_err(|_| { AdapterError::ValidationError { error: format!("Could not find the sender for the signer {}", signer), } @@ -100,10 +100,10 @@ mod test { impl super::EscrowAdapter { pub fn mock() -> Self { - let escrow_accounts = watch::channel(Some(EscrowAccounts::new( + let escrow_accounts = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - ))).1; + )).1; Self { escrow_accounts, sender_pending_fees: Arc::new(RwLock::new(0)), @@ -114,10 +114,10 @@ mod test { #[tokio::test] async fn test_subtract_escrow() { - let escrow_accounts = watch::channel(Some(EscrowAccounts::new( + let escrow_accounts = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - ))).1; + )).1; let sender_pending_fees = Arc::new(RwLock::new(500)); @@ -139,10 +139,10 @@ mod test { #[tokio::test] async fn test_subtract_escrow_overflow() { - let escrow_accounts = watch::channel(Some(EscrowAccounts::new( + let escrow_accounts = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - ))).1; + )).1; let sender_pending_fees = Arc::new(RwLock::new(500)); diff --git a/tap-agent/src/tap/mod.rs b/tap-agent/src/tap/mod.rs index 02c81fdd..4afd998c 100644 --- a/tap-agent/src/tap/mod.rs +++ b/tap-agent/src/tap/mod.rs @@ -16,7 +16,7 @@ pub async fn signers_trimmed( escrow_accounts: Receiver , sender: Address, ) -> Result, anyhow::Error> { - let escrow_accounts = self.escrow_accounts.borrow(); + let escrow_accounts = escrow_accounts.borrow(); let signers = escrow_accounts .get_signers_for_sender(&sender) .iter() From cfc032d81ee12c904b4c48269c4a2d04ef3d2203 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Sat, 26 Oct 2024 11:44:40 +0530 Subject: [PATCH 3/9] migrate_2_watch escrow_accounts --- common/src/escrow_accounts.rs | 17 ++++--- .../indexer_service/http/indexer_service.rs | 9 ++-- .../indexer_service/http/request_handler.rs | 2 +- common/src/subgraph_client/client.rs | 2 +- common/src/tap/checks/deny_list_check.rs | 7 +-- common/src/tap/checks/sender_balance_check.rs | 1 - tap-agent/src/agent/sender_account.rs | 46 +++++++++++-------- .../src/agent/sender_accounts_manager.rs | 24 +++++----- tap-agent/src/agent/sender_allocation.rs | 3 +- tap-agent/src/tap/context.rs | 1 - tap-agent/src/tap/context/receipt.rs | 18 +++++--- tap-agent/src/tap/escrow_adapter.rs | 27 ++++++----- tap-agent/src/tap/mod.rs | 2 +- 13 files changed, 87 insertions(+), 72 deletions(-) diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index 3f6be9af..6a5485dd 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -11,7 +11,10 @@ use alloy::primitives::{Address, U256}; use anyhow::{anyhow, Result}; use graphql_client::GraphQLQuery; use thiserror::Error; -use tokio::{sync::watch::{self, Receiver}, time::{self, sleep}}; +use tokio::{ + sync::watch::{self, Receiver}, + time::{self, sleep}, +}; use tracing::{error, warn}; use crate::prelude::SubgraphClient; @@ -110,12 +113,12 @@ pub fn escrow_accounts( time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); loop { time_interval.tick().await; - let result = get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers) - .await; - match result{ + let result = + get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers).await; + match result { Ok(accounts) => tx - .send(accounts) - .expect("Failed to update escrow_accounts channel"), + .send(accounts) + .expect("Failed to update escrow_accounts channel"), Err(err) => { error!( "Failed to fetch escrow accounts for indexer {:?}: {}", @@ -123,7 +126,7 @@ pub fn escrow_accounts( ); // Sleep for a bit before we retry sleep(interval.div_f32(2.0)).await; - }, + } } } }); diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index bfffd57c..7484d9e6 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -82,8 +82,8 @@ where { #[error("Issues with provided receipt: {0}")] ReceiptError(tap_core::Error), - #[error("Service is not ready yet, try again in a moment")] - ServiceNotReady, + // #[error("Service is not ready yet, try again in a moment")] + // ServiceNotReady, #[error("No attestation signer found for allocation `{0}`")] NoSignerForAllocation(Address), #[error("Invalid request body: {0}")] @@ -119,8 +119,7 @@ where } let status = match self { - ServiceNotReady => StatusCode::SERVICE_UNAVAILABLE, - + //ServiceNotReady => StatusCode::SERVICE_UNAVAILABLE, Unauthorized => StatusCode::UNAUTHORIZED, NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR, @@ -512,4 +511,4 @@ pub async fn shutdown_signal() { } info!("Signal received, starting graceful shutdown"); -} \ No newline at end of file +} diff --git a/common/src/indexer_service/http/request_handler.rs b/common/src/indexer_service/http/request_handler.rs index 2bb8ed81..9ee3db5e 100644 --- a/common/src/indexer_service/http/request_handler.rs +++ b/common/src/indexer_service/http/request_handler.rs @@ -56,7 +56,7 @@ pub async fn request_handler( where I: IndexerServiceImpl + Sync + Send + 'static, { - _request_handler(manifest_id, typed_header, state, headers, body, ).await + _request_handler(manifest_id, typed_header, state, headers, body).await } async fn _request_handler( diff --git a/common/src/subgraph_client/client.rs b/common/src/subgraph_client/client.rs index a083a204..8dc52c3e 100644 --- a/common/src/subgraph_client/client.rs +++ b/common/src/subgraph_client/client.rs @@ -607,4 +607,4 @@ mod test { assert_eq!(data.user.name, "remote".to_string()); } -} \ No newline at end of file +} diff --git a/common/src/tap/checks/deny_list_check.rs b/common/src/tap/checks/deny_list_check.rs index a0e1f0f8..cc094c0e 100644 --- a/common/src/tap/checks/deny_list_check.rs +++ b/common/src/tap/checks/deny_list_check.rs @@ -6,7 +6,6 @@ use alloy::dyn_abi::Eip712Domain; use alloy::primitives::Address; use sqlx::postgres::PgListener; use sqlx::PgPool; -use tokio::sync::watch::Receiver; use std::collections::HashSet; use std::sync::RwLock; use std::{str::FromStr, sync::Arc}; @@ -16,6 +15,7 @@ use tap_core::receipt::{ state::Checking, ReceiptWithState, }; +use tokio::sync::watch::Receiver; use tracing::error; pub struct DenyListCheck { @@ -160,7 +160,7 @@ impl Check for DenyListCheck { }) .map_err(CheckError::Failed)?; let escrow_accounts_snapshot = self.escrow_accounts.borrow(); - + let receipt_sender = escrow_accounts_snapshot .get_sender_for_signer(&receipt_signer) .map_err(|e| CheckError::Failed(e.into()))?; @@ -209,7 +209,8 @@ mod tests { let escrow_accounts_rx = watch::channel(EscrowAccounts::new( test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(), test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), - )).1; + )) + .1; DenyListCheck::new( pgpool, diff --git a/common/src/tap/checks/sender_balance_check.rs b/common/src/tap/checks/sender_balance_check.rs index 327c4ee1..f7480a56 100644 --- a/common/src/tap/checks/sender_balance_check.rs +++ b/common/src/tap/checks/sender_balance_check.rs @@ -1,7 +1,6 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 - use crate::escrow_accounts::EscrowAccounts; use alloy::dyn_abi::Eip712Domain; use alloy::primitives::U256; diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index 63799bfd..dcd75201 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -441,8 +441,8 @@ impl Actor for SenderAccount { let myself_clone = myself.clone(); let pgpool_clone = pgpool.clone(); let mut accounts_clone = escrow_accounts.clone(); - let _escrow_account_monitor = tokio::spawn(async move{ - while accounts_clone.changed().await.is_ok(){ + let _escrow_account_monitor = tokio::spawn(async move { + while accounts_clone.changed().await.is_ok() { let escrow_account = accounts_clone.borrow().clone(); let myself = myself_clone.clone(); let pgpool = pgpool_clone.clone(); @@ -1045,12 +1045,14 @@ pub mod tests { None, DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), ))); - let ( escrow_accounts_tx, escrow_accounts_rx) = watch::channel(EscrowAccounts::default()); + let (escrow_accounts_tx, escrow_accounts_rx) = watch::channel(EscrowAccounts::default()); - escrow_accounts_tx.send(EscrowAccounts::new( - HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE))]), - HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )).expect("Failed to update escrow_accounts channel"); + escrow_accounts_tx + .send(EscrowAccounts::new( + HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE))]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )) + .expect("Failed to update escrow_accounts channel"); let prefix = format!( "test-{}", @@ -1894,10 +1896,12 @@ pub mod tests { ) .await; // escrow_account updated - escrow_accounts_tx.send(EscrowAccounts::new( - HashMap::from([(SENDER.1, U256::from(1))]), - HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )).unwrap(); + escrow_accounts_tx + .send(EscrowAccounts::new( + HashMap::from([(SENDER.1, U256::from(1))]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )) + .unwrap(); // wait the actor react to the messages tokio::time::sleep(Duration::from_millis(10)).await; @@ -1933,10 +1937,12 @@ pub mod tests { assert!(!deny, "should start unblocked"); // update the escrow to a lower value - escrow_accounts_tx.send(EscrowAccounts::new( - HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE / 2))]), - HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )).unwrap(); + escrow_accounts_tx + .send(EscrowAccounts::new( + HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE / 2))]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )) + .unwrap(); tokio::time::sleep(Duration::from_millis(20)).await; @@ -1944,10 +1950,12 @@ pub mod tests { assert!(deny, "should block the sender"); // simulate deposit - escrow_accounts_tx.send(EscrowAccounts::new( - HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE))]), - HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )).unwrap(); + escrow_accounts_tx + .send(EscrowAccounts::new( + HashMap::from([(SENDER.1, U256::from(ESCROW_VALUE))]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )) + .unwrap(); tokio::time::sleep(Duration::from_millis(10)).await; diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index 73d0a557..a0d44b90 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -126,8 +126,8 @@ impl Actor for SenderAccountsManager { ); let myself_clone = myself.clone(); let mut accounts_clone = escrow_accounts.clone(); - let _eligible_allocations_senders_pipe = tokio::spawn(async move{ - while accounts_clone.changed().await.is_ok(){ + let _eligible_allocations_senders_pipe = tokio::spawn(async move { + while accounts_clone.changed().await.is_ok() { myself_clone .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( accounts_clone.borrow().get_senders(), @@ -341,9 +341,6 @@ impl State { } async fn get_pending_sender_allocation_id(&self) -> HashMap> { - //change cloning - let escrow_accounts_snapshot = self.escrow_accounts.borrow().clone(); - // Gather all outstanding receipts and unfinalized RAVs from the database. // Used to create SenderAccount instances for all senders that have unfinalized allocations // and try to finalize them if they have become ineligible. @@ -389,7 +386,9 @@ impl State { .collect::>(); let signer_id = Address::from_str(&row.signer_address) .expect("signer_address should be a valid address"); - let sender_id = escrow_accounts_snapshot + let sender_id = self + .escrow_accounts + .borrow() .get_sender_for_signer(&signer_id) .expect("should be able to get sender from signer"); @@ -450,8 +449,7 @@ impl State { config: self.config, pgpool: self.pgpool.clone(), sender_id: *sender_id, - // change self.escrow_accounts.clone() - escrow_accounts: watch::channel(EscrowAccounts::default()).1, + escrow_accounts: self.escrow_accounts.clone(), indexer_allocations: self.indexer_allocations.clone(), escrow_subgraph: self.escrow_subgraph, domain_separator: self.domain_separator.clone(), @@ -640,8 +638,7 @@ mod tests { let (_allocations_tx, allocations_rx) = watch::channel(HashMap::new()); let escrow_subgraph = get_subgraph_client(); - let (_, escrow_accounts_rx) = - watch::channel(EscrowAccounts::default()); + let (_, escrow_accounts_rx) = watch::channel(EscrowAccounts::default()); //change escrow_accounts_tx.write(EscrowAccounts::default()); let prefix = format!( @@ -692,7 +689,7 @@ mod tests { domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), sender_ids: HashSet::new(), new_receipts_watcher_handle: None, - _eligible_allocations_senders_pipe: tokio::spawn(async move{}), + _eligible_allocations_senders_pipe: tokio::spawn(async move {}), pgpool, indexer_allocations: watch::channel(HashSet::new()).1, escrow_accounts: watch::channel(escrow_accounts).1, @@ -891,7 +888,8 @@ mod tests { let escrow_accounts_rx = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )).1; + )) + .1; // Start the new_receipts_watcher task that will consume from the `pglistener` let new_receipts_watcher_handle = tokio::spawn(new_receipts_watcher( @@ -964,4 +962,4 @@ mod tests { sender_account.stop_and_wait(None, None).await.unwrap(); join_handle.await.unwrap(); } -} \ No newline at end of file +} diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 7d2a5228..b9c55249 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -956,7 +956,8 @@ pub mod tests { let escrow_accounts_rx = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )).1; + )) + .1; let escrow_adapter = EscrowAdapter::new(escrow_accounts_rx.clone(), SENDER.1); diff --git a/tap-agent/src/tap/context.rs b/tap-agent/src/tap/context.rs index 485766b2..3a936b78 100644 --- a/tap-agent/src/tap/context.rs +++ b/tap-agent/src/tap/context.rs @@ -1,7 +1,6 @@ // Copyright 2023-, Edge & Node, GraphOps, and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 - use alloy::primitives::Address; use indexer_common::escrow_accounts::EscrowAccounts; use sqlx::PgPool; diff --git a/tap-agent/src/tap/context/receipt.rs b/tap-agent/src/tap/context/receipt.rs index f375dcf6..07f521db 100644 --- a/tap-agent/src/tap/context/receipt.rs +++ b/tap-agent/src/tap/context/receipt.rs @@ -205,8 +205,8 @@ mod test { use indexer_common::escrow_accounts::EscrowAccounts; use lazy_static::lazy_static; use sqlx::PgPool; - use tokio::sync::watch::{self, Receiver}; use std::collections::HashMap; + use tokio::sync::watch::{self, Receiver}; lazy_static! { pub static ref SENDER_IRRELEVANT: (PrivateKeySigner, Address) = wallet(1); @@ -221,7 +221,8 @@ mod test { let escrow_accounts = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )).1; + )) + .1; let storage_adapter = TapAgentContext::new( pgpool, @@ -436,7 +437,8 @@ mod test { let escrow_accounts = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )).1; + )) + .1; let storage_adapter = TapAgentContext::new( pgpool.clone(), @@ -501,10 +503,11 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn retrieve_receipts_in_timestamp_range(pgpool: PgPool) { - let escrow_accounts =watch::channel(EscrowAccounts::new( + let escrow_accounts = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )).1; + )) + .1; let storage_adapter = TapAgentContext::new( pgpool.clone(), @@ -629,10 +632,11 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn remove_receipts_in_timestamp_range(pgpool: PgPool) { - let escrow_accounts =watch::channel(EscrowAccounts::new( + let escrow_accounts = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )).1; + )) + .1; let storage_adapter = TapAgentContext::new( pgpool, diff --git a/tap-agent/src/tap/escrow_adapter.rs b/tap-agent/src/tap/escrow_adapter.rs index 2272d5ea..10db98c5 100644 --- a/tap-agent/src/tap/escrow_adapter.rs +++ b/tap-agent/src/tap/escrow_adapter.rs @@ -57,12 +57,12 @@ impl EscrowAdapterTrait for EscrowAdapter { } async fn subtract_escrow(&self, signer: Address, value: u128) -> Result<(), AdapterError> { - // change cloning seems essential - let escrow_accounts = self.escrow_accounts.borrow().clone(); - let current_available_escrow = self.get_available_escrow(signer).await?; - let sender = escrow_accounts.get_sender_for_signer(&signer)?; + let sender = self + .escrow_accounts + .borrow() + .get_sender_for_signer(&signer)?; let mut fees = self.sender_pending_fees.write().unwrap(); if current_available_escrow < value { @@ -78,11 +78,11 @@ impl EscrowAdapterTrait for EscrowAdapter { async fn verify_signer(&self, signer: Address) -> Result { let escrow_accounts = self.escrow_accounts.borrow(); - let sender = escrow_accounts.get_sender_for_signer(&signer).map_err(|_| { - AdapterError::ValidationError { + let sender = escrow_accounts + .get_sender_for_signer(&signer) + .map_err(|_| AdapterError::ValidationError { error: format!("Could not find the sender for the signer {}", signer), - } - })?; + })?; Ok(sender == self.sender_id) } } @@ -103,7 +103,8 @@ mod test { let escrow_accounts = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )).1; + )) + .1; Self { escrow_accounts, sender_pending_fees: Arc::new(RwLock::new(0)), @@ -117,7 +118,8 @@ mod test { let escrow_accounts = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )).1; + )) + .1; let sender_pending_fees = Arc::new(RwLock::new(500)); @@ -139,10 +141,11 @@ mod test { #[tokio::test] async fn test_subtract_escrow_overflow() { - let escrow_accounts = watch::channel(EscrowAccounts::new( + let escrow_accounts = watch::channel(EscrowAccounts::new( HashMap::from([(SENDER.1, U256::from(1000))]), HashMap::from([(SENDER.1, vec![SIGNER.1])]), - )).1; + )) + .1; let sender_pending_fees = Arc::new(RwLock::new(500)); diff --git a/tap-agent/src/tap/mod.rs b/tap-agent/src/tap/mod.rs index 4afd998c..ad4c4425 100644 --- a/tap-agent/src/tap/mod.rs +++ b/tap-agent/src/tap/mod.rs @@ -13,7 +13,7 @@ pub mod escrow_adapter; pub mod test_utils; pub async fn signers_trimmed( - escrow_accounts: Receiver , + escrow_accounts: Receiver, sender: Address, ) -> Result, anyhow::Error> { let escrow_accounts = escrow_accounts.borrow(); From 703c4dece502aa959d309a9e71dc96774b64f2d5 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Sat, 26 Oct 2024 17:05:07 +0530 Subject: [PATCH 4/9] corrections --- common/src/indexer_service/http/request_handler.rs | 10 ++++++++-- tap-agent/src/agent/sender_accounts_manager.rs | 9 ++++----- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/common/src/indexer_service/http/request_handler.rs b/common/src/indexer_service/http/request_handler.rs index 9ee3db5e..50913728 100644 --- a/common/src/indexer_service/http/request_handler.rs +++ b/common/src/indexer_service/http/request_handler.rs @@ -56,7 +56,13 @@ pub async fn request_handler( where I: IndexerServiceImpl + Sync + Send + 'static, { - _request_handler(manifest_id, typed_header, state, headers, body).await + _request_handler(manifest_id, typed_header, state, headers, body) + .await + .inspect_err(|_| { + HANDLER_FAILURE + .with_label_values(&[&manifest_id.to_string()]) + .inc() + }) } async fn _request_handler( @@ -105,7 +111,7 @@ where let allocation_id = receipt.message.allocation_id; // recover the signer address - // get escrow accounts from reciever + // get escrow accounts from channel // return sender from signer // // TODO: We are currently doing this process twice. diff --git a/tap-agent/src/agent/sender_accounts_manager.rs b/tap-agent/src/agent/sender_accounts_manager.rs index a0d44b90..ce332eb8 100644 --- a/tap-agent/src/agent/sender_accounts_manager.rs +++ b/tap-agent/src/agent/sender_accounts_manager.rs @@ -69,7 +69,7 @@ pub struct SenderAccountsManagerArgs { pub struct State { sender_ids: HashSet
, new_receipts_watcher_handle: Option>, - _eligible_allocations_senders_pipe: JoinHandle<()>, + _eligible_allocations_senders_handle: JoinHandle<()>, config: &'static SenderAccountConfig, domain_separator: Eip712Domain, @@ -126,7 +126,7 @@ impl Actor for SenderAccountsManager { ); let myself_clone = myself.clone(); let mut accounts_clone = escrow_accounts.clone(); - let _eligible_allocations_senders_pipe = tokio::spawn(async move { + let _eligible_allocations_senders_handle = tokio::spawn(async move { while accounts_clone.changed().await.is_ok() { myself_clone .cast(SenderAccountsManagerMessage::UpdateSenderAccounts( @@ -143,7 +143,7 @@ impl Actor for SenderAccountsManager { domain_separator, sender_ids: HashSet::new(), new_receipts_watcher_handle: None, - _eligible_allocations_senders_pipe, + _eligible_allocations_senders_handle, pgpool, indexer_allocations: allocations_rx, escrow_accounts: escrow_accounts.clone(), @@ -639,7 +639,6 @@ mod tests { let escrow_subgraph = get_subgraph_client(); let (_, escrow_accounts_rx) = watch::channel(EscrowAccounts::default()); - //change escrow_accounts_tx.write(EscrowAccounts::default()); let prefix = format!( "test-{}", @@ -689,7 +688,7 @@ mod tests { domain_separator: TAP_EIP712_DOMAIN_SEPARATOR.clone(), sender_ids: HashSet::new(), new_receipts_watcher_handle: None, - _eligible_allocations_senders_pipe: tokio::spawn(async move {}), + _eligible_allocations_senders_handle: tokio::spawn(async move {}), pgpool, indexer_allocations: watch::channel(HashSet::new()).1, escrow_accounts: watch::channel(escrow_accounts).1, From 99740aab56ffa9f241136e7781625ae4e3d1e5ea Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Sun, 27 Oct 2024 06:22:24 +0530 Subject: [PATCH 5/9] initial accounts --- common/src/escrow_accounts.rs | 11 ++++++++--- common/src/indexer_service/http/indexer_service.rs | 3 ++- tap-agent/src/agent.rs | 3 ++- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index 6a5485dd..fcfdaaa8 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -101,13 +101,17 @@ type BigInt = String; )] pub struct EscrowAccountQuery; -pub fn escrow_accounts( +pub async fn escrow_accounts( escrow_subgraph: &'static SubgraphClient, indexer_address: Address, interval: Duration, reject_thawing_signers: bool, ) -> Receiver { - let (tx, rx) = watch::channel(EscrowAccounts::default()); + let initial_accounts = + get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers) + .await + .expect("Failed to create escrow_accounts channel"); + let (tx, rx) = watch::channel(initial_accounts); tokio::spawn(async move { let mut time_interval = time::interval(interval); time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); @@ -251,7 +255,8 @@ mod tests { *test_vectors::INDEXER_ADDRESS, Duration::from_secs(60), true, - ); + ) + .await; accounts.changed().await.unwrap(); assert_eq!( accounts.borrow().clone(), diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 7484d9e6..c8a6b908 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -276,7 +276,8 @@ impl IndexerService { options.config.indexer.indexer_address, Duration::from_secs(options.config.escrow_subgraph.syncing_interval), true, // Reject thawing signers eagerly - ); + ) + .await; // Establish Database connection necessary for serving indexer management // requests with defined schema diff --git a/tap-agent/src/agent.rs b/tap-agent/src/agent.rs index 038cf5f5..763428d9 100644 --- a/tap-agent/src/agent.rs +++ b/tap-agent/src/agent.rs @@ -118,7 +118,8 @@ pub async fn start_agent() -> (ActorRef, JoinHandl *indexer_address, *escrow_sync_interval, false, - ); + ) + .await; let config = Box::leak(Box::new(SenderAccountConfig::from_config(&CONFIG))); From 75b7cd1ee2b1b3a11af19a9a98bc12055734065b Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Thu, 31 Oct 2024 06:37:19 +0530 Subject: [PATCH 6/9] revrted back --- common/src/indexer_service/http/indexer_service.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index f4eb621b..78e60bb0 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -82,8 +82,6 @@ where { #[error("Issues with provided receipt: {0}")] ReceiptError(tap_core::Error), - // #[error("Service is not ready yet, try again in a moment")] - // ServiceNotReady, #[error("No attestation signer found for allocation `{0}`")] NoSignerForAllocation(Address), #[error("Invalid request body: {0}")] @@ -119,7 +117,6 @@ where } let status = match self { - //ServiceNotReady => StatusCode::SERVICE_UNAVAILABLE, Unauthorized => StatusCode::UNAUTHORIZED, NoSignerForAllocation(_) | FailedToSignAttestation => StatusCode::INTERNAL_SERVER_ERROR, From a6b743d6a9380a05c5fbea0dabe6632f8f2aeb39 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Thu, 31 Oct 2024 06:56:09 +0530 Subject: [PATCH 7/9] revrted back --- tap-agent/src/tap/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tap-agent/src/tap/mod.rs b/tap-agent/src/tap/mod.rs index ad4c4425..56be0537 100644 --- a/tap-agent/src/tap/mod.rs +++ b/tap-agent/src/tap/mod.rs @@ -13,10 +13,10 @@ pub mod escrow_adapter; pub mod test_utils; pub async fn signers_trimmed( - escrow_accounts: Receiver, + escrow_accounts_rx: Receiver, sender: Address, ) -> Result, anyhow::Error> { - let escrow_accounts = escrow_accounts.borrow(); + let escrow_accounts = escrow_accounts_rx.borrow(); let signers = escrow_accounts .get_signers_for_sender(&sender) .iter() From b3842e3124bf8c71bcfdc40a4f956ef6e6a29b77 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Fri, 1 Nov 2024 23:00:13 +0530 Subject: [PATCH 8/9] used watcher --- common/src/escrow_accounts.rs | 38 +++---------------- .../indexer_service/http/request_handler.rs | 4 +- 2 files changed, 8 insertions(+), 34 deletions(-) diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index fcfdaaa8..88cc9172 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -11,13 +11,10 @@ use alloy::primitives::{Address, U256}; use anyhow::{anyhow, Result}; use graphql_client::GraphQLQuery; use thiserror::Error; -use tokio::{ - sync::watch::{self, Receiver}, - time::{self, sleep}, -}; +use tokio::sync::watch::Receiver; use tracing::{error, warn}; -use crate::prelude::SubgraphClient; +use crate::{prelude::SubgraphClient, watcher}; #[derive(Error, Debug)] pub enum EscrowAccountsError { @@ -107,34 +104,11 @@ pub async fn escrow_accounts( interval: Duration, reject_thawing_signers: bool, ) -> Receiver { - let initial_accounts = + watcher::new_watcher(interval, move || { get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers) - .await - .expect("Failed to create escrow_accounts channel"); - let (tx, rx) = watch::channel(initial_accounts); - tokio::spawn(async move { - let mut time_interval = time::interval(interval); - time_interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); - loop { - time_interval.tick().await; - let result = - get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers).await; - match result { - Ok(accounts) => tx - .send(accounts) - .expect("Failed to update escrow_accounts channel"), - Err(err) => { - error!( - "Failed to fetch escrow accounts for indexer {:?}: {}", - indexer_address, err - ); - // Sleep for a bit before we retry - sleep(interval.div_f32(2.0)).await; - } - } - } - }); - rx + }) + .await + .unwrap() } async fn get_escrow_accounts( diff --git a/common/src/indexer_service/http/request_handler.rs b/common/src/indexer_service/http/request_handler.rs index c9f6bb98..62ae21ce 100644 --- a/common/src/indexer_service/http/request_handler.rs +++ b/common/src/indexer_service/http/request_handler.rs @@ -141,8 +141,8 @@ where let signer = receipt .recover_signer(&state.domain_separator) .map_err(IndexerServiceError::CouldNotDecodeSigner)?; - let escrow_accounts = state.escrow_accounts.clone(); - let sender = escrow_accounts + let sender = state + .escrow_accounts .borrow() .get_sender_for_signer(&signer) .map_err(IndexerServiceError::EscrowAccount)?; From ebec51b11641eed70d05895c3b7a053a15bfce62 Mon Sep 17 00:00:00 2001 From: taslimmuhammed Date: Sat, 2 Nov 2024 06:16:50 +0530 Subject: [PATCH 9/9] changed escrow_accounts return type --- common/src/escrow_accounts.rs | 6 +++--- common/src/indexer_service/http/indexer_service.rs | 3 ++- tap-agent/src/agent.rs | 3 ++- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index 88cc9172..e7496685 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -103,12 +103,11 @@ pub async fn escrow_accounts( indexer_address: Address, interval: Duration, reject_thawing_signers: bool, -) -> Receiver { +) -> Result, anyhow::Error> { watcher::new_watcher(interval, move || { get_escrow_accounts(escrow_subgraph, indexer_address, reject_thawing_signers) }) .await - .unwrap() } async fn get_escrow_accounts( @@ -230,7 +229,8 @@ mod tests { Duration::from_secs(60), true, ) - .await; + .await + .unwrap(); accounts.changed().await.unwrap(); assert_eq!( accounts.borrow().clone(), diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 78e60bb0..3946eec4 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -299,7 +299,8 @@ impl IndexerService { options.config.subgraphs.escrow.config.syncing_interval_secs, true, // Reject thawing signers eagerly ) - .await; + .await + .expect("Error creating escrow_accounts channel"); // Establish Database connection necessary for serving indexer management // requests with defined schema diff --git a/tap-agent/src/agent.rs b/tap-agent/src/agent.rs index 91d2b52f..9ef0e34d 100644 --- a/tap-agent/src/agent.rs +++ b/tap-agent/src/agent.rs @@ -121,7 +121,8 @@ pub async fn start_agent() -> (ActorRef, JoinHandl *escrow_sync_interval, false, ) - .await; + .await + .expect("Error creating escrow_accounts channel"); let config = Box::leak(Box::new(SenderAccountConfig::from_config(&CONFIG)));