Skip to content

Commit

Permalink
Error handling for rate limits (#1601)
Browse files Browse the repository at this point in the history
* error rework
* cooldown in retry strategy

---------

Co-authored-by: Mojtaba Chenani <[email protected]>
  • Loading branch information
insipx and mchenani authored Feb 12, 2025
1 parent f5b1089 commit 656e039
Show file tree
Hide file tree
Showing 85 changed files with 3,005 additions and 1,436 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test-webassembly.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ on:
env:
CARGO_TERM_COLOR: always
CARGO_INCREMENTAL: 0
RUSTFLAGS: --cfg tracing_unstable
RUSTFLAGS: --cfg tracing_unstable -Ctarget-feature=+bulk-memory,+mutable-globals,+atomics
CARGO_PROFILE_TEST_DEBUG: 0
WASM_BINDGEN_TEST_TIMEOUT: 480
WASM_BINDGEN_TEST_ONLY_WEB: 1
Expand All @@ -41,7 +41,7 @@ jobs:
run: cargo build --locked --tests --release --target wasm32-unknown-unknown -p xmtp_id -p xmtp_mls -p xmtp_api_http -p xmtp_cryptography -p xmtp_common
- name: test with chrome
run: |
cargo test --locked --release --target wasm32-unknown-unknown -p xmtp_mls -p xmtp_id -p xmtp_api_http -p xmtp_cryptography -- \
cargo test --locked --release --target wasm32-unknown-unknown -p xmtp_mls -p xmtp_id -p xmtp_api_http -p xmtp_cryptography -p xmtp_api -- \
--skip xmtp_mls::storage::encrypted_store::group_message::tests::it_cannot_insert_message_without_group \
--skip xmtp_mls::groups::tests::process_messages_abort_on_retryable_error \
--skip xmtp_mls::storage::encrypted_store::group::tests::test_find_groups \
Expand Down
1 change: 0 additions & 1 deletion .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
"rust-analyzer.procMacro.attributes.enable": true,
"rust-analyzer.procMacro.ignored": {
"tracing": ["instrument"],
"async-trait": ["async_trait"],
"napi-derive": ["napi"],
"async-recursion": ["async_recursion"],
"ctor": ["ctor"],
Expand Down
48 changes: 44 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ members = [
"xmtp_user_preferences",
"xmtp_v2",
"xmtp_mls",
"xmtp_api",
"xmtp_id",
"bindings_wasm",
"bindings_node",
Expand Down Expand Up @@ -51,13 +52,15 @@ ethers = { version = "2.0", default-features = false }
fdlimit = "0.3"
futures = { version = "0.3.30", default-features = false }
getrandom = { version = "0.2", default-features = false }
arc-swap = "1.7"
gloo-timers = "0.3"
hex = "0.4.3"
hkdf = "0.12.3"
js-sys = "0.3"
libsqlite3-sys = { version = "0.29", features = [
"bundled-sqlcipher-vendored-openssl",
] }
mockall = "0.13"
once_cell = "1.2"
openmls = { git = "https://github.com/xmtp/openmls", rev = "082cab5f17a54796e87bc1762a64496c86cb9bf8", default-features = false }
openmls_basic_credential = { git = "https://github.com/xmtp/openmls", rev = "082cab5f17a54796e87bc1762a64496c86cb9bf8" }
Expand All @@ -81,33 +84,38 @@ sqlite-web = "0.0.1"
thiserror = "2.0"
tls_codec = "0.4.1"
tokio = { version = "1.43.0", default-features = false }
tokio-stream = { version = "0.1", default-features = false }
uuid = "1.12"
vergen-git2 = "1.0.2"
web-time = "1.1"
bytes = "1.9"
pin-project-lite = "0.2"
reqwest = { version = "0.12.12", features = ["json", "stream"] }
itertools = "0.14"
tonic = { version = "0.12", default-features = false }
tracing = { version = "0.1", features = ["log"] }
tracing-logfmt = "0.3"
tracing-subscriber = { version = "0.3", default-features = false }
trait-variant = "0.1.2"
url = "2.5.0"
wasm-bindgen = { version = "=0.2.100", features = ["enable-interning"] }
wasm-bindgen = "=0.2.100"
wasm-bindgen-futures = "0.4.50"
wasm-bindgen-test = "0.3.50"
web-sys = "0.3"
zeroize = "1.8"

# Internal Crate Dependencies
xmtp_api = { path = "xmtp_api" }
xmtp_api_grpc = { path = "xmtp_api_grpc" }
xmtp_api_http = { path = "xmtp_api_http" }
xmtp_common = { path = "common" }
xmtp_content_types = { path = "xmtp_content_types" }
xmtp_cryptography = { path = "xmtp_cryptography" }
xmtp_id = { path = "xmtp_id" }
xmtp_mls = { path = "xmtp_mls" }
xmtp_proto = { path = "xmtp_proto" }


[profile.dev]
# Disabling debug info speeds up builds a bunch,
# and we don't rely on it for debugging that much.
Expand Down
11 changes: 6 additions & 5 deletions bindings_ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ tracing-subscriber = { workspace = true, features = [
"json",
] }
uniffi = { version = "0.28.0", default-features = false, features = ["tokio"] }
xmtp_api_grpc = { path = "../xmtp_api_grpc" }
xmtp_api.workspace = true
xmtp_api_grpc.workspace = true
xmtp_common.workspace = true
xmtp_content_types = { path = "../xmtp_content_types" }
xmtp_cryptography = { path = "../xmtp_cryptography" }
xmtp_id = { path = "../xmtp_id" }
xmtp_mls = { path = "../xmtp_mls" }
xmtp_content_types.workspace = true
xmtp_cryptography.workspace = true
xmtp_id.workspace = true
xmtp_mls.workspace = true
xmtp_proto = { path = "../xmtp_proto", features = ["proto_full"] }
xmtp_user_preferences = { path = "../xmtp_user_preferences" }
xmtp_v2 = { path = "../xmtp_v2" }
Expand Down
6 changes: 4 additions & 2 deletions bindings_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ pub enum GenericError {
ClientBuilder(#[from] xmtp_mls::builder::ClientBuilderError),
#[error("Storage error: {0}")]
Storage(#[from] xmtp_mls::storage::StorageError),
#[error("API error: {0}")]
ApiError(#[from] xmtp_proto::Error),
#[error("Group error: {0}")]
GroupError(#[from] xmtp_mls::groups::GroupError),
#[error("Signature: {0}")]
Expand Down Expand Up @@ -62,6 +60,10 @@ pub enum GenericError {
IoError(#[from] tokio::io::Error),
#[error(transparent)]
Subscription(#[from] xmtp_mls::subscriptions::SubscribeError),
#[error(transparent)]
ApiClientBuild(#[from] xmtp_api_grpc::GrpcBuilderError),
#[error(transparent)]
Grpc(#[from] xmtp_api_grpc::GrpcError),
}

#[derive(uniffi::Error, thiserror::Error, Debug)]
Expand Down
20 changes: 13 additions & 7 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ use crate::{FfiSubscribeError, GenericError};
use prost::Message;
use std::{collections::HashMap, convert::TryInto, sync::Arc};
use tokio::sync::Mutex;
use xmtp_api::{strategies, ApiClientWrapper};
use xmtp_api_grpc::grpc_api_helper::Client as TonicApiClient;
use xmtp_common::{AbortHandle, GenericStreamHandle, StreamHandle};
use xmtp_content_types::reaction::ReactionCodec;
use xmtp_content_types::ContentCodec;
use xmtp_id::associations::{verify_signed_with_public_context, DeserializationError};
Expand All @@ -28,8 +30,6 @@ use xmtp_mls::storage::group::ConversationType;
use xmtp_mls::storage::group_message::{ContentType, MsgQueryArgs};
use xmtp_mls::storage::group_message::{SortDirection, StoredGroupMessageWithReactions};
use xmtp_mls::{
api::ApiClientWrapper,
builder::ClientBuilder,
client::Client as MlsClient,
groups::{
group_metadata::GroupMetadata,
Expand All @@ -51,8 +51,8 @@ use xmtp_mls::{
EncryptedMessageStore, EncryptionKey, StorageOption,
},
subscriptions::SubscribeError,
AbortHandle, GenericStreamHandle, StreamHandle,
};
use xmtp_proto::api_client::ApiBuilder;
use xmtp_proto::xmtp::device_sync::BackupElementSelection;
use xmtp_proto::xmtp::mls::message_contents::content_types::ReactionV2;
use xmtp_proto::xmtp::mls::message_contents::{DeviceSyncKind, EncodedContent};
Expand All @@ -74,7 +74,11 @@ pub async fn connect_to_backend(
host,
is_secure
);
let api_client = TonicApiClient::create(host, is_secure).await?;
let mut api_client = TonicApiClient::builder();
api_client.set_host(host);
api_client.set_tls(true);
api_client.set_libxmtp_version(env!("CARGO_PKG_VERSION").into())?;
let api_client = api_client.build().await?;
Ok(Arc::new(XmtpApiClient(api_client)))
}

Expand Down Expand Up @@ -140,8 +144,9 @@ pub async fn create_client(
legacy_signed_private_key_proto,
);

let mut builder = ClientBuilder::new(identity_strategy)
let mut builder = xmtp_mls::Client::builder(identity_strategy)
.api_client(Arc::unwrap_or_clone(api).0)
.with_remote_verifier()?
.store(store);

if let Some(url) = &history_sync_url {
Expand All @@ -166,7 +171,8 @@ pub async fn get_inbox_id_for_address(
api: Arc<XmtpApiClient>,
account_address: String,
) -> Result<Option<String>, GenericError> {
let api = ApiClientWrapper::new(Arc::new(api.0.clone()), Default::default());
let mut api =
ApiClientWrapper::new(Arc::new(api.0.clone()), strategies::exponential_cooldown());
let results = api
.get_inbox_ids(vec![account_address.clone()])
.await
Expand Down Expand Up @@ -2297,7 +2303,7 @@ impl FfiStreamCloser {

/// End the stream and asynchronously wait for it to shutdown
pub async fn end_and_wait(&self) -> Result<(), GenericError> {
use xmtp_mls::StreamHandleError::*;
use xmtp_common::StreamHandleError::*;
use GenericError::Generic;

if self.abort_handle.is_finished() {
Expand Down
5 changes: 2 additions & 3 deletions bindings_ffi/src/v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,6 @@ mod tests {

use futures::stream;
use xmtp_proto::api_client::Envelope;
use xmtp_proto::Error as ApiError;

use crate::{
v2::{
Expand Down Expand Up @@ -610,7 +609,7 @@ mod tests {

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_subscribe() {
let items: Vec<Result<Envelope, ApiError>> = vec![
let items: Vec<Result<Envelope, xmtp_api_grpc::GrpcError>> = vec![
Ok(Envelope {
content_topic: "test1".to_string(),
timestamp_ns: 0,
Expand Down Expand Up @@ -646,7 +645,7 @@ mod tests {

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_subscription_close() {
let items: Vec<Result<Envelope, ApiError>> = vec![
let items: Vec<Result<Envelope, xmtp_api_grpc::GrpcError>> = vec![
Ok(Envelope {
content_topic: "test1".to_string(),
timestamp_ns: 0,
Expand Down
Loading

0 comments on commit 656e039

Please sign in to comment.