diff --git a/CHANGELOG.md b/CHANGELOG.md index 530cccb175..9cd5fe30df 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). The crates in this repository do not adhere to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) at this time. +## [5.0.0] + +### Changed + +- Fog-view now imposes a limit on how many user events it will return to the user, to ensure that we avoid exceeding + gRPC maximum message sizes. The new flag `may_have_more_user_events` is set when this limit is reached, so that clients know to make a follow-up request for the remaining events. ([#3151]) + ## [4.1.0] ### Changed diff --git a/fog/api/proto/view.proto b/fog/api/proto/view.proto index 023299b21e..586df242c7 100644 --- a/fog/api/proto/view.proto +++ b/fog/api/proto/view.proto @@ -145,6 +145,11 @@ message QueryResponse { /// This can be used by the client as a hint when choosing cryptonote mixin indices. /// This field doesn't have the same "cursor" semantics as the other fields. uint64 last_known_block_cumulative_txo_count = 9; + + /// If true, this means that due to limits, we could not return all the requested + /// user events in one response. Clients cannot compute an accurate balance check + /// until they have received all relevant user events. + bool may_have_more_user_events = 10; } /// A record of an Rng created by a fog ingest enclave. 
diff --git a/fog/api/tests/fog_types.rs b/fog/api/tests/fog_types.rs index 49a9be107d..9fa4c47a4e 100644 --- a/fog/api/tests/fog_types.rs +++ b/fog/api/tests/fog_types.rs @@ -134,6 +134,7 @@ fn fog_view_query_response_round_trip() { .collect(), last_known_block_count: rng.next_u32() as u64, last_known_block_cumulative_txo_count: rng.next_u32() as u64, + may_have_more_user_events: true, }; round_trip_message::( &test_val, @@ -157,6 +158,7 @@ fn fog_view_query_response_round_trip() { .collect(), last_known_block_count: rng.next_u32() as u64, last_known_block_cumulative_txo_count: rng.next_u32() as u64, + may_have_more_user_events: true, }; round_trip_message::( &test_val, @@ -187,6 +189,7 @@ fn fog_view_query_response_round_trip() { .collect(), last_known_block_count: rng.next_u32() as u64, last_known_block_cumulative_txo_count: rng.next_u32() as u64, + may_have_more_user_events: true, }; round_trip_message::( &test_val, diff --git a/fog/recovery_db_iface/src/lib.rs b/fog/recovery_db_iface/src/lib.rs index df7d67c536..a5accb22c8 100644 --- a/fog/recovery_db_iface/src/lib.rs +++ b/fog/recovery_db_iface/src/lib.rs @@ -206,14 +206,15 @@ pub trait RecoveryDb { /// /// Arguments: /// * start_after_event_id: The last event id the user has received. + /// * max_num_events: The maximum number of user events to return. /// /// Returns: - /// * List of found events, and higehst event id in the database (to be used - /// as - /// start_after_event_id in the next query). + /// * List of found events, and highest event id in the database (to be used + /// as start_after_event_id in the next query). fn search_user_events( &self, start_from_user_event_id: i64, + max_num_events: usize, ) -> Result<(Vec, i64), Self::Error>; /// Get any TxOutSearchResults corresponding to given search keys. 
diff --git a/fog/sql_recovery_db/src/lib.rs b/fog/sql_recovery_db/src/lib.rs index 3c009f9f51..caa5fbe101 100644 --- a/fog/sql_recovery_db/src/lib.rs +++ b/fog/sql_recovery_db/src/lib.rs @@ -713,6 +713,7 @@ impl SqlRecoveryDb { fn search_user_events_retriable( &self, start_from_user_event_id: i64, + max_num_events: usize, ) -> Result<(Vec, i64), Error> { // Early return if start_from_user_event_id is max if start_from_user_event_id == i64::MAX { @@ -737,6 +738,10 @@ impl SqlRecoveryDb { // NOTE: sql auto increment columns start from 1, so "start_from_user_event_id = 0" // will capture everything .filter(schema::user_events::dsl::id.gt(start_from_user_event_id)) + // Limit the number of responses we can get + .limit(max_num_events as i64) + // Order by id + .order(schema::user_events::dsl::id.asc()) // Get only the fields that we need .select(( // Fields for every event type @@ -1400,9 +1405,10 @@ impl RecoveryDb for SqlRecoveryDb { fn search_user_events( &self, start_from_user_event_id: i64, + max_num_events: usize, ) -> Result<(Vec, i64), Self::Error> { our_retry(self.get_retries(), || { - self.search_user_events_retriable(start_from_user_event_id) + self.search_user_events_retriable(start_from_user_event_id, max_num_events) }) } @@ -1633,6 +1639,8 @@ mod tests { use mc_util_from_random::FromRandom; use rand::{rngs::StdRng, thread_rng, SeedableRng}; + const MAX_USER_EVENTS: usize = 10_000; + #[test_with_logger] fn test_new_ingest_invocation(logger: Logger) { let mut rng: StdRng = SeedableRng::from_seed([123u8; 32]); @@ -1824,7 +1832,8 @@ mod tests { assert_eq!(ranges[1].last_ingested_block, None); // Ensure we do not have any decommissioning events. - let (events, next_start_from_user_event_id) = db.search_user_events(0).unwrap(); + let (events, next_start_from_user_event_id) = + db.search_user_events(0, MAX_USER_EVENTS).unwrap(); assert_eq!( events .iter() @@ -1851,7 +1860,7 @@ mod tests { // We should have one decommissioning event. 
let (events, next_start_from_user_event_id) = db - .search_user_events(next_start_from_user_event_id) + .search_user_events(next_start_from_user_event_id, MAX_USER_EVENTS) .unwrap(); assert_eq!(events.len(), 1); assert_eq!( @@ -1899,7 +1908,7 @@ mod tests { // We should have one decommissioning event and one new ingest invocation event. let (events, _next_start_from_user_event_id) = db - .search_user_events(next_start_from_user_event_id) + .search_user_events(next_start_from_user_event_id, MAX_USER_EVENTS) .unwrap(); assert_eq!(events.len(), 2); assert_eq!( @@ -2144,7 +2153,7 @@ mod tests { db.report_lost_ingress_key(ingress_key2).unwrap(); // Search for events and verify the results. - let (events, _) = db.search_user_events(0).unwrap(); + let (events, _) = db.search_user_events(0, MAX_USER_EVENTS).unwrap(); assert_eq!( events, vec![ @@ -2182,10 +2191,11 @@ mod tests { // Searching with a start_from_user_id that is higher than the highest available // one should return nothing. - let (_events, next_start_from_user_event_id) = db.search_user_events(0).unwrap(); + let (_events, next_start_from_user_event_id) = + db.search_user_events(0, MAX_USER_EVENTS).unwrap(); let (events, next_start_from_user_event_id2) = db - .search_user_events(next_start_from_user_event_id) + .search_user_events(next_start_from_user_event_id, MAX_USER_EVENTS) .unwrap(); assert_eq!(events.len(), 0); assert_eq!( @@ -2194,7 +2204,7 @@ mod tests { ); let (events, next_start_from_user_event_id2) = db - .search_user_events(next_start_from_user_event_id + 1) + .search_user_events(next_start_from_user_event_id + 1, MAX_USER_EVENTS) .unwrap(); assert_eq!(events.len(), 0); assert_eq!( diff --git a/fog/test_infra/src/db_tests.rs b/fog/test_infra/src/db_tests.rs index 117a607dd2..fb02a38f0e 100644 --- a/fog/test_infra/src/db_tests.rs +++ b/fog/test_infra/src/db_tests.rs @@ -21,6 +21,8 @@ pub fn get_num_blocks(db: &impl RecoveryDb) -> u64 { .unwrap_or(0) } +const USER_EVENT_LIMIT: usize = 1000; + /// 
Exercise new recovery db apis and check the results /// - Add random blocks and get tx's using new get txs API, check for NotFound /// result with junk queries @@ -49,8 +51,9 @@ pub fn recovery_db_smoke_tests_new_apis( ); // Test that they have no rng records when the cursor value is up-to-date - let (user_events, _next_start_from_user_event_id) = - db.search_user_events(start_from_user_event_id).unwrap(); + let (user_events, _next_start_from_user_event_id) = db + .search_user_events(start_from_user_event_id, USER_EVENT_LIMIT) + .unwrap(); let has_rng_events = user_events .iter() .any(|event| matches!(event, FogUserEvent::NewRngRecord(_))); @@ -74,8 +77,9 @@ pub fn recovery_db_smoke_tests_new_apis( // Test that the user can see them { - let (user_events, next_start_from_user_event_id) = - db.search_user_events(start_from_user_event_id).unwrap(); + let (user_events, next_start_from_user_event_id) = db + .search_user_events(start_from_user_event_id, USER_EVENT_LIMIT) + .unwrap(); let num_rng_events = user_events .iter() .filter(|event| matches!(event, FogUserEvent::NewRngRecord(_))) @@ -113,8 +117,9 @@ pub fn recovery_db_smoke_tests_new_apis( // Test that the user can still see those rng records at // start_from_user_event_id. 
{ - let (user_events, next_start_from_user_event_id) = - db.search_user_events(start_from_user_event_id).unwrap(); + let (user_events, next_start_from_user_event_id) = db + .search_user_events(start_from_user_event_id, USER_EVENT_LIMIT) + .unwrap(); assert_rng_record_rows_were_recovered( &user_events[..], &invoc_ids_with_kex_rng_pubkeys[..], @@ -128,8 +133,9 @@ pub fn recovery_db_smoke_tests_new_apis( // Test that the user cannot see those rng records at the updated // start_from_user_event_id { - let (user_events, next_start_from_user_event_id) = - db.search_user_events(start_from_user_event_id).unwrap(); + let (user_events, next_start_from_user_event_id) = db + .search_user_events(start_from_user_event_id, USER_EVENT_LIMIT) + .unwrap(); assert_eq!(user_events.len(), 0); assert_eq!( next_start_from_user_event_id, start_from_user_event_id, @@ -140,7 +146,8 @@ pub fn recovery_db_smoke_tests_new_apis( // Test that if user tries full recovery (cursor = 0) they get 10 rounds worth // of rng records - let (user_events, _next_start_from_user_event_id) = db.search_user_events(0).unwrap(); + let (user_events, _next_start_from_user_event_id) = + db.search_user_events(0, USER_EVENT_LIMIT).unwrap(); let num_rng_events = user_events .iter() .filter(|event| matches!(event, FogUserEvent::NewRngRecord(_))) @@ -238,7 +245,8 @@ pub fn recovery_db_rng_records_decommissioning( db.new_ingress_key(&ingress_key, 0).unwrap(); // We start without any rng record events. 
- let (user_events, _next_start_from_user_event_id) = db.search_user_events(0).unwrap(); + let (user_events, _next_start_from_user_event_id) = + db.search_user_events(0, USER_EVENT_LIMIT).unwrap(); let has_rng_events = user_events .iter() .any(|event| matches!(event, FogUserEvent::NewRngRecord(_))); @@ -253,7 +261,8 @@ pub fn recovery_db_rng_records_decommissioning( // Test that user has rng record event now let test_rows0 = vec![kex_rng_pubkey1]; - let (user_events, next_start_from_user_event_id) = db.search_user_events(0).unwrap(); + let (user_events, next_start_from_user_event_id) = + db.search_user_events(0, USER_EVENT_LIMIT).unwrap(); let rng_records: Vec = user_events .iter() .filter_map(|event| { @@ -274,7 +283,7 @@ pub fn recovery_db_rng_records_decommissioning( // Test that user has no new rngs after cursor update let (user_events, _next_start_from_user_event_id) = db - .search_user_events(next_start_from_user_event_id) + .search_user_events(next_start_from_user_event_id, USER_EVENT_LIMIT) .unwrap(); assert_eq!(user_events, vec![]); @@ -294,7 +303,7 @@ pub fn recovery_db_rng_records_decommissioning( let test_rows1 = vec![kex_rng_pubkey2]; let (user_events, _next_start_from_user_event_id) = db - .search_user_events(next_start_from_user_event_id) + .search_user_events(next_start_from_user_event_id, USER_EVENT_LIMIT) .unwrap(); let rng_records: Vec = user_events .iter() @@ -315,7 +324,8 @@ pub fn recovery_db_rng_records_decommissioning( assert_eq!(10, rng_records[0].start_block); // Check that if starting at 0 we see both rngs - let (user_events, _next_start_from_user_event_id) = db.search_user_events(0).unwrap(); + let (user_events, _next_start_from_user_event_id) = + db.search_user_events(0, USER_EVENT_LIMIT).unwrap(); let rng_records: Vec = user_events .iter() .filter_map(|event| { @@ -393,7 +403,8 @@ pub fn recovery_db_rng_records_decommissioning( assert_eq!(ingestable_ranges[1].last_ingested_block, None); // Check if we can see an event for that. 
- let (user_events, _next_start_from_user_event_id) = db.search_user_events(0).unwrap(); + let (user_events, _next_start_from_user_event_id) = + db.search_user_events(0, USER_EVENT_LIMIT).unwrap(); let decommissioned_invocs: Vec<_> = user_events .iter() .filter_map(|event| { @@ -459,7 +470,8 @@ pub fn recovery_db_rng_records_decommissioning( assert!(!ingestable_ranges[2].decommissioned); assert_eq!(ingestable_ranges[2].last_ingested_block, None); - let (user_events, _next_start_from_user_event_id) = db.search_user_events(0).unwrap(); + let (user_events, _next_start_from_user_event_id) = + db.search_user_events(0, USER_EVENT_LIMIT).unwrap(); let decommissioned_invocs: Vec<_> = user_events .iter() .filter_map(|event| { diff --git a/fog/test_infra/src/mock_client.rs b/fog/test_infra/src/mock_client.rs index feb123b55b..bfe9988702 100644 --- a/fog/test_infra/src/mock_client.rs +++ b/fog/test_infra/src/mock_client.rs @@ -31,8 +31,11 @@ impl FogViewConnection for PassThroughViewClient { start_from_block_index: u64, search_keys: Vec>, ) -> Result { - let (user_events, next_start_from_user_event_id) = - self.db.search_user_events(start_from_user_event_id)?; + const USER_EVENT_LIMIT: usize = 10_000; + let (user_events, next_start_from_user_event_id) = self + .db + .search_user_events(start_from_user_event_id, USER_EVENT_LIMIT)?; + let may_have_more_user_events = user_events.len() >= USER_EVENT_LIMIT; let highest_known_block_count = self .db @@ -75,6 +78,7 @@ impl FogViewConnection for PassThroughViewClient { tx_out_search_results: Default::default(), last_known_block_count: highest_known_block_count, last_known_block_cumulative_txo_count: cumulative_txo_count, + may_have_more_user_events, }; resp.tx_out_search_results = self.db.get_tx_outs(start_from_block_index, &search_keys)?; diff --git a/fog/types/src/view.rs b/fog/types/src/view.rs index fe1c258ab2..2224aeb8e4 100644 --- a/fog/types/src/view.rs +++ b/fog/types/src/view.rs @@ -93,6 +93,13 @@ pub struct QueryResponse { 
/// clients sample for mixins. #[prost(uint64, tag = "9")] pub last_known_block_cumulative_txo_count: u64, + + /// If true, this means that due to limits, we could not return all the + /// requested user events in one response. Clients cannot compute an + /// accurate balance check until they have received all relevant user + /// events. + #[prost(bool, tag = "10")] + pub may_have_more_user_events: bool, } /// A record that can be used by the user to produce an Rng shared with fog diff --git a/fog/view/enclave/api/src/lib.rs b/fog/view/enclave/api/src/lib.rs index 9a4ec82dd8..0242b57e25 100644 --- a/fog/view/enclave/api/src/lib.rs +++ b/fog/view/enclave/api/src/lib.rs @@ -49,6 +49,9 @@ pub struct UntrustedQueryResponse { /// The cumulative txo count of the last known block. pub last_known_block_cumulative_txo_count: u64, + + /// True if we might have more user events than the number of events returned in this response. + pub may_have_more_user_events: bool, } /// Represents a serialized request for the view enclave to service diff --git a/fog/view/enclave/impl/src/lib.rs b/fog/view/enclave/impl/src/lib.rs index f6578c2fa0..df29dae704 100644 --- a/fog/view/enclave/impl/src/lib.rs +++ b/fog/view/enclave/impl/src/lib.rs @@ -154,6 +154,7 @@ where last_known_block_count: untrusted_query_response.last_known_block_count, last_known_block_cumulative_txo_count: untrusted_query_response .last_known_block_cumulative_txo_count, + may_have_more_user_events: untrusted_query_response.may_have_more_user_events, }; // Do the txos part, scope lock of e_tx_out_store diff --git a/fog/view/protocol/src/polling.rs b/fog/view/protocol/src/polling.rs index 49403a38e5..6b3e6e45fe 100644 --- a/fog/view/protocol/src/polling.rs +++ b/fog/view/protocol/src/polling.rs @@ -75,7 +75,7 @@ pub trait FogViewConnection { let mut missed_block_ranges = Vec::::new(); // Update seeds, get block count - let mut new_highest_processed_block_count = { + let mut new_highest_processed_block_count = loop { 
match self .request( user_rng_set.get_next_start_from_user_event_id(), @@ -105,7 +105,10 @@ pub trait FogViewConnection { user_rng_set .set_next_start_from_user_event_id(result.next_start_from_user_event_id); - result.highest_processed_block_count + if result.may_have_more_user_events { + continue; + } + break result.highest_processed_block_count; } } }; diff --git a/fog/view/server/src/config.rs b/fog/view/server/src/config.rs index 5c4e42f86f..a8ae43c027 100644 --- a/fog/view/server/src/config.rs +++ b/fog/view/server/src/config.rs @@ -77,4 +77,11 @@ pub struct MobileAcctViewConfig { /// and should not much harm performance otherwise when loading the DB. #[clap(long, default_value = "1000", env = "MC_BLOCK_QUERY_BATCH_SIZE")] pub block_query_batch_size: usize, + + /// How many user events to request at once when requesting user events from + /// postgres. + /// This limit affects the maximum possible size of a grpc response from the + /// server. + #[clap(long, default_value = "10000", env = "MC_MAX_USER_EVENTS")] + pub max_user_events: usize, } diff --git a/fog/view/server/src/fog_view_service.rs b/fog/view/server/src/fog_view_service.rs index e4905c3956..b10ff5b917 100644 --- a/fog/view/server/src/fog_view_service.rs +++ b/fog/view/server/src/fog_view_service.rs @@ -76,9 +76,13 @@ impl FogViewService { let (user_events, next_start_from_user_event_id) = tracer.in_span("search_user_events", |_cx| { self.db - .search_user_events(query_request_aad.start_from_user_event_id) + .search_user_events( + query_request_aad.start_from_user_event_id, + self.config.max_user_events, + ) .map_err(|e| rpc_internal_error("search_user_events", e, &self.logger)) })?; + let may_have_more_user_events = user_events.len() >= self.config.max_user_events; let ( highest_processed_block_count, @@ -102,6 +106,7 @@ impl FogViewService { highest_processed_block_signature_timestamp, last_known_block_count, last_known_block_cumulative_txo_count, + may_have_more_user_events, }; let result_blob = 
tracer.in_span("enclave_query", |_cx| { diff --git a/fog/view/server/tests/smoke_tests.rs b/fog/view/server/tests/smoke_tests.rs index 4a543a5f57..923ed6a8c8 100644 --- a/fog/view/server/tests/smoke_tests.rs +++ b/fog/view/server/tests/smoke_tests.rs @@ -71,6 +71,7 @@ fn get_test_environment( client_auth_token_max_lifetime: Default::default(), postgres_config: Default::default(), block_query_batch_size: 2, + max_user_events: 10_000, }; let enclave = SgxViewEnclave::new(