From 863894f960edd7de50d02238f35538c549d30e75 Mon Sep 17 00:00:00 2001
From: shiyasmohd <shiyasjaseena124@gmail.com>
Date: Thu, 31 Oct 2024 14:42:42 +0530
Subject: [PATCH 1/2] refactor: drop eventuals in favor of tokio watch + timers
 for allocation id

---
 tap-agent/src/agent/sender_allocation.rs      | 17 +++---
 .../src/tap/context/checks/allocation_id.rs   | 59 ++++++++-----------
 2 files changed, 33 insertions(+), 43 deletions(-)

diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs
index 7c23fe0f..498fdaa7 100644
--- a/tap-agent/src/agent/sender_allocation.rs
+++ b/tap-agent/src/agent/sender_allocation.rs
@@ -350,13 +350,16 @@ impl SenderAllocationState {
         }: SenderAllocationArgs,
     ) -> anyhow::Result<Self> {
         let required_checks: Vec<Arc<dyn Check + Send + Sync>> = vec![
-            Arc::new(AllocationId::new(
-                config.indexer_address,
-                config.escrow_polling_interval,
-                sender,
-                allocation_id,
-                escrow_subgraph,
-            )),
+            Arc::new(
+                AllocationId::new(
+                    config.indexer_address,
+                    config.escrow_polling_interval,
+                    sender,
+                    allocation_id,
+                    escrow_subgraph,
+                )
+                .await,
+            ),
             Arc::new(Signature::new(
                 domain_separator.clone(),
                 escrow_accounts.clone(),
diff --git a/tap-agent/src/tap/context/checks/allocation_id.rs b/tap-agent/src/tap/context/checks/allocation_id.rs
index 978bd7c4..d1fe548c 100644
--- a/tap-agent/src/tap/context/checks/allocation_id.rs
+++ b/tap-agent/src/tap/context/checks/allocation_id.rs
@@ -5,37 +5,37 @@ use std::time::Duration;
 
 use alloy::primitives::Address;
 use anyhow::anyhow;
-use eventuals::{Eventual, EventualExt};
 use graphql_client::GraphQLQuery;
-use indexer_common::subgraph_client::SubgraphClient;
+use indexer_common::{subgraph_client::SubgraphClient, watcher::new_watcher};
 use tap_core::receipt::{
     checks::{Check, CheckError, CheckResult},
     state::Checking,
     ReceiptWithState,
 };
-use tokio::time::sleep;
-use tracing::error;
+use tokio::sync::watch::Receiver;
 
 pub struct AllocationId {
-    tap_allocation_redeemed: Eventual<bool>,
+    tap_allocation_redeemed: Receiver<bool>,
     allocation_id: Address,
 }
 
 impl AllocationId {
-    pub fn new(
+    pub async fn new(
         indexer_address: Address,
         escrow_polling_interval: Duration,
         sender_id: Address,
         allocation_id: Address,
         escrow_subgraph: &'static SubgraphClient,
     ) -> Self {
-        let tap_allocation_redeemed = tap_allocation_redeemed_eventual(
+        let tap_allocation_redeemed = tap_allocation_redeemed_watcher(
             allocation_id,
             sender_id,
             indexer_address,
             escrow_subgraph,
             escrow_polling_interval,
-        );
+        )
+        .await
+        .expect("Failed to initialize tap_allocation_redeemed_watcher");
 
         Self {
             tap_allocation_redeemed,
@@ -60,46 +60,33 @@ impl Check for AllocationId {
         };
 
         // Check that the allocation ID is not redeemed yet for this consumer
-        match self.tap_allocation_redeemed.value().await {
-            Ok(false) => Ok(()),
-            Ok(true) => Err(CheckError::Failed(anyhow!(
+        match *self.tap_allocation_redeemed.borrow() {
+            false => Ok(()),
+            true => Err(CheckError::Failed(anyhow!(
                 "Allocation {} already redeemed",
                 allocation_id
             ))),
-            Err(e) => Err(CheckError::Retryable(anyhow!(
-                "Could not get allocation escrow redemption status from eventual: {:?}",
-                e
-            ))),
         }
     }
 }
 
-fn tap_allocation_redeemed_eventual(
+async fn tap_allocation_redeemed_watcher(
     allocation_id: Address,
     sender_address: Address,
     indexer_address: Address,
     escrow_subgraph: &'static SubgraphClient,
     escrow_polling_interval: Duration,
-) -> Eventual<bool> {
-    eventuals::timer(escrow_polling_interval).map_with_retry(
-        move |_| async move {
-            query_escrow_check_transactions(
-                allocation_id,
-                sender_address,
-                indexer_address,
-                escrow_subgraph,
-            )
-            .await
-            .map_err(|e| e.to_string())
-        },
-        move |error: String| {
-            error!(
-                "Failed to check the escrow redeem status for allocation {} and sender {}: {}",
-                allocation_id, sender_address, error
-            );
-            sleep(escrow_polling_interval.div_f32(2.))
-        },
-    )
+) -> anyhow::Result<Receiver<bool>> {
+    new_watcher(escrow_polling_interval, move || async move {
+        query_escrow_check_transactions(
+            allocation_id,
+            sender_address,
+            indexer_address,
+            escrow_subgraph,
+        )
+        .await
+    })
+    .await
 }
 
 #[derive(GraphQLQuery)]

From 3e7f70b37d423736f4b5fd7a897877c863c6c041 Mon Sep 17 00:00:00 2001
From: shiyasmohd <shiyasjaseena124@gmail.com>
Date: Fri, 1 Nov 2024 00:05:38 +0530
Subject: [PATCH 2/2] test: add mock escrow subgraph server with sample
 response

---
 tap-agent/src/agent/sender_account.rs    |  30 ++++++-
 tap-agent/src/agent/sender_allocation.rs | 101 +++++++++++++++++------
 2 files changed, 103 insertions(+), 28 deletions(-)

diff --git a/tap-agent/src/agent/sender_account.rs b/tap-agent/src/agent/sender_account.rs
index a19123ac..9a22125d 100644
--- a/tap-agent/src/agent/sender_account.rs
+++ b/tap-agent/src/agent/sender_account.rs
@@ -1059,7 +1059,7 @@ pub mod tests {
     use std::time::{Duration, SystemTime, UNIX_EPOCH};
     use tokio::sync::watch;
     use wiremock::matchers::{body_string_contains, method};
-    use wiremock::{Mock, MockServer, ResponseTemplate};
+    use wiremock::{Mock, MockGuard, MockServer, ResponseTemplate};
 
     // we implement the PartialEq and Eq traits for SenderAccountMessage to be able to compare
     impl Eq for SenderAccountMessage {}
@@ -1110,6 +1110,23 @@ pub mod tests {
     const BUFFER_MS: u64 = 100;
     const RECEIPT_LIMIT: u64 = 10000;
 
+    async fn mock_escrow_subgraph() -> (MockServer, MockGuard) {
+        let mock_ecrow_subgraph_server: MockServer = MockServer::start().await;
+        let _mock_ecrow_subgraph = mock_ecrow_subgraph_server
+                .register_as_scoped(
+                    Mock::given(method("POST"))
+                        .and(body_string_contains("TapTransactions"))
+                        .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "data": {
+                                "transactions": [{
+                                    "id": "0x00224ee6ad4ae77b817b4e509dc29d644da9004ad0c44005a7f34481d421256409000000"
+                                }],
+                            }
+                        }))),
+                )
+                .await;
+        (mock_ecrow_subgraph_server, _mock_ecrow_subgraph)
+    }
+
     async fn create_sender_account(
         pgpool: PgPool,
         initial_allocation: HashSet<Address>,
@@ -1201,12 +1218,14 @@ pub mod tests {
             )
             .await;
 
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
+
         let (sender_account, handle, prefix, _) = create_sender_account(
             pgpool,
             HashSet::new(),
             TRIGGER_VALUE,
             TRIGGER_VALUE,
-            DUMMY_URL,
+            &mock_escrow_subgraph_server.uri(),
             &mock_server.uri(),
             RECEIPT_LIMIT,
         )
@@ -1295,12 +1314,14 @@ pub mod tests {
             )
             .await;
 
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
+
         let (sender_account, handle, prefix, _) = create_sender_account(
             pgpool,
             HashSet::new(),
             TRIGGER_VALUE,
             TRIGGER_VALUE,
-            DUMMY_URL,
+            &mock_escrow_subgraph_server.uri(),
             &mock_server.uri(),
             RECEIPT_LIMIT,
         )
@@ -1711,12 +1732,13 @@ pub mod tests {
 
     #[sqlx::test(migrations = "../migrations")]
     async fn test_remove_sender_account(pgpool: PgPool) {
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
         let (sender_account, handle, prefix, _) = create_sender_account(
             pgpool,
             vec![*ALLOCATION_ID_0].into_iter().collect(),
             TRIGGER_VALUE,
             TRIGGER_VALUE,
-            DUMMY_URL,
+            &mock_escrow_subgraph_server.uri(),
             DUMMY_URL,
             RECEIPT_LIMIT,
         )
diff --git a/tap-agent/src/agent/sender_allocation.rs b/tap-agent/src/agent/sender_allocation.rs
index 498fdaa7..5d45d1ca 100644
--- a/tap-agent/src/agent/sender_allocation.rs
+++ b/tap-agent/src/agent/sender_allocation.rs
@@ -907,7 +907,7 @@ pub mod tests {
     use tokio::sync::mpsc;
     use wiremock::{
         matchers::{body_string_contains, method},
-        Mock, MockServer, Respond, ResponseTemplate,
+        Mock, MockGuard, MockServer, Respond, ResponseTemplate,
     };
 
     const DUMMY_URL: &str = "http://localhost:1234";
@@ -916,6 +916,23 @@ pub mod tests {
         pub last_message_emitted: tokio::sync::mpsc::Sender<SenderAccountMessage>,
     }
 
+    async fn mock_escrow_subgraph() -> (MockServer, MockGuard) {
+        let mock_ecrow_subgraph_server: MockServer = MockServer::start().await;
+        let _mock_ecrow_subgraph = mock_ecrow_subgraph_server
+                .register_as_scoped(
+                    Mock::given(method("POST"))
+                        .and(body_string_contains("TapTransactions"))
+                        .respond_with(ResponseTemplate::new(200).set_body_json(json!({ "data": {
+                                "transactions": [{
+                                    "id": "0x00224ee6ad4ae77b817b4e509dc29d644da9004ad0c44005a7f34481d421256409000000"
+                                }],
+                            }
+                        }))),
+                )
+                .await;
+        (mock_ecrow_subgraph_server, _mock_ecrow_subgraph)
+    }
+
     #[async_trait::async_trait]
     impl Actor for MockSenderAccount {
         type Msg = SenderAccountMessage;
@@ -1029,6 +1046,7 @@ pub mod tests {
 
     #[sqlx::test(migrations = "../migrations")]
     async fn should_update_unaggregated_fees_on_start(pgpool: PgPool) {
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
         let (mut last_message_emitted, sender_account, _join_handle) =
             create_mock_sender_account().await;
         // Add receipts to the database.
@@ -1042,7 +1060,7 @@ pub mod tests {
         let sender_allocation = create_sender_allocation(
             pgpool.clone(),
             DUMMY_URL.to_string(),
-            DUMMY_URL,
+            &mock_escrow_subgraph_server.uri(),
             Some(sender_account),
         )
         .await;
@@ -1072,6 +1090,7 @@ pub mod tests {
 
     #[sqlx::test(migrations = "../migrations")]
     async fn should_return_invalid_receipts_on_startup(pgpool: PgPool) {
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
         let (mut message_receiver, sender_account, _join_handle) =
             create_mock_sender_account().await;
         // Add receipts to the database.
@@ -1085,7 +1104,7 @@ pub mod tests {
         let sender_allocation = create_sender_allocation(
             pgpool.clone(),
             DUMMY_URL.to_string(),
-            DUMMY_URL,
+            &mock_escrow_subgraph_server.uri(),
             Some(sender_account),
         )
         .await;
@@ -1123,13 +1142,14 @@ pub mod tests {
 
     #[sqlx::test(migrations = "../migrations")]
     async fn test_receive_new_receipt(pgpool: PgPool) {
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
         let (mut message_receiver, sender_account, _join_handle) =
             create_mock_sender_account().await;
 
         let sender_allocation = create_sender_allocation(
             pgpool.clone(),
             DUMMY_URL.to_string(),
-            DUMMY_URL,
+            &mock_escrow_subgraph_server.uri(),
             Some(sender_account),
         )
         .await;
@@ -1294,6 +1314,7 @@ pub mod tests {
 
     #[sqlx::test(migrations = "../migrations")]
     async fn test_close_allocation_no_pending_fees(pgpool: PgPool) {
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
         let (mut message_receiver, sender_account, _join_handle) =
             create_mock_sender_account().await;
 
@@ -1301,7 +1322,7 @@ pub mod tests {
         let sender_allocation = create_sender_allocation(
             pgpool.clone(),
             DUMMY_URL.to_string(),
-            DUMMY_URL,
+            &mock_escrow_subgraph_server.uri(),
             Some(sender_account),
         )
         .await;
@@ -1415,9 +1436,14 @@ pub mod tests {
 
     #[sqlx::test(migrations = "../migrations")]
     async fn should_return_unaggregated_fees_without_rav(pgpool: PgPool) {
-        let args =
-            create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
-                .await;
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
+        let args = create_sender_allocation_args(
+            pgpool.clone(),
+            DUMMY_URL.to_string(),
+            &mock_escrow_subgraph_server.uri(),
+            None,
+        )
+        .await;
         let state = SenderAllocationState::new(args).await.unwrap();
 
         // Add receipts to the database.
@@ -1437,9 +1463,14 @@ pub mod tests {
 
     #[sqlx::test(migrations = "../migrations")]
     async fn should_calculate_invalid_receipts_fee(pgpool: PgPool) {
-        let args =
-            create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
-                .await;
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
+        let args = create_sender_allocation_args(
+            pgpool.clone(),
+            DUMMY_URL.to_string(),
+            &mock_escrow_subgraph_server.uri(),
+            None,
+        )
+        .await;
         let state = SenderAllocationState::new(args).await.unwrap();
 
         // Add receipts to the database.
@@ -1465,9 +1496,14 @@ pub mod tests {
     /// than the RAV's timestamp.
     #[sqlx::test(migrations = "../migrations")]
     async fn should_return_unaggregated_fees_with_rav(pgpool: PgPool) {
-        let args =
-            create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
-                .await;
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
+        let args = create_sender_allocation_args(
+            pgpool.clone(),
+            DUMMY_URL.to_string(),
+            &mock_escrow_subgraph_server.uri(),
+            None,
+        )
+        .await;
         let state = SenderAllocationState::new(args).await.unwrap();
 
         // Add the RAV to the database.
@@ -1492,9 +1528,14 @@ pub mod tests {
 
     #[sqlx::test(migrations = "../migrations")]
     async fn test_store_failed_rav(pgpool: PgPool) {
-        let args =
-            create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
-                .await;
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
+        let args = create_sender_allocation_args(
+            pgpool.clone(),
+            DUMMY_URL.to_string(),
+            &mock_escrow_subgraph_server.uri(),
+            None,
+        )
+        .await;
         let state = SenderAllocationState::new(args).await.unwrap();
 
         let signed_rav = create_rav(*ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10);
@@ -1522,9 +1563,14 @@ pub mod tests {
             }
         }
 
-        let args =
-            create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
-                .await;
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
+        let args = create_sender_allocation_args(
+            pgpool.clone(),
+            DUMMY_URL.to_string(),
+            &mock_escrow_subgraph_server.uri(),
+            None,
+        )
+        .await;
         let mut state = SenderAllocationState::new(args).await.unwrap();
 
         let checks = CheckList::new(vec![Arc::new(FailingCheck)]);
@@ -1556,12 +1602,17 @@ pub mod tests {
 
     #[sqlx::test(migrations = "../migrations")]
     async fn test_mark_rav_last(pgpool: PgPool) {
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
         let signed_rav = create_rav(*ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10);
         store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap();
 
-        let args =
-            create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
-                .await;
+        let args = create_sender_allocation_args(
+            pgpool.clone(),
+            DUMMY_URL.to_string(),
+            &mock_escrow_subgraph_server.uri(),
+            None,
+        )
+        .await;
         let state = SenderAllocationState::new(args).await.unwrap();
 
         // mark rav as final
@@ -1573,6 +1624,8 @@ pub mod tests {
 
     #[sqlx::test(migrations = "../migrations")]
     async fn test_failed_rav_request(pgpool: PgPool) {
+        let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
+
         // Add receipts to the database.
         for i in 0..10 {
             let receipt =
@@ -1589,7 +1642,7 @@ pub mod tests {
         let sender_allocation = create_sender_allocation(
             pgpool.clone(),
             DUMMY_URL.to_string(),
-            DUMMY_URL,
+            &mock_escrow_subgraph_server.uri(),
             Some(sender_account),
         )
         .await;