Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message Stream Test + Fixes #1646

Merged
merged 12 commits into from
Feb 20, 2025
25 changes: 21 additions & 4 deletions xmtp_mls/src/subscriptions/stream_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,23 +274,27 @@ mod tests {
assert_msg!(stream, "second");
}

#[wasm_bindgen_test(unsupported = tokio::test(flavor = "current_thread"))]
#[wasm_bindgen_test(unsupported = tokio::test(flavor = "multi_thread"))]
#[cfg_attr(target_arch = "wasm32", ignore)]
async fn test_stream_all_messages_does_not_lose_messages() {
xmtp_common::logger();
let caro = ClientBuilder::new_test_client(&generate_local_wallet()).await;
let alix = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
let eve = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
let bo = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
tracing::info!(inbox_id = eve.inbox_id(), installation_id = %eve.installation_id(), "EVE");

let alix_group = alix
.create_group(None, GroupMetadataOptions::default())
.unwrap();
alix_group
.add_members_by_inbox_id(&[caro.inbox_id()])
.add_members_by_inbox_id(&[caro.inbox_id(), bo.inbox_id()])
.await
.unwrap();

let provider = bo.store().mls_provider().unwrap();
let bo_group = bo.sync_welcomes(&provider).await.unwrap()[0].clone();

let stream = caro.stream_all_messages(None).await.unwrap();

let alix_group_pointer = alix_group.clone();
Expand Down Expand Up @@ -328,12 +332,25 @@ mod tests {
}
});

// Bo will try to break our stream by sending lots of messages
// this forces our streams to handle resubscribes while receiving lots of messages
xmtp_common::spawn(None, async move {
let bo_group = &bo_group;
for i in 0..20 {
bo_group
.send_message(format!("msg {i}").as_bytes())
.await
.unwrap();
xmtp_common::time::sleep(core::time::Duration::from_millis(50)).await
}
});

let mut messages = Vec::new();
let timeout = if cfg!(target_arch = "wasm32") { 15 } else { 5 };
let _ = xmtp_common::time::timeout(core::time::Duration::from_secs(timeout), async {
futures::pin_mut!(stream);
loop {
if messages.len() < 20 {
if messages.len() < 40 {
if let Some(Ok(msg)) = stream.next().await {
tracing::info!(
message_id = hex::encode(&msg.id),
Expand All @@ -354,7 +371,7 @@ mod tests {
.await;

tracing::info!("Total Messages: {}", messages.len());
assert_eq!(messages.len(), 20);
assert_eq!(messages.len(), 40);
}

#[wasm_bindgen_test(unsupported = tokio::test(flavor = "multi_thread", worker_threads = 10))]
Expand Down
Loading