Commit: wip debug
insipx committed Feb 19, 2025
1 parent 5b6e7b5 commit 7d309c5
Showing 9 changed files with 128 additions and 66 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions xmtp_mls/Cargo.toml
@@ -157,6 +157,7 @@ xmtp_api = { workspace = true, features = ["test-utils"] }
xmtp_id = { path = "../xmtp_id", features = ["test-utils"] }
xmtp_proto = { workspace = true, features = ["test-utils"] }
fdlimit = { workspace = true }
+ once_cell.workspace = true

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
ctor.workspace = true
5 changes: 5 additions & 0 deletions xmtp_mls/src/client.rs
@@ -807,6 +807,11 @@ where
conn: &DbConnection,
) -> Result<Vec<GroupMessage>, ClientError> {
let id_cursor = conn.get_last_cursor_for_id(group_id, EntityKind::Group)?;
+ tracing::info!(
+ "querying group messages from cursor = {}, group = {}",
+ id_cursor,
+ hex::encode(group_id)
+ );

let messages = self
.api_client
19 changes: 16 additions & 3 deletions xmtp_mls/src/groups/mls_sync.rs
@@ -203,6 +203,7 @@ where
// TODO: Should probably be renamed to `sync_with_provider`
#[tracing::instrument(skip_all)]
pub async fn sync_with_conn(&self, provider: &XmtpOpenMlsProvider) -> Result<(), GroupError> {
tracing::info!("RECEIVING");
let _mutex = self.mutex.lock().await;
let mut errors: Vec<GroupError> = vec![];

@@ -485,13 +486,18 @@
cursor,
intent.id,
intent.kind = %intent.kind,
"[{}]-[{}] processing own message for intent {} / {:?}, message_epoch: {}",
"[{}]-[{}] processing own message for intent {} / {}, message_epoch: {}",
self.context().inbox_id(),
hex::encode(self.group_id.clone()),
intent.id,
intent.kind,
message_epoch
);
+ #[cfg(test)]
+ {
+ let mut w = crate::PROCESSED.lock();
+ w.push((*cursor, intent.clone()));
+ }

if let Some((staged_commit, validated_commit)) = commit {
tracing::info!(
@@ -836,6 +842,7 @@ where
}

/// This function is idempotent. No need to wrap in a transaction.
+ #[tracing::instrument(skip(self, provider, envelope), level = "debug")]
pub(crate) async fn process_message(
&self,
provider: &XmtpOpenMlsProvider,
@@ -1112,12 +1119,14 @@
}
}

- #[tracing::instrument(skip_all)]
+ #[tracing::instrument(skip_all, level = "debug")]
pub(super) async fn receive(&self, provider: &XmtpOpenMlsProvider) -> Result<(), GroupError> {
tracing::info!("RECEIVING");
let messages = self
.client
.query_group_messages(&self.group_id, provider.conn_ref())
.await?;
tracing::info!("CONTINUING TO PROCESS");
self.process_messages(messages, provider).await?;
Ok(())
}
@@ -1271,6 +1280,11 @@ where
intent.id,
intent.kind
);
+ #[cfg(test)]
+ {
+ let mut w = crate::PUBLISHED.lock();
+ w.push(intent.clone());
+ }
if has_staged_commit {
tracing::info!("Commit sent. Stopping further publishes for this round");
return Ok(());
@@ -1529,7 +1543,6 @@
inbox_ids
.iter()
.try_fold(HashMap::new(), |mut updates, inbox_id| {
tracing::info!("INBOX ID = {}", inbox_id);
match (
latest_sequence_id_map.get(inbox_id as &str),
existing_group_membership.get(inbox_id),
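Note on the `#[cfg(test)]` blocks added above: each publish and each processed own-message is recorded into the global `PUBLISHED`/`PROCESSED` lists defined in xmtp_mls/src/lib.rs (see that file's diff below), so the stream test can dump exactly what was sent versus what was received. A minimal, self-contained sketch of that recorder pattern, with illustrative names rather than the crate's:

use once_cell::sync::Lazy;
use parking_lot::Mutex;

// Test-only global recorder: a lazily-initialized Vec behind a Mutex.
static RECORDED: Lazy<Mutex<Vec<(u64, String)>>> = Lazy::new(|| Mutex::new(Vec::new()));

fn record(cursor: u64, label: &str) {
    // parking_lot::Mutex::lock() cannot be poisoned, so no unwrap is needed.
    RECORDED.lock().push((cursor, label.to_owned()));
}

fn main() {
    record(1, "intent-a");
    record(2, "intent-b");
    for (cursor, label) in RECORDED.lock().iter() {
        println!("cursor = {cursor}, label = {label}");
    }
}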
11 changes: 9 additions & 2 deletions xmtp_mls/src/groups/mod.rs
@@ -459,7 +459,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
}

// Load the stored OpenMLS group from the OpenMLS provider's keystore
#[tracing::instrument(level = "trace", skip_all)]
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) async fn load_mls_group_with_lock_async<F, E, R, Fut>(
&self,
provider: &XmtpOpenMlsProvider,
@@ -473,8 +473,13 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
// Get the group ID for locking
let group_id = self.group_id.clone();

+ tracing::info!(
+ "TRYING TO LOAD MLS GROUP for group_id={}",
+ hex::encode(&group_id)
+ );
// Acquire the lock asynchronously
let _lock = MLS_COMMIT_LOCK.get_lock_async(group_id.clone()).await;
tracing::info!("LOADING GROUP");

// Load the MLS group
let mls_group =
@@ -483,7 +488,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
.ok_or(StorageError::from(NotFound::GroupById(
self.group_id.to_vec(),
)))?;

tracing::info!("PERFORM OPERATION");
// Perform the operation with the MLS group
operation(mls_group).await.map_err(Into::into)
}
@@ -599,6 +604,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {

// Create a group from a decrypted and decoded welcome message
// If the group already exists in the store, overwrite the MLS state and do not update the group entry
+ #[tracing::instrument(skip_all, level = "debug")]
pub(super) async fn create_from_welcome(
client: &ScopedClient,
provider: &XmtpOpenMlsProvider,
@@ -769,6 +775,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
}

/// Send a message on this user's XMTP [`Client`].
+ #[tracing::instrument(skip_all, level = "debug")]
pub async fn send_message(&self, message: &[u8]) -> Result<Vec<u8>, GroupError> {
let provider = self.mls_provider()?;
self.send_message_with_provider(message, &provider).await
81 changes: 33 additions & 48 deletions xmtp_mls/src/lib.rs
@@ -17,19 +17,28 @@ pub mod utils;
pub mod verified_key_package_v2;

pub use client::{Client, Network};
+ use parking_lot::Mutex;
use std::collections::HashMap;
- use std::sync::{Arc, LazyLock, Mutex};
+ use std::sync::{Arc, LazyLock};
use storage::{xmtp_openmls_provider::XmtpOpenMlsProvider, DuplicateItem, StorageError};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
+ use tokio::sync::Mutex as TokioMutex;

pub use xmtp_id::InboxOwner;
pub use xmtp_proto::api_client::trait_impls::*;

+ #[cfg(test)]
+ pub static PUBLISHED: once_cell::sync::Lazy<Mutex<Vec<storage::group_intent::StoredGroupIntent>>> =
+ once_cell::sync::Lazy::new(|| Mutex::new(Vec::new()));
+ #[cfg(test)]
+ pub static PROCESSED: once_cell::sync::Lazy<
+ Mutex<Vec<(u64, storage::group_intent::StoredGroupIntent)>>,
+ > = once_cell::sync::Lazy::new(|| Mutex::new(Vec::new()));

/// A manager for group-specific semaphores
#[derive(Debug)]
pub struct GroupCommitLock {
// Storage for group-specific semaphores
- locks: Mutex<HashMap<Vec<u8>, Arc<Semaphore>>>,
+ locks: Mutex<HashMap<Vec<u8>, Arc<TokioMutex<()>>>>,
}

impl Default for GroupCommitLock {
@@ -46,65 +55,41 @@ impl GroupCommitLock {
}

/// Get or create a semaphore for a specific group and acquire it, returning a guard
- pub async fn get_lock_async(&self, group_id: Vec<u8>) -> Result<SemaphoreGuard, GroupError> {
- let semaphore = {
- match self.locks.lock() {
- Ok(mut locks) => locks
- .entry(group_id)
- .or_insert_with(|| Arc::new(Semaphore::new(1)))
- .clone(),
- Err(err) => {
- eprintln!("Failed to lock the mutex: {}", err);
- return Err(GroupError::LockUnavailable);
- }
- }
+ pub async fn get_lock_async(&self, group_id: Vec<u8>) -> Result<MlsGroupGuard, GroupError> {
+ let lock = {
+ let mut locks = self.locks.lock();
+ locks
+ .entry(group_id)
+ .or_insert_with(|| Arc::new(TokioMutex::new(())))
+ .clone()
};

- let semaphore_clone = semaphore.clone();
- let permit = match semaphore.acquire_owned().await {
- Ok(permit) => permit,
- Err(err) => {
- eprintln!("Failed to acquire semaphore permit: {}", err);
- return Err(GroupError::LockFailedToAcquire);
- }
- };
- Ok(SemaphoreGuard {
- _permit: permit,
- _semaphore: semaphore_clone,
+ Ok(MlsGroupGuard {
+ _permit: lock.lock_owned().await,
})
}

/// Get or create a semaphore for a specific group and acquire it synchronously
- pub fn get_lock_sync(&self, group_id: Vec<u8>) -> Result<SemaphoreGuard, GroupError> {
- let semaphore = {
- match self.locks.lock() {
- Ok(mut locks) => locks
- .entry(group_id)
- .or_insert_with(|| Arc::new(Semaphore::new(1)))
- .clone(),
- Err(err) => {
- eprintln!("Failed to lock the mutex: {}", err);
- return Err(GroupError::LockUnavailable);
- }
- }
+ pub fn get_lock_sync(&self, group_id: Vec<u8>) -> Result<MlsGroupGuard, GroupError> {
+ let lock = {
+ let mut locks = self.locks.lock();
+ locks
+ .entry(group_id)
+ .or_insert_with(|| Arc::new(TokioMutex::new(())))
+ .clone()
};

// Synchronously acquire the permit
- let permit = semaphore
- .clone()
- .try_acquire_owned()
+ let permit = lock
+ .try_lock_owned()
.map_err(|_| GroupError::LockUnavailable)?;
- Ok(SemaphoreGuard {
- _permit: permit,
- _semaphore: semaphore, // semaphore is now valid because we cloned it earlier
- })
+ Ok(MlsGroupGuard { _permit: permit })
}
}

/// A guard that releases the semaphore when dropped
- pub struct SemaphoreGuard {
- _permit: OwnedSemaphorePermit,
- _semaphore: Arc<Semaphore>,
+ pub struct MlsGroupGuard {
+ _permit: tokio::sync::OwnedMutexGuard<()>,
}

// Static instance of `GroupCommitLock`
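The lib.rs change above replaces the per-group `Semaphore::new(1)` (with fallible `acquire_owned`) by one `Arc<tokio::sync::Mutex<()>>` per group id, kept in a map behind a poison-free `parking_lot::Mutex`; `lock_owned()` returns an owned guard that unlocks on drop. A minimal sketch of that keyed-lock pattern, with illustrative names rather than the crate's:

use std::collections::HashMap;
use std::sync::Arc;
use parking_lot::Mutex;
use tokio::sync::{Mutex as TokioMutex, OwnedMutexGuard};

// One async mutex per key; the outer parking_lot Mutex only guards the map itself.
struct KeyedLocks {
    locks: Mutex<HashMap<Vec<u8>, Arc<TokioMutex<()>>>>,
}

impl KeyedLocks {
    async fn lock(&self, key: Vec<u8>) -> OwnedMutexGuard<()> {
        let lock = {
            let mut locks = self.locks.lock();
            locks
                .entry(key)
                .or_insert_with(|| Arc::new(TokioMutex::new(())))
                .clone()
        }; // map guard drops here, before the potentially long await
        lock.lock_owned().await // owned guard keeps the Arc alive until dropped
    }
}

Unlike a semaphore, a Tokio mutex has no closed state, so the async path can no longer fail; only the synchronous `try_lock_owned` in `get_lock_sync` still returns `GroupError::LockUnavailable` when the lock is contended.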
32 changes: 31 additions & 1 deletion xmtp_mls/src/storage/encrypted_store/group_intent.rs
@@ -63,7 +63,7 @@ pub enum IntentState {
Error = 4,
}

- #[derive(Queryable, Identifiable, Debug, PartialEq, Clone)]
+ #[derive(Queryable, Identifiable, PartialEq, Clone)]
#[diesel(table_name = group_intents)]
#[diesel(primary_key(id))]
pub struct StoredGroupIntent {
@@ -79,6 +79,36 @@ pub struct StoredGroupIntent {
pub published_in_epoch: Option<i64>,
}

+ impl std::fmt::Debug for StoredGroupIntent {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(f, "StoredGroupIntent {{")?;
+ write!(f, "id: {} ", self.id)?;
+ write!(f, "kind: {} ", self.kind)?;
+ write!(f, "group_id: {} ", hex::encode(&self.group_id))?;
+ /* write!(f, "data: {} ", hex::encode(&self.data))?; */
+ write!(f, "state: {:?} ", self.state)?;
+ write!(
+ f,
+ "payload_hash: {:?} ",
+ self.payload_hash.as_ref().map(|h| hex::encode(h))
+ )?;
+ /*write!(
+ f,
+ "post_commit_data: {:?}",
+ self.post_commit_data.as_ref().map(|d| hex::encode(d))
+ )?;*/
+ write!(f, "publish_attempts: {:?} ", self.publish_attempts)?;
+ /*write!(
+ f,
+ "staged_commit: {:?}",
+ self.staged_commit.as_ref().map(|c| hex::encode(c))
+ )?;*/
+ write!(f, "published_in_epoch: {:?} ", self.published_in_epoch)?;
+ write!(f, "StoredGroupIntent }}")?;
+ Ok(())
+ }
+ }

impl StoredGroupIntent {
/// Calculate the message id for this intent.
///
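The hand-written `Debug` impl above replaces the derive so that large binary columns (`data`, `post_commit_data`, `staged_commit`) stay out of debug logs while id, kind, state, hashes, and epoch remain visible. For reference, `debug_struct` with `finish_non_exhaustive()` achieves the same redaction with standard formatting — an illustrative alternative (assuming the `hex` crate, which the diff itself uses), not what this commit does:

use std::fmt;

struct Intent {
    id: i32,
    group_id: Vec<u8>,
    data: Vec<u8>, // large; deliberately excluded from Debug output
}

impl fmt::Debug for Intent {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Intent")
            .field("id", &self.id)
            .field("group_id", &hex::encode(&self.group_id))
            .finish_non_exhaustive() // prints `Intent { id: .., group_id: "..", .. }`
    }
}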
30 changes: 24 additions & 6 deletions xmtp_mls/src/subscriptions/stream_all.rs
@@ -282,7 +282,10 @@ mod tests {
let alix = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
let eve = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
let bo = Arc::new(ClientBuilder::new_test_client(&generate_local_wallet()).await);
- tracing::info!(inbox_id = eve.inbox_id(), installation_id = %eve.installation_id(), "EVE");
+ tracing::info!(inbox_id = eve.inbox_id(), installation_id = %eve.installation_id(), "EVE={}", eve.inbox_id());
+ tracing::info!(inbox_id = bo.inbox_id(), installation_id = %bo.installation_id(), "BO={}", bo.inbox_id());
+ tracing::info!(inbox_id = alix.inbox_id(), installation_id = %alix.installation_id(), "ALIX={}", alix.inbox_id());
+ tracing::info!(inbox_id = caro.inbox_id(), installation_id = %caro.installation_id(), "CARO={}", caro.inbox_id());

let alix_group = alix
.create_group(None, GroupMetadataOptions::default())
@@ -300,7 +303,7 @@
let alix_group_pointer = alix_group.clone();
xmtp_common::spawn(None, async move {
let mut sent = 0;
- for i in 0..15 {
+ for i in 0..2 {
let msg = format!("main spam {i}");
alix_group_pointer
.send_message(msg.as_bytes())
@@ -318,7 +321,7 @@
let caro_id = caro.inbox_id().to_string();
xmtp_common::spawn(None, async move {
let caro = &caro_id;
- for i in 0..5 {
+ for i in 0..2 {
let new_group = eve
.create_group(None, GroupMetadataOptions::default())
.unwrap();
@@ -336,7 +339,7 @@
// this forces our streams to handle resubscribes while receiving lots of messages
xmtp_common::spawn(None, async move {
let bo_group = &bo_group;
- for i in 0..20 {
+ for i in 0..2 {
bo_group
.send_message(format!("msg {i}").as_bytes())
.await
@@ -350,7 +353,7 @@
let _ = xmtp_common::time::timeout(core::time::Duration::from_secs(timeout), async {
futures::pin_mut!(stream);
loop {
- if messages.len() < 40 {
+ if messages.len() < 6 {
if let Some(Ok(msg)) = stream.next().await {
tracing::info!(
message_id = hex::encode(&msg.id),
@@ -371,7 +374,22 @@
.await;

tracing::info!("Total Messages: {}", messages.len());
- assert_eq!(messages.len(), 40);
+ tracing::info!("--------------------------");
+ tracing::info!("PUBLISHED");
+ tracing::info!("--------------------------");
+ let published = crate::PUBLISHED.lock();
+ let processed = crate::PROCESSED.lock();
+ for i in published.iter() {
+ tracing::info!("{:?}", i);
+ }
+ tracing::info!("--------------------------");
+ tracing::info!("PROCESSED");
+ tracing::info!("--------------------------");
+
+ for (cursor, i) in processed.iter() {
+ tracing::info!("cursor = {}, Intent={:?}", cursor, i);
+ }
+ assert_eq!(messages.len(), 6);
}

#[wasm_bindgen_test(unsupported = tokio::test(flavor = "multi_thread", worker_threads = 10))]