Skip to content

Commit

Permalink
Renamed manager to account
Browse files Browse the repository at this point in the history
Signed-off-by: Alexis Asseman <[email protected]>
  • Loading branch information
aasseman committed Oct 26, 2023
1 parent 500bc33 commit 22def75
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 97 deletions.
4 changes: 2 additions & 2 deletions tap_agent/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use indexer_common::prelude::{
escrow_accounts, indexer_allocations, DeploymentDetails, SubgraphClient,
};

use crate::{aggregator_endpoints, config, database, tap::managers};
use crate::{aggregator_endpoints, config, database, tap::accounts_manager};

pub async fn start_agent(config: &'static config::Cli) {
let pgpool = database::connect(&config.postgres).await;
Expand Down Expand Up @@ -78,7 +78,7 @@ pub async fn start_agent(config: &'static config::Cli) {
verifying_contract: config.receipts.receipts_verifier_address,
};

let _managers = managers::TapManagers::new(
let _accounts_manager = accounts_manager::AccountsManager::new(
config,
pgpool,
indexer_allocations,
Expand Down
98 changes: 53 additions & 45 deletions tap_agent/src/tap/manager.rs → tap_agent/src/tap/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use tokio::{
};
use tracing::{error, warn};

use super::managers::NewReceiptNotification;
use super::accounts_manager::NewReceiptNotification;
use crate::{
config::{self},
tap::{
Expand Down Expand Up @@ -64,12 +64,20 @@ struct Inner {
config: &'static config::Cli,
}

pub struct Manager {
/// An Account is the relationship between the allocation and the sender in the context of a single
/// allocation.
///
/// Manages the lifecycle of Scalar TAP for the Account, including:
/// - Monitoring new receipts and keeping track of the unaggregated fees.
/// - Requesting RAVs from the sender's TAP aggregator once the unaggregated fees reach a certain
/// threshold.
/// - Requesting the last RAV from the sender's TAP aggregator (on Account EOL)
pub struct Account {
inner: Arc<Inner>,
rav_requester_task: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
}

impl Manager {
impl Account {
pub fn new(
config: &'static config::Cli,
pgpool: PgPool,
Expand Down Expand Up @@ -402,8 +410,8 @@ impl Manager {
}
}

impl Drop for Manager {
/// Trying to make sure the RAV requester task is dropped when the manager is dropped.
impl Drop for Account {
/// Trying to make sure the RAV requester task is dropped when the account is dropped.
fn drop(&mut self) {
let rav_requester_task = self.rav_requester_task.clone();

Expand Down Expand Up @@ -436,11 +444,11 @@ mod tests {

const DUMMY_URL: &str = "http://localhost:1234";

async fn create_manager(
async fn create_account(
pgpool: PgPool,
sender_aggregator_endpoint: String,
escrow_subgraph_endpoint: &str,
) -> Manager {
) -> Account {
let config = Box::leak(Box::new(config::Cli {
config: None,
ethereum: config::Ethereum {
Expand All @@ -467,7 +475,7 @@ mod tests {
Eventual::<HashMap<Address, U256>>::new();
escrow_accounts_writer.write(HashMap::from([(SENDER.1, 1000.into())]));

Manager::new(
Account::new(
config,
pgpool.clone(),
*ALLOCATION_ID,
Expand All @@ -479,13 +487,13 @@ mod tests {
)
}

/// Test that the manager correctly updates the unaggregated fees from the database when there
/// Test that the account correctly updates the unaggregated fees from the database when there
/// is no RAV in the database.
///
/// The manager should consider all receipts found for the allocation and sender.
/// The account should consider all receipts found for the allocation and sender.
#[sqlx::test(migrations = "../migrations")]
async fn test_update_unaggregated_fees_no_rav(pgpool: PgPool) {
let manager = create_manager(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await;
let account = create_account(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await;

// Add receipts to the database.
for i in 1..10 {
Expand All @@ -496,24 +504,24 @@ mod tests {
.unwrap();
}

// Let the manager update the unaggregated fees from the database.
manager.update_unaggregated_fees().await.unwrap();
// Let the account update the unaggregated fees from the database.
account.update_unaggregated_fees().await.unwrap();

// Check that the unaggregated fees are correct.
assert_eq!(manager.inner.unaggregated_fees.lock().await.value, 45u128);
assert_eq!(account.inner.unaggregated_fees.lock().await.value, 45u128);
}

/// Test that the manager correctly updates the unaggregated fees from the database when there
/// Test that the account correctly updates the unaggregated fees from the database when there
/// is a RAV in the database as well as receipts which timestamp are lesser and greater than
/// the RAV's timestamp.
///
/// The manager should only consider receipts with a timestamp greater than the RAV's timestamp.
/// The account should only consider receipts with a timestamp greater than the RAV's timestamp.
#[sqlx::test(migrations = "../migrations")]
async fn test_update_unaggregated_fees_with_rav(pgpool: PgPool) {
let manager = create_manager(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await;
let account = create_account(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await;

// Add the RAV to the database.
// This RAV has timestamp 4. The Manager should only consider receipts with a timestamp
// This RAV has timestamp 4. The Account should only consider receipts with a timestamp
// greater than 4.
let signed_rav = create_rav(*ALLOCATION_ID, SENDER.0.clone(), 4, 10).await;
store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap();
Expand All @@ -527,18 +535,18 @@ mod tests {
.unwrap();
}

// Let the manager update the unaggregated fees from the database.
manager.update_unaggregated_fees().await.unwrap();
// Let the account update the unaggregated fees from the database.
account.update_unaggregated_fees().await.unwrap();

// Check that the unaggregated fees are correct.
assert_eq!(manager.inner.unaggregated_fees.lock().await.value, 35u128);
assert_eq!(account.inner.unaggregated_fees.lock().await.value, 35u128);
}

/// Test that the manager correctly ignores new receipt notifications with an ID lower than
/// Test that the account correctly ignores new receipt notifications with an ID lower than
/// the last receipt ID processed (be it from the DB or from a prior receipt notification).
#[sqlx::test(migrations = "../migrations")]
async fn test_handle_new_receipt_notification(pgpool: PgPool) {
let manager = create_manager(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await;
let account = create_account(pgpool.clone(), DUMMY_URL.to_string(), DUMMY_URL).await;

// Add receipts to the database.
let mut expected_unaggregated_fees = 0u128;
Expand All @@ -551,11 +559,11 @@ mod tests {
expected_unaggregated_fees += u128::from(i);
}

manager.update_unaggregated_fees().await.unwrap();
account.update_unaggregated_fees().await.unwrap();

// Check that the unaggregated fees are correct.
assert_eq!(
manager.inner.unaggregated_fees.lock().await.value,
account.inner.unaggregated_fees.lock().await.value,
expected_unaggregated_fees
);

Expand All @@ -569,13 +577,13 @@ mod tests {
timestamp_ns: 19,
value: 19,
};
manager
account
.handle_new_receipt_notification(new_receipt_notification)
.await;

// Check that the unaggregated fees have *not* increased.
assert_eq!(
manager.inner.unaggregated_fees.lock().await.value,
account.inner.unaggregated_fees.lock().await.value,
expected_unaggregated_fees
);

Expand All @@ -587,14 +595,14 @@ mod tests {
timestamp_ns: 20,
value: 20,
};
manager
account
.handle_new_receipt_notification(new_receipt_notification)
.await;
expected_unaggregated_fees += 20;

// Check that the unaggregated fees are correct.
assert_eq!(
manager.inner.unaggregated_fees.lock().await.value,
account.inner.unaggregated_fees.lock().await.value,
expected_unaggregated_fees
);

Expand All @@ -606,13 +614,13 @@ mod tests {
timestamp_ns: 19,
value: 19,
};
manager
account
.handle_new_receipt_notification(new_receipt_notification)
.await;

// Check that the unaggregated fees have *not* increased.
assert_eq!(
manager.inner.unaggregated_fees.lock().await.value,
account.inner.unaggregated_fees.lock().await.value,
expected_unaggregated_fees
);
}
Expand Down Expand Up @@ -646,8 +654,8 @@ mod tests {
)
.await;

// Create a manager.
let manager = create_manager(
// Create an account.
let account = create_account(
pgpool.clone(),
"http://".to_owned() + &aggregator_endpoint.to_string(),
&mock_server.uri(),
Expand All @@ -663,11 +671,11 @@ mod tests {
.unwrap();
}

// Let the manager update the unaggregated fees from the database.
manager.update_unaggregated_fees().await.unwrap();
// Let the account update the unaggregated fees from the database.
account.update_unaggregated_fees().await.unwrap();

// Trigger a RAV request manually.
Manager::rav_requester_try(&manager.inner).await.unwrap();
Account::rav_requester_try(&account.inner).await.unwrap();

// Stop the TAP aggregator server.
handle.stop().unwrap();
Expand Down Expand Up @@ -703,8 +711,8 @@ mod tests {
)
.await;

// Create a manager.
let manager = create_manager(
// Create an account.
let account = create_account(
pgpool.clone(),
"http://".to_owned() + &aggregator_endpoint.to_string(),
&mock_server.uri(),
Expand All @@ -725,7 +733,7 @@ mod tests {
store_receipt(&pgpool, receipt.signed_receipt())
.await
.unwrap();
manager
account
.handle_new_receipt_notification(NewReceiptNotification {
allocation_id: *ALLOCATION_ID,
sender_address: SENDER.1,
Expand All @@ -742,7 +750,7 @@ mod tests {
}

// Wait for the RAV requester to finish.
while Manager::rav_requester_task_is_running(&manager.rav_requester_task.lock().await) {
while Account::rav_requester_task_is_running(&account.rav_requester_task.lock().await) {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}

Expand Down Expand Up @@ -773,10 +781,10 @@ mod tests {
assert!(latest_rav.message.value_aggregate >= trigger_value);

// Check that the unaggregated fees value is reduced.
assert!(manager.inner.unaggregated_fees.lock().await.value <= trigger_value);
assert!(account.inner.unaggregated_fees.lock().await.value <= trigger_value);

// Reset the total value and trigger value.
total_value = manager.inner.unaggregated_fees.lock().await.value;
total_value = account.inner.unaggregated_fees.lock().await.value;
trigger_value = 0;

// Add more receipts
Expand All @@ -789,7 +797,7 @@ mod tests {
.await
.unwrap();

manager
account
.handle_new_receipt_notification(NewReceiptNotification {
allocation_id: *ALLOCATION_ID,
sender_address: SENDER.1,
Expand All @@ -806,7 +814,7 @@ mod tests {
}

// Wait for the RAV requester to finish.
while Manager::rav_requester_task_is_running(&manager.rav_requester_task.lock().await) {
while Account::rav_requester_task_is_running(&account.rav_requester_task.lock().await) {
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
}

Expand Down Expand Up @@ -838,7 +846,7 @@ mod tests {
assert!(latest_rav.message.value_aggregate >= trigger_value);

// Check that the unaggregated fees value is reduced.
assert!(manager.inner.unaggregated_fees.lock().await.value <= trigger_value);
assert!(account.inner.unaggregated_fees.lock().await.value <= trigger_value);

// Stop the TAP aggregator server.
handle.stop().unwrap();
Expand Down
Loading

0 comments on commit 22def75

Please sign in to comment.