Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: use tokio::watch for allocation id check #443

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions tap-agent/src/agent/sender_account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1059,7 +1059,7 @@ pub mod tests {
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use tokio::sync::watch;
use wiremock::matchers::{body_string_contains, method};
use wiremock::{Mock, MockServer, ResponseTemplate};
use wiremock::{Mock, MockGuard, MockServer, ResponseTemplate};

// we implement the PartialEq and Eq traits for SenderAccountMessage to be able to compare
impl Eq for SenderAccountMessage {}
Expand Down Expand Up @@ -1110,6 +1110,23 @@ pub mod tests {
const BUFFER_MS: u64 = 100;
const RECEIPT_LIMIT: u64 = 10000;

async fn mock_escrow_subgraph() -> (MockServer, MockGuard) {
let mock_ecrow_subgraph_server: MockServer = MockServer::start().await;
let _mock_ecrow_subgraph = mock_ecrow_subgraph_server
.register_as_scoped(
Mock::given(method("POST"))
.and(body_string_contains("TapTransactions"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({ "data": {
"transactions": [{
"id": "0x00224ee6ad4ae77b817b4e509dc29d644da9004ad0c44005a7f34481d421256409000000"
}],
}
}))),
)
.await;
(mock_ecrow_subgraph_server, _mock_ecrow_subgraph)
}

async fn create_sender_account(
pgpool: PgPool,
initial_allocation: HashSet<Address>,
Expand Down Expand Up @@ -1201,12 +1218,14 @@ pub mod tests {
)
.await;

let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;

let (sender_account, handle, prefix, _) = create_sender_account(
pgpool,
HashSet::new(),
TRIGGER_VALUE,
TRIGGER_VALUE,
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
&mock_server.uri(),
RECEIPT_LIMIT,
)
Expand Down Expand Up @@ -1295,12 +1314,14 @@ pub mod tests {
)
.await;

let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;

let (sender_account, handle, prefix, _) = create_sender_account(
pgpool,
HashSet::new(),
TRIGGER_VALUE,
TRIGGER_VALUE,
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
&mock_server.uri(),
RECEIPT_LIMIT,
)
Expand Down Expand Up @@ -1711,12 +1732,13 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn test_remove_sender_account(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let (sender_account, handle, prefix, _) = create_sender_account(
pgpool,
vec![*ALLOCATION_ID_0].into_iter().collect(),
TRIGGER_VALUE,
TRIGGER_VALUE,
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
DUMMY_URL,
RECEIPT_LIMIT,
)
Expand Down
118 changes: 87 additions & 31 deletions tap-agent/src/agent/sender_allocation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,16 @@ impl SenderAllocationState {
}: SenderAllocationArgs,
) -> anyhow::Result<Self> {
let required_checks: Vec<Arc<dyn Check + Send + Sync>> = vec![
Arc::new(AllocationId::new(
config.indexer_address,
config.escrow_polling_interval,
sender,
allocation_id,
escrow_subgraph,
)),
Arc::new(
AllocationId::new(
config.indexer_address,
config.escrow_polling_interval,
sender,
allocation_id,
escrow_subgraph,
)
.await,
),
Arc::new(Signature::new(
domain_separator.clone(),
escrow_accounts.clone(),
Expand Down Expand Up @@ -904,7 +907,7 @@ pub mod tests {
use tokio::sync::mpsc;
use wiremock::{
matchers::{body_string_contains, method},
Mock, MockServer, Respond, ResponseTemplate,
Mock, MockGuard, MockServer, Respond, ResponseTemplate,
};

const DUMMY_URL: &str = "http://localhost:1234";
Expand All @@ -913,6 +916,23 @@ pub mod tests {
pub last_message_emitted: tokio::sync::mpsc::Sender<SenderAccountMessage>,
}

async fn mock_escrow_subgraph() -> (MockServer, MockGuard) {
let mock_ecrow_subgraph_server: MockServer = MockServer::start().await;
let _mock_ecrow_subgraph = mock_ecrow_subgraph_server
.register_as_scoped(
Mock::given(method("POST"))
.and(body_string_contains("TapTransactions"))
.respond_with(ResponseTemplate::new(200).set_body_json(json!({ "data": {
"transactions": [{
"id": "0x00224ee6ad4ae77b817b4e509dc29d644da9004ad0c44005a7f34481d421256409000000"
}],
}
}))),
)
.await;
(mock_ecrow_subgraph_server, _mock_ecrow_subgraph)
}

#[async_trait::async_trait]
impl Actor for MockSenderAccount {
type Msg = SenderAccountMessage;
Expand Down Expand Up @@ -1026,6 +1046,7 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn should_update_unaggregated_fees_on_start(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let (mut last_message_emitted, sender_account, _join_handle) =
create_mock_sender_account().await;
// Add receipts to the database.
Expand All @@ -1039,7 +1060,7 @@ pub mod tests {
let sender_allocation = create_sender_allocation(
pgpool.clone(),
DUMMY_URL.to_string(),
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
Some(sender_account),
)
.await;
Expand Down Expand Up @@ -1069,6 +1090,7 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn should_return_invalid_receipts_on_startup(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let (mut message_receiver, sender_account, _join_handle) =
create_mock_sender_account().await;
// Add receipts to the database.
Expand All @@ -1082,7 +1104,7 @@ pub mod tests {
let sender_allocation = create_sender_allocation(
pgpool.clone(),
DUMMY_URL.to_string(),
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
Some(sender_account),
)
.await;
Expand Down Expand Up @@ -1120,13 +1142,14 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn test_receive_new_receipt(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let (mut message_receiver, sender_account, _join_handle) =
create_mock_sender_account().await;

let sender_allocation = create_sender_allocation(
pgpool.clone(),
DUMMY_URL.to_string(),
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
Some(sender_account),
)
.await;
Expand Down Expand Up @@ -1291,14 +1314,15 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn test_close_allocation_no_pending_fees(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let (mut message_receiver, sender_account, _join_handle) =
create_mock_sender_account().await;

// create allocation
let sender_allocation = create_sender_allocation(
pgpool.clone(),
DUMMY_URL.to_string(),
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
Some(sender_account),
)
.await;
Expand Down Expand Up @@ -1412,9 +1436,14 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn should_return_unaggregated_fees_without_rav(pgpool: PgPool) {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let args = create_sender_allocation_args(
pgpool.clone(),
DUMMY_URL.to_string(),
&mock_escrow_subgraph_server.uri(),
None,
)
.await;
let state = SenderAllocationState::new(args).await.unwrap();

// Add receipts to the database.
Expand All @@ -1434,9 +1463,14 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn should_calculate_invalid_receipts_fee(pgpool: PgPool) {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let args = create_sender_allocation_args(
pgpool.clone(),
DUMMY_URL.to_string(),
&mock_escrow_subgraph_server.uri(),
None,
)
.await;
let state = SenderAllocationState::new(args).await.unwrap();

// Add receipts to the database.
Expand All @@ -1462,9 +1496,14 @@ pub mod tests {
/// than the RAV's timestamp.
#[sqlx::test(migrations = "../migrations")]
async fn should_return_unaggregated_fees_with_rav(pgpool: PgPool) {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let args = create_sender_allocation_args(
pgpool.clone(),
DUMMY_URL.to_string(),
&mock_escrow_subgraph_server.uri(),
None,
)
.await;
let state = SenderAllocationState::new(args).await.unwrap();

// Add the RAV to the database.
Expand All @@ -1489,9 +1528,14 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn test_store_failed_rav(pgpool: PgPool) {
let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let args = create_sender_allocation_args(
pgpool.clone(),
DUMMY_URL.to_string(),
&mock_escrow_subgraph_server.uri(),
None,
)
.await;
let state = SenderAllocationState::new(args).await.unwrap();

let signed_rav = create_rav(*ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10);
Expand Down Expand Up @@ -1519,9 +1563,14 @@ pub mod tests {
}
}

let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let args = create_sender_allocation_args(
pgpool.clone(),
DUMMY_URL.to_string(),
&mock_escrow_subgraph_server.uri(),
None,
)
.await;
let mut state = SenderAllocationState::new(args).await.unwrap();

let checks = CheckList::new(vec![Arc::new(FailingCheck)]);
Expand Down Expand Up @@ -1553,12 +1602,17 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn test_mark_rav_last(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;
let signed_rav = create_rav(*ALLOCATION_ID_0, SIGNER.0.clone(), 4, 10);
store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap();

let args =
create_sender_allocation_args(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL, None)
.await;
let args = create_sender_allocation_args(
pgpool.clone(),
DUMMY_URL.to_string(),
&mock_escrow_subgraph_server.uri(),
None,
)
.await;
let state = SenderAllocationState::new(args).await.unwrap();

// mark rav as final
Expand All @@ -1570,6 +1624,8 @@ pub mod tests {

#[sqlx::test(migrations = "../migrations")]
async fn test_failed_rav_request(pgpool: PgPool) {
let (mock_escrow_subgraph_server, _mock_ecrow_subgraph) = mock_escrow_subgraph().await;

// Add receipts to the database.
for i in 0..10 {
let receipt =
Expand All @@ -1586,7 +1642,7 @@ pub mod tests {
let sender_allocation = create_sender_allocation(
pgpool.clone(),
DUMMY_URL.to_string(),
DUMMY_URL,
&mock_escrow_subgraph_server.uri(),
Some(sender_account),
)
.await;
Expand Down
Loading
Loading