Skip to content

Commit

Permalink
refactor: use tokio::watch for allocation id check (#443)
Browse files Browse the repository at this point in the history
  • Loading branch information
shiyasmohd authored Nov 1, 2024
1 parent 99cf66c commit 4d6ee1c
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 71 deletions.
30 changes: 26 additions & 4 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ pub mod tests {
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::watch;
use wiremock::matchers::{body_string_contains, method};
use wiremock::{Mock, MockServer, ResponseTemplate};
use wiremock::{Mock, MockGuard, MockServer, ResponseTemplate};

// we implement the PartialEq and Eq traits for SenderAccountMessage to be able to compare
impl Eq for SenderAccountMessage {}
Expand Down Expand Up @@ -1110,6 +1110,23 @@ pub mod tests {
const BUFFER_MS: u64 = 100;
const RECEIPT_LIMIT: u64 = 10000;

async fn mock_escrow_subgraph() -> (MockServer, MockGuard) {
let mock_ecrow_subgraph_server: MockServer = MockServer::start().await;
let _mock_ecrow_subgraph = mock_ecrow_subgraph_server
.register_as_scoped(
Mock::given(method("POST"))
.and(body_string_contains("TapTransactions"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({ "data": {
"transactions": [{
"id": "0x00224ee6ad4ae77b817b4e509dc29d644da9004ad0c44005a7f34481d421256409000000"
}],
}
}))),
)
.await;
(mock_ecrow_subgraph_server, _mock_ecrow_subgraph)
}

async fn create_sender_account(
pgpool: PgPool,
initial_allocation: HashSet<Address>,
Expand Down Expand Up @@ -1201,12 +1218,14 @@ pub mod tests {
)
.await;

let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;

let (sender_account, handle, prefix, _) = create_sender_account(
pgpool,
HashSet::new(),
TRIGGER_VALUE,
TRIGGER_VALUE,
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
&mock_server.uri(),
RECEIPT_LIMIT,
)
Expand Down Expand Up @@ -1295,12 +1314,14 @@ pub mod tests {
)
.await;

let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;

let (sender_account, handle, prefix, _) = create_sender_account(
pgpool,
HashSet::new(),
TRIGGER_VALUE,
TRIGGER_VALUE,
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
&mock_server.uri(),
RECEIPT_LIMIT,
)
Expand Down Expand Up @@ -1711,12 +1732,13 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn test_remove_sender_account(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let (sender_account, handle, prefix, _) = create_sender_account(
pgpool,
vec![*ALLOCATION_ID_0].into_iter().collect(),
TRIGGER_VALUE,
TRIGGER_VALUE,
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
DUMMY_URL,
RECEIPT_LIMIT,
)
Expand Down
118 changes: 87 additions & 31 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,16 @@ impl SenderAllocationState {
}: SenderAllocationArgs,
) -> anyhow::Result<Self> {
let required_checks: Vec<Arc<dyn Check + Send + Sync>> = vec![
Arc::new(AllocationId::new(
config.indexer_address,
config.escrow_polling_interval,
sender,
allocation_id,
escrow_subgraph,
)),
Arc::new(
AllocationId::new(
config.indexer_address,
config.escrow_polling_interval,
sender,
allocation_id,
escrow_subgraph,
)
.await,
),
Arc::new(Signature::new(
domain_separator.clone(),
escrow_accounts.clone(),
Expand Down Expand Up @@ -904,7 +907,7 @@ pub mod tests {
use tokio::sync::mpsc;
use wiremock::{
matchers::{body_string_contains, method},
Mock, MockServer, Respond, ResponseTemplate,
Mock, MockGuard, MockServer, Respond, ResponseTemplate,
};

const DUMMY_URL: &str = "http://localhost:1234";
Expand All @@ -913,6 +916,23 @@ pub mod tests {
pub last_message_emitted: tokio::sync::mpsc::Sender<SenderAccountMessage>,
}

async fn mock_escrow_subgraph() -> (MockServer, MockGuard) {
let mock_ecrow_subgraph_server: MockServer = MockServer::start().await;
let _mock_ecrow_subgraph = mock_ecrow_subgraph_server
.register_as_scoped(
Mock::given(method("POST"))
.and(body_string_contains("TapTransactions"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({ "data": {
"transactions": [{
"id": "0x00224ee6ad4ae77b817b4e509dc29d644da9004ad0c44005a7f34481d421256409000000"
}],
}
}))),
)
.await;
(mock_ecrow_subgraph_server, _mock_ecrow_subgraph)
}

#[async_trait::async_trait]
impl Actor for MockSenderAccount {
type Msg = SenderAccountMessage;
Expand Down Expand Up @@ -1026,6 +1046,7 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn should_update_unaggregated_fees_on_start(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let (mut last_message_emitted, sender_account, _join_handle) =
create_mock_sender_account().await;
// Add receipts to the database.
Expand All @@ -1039,7 +1060,7 @@ pub mod tests {
let sender_allocation = create_sender_allocation(
pgpool.clone(),
DUMMY_URL.to_string(),
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
Some(sender_account),
)
.await;
Expand Down Expand Up @@ -1069,6 +1090,7 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn should_return_invalid_receipts_on_startup(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let (mut message_receiver, sender_account, _join_handle) =
create_mock_sender_account().await;
// Add receipts to the database.
Expand All @@ -1082,7 +1104,7 @@ pub mod tests {
let sender_allocation = create_sender_allocation(
pgpool.clone(),
DUMMY_URL.to_string(),
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
Some(sender_account),
)
.await;
Expand Down Expand Up @@ -1120,13 +1142,14 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn test_receive_new_receipt(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let (mut message_receiver, sender_account, _join_handle) =
create_mock_sender_account().await;

let sender_allocation = create_sender_allocation(
pgpool.clone(),
DUMMY_URL.to_string(),
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
Some(sender_account),
)
.await;
Expand Down Expand Up @@ -1291,14 +1314,15 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn test_close_allocation_no_pending_fees(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let (mut message_receiver, sender_account, _join_handle) =
create_mock_sender_account().await;

// create allocation
let sender_allocation = create_sender_allocation(
pgpool.clone(),
DUMMY_URL.to_string(),
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
Some(sender_account),
)
.await;
Expand Down Expand Up @@ -1412,9 +1436,14 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn should_return_unaggregated_fees_without_rav(pgpool: PgPool) {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let args = create_sender_allocation_args(
pgpool.clone(),
DUMMY_URL.to_string(),
&mock_escrow_subgraph_server.uri(),
None,
)
.await;
let state = SenderAllocationState::new(args).await.unwrap();

// Add receipts to the database.
Expand All @@ -1434,9 +1463,14 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn should_calculate_invalid_receipts_fee(pgpool: PgPool) {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let args = create_sender_allocation_args(
pgpool.clone(),
DUMMY_URL.to_string(),
&mock_escrow_subgraph_server.uri(),
None,
)
.await;
let state = SenderAllocationState::new(args).await.unwrap();

// Add receipts to the database.
Expand All @@ -1462,9 +1496,14 @@ pub mod tests {
/// than the RAV's timestamp.
#[sqlx::test(migrations = "../migrations")]
async fn should_return_unaggregated_fees_with_rav(pgpool: PgPool) {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let args = create_sender_allocation_args(
pgpool.clone(),
DUMMY_URL.to_string(),
&mock_escrow_subgraph_server.uri(),
None,
)
.await;
let state = SenderAllocationState::new(args).await.unwrap();

// Add the RAV to the database.
Expand All @@ -1489,9 +1528,14 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn test_store_failed_rav(pgpool: PgPool) {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let args = create_sender_allocation_args(
pgpool.clone(),
DUMMY_URL.to_string(),
&mock_escrow_subgraph_server.uri(),
None,
)
.await;
let state = SenderAllocationState::new(args).await.unwrap();

let signed_rav = create_rav(*ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10);
Expand Down Expand Up @@ -1519,9 +1563,14 @@ pub mod tests {
}
}

let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let args = create_sender_allocation_args(
pgpool.clone(),
DUMMY_URL.to_string(),
&mock_escrow_subgraph_server.uri(),
None,
)
.await;
let mut state = SenderAllocationState::new(args).await.unwrap();

let checks = CheckList::new(vec![Arc::new(FailingCheck)]);
Expand Down Expand Up @@ -1553,12 +1602,17 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn test_mark_rav_last(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let signed_rav = create_rav(*ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10);
store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap();

let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let args = create_sender_allocation_args(
pgpool.clone(),
DUMMY_URL.to_string(),
&mock_escrow_subgraph_server.uri(),
None,
)
.await;
let state = SenderAllocationState::new(args).await.unwrap();

// mark rav as final
Expand All @@ -1570,6 +1624,8 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn test_failed_rav_request(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;

// Add receipts to the database.
for i in 0..10 {
let receipt =
Expand All @@ -1586,7 +1642,7 @@ pub mod tests {
let sender_allocation = create_sender_allocation(
pgpool.clone(),
DUMMY_URL.to_string(),
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
Some(sender_account),
)
.await;
Expand Down
Loading

0 comments on commit 4d6ee1c

Please sign in to comment.