From f8bca98f17220492c5f87210337bccbcecc053ad Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Fri, 12 Jan 2024 15:49:10 -0800 Subject: [PATCH 1/6] fix: support sender signers Signed-off-by: Alexis Asseman --- ...be6e47ee6452751af47df3ad6907d163fc65.json} | 4 +- ...f7f8b921e2316f1137476ed2e5265cba5e1b.json} | 4 +- ...49fc07692a008daf16f0f5530f5bc4ed6cae.json} | 4 +- ...8b200386f4eb7cf441c43d5e9f899f408cc49.json | 16 ++ ...19ea4591595cf92516ccd36bf7a4461e53b6.json} | 6 +- ...dad6228922688b46a295548ad605e81bfc49.json} | 7 +- ...1b73bfc863f0fa48004a4d5bd39cd97b07bb.json} | 6 +- ...f5c57d44af2722c379e4528596e9b041242d4.json | 16 -- common/src/escrow_accounts.rs | 179 ++++++++++++++---- .../indexer_service/http/indexer_service.rs | 1 + common/src/tap_manager.rs | 34 ++-- common/src/test_vectors.rs | 83 +++++++- migrations/20230912220523_tap_receipts.up.sql | 6 +- tap-agent/src/agent.rs | 1 + tap-agent/src/tap/escrow_adapter.rs | 99 +++++++--- tap-agent/src/tap/mod.rs | 23 +++ tap-agent/src/tap/rav_storage_adapter.rs | 6 +- tap-agent/src/tap/receipt_checks_adapter.rs | 10 +- tap-agent/src/tap/receipt_storage_adapter.rs | 110 ++++++++--- .../src/tap/sender_allocation_relationship.rs | 63 +++--- ...sender_allocation_relationships_manager.rs | 91 ++++++--- tap-agent/src/tap/test_utils.rs | 13 +- 22 files changed, 573 insertions(+), 209 deletions(-) rename .sqlx/{query-be914423c719351af8ec785671d8446de1cc6b352f33825ebc4ae2de03da4263.json => query-0245e3c8b1c93a2a4f86e2a7e684be6e47ee6452751af47df3ad6907d163fc65.json} (71%) rename .sqlx/{query-1fdec6cc247605be5d1991db42d0a64bf03831f535c6f8766f9ebea7b26d18dc.json => query-0d0b4c9a450ef82c4fdd5903afc6f7f8b921e2316f1137476ed2e5265cba5e1b.json} (78%) rename .sqlx/{query-cbf8955f0b6bd355b56b448497abcf6325e1ee9a10e5be9d8cbc919fbb8c87f7.json => query-21911049356b0593c99c8c71645f49fc07692a008daf16f0f5530f5bc4ed6cae.json} (78%) create mode 100644 .sqlx/query-2cfcdd0b2aca57b1d0b4c54aef18b200386f4eb7cf441c43d5e9f899f408cc49.json rename .sqlx/{query-aede274abf6f8fabe510f7765fc4315bab48aeb9ddd1ed80486f22511c39c92e.json => query-40d9eaa41f7e38e91e850a6d77d619ea4591595cf92516ccd36bf7a4461e53b6.json} (70%) rename .sqlx/{query-6d2f5eecfd846d8f1e2db87e1a79c73af715e64ea63132d4768731b222ad672b.json => query-47f757bea4815b78fca6bc9b20a1dad6228922688b46a295548ad605e81bfc49.json} (59%) rename .sqlx/{query-f0569836ad31081ca3cf7406ef5d397ad89619d6e111741254864e3a1eaeaa7a.json => query-778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb.json} (59%) delete mode 100644 .sqlx/query-c3e88c5a56db17eb8e1f8056c58f5c57d44af2722c379e4528596e9b041242d4.json diff --git a/.sqlx/query-be914423c719351af8ec785671d8446de1cc6b352f33825ebc4ae2de03da4263.json b/.sqlx/query-0245e3c8b1c93a2a4f86e2a7e684be6e47ee6452751af47df3ad6907d163fc65.json similarity index 71% rename from .sqlx/query-be914423c719351af8ec785671d8446de1cc6b352f33825ebc4ae2de03da4263.json rename to .sqlx/query-0245e3c8b1c93a2a4f86e2a7e684be6e47ee6452751af47df3ad6907d163fc65.json index 530511f1..29a15220 100644 --- a/.sqlx/query-be914423c719351af8ec785671d8446de1cc6b352f33825ebc4ae2de03da4263.json +++ b/.sqlx/query-0245e3c8b1c93a2a4f86e2a7e684be6e47ee6452751af47df3ad6907d163fc65.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO scalar_tap_receipts (allocation_id, sender_address, timestamp_ns, value, receipt)\n VALUES ($1, $2, $3, $4, $5)\n ", + "query": "\n INSERT INTO scalar_tap_receipts (allocation_id, signer_address, timestamp_ns, value, receipt)\n VALUES ($1, $2, $3, $4, $5)\n ", "describe": { "columns": [], "parameters": { @@ -14,5 +14,5 @@ }, "nullable": [] }, - "hash": "be914423c719351af8ec785671d8446de1cc6b352f33825ebc4ae2de03da4263" + "hash": "0245e3c8b1c93a2a4f86e2a7e684be6e47ee6452751af47df3ad6907d163fc65" } diff --git a/.sqlx/query-1fdec6cc247605be5d1991db42d0a64bf03831f535c6f8766f9ebea7b26d18dc.json b/.sqlx/query-0d0b4c9a450ef82c4fdd5903afc6f7f8b921e2316f1137476ed2e5265cba5e1b.json similarity index 78% rename from .sqlx/query-1fdec6cc247605be5d1991db42d0a64bf03831f535c6f8766f9ebea7b26d18dc.json rename to .sqlx/query-0d0b4c9a450ef82c4fdd5903afc6f7f8b921e2316f1137476ed2e5265cba5e1b.json index 47709ffb..7fb87cc8 100644 --- a/.sqlx/query-1fdec6cc247605be5d1991db42d0a64bf03831f535c6f8766f9ebea7b26d18dc.json +++ b/.sqlx/query-0d0b4c9a450ef82c4fdd5903afc6f7f8b921e2316f1137476ed2e5265cba5e1b.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO scalar_tap_receipts (\n allocation_id, sender_address, timestamp_ns, value, receipt\n )\n VALUES ($1, $2, $3, $4, $5)\n RETURNING id\n ", + "query": "\n INSERT INTO scalar_tap_receipts (\n allocation_id, signer_address, timestamp_ns, value, receipt\n )\n VALUES ($1, $2, $3, $4, $5)\n RETURNING id\n ", "describe": { "columns": [ { @@ -22,5 +22,5 @@ false ] }, - "hash": "1fdec6cc247605be5d1991db42d0a64bf03831f535c6f8766f9ebea7b26d18dc" + "hash": "0d0b4c9a450ef82c4fdd5903afc6f7f8b921e2316f1137476ed2e5265cba5e1b" } diff --git a/.sqlx/query-cbf8955f0b6bd355b56b448497abcf6325e1ee9a10e5be9d8cbc919fbb8c87f7.json b/.sqlx/query-21911049356b0593c99c8c71645f49fc07692a008daf16f0f5530f5bc4ed6cae.json similarity index 78% rename from .sqlx/query-cbf8955f0b6bd355b56b448497abcf6325e1ee9a10e5be9d8cbc919fbb8c87f7.json rename to .sqlx/query-21911049356b0593c99c8c71645f49fc07692a008daf16f0f5530f5bc4ed6cae.json index 83116833..dc9952a3 100644 --- a/.sqlx/query-cbf8955f0b6bd355b56b448497abcf6325e1ee9a10e5be9d8cbc919fbb8c87f7.json +++ b/.sqlx/query-21911049356b0593c99c8c71645f49fc07692a008daf16f0f5530f5bc4ed6cae.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO scalar_tap_receipts_invalid (\n allocation_id,\n sender_address,\n timestamp_ns,\n value,\n received_receipt\n )\n VALUES ($1, $2, $3, $4, $5)\n ", + "query": "\n INSERT INTO scalar_tap_receipts_invalid (\n allocation_id,\n signer_address,\n timestamp_ns,\n value,\n received_receipt\n )\n VALUES ($1, $2, $3, $4, $5)\n ", "describe": { "columns": [], "parameters": { @@ -14,5 +14,5 @@ }, "nullable": [] }, - "hash": "cbf8955f0b6bd355b56b448497abcf6325e1ee9a10e5be9d8cbc919fbb8c87f7" + "hash": "21911049356b0593c99c8c71645f49fc07692a008daf16f0f5530f5bc4ed6cae" } diff --git a/.sqlx/query-2cfcdd0b2aca57b1d0b4c54aef18b200386f4eb7cf441c43d5e9f899f408cc49.json b/.sqlx/query-2cfcdd0b2aca57b1d0b4c54aef18b200386f4eb7cf441c43d5e9f899f408cc49.json new file mode 100644 index 00000000..b256905f --- /dev/null +++ b/.sqlx/query-2cfcdd0b2aca57b1d0b4c54aef18b200386f4eb7cf441c43d5e9f899f408cc49.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n DELETE FROM scalar_tap_receipts\n WHERE allocation_id = $1 AND signer_address IN (SELECT unnest($2::text[]))\n AND $3::numrange @> timestamp_ns\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Bpchar", + "TextArray", + "NumRange" + ] + }, + "nullable": [] + }, + "hash": "2cfcdd0b2aca57b1d0b4c54aef18b200386f4eb7cf441c43d5e9f899f408cc49" +} diff --git a/.sqlx/query-aede274abf6f8fabe510f7765fc4315bab48aeb9ddd1ed80486f22511c39c92e.json b/.sqlx/query-40d9eaa41f7e38e91e850a6d77d619ea4591595cf92516ccd36bf7a4461e53b6.json similarity index 70% rename from .sqlx/query-aede274abf6f8fabe510f7765fc4315bab48aeb9ddd1ed80486f22511c39c92e.json rename to .sqlx/query-40d9eaa41f7e38e91e850a6d77d619ea4591595cf92516ccd36bf7a4461e53b6.json index 07dfbd7b..3cf83c82 100644 --- a/.sqlx/query-aede274abf6f8fabe510f7765fc4315bab48aeb9ddd1ed80486f22511c39c92e.json +++ b/.sqlx/query-40d9eaa41f7e38e91e850a6d77d619ea4591595cf92516ccd36bf7a4461e53b6.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT id, receipt\n FROM scalar_tap_receipts\n WHERE allocation_id = $1 AND sender_address = $2 AND $3::numrange @> timestamp_ns\n ", + "query": "\n SELECT id, receipt\n FROM scalar_tap_receipts\n WHERE allocation_id = $1 AND signer_address IN (SELECT unnest($2::text[]))\n AND $3::numrange @> timestamp_ns\n ", "describe": { "columns": [ { @@ -17,7 +17,7 @@ "parameters": { "Left": [ "Bpchar", - "Bpchar", + "TextArray", "NumRange" ] }, @@ -26,5 +26,5 @@ false ] }, - "hash": "aede274abf6f8fabe510f7765fc4315bab48aeb9ddd1ed80486f22511c39c92e" + "hash": "40d9eaa41f7e38e91e850a6d77d619ea4591595cf92516ccd36bf7a4461e53b6" } diff --git a/.sqlx/query-6d2f5eecfd846d8f1e2db87e1a79c73af715e64ea63132d4768731b222ad672b.json b/.sqlx/query-47f757bea4815b78fca6bc9b20a1dad6228922688b46a295548ad605e81bfc49.json similarity index 59% rename from .sqlx/query-6d2f5eecfd846d8f1e2db87e1a79c73af715e64ea63132d4768731b222ad672b.json rename to .sqlx/query-47f757bea4815b78fca6bc9b20a1dad6228922688b46a295548ad605e81bfc49.json index e878a43f..caad4737 100644 --- a/.sqlx/query-6d2f5eecfd846d8f1e2db87e1a79c73af715e64ea63132d4768731b222ad672b.json +++ b/.sqlx/query-47f757bea4815b78fca6bc9b20a1dad6228922688b46a295548ad605e81bfc49.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n WITH rav AS (\n SELECT \n rav -> 'message' ->> 'timestamp_ns' AS timestamp_ns \n FROM \n scalar_tap_ravs \n WHERE \n allocation_id = $1 \n AND sender_address = $2\n ) \n SELECT \n MAX(id), \n SUM(value) \n FROM \n scalar_tap_receipts \n WHERE \n allocation_id = $1 \n AND sender_address = $2 \n AND CASE WHEN (\n SELECT \n timestamp_ns :: NUMERIC \n FROM \n rav\n ) IS NOT NULL THEN timestamp_ns > (\n SELECT \n timestamp_ns :: NUMERIC \n FROM \n rav\n ) ELSE TRUE END\n ", + "query": "\n WITH rav AS (\n SELECT \n rav -> 'message' ->> 'timestamp_ns' AS timestamp_ns \n FROM \n scalar_tap_ravs \n WHERE \n allocation_id = $1 \n AND sender_address = $2\n ) \n SELECT \n MAX(id), \n SUM(value) \n FROM \n scalar_tap_receipts \n WHERE \n allocation_id = $1 \n AND signer_address IN (SELECT unnest($3::text[]))\n AND CASE WHEN (\n SELECT \n timestamp_ns :: NUMERIC \n FROM \n rav\n ) IS NOT NULL THEN timestamp_ns > (\n SELECT \n timestamp_ns :: NUMERIC \n FROM \n rav\n ) ELSE TRUE END\n ", "describe": { "columns": [ { @@ -17,7 +17,8 @@ "parameters": { "Left": [ "Bpchar", - "Bpchar" + "Bpchar", + "TextArray" ] }, "nullable": [ @@ -25,5 +26,5 @@ null ] }, - "hash": "6d2f5eecfd846d8f1e2db87e1a79c73af715e64ea63132d4768731b222ad672b" + "hash": "47f757bea4815b78fca6bc9b20a1dad6228922688b46a295548ad605e81bfc49" } diff --git a/.sqlx/query-f0569836ad31081ca3cf7406ef5d397ad89619d6e111741254864e3a1eaeaa7a.json b/.sqlx/query-778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb.json similarity index 59% rename from .sqlx/query-f0569836ad31081ca3cf7406ef5d397ad89619d6e111741254864e3a1eaeaa7a.json rename to .sqlx/query-778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb.json index 1aed43b9..8d1a90b1 100644 --- a/.sqlx/query-f0569836ad31081ca3cf7406ef5d397ad89619d6e111741254864e3a1eaeaa7a.json +++ b/.sqlx/query-778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT DISTINCT allocation_id, sender_address\n FROM scalar_tap_receipts\n ", + "query": "\n SELECT DISTINCT allocation_id, signer_address\n FROM scalar_tap_receipts\n ", "describe": { "columns": [ { @@ -10,7 +10,7 @@ }, { "ordinal": 1, - "name": "sender_address", + "name": "signer_address", "type_info": "Bpchar" } ], @@ -22,5 +22,5 @@ false ] }, - "hash": "f0569836ad31081ca3cf7406ef5d397ad89619d6e111741254864e3a1eaeaa7a" + "hash": "778a427621acd2003b94340e46df1b73bfc863f0fa48004a4d5bd39cd97b07bb" } diff --git a/.sqlx/query-c3e88c5a56db17eb8e1f8056c58f5c57d44af2722c379e4528596e9b041242d4.json b/.sqlx/query-c3e88c5a56db17eb8e1f8056c58f5c57d44af2722c379e4528596e9b041242d4.json deleted file mode 100644 index 7596f9d3..00000000 --- a/.sqlx/query-c3e88c5a56db17eb8e1f8056c58f5c57d44af2722c379e4528596e9b041242d4.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n DELETE FROM scalar_tap_receipts\n WHERE allocation_id = $1 AND sender_address = $2 AND $3::numrange @> timestamp_ns\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Bpchar", - "Bpchar", - "NumRange" - ] - }, - "nullable": [] - }, - "hash": "c3e88c5a56db17eb8e1f8056c58f5c57d44af2722c379e4528596e9b041242d4" -} diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index a7bcb4b1..d8067f79 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -13,18 +13,48 @@ use tracing::{error, warn}; use crate::prelude::{Query, SubgraphClient}; +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct EscrowAccounts { + pub balances: HashMap, + pub signers_to_senders: HashMap, + pub senders_to_signers: HashMap>, +} + +impl EscrowAccounts { + pub fn new( + balances: HashMap, + senders_to_signers: HashMap>, + ) -> Self { + let signers_to_senders = senders_to_signers + .iter() + .flat_map(|(sender, signers)| { + signers + .iter() + .map(move |signer| (*signer, *sender)) + .collect::>() + }) + .collect(); + + Self { + balances, + signers_to_senders, + senders_to_signers, + } + } +} + pub fn escrow_accounts( escrow_subgraph: &'static SubgraphClient, indexer_address: Address, interval: Duration, -) -> Eventual> { + reject_thawing_signers: bool, +) -> Eventual { // Types for deserializing the network subgraph response #[derive(Deserialize)] #[serde(rename_all = "camelCase")] struct EscrowAccountsResponse { escrow_accounts: Vec, } - // These 2 structs are used to deserialize the response from the escrow subgraph. // Note that U256's serde implementation is based on serializing the internal bytes, not the string decimal // representation. This is why we deserialize them as strings below. #[derive(Deserialize)] @@ -38,50 +68,109 @@ pub fn escrow_accounts( #[serde(rename_all = "camelCase")] struct Sender { id: Address, + authorized_signers: Vec, + } + #[derive(Deserialize)] + #[serde(rename_all = "camelCase")] + struct AuthorizedSigner { + id: Address, } + // thawEndTimestamp == 0 means that the signer is not thawing. This also means + // that we don't wait for the thawing period to end before stopping serving + // queries for this signer. + // isAuthorized == true means that the signer is still authorized to sign + // payments in the name of the sender. + let query_no_thawing_signers = r#" + query ($indexer: ID!) { + escrowAccounts(where: {receiver_: {id: $indexer}}) { + balance + totalAmountThawing + sender { + id + authorizedSigners( + where: {thawEndTimestamp: "0", isAuthorized: true} + ) { + id + } + } + } + } + "#; + + let query_with_thawing_signers = r#" + query ($indexer: ID!) { + escrowAccounts(where: {receiver_: {id: $indexer}}) { + balance + totalAmountThawing + sender { + id + authorizedSigners( + where: {isAuthorized: true} + ) { + id + } + } + } + } + "#; + + let query = if reject_thawing_signers { + query_no_thawing_signers + } else { + query_with_thawing_signers + }; + timer(interval).map_with_retry( move |_| async move { let response = escrow_subgraph .query::(Query::new_with_variables( - r#" - query ($indexer: ID!) { - escrowAccounts(where: {receiver_: {id: $indexer}}) { - balance - totalAmountThawing - sender { - id - } - } - } - "#, + query, [("indexer", format!("{:x?}", indexer_address).into())], )) .await .map_err(|e| e.to_string())?; - response.map_err(|e| e.to_string()).and_then(|data| { - data.escrow_accounts - .iter() - .map(|account| { - let balance = U256::checked_sub( - U256::from_dec_str(&account.balance)?, - U256::from_dec_str(&account.total_amount_thawing)?, - ) - .unwrap_or_else(|| { - warn!( - "Balance minus total amount thawing underflowed for account {}. \ + let response = response.map_err(|e| e.to_string())?; + + let balances = response + .escrow_accounts + .iter() + .map(|account| { + let balance = U256::checked_sub( + U256::from_dec_str(&account.balance)?, + U256::from_dec_str(&account.total_amount_thawing)?, + ) + .unwrap_or_else(|| { + warn!( + "Balance minus total amount thawing underflowed for account {}. \ Setting balance to 0, no queries will be served for this sender.", - account.sender.id - ); - U256::from(0) - }); - - Ok((account.sender.id, balance)) - }) - .collect::, anyhow::Error>>() - .map_err(|e| format!("{}", e)) - }) + account.sender.id + ); + U256::from(0) + }); + + Ok((account.sender.id, balance)) + }) + .collect::, anyhow::Error>>() + .map_err(|e| format!("{}", e))?; + + let senders_to_signers = response + .escrow_accounts + .iter() + .map(|account| { + let sender = account.sender.id; + let signers = account + .sender + .authorized_signers + .iter() + .map(|signer| signer.id) + .collect(); + (sender, signers) + }) + .collect(); + + Ok(EscrowAccounts::new(balances, senders_to_signers)) }, move |err: String| { error!( @@ -96,6 +185,7 @@ pub fn escrow_accounts( #[cfg(test)] mod tests { + use test_log::test; use wiremock::matchers::{method, path}; use wiremock::{Mock, MockServer, ResponseTemplate}; @@ -104,7 +194,20 @@ mod tests { use super::*; - #[tokio::test] + #[test] + fn test_new_escrow_accounts() { + let escrow_accounts = EscrowAccounts::new( + test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(), + test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), + ); + + assert_eq!( + escrow_accounts.signers_to_senders, + test_vectors::ESCROW_ACCOUNTS_SIGNERS_TO_SENDERS.to_owned() + ) + } + + #[test(tokio::test)] async fn test_current_accounts() { // Set up a mock escrow subgraph let mock_server = MockServer::start().await; @@ -134,11 +237,15 @@ mod tests { escrow_subgraph, *test_vectors::INDEXER_ADDRESS, Duration::from_secs(60), + true, ); assert_eq!( accounts.value().await.unwrap(), - *test_vectors::ESCROW_ACCOUNTS + EscrowAccounts::new( + test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(), + test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), + ) ); } } diff --git a/common/src/indexer_service/http/indexer_service.rs b/common/src/indexer_service/http/indexer_service.rs index 37b00183..671a74c9 100644 --- a/common/src/indexer_service/http/indexer_service.rs +++ b/common/src/indexer_service/http/indexer_service.rs @@ -262,6 +262,7 @@ impl IndexerService { escrow_subgraph, options.config.indexer.indexer_address, Duration::from_secs(options.config.escrow_subgraph.syncing_interval), + true, // Reject thawing signers eagerly ); // Establish Database connection necessary for serving indexer management diff --git a/common/src/tap_manager.rs b/common/src/tap_manager.rs index 42559b5a..07cb60b8 100644 --- a/common/src/tap_manager.rs +++ b/common/src/tap_manager.rs @@ -11,12 +11,12 @@ use std::{collections::HashMap, str::FromStr, sync::Arc}; use tap_core::tap_manager::SignedReceipt; use tracing::error; -use crate::prelude::Allocation; +use crate::{escrow_accounts::EscrowAccounts, prelude::Allocation}; #[derive(Clone)] pub struct TapManager { indexer_allocations: Eventual>, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, pgpool: PgPool, domain_separator: Arc, } @@ -25,7 +25,7 @@ impl TapManager { pub fn new( pgpool: PgPool, indexer_allocations: Eventual>, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, domain_separator: Eip712Domain, ) -> Self { Self { @@ -67,11 +67,21 @@ impl TapManager { anyhow!(e) })?; - if !self - .escrow_accounts - .value_immediate() - .unwrap_or_default() + let escrow_accounts = self.escrow_accounts.value_immediate().unwrap_or_default(); + + let receipt_sender = escrow_accounts + .signers_to_senders .get(&receipt_signer) + .ok_or_else(|| { + anyhow!( + "Receipt signer `{}` is not eligible for this indexer", + receipt_signer + ) + })?; + + if !escrow_accounts + .balances + .get(receipt_sender) .map_or(false, |balance| balance > &U256::zero()) { return Err(anyhow!( @@ -83,7 +93,7 @@ impl TapManager { // TODO: consider doing this in another async task to avoid slowing down the paid query flow. sqlx::query!( r#" - INSERT INTO scalar_tap_receipts (allocation_id, sender_address, timestamp_ns, value, receipt) + INSERT INTO scalar_tap_receipts (allocation_id, signer_address, timestamp_ns, value, receipt) VALUES ($1, $2, $3, $4, $5) "#, format!("{:?}", allocation_id) @@ -117,7 +127,7 @@ mod test { use keccak_hash::H256; use sqlx::postgres::PgListener; - use crate::test_vectors::{self, create_signed_receipt, TAP_SENDER}; + use crate::test_vectors::{self, create_signed_receipt}; use super::*; @@ -159,8 +169,10 @@ mod test { )); // Mock escrow accounts - let escrow_accounts = - Eventual::from_value(HashMap::from_iter(vec![(TAP_SENDER.1, U256::from(123))])); + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + test_vectors::ESCROW_ACCOUNTS_BALANCES.to_owned(), + test_vectors::ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS.to_owned(), + )); let tap_manager = TapManager::new( pgpool.clone(), diff --git a/common/src/test_vectors.rs b/common/src/test_vectors.rs index c6b7eaaf..9608349a 100644 --- a/common/src/test_vectors.rs +++ b/common/src/test_vectors.rs @@ -102,14 +102,35 @@ pub const ESCROW_QUERY_RESPONSE: &str = r#" "balance": "34", "totalAmountThawing": "10", "sender": { - "id": "0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1" + "id": "0x9858EfFD232B4033E47d90003D41EC34EcaEda94", + "authorizedSigners": [ + { + "id": "0x533661F0fb14d2E8B26223C86a610Dd7D2260892" + }, + { + "id": "0x2740f6fA9188cF53ffB6729DDD21575721dE92ce" + } + ] } }, { "balance": "42", "totalAmountThawing": "0", "sender": { - "id": "0x22d491bde2303f2f43325b2108d26f1eaba1e32b" + "id": "0x22d491bde2303f2f43325b2108d26f1eaba1e32b", + "authorizedSigners": [ + { + "id": "0x245059163ff6ee14279aa7b35ea8f0fdb967df6e" + } + ] + } + }, + { + "balance": "2987", + "totalAmountThawing": "12", + "sender": { + "id": "0x192c3B6e0184Fa0Cc5B9D2bDDEb6B79Fb216a002", + "authorizedSigners": [] } } ] @@ -240,12 +261,48 @@ lazy_static! { ), ]); - pub static ref ESCROW_ACCOUNTS: HashMap = HashMap::from([ - (Address::from_str("0x90f8bf6a479f320ead074411a4b0e7944ea8c9c1").unwrap(), U256::from(24)), + pub static ref ESCROW_ACCOUNTS_BALANCES: HashMap = HashMap::from([ + (Address::from_str("0x9858EfFD232B4033E47d90003D41EC34EcaEda94").unwrap(), U256::from(24)), // TAP_SENDER (Address::from_str("0x22d491bde2303f2f43325b2108d26f1eaba1e32b").unwrap(), U256::from(42)), + (Address::from_str("0x192c3B6e0184Fa0Cc5B9D2bDDEb6B79Fb216a002").unwrap(), U256::from(2975)), + ]); + + /// Maps signers back to their senders + pub static ref ESCROW_ACCOUNTS_SIGNERS_TO_SENDERS: HashMap = HashMap::from([ + ( + Address::from_str("0x533661F0fb14d2E8B26223C86a610Dd7D2260892").unwrap(), // TAP_SIGNER + Address::from_str("0x9858EfFD232B4033E47d90003D41EC34EcaEda94").unwrap(), // TAP_SENDER + ), + ( + Address::from_str("0x2740f6fA9188cF53ffB6729DDD21575721dE92ce").unwrap(), + Address::from_str("0x9858EfFD232B4033E47d90003D41EC34EcaEda94").unwrap(), // TAP_SENDER + ), + ( + Address::from_str("0x245059163ff6ee14279aa7b35ea8f0fdb967df6e").unwrap(), + Address::from_str("0x22d491bde2303f2f43325b2108d26f1eaba1e32b").unwrap(), + ), + ]); + + pub static ref ESCROW_ACCOUNTS_SENDERS_TO_SIGNERS: HashMap> = HashMap::from([ + ( + Address::from_str("0x9858EfFD232B4033E47d90003D41EC34EcaEda94").unwrap(), // TAP_SENDER + vec![ + Address::from_str("0x533661F0fb14d2E8B26223C86a610Dd7D2260892").unwrap(), // TAP_SIGNER + Address::from_str("0x2740f6fA9188cF53ffB6729DDD21575721dE92ce").unwrap(), + ], + ), + ( + Address::from_str("0x22d491bde2303f2f43325b2108d26f1eaba1e32b").unwrap(), + vec![Address::from_str("0x245059163ff6ee14279aa7b35ea8f0fdb967df6e").unwrap()], + ), + ( + Address::from_str("0x192c3B6e0184Fa0Cc5B9D2bDDEb6B79Fb216a002").unwrap(), + vec![], + ), ]); - /// Fixture to generate a wallet and address + /// Fixture to generate a wallet and address. + /// Address: 0x9858EfFD232B4033E47d90003D41EC34EcaEda94 pub static ref TAP_SENDER: (LocalWallet, Address) = { let wallet: LocalWallet = MnemonicBuilder::::default() .phrase("abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about") @@ -256,6 +313,18 @@ lazy_static! { (wallet, Address::from_slice(address.as_bytes())) }; + /// Fixture to generate a wallet and address. + /// Address: 0x533661F0fb14d2E8B26223C86a610Dd7D2260892 + pub static ref TAP_SIGNER: (LocalWallet, Address) = { + let wallet: LocalWallet = MnemonicBuilder::::default() + .phrase("rude pipe parade travel organ vendor card festival magnet novel forget refuse keep draft tool") + .build() + .unwrap(); + let address = wallet.address(); + + (wallet, Address::from_slice(address.as_bytes())) + }; + pub static ref TAP_EIP712_DOMAIN: Eip712Domain = eip712_domain! { name: "TAP", version: "1", @@ -264,14 +333,14 @@ lazy_static! { }; } -/// Function to generate a signed receipt using the TAP_SENDER wallet. +/// Function to generate a signed receipt using the TAP_SIGNER wallet. pub async fn create_signed_receipt( allocation_id: Address, nonce: u64, timestamp_ns: u64, value: u128, ) -> SignedReceipt { - let (wallet, _) = &*self::TAP_SENDER; + let (wallet, _) = &*self::TAP_SIGNER; EIP712SignedMessage::new( &self::TAP_EIP712_DOMAIN, diff --git a/migrations/20230912220523_tap_receipts.up.sql b/migrations/20230912220523_tap_receipts.up.sql index a95a4a09..1f303901 100644 --- a/migrations/20230912220523_tap_receipts.up.sql +++ b/migrations/20230912220523_tap_receipts.up.sql @@ -1,7 +1,7 @@ CREATE TABLE IF NOT EXISTS scalar_tap_receipts ( id BIGSERIAL PRIMARY KEY, -- id being SERIAL is important for the function of tap-agent allocation_id CHAR(40) NOT NULL, - sender_address CHAR(40) NOT NULL, + signer_address CHAR(40) NOT NULL, timestamp_ns NUMERIC(20) NOT NULL, -- signature CHAR(130) NOT NULL, value NUMERIC(39) NOT NULL, @@ -12,7 +12,7 @@ CREATE FUNCTION scalar_tap_receipt_notify() RETURNS trigger AS $$ BEGIN - PERFORM pg_notify('scalar_tap_receipt_notification', format('{"id": %s, "allocation_id": "%s", "sender_address": "%s", "timestamp_ns": %s, "value": %s}', NEW.id, NEW.allocation_id, NEW.sender_address, NEW.timestamp_ns, NEW.value)); + PERFORM pg_notify('scalar_tap_receipt_notification', format('{"id": %s, "allocation_id": "%s", "signer_address": "%s", "timestamp_ns": %s, "value": %s}', NEW.id, NEW.allocation_id, NEW.signer_address, NEW.timestamp_ns, NEW.value)); RETURN NEW; END; $$ LANGUAGE 'plpgsql'; @@ -29,7 +29,7 @@ CREATE INDEX IF NOT EXISTS scalar_tap_receipts_timestamp_ns_idx ON scalar_tap_re CREATE TABLE IF NOT EXISTS scalar_tap_receipts_invalid ( id BIGSERIAL PRIMARY KEY, allocation_id CHAR(40) NOT NULL, - sender_address CHAR(40) NOT NULL, + signer_address CHAR(40) NOT NULL, timestamp_ns NUMERIC(20) NOT NULL, value NUMERIC(39) NOT NULL, received_receipt JSON NOT NULL diff --git a/tap-agent/src/agent.rs b/tap-agent/src/agent.rs index d1ecde26..1db7a90b 100644 --- a/tap-agent/src/agent.rs +++ b/tap-agent/src/agent.rs @@ -65,6 +65,7 @@ pub async fn start_agent(config: &'static config::Cli) -> SenderAllocationRelati escrow_subgraph, config.ethereum.indexer_address, Duration::from_millis(config.escrow_subgraph.escrow_syncing_interval_ms), + false, ); // TODO: replace with a proper implementation once the gateway registry contract is ready diff --git a/tap-agent/src/tap/escrow_adapter.rs b/tap-agent/src/tap/escrow_adapter.rs index b5957145..afd9b828 100644 --- a/tap-agent/src/tap/escrow_adapter.rs +++ b/tap-agent/src/tap/escrow_adapter.rs @@ -5,8 +5,8 @@ use std::{collections::HashMap, sync::Arc}; use alloy_primitives::Address; use async_trait::async_trait; -use ethereum_types::U256; use eventuals::Eventual; +use indexer_common::escrow_accounts::EscrowAccounts; use tap_core::adapters::escrow_adapter::EscrowAdapter as EscrowAdapterTrait; use thiserror::Error; use tokio::sync::RwLock; @@ -21,7 +21,7 @@ use tokio::sync::RwLock; /// receipt checks only when we need to send a RAV request. #[derive(Clone)] pub struct EscrowAdapter { - escrow_accounts: Eventual>, + escrow_accounts: Eventual, sender_pending_fees: Arc>>, } @@ -32,7 +32,7 @@ pub enum AdapterError { } impl EscrowAdapter { - pub fn new(escrow_accounts: Eventual>) -> Self { + pub fn new(escrow_accounts: Eventual) -> Self { Self { escrow_accounts, sender_pending_fees: Arc::new(RwLock::new(HashMap::new())), @@ -45,14 +45,29 @@ impl EscrowAdapterTrait for EscrowAdapter { type AdapterError = AdapterError; async fn get_available_escrow(&self, sender: Address) -> Result { - let balance = self - .escrow_accounts - .value() - .await - .map_err(|e| AdapterError::AdapterError { - error: format!("Could not get escrow balance from eventual: {:?}.", e), - })? - .get(&sender) + let escrow_accounts = + self.escrow_accounts + .value() + .await + .map_err(|e| AdapterError::AdapterError { + error: format!("Could not get escrow accounts from eventual: {:?}.", e), + })?; + + let sender = + escrow_accounts + .signers_to_senders + .get(&sender) + .ok_or(AdapterError::AdapterError { + error: format!( + "Sender {} not found for receipt signer, could not get available escrow.", + sender + ) + .to_string(), + })?; + + let balance = escrow_accounts + .balances + .get(sender) .ok_or(AdapterError::AdapterError { error: format!( "Sender {} not found in escrow balances map, could not get available escrow.", @@ -74,16 +89,37 @@ impl EscrowAdapterTrait for EscrowAdapter { .sender_pending_fees .read() .await - .get(&sender) + .get(sender) .copied() .unwrap_or(0); Ok(balance - fees) } async fn subtract_escrow(&self, sender: Address, value: u128) -> Result<(), AdapterError> { + let escrow_accounts = + self.escrow_accounts + .value() + .await + .map_err(|e| AdapterError::AdapterError { + error: format!("Could not get escrow accounts from eventual: {:?}.", e), + })?; + let current_available_escrow = self.get_available_escrow(sender).await?; + + let sender = + escrow_accounts + .signers_to_senders + .get(&sender) + .ok_or(AdapterError::AdapterError { + error: format!( + "Sender {} not found for receipt signer, could not get available escrow.", + sender + ) + .to_string(), + })?; + let mut fees_write = self.sender_pending_fees.write().await; - let fees = fees_write.entry(sender).or_insert(0); + let fees = fees_write.entry(sender.to_owned()).or_insert(0); if current_available_escrow < value { return Err(AdapterError::AdapterError { error: format!( @@ -100,29 +136,32 @@ impl EscrowAdapterTrait for EscrowAdapter { #[cfg(test)] mod test { - use super::*; - use ethereum_types::U256; + use std::vec; + + use crate::tap::test_utils::{SENDER, SIGNER}; - use std::str::FromStr; + use super::*; #[tokio::test] async fn test_subtract_escrow() { - let sender = Address::from_str("0xdeadbeefcafebabedeadbeefcafebabadeadbeef").unwrap(); - let escrow_accounts: Eventual> = - Eventual::from_value(HashMap::from([(sender, U256::from(1000))])); + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + let sender_pending_fees = Arc::new(RwLock::new(HashMap::new())); - sender_pending_fees.write().await.insert(sender, 500); + sender_pending_fees.write().await.insert(SENDER.1, 500); let adapter = EscrowAdapter { escrow_accounts, sender_pending_fees, }; adapter - .subtract_escrow(sender, 500) + .subtract_escrow(SIGNER.1, 500) .await .expect("Subtract escrow."); let available_escrow = adapter - .get_available_escrow(sender) + .get_available_escrow(SIGNER.1) .await .expect("Get available escrow."); assert_eq!(available_escrow, 0); @@ -130,23 +169,25 @@ mod test { #[tokio::test] async fn test_subtract_escrow_overflow() { - let sender = Address::from_str("0xdeadbeefcafebabedeadbeefcafebabadeadbeef").unwrap(); - let escrow_accounts: Eventual> = - Eventual::from_value(HashMap::from([(sender, U256::from(1000))])); + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + let sender_pending_fees = Arc::new(RwLock::new(HashMap::new())); - sender_pending_fees.write().await.insert(sender, 500); + sender_pending_fees.write().await.insert(SENDER.1, 500); let adapter = EscrowAdapter { escrow_accounts, sender_pending_fees, }; adapter - .subtract_escrow(sender, 250) + .subtract_escrow(SIGNER.1, 250) .await .expect("Subtract escrow."); - assert!(adapter.subtract_escrow(sender, 251).await.is_err()); + assert!(adapter.subtract_escrow(SIGNER.1, 251).await.is_err()); let available_escrow = adapter - .get_available_escrow(sender) + .get_available_escrow(SIGNER.1) .await .expect("Get available escrow."); assert_eq!(available_escrow, 250); diff --git a/tap-agent/src/tap/mod.rs b/tap-agent/src/tap/mod.rs index 07e01cb1..f40e5a0d 100644 --- a/tap-agent/src/tap/mod.rs +++ b/tap-agent/src/tap/mod.rs @@ -1,6 +1,11 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use alloy_primitives::Address; +use anyhow::anyhow; +use eventuals::Eventual; +use indexer_common::escrow_accounts::EscrowAccounts; + mod escrow_adapter; mod rav_storage_adapter; mod receipt_checks_adapter; @@ -10,3 +15,21 @@ pub mod sender_allocation_relationships_manager; #[cfg(test)] pub mod test_utils; + +async fn signers_trimmed( + escrow_accounts: &Eventual, + sender: Address, +) -> Result, anyhow::Error> { + let signers = escrow_accounts + .value() + .await + .map_err(|e| anyhow!("Error while getting escrow accounts: {:?}", e))? + .senders_to_signers + .get(&sender) + .ok_or(anyhow!("No signers found for sender {}.", sender))? + .iter() + .map(|s| s.to_string().trim_start_matches("0x").to_owned()) + .collect::>(); + + Ok(signers) +} diff --git a/tap-agent/src/tap/rav_storage_adapter.rs b/tap-agent/src/tap/rav_storage_adapter.rs index 3db3312c..9e0b0a94 100644 --- a/tap-agent/src/tap/rav_storage_adapter.rs +++ b/tap-agent/src/tap/rav_storage_adapter.rs @@ -96,7 +96,7 @@ impl RAVStorageAdapter { #[cfg(test)] mod test { use super::*; - use crate::tap::test_utils::{create_rav, ALLOCATION_ID, SENDER}; + use crate::tap::test_utils::{create_rav, ALLOCATION_ID, SENDER, SIGNER}; use tap_core::adapters::rav_storage_adapter::RAVStorageAdapter as RAVStorageAdapterTrait; #[sqlx::test(migrations = "../migrations")] @@ -108,7 +108,7 @@ mod test { // Insert a rav let mut new_rav = create_rav( *ALLOCATION_ID, - SENDER.0.clone(), + SIGNER.0.clone(), timestamp_ns, value_aggregate, ) @@ -127,7 +127,7 @@ mod test { for i in 0..3 { new_rav = create_rav( *ALLOCATION_ID, - SENDER.0.clone(), + SIGNER.0.clone(), timestamp_ns + i, value_aggregate - (i as u128), ) diff --git a/tap-agent/src/tap/receipt_checks_adapter.rs b/tap-agent/src/tap/receipt_checks_adapter.rs index e0f9c406..10563774 100644 --- a/tap-agent/src/tap/receipt_checks_adapter.rs +++ b/tap-agent/src/tap/receipt_checks_adapter.rs @@ -7,6 +7,7 @@ use alloy_primitives::Address; use async_trait::async_trait; use ethereum_types::U256; use eventuals::{timer, Eventual, EventualExt}; +use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::subgraph_client::{Query, SubgraphClient}; use sqlx::PgPool; use tap_core::adapters::receipt_checks_adapter::ReceiptChecksAdapter as ReceiptChecksAdapterTrait; @@ -20,7 +21,7 @@ use crate::config; pub struct ReceiptChecksAdapter { query_appraisals: Option>>>, allocation_id: Address, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, tap_allocation_redeemed: Eventual, } @@ -30,7 +31,7 @@ impl ReceiptChecksAdapter { _pgpool: PgPool, query_appraisals: Option>>>, allocation_id: Address, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, sender_id: Address, ) -> Self { @@ -125,8 +126,11 @@ impl ReceiptChecksAdapterTrait for ReceiptChecksAdapter { })?; Ok(escrow_accounts + .signers_to_senders .get(&sender_id) - .map_or(false, |balance| *balance > U256::from(0))) + .and_then(|sender| escrow_accounts.balances.get(sender)) + .map(|balance| *balance > U256::from(0)) + .unwrap_or(false)) } } diff --git a/tap-agent/src/tap/receipt_storage_adapter.rs b/tap-agent/src/tap/receipt_storage_adapter.rs index 2b144a03..f54a03f9 100644 --- a/tap-agent/src/tap/receipt_storage_adapter.rs +++ b/tap-agent/src/tap/receipt_storage_adapter.rs @@ -8,7 +8,8 @@ use std::{ use alloy_primitives::Address; use async_trait::async_trait; - +use eventuals::Eventual; +use indexer_common::escrow_accounts::EscrowAccounts; use sqlx::{postgres::types::PgRange, types::BigDecimal, PgPool}; use tap_core::adapters::receipt_storage_adapter::ReceiptStorageAdapter as ReceiptStorageAdapterTrait; use tap_core::{ @@ -18,12 +19,14 @@ use tap_core::{ use thiserror::Error; use tracing::error; -#[derive(Debug)] +use crate::tap::signers_trimmed; + pub struct ReceiptStorageAdapter { pgpool: PgPool, allocation_id: Address, sender: Address, required_checks: Vec, + escrow_accounts: Eventual, } #[derive(Debug, Error)] @@ -100,17 +103,24 @@ impl ReceiptStorageAdapterTrait for ReceiptStorageAdapter { // TODO: Make use of this limit in this function _receipts_limit: Option, ) -> Result, Self::AdapterError> { + let signers = signers_trimmed(&self.escrow_accounts, self.sender) + .await + .map_err(|e| AdapterError::AdapterError { + error: format!("{:?}.", e), + })?; + let records = sqlx::query!( r#" SELECT id, receipt FROM scalar_tap_receipts - WHERE allocation_id = $1 AND sender_address = $2 AND $3::numrange @> timestamp_ns + WHERE allocation_id = $1 AND signer_address IN (SELECT unnest($2::text[])) + AND $3::numrange @> timestamp_ns "#, self.allocation_id .to_string() .trim_start_matches("0x") .to_owned(), - self.sender.to_string().trim_start_matches("0x").to_owned(), + &signers, rangebounds_to_pgrange(timestamp_range_ns) ) .fetch_all(&self.pgpool) @@ -140,16 +150,23 @@ impl ReceiptStorageAdapterTrait for ReceiptStorageAdapter { &self, timestamp_ns: R, ) -> Result<(), Self::AdapterError> { + let signers = signers_trimmed(&self.escrow_accounts, self.sender) + .await + .map_err(|e| AdapterError::AdapterError { + error: format!("{:?}.", e), + })?; + sqlx::query!( r#" DELETE FROM scalar_tap_receipts - WHERE allocation_id = $1 AND sender_address = $2 AND $3::numrange @> timestamp_ns + WHERE allocation_id = $1 AND signer_address IN (SELECT unnest($2::text[])) + AND $3::numrange @> timestamp_ns "#, self.allocation_id .to_string() .trim_start_matches("0x") .to_owned(), - self.sender.to_string().trim_start_matches("0x").to_owned(), + &signers, rangebounds_to_pgrange(timestamp_ns) ) .execute(&self.pgpool) @@ -164,24 +181,29 @@ impl ReceiptStorageAdapter { allocation_id: Address, sender: Address, required_checks: Vec, + escrow_accounts: Eventual, ) -> Self { Self { pgpool, allocation_id, sender, required_checks, + escrow_accounts, } } } #[cfg(test)] mod test { + use std::collections::HashMap; + use super::*; use crate::tap::test_utils::{ create_received_receipt, store_receipt, ALLOCATION_ID, ALLOCATION_ID_IRRELEVANT, SENDER, - SENDER_IRRELEVANT, TAP_EIP712_DOMAIN_SEPARATOR, + SENDER_IRRELEVANT, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, }; use anyhow::Result; + use serde_json::Value; use sqlx::PgPool; use tap_core::tap_receipt::get_full_list_of_checks; @@ -191,9 +213,17 @@ mod test { /// retrieve_receipts_in_timestamp_range. async fn retrieve_range_and_check + Send>( storage_adapter: &ReceiptStorageAdapter, + escrow_accounts: &Eventual, received_receipt_vec: &[(u64, ReceivedReceipt)], range: R, ) -> Result<()> { + let signers_to_senders = escrow_accounts + .value() + .await + .unwrap() + .signers_to_senders + .to_owned(); + // Filtering the received receipts by timestamp range let received_receipt_vec: Vec<(u64, ReceivedReceipt)> = received_receipt_vec .iter() @@ -201,11 +231,14 @@ mod test { range.contains(&received_receipt.signed_receipt().message.timestamp_ns) && (received_receipt.signed_receipt().message.allocation_id == storage_adapter.allocation_id) - && (received_receipt - .signed_receipt() - .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) - .unwrap() - == storage_adapter.sender) + && (signers_to_senders + .get( + &received_receipt + .signed_receipt() + .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) + .unwrap(), + ) + .map_or(false, |v| *v == storage_adapter.sender)) }) .cloned() .collect(); @@ -243,9 +276,17 @@ mod test { async fn remove_range_and_check + Send>( storage_adapter: &ReceiptStorageAdapter, + escrow_accounts: &Eventual, received_receipt_vec: &[ReceivedReceipt], range: R, ) -> Result<()> { + let signers_to_senders = escrow_accounts + .value() + .await + .unwrap() + .signers_to_senders + .to_owned(); + // Storing the receipts let mut received_receipt_id_vec = Vec::new(); for received_receipt in received_receipt_vec.iter() { @@ -269,11 +310,14 @@ mod test { .filter(|(_, received_receipt)| { if (received_receipt.signed_receipt().message.allocation_id == storage_adapter.allocation_id) - && (received_receipt - .signed_receipt() - .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) - .unwrap() - == storage_adapter.sender) + && (signers_to_senders + .get( + &received_receipt + .signed_receipt() + .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) + .unwrap(), + ) + .map_or(false, |v| *v == storage_adapter.sender)) { !range.contains(&received_receipt.signed_receipt().message.timestamp_ns) } else { @@ -345,11 +389,17 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn retrieve_receipts_in_timestamp_range(pgpool: PgPool) { + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + let storage_adapter = ReceiptStorageAdapter::new( pgpool.clone(), *ALLOCATION_ID, SENDER.1, get_full_list_of_checks(), + escrow_accounts.clone(), ); // Creating 10 receipts with timestamps 42 to 51 @@ -358,7 +408,7 @@ mod test { received_receipt_vec.push( create_received_receipt( &ALLOCATION_ID, - &SENDER.0, + &SIGNER.0, i + 684, i + 42, (i + 124).into(), @@ -371,7 +421,7 @@ mod test { received_receipt_vec.push( create_received_receipt( &ALLOCATION_ID_IRRELEVANT, - &SENDER.0, + &SIGNER.0, i + 684, i + 42, (i + 124).into(), @@ -413,7 +463,7 @@ mod test { { $( assert!( - retrieve_range_and_check(&storage_adapter, &received_receipt_vec, $arg) + retrieve_range_and_check(&storage_adapter, &escrow_accounts, &received_receipt_vec, $arg) .await .is_ok()); )+ @@ -483,8 +533,18 @@ mod test { #[sqlx::test(migrations = "../migrations")] async fn remove_receipts_in_timestamp_range(pgpool: PgPool) { - let storage_adapter = - ReceiptStorageAdapter::new(pgpool, *ALLOCATION_ID, SENDER.1, get_full_list_of_checks()); + let escrow_accounts = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); + + let storage_adapter = ReceiptStorageAdapter::new( + pgpool, + *ALLOCATION_ID, + SENDER.1, + get_full_list_of_checks(), + escrow_accounts.clone(), + ); // Creating 10 receipts with timestamps 42 to 51 let mut received_receipt_vec = Vec::new(); @@ -492,7 +552,7 @@ mod test { received_receipt_vec.push( create_received_receipt( &ALLOCATION_ID, - &SENDER.0, + &SIGNER.0, i + 684, i + 42, (i + 124).into(), @@ -505,7 +565,7 @@ mod test { received_receipt_vec.push( create_received_receipt( &ALLOCATION_ID_IRRELEVANT, - &SENDER.0, + &SIGNER.0, i + 684, i + 42, (i + 124).into(), @@ -531,7 +591,7 @@ mod test { { $( assert!( - remove_range_and_check(&storage_adapter, &received_receipt_vec, $arg) + remove_range_and_check(&storage_adapter, &escrow_accounts, &received_receipt_vec, $arg) .await.is_ok() ); ) + diff --git a/tap-agent/src/tap/sender_allocation_relationship.rs b/tap-agent/src/tap/sender_allocation_relationship.rs index a253167c..64db68a2 100644 --- a/tap-agent/src/tap/sender_allocation_relationship.rs +++ b/tap-agent/src/tap/sender_allocation_relationship.rs @@ -1,14 +1,14 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashMap, str::FromStr, sync::Arc, time::Duration}; +use std::{str::FromStr, sync::Arc, time::Duration}; use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; use anyhow::{anyhow, ensure}; -use ethereum_types::U256; + use eventuals::Eventual; -use indexer_common::prelude::SubgraphClient; +use indexer_common::{escrow_accounts::EscrowAccounts, prelude::SubgraphClient}; use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params}; use sqlx::{types::BigDecimal, PgPool}; use tap_aggregator::jsonrpsee_helpers::JsonRpcResponse; @@ -30,7 +30,7 @@ use crate::{ tap::{ escrow_adapter::EscrowAdapter, rav_storage_adapter::RAVStorageAdapter, receipt_checks_adapter::ReceiptChecksAdapter, - receipt_storage_adapter::ReceiptStorageAdapter, + receipt_storage_adapter::ReceiptStorageAdapter, signers_trimmed, }, }; @@ -66,6 +66,7 @@ struct Inner { unaggregated_fees: Arc>, state: Arc>, config: &'static config::Cli, + escrow_accounts: Eventual, } /// A SenderAllocationRelationship is the relationship between the indexer and the sender in the @@ -88,7 +89,7 @@ impl SenderAllocationRelationship { pgpool: PgPool, allocation_id: Address, sender: Address, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, escrow_adapter: EscrowAdapter, tap_eip712_domain_separator: Eip712Domain, @@ -118,6 +119,7 @@ impl SenderAllocationRelationship { allocation_id, sender, required_checks.clone(), + escrow_accounts.clone(), ); let rav_storage_adapter = RAVStorageAdapter::new(pgpool.clone(), allocation_id, sender); let tap_manager = TapManager::new( @@ -139,6 +141,7 @@ impl SenderAllocationRelationship { unaggregated_fees: Arc::new(Mutex::new(UnaggregatedFees::default())), state: Arc::new(Mutex::new(State::Running)), config, + escrow_accounts, }), rav_requester_task: Arc::new(Mutex::new(None)), } @@ -175,7 +178,7 @@ impl SenderAllocationRelationship { new_receipt_notification.value, unaggregated_fees.value, new_receipt_notification.allocation_id, - new_receipt_notification.sender_address + self.inner.sender ); u128::MAX }); @@ -210,6 +213,8 @@ impl SenderAllocationRelationship { async fn update_unaggregated_fees_static(inner: &Inner) -> Result<(), anyhow::Error> { inner.tap_manager.remove_obsolete_receipts().await?; + let signers = signers_trimmed(&inner.escrow_accounts, inner.sender).await?; + // TODO: Get `rav.timestamp_ns` from the TAP Manager's RAV storage adapter instead? let res = sqlx::query!( r#" @@ -229,7 +234,7 @@ impl SenderAllocationRelationship { scalar_tap_receipts WHERE allocation_id = $1 - AND sender_address = $2 + AND signer_address IN (SELECT unnest($3::text[])) AND CASE WHEN ( SELECT timestamp_ns :: NUMERIC @@ -247,7 +252,8 @@ impl SenderAllocationRelationship { .to_string() .trim_start_matches("0x") .to_owned(), - inner.sender.to_string().trim_start_matches("0x").to_owned() + inner.sender.to_string().trim_start_matches("0x").to_owned(), + &signers ) .fetch_one(&inner.pgpool) .await?; @@ -447,7 +453,7 @@ impl SenderAllocationRelationship { r#" INSERT INTO scalar_tap_receipts_invalid ( allocation_id, - sender_address, + signer_address, timestamp_ns, value, received_receipt @@ -525,6 +531,8 @@ impl Drop for SenderAllocationRelationship { #[cfg(test)] mod tests { + use std::collections::HashMap; + use indexer_common::subgraph_client::DeploymentDetails; use serde_json::json; use tap_aggregator::server::run_server; @@ -537,7 +545,7 @@ mod tests { use super::*; use crate::tap::test_utils::{ create_rav, create_received_receipt, store_rav, store_receipt, ALLOCATION_ID, INDEXER, - SENDER, TAP_EIP712_DOMAIN_SEPARATOR, + SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR, }; const DUMMY_URL: &str = "http://localhost:1234"; @@ -567,9 +575,10 @@ mod tests { DeploymentDetails::for_query_url(escrow_subgraph_endpoint).unwrap(), ))); - let (mut escrow_accounts_writer, escrow_accounts_eventual) = - Eventual::>::new(); - escrow_accounts_writer.write(HashMap::from([(SENDER.1, 1000.into())])); + let escrow_accounts_eventual = Eventual::from_value(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); let escrow_adapter = EscrowAdapter::new(escrow_accounts_eventual.clone()); @@ -600,7 +609,7 @@ mod tests { // Add receipts to the database. for i in 1..10 { let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -639,13 +648,13 @@ mod tests { // Add the RAV to the database. // This RAV has timestamp 4. The sender_allocation_relatioship should only consider receipts // with a timestamp greater than 4. - let signed_rav = create_rav(*ALLOCATION_ID, SENDER.0.clone(), 4, 10).await; + let signed_rav = create_rav(*ALLOCATION_ID, SIGNER.0.clone(), 4, 10).await; store_rav(&pgpool, signed_rav, SENDER.1).await.unwrap(); // Add receipts to the database. for i in 1..10 { let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -682,7 +691,7 @@ mod tests { let mut expected_unaggregated_fees = 0u128; for i in 10..20 { let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -710,7 +719,7 @@ mod tests { // table let new_receipt_notification = NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: 10, timestamp_ns: 19, value: 19, @@ -733,7 +742,7 @@ mod tests { // Send a new receipt notification. let new_receipt_notification = NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: 30, timestamp_ns: 20, value: 20, @@ -757,7 +766,7 @@ mod tests { // Send a new receipt notification that has a lower ID than the previous one. let new_receipt_notification = NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: 25, timestamp_ns: 19, value: 19, @@ -783,7 +792,7 @@ mod tests { // Start a TAP aggregator server. let (handle, aggregator_endpoint) = run_server( 0, - SENDER.0.clone(), + SIGNER.0.clone(), TAP_EIP712_DOMAIN_SEPARATOR.clone(), 100 * 1024, 100 * 1024, @@ -818,7 +827,7 @@ mod tests { // Add receipts to the database. for i in 0..10 { let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i + 1, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -845,7 +854,7 @@ mod tests { // Start a TAP aggregator server. let (handle, aggregator_endpoint) = run_server( 0, - SENDER.0.clone(), + SIGNER.0.clone(), TAP_EIP712_DOMAIN_SEPARATOR.clone(), 100 * 1024, 100 * 1024, @@ -887,14 +896,14 @@ mod tests { let value = (i + 10) as u128; let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i + 1, value, i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, value, i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); sender_allocation_relatioship .handle_new_receipt_notification(NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: i, timestamp_ns: i + 1, value, @@ -967,7 +976,7 @@ mod tests { let value = (i + 10) as u128; let receipt = - create_received_receipt(&ALLOCATION_ID, &SENDER.0, i, i + 1, i.into(), i).await; + create_received_receipt(&ALLOCATION_ID, &SIGNER.0, i, i + 1, i.into(), i).await; store_receipt(&pgpool, receipt.signed_receipt()) .await .unwrap(); @@ -975,7 +984,7 @@ mod tests { sender_allocation_relatioship .handle_new_receipt_notification(NewReceiptNotification { allocation_id: *ALLOCATION_ID, - sender_address: SENDER.1, + signer_address: SIGNER.1, id: i, timestamp_ns: i + 1, value, diff --git a/tap-agent/src/tap/sender_allocation_relationships_manager.rs b/tap-agent/src/tap/sender_allocation_relationships_manager.rs index 6832a0fb..f9ebe44a 100644 --- a/tap-agent/src/tap/sender_allocation_relationships_manager.rs +++ b/tap-agent/src/tap/sender_allocation_relationships_manager.rs @@ -1,14 +1,15 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 +use std::collections::HashSet; use std::{collections::HashMap, str::FromStr, sync::Arc}; use alloy_primitives::Address; use alloy_sol_types::Eip712Domain; use anyhow::anyhow; use anyhow::Result; -use ethereum_types::U256; use eventuals::{Eventual, EventualExt, PipeHandle}; +use indexer_common::escrow_accounts::EscrowAccounts; use indexer_common::prelude::{Allocation, SubgraphClient}; use serde::Deserialize; use sqlx::{postgres::PgListener, PgPool}; @@ -23,7 +24,7 @@ use crate::config; pub struct NewReceiptNotification { pub id: u64, pub allocation_id: Address, - pub sender_address: Address, + pub signer_address: Address, pub timestamp_ns: u64, pub value: u128, } @@ -42,7 +43,7 @@ struct Inner { sender_allocation_relationships: Arc>>, indexer_allocations: Eventual>, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, escrow_adapter: EscrowAdapter, tap_eip712_domain_separator: Eip712Domain, @@ -54,7 +55,7 @@ impl SenderAllocationRelationshipsManager { config: &'static config::Cli, pgpool: PgPool, indexer_allocations: Eventual>, - escrow_accounts: Eventual>, + escrow_accounts: Eventual, escrow_subgraph: &'static SubgraphClient, tap_eip712_domain_separator: Eip712Domain, sender_aggregator_endpoints: HashMap, @@ -73,6 +74,12 @@ impl SenderAllocationRelationshipsManager { sender_aggregator_endpoints, }); + let escrow_accounts_snapshot = inner + .escrow_accounts + .value() + .await + .expect("Should get escrow accounts from Eventual"); + Self::update_sender_allocation_relationships( &inner, inner @@ -80,11 +87,7 @@ impl SenderAllocationRelationshipsManager { .value() .await .expect("Should get indexer allocations from Eventual"), - inner - .escrow_accounts - .value() - .await - .expect("Should get escrow accounts from Eventual"), + escrow_accounts_snapshot.balances.keys().copied().collect(), ) .await .expect("Should be able to update sender_allocation_relationships"); @@ -111,7 +114,7 @@ impl SenderAllocationRelationshipsManager { // still need to get aggregated. sqlx::query!( r#" - SELECT DISTINCT allocation_id, sender_address + SELECT DISTINCT allocation_id, signer_address FROM scalar_tap_receipts "# ) @@ -122,8 +125,13 @@ impl SenderAllocationRelationshipsManager { .for_each(|row| { let allocation_id = Address::from_str(&row.allocation_id) .expect("allocation_id should be a valid address"); - let sender = Address::from_str(&row.sender_address) - .expect("sender_address should be a valid address"); + let signer = Address::from_str(&row.signer_address) + .expect("signer_address should be a valid address"); + let sender = escrow_accounts_snapshot + .signers_to_senders + .get(&signer) + .copied() + .expect("should be able to get sender from signer"); // Only create a SenderAllocationRelationship if it doesn't exist yet. if let std::collections::hash_map::Entry::Vacant(e) = @@ -162,6 +170,7 @@ impl SenderAllocationRelationshipsManager { let new_receipts_watcher_handle = tokio::spawn(Self::new_receipts_watcher( pglistener, inner.sender_allocation_relationships.clone(), + inner.escrow_accounts.clone(), )); // Start the eligible_allocations_senders_pipe that watches for changes in eligible senders @@ -177,7 +186,7 @@ impl SenderAllocationRelationshipsManager { Self::update_sender_allocation_relationships( &inner, indexer_allocations, - escrow_accounts, + escrow_accounts.balances.keys().copied().collect(), ) .await .unwrap_or_else(|e| { @@ -203,6 +212,7 @@ impl SenderAllocationRelationshipsManager { sender_allocation_relationships: Arc< RwLock>, >, + escrow_accounts: Eventual, ) { loop { // TODO: recover from errors or shutdown the whole program? @@ -216,11 +226,31 @@ impl SenderAllocationRelationshipsManager { NewReceiptNotification", ); - if let Some(sender_allocation_relationship) = - sender_allocation_relationships.read().await.get(&( - new_receipt_notification.allocation_id, - new_receipt_notification.sender_address, - )) + let sender_address = escrow_accounts + .value() + .await + .expect("should be able to get escrow accounts") + .signers_to_senders + .get(&new_receipt_notification.signer_address) + .copied(); + + let sender_address = match sender_address { + Some(sender_address) => sender_address, + None => { + error!( + "No sender address found for receipt signer address {}. \ + This should not happen.", + new_receipt_notification.signer_address + ); + // TODO: save the receipt in the failed receipts table? + continue; + } + }; + + if let Some(sender_allocation_relationship) = sender_allocation_relationships + .read() + .await + .get(&(new_receipt_notification.allocation_id, sender_address)) { sender_allocation_relationship .handle_new_receipt_notification(new_receipt_notification) @@ -228,9 +258,9 @@ impl SenderAllocationRelationshipsManager { } else { warn!( "No sender_allocation_relationship found for allocation_id {} and \ - sender_address {} to process new receipt notification. This should not \ - happen.", - new_receipt_notification.allocation_id, new_receipt_notification.sender_address + sender_address {} to process new receipt notification. This should not \ + happen.", + new_receipt_notification.allocation_id, sender_address ); } } @@ -239,10 +269,9 @@ impl SenderAllocationRelationshipsManager { async fn update_sender_allocation_relationships( inner: &Inner, indexer_allocations: HashMap, - escrow_accounts: HashMap, + senders: HashSet
, ) -> Result<()> { let eligible_allocations: Vec
= indexer_allocations.keys().copied().collect(); - let senders: Vec
= escrow_accounts.keys().copied().collect(); let mut sender_allocation_relationships_write = inner.sender_allocation_relationships.write().await; @@ -303,6 +332,9 @@ impl Drop for SenderAllocationRelationshipsManager { #[cfg(test)] mod tests { + use std::vec; + + use ethereum_types::U256; use indexer_common::{ prelude::{AllocationStatus, SubgraphDeployment}, subgraph_client::DeploymentDetails, @@ -316,7 +348,7 @@ mod tests { use crate::tap::{ sender_allocation_relationship::State, - test_utils::{INDEXER, SENDER, TAP_EIP712_DOMAIN_SEPARATOR}, + test_utils::{INDEXER, SENDER, SIGNER, TAP_EIP712_DOMAIN_SEPARATOR}, }; use super::*; @@ -341,8 +373,8 @@ mod tests { indexer_allocations_writer.write(HashMap::new()); let (mut escrow_accounts_writer, escrow_accounts_eventual) = - Eventual::>::new(); - escrow_accounts_writer.write(HashMap::new()); + Eventual::::new(); + escrow_accounts_writer.write(EscrowAccounts::default()); // Mock escrow subgraph. let mock_server = MockServer::start().await; @@ -405,7 +437,10 @@ mod tests { )])); // Add an escrow account to the escrow_accounts Eventual. - escrow_accounts_writer.write(HashMap::from([(SENDER.1, U256::from_str("1000").unwrap())])); + escrow_accounts_writer.write(EscrowAccounts::new( + HashMap::from([(SENDER.1, 1000.into())]), + HashMap::from([(SENDER.1, vec![SIGNER.1])]), + )); // Wait for the SenderAllocationRelationship to be created. tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; @@ -419,7 +454,7 @@ mod tests { .contains_key(&(allocation_id, SENDER.1))); // Remove the escrow account from the escrow_accounts Eventual. - escrow_accounts_writer.write(HashMap::new()); + escrow_accounts_writer.write(EscrowAccounts::default()); // Wait a bit tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; diff --git a/tap-agent/src/tap/test_utils.rs b/tap-agent/src/tap/test_utils.rs index 1df900e2..c538d17b 100644 --- a/tap-agent/src/tap/test_utils.rs +++ b/tap-agent/src/tap/test_utils.rs @@ -22,7 +22,8 @@ lazy_static! { Address::from_str("0xbcdebcdebcdebcdebcdebcdebcdebcdebcdebcde").unwrap(); pub static ref SENDER: (LocalWallet, Address) = wallet(0); pub static ref SENDER_IRRELEVANT: (LocalWallet, Address) = wallet(1); - pub static ref INDEXER: (LocalWallet, Address) = wallet(2); + pub static ref SIGNER: (LocalWallet, Address) = wallet(2); + pub static ref INDEXER: (LocalWallet, Address) = wallet(3); pub static ref TAP_EIP712_DOMAIN_SEPARATOR: Eip712Domain = eip712_domain! { name: "TAP", version: "1", @@ -47,7 +48,7 @@ pub fn wallet(index: u32) -> (LocalWallet, Address) { /// given `query_id` and `value` pub async fn create_received_receipt( allocation_id: &Address, - sender_wallet: &LocalWallet, + signer_wallet: &LocalWallet, nonce: u64, timestamp_ns: u64, value: u128, @@ -61,7 +62,7 @@ pub async fn create_received_receipt( timestamp_ns, value, }, - sender_wallet, + signer_wallet, ) .await .unwrap(); @@ -71,7 +72,7 @@ pub async fn create_received_receipt( /// Fixture to generate a RAV using the wallet from `keys()` pub async fn create_rav( allocation_id: Address, - sender_wallet: LocalWallet, + signer_wallet: LocalWallet, timestamp_ns: u64, value_aggregate: u128, ) -> SignedRAV { @@ -82,7 +83,7 @@ pub async fn create_rav( timestamp_ns, value_aggregate, }, - &sender_wallet, + &signer_wallet, ) .await .unwrap() @@ -92,7 +93,7 @@ pub async fn store_receipt(pgpool: &PgPool, signed_receipt: SignedReceipt) -> Re let record = sqlx::query!( r#" INSERT INTO scalar_tap_receipts ( - allocation_id, sender_address, timestamp_ns, value, receipt + allocation_id, signer_address, timestamp_ns, value, receipt ) VALUES ($1, $2, $3, $4, $5) RETURNING id From 229b7929f00928f110e8262f127529f3edb5862b Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Mon, 15 Jan 2024 11:58:49 -0800 Subject: [PATCH 2/6] refactor: rename EscrowAccount's balances to sender_balances Signed-off-by: Alexis Asseman --- common/src/escrow_accounts.rs | 10 +++++----- common/src/tap_manager.rs | 2 +- tap-agent/src/tap/escrow_adapter.rs | 2 +- tap-agent/src/tap/receipt_checks_adapter.rs | 2 +- .../src/tap/sender_allocation_relationships_manager.rs | 8 ++++++-- 5 files changed, 14 insertions(+), 10 deletions(-) diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index d8067f79..5182d5ee 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -15,14 +15,14 @@ use crate::prelude::{Query, SubgraphClient}; #[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct EscrowAccounts { - pub balances: HashMap, + pub senders_balances: HashMap, pub signers_to_senders: HashMap, pub senders_to_signers: HashMap>, } impl EscrowAccounts { pub fn new( - balances: HashMap, + senders_balances: HashMap, senders_to_signers: HashMap>, ) -> Self { let signers_to_senders = senders_to_signers @@ -36,7 +36,7 @@ impl EscrowAccounts { .collect(); Self { - balances, + senders_balances, signers_to_senders, senders_to_signers, } @@ -133,7 +133,7 @@ pub fn escrow_accounts( let response = response.map_err(|e| e.to_string())?; - let balances = response + let senders_balances = response .escrow_accounts .iter() .map(|account| { @@ -170,7 +170,7 @@ pub fn escrow_accounts( }) .collect(); - Ok(EscrowAccounts::new(balances, senders_to_signers)) + Ok(EscrowAccounts::new(senders_balances, senders_to_signers)) }, move |err: String| { error!( diff --git a/common/src/tap_manager.rs b/common/src/tap_manager.rs index 07cb60b8..8df27c8c 100644 --- a/common/src/tap_manager.rs +++ b/common/src/tap_manager.rs @@ -80,7 +80,7 @@ impl TapManager { })?; if !escrow_accounts - .balances + .senders_balances .get(receipt_sender) .map_or(false, |balance| balance > &U256::zero()) { diff --git a/tap-agent/src/tap/escrow_adapter.rs b/tap-agent/src/tap/escrow_adapter.rs index afd9b828..bbbb1f1f 100644 --- a/tap-agent/src/tap/escrow_adapter.rs +++ b/tap-agent/src/tap/escrow_adapter.rs @@ -66,7 +66,7 @@ impl EscrowAdapterTrait for EscrowAdapter { })?; let balance = escrow_accounts - .balances + .senders_balances .get(sender) .ok_or(AdapterError::AdapterError { error: format!( diff --git a/tap-agent/src/tap/receipt_checks_adapter.rs b/tap-agent/src/tap/receipt_checks_adapter.rs index 10563774..0e2a97d2 100644 --- a/tap-agent/src/tap/receipt_checks_adapter.rs +++ b/tap-agent/src/tap/receipt_checks_adapter.rs @@ -128,7 +128,7 @@ impl ReceiptChecksAdapterTrait for ReceiptChecksAdapter { Ok(escrow_accounts .signers_to_senders .get(&sender_id) - .and_then(|sender| escrow_accounts.balances.get(sender)) + .and_then(|sender| escrow_accounts.senders_balances.get(sender)) .map(|balance| *balance > U256::from(0)) .unwrap_or(false)) } diff --git a/tap-agent/src/tap/sender_allocation_relationships_manager.rs b/tap-agent/src/tap/sender_allocation_relationships_manager.rs index f9ebe44a..19dccab2 100644 --- a/tap-agent/src/tap/sender_allocation_relationships_manager.rs +++ b/tap-agent/src/tap/sender_allocation_relationships_manager.rs @@ -87,7 +87,11 @@ impl SenderAllocationRelationshipsManager { .value() .await .expect("Should get indexer allocations from Eventual"), - escrow_accounts_snapshot.balances.keys().copied().collect(), + escrow_accounts_snapshot + .senders_balances + .keys() + .copied() + .collect(), ) .await .expect("Should be able to update sender_allocation_relationships"); @@ -186,7 +190,7 @@ impl SenderAllocationRelationshipsManager { Self::update_sender_allocation_relationships( &inner, indexer_allocations, - escrow_accounts.balances.keys().copied().collect(), + escrow_accounts.senders_balances.keys().copied().collect(), ) .await .unwrap_or_else(|e| { From 82ce7cc4891760c8fbce553c3cf9f9db45d11a9b Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Mon, 15 Jan 2024 13:00:10 -0800 Subject: [PATCH 3/6] refactor: condense if/else for escrow query Signed-off-by: Alexis Asseman --- common/src/escrow_accounts.rs | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index 5182d5ee..3cab8608 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -81,7 +81,8 @@ pub fn escrow_accounts( // queries for this signer. // isAuthorized == true means that the signer is still authorized to sign // payments in the name of the sender. - let query_no_thawing_signers = r#" + let query = if reject_thawing_signers { + r#" query ($indexer: ID!) { escrowAccounts(where: {receiver_: {id: $indexer}}) { balance @@ -96,9 +97,9 @@ pub fn escrow_accounts( } } } - "#; - - let query_with_thawing_signers = r#" + "# + } else { + r#" query ($indexer: ID!) { escrowAccounts(where: {receiver_: {id: $indexer}}) { balance @@ -113,12 +114,7 @@ pub fn escrow_accounts( } } } - "#; - - let query = if reject_thawing_signers { - query_no_thawing_signers - } else { - query_with_thawing_signers + "# }; timer(interval).map_with_retry( From 73fb933ff3860cc0414344ce61fe238c3efc2fb1 Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Mon, 15 Jan 2024 13:44:03 -0800 Subject: [PATCH 4/6] refactor: remove unneeded .collect() Signed-off-by: Alexis Asseman --- common/src/escrow_accounts.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index 3cab8608..1809e2c7 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -27,12 +27,7 @@ impl EscrowAccounts { ) -> Self { let signers_to_senders = senders_to_signers .iter() - .flat_map(|(sender, signers)| { - signers - .iter() - .map(move |signer| (*signer, *sender)) - .collect::>() - }) + .flat_map(|(sender, signers)| signers.iter().map(move |signer| (*signer, *sender))) .collect(); Self { From 8cf257cbce7d307601050a1740863702069b2f9d Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Mon, 15 Jan 2024 17:39:07 -0800 Subject: [PATCH 5/6] refactor: make EscrowAccounts attributes private Signed-off-by: Alexis Asseman --- common/src/escrow_accounts.rs | 52 +++++++++++++++++-- common/src/tap_manager.rs | 21 ++------ tap-agent/src/tap/escrow_adapter.rs | 49 +++++++---------- tap-agent/src/tap/mod.rs | 4 +- tap-agent/src/tap/receipt_checks_adapter.rs | 6 +-- tap-agent/src/tap/receipt_storage_adapter.rs | 26 +++------- ...sender_allocation_relationships_manager.rs | 20 +++---- 7 files changed, 88 insertions(+), 90 deletions(-) diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index 1809e2c7..1bd2818f 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -1,7 +1,10 @@ // Copyright 2023-, GraphOps and Semiotic Labs. // SPDX-License-Identifier: Apache-2.0 -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::{HashMap, HashSet}, + time::Duration, +}; use alloy_primitives::Address; use anyhow::Result; @@ -15,9 +18,9 @@ use crate::prelude::{Query, SubgraphClient}; #[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct EscrowAccounts { - pub senders_balances: HashMap, - pub signers_to_senders: HashMap, - pub senders_to_signers: HashMap>, + senders_balances: HashMap, + signers_to_senders: HashMap, + senders_to_signers: HashMap>, } impl EscrowAccounts { @@ -36,6 +39,47 @@ impl EscrowAccounts { senders_to_signers, } } + + pub fn get_signers_for_sender(&self, sender: &Address) -> Result> { + self.senders_to_signers + .get(sender) + .filter(|signers| !signers.is_empty()) + .ok_or(anyhow::format_err!( + "No signers found for sender {}.", + sender + )) + .map(|signers| signers.to_owned()) + } + + pub fn get_sender_for_signer(&self, signer: &Address) -> Result
{ + self.signers_to_senders + .get(signer) + .ok_or(anyhow::format_err!( + "Sender not found for receipt signer {}.", + signer + )) + .copied() + } + + pub fn get_balance_for_sender(&self, sender: &Address) -> Result { + self.senders_balances + .get(sender) + .ok_or(anyhow::format_err!( + "Balance not found for sender {}.", + sender + )) + .copied() + } + + pub fn get_balance_for_signer(&self, signer: &Address) -> Result { + self.get_sender_for_signer(signer) + .and_then(|sender| self.get_balance_for_sender(&sender)) + .map_err(|e| anyhow::format_err!("Could not get balance for signer {}: {}", signer, e)) + } + + pub fn get_senders(&self) -> HashSet
{ + self.senders_balances.keys().copied().collect() + } } pub fn escrow_accounts( diff --git a/common/src/tap_manager.rs b/common/src/tap_manager.rs index 8df27c8c..28da0b3c 100644 --- a/common/src/tap_manager.rs +++ b/common/src/tap_manager.rs @@ -69,25 +69,14 @@ impl TapManager { let escrow_accounts = self.escrow_accounts.value_immediate().unwrap_or_default(); - let receipt_sender = escrow_accounts - .signers_to_senders - .get(&receipt_signer) - .ok_or_else(|| { - anyhow!( - "Receipt signer `{}` is not eligible for this indexer", - receipt_signer - ) - })?; - if !escrow_accounts - .senders_balances - .get(receipt_sender) - .map_or(false, |balance| balance > &U256::zero()) + .get_balance_for_signer(&receipt_signer) + .map_or(false, |balance| balance > U256::zero()) { - return Err(anyhow!( + anyhow::bail!( "Receipt sender `{}` is not eligible for this indexer", - receipt_signer - )); + receipt_signer, + ); } // TODO: consider doing this in another async task to avoid slowing down the paid query flow. diff --git a/tap-agent/src/tap/escrow_adapter.rs b/tap-agent/src/tap/escrow_adapter.rs index bbbb1f1f..716af393 100644 --- a/tap-agent/src/tap/escrow_adapter.rs +++ b/tap-agent/src/tap/escrow_adapter.rs @@ -53,27 +53,16 @@ impl EscrowAdapterTrait for EscrowAdapter { error: format!("Could not get escrow accounts from eventual: {:?}.", e), })?; - let sender = - escrow_accounts - .signers_to_senders - .get(&sender) - .ok_or(AdapterError::AdapterError { - error: format!( - "Sender {} not found for receipt signer, could not get available escrow.", - sender - ) - .to_string(), - })?; + let sender = escrow_accounts + .get_sender_for_signer(&sender) + .map_err(|e| AdapterError::AdapterError { + error: format!("{}", e).to_string(), + })?; let balance = escrow_accounts - .senders_balances - .get(sender) - .ok_or(AdapterError::AdapterError { - error: format!( - "Sender {} not found in escrow balances map, could not get available escrow.", - sender - ) - .to_string(), + .get_balance_for_sender(&sender) + .map_err(|e| AdapterError::AdapterError { + error: format!("Could not get available escrow: {}", e).to_string(), })? .to_owned(); let balance: u128 = balance.try_into().map_err(|_| AdapterError::AdapterError { @@ -89,7 +78,7 @@ impl EscrowAdapterTrait for EscrowAdapter { .sender_pending_fees .read() .await - .get(sender) + .get(&sender) .copied() .unwrap_or(0); Ok(balance - fees) @@ -106,17 +95,15 @@ impl EscrowAdapterTrait for EscrowAdapter { let current_available_escrow = self.get_available_escrow(sender).await?; - let sender = - escrow_accounts - .signers_to_senders - .get(&sender) - .ok_or(AdapterError::AdapterError { - error: format!( - "Sender {} not found for receipt signer, could not get available escrow.", - sender - ) - .to_string(), - })?; + let sender = escrow_accounts + .get_sender_for_signer(&sender) + .map_err(|e| AdapterError::AdapterError { + error: format!( + "Could not get available escrow for receipt signer {}: {}", + sender, e + ) + .to_string(), + })?; let mut fees_write = self.sender_pending_fees.write().await; let fees = fees_write.entry(sender.to_owned()).or_insert(0); diff --git a/tap-agent/src/tap/mod.rs b/tap-agent/src/tap/mod.rs index f40e5a0d..11f2f647 100644 --- a/tap-agent/src/tap/mod.rs +++ b/tap-agent/src/tap/mod.rs @@ -24,9 +24,7 @@ async fn signers_trimmed( .value() .await .map_err(|e| anyhow!("Error while getting escrow accounts: {:?}", e))? - .senders_to_signers - .get(&sender) - .ok_or(anyhow!("No signers found for sender {}.", sender))? + .get_signers_for_sender(&sender)? .iter() .map(|s| s.to_string().trim_start_matches("0x").to_owned()) .collect::>(); diff --git a/tap-agent/src/tap/receipt_checks_adapter.rs b/tap-agent/src/tap/receipt_checks_adapter.rs index 0e2a97d2..ec3687fa 100644 --- a/tap-agent/src/tap/receipt_checks_adapter.rs +++ b/tap-agent/src/tap/receipt_checks_adapter.rs @@ -126,10 +126,8 @@ impl ReceiptChecksAdapterTrait for ReceiptChecksAdapter { })?; Ok(escrow_accounts - .signers_to_senders - .get(&sender_id) - .and_then(|sender| escrow_accounts.senders_balances.get(sender)) - .map(|balance| *balance > U256::from(0)) + .get_balance_for_signer(&sender_id) + .map(|balance| balance > U256::from(0)) .unwrap_or(false)) } } diff --git a/tap-agent/src/tap/receipt_storage_adapter.rs b/tap-agent/src/tap/receipt_storage_adapter.rs index f54a03f9..71d10e8b 100644 --- a/tap-agent/src/tap/receipt_storage_adapter.rs +++ b/tap-agent/src/tap/receipt_storage_adapter.rs @@ -217,12 +217,7 @@ mod test { received_receipt_vec: &[(u64, ReceivedReceipt)], range: R, ) -> Result<()> { - let signers_to_senders = escrow_accounts - .value() - .await - .unwrap() - .signers_to_senders - .to_owned(); + let escrow_accounts_snapshot = escrow_accounts.value().await.unwrap(); // Filtering the received receipts by timestamp range let received_receipt_vec: Vec<(u64, ReceivedReceipt)> = received_receipt_vec @@ -231,14 +226,14 @@ mod test { range.contains(&received_receipt.signed_receipt().message.timestamp_ns) && (received_receipt.signed_receipt().message.allocation_id == storage_adapter.allocation_id) - && (signers_to_senders - .get( + && (escrow_accounts_snapshot + .get_sender_for_signer( &received_receipt .signed_receipt() .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) .unwrap(), ) - .map_or(false, |v| *v == storage_adapter.sender)) + .map_or(false, |v| v == storage_adapter.sender)) }) .cloned() .collect(); @@ -280,12 +275,7 @@ mod test { received_receipt_vec: &[ReceivedReceipt], range: R, ) -> Result<()> { - let signers_to_senders = escrow_accounts - .value() - .await - .unwrap() - .signers_to_senders - .to_owned(); + let escrow_accounts_snapshot = escrow_accounts.value().await.unwrap(); // Storing the receipts let mut received_receipt_id_vec = Vec::new(); @@ -310,14 +300,14 @@ mod test { .filter(|(_, received_receipt)| { if (received_receipt.signed_receipt().message.allocation_id == storage_adapter.allocation_id) - && (signers_to_senders - .get( + && (escrow_accounts_snapshot + .get_sender_for_signer( &received_receipt .signed_receipt() .recover_signer(&TAP_EIP712_DOMAIN_SEPARATOR) .unwrap(), ) - .map_or(false, |v| *v == storage_adapter.sender)) + .map_or(false, |v| v == storage_adapter.sender)) { !range.contains(&received_receipt.signed_receipt().message.timestamp_ns) } else { diff --git a/tap-agent/src/tap/sender_allocation_relationships_manager.rs b/tap-agent/src/tap/sender_allocation_relationships_manager.rs index 19dccab2..dd2a67a2 100644 --- a/tap-agent/src/tap/sender_allocation_relationships_manager.rs +++ b/tap-agent/src/tap/sender_allocation_relationships_manager.rs @@ -87,11 +87,7 @@ impl SenderAllocationRelationshipsManager { .value() .await .expect("Should get indexer allocations from Eventual"), - escrow_accounts_snapshot - .senders_balances - .keys() - .copied() - .collect(), + escrow_accounts_snapshot.get_senders(), ) .await .expect("Should be able to update sender_allocation_relationships"); @@ -132,9 +128,7 @@ impl SenderAllocationRelationshipsManager { let signer = Address::from_str(&row.signer_address) .expect("signer_address should be a valid address"); let sender = escrow_accounts_snapshot - .signers_to_senders - .get(&signer) - .copied() + .get_sender_for_signer(&signer) .expect("should be able to get sender from signer"); // Only create a SenderAllocationRelationship if it doesn't exist yet. @@ -190,7 +184,7 @@ impl SenderAllocationRelationshipsManager { Self::update_sender_allocation_relationships( &inner, indexer_allocations, - escrow_accounts.senders_balances.keys().copied().collect(), + escrow_accounts.get_senders(), ) .await .unwrap_or_else(|e| { @@ -234,13 +228,11 @@ impl SenderAllocationRelationshipsManager { .value() .await .expect("should be able to get escrow accounts") - .signers_to_senders - .get(&new_receipt_notification.signer_address) - .copied(); + .get_sender_for_signer(&new_receipt_notification.signer_address); let sender_address = match sender_address { - Some(sender_address) => sender_address, - None => { + Ok(sender_address) => sender_address, + Err(_) => { error!( "No sender address found for receipt signer address {}. \ This should not happen.", From 223fef3d6b7e4d1b140e9aad0ed6dc9c14bf6965 Mon Sep 17 00:00:00 2001 From: Alexis Asseman Date: Tue, 16 Jan 2024 15:13:34 -0800 Subject: [PATCH 6/6] refactor: create EscrowAccountsError Signed-off-by: Alexis Asseman --- common/src/escrow_accounts.rs | 44 ++++++++------ tap-agent/src/tap/escrow_adapter.rs | 90 +++++++++++++---------------- 2 files changed, 66 insertions(+), 68 deletions(-) diff --git a/common/src/escrow_accounts.rs b/common/src/escrow_accounts.rs index 1bd2818f..660a2714 100644 --- a/common/src/escrow_accounts.rs +++ b/common/src/escrow_accounts.rs @@ -11,11 +11,22 @@ use anyhow::Result; use ethers_core::types::U256; use eventuals::{timer, Eventual, EventualExt}; use serde::Deserialize; +use thiserror::Error; use tokio::time::sleep; use tracing::{error, warn}; use crate::prelude::{Query, SubgraphClient}; +#[derive(Error, Debug)] +pub enum EscrowAccountsError { + #[error("No signer found for sender {sender}")] + NoSignerFound { sender: Address }, + #[error("No balance found for sender {sender}")] + NoBalanceFound { sender: Address }, + #[error("No sender found for signer {signer}")] + NoSenderFound { signer: Address }, +} + #[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct EscrowAccounts { senders_balances: HashMap, @@ -40,41 +51,40 @@ impl EscrowAccounts { } } - pub fn get_signers_for_sender(&self, sender: &Address) -> Result> { + pub fn get_signers_for_sender( + &self, + sender: &Address, + ) -> Result, EscrowAccountsError> { self.senders_to_signers .get(sender) .filter(|signers| !signers.is_empty()) - .ok_or(anyhow::format_err!( - "No signers found for sender {}.", - sender - )) + .ok_or(EscrowAccountsError::NoSignerFound { + sender: sender.to_owned(), + }) .map(|signers| signers.to_owned()) } - pub fn get_sender_for_signer(&self, signer: &Address) -> Result
{ + pub fn get_sender_for_signer(&self, signer: &Address) -> Result { self.signers_to_senders .get(signer) - .ok_or(anyhow::format_err!( - "Sender not found for receipt signer {}.", - signer - )) + .ok_or(EscrowAccountsError::NoSenderFound { + signer: signer.to_owned(), + }) .copied() } - pub fn get_balance_for_sender(&self, sender: &Address) -> Result { + pub fn get_balance_for_sender(&self, sender: &Address) -> Result { self.senders_balances .get(sender) - .ok_or(anyhow::format_err!( - "Balance not found for sender {}.", - sender - )) + .ok_or(EscrowAccountsError::NoBalanceFound { + sender: sender.to_owned(), + }) .copied() } - pub fn get_balance_for_signer(&self, signer: &Address) -> Result { + pub fn get_balance_for_signer(&self, signer: &Address) -> Result { self.get_sender_for_signer(signer) .and_then(|sender| self.get_balance_for_sender(&sender)) - .map_err(|e| anyhow::format_err!("Could not get balance for signer {}: {}", signer, e)) } pub fn get_senders(&self) -> HashSet
{ diff --git a/tap-agent/src/tap/escrow_adapter.rs b/tap-agent/src/tap/escrow_adapter.rs index 716af393..0b725774 100644 --- a/tap-agent/src/tap/escrow_adapter.rs +++ b/tap-agent/src/tap/escrow_adapter.rs @@ -27,8 +27,30 @@ pub struct EscrowAdapter { #[derive(Debug, Error)] pub enum AdapterError { - #[error("Error in EscrowAdapter: {error}")] - AdapterError { error: String }, + #[error("Could not get escrow accounts from eventual")] + EscrowEventualError { error: String }, + + #[error("Could not get available escrow for sender")] + AvailableEscrowError(#[from] indexer_common::escrow_accounts::EscrowAccountsError), + + #[error("Sender {sender} escrow balance is too large to fit in u128, could not get available escrow.")] + BalanceTooLarge { sender: Address }, + + #[error("Sender {sender} does not have enough escrow to subtract {fees} from {balance}.")] + NotEnoughEscrow { + sender: Address, + fees: u128, + balance: u128, + }, +} + +// Conversion from eventuals::error::Closed to AdapterError::EscrowEventualError +impl From for AdapterError { + fn from(e: eventuals::error::Closed) -> Self { + AdapterError::EscrowEventualError { + error: format!("{:?}", e), + } + } } impl EscrowAdapter { @@ -45,34 +67,16 @@ impl EscrowAdapterTrait for EscrowAdapter { type AdapterError = AdapterError; async fn get_available_escrow(&self, sender: Address) -> Result { - let escrow_accounts = - self.escrow_accounts - .value() - .await - .map_err(|e| AdapterError::AdapterError { - error: format!("Could not get escrow accounts from eventual: {:?}.", e), - })?; - - let sender = escrow_accounts - .get_sender_for_signer(&sender) - .map_err(|e| AdapterError::AdapterError { - error: format!("{}", e).to_string(), - })?; + let escrow_accounts = self.escrow_accounts.value().await?; - let balance = escrow_accounts - .get_balance_for_sender(&sender) - .map_err(|e| AdapterError::AdapterError { - error: format!("Could not get available escrow: {}", e).to_string(), - })? - .to_owned(); - let balance: u128 = balance.try_into().map_err(|_| AdapterError::AdapterError { - error: format!( - "Sender {} escrow balance is too large to fit in u128, \ - could not get available escrow.", - sender - ) - .to_string(), - })?; + let sender = escrow_accounts.get_sender_for_signer(&sender)?; + + let balance = escrow_accounts.get_balance_for_sender(&sender)?.to_owned(); + let balance: u128 = balance + .try_into() + .map_err(|_| AdapterError::BalanceTooLarge { + sender: sender.to_owned(), + })?; let fees = self .sender_pending_fees @@ -85,35 +89,19 @@ impl EscrowAdapterTrait for EscrowAdapter { } async fn subtract_escrow(&self, sender: Address, value: u128) -> Result<(), AdapterError> { - let escrow_accounts = - self.escrow_accounts - .value() - .await - .map_err(|e| AdapterError::AdapterError { - error: format!("Could not get escrow accounts from eventual: {:?}.", e), - })?; + let escrow_accounts = self.escrow_accounts.value().await?; let current_available_escrow = self.get_available_escrow(sender).await?; - let sender = escrow_accounts - .get_sender_for_signer(&sender) - .map_err(|e| AdapterError::AdapterError { - error: format!( - "Could not get available escrow for receipt signer {}: {}", - sender, e - ) - .to_string(), - })?; + let sender = escrow_accounts.get_sender_for_signer(&sender)?; let mut fees_write = self.sender_pending_fees.write().await; let fees = fees_write.entry(sender.to_owned()).or_insert(0); if current_available_escrow < value { - return Err(AdapterError::AdapterError { - error: format!( - "Sender {} does not have enough escrow to subtract {} from {}.", - sender, value, *fees - ) - .to_string(), + return Err(AdapterError::NotEnoughEscrow { + sender: sender.to_owned(), + fees: value, + balance: current_available_escrow, }); } *fees += value;