From ab08ea7cd1799167a46bc5ec30585c9043318560 Mon Sep 17 00:00:00 2001 From: Andrew Plaza Date: Wed, 19 Feb 2025 17:27:10 -0500 Subject: [PATCH] wip --- Cargo.lock | 1 - common/Cargo.toml | 3 - common/src/fmt.rs | 38 ++++++++ common/src/lib.rs | 2 + common/src/test.rs | 48 +++++++++- common/src/test/logger.rs | 36 +------- dev/test-wasm-interactive | 1 + xmtp_api/src/mls.rs | 1 - xmtp_api_http/src/error.rs | 2 +- xmtp_api_http/src/http_stream.rs | 14 ++- xmtp_mls/src/client.rs | 5 -- xmtp_mls/src/groups/mls_sync.rs | 3 - xmtp_mls/src/groups/mod.rs | 9 +- xmtp_mls/src/lib.rs | 5 +- .../storage/encrypted_store/group_intent.rs | 47 ++++++---- xmtp_mls/src/subscriptions/stream_all.rs | 90 +++++++++++-------- xmtp_mls/src/subscriptions/stream_messages.rs | 66 +++----------- xmtp_mls/src/utils/test/mod.rs | 12 +++ 18 files changed, 201 insertions(+), 182 deletions(-) create mode 100644 common/src/fmt.rs diff --git a/Cargo.lock b/Cargo.lock index 7b74b4113..fca660fcb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7506,7 +7506,6 @@ dependencies = [ "futures", "getrandom", "gloo-timers 0.3.0", - "hex", "js-sys", "once_cell", "parking_lot 0.12.3", diff --git a/common/Cargo.toml b/common/Cargo.toml index f2a78dfbe..bb8e2cfea 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -15,7 +15,6 @@ tracing.workspace = true web-time.workspace = true xmtp_cryptography.workspace = true -hex = { workspace = true, optional = true } # optional once_cell = { workspace = true, optional = true } parking_lot = { workspace = true, optional = true } @@ -52,7 +51,6 @@ tracing-subscriber = { workspace = true, features = [ ] } parking_lot.workspace = true once_cell.workspace = true -hex.workspace = true [target.'cfg(target_arch = "wasm32")'.dev-dependencies] tokio = { workspace = true, features = ["time", "macros", "rt", "sync"] } @@ -78,7 +76,6 @@ test-utils = [ "dep:console_error_panic_hook", "dep:tracing-forest", "dep:once_cell", - "dep:hex", ] bench = [ "test-utils", diff --git a/common/src/fmt.rs b/common/src/fmt.rs new file mode 100644 index 000000000..d19c41bfd --- /dev/null +++ b/common/src/fmt.rs @@ -0,0 +1,38 @@ +pub fn truncate_hex(hex_string: impl AsRef) -> String { + let hex_string = hex_string.as_ref(); + // If empty string, return it + if hex_string.is_empty() { + return String::new(); + } + + // Determine if string has 0x prefix + let hex_value = if hex_string.starts_with("0x") { + &hex_string[2..] + } else { + hex_string + }; + + // If the hex value is 8 or fewer chars, return original string + if hex_value.len() <= 8 { + return hex_string.to_string(); + } + + format!( + "0x{}...{}", + &hex_value[..4], + &hex_value[hex_value.len() - 4..] + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_long_hex() { + assert_eq!( + truncate_hex("0x5bf078bd83995fe83092d93c5655f059"), + "0x5bf0...f059" + ); + } +} diff --git a/common/src/lib.rs b/common/src/lib.rs index af705252b..ebdcc64fa 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -21,6 +21,8 @@ pub use stream_handles::*; pub mod time; +pub mod fmt; + use rand::{ distributions::{Alphanumeric, DistString}, RngCore, diff --git a/common/src/test.rs b/common/src/test.rs index 4840f1c44..4d6125767 100644 --- a/common/src/test.rs +++ b/common/src/test.rs @@ -7,6 +7,10 @@ use rand::{ use std::{future::Future, sync::OnceLock}; use xmtp_cryptography::utils as crypto_utils; +use once_cell::sync::Lazy; +use parking_lot::Mutex; +use std::collections::HashMap; + #[cfg(not(target_arch = "wasm32"))] pub mod traced_test; #[cfg(not(target_arch = "wasm32"))] @@ -17,9 +21,39 @@ use crate::time::Expired; mod logger; mod macros; -pub use logger::InboxIdReplace; static INIT: OnceLock<()> = OnceLock::new(); +static REPLACE_IDS: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); + +/// Replace inbox id in Contextual output with a name (i.e Alix, Bo, etc.) +pub struct InboxIdReplace { + ids: HashMap, +} + +impl InboxIdReplace { + pub fn new() -> Self { + Self { + ids: HashMap::new(), + } + } + + pub fn add(&mut self, id: &str, name: &str) { + self.ids.insert(id.to_string(), name.to_string()); + let mut ids = REPLACE_IDS.lock(); + ids.insert(id.to_string(), name.to_string()); + } +} + +// remove ids for replacement from map on drop +impl Drop for InboxIdReplace { + fn drop(&mut self) { + let mut ids = REPLACE_IDS.lock(); + for (id, _name) in &self.ids { + let _ = ids.remove(id.as_str()); + } + } +} + #[cfg(not(all(target_family = "wasm", target_os = "unknown")))] use tracing_subscriber::{ fmt::{self, format}, @@ -69,7 +103,13 @@ where .fmt_fields({ format::debug_fn(move |writer, field, value| { if field.name() == "message" { - write!(writer, "{:?}", value)?; + let mut message = format!("{:?}", value); + let ids = REPLACE_IDS.lock(); + for (id, name) in ids.iter() { + message = message.replace(id, name); + } + + write!(writer, "{}", message)?; } Ok(()) }) @@ -101,8 +141,8 @@ pub fn logger() { INIT.get_or_init(|| { let filter = EnvFilter::builder() - .with_default_directive(tracing::metadata::LevelFilter::TRACE.into()) - .parse_lossy("debug"); + // .with_default_directive(tracing::metadata::LevelFilter::INFO.into()) + .parse_lossy("xmtp_mls::subscriptions=debug,xmtp_mls::groups=info"); tracing_subscriber::registry() .with(tracing_wasm::WASMLayer::default()) diff --git a/common/src/test/logger.rs b/common/src/test/logger.rs index 3f2b35289..c86e0c70b 100644 --- a/common/src/test/logger.rs +++ b/common/src/test/logger.rs @@ -1,43 +1,9 @@ // copy-paste of https://docs.rs/tracing-forest/latest/src/tracing_forest/printer/pretty.rs.html#62 // but with slight variations -use once_cell::sync::Lazy; -use parking_lot::Mutex; -use std::collections::HashMap; use std::fmt::{self, Write}; use tracing_forest::printer::Formatter; use tracing_forest::tree::{Event, Span, Tree}; -static REPLACE_IDS: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); - -/// Replace inbox id in Contextual output with a name (i.e Alix, Bo, etc.) -pub struct InboxIdReplace { - ids: HashMap, -} - -impl InboxIdReplace { - pub fn new() -> Self { - Self { - ids: HashMap::new(), - } - } - - pub fn add(&mut self, id: &str, name: &str) { - self.ids.insert(id.to_string(), name.to_string()); - let mut ids = REPLACE_IDS.lock(); - ids.insert(id.to_string(), name.to_string()); - } -} - -// remove ids for replacement from map on drop -impl Drop for InboxIdReplace { - fn drop(&mut self) { - let mut ids = REPLACE_IDS.lock(); - for (id, _name) in &self.ids { - let _ = ids.remove(id.as_str()); - } - } -} - type IndentVec = Vec; pub struct Contextual; @@ -75,7 +41,7 @@ impl Contextual { let mut message = String::new(); if let Some(msg) = event.message() { message = message + msg; - let ids = REPLACE_IDS.lock(); + let ids = super::REPLACE_IDS.lock(); for (id, name) in ids.iter() { message = message.replace(id, name); } diff --git a/dev/test-wasm-interactive b/dev/test-wasm-interactive index bd5d5e297..09aa96b61 100755 --- a/dev/test-wasm-interactive +++ b/dev/test-wasm-interactive @@ -16,6 +16,7 @@ WASM_BINDGEN_SPLIT_LINKED_MODULES=1 \ WASM_BINDGEN_TEST_ONLY_WEB=1 \ NO_HEADLESS=1 \ cargo test --target wasm32-unknown-unknown --release \ + test_stream_all_messages_does_not_lose_messages \ -p $PACKAGE -- \ --skip xmtp_mls::storage::encrypted_store::group_message::tests::it_cannot_insert_message_without_group \ --skip xmtp_mls::groups::tests::process_messages_abort_on_retryable_error \ diff --git a/xmtp_api/src/mls.rs b/xmtp_api/src/mls.rs index 5c81778e2..c44b9fc1f 100644 --- a/xmtp_api/src/mls.rs +++ b/xmtp_api/src/mls.rs @@ -718,7 +718,6 @@ pub mod tests { #[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)] #[cfg_attr(not(target_arch = "wasm32"), tokio::test)] async fn cooldowns_apply_to_concurrent_fns() { - xmtp_common::logger(); let mut mock_api = MockApiClient::new(); let group_id = vec![1, 2, 3]; diff --git a/xmtp_api_http/src/error.rs b/xmtp_api_http/src/error.rs index eebed4f99..4e1c335c1 100644 --- a/xmtp_api_http/src/error.rs +++ b/xmtp_api_http/src/error.rs @@ -163,7 +163,7 @@ pub enum HttpClientError { HeaderValue(#[from] reqwest::header::InvalidHeaderValue), #[error(transparent)] HeaderName(#[from] reqwest::header::InvalidHeaderName), - #[error(transparent)] + #[error("error deserializing json response {0}")] Json(#[from] serde_json::Error), } diff --git a/xmtp_api_http/src/http_stream.rs b/xmtp_api_http/src/http_stream.rs index 85800276c..96e4a3451 100644 --- a/xmtp_api_http/src/http_stream.rs +++ b/xmtp_api_http/src/http_stream.rs @@ -110,15 +110,12 @@ where { fn on_bytes(bytes: bytes::Bytes, remaining: &mut Vec) -> Result, HttpClientError> { let bytes = &[remaining.as_ref(), bytes.as_ref()].concat(); - let de = Deserializer::from_slice(bytes); + remaining.clear(); + let de = Deserializer::from_slice(&bytes); let mut deser_stream = de.into_iter::>(); let mut items = Vec::new(); - loop { - let item = deser_stream.next(); - if item.is_none() { - break; - } - match item.expect("checked for none;") { + while let Some(item) = deser_stream.next() { + match item { Ok(GrpcResponse::Ok(response)) => items.push(response), Ok(GrpcResponse::SubscriptionItem(item)) => items.push(item.result), Ok(GrpcResponse::Err(e)) => { @@ -126,8 +123,7 @@ where } Err(e) => { if e.is_eof() { - *remaining = (&**bytes)[deser_stream.byte_offset()..].to_vec(); - break; + *remaining = bytes[deser_stream.byte_offset()..].to_vec(); } else { return Err(HttpClientError::from(e)); } diff --git a/xmtp_mls/src/client.rs b/xmtp_mls/src/client.rs index 4d0e540c2..cf9b37fae 100644 --- a/xmtp_mls/src/client.rs +++ b/xmtp_mls/src/client.rs @@ -813,11 +813,6 @@ where conn: &DbConnection, ) -> Result, ClientError> { let id_cursor = conn.get_last_cursor_for_id(group_id, EntityKind::Group)?; - tracing::info!( - "querying group messages from cursor = {}, group = {}", - id_cursor, - hex::encode(group_id) - ); let messages = self .api_client diff --git a/xmtp_mls/src/groups/mls_sync.rs b/xmtp_mls/src/groups/mls_sync.rs index 902b8fe0e..8cdd201dd 100644 --- a/xmtp_mls/src/groups/mls_sync.rs +++ b/xmtp_mls/src/groups/mls_sync.rs @@ -203,7 +203,6 @@ where // TODO: Should probably be renamed to `sync_with_provider` #[tracing::instrument(skip_all)] pub async fn sync_with_conn(&self, provider: &XmtpOpenMlsProvider) -> Result<(), GroupError> { - tracing::info!("RECEIVING"); let _mutex = self.mutex.lock().await; let mut errors: Vec = vec![]; @@ -1116,12 +1115,10 @@ where #[tracing::instrument(skip_all, level = "debug")] pub(super) async fn receive(&self, provider: &XmtpOpenMlsProvider) -> Result<(), GroupError> { - tracing::info!("RECEIVING"); let messages = self .client .query_group_messages(&self.group_id, provider.conn_ref()) .await?; - tracing::info!("CONTINUING TO PROCESS"); self.process_messages(messages, provider).await?; Ok(()) } diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index aaa763b77..12d31408f 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -478,13 +478,8 @@ impl MlsGroup { // Get the group ID for locking let group_id = self.group_id.clone(); - tracing::info!( - "TRYING TO LOAD MLS GROUP for group_id={}", - hex::encode(&group_id) - ); // Acquire the lock asynchronously let _lock = self.mls_commit_lock.get_lock_async(group_id.clone()).await; - tracing::info!("LOADING GROUP"); // Load the MLS group let mls_group = @@ -493,7 +488,7 @@ impl MlsGroup { .ok_or(StorageError::from(NotFound::GroupById( self.group_id.to_vec(), )))?; - tracing::info!("PERFORM OPERATION"); + // Perform the operation with the MLS group operation(mls_group).await.map_err(Into::into) } @@ -1941,6 +1936,7 @@ pub(crate) mod tests { #[cfg(target_arch = "wasm32")] wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_dedicated_worker); + use crate::groups::scoped_client::ScopedGroupClient; use diesel::connection::SimpleConnection; use diesel::RunQueryDsl; use futures::future::join_all; @@ -1956,7 +1952,6 @@ pub(crate) mod tests { use super::{group_permissions::PolicySet, DMMetadataOptions, MlsGroup}; use crate::groups::group_mutable_metadata::MessageDisappearingSettings; - use crate::groups::scoped_client::ScopedGroupClient; use crate::groups::{ MAX_GROUP_DESCRIPTION_LENGTH, MAX_GROUP_IMAGE_URL_LENGTH, MAX_GROUP_NAME_LENGTH, }; diff --git a/xmtp_mls/src/lib.rs b/xmtp_mls/src/lib.rs index 6a5b561f6..dcb32b38b 100644 --- a/xmtp_mls/src/lib.rs +++ b/xmtp_mls/src/lib.rs @@ -19,7 +19,7 @@ pub mod verified_key_package_v2; pub use client::{Client, Network}; use parking_lot::Mutex; use std::collections::HashMap; -use std::sync::{Arc, LazyLock}; +use std::sync::Arc; use storage::{xmtp_openmls_provider::XmtpOpenMlsProvider, DuplicateItem, StorageError}; use tokio::sync::Mutex as TokioMutex; @@ -84,9 +84,6 @@ pub struct MlsGroupGuard { _permit: tokio::sync::OwnedMutexGuard<()>, } -// Static instance of `GroupCommitLock` -// pub static MLS_COMMIT_LOCK: LazyLock = LazyLock::new(GroupCommitLock::new); - /// Inserts a model to the underlying data store, erroring if it already exists pub trait Store { fn store(&self, into: &StorageConnection) -> Result<(), StorageError>; diff --git a/xmtp_mls/src/storage/encrypted_store/group_intent.rs b/xmtp_mls/src/storage/encrypted_store/group_intent.rs index 4aa661efd..d75691bcb 100644 --- a/xmtp_mls/src/storage/encrypted_store/group_intent.rs +++ b/xmtp_mls/src/storage/encrypted_store/group_intent.rs @@ -7,6 +7,7 @@ use diesel::{ sql_types::Integer, }; use prost::Message; +use xmtp_common::fmt; use super::{ db_connection::DbConnection, @@ -81,30 +82,40 @@ pub struct StoredGroupIntent { impl std::fmt::Debug for StoredGroupIntent { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "StoredGroupIntent {{")?; - write!(f, "id: {} ", self.id)?; - write!(f, "kind: {} ", self.kind)?; - write!(f, "group_id: {} ", hex::encode(&self.group_id))?; - /* write!(f, "data: {} ", hex::encode(&self.data))?; */ - write!(f, "state: {:?} ", self.state)?; + write!(f, "StoredGroupIntent {{ ")?; + write!(f, "id: {}, ", self.id)?; + write!(f, "kind: {}, ", self.kind)?; write!( f, - "payload_hash: {:?} ", - self.payload_hash.as_ref().map(|h| hex::encode(h)) + "group_id: {}, ", + fmt::truncate_hex(&hex::encode(&self.group_id)) )?; - /*write!( + write!(f, "data: {}, ", fmt::truncate_hex(hex::encode(&self.data)))?; + write!(f, "state: {:?}, ", self.state)?; + write!( f, - "post_commit_data: {:?}", - self.post_commit_data.as_ref().map(|d| hex::encode(d)) - )?;*/ - write!(f, "publish_attempts: {:?} ", self.publish_attempts)?; - /*write!( + "payload_hash: {:?}, ", + self.payload_hash + .as_ref() + .map(|h| fmt::truncate_hex(&hex::encode(h))) + )?; + write!( f, - "staged_commit: {:?}", - self.staged_commit.as_ref().map(|c| hex::encode(c)) - )?;*/ + "post_commit_data: {:?}, ", + self.post_commit_data + .as_ref() + .map(|d| fmt::truncate_hex(hex::encode(d))) + )?; + write!(f, "publish_attempts: {:?}, ", self.publish_attempts)?; + write!( + f, + "staged_commit: {:?}, ", + self.staged_commit + .as_ref() + .map(|c| fmt::truncate_hex(hex::encode(c))) + )?; write!(f, "published_in_epoch: {:?} ", self.published_in_epoch)?; - write!(f, "StoredGroupIntent }}")?; + write!(f, " }}")?; Ok(()) } } diff --git a/xmtp_mls/src/subscriptions/stream_all.rs b/xmtp_mls/src/subscriptions/stream_all.rs index c1a2743a3..ca674e5ea 100644 --- a/xmtp_mls/src/subscriptions/stream_all.rs +++ b/xmtp_mls/src/subscriptions/stream_all.rs @@ -114,6 +114,7 @@ mod tests { wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_dedicated_worker); use std::sync::Arc; + use std::time::Duration; use crate::{assert_msg, builder::ClientBuilder, groups::GroupMetadataOptions}; use xmtp_cryptography::utils::generate_local_wallet; @@ -274,8 +275,23 @@ mod tests { assert_msg!(stream, "second"); } + use std::collections::HashMap; + fn find_duplicates_with_count(strings: &[String]) -> HashMap<&String, usize> { + let mut counts = HashMap::new(); + + // Count occurrences + for string in strings { + *counts.entry(string).or_insert(0) += 1; + } + + // Filter to keep only strings that appear more than once + counts.retain(|_, count| *count > 1); + + counts + } + #[wasm_bindgen_test(unsupported = tokio::test(flavor = "multi_thread"))] - #[cfg_attr(target_arch = "wasm32", ignore)] + // #[cfg_attr(target_arch = "wasm32", ignore)] async fn test_stream_all_messages_does_not_lose_messages() { xmtp_common::logger(); let mut replace = xmtp_common::InboxIdReplace::new(); @@ -302,20 +318,17 @@ mod tests { let provider = bo.store().mls_provider().unwrap(); let bo_group = bo.sync_welcomes(&provider).await.unwrap()[0].clone(); - let stream = caro.stream_all_messages(None).await.unwrap(); + let mut stream = caro.stream_all_messages(None).await.unwrap(); let alix_group_pointer = alix_group.clone(); xmtp_common::spawn(None, async move { - let mut sent = 0; - for i in 0..2 { + for i in 0..10 { let msg = format!("main spam {i}"); alix_group_pointer .send_message(msg.as_bytes()) .await .unwrap(); - sent += 1; - xmtp_common::time::sleep(core::time::Duration::from_micros(100)).await; - tracing::info!("sent {sent}"); + xmtp_common::time::sleep(Duration::from_micros(100)).await; } }); @@ -325,15 +338,11 @@ mod tests { let caro_id = caro.inbox_id().to_string(); xmtp_common::spawn(None, async move { let caro = &caro_id; - for i in 0..2 { + for i in 0..10 { let new_group = eve .create_group(None, GroupMetadataOptions::default()) .unwrap(); new_group.add_members_by_inbox_id(&[caro]).await.unwrap(); - tracing::info!( - "\n\n EVE SENDING {i} to {}\n\n", - hex::encode(&new_group.group_id) - ); let msg = format!("EVE spam {i} from new group"); new_group.send_message(msg.as_bytes()).await.unwrap(); } @@ -343,41 +352,45 @@ mod tests { // this forces our streams to handle resubscribes while receiving lots of messages xmtp_common::spawn(None, async move { let bo_group = &bo_group; - for i in 0..2 { + for i in 0..10 { bo_group - .send_message(format!("msg {i}").as_bytes()) + .send_message(format!("bo msg {i}").as_bytes()) .await .unwrap(); - xmtp_common::time::sleep(core::time::Duration::from_millis(50)).await + xmtp_common::time::sleep(Duration::from_millis(50)).await } }); let mut messages = Vec::new(); - let timeout = if cfg!(target_arch = "wasm32") { 15 } else { 5 }; - let _ = xmtp_common::time::timeout(core::time::Duration::from_secs(timeout), async { - futures::pin_mut!(stream); - loop { - if messages.len() < 6 { - if let Some(Ok(msg)) = stream.next().await { - tracing::info!( - message_id = hex::encode(&msg.id), - sender_inbox_id = msg.sender_inbox_id, - sender_installation_id = hex::encode(&msg.sender_installation_id), - group_id = hex::encode(&msg.group_id), - "GOT MESSAGE {}, text={}", - messages.len(), - String::from_utf8_lossy(msg.decrypted_message_bytes.as_slice()) - ); - messages.push(msg) + let timeout = if cfg!(target_arch = "wasm32") { 10 } else { 10 }; + + loop { + tokio::select! { + Some(msg) = stream.next() => { + match msg { + Ok(m) => messages.push(m), + Err(e) => { + tracing::error!("error in stream test {e}"); + } } - } else { - break; - } + }, + _ = xmtp_common::time::sleep(Duration::from_secs(timeout)) => break + } - }) - .await; + } - assert_eq!(messages.len(), 6); + let msgs = &messages + .iter() + .map(|m| String::from_utf8_lossy(m.decrypted_message_bytes.as_slice()).to_string()) + .collect::>(); + let duplicates = find_duplicates_with_count(&msgs); + /* + for message in messages.iter() { + let m = String::from_utf8_lossy(message.decrypted_message_bytes.as_slice()); + tracing::info!("{}", m); + }*/ + assert!(duplicates.is_empty()); + assert_eq!(messages.len(), 30, "too many messages mean duplicates, too little means missed. Also ensure timeout is sufficient."); } #[wasm_bindgen_test(unsupported = tokio::test(flavor = "multi_thread", worker_threads = 10))] @@ -407,7 +420,7 @@ mod tests { }); let mut messages = Vec::new(); - let _ = xmtp_common::time::timeout(core::time::Duration::from_secs(20), async { + let _ = xmtp_common::time::timeout(Duration::from_secs(20), async { futures::pin_mut!(stream); loop { if messages.len() < 5 { @@ -429,7 +442,6 @@ mod tests { } }) .await; - tracing::info!("Total Messages: {}", messages.len()); assert_eq!(messages.len(), 5); } diff --git a/xmtp_mls/src/subscriptions/stream_messages.rs b/xmtp_mls/src/subscriptions/stream_messages.rs index 8d730a3c1..f0a6cf13c 100644 --- a/xmtp_mls/src/subscriptions/stream_messages.rs +++ b/xmtp_mls/src/subscriptions/stream_messages.rs @@ -1,5 +1,5 @@ use std::{ - collections::{HashMap, VecDeque}, + collections::HashMap, future::Future, pin::Pin, task::{ready, Context, Poll}, @@ -97,7 +97,6 @@ pin_project! { #[pin] state: State<'a, Subscription>, client: &'a C, group_list: HashMap, - drained: VecDeque>>, } } @@ -132,7 +131,7 @@ where let mut group_list = group_list .into_iter() - .map(|group_id| (group_id, 1u64)) + .map(|group_id| (group_id, 0u64)) .collect::>(); let cursors = group_list @@ -164,7 +163,7 @@ where .inspect(|(group_id, cursor)| { tracing::debug!( "subscribed to group {} at {}", - hex::encode(group_id), + xmtp_common::fmt::truncate_hex(&hex::encode(group_id)), cursor ) }) @@ -177,7 +176,6 @@ where client, state: Default::default(), group_list: group_list.into_iter().map(|(g, c)| (g, c.into())).collect(), - drained: VecDeque::new(), }) } @@ -224,42 +222,15 @@ where let stream = client.api().subscribe_group_messages(filters).await?; Ok((stream, new_group, Some(1))) } - _ => { - let msg = client - .api() - .query_latest_group_message(new_group.as_slice()) - .await?; - - let mut cursor = None; - if let Some(m) = msg { - let m = extract_message_v1(m.clone())?; - if let Some(new) = filters.iter_mut().find(|f| f.group_id == new_group) { - new.id_cursor = Some(m.id); - cursor = Some(m.id); - } + c => { + if let Some(new) = filters.iter_mut().find(|f| f.group_id == new_group) { + new.id_cursor = Some(c as u64); } let stream = client.api().subscribe_group_messages(filters).await?; - Ok((stream, new_group, cursor)) + Ok((stream, new_group, Some(c as u64))) } } } - - // needed mainly for slower connections when we may receive messages - // in between a switch. - pub(super) fn drain( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> VecDeque>> { - let mut drained = VecDeque::new(); - let mut this = self.as_mut().project(); - while let Poll::Ready(msg) = this.inner.as_mut().poll_next(cx) { - drained.push_back(msg.map(|v| { - v.map_err(xmtp_proto::ApiError::from) - .map_err(SubscribeError::from) - })); - } - drained - } } impl<'a, C> Stream for StreamGroupMessages<'a, C, MessagesApiSubscription<'a, C>> @@ -276,14 +247,6 @@ where match this.state.as_mut().project() { Waiting => { - if let Some(envelope) = this.drained.pop_front().flatten() { - let future = ProcessMessageFuture::new(*this.client, envelope?)?; - let future = future.process(); - this.state.set(State::Processing { - future: FutureWrapper::new(future), - }); - return self.try_update_state(cx); - } if let Some(envelope) = ready!(this.inner.poll_next(cx)) { let future = ProcessMessageFuture::new( *this.client, @@ -306,9 +269,7 @@ where if let Some(c) = cursor { this.set_cursor(group.as_slice(), c) }; - let drained = self.as_mut().drain(cx); let mut this = self.as_mut().project(); - this.drained.extend(drained); this.inner.set(stream); if let Some(cursor) = this.group_list.get(group.as_slice()) { tracing::debug!( @@ -413,8 +374,9 @@ where inbox_id = self.inbox_id(), group_id = hex::encode(&self.msg.group_id), cursor_id, - "[{}] is about to process streamed envelope cursor_id=[{}]", + "[{}] is about to process streamed envelope for group {} cursor_id=[{}]", self.inbox_id(), + xmtp_common::fmt::truncate_hex(&hex::encode(&self.msg.group_id)), &cursor_id ); @@ -520,15 +482,15 @@ where self.msg.group_id.clone(), self.msg.created_ns as i64, ); - tracing::info!("recovery sync"); - // let epoch = group.epoch(&self.provider).await.unwrap_or(0); + let epoch = group.epoch(&self.provider).await.unwrap_or(0); tracing::debug!( inbox_id = self.client.inbox_id(), group_id = hex::encode(&self.msg.group_id), cursor_id = self.msg.id, - // epoch = epoch, - "attempting recovery sync", - // epoch + epoch = epoch, + "attempting recovery sync for group {} in epoch {}", + xmtp_common::fmt::truncate_hex(hex::encode(&self.msg.group_id)), + epoch ); // Swallow errors here, since another process may have successfully saved the message // to the DB diff --git a/xmtp_mls/src/utils/test/mod.rs b/xmtp_mls/src/utils/test/mod.rs index 37307132c..93e5f1be5 100755 --- a/xmtp_mls/src/utils/test/mod.rs +++ b/xmtp_mls/src/utils/test/mod.rs @@ -88,6 +88,18 @@ impl ClientBuilder { .await } + pub async fn new_test_client_dev(owner: &impl InboxOwner) -> FullXmtpClient { + let api_client = ::create_dev().await; + + build_with_verifier( + owner, + api_client, + MockSmartContractSignatureVerifier::new(true), + None, + ) + .await + } + pub async fn new_test_client_with_history( owner: &impl InboxOwner, history_sync_url: &str,