Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Message Stream Test + Fixes #1646

Merged
merged 12 commits into from
Feb 20, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions bindings_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ fdlimit = { version = "0.3", optional = true }

[target.'cfg(target_os = "android")'.dependencies]
paranoid-android = "0.2"
tracing_android_trace = { version = "0.1", features = ["api_level_29"] }

[target.'cfg(target_os = "ios")'.dependencies]
tracing-oslog = "0.2"
Expand Down
14 changes: 11 additions & 3 deletions bindings_ffi/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,17 @@ mod android {
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
paranoid_android::layer(env!("CARGO_PKG_NAME"))
.with_thread_names(true)
.with_filter(tracing_subscriber::filter::LevelFilter::DEBUG)
use tracing_subscriber::EnvFilter;
let api_calls_filter = EnvFilter::builder().parse_lossy("xmtp_api=debug");
vec![
paranoid_android::layer(env!("CARGO_PKG_NAME"))
.with_thread_names(true)
.with_filter(tracing_subscriber::filter::LevelFilter::DEBUG)
.boxed(),
tracing_android_trace::AndroidTraceAsyncLayer::new()
.with_filter(api_calls_filter)
.boxed(),
]
}
}

Expand Down
3 changes: 3 additions & 0 deletions common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ tracing-subscriber = { workspace = true, features = [
"ansi",
"json",
] }
parking_lot.workspace = true
once_cell.workspace = true

[target.'cfg(target_arch = "wasm32")'.dev-dependencies]
tokio = { workspace = true, features = ["time", "macros", "rt", "sync"] }
Expand All @@ -73,6 +75,7 @@ test-utils = [
"dep:tracing-wasm",
"dep:console_error_panic_hook",
"dep:tracing-forest",
"dep:once_cell",
]
bench = [
"test-utils",
Expand Down
37 changes: 37 additions & 0 deletions common/src/fmt.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
pub fn truncate_hex(hex_string: impl AsRef<str>) -> String {
let hex_string = hex_string.as_ref();
// If empty string, return it
if hex_string.is_empty() {
return String::new();
}

let hex_value = if let Some(hex_value) = hex_string.strip_prefix("0x") {
hex_value
} else {
hex_string
};

// If the hex value is 8 or fewer chars, return original string
if hex_value.len() <= 8 {
return hex_string.to_string();
}

format!(
"0x{}...{}",
&hex_value[..4],
&hex_value[hex_value.len() - 4..]
)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_long_hex() {
assert_eq!(
truncate_hex("0x5bf078bd83995fe83092d93c5655f059"),
"0x5bf0...f059"
);
}
}
2 changes: 2 additions & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ pub use stream_handles::*;

pub mod time;

pub mod fmt;

use rand::{
distributions::{Alphanumeric, DistString},
RngCore,
Expand Down
42 changes: 39 additions & 3 deletions common/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ use rand::{
use std::{future::Future, sync::OnceLock};
use xmtp_cryptography::utils as crypto_utils;

use once_cell::sync::Lazy;
use parking_lot::Mutex;
use std::collections::HashMap;

#[cfg(not(target_arch = "wasm32"))]
pub mod traced_test;
#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -19,6 +23,32 @@ mod macros;

static INIT: OnceLock<()> = OnceLock::new();

static REPLACE_IDS: Lazy<Mutex<HashMap<String, String>>> = Lazy::new(|| Mutex::new(HashMap::new()));

/// Replace inbox id in Contextual output with a name (i.e Alix, Bo, etc.)
#[derive(Default)]
pub struct InboxIdReplace {
ids: HashMap<String, String>,
}

impl InboxIdReplace {
pub fn add(&mut self, id: &str, name: &str) {
self.ids.insert(id.to_string(), name.to_string());
let mut ids = REPLACE_IDS.lock();
ids.insert(id.to_string(), name.to_string());
}
}

// remove ids for replacement from map on drop
impl Drop for InboxIdReplace {
fn drop(&mut self) {
let mut ids = REPLACE_IDS.lock();
for id in self.ids.keys() {
let _ = ids.remove(id.as_str());
}
}
}

#[cfg(not(all(target_family = "wasm", target_os = "unknown")))]
use tracing_subscriber::{
fmt::{self, format},
Expand Down Expand Up @@ -68,7 +98,13 @@ where
.fmt_fields({
format::debug_fn(move |writer, field, value| {
if field.name() == "message" {
write!(writer, "{:?}", value)?;
let mut message = format!("{:?}", value);
let ids = REPLACE_IDS.lock();
for (id, name) in ids.iter() {
message = message.replace(id, name);
}

write!(writer, "{}", message)?;
}
Ok(())
})
Expand Down Expand Up @@ -100,8 +136,8 @@ pub fn logger() {

INIT.get_or_init(|| {
let filter = EnvFilter::builder()
.with_default_directive(tracing::metadata::LevelFilter::TRACE.into())
.parse_lossy("debug");
// .with_default_directive(tracing::metadata::LevelFilter::INFO.into())
.parse_lossy("xmtp_mls::subscriptions=debug,xmtp_mls::groups=info");

tracing_subscriber::registry()
.with(tracing_wasm::WASMLayer::default())
Expand Down
10 changes: 8 additions & 2 deletions common/src/test/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,14 @@ impl Contextual {
}

fn format_event(event: &Event, writer: &mut String) -> fmt::Result {
if let Some(message) = event.message() {
writer.write_str(message)?;
let mut message = String::new();
if let Some(msg) = event.message() {
message += msg;
let ids = super::REPLACE_IDS.lock();
for (id, name) in ids.iter() {
message = message.replace(id, name);
}
writer.write_str(&message)?;
}
/*
for field in event.fields().iter() {
Expand Down
6 changes: 3 additions & 3 deletions xmtp_api/src/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ where
Ok(())
}

#[tracing::instrument(level = "trace", skip_all)]
#[tracing::instrument(level = "debug", skip(self), fields(len = filters.len()))]
pub async fn get_identity_updates_v2<T>(
&self,
filters: Vec<GetIdentityUpdatesV2Filter>,
Expand Down Expand Up @@ -93,7 +93,7 @@ where
Ok(res)
}

#[tracing::instrument(level = "trace", skip_all)]
#[tracing::instrument(level = "debug", skip(self), fields(len = account_addresses.len()))]
pub async fn get_inbox_ids(
&self,
account_addresses: Vec<String>,
Expand All @@ -120,7 +120,7 @@ where
.collect())
}

#[tracing::instrument(level = "trace", skip_all)]
#[tracing::instrument(level = "debug", skip_all)]
pub async fn verify_smart_contract_wallet_signatures(
&self,
request: VerifySmartContractWalletSignaturesRequest,
Expand Down
15 changes: 7 additions & 8 deletions xmtp_api/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ impl<ApiClient> ApiClientWrapper<ApiClient>
where
ApiClient: XmtpApi,
{
#[tracing::instrument(level = "trace", skip_all)]
#[tracing::instrument(level = "debug", skip(self), fields(group_id = hex::encode(&group_id)))]
pub async fn query_group_messages(
&self,
group_id: Vec<u8>,
Expand Down Expand Up @@ -112,11 +112,11 @@ where

id_cursor = Some(paging_info.id_cursor);
}

Ok(out)
}

/// Query for the latest message on a group
#[tracing::instrument(level = "debug", skip(self), fields(group_id = hex::encode(group_id)))]
pub async fn query_latest_group_message<Id: AsRef<[u8]> + Copy>(
&self,
group_id: Id,
Expand Down Expand Up @@ -146,7 +146,7 @@ where
Ok(result.messages.into_iter().next())
}

#[tracing::instrument(level = "trace", skip_all)]
#[tracing::instrument(level = "debug", skip(self), fields(installation_id = hex::encode(&installation_id)))]
pub async fn query_welcome_messages<Id: AsRef<[u8]> + Copy>(
&self,
installation_id: Id,
Expand Down Expand Up @@ -201,7 +201,7 @@ where
/// New InboxID clients should set `is_inbox_id_credential` to true.
/// V3 clients should have `is_inbox_id_credential` to `false`.
/// Not indicating your client version will result in validation failure.
#[tracing::instrument(level = "trace", skip_all)]
#[tracing::instrument(level = "debug", skip_all)]
pub async fn upload_key_package(
&self,
key_package: Vec<u8>,
Expand All @@ -226,7 +226,7 @@ where
Ok(())
}

#[tracing::instrument(level = "trace", skip_all)]
#[tracing::instrument(level = "debug", skip_all)]
pub async fn fetch_key_packages(
&self,
installation_keys: Vec<Vec<u8>>,
Expand Down Expand Up @@ -266,7 +266,7 @@ where
Ok(mapping)
}

#[tracing::instrument(level = "trace", skip_all)]
#[tracing::instrument(level = "debug", skip_all)]
pub async fn send_welcome_messages(&self, messages: &[WelcomeMessageInput]) -> Result<()> {
tracing::debug!(inbox_id = self.inbox_id, "send welcome messages");
retry_async!(
Expand All @@ -284,7 +284,7 @@ where
Ok(())
}

#[tracing::instrument(level = "trace", skip_all)]
#[tracing::instrument(level = "debug", skip_all)]
pub async fn send_group_messages(&self, group_messages: Vec<GroupMessageInput>) -> Result<()> {
tracing::debug!(
inbox_id = self.inbox_id,
Expand Down Expand Up @@ -719,7 +719,6 @@ pub mod tests {
#[cfg_attr(target_arch = "wasm32", wasm_bindgen_test::wasm_bindgen_test)]
#[cfg_attr(not(target_arch = "wasm32"), tokio::test)]
async fn cooldowns_apply_to_concurrent_fns() {
xmtp_common::logger();
let mut mock_api = MockApiClient::new();
let group_id = vec![1, 2, 3];

Expand Down
2 changes: 1 addition & 1 deletion xmtp_api_http/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ pub enum HttpClientError {
HeaderValue(#[from] reqwest::header::InvalidHeaderValue),
#[error(transparent)]
HeaderName(#[from] reqwest::header::InvalidHeaderName),
#[error(transparent)]
#[error("error deserializing json response {0}")]
Json(#[from] serde_json::Error),
}

Expand Down
14 changes: 5 additions & 9 deletions xmtp_api_http/src/http_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,24 +110,20 @@ where
{
fn on_bytes(bytes: bytes::Bytes, remaining: &mut Vec<u8>) -> Result<Vec<R>, HttpClientError> {
let bytes = &[remaining.as_ref(), bytes.as_ref()].concat();
let de = Deserializer::from_slice(bytes);
remaining.clear();
let de = Deserializer::from_slice(&bytes);
let mut deser_stream = de.into_iter::<GrpcResponse<R>>();
let mut items = Vec::new();
loop {
let item = deser_stream.next();
if item.is_none() {
break;
}
match item.expect("checked for none;") {
while let Some(item) = deser_stream.next() {
match item {
Ok(GrpcResponse::Ok(response)) => items.push(response),
Ok(GrpcResponse::SubscriptionItem(item)) => items.push(item.result),
Ok(GrpcResponse::Err(e)) => {
return Err(HttpClientError::Grpc(e));
}
Err(e) => {
if e.is_eof() {
*remaining = (&**bytes)[deser_stream.byte_offset()..].to_vec();
break;
*remaining = bytes[deser_stream.byte_offset()..].to_vec();
} else {
return Err(HttpClientError::from(e));
}
Expand Down
1 change: 1 addition & 0 deletions xmtp_mls/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ xmtp_api = { workspace = true, features = ["test-utils"] }
xmtp_id = { path = "../xmtp_id", features = ["test-utils"] }
xmtp_proto = { workspace = true, features = ["test-utils"] }
fdlimit = { workspace = true }
once_cell.workspace = true

[target.'cfg(not(target_arch = "wasm32"))'.dev-dependencies]
ctor.workspace = true
Expand Down
Loading
Loading