From ecfb1941570cd39bd06ba49b8f672ae44ed28bc4 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Thu, 9 Jan 2025 09:14:55 -0800 Subject: [PATCH 01/16] pass in provider to update_consent_state --- bindings_ffi/src/mls.rs | 3 ++- bindings_node/src/conversation.rs | 3 ++- bindings_wasm/src/consent_state.rs | 6 ++++-- xmtp_mls/src/groups/mod.rs | 19 +++++++++++-------- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 2146f4939..76e349cf2 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -1710,8 +1710,9 @@ impl FfiConversation { } pub fn update_consent_state(&self, state: FfiConsentState) -> Result<(), GenericError> { + let provider = self.inner.mls_provider()?; self.inner - .update_consent_state(state.into()) + .update_consent_state(&provider, state.into()) .map_err(Into::into) } diff --git a/bindings_node/src/conversation.rs b/bindings_node/src/conversation.rs index 98c27179b..3c78849b5 100644 --- a/bindings_node/src/conversation.rs +++ b/bindings_node/src/conversation.rs @@ -642,9 +642,10 @@ impl Conversation { self.group_id.clone(), self.created_at_ns, ); + let provider = group.mls_provider().map_err(ErrorWrapper::from)?; group - .update_consent_state(state.into()) + .update_consent_state(&provider, state.into()) .map_err(ErrorWrapper::from)?; Ok(()) diff --git a/bindings_wasm/src/consent_state.rs b/bindings_wasm/src/consent_state.rs index 938e53a11..9a0e73595 100644 --- a/bindings_wasm/src/consent_state.rs +++ b/bindings_wasm/src/consent_state.rs @@ -127,9 +127,11 @@ impl Conversation { #[wasm_bindgen(js_name = updateConsentState)] pub fn update_consent_state(&self, state: ConsentState) -> Result<(), JsError> { let group = self.to_mls_group(); - + let provider = group + .mls_provider() + .map_err(|e| JsError::new(&format!("{e}")))?; group - .update_consent_state(state.into()) + .update_consent_state(&provider, state.into()) .map_err(|e| JsError::new(&format!("{e}")))?; Ok(()) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 0d4049d9a..3ba6648be 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -501,7 +501,7 @@ impl MlsGroup { let new_group = Self::new_from_arc(client.clone(), group_id, stored_group.created_at_ns); // Consent state defaults to allowed when the user creates the group - new_group.update_consent_state(ConsentState::Allowed)?; + new_group.update_consent_state(&provider, ConsentState::Allowed)?; Ok(new_group) } @@ -554,7 +554,7 @@ impl MlsGroup { stored_group.store(provider.conn_ref())?; let new_group = Self::new_from_arc(client.clone(), group_id, stored_group.created_at_ns); // Consent state defaults to allowed when the user creates the group - new_group.update_consent_state(ConsentState::Allowed)?; + new_group.update_consent_state(&provider, ConsentState::Allowed)?; Ok(new_group) } @@ -727,7 +727,7 @@ impl MlsGroup { self.sync_until_last_intent_resolved(provider).await?; // implicitly set group consent state to allowed - self.update_consent_state(ConsentState::Allowed)?; + self.update_consent_state(provider, ConsentState::Allowed)?; message_id } @@ -743,7 +743,7 @@ impl MlsGroup { self.sync_until_last_intent_resolved(&provider).await?; // implicitly set group consent state to allowed - self.update_consent_state(ConsentState::Allowed)?; + self.update_consent_state(&provider, ConsentState::Allowed)?; Ok(()) } @@ -1240,15 +1240,18 @@ impl MlsGroup { } } - pub fn update_consent_state(&self, state: ConsentState) -> Result<(), GroupError> { - let conn = self.context().store().conn()?; + pub fn update_consent_state( + &self, + provider: &XmtpOpenMlsProvider, + state: ConsentState, + ) -> Result<(), GroupError> { let consent_record = StoredConsentRecord::new( ConsentType::ConversationId, state, hex::encode(self.group_id.clone()), ); - conn.insert_or_replace_consent_records(&[consent_record.clone()])?; + provider.conn_ref().insert_or_replace_consent_records(&[consent_record.clone()])?; if self.client.history_sync_url().is_some() { // Dispatch an update event so it can be synced across devices @@ -3926,7 +3929,7 @@ pub(crate) mod tests { assert_eq!(alix_group.consent_state().unwrap(), ConsentState::Allowed); alix_group - .update_consent_state(ConsentState::Denied) + .update_consent_state(&alix.mls_provider().unwrap(), ConsentState::Denied) .unwrap(); assert_eq!(alix_group.consent_state().unwrap(), ConsentState::Denied); From 5a83340a339de3889cd9222b31b99fa0cf82b45f Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Thu, 9 Jan 2025 17:00:13 -0800 Subject: [PATCH 02/16] lint fix --- xmtp_mls/src/groups/mod.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 3ba6648be..19880fcd6 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -501,7 +501,7 @@ impl MlsGroup { let new_group = Self::new_from_arc(client.clone(), group_id, stored_group.created_at_ns); // Consent state defaults to allowed when the user creates the group - new_group.update_consent_state(&provider, ConsentState::Allowed)?; + new_group.update_consent_state(provider, ConsentState::Allowed)?; Ok(new_group) } @@ -554,7 +554,7 @@ impl MlsGroup { stored_group.store(provider.conn_ref())?; let new_group = Self::new_from_arc(client.clone(), group_id, stored_group.created_at_ns); // Consent state defaults to allowed when the user creates the group - new_group.update_consent_state(&provider, ConsentState::Allowed)?; + new_group.update_consent_state(provider, ConsentState::Allowed)?; Ok(new_group) } @@ -1245,13 +1245,14 @@ impl MlsGroup { provider: &XmtpOpenMlsProvider, state: ConsentState, ) -> Result<(), GroupError> { - let consent_record = StoredConsentRecord::new( ConsentType::ConversationId, state, hex::encode(self.group_id.clone()), ); - provider.conn_ref().insert_or_replace_consent_records(&[consent_record.clone()])?; + provider + .conn_ref() + .insert_or_replace_consent_records(&[consent_record.clone()])?; if self.client.history_sync_url().is_some() { // Dispatch an update event so it can be synced across devices From 39bc5d4a942d7a6c0a1e30faffddf5143c142d69 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Thu, 9 Jan 2025 17:28:49 -0800 Subject: [PATCH 03/16] try send message in async transaction --- xmtp_mls/src/groups/mod.rs | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 19880fcd6..dff391b60 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -717,19 +717,21 @@ impl MlsGroup { message: &[u8], provider: &XmtpOpenMlsProvider, ) -> Result, GroupError> { - let update_interval_ns = Some(SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS); - self.maybe_update_installations(provider, update_interval_ns) - .await?; - - let message_id = - self.prepare_message(message, provider, |now| Self::into_envelope(message, now)); - - self.sync_until_last_intent_resolved(provider).await?; - - // implicitly set group consent state to allowed - self.update_consent_state(provider, ConsentState::Allowed)?; - - message_id + self.context().store().transaction_async(provider, |tx_provider| async move { + let update_interval_ns = Some(SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS); + self.maybe_update_installations(tx_provider, update_interval_ns).await?; + + let message_id = self.prepare_message( + message, + tx_provider, + |now| Self::into_envelope(message, now) + )?; + + self.sync_until_last_intent_resolved(tx_provider).await?; + self.update_consent_state(tx_provider, ConsentState::Allowed)?; + + Ok(message_id) + }).await } /// Publish all unpublished messages. This happens by calling `sync_until_last_intent_resolved` From 1bb66e7b31b3ba7728ff71f7733ababf693a7f62 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Thu, 9 Jan 2025 17:29:18 -0800 Subject: [PATCH 04/16] fmt fix --- xmtp_mls/src/groups/mod.rs | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index dff391b60..1ef1ff73c 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -717,21 +717,23 @@ impl MlsGroup { message: &[u8], provider: &XmtpOpenMlsProvider, ) -> Result, GroupError> { - self.context().store().transaction_async(provider, |tx_provider| async move { - let update_interval_ns = Some(SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS); - self.maybe_update_installations(tx_provider, update_interval_ns).await?; - - let message_id = self.prepare_message( - message, - tx_provider, - |now| Self::into_envelope(message, now) - )?; - - self.sync_until_last_intent_resolved(tx_provider).await?; - self.update_consent_state(tx_provider, ConsentState::Allowed)?; - - Ok(message_id) - }).await + self.context() + .store() + .transaction_async(provider, |tx_provider| async move { + let update_interval_ns = Some(SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS); + self.maybe_update_installations(tx_provider, update_interval_ns) + .await?; + + let message_id = self.prepare_message(message, tx_provider, |now| { + Self::into_envelope(message, now) + })?; + + self.sync_until_last_intent_resolved(tx_provider).await?; + self.update_consent_state(tx_provider, ConsentState::Allowed)?; + + Ok(message_id) + }) + .await } /// Publish all unpublished messages. This happens by calling `sync_until_last_intent_resolved` From 278395c64ff90189473e31ed2e99672c4116913c Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Fri, 10 Jan 2025 11:52:25 -0800 Subject: [PATCH 05/16] db locked error on consecutive msg send repro test --- bindings_ffi/src/mls.rs | 53 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 76e349cf2..79e302338 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -5547,4 +5547,57 @@ mod tests { FfiReactionSchema::Unicode )); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 5)] + async fn test_can_send_parallel_messages() { + use futures::future::join_all; + + // Create two test clients + let alix = new_test_client().await; + let bo = new_test_client().await; + + // Create a conversation between them + let alix_conversation = alix + .conversations() + .create_group( + vec![bo.account_address.clone()], + FfiCreateGroupOptions::default(), + ) + .await + .unwrap(); + + // Create futures for sending messages in parallel + let send_futures = vec![ + alix_conversation.send("Message 1".as_bytes().to_vec()), + alix_conversation.send("Message 2".as_bytes().to_vec()), + ]; + + // Send all messages in parallel and collect results + let results = join_all(send_futures).await; + + // Check each result and print any errors + for (i, result) in results.iter().enumerate() { + if let Err(e) = result { + // Getting Error sending message 2: GroupError(Storage(DieselResult(DatabaseError(Unknown, "database is locked")))) + println!("Error sending message {}: {:?}", i + 1, e); + } + } + + // Assert all messages were sent successfully + assert!( + results.into_iter().all(|r| r.is_ok()), + "Not all messages were sent successfully" + ); + + // Have Alix sync to get the messages + alix_conversation.sync().await.unwrap(); + + // Verify messages were received + let messages = alix_conversation + .find_messages(FfiListMessagesOptions::default()) + .await + .unwrap(); + + assert_eq!(messages.len(), 2); + } } From ca9c6baf76ff9d6fd95028975e3c8a1965379610 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Fri, 10 Jan 2025 14:18:41 -0800 Subject: [PATCH 06/16] Revert "fmt fix" This reverts commit 1bb66e7b31b3ba7728ff71f7733ababf693a7f62. --- xmtp_mls/src/groups/mod.rs | 32 +++++++++++++++----------------- 1 file changed, 15 insertions(+), 17 deletions(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 1ef1ff73c..dff391b60 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -717,23 +717,21 @@ impl MlsGroup { message: &[u8], provider: &XmtpOpenMlsProvider, ) -> Result, GroupError> { - self.context() - .store() - .transaction_async(provider, |tx_provider| async move { - let update_interval_ns = Some(SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS); - self.maybe_update_installations(tx_provider, update_interval_ns) - .await?; - - let message_id = self.prepare_message(message, tx_provider, |now| { - Self::into_envelope(message, now) - })?; - - self.sync_until_last_intent_resolved(tx_provider).await?; - self.update_consent_state(tx_provider, ConsentState::Allowed)?; - - Ok(message_id) - }) - .await + self.context().store().transaction_async(provider, |tx_provider| async move { + let update_interval_ns = Some(SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS); + self.maybe_update_installations(tx_provider, update_interval_ns).await?; + + let message_id = self.prepare_message( + message, + tx_provider, + |now| Self::into_envelope(message, now) + )?; + + self.sync_until_last_intent_resolved(tx_provider).await?; + self.update_consent_state(tx_provider, ConsentState::Allowed)?; + + Ok(message_id) + }).await } /// Publish all unpublished messages. This happens by calling `sync_until_last_intent_resolved` From 08d732f6169a87a106256b91e1d7f9bcf843e2ae Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Fri, 10 Jan 2025 14:18:52 -0800 Subject: [PATCH 07/16] Revert "try send message in async transaction" This reverts commit 39bc5d4a942d7a6c0a1e30faffddf5143c142d69. --- xmtp_mls/src/groups/mod.rs | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index dff391b60..19880fcd6 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -717,21 +717,19 @@ impl MlsGroup { message: &[u8], provider: &XmtpOpenMlsProvider, ) -> Result, GroupError> { - self.context().store().transaction_async(provider, |tx_provider| async move { - let update_interval_ns = Some(SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS); - self.maybe_update_installations(tx_provider, update_interval_ns).await?; - - let message_id = self.prepare_message( - message, - tx_provider, - |now| Self::into_envelope(message, now) - )?; - - self.sync_until_last_intent_resolved(tx_provider).await?; - self.update_consent_state(tx_provider, ConsentState::Allowed)?; - - Ok(message_id) - }).await + let update_interval_ns = Some(SEND_MESSAGE_UPDATE_INSTALLATIONS_INTERVAL_NS); + self.maybe_update_installations(provider, update_interval_ns) + .await?; + + let message_id = + self.prepare_message(message, provider, |now| Self::into_envelope(message, now)); + + self.sync_until_last_intent_resolved(provider).await?; + + // implicitly set group consent state to allowed + self.update_consent_state(provider, ConsentState::Allowed)?; + + message_id } /// Publish all unpublished messages. This happens by calling `sync_until_last_intent_resolved` From 7aadd198b0a81b7074ba09a1dcb861c683576bc4 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Fri, 10 Jan 2025 14:19:45 -0800 Subject: [PATCH 08/16] fix test --- bindings_ffi/src/mls.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 79e302338..4687814a5 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -5598,6 +5598,6 @@ mod tests { .await .unwrap(); - assert_eq!(messages.len(), 2); + assert_eq!(messages.len(), 3); } } From af21b63db2c9aa1911d2abb1ed6cfe7f9418e1c8 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Fri, 10 Jan 2025 14:23:57 -0800 Subject: [PATCH 09/16] update comment in test --- bindings_ffi/src/mls.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 4687814a5..e3385481a 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -5570,6 +5570,10 @@ mod tests { let send_futures = vec![ alix_conversation.send("Message 1".as_bytes().to_vec()), alix_conversation.send("Message 2".as_bytes().to_vec()), + alix_conversation.send("Message 3".as_bytes().to_vec()), + alix_conversation.send("Message 4".as_bytes().to_vec()), + alix_conversation.send("Message 5".as_bytes().to_vec()), + alix_conversation.send("Message 6".as_bytes().to_vec()), ]; // Send all messages in parallel and collect results @@ -5578,7 +5582,7 @@ mod tests { // Check each result and print any errors for (i, result) in results.iter().enumerate() { if let Err(e) = result { - // Getting Error sending message 2: GroupError(Storage(DieselResult(DatabaseError(Unknown, "database is locked")))) + // No longer erroring here: GroupError(Storage(DieselResult(DatabaseError(Unknown, "database is locked")))) println!("Error sending message {}: {:?}", i + 1, e); } } @@ -5598,6 +5602,6 @@ mod tests { .await .unwrap(); - assert_eq!(messages.len(), 3); + assert_eq!(messages.len(), 7); } } From 58eaea7563abb14023804eaf754fe4e6153fe2b7 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Fri, 10 Jan 2025 14:25:44 -0800 Subject: [PATCH 10/16] Revert "lint fix" This reverts commit 5a83340a339de3889cd9222b31b99fa0cf82b45f. --- xmtp_mls/src/groups/mod.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 19880fcd6..3ba6648be 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -501,7 +501,7 @@ impl MlsGroup { let new_group = Self::new_from_arc(client.clone(), group_id, stored_group.created_at_ns); // Consent state defaults to allowed when the user creates the group - new_group.update_consent_state(provider, ConsentState::Allowed)?; + new_group.update_consent_state(&provider, ConsentState::Allowed)?; Ok(new_group) } @@ -554,7 +554,7 @@ impl MlsGroup { stored_group.store(provider.conn_ref())?; let new_group = Self::new_from_arc(client.clone(), group_id, stored_group.created_at_ns); // Consent state defaults to allowed when the user creates the group - new_group.update_consent_state(provider, ConsentState::Allowed)?; + new_group.update_consent_state(&provider, ConsentState::Allowed)?; Ok(new_group) } @@ -1245,14 +1245,13 @@ impl MlsGroup { provider: &XmtpOpenMlsProvider, state: ConsentState, ) -> Result<(), GroupError> { + let consent_record = StoredConsentRecord::new( ConsentType::ConversationId, state, hex::encode(self.group_id.clone()), ); - provider - .conn_ref() - .insert_or_replace_consent_records(&[consent_record.clone()])?; + provider.conn_ref().insert_or_replace_consent_records(&[consent_record.clone()])?; if self.client.history_sync_url().is_some() { // Dispatch an update event so it can be synced across devices From f78dcf34d2ac0c1f6ba6849100aa132d72da3dfd Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Fri, 10 Jan 2025 14:26:15 -0800 Subject: [PATCH 11/16] Revert "pass in provider to update_consent_state" This reverts commit ecfb1941570cd39bd06ba49b8f672ae44ed28bc4. --- bindings_ffi/src/mls.rs | 3 +-- bindings_node/src/conversation.rs | 3 +-- bindings_wasm/src/consent_state.rs | 6 ++---- xmtp_mls/src/groups/mod.rs | 19 ++++++++----------- 4 files changed, 12 insertions(+), 19 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index e3385481a..5368a7e79 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -1710,9 +1710,8 @@ impl FfiConversation { } pub fn update_consent_state(&self, state: FfiConsentState) -> Result<(), GenericError> { - let provider = self.inner.mls_provider()?; self.inner - .update_consent_state(&provider, state.into()) + .update_consent_state(state.into()) .map_err(Into::into) } diff --git a/bindings_node/src/conversation.rs b/bindings_node/src/conversation.rs index 3c78849b5..98c27179b 100644 --- a/bindings_node/src/conversation.rs +++ b/bindings_node/src/conversation.rs @@ -642,10 +642,9 @@ impl Conversation { self.group_id.clone(), self.created_at_ns, ); - let provider = group.mls_provider().map_err(ErrorWrapper::from)?; group - .update_consent_state(&provider, state.into()) + .update_consent_state(state.into()) .map_err(ErrorWrapper::from)?; Ok(()) diff --git a/bindings_wasm/src/consent_state.rs b/bindings_wasm/src/consent_state.rs index 9a0e73595..938e53a11 100644 --- a/bindings_wasm/src/consent_state.rs +++ b/bindings_wasm/src/consent_state.rs @@ -127,11 +127,9 @@ impl Conversation { #[wasm_bindgen(js_name = updateConsentState)] pub fn update_consent_state(&self, state: ConsentState) -> Result<(), JsError> { let group = self.to_mls_group(); - let provider = group - .mls_provider() - .map_err(|e| JsError::new(&format!("{e}")))?; + group - .update_consent_state(&provider, state.into()) + .update_consent_state(state.into()) .map_err(|e| JsError::new(&format!("{e}")))?; Ok(()) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 3ba6648be..0d4049d9a 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -501,7 +501,7 @@ impl MlsGroup { let new_group = Self::new_from_arc(client.clone(), group_id, stored_group.created_at_ns); // Consent state defaults to allowed when the user creates the group - new_group.update_consent_state(&provider, ConsentState::Allowed)?; + new_group.update_consent_state(ConsentState::Allowed)?; Ok(new_group) } @@ -554,7 +554,7 @@ impl MlsGroup { stored_group.store(provider.conn_ref())?; let new_group = Self::new_from_arc(client.clone(), group_id, stored_group.created_at_ns); // Consent state defaults to allowed when the user creates the group - new_group.update_consent_state(&provider, ConsentState::Allowed)?; + new_group.update_consent_state(ConsentState::Allowed)?; Ok(new_group) } @@ -727,7 +727,7 @@ impl MlsGroup { self.sync_until_last_intent_resolved(provider).await?; // implicitly set group consent state to allowed - self.update_consent_state(provider, ConsentState::Allowed)?; + self.update_consent_state(ConsentState::Allowed)?; message_id } @@ -743,7 +743,7 @@ impl MlsGroup { self.sync_until_last_intent_resolved(&provider).await?; // implicitly set group consent state to allowed - self.update_consent_state(&provider, ConsentState::Allowed)?; + self.update_consent_state(ConsentState::Allowed)?; Ok(()) } @@ -1240,18 +1240,15 @@ impl MlsGroup { } } - pub fn update_consent_state( - &self, - provider: &XmtpOpenMlsProvider, - state: ConsentState, - ) -> Result<(), GroupError> { + pub fn update_consent_state(&self, state: ConsentState) -> Result<(), GroupError> { + let conn = self.context().store().conn()?; let consent_record = StoredConsentRecord::new( ConsentType::ConversationId, state, hex::encode(self.group_id.clone()), ); - provider.conn_ref().insert_or_replace_consent_records(&[consent_record.clone()])?; + conn.insert_or_replace_consent_records(&[consent_record.clone()])?; if self.client.history_sync_url().is_some() { // Dispatch an update event so it can be synced across devices @@ -3929,7 +3926,7 @@ pub(crate) mod tests { assert_eq!(alix_group.consent_state().unwrap(), ConsentState::Allowed); alix_group - .update_consent_state(&alix.mls_provider().unwrap(), ConsentState::Denied) + .update_consent_state(ConsentState::Denied) .unwrap(); assert_eq!(alix_group.consent_state().unwrap(), ConsentState::Denied); From cb1eee70877a7c202fde91d463a3c88002948d3f Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Fri, 10 Jan 2025 14:48:53 -0800 Subject: [PATCH 12/16] try joinset --- bindings_ffi/src/mls.rs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/bindings_ffi/src/mls.rs b/bindings_ffi/src/mls.rs index 5368a7e79..9c2ba5111 100644 --- a/bindings_ffi/src/mls.rs +++ b/bindings_ffi/src/mls.rs @@ -2155,7 +2155,7 @@ mod tests { Arc, Mutex, }, }; - use tokio::{sync::Notify, time::error::Elapsed}; + use tokio::{sync::Notify, task::JoinSet, time::error::Elapsed}; use xmtp_common::tmp_path; use xmtp_common::{wait_for_eq, wait_for_ok}; use xmtp_content_types::{read_receipt, text::TextCodec, ContentCodec}; @@ -5565,18 +5565,23 @@ mod tests { .await .unwrap(); - // Create futures for sending messages in parallel - let send_futures = vec![ - alix_conversation.send("Message 1".as_bytes().to_vec()), - alix_conversation.send("Message 2".as_bytes().to_vec()), - alix_conversation.send("Message 3".as_bytes().to_vec()), - alix_conversation.send("Message 4".as_bytes().to_vec()), - alix_conversation.send("Message 5".as_bytes().to_vec()), - alix_conversation.send("Message 6".as_bytes().to_vec()), - ]; + // Create JoinSet for parallel tasks + let mut tasks = JoinSet::new(); - // Send all messages in parallel and collect results - let results = join_all(send_futures).await; + // Spawn tasks for sending messages in parallel + for i in 1..=6 { + let conversation = alix_conversation.clone(); + let message = format!("Message {}", i); + tasks.spawn(async move { + conversation.send(message.as_bytes().to_vec()).await + }); + } + + // Collect results as they complete + let mut results = Vec::new(); + while let Some(result) = tasks.join_next().await { + results.push(result.unwrap()); + } // Check each result and print any errors for (i, result) in results.iter().enumerate() { From c150a6396271527bd2c8ba6c210eb7565ad94657 Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Mon, 13 Jan 2025 14:55:32 -0800 Subject: [PATCH 13/16] test send_msg without implicit update consent --- xmtp_mls/src/groups/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index 0d4049d9a..f2ed98b03 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -727,7 +727,7 @@ impl MlsGroup { self.sync_until_last_intent_resolved(provider).await?; // implicitly set group consent state to allowed - self.update_consent_state(ConsentState::Allowed)?; + // self.update_consent_state(ConsentState::Allowed)?; message_id } From 03aad1e6a7d59fedf870c28d332b7878c8976bbd Mon Sep 17 00:00:00 2001 From: cameronvoell Date: Mon, 13 Jan 2025 16:30:58 -0800 Subject: [PATCH 14/16] disable history sync on update consent state --- xmtp_mls/src/groups/mod.rs | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index f2ed98b03..f996d0643 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -727,7 +727,7 @@ impl MlsGroup { self.sync_until_last_intent_resolved(provider).await?; // implicitly set group consent state to allowed - // self.update_consent_state(ConsentState::Allowed)?; + self.update_consent_state(ConsentState::Allowed)?; message_id } @@ -1250,15 +1250,15 @@ impl MlsGroup { ); conn.insert_or_replace_consent_records(&[consent_record.clone()])?; - if self.client.history_sync_url().is_some() { - // Dispatch an update event so it can be synced across devices - let _ = self - .client - .local_events() - .send(LocalEvents::OutgoingPreferenceUpdates(vec![ - UserPreferenceUpdate::ConsentUpdate(consent_record), - ])); - } + // if self.client.history_sync_url().is_some() { + // // Dispatch an update event so it can be synced across devices + // let _ = self + // .client + // .local_events() + // .send(LocalEvents::OutgoingPreferenceUpdates(vec![ + // UserPreferenceUpdate::ConsentUpdate(consent_record), + // ])); + // } Ok(()) } From 1a5f92f69b02996bda76a3e2fc6cd563caa94602 Mon Sep 17 00:00:00 2001 From: Dakota Brink Date: Mon, 13 Jan 2025 22:01:27 -0500 Subject: [PATCH 15/16] retry consent sync --- xmtp_mls/src/groups/device_sync.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/xmtp_mls/src/groups/device_sync.rs b/xmtp_mls/src/groups/device_sync.rs index 734aad132..210da883c 100644 --- a/xmtp_mls/src/groups/device_sync.rs +++ b/xmtp_mls/src/groups/device_sync.rs @@ -182,8 +182,16 @@ where }, LocalEvents::OutgoingPreferenceUpdates(preference_updates) => { tracing::error!("Outgoing preference update {preference_updates:?}"); - UserPreferenceUpdate::sync_across_devices(preference_updates, &self.client) - .await?; + retry_async!( + self.retry, + (async { + UserPreferenceUpdate::sync_across_devices( + preference_updates.clone(), + &self.client, + ) + .await + }) + )?; } LocalEvents::IncomingPreferenceUpdate(_) => { tracing::error!("Incoming preference update"); From 84e59ca6db1f57580480d6f3c3673735d3986339 Mon Sep 17 00:00:00 2001 From: Dakota Brink Date: Mon, 13 Jan 2025 22:35:50 -0500 Subject: [PATCH 16/16] bring this code back --- xmtp_mls/src/groups/mod.rs | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/xmtp_mls/src/groups/mod.rs b/xmtp_mls/src/groups/mod.rs index f996d0643..0d4049d9a 100644 --- a/xmtp_mls/src/groups/mod.rs +++ b/xmtp_mls/src/groups/mod.rs @@ -1250,15 +1250,15 @@ impl MlsGroup { ); conn.insert_or_replace_consent_records(&[consent_record.clone()])?; - // if self.client.history_sync_url().is_some() { - // // Dispatch an update event so it can be synced across devices - // let _ = self - // .client - // .local_events() - // .send(LocalEvents::OutgoingPreferenceUpdates(vec![ - // UserPreferenceUpdate::ConsentUpdate(consent_record), - // ])); - // } + if self.client.history_sync_url().is_some() { + // Dispatch an update event so it can be synced across devices + let _ = self + .client + .local_events() + .send(LocalEvents::OutgoingPreferenceUpdates(vec![ + UserPreferenceUpdate::ConsentUpdate(consent_record), + ])); + } Ok(()) }