Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
insipx committed Feb 19, 2025
1 parent 4f20dc3 commit ab08ea7
Show file tree
Hide file tree
Showing 18 changed files with 201 additions and 182 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 0 additions & 3 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ tracing.workspace = true
web-time.workspace = true
xmtp_cryptography.workspace = true

hex = { workspace = true, optional = true }
# optional
once_cell = { workspace = true, optional = true }
parking_lot = { workspace = true, optional = true }
Expand Down Expand Up @@ -52,7 +51,6 @@ tracing-subscriber = { workspace = true, features = [
] }
parking_lot.workspace = true
once_cell.workspace = true
hex.workspace = true

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
tokio = { workspace = true, features = ["time", "macros", "rt", "sync"] }
Expand All @@ -78,7 +76,6 @@ test-utils = [
"dep:console_error_panic_hook",
"dep:tracing-forest",
"dep:once_cell",
"dep:hex",
]
bench = [
"test-utils",
Expand Down
38 changes: 38 additions & 0 deletions common/src/fmt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
pub fn truncate_hex(hex_string: impl AsRef<str>) -> String {
let hex_string = hex_string.as_ref();
// If empty string, return it
if hex_string.is_empty() {
return String::new();
}

// Determine if string has 0x prefix
let hex_value = if hex_string.starts_with("0x") {
&hex_string[2..]
} else {
hex_string
};

// If the hex value is 8 or fewer chars, return original string
if hex_value.len() <= 8 {
return hex_string.to_string();
}

format!(
"0x{}...{}",
&hex_value[..4],
&hex_value[hex_value.len() - 4..]
)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_long_hex() {
assert_eq!(
truncate_hex("0x5bf078bd83995fe83092d93c5655f059"),
"0x5bf0...f059"
);
}
}
2 changes: 2 additions & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub use stream_handles::*;

pub mod time;

pub mod fmt;

use rand::{
distributions::{Alphanumeric, DistString},
RngCore,
Expand Down
48 changes: 44 additions & 4 deletions common/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use rand::{
use std::{future::Future, sync::OnceLock};
use xmtp_cryptography::utils as crypto_utils;

use once_cell::sync::Lazy;
use parking_lot::Mutex;
use std::collections::HashMap;

#[cfg(not(target_arch = "wasm32"))]
pub mod traced_test;
#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -17,9 +21,39 @@ use crate::time::Expired;
mod logger;
mod macros;

pub use logger::InboxIdReplace;
static INIT: OnceLock<()> = OnceLock::new();

static REPLACE_IDS: Lazy<Mutex<HashMap<String, String>>> = Lazy::new(|| Mutex::new(HashMap::new()));

/// Replace inbox id in Contextual output with a name (i.e Alix, Bo, etc.)
pub struct InboxIdReplace {
ids: HashMap<String, String>,
}

impl InboxIdReplace {
pub fn new() -> Self {
Self {
ids: HashMap::new(),
}
}

pub fn add(&mut self, id: &str, name: &str) {
self.ids.insert(id.to_string(), name.to_string());
let mut ids = REPLACE_IDS.lock();
ids.insert(id.to_string(), name.to_string());
}
}

// remove ids for replacement from map on drop
impl Drop for InboxIdReplace {
fn drop(&mut self) {
let mut ids = REPLACE_IDS.lock();
for (id, _name) in &self.ids {
let _ = ids.remove(id.as_str());
}
}
}

#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
use tracing_subscriber::{
fmt::{self, format},
Expand Down Expand Up @@ -69,7 +103,13 @@ where
.fmt_fields({
format::debug_fn(move |writer, field, value| {
if field.name() == "message" {
write!(writer, "{:?}", value)?;
let mut message = format!("{:?}", value);
let ids = REPLACE_IDS.lock();
for (id, name) in ids.iter() {
message = message.replace(id, name);
}

write!(writer, "{}", message)?;
}
Ok(())
})
Expand Down Expand Up @@ -101,8 +141,8 @@ pub fn logger() {

INIT.get_or_init(|| {
let filter = EnvFilter::builder()
.with_default_directive(tracing::metadata::LevelFilter::TRACE.into())
.parse_lossy("debug");
// .with_default_directive(tracing::metadata::LevelFilter::INFO.into())
.parse_lossy("xmtp_mls::subscriptions=debug,xmtp_mls::groups=info");

tracing_subscriber::registry()
.with(tracing_wasm::WASMLayer::default())
Expand Down
36 changes: 1 addition & 35 deletions common/src/test/logger.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,9 @@
// copy-paste of https://docs.rs/tracing-forest/latest/src/tracing_forest/printer/pretty.rs.html#62
// but with slight variations
use once_cell::sync::Lazy;
use parking_lot::Mutex;
use std::collections::HashMap;
use std::fmt::{self, Write};
use tracing_forest::printer::Formatter;
use tracing_forest::tree::{Event, Span, Tree};

static REPLACE_IDS: Lazy<Mutex<HashMap<String, String>>> = Lazy::new(|| Mutex::new(HashMap::new()));

/// Replace inbox id in Contextual output with a name (i.e Alix, Bo, etc.)
pub struct InboxIdReplace {
ids: HashMap<String, String>,
}

impl InboxIdReplace {
pub fn new() -> Self {
Self {
ids: HashMap::new(),
}
}

pub fn add(&mut self, id: &str, name: &str) {
self.ids.insert(id.to_string(), name.to_string());
let mut ids = REPLACE_IDS.lock();
ids.insert(id.to_string(), name.to_string());
}
}

// remove ids for replacement from map on drop
impl Drop for InboxIdReplace {
fn drop(&mut self) {
let mut ids = REPLACE_IDS.lock();
for (id, _name) in &self.ids {
let _ = ids.remove(id.as_str());
}
}
}

type IndentVec = Vec<Indent>;

pub struct Contextual;
Expand Down Expand Up @@ -75,7 +41,7 @@ impl Contextual {
let mut message = String::new();
if let Some(msg) = event.message() {
message = message + msg;
let ids = REPLACE_IDS.lock();
let ids = super::REPLACE_IDS.lock();
for (id, name) in ids.iter() {
message = message.replace(id, name);
}
Expand Down
1 change: 1 addition & 0 deletions dev/test-wasm-interactive
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ WASM_BINDGEN_SPLIT_LINKED_MODULES=1 \
WASM_BINDGEN_TEST_ONLY_WEB=1 \
NO_HEADLESS=1 \
cargo test --target wasm32-unknown-unknown --release \
test_stream_all_messages_does_not_lose_messages \
-p $PACKAGE -- \
--skip xmtp_mls::storage::encrypted_store::group_message::tests::it_cannot_insert_message_without_group \
--skip xmtp_mls::groups::tests::process_messages_abort_on_retryable_error \
Expand Down
1 change: 0 additions & 1 deletion xmtp_api/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,6 @@ pub mod tests {
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn cooldowns_apply_to_concurrent_fns() {
xmtp_common::logger();
let mut mock_api = MockApiClient::new();
let group_id = vec![1, 2, 3];

Expand Down
2 changes: 1 addition & 1 deletion xmtp_api_http/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub enum HttpClientError {
HeaderValue(#[from] reqwest::header::InvalidHeaderValue),
#[error(transparent)]
HeaderName(#[from] reqwest::header::InvalidHeaderName),
#[error(transparent)]
#[error("error deserializing json response {0}")]
Json(#[from] serde_json::Error),
}

Expand Down
14 changes: 5 additions & 9 deletions xmtp_api_http/src/http_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,24 +110,20 @@ where
{
fn on_bytes(bytes: bytes::Bytes, remaining: &mut Vec<u8>) -> Result<Vec<R>, HttpClientError> {
let bytes = &[remaining.as_ref(), bytes.as_ref()].concat();
let de = Deserializer::from_slice(bytes);
remaining.clear();
let de = Deserializer::from_slice(&bytes);
let mut deser_stream = de.into_iter::<GrpcResponse<R>>();
let mut items = Vec::new();
loop {
let item = deser_stream.next();
if item.is_none() {
break;
}
match item.expect("checked for none;") {
while let Some(item) = deser_stream.next() {
match item {
Ok(GrpcResponse::Ok(response)) => items.push(response),
Ok(GrpcResponse::SubscriptionItem(item)) => items.push(item.result),
Ok(GrpcResponse::Err(e)) => {
return Err(HttpClientError::Grpc(e));
}
Err(e) => {
if e.is_eof() {
*remaining = (&**bytes)[deser_stream.byte_offset()..].to_vec();
break;
*remaining = bytes[deser_stream.byte_offset()..].to_vec();
} else {
return Err(HttpClientError::from(e));
}
Expand Down
5 changes: 0 additions & 5 deletions xmtp_mls/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -813,11 +813,6 @@ where
conn: &DbConnection,
) -> Result<Vec<GroupMessage>, ClientError> {
let id_cursor = conn.get_last_cursor_for_id(group_id, EntityKind::Group)?;
tracing::info!(
"querying group messages from cursor = {}, group = {}",
id_cursor,
hex::encode(group_id)
);

let messages = self
.api_client
Expand Down
3 changes: 0 additions & 3 deletions xmtp_mls/src/groups/mls_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ where
// TODO: Should probably be renamed to `sync_with_provider`
#[tracing::instrument(skip_all)]
pub async fn sync_with_conn(&self, provider: &XmtpOpenMlsProvider) -> Result<(), GroupError> {
tracing::info!("RECEIVING");
let _mutex = self.mutex.lock().await;
let mut errors: Vec<GroupError> = vec![];

Expand Down Expand Up @@ -1116,12 +1115,10 @@ where

#[tracing::instrument(skip_all, level = "debug")]
pub(super) async fn receive(&self, provider: &XmtpOpenMlsProvider) -> Result<(), GroupError> {
tracing::info!("RECEIVING");
let messages = self
.client
.query_group_messages(&self.group_id, provider.conn_ref())
.await?;
tracing::info!("CONTINUING TO PROCESS");
self.process_messages(messages, provider).await?;
Ok(())
}
Expand Down
9 changes: 2 additions & 7 deletions xmtp_mls/src/groups/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,13 +478,8 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
// Get the group ID for locking
let group_id = self.group_id.clone();

tracing::info!(
"TRYING TO LOAD MLS GROUP for group_id={}",
hex::encode(&group_id)
);
// Acquire the lock asynchronously
let _lock = self.mls_commit_lock.get_lock_async(group_id.clone()).await;
tracing::info!("LOADING GROUP");

// Load the MLS group
let mls_group =
Expand All @@ -493,7 +488,7 @@ impl<ScopedClient: ScopedGroupClient> MlsGroup<ScopedClient> {
.ok_or(StorageError::from(NotFound::GroupById(
self.group_id.to_vec(),
)))?;
tracing::info!("PERFORM OPERATION");

// Perform the operation with the MLS group
operation(mls_group).await.map_err(Into::into)
}
Expand Down Expand Up @@ -1941,6 +1936,7 @@ pub(crate) mod tests {
#[cfg(target_arch = "wasm32")]
wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_dedicated_worker);

use crate::groups::scoped_client::ScopedGroupClient;
use diesel::connection::SimpleConnection;
use diesel::RunQueryDsl;
use futures::future::join_all;
Expand All @@ -1956,7 +1952,6 @@ pub(crate) mod tests {

use super::{group_permissions::PolicySet, DMMetadataOptions, MlsGroup};
use crate::groups::group_mutable_metadata::MessageDisappearingSettings;
use crate::groups::scoped_client::ScopedGroupClient;
use crate::groups::{
MAX_GROUP_DESCRIPTION_LENGTH, MAX_GROUP_IMAGE_URL_LENGTH, MAX_GROUP_NAME_LENGTH,
};
Expand Down
5 changes: 1 addition & 4 deletions xmtp_mls/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub mod verified_key_package_v2;
pub use client::{Client, Network};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::sync::{Arc, LazyLock};
use std::sync::Arc;
use storage::{xmtp_openmls_provider::XmtpOpenMlsProvider, DuplicateItem, StorageError};
use tokio::sync::Mutex as TokioMutex;

Expand Down Expand Up @@ -84,9 +84,6 @@ pub struct MlsGroupGuard {
_permit: tokio::sync::OwnedMutexGuard<()>,
}

// Static instance of `GroupCommitLock`
// pub static MLS_COMMIT_LOCK: LazyLock<GroupCommitLock> = LazyLock::new(GroupCommitLock::new);

/// Inserts a model to the underlying data store, erroring if it already exists
pub trait Store<StorageConnection> {
fn store(&self, into: &StorageConnection) -> Result<(), StorageError>;
Expand Down
Loading

0 comments on commit ab08ea7

Please sign in to comment.