Skip to content

Commit

Permalink
refactor: drop eventuals in favor of tokio watch + timers for allocat…
Browse files Browse the repository at this point in the history
…ion id
  • Loading branch information
shiyasmohd committed Oct 31, 2024
1 parent 99cf66c commit 863894f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 43 deletions.
17 changes: 10 additions & 7 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,16 @@ impl SenderAllocationState {
}: SenderAllocationArgs,
) -> anyhow::Result<Self> {
let required_checks: Vec<Arc<dyn Check + Send + Sync>> = vec![
Arc::new(AllocationId::new(
config.indexer_address,
config.escrow_polling_interval,
sender,
allocation_id,
escrow_subgraph,
)),
Arc::new(
AllocationId::new(
config.indexer_address,
config.escrow_polling_interval,
sender,
allocation_id,
escrow_subgraph,
)
.await,
),
Arc::new(Signature::new(
domain_separator.clone(),
escrow_accounts.clone(),
Expand Down
59 changes: 23 additions & 36 deletions tap-agent/src/tap/context/checks/allocation_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,37 @@ use std::time::Duration;

use alloy::primitives::Address;
use anyhow::anyhow;
use eventuals::{Eventual, EventualExt};
use graphql_client::GraphQLQuery;
use indexer_common::subgraph_client::SubgraphClient;
use indexer_common::{subgraph_client::SubgraphClient, watcher::new_watcher};
use tap_core::receipt::{
checks::{Check, CheckError, CheckResult},
state::Checking,
ReceiptWithState,
};
use tokio::time::sleep;
use tracing::error;
use tokio::sync::watch::Receiver;

pub struct AllocationId {
tap_allocation_redeemed: Eventual<bool>,
tap_allocation_redeemed: Receiver<bool>,
allocation_id: Address,
}

impl AllocationId {
pub fn new(
pub async fn new(
indexer_address: Address,
escrow_polling_interval: Duration,
sender_id: Address,
allocation_id: Address,
escrow_subgraph: &'static SubgraphClient,
) -> Self {
let tap_allocation_redeemed = tap_allocation_redeemed_eventual(
let tap_allocation_redeemed = tap_allocation_redeemed_watcher(
allocation_id,
sender_id,
indexer_address,
escrow_subgraph,
escrow_polling_interval,
);
)
.await
.expect("Failed to initialize tap_allocation_redeemed_watcher");

Self {
tap_allocation_redeemed,
Expand All @@ -60,46 +60,33 @@ impl Check for AllocationId {
};

// Check that the allocation ID is not redeemed yet for this consumer
match self.tap_allocation_redeemed.value().await {
Ok(false) => Ok(()),
Ok(true) => Err(CheckError::Failed(anyhow!(
match *self.tap_allocation_redeemed.borrow() {
false => Ok(()),
true => Err(CheckError::Failed(anyhow!(
"Allocation {} already redeemed",
allocation_id
))),
Err(e) => Err(CheckError::Retryable(anyhow!(
"Could not get allocation escrow redemption status from eventual: {:?}",
e
))),
}
}
}

fn tap_allocation_redeemed_eventual(
async fn tap_allocation_redeemed_watcher(
allocation_id: Address,
sender_address: Address,
indexer_address: Address,
escrow_subgraph: &'static SubgraphClient,
escrow_polling_interval: Duration,
) -> Eventual<bool> {
eventuals::timer(escrow_polling_interval).map_with_retry(
move |_| async move {
query_escrow_check_transactions(
allocation_id,
sender_address,
indexer_address,
escrow_subgraph,
)
.await
.map_err(|e| e.to_string())
},
move |error: String| {
error!(
"Failed to check the escrow redeem status for allocation {} and sender {}: {}",
allocation_id, sender_address, error
);
sleep(escrow_polling_interval.div_f32(2.))
},
)
) -> anyhow::Result<Receiver<bool>> {
new_watcher(escrow_polling_interval, move || async move {
query_escrow_check_transactions(
allocation_id,
sender_address,
indexer_address,
escrow_subgraph,
)
.await
})
.await
}

#[derive(GraphQLQuery)]
Expand Down

0 comments on commit 863894f

Please sign in to comment.