diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs index a19123ac..9a22125d 100644 --- a/tap-agent/src/agent/sender_account.rs +++ b/tap-agent/src/agent/sender_account.rs @@ -1059,7 +1059,7 @@ pub mod tests { use std::time::{Duration, SystemTime, UNIX_EPOCH}; use tokio::sync::watch; use wiremock::matchers::{body_string_contains, method}; - use wiremock::{Mock, MockServer, ResponseTemplate}; + use wiremock::{Mock, MockGuard, MockServer, ResponseTemplate}; // we implement the PartialEq and Eq traits for SenderAccountMessage to be able to compare impl Eq for SenderAccountMessage {} @@ -1110,6 +1110,23 @@ pub mod tests { const BUFFER_MS: u64 = 100; const RECEIPT_LIMIT: u64 = 10000; + async fn mock_escrow_subgraph() -> (MockServer, MockGuard) { + let mock_ecrow_subgraph_server: MockServer = MockServer::start().await; + let _mock_ecrow_subgraph = mock_ecrow_subgraph_server + .register_as_scoped( + Mock::given(method("POST")) + .and(body_string_contains("TapTransactions")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "data": { + "transactions": [{ + "id": "0x00224ee6ad4ae77b817b4e509dc29d644da9004ad0c44005a7f34481d421256409000000" + }], + } + }))), + ) + .await; + (mock_ecrow_subgraph_server, _mock_ecrow_subgraph) + } + async fn create_sender_account( pgpool: PgPool, initial_allocation: HashSet
, @@ -1201,12 +1218,14 @@ pub mod tests { ) .await; + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; + let (sender_account, handle, prefix, _) = create_sender_account( pgpool, HashSet::new(), TRIGGER_VALUE, TRIGGER_VALUE, - DUMMY_URL, + &mock_escrow_subgraph_server.uri(), &mock_server.uri(), RECEIPT_LIMIT, ) @@ -1295,12 +1314,14 @@ pub mod tests { ) .await; + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; + let (sender_account, handle, prefix, _) = create_sender_account( pgpool, HashSet::new(), TRIGGER_VALUE, TRIGGER_VALUE, - DUMMY_URL, + &mock_escrow_subgraph_server.uri(), &mock_server.uri(), RECEIPT_LIMIT, ) @@ -1711,12 +1732,13 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_remove_sender_account(pgpool: PgPool) { + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; let (sender_account, handle, prefix, _) = create_sender_account( pgpool, vec![*ALLOCATION_ID_0].into_iter().collect(), TRIGGER_VALUE, TRIGGER_VALUE, - DUMMY_URL, + &mock_escrow_subgraph_server.uri(), DUMMY_URL, RECEIPT_LIMIT, ) diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs index 7c23fe0f..5d45d1ca 100644 --- a/tap-agent/src/agent/sender_allocation.rs +++ b/tap-agent/src/agent/sender_allocation.rs @@ -350,13 +350,16 @@ impl SenderAllocationState { }: SenderAllocationArgs, ) -> anyhow::Result { let required_checks: Vec> = vec![ - Arc::new(AllocationId::new( - config.indexer_address, - config.escrow_polling_interval, - sender, - allocation_id, - escrow_subgraph, - )), + Arc::new( + AllocationId::new( + config.indexer_address, + config.escrow_polling_interval, + sender, + allocation_id, + escrow_subgraph, + ) + .await, + ), Arc::new(Signature::new( domain_separator.clone(), escrow_accounts.clone(), @@ -904,7 +907,7 @@ pub mod tests { use tokio::sync::mpsc; use wiremock::{ matchers::{body_string_contains, method}, - Mock, MockServer, Respond, ResponseTemplate, + Mock, MockGuard, MockServer, Respond, ResponseTemplate, }; const DUMMY_URL: &str = "http://localhost:1234"; @@ -913,6 +916,23 @@ pub mod tests { pub last_message_emitted: tokio::sync::mpsc::Sender, } + async fn mock_escrow_subgraph() -> (MockServer, MockGuard) { + let mock_ecrow_subgraph_server: MockServer = MockServer::start().await; + let _mock_ecrow_subgraph = mock_ecrow_subgraph_server + .register_as_scoped( + Mock::given(method("POST")) + .and(body_string_contains("TapTransactions")) + .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "data": { + "transactions": [{ + "id": "0x00224ee6ad4ae77b817b4e509dc29d644da9004ad0c44005a7f34481d421256409000000" + }], + } + }))), + ) + .await; + (mock_ecrow_subgraph_server, _mock_ecrow_subgraph) + } + #[async_trait::async_trait] impl Actor for MockSenderAccount { type Msg = SenderAccountMessage; @@ -1026,6 +1046,7 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn should_update_unaggregated_fees_on_start(pgpool: PgPool) { + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; let (mut last_message_emitted, sender_account, _join_handle) = create_mock_sender_account().await; // Add receipts to the database. @@ -1039,7 +1060,7 @@ pub mod tests { let sender_allocation = create_sender_allocation( pgpool.clone(), DUMMY_URL.to_string(), - DUMMY_URL, + &mock_escrow_subgraph_server.uri(), Some(sender_account), ) .await; @@ -1069,6 +1090,7 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn should_return_invalid_receipts_on_startup(pgpool: PgPool) { + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; let (mut message_receiver, sender_account, _join_handle) = create_mock_sender_account().await; // Add receipts to the database. @@ -1082,7 +1104,7 @@ pub mod tests { let sender_allocation = create_sender_allocation( pgpool.clone(), DUMMY_URL.to_string(), - DUMMY_URL, + &mock_escrow_subgraph_server.uri(), Some(sender_account), ) .await; @@ -1120,13 +1142,14 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_receive_new_receipt(pgpool: PgPool) { + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; let (mut message_receiver, sender_account, _join_handle) = create_mock_sender_account().await; let sender_allocation = create_sender_allocation( pgpool.clone(), DUMMY_URL.to_string(), - DUMMY_URL, + &mock_escrow_subgraph_server.uri(), Some(sender_account), ) .await; @@ -1291,6 +1314,7 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_close_allocation_no_pending_fees(pgpool: PgPool) { + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; let (mut message_receiver, sender_account, _join_handle) = create_mock_sender_account().await; @@ -1298,7 +1322,7 @@ pub mod tests { let sender_allocation = create_sender_allocation( pgpool.clone(), DUMMY_URL.to_string(), - DUMMY_URL, + &mock_escrow_subgraph_server.uri(), Some(sender_account), ) .await; @@ -1412,9 +1436,14 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn should_return_unaggregated_fees_without_rav(pgpool: PgPool) { - let args = - create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None) - .await; + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; + let args = create_sender_allocation_args( + pgpool.clone(), + DUMMY_URL.to_string(), + &mock_escrow_subgraph_server.uri(), + None, + ) + .await; let state = SenderAllocationState::new(args).await.unwrap(); // Add receipts to the database. @@ -1434,9 +1463,14 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn should_calculate_invalid_receipts_fee(pgpool: PgPool) { - let args = - create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None) - .await; + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; + let args = create_sender_allocation_args( + pgpool.clone(), + DUMMY_URL.to_string(), + &mock_escrow_subgraph_server.uri(), + None, + ) + .await; let state = SenderAllocationState::new(args).await.unwrap(); // Add receipts to the database. @@ -1462,9 +1496,14 @@ pub mod tests { /// than the RAV's timestamp. #[sqlx::test(migrations = "../migrations")] async fn should_return_unaggregated_fees_with_rav(pgpool: PgPool) { - let args = - create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None) - .await; + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; + let args = create_sender_allocation_args( + pgpool.clone(), + DUMMY_URL.to_string(), + &mock_escrow_subgraph_server.uri(), + None, + ) + .await; let state = SenderAllocationState::new(args).await.unwrap(); // Add the RAV to the database. @@ -1489,9 +1528,14 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_store_failed_rav(pgpool: PgPool) { - let args = - create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None) - .await; + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; + let args = create_sender_allocation_args( + pgpool.clone(), + DUMMY_URL.to_string(), + &mock_escrow_subgraph_server.uri(), + None, + ) + .await; let state = SenderAllocationState::new(args).await.unwrap(); let signed_rav = create_rav(*ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10); @@ -1519,9 +1563,14 @@ pub mod tests { } } - let args = - create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None) - .await; + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; + let args = create_sender_allocation_args( + pgpool.clone(), + DUMMY_URL.to_string(), + &mock_escrow_subgraph_server.uri(), + None, + ) + .await; let mut state = SenderAllocationState::new(args).await.unwrap(); let checks = CheckList::new(vec![Arc::new(FailingCheck)]); @@ -1553,12 +1602,17 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_mark_rav_last(pgpool: PgPool) { + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; let signed_rav = create_rav(*ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10); store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap(); - let args = - create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None) - .await; + let args = create_sender_allocation_args( + pgpool.clone(), + DUMMY_URL.to_string(), + &mock_escrow_subgraph_server.uri(), + None, + ) + .await; let state = SenderAllocationState::new(args).await.unwrap(); // mark rav as final @@ -1570,6 +1624,8 @@ pub mod tests { #[sqlx::test(migrations = "../migrations")] async fn test_failed_rav_request(pgpool: PgPool) { + let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await; + // Add receipts to the database. for i in 0..10 { let receipt = @@ -1586,7 +1642,7 @@ pub mod tests { let sender_allocation = create_sender_allocation( pgpool.clone(), DUMMY_URL.to_string(), - DUMMY_URL, + &mock_escrow_subgraph_server.uri(), Some(sender_account), ) .await; diff --git a/tap-agent/src/tap/context/checks/allocation_id.rs b/tap-agent/src/tap/context/checks/allocation_id.rs index 978bd7c4..d1fe548c 100644 --- a/tap-agent/src/tap/context/checks/allocation_id.rs +++ b/tap-agent/src/tap/context/checks/allocation_id.rs @@ -5,37 +5,37 @@ use std::time::Duration; use alloy::primitives::Address; use anyhow::anyhow; -use eventuals::{Eventual, EventualExt}; use graphql_client::GraphQLQuery; -use indexer_common::subgraph_client::SubgraphClient; +use indexer_common::{subgraph_client::SubgraphClient, watcher::new_watcher}; use tap_core::receipt::{ checks::{Check, CheckError, CheckResult}, state::Checking, ReceiptWithState, }; -use tokio::time::sleep; -use tracing::error; +use tokio::sync::watch::Receiver; pub struct AllocationId { - tap_allocation_redeemed: Eventual, + tap_allocation_redeemed: Receiver, allocation_id: Address, } impl AllocationId { - pub fn new( + pub async fn new( indexer_address: Address, escrow_polling_interval: Duration, sender_id: Address, allocation_id: Address, escrow_subgraph: &'static SubgraphClient, ) -> Self { - let tap_allocation_redeemed = tap_allocation_redeemed_eventual( + let tap_allocation_redeemed = tap_allocation_redeemed_watcher( allocation_id, sender_id, indexer_address, escrow_subgraph, escrow_polling_interval, - ); + ) + .await + .expect("Failed to initialize tap_allocation_redeemed_watcher"); Self { tap_allocation_redeemed, @@ -60,46 +60,33 @@ impl Check for AllocationId { }; // Check that the allocation ID is not redeemed yet for this consumer - match self.tap_allocation_redeemed.value().await { - Ok(false) => Ok(()), - Ok(true) => Err(CheckError::Failed(anyhow!( + match *self.tap_allocation_redeemed.borrow() { + false => Ok(()), + true => Err(CheckError::Failed(anyhow!( "Allocation {} already redeemed", allocation_id ))), - Err(e) => Err(CheckError::Retryable(anyhow!( - "Could not get allocation escrow redemption status from eventual: {:?}", - e - ))), } } } -fn tap_allocation_redeemed_eventual( +async fn tap_allocation_redeemed_watcher( allocation_id: Address, sender_address: Address, indexer_address: Address, escrow_subgraph: &'static SubgraphClient, escrow_polling_interval: Duration, -) -> Eventual { - eventuals::timer(escrow_polling_interval).map_with_retry( - move |_| async move { - query_escrow_check_transactions( - allocation_id, - sender_address, - indexer_address, - escrow_subgraph, - ) - .await - .map_err(|e| e.to_string()) - }, - move |error: String| { - error!( - "Failed to check the escrow redeem status for allocation {} and sender {}: {}", - allocation_id, sender_address, error - ); - sleep(escrow_polling_interval.div_f32(2.)) - }, - ) +) -> anyhow::Result> { + new_watcher(escrow_polling_interval, move || async move { + query_escrow_check_transactions( + allocation_id, + sender_address, + indexer_address, + escrow_subgraph, + ) + .await + }) + .await } #[derive(GraphQLQuery)]