Skip to content

Commit

Permalink
fix(disappearing-messages): run disappearing messages cleaner worker …
Browse files Browse the repository at this point in the history
…every second (#1611)

* run disappearing messages worker every second
  • Loading branch information
mchenani authored Feb 7, 2025
1 parent cb8ee07 commit 41cba8c
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 17 deletions.
4 changes: 0 additions & 4 deletions bindings_ffi/src/mls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5080,10 +5080,6 @@ mod tests {
.unwrap();
let msg_counts_before_cleanup = alix_messages.len();

// Step 7: Start cleanup worker and delete expired messages
alix.inner_client
.start_disappearing_messages_cleaner_worker();

// Wait for cleanup to complete
tokio::time::sleep(std::time::Duration::from_secs(2)).await;

Expand Down
39 changes: 39 additions & 0 deletions common/src/time.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Time primitives for native and WebAssembly
use std::fmt;
use tokio::time;

#[derive(Debug)]
pub struct Expired;
Expand Down Expand Up @@ -96,3 +97,41 @@ pub async fn sleep(duration: Duration) {
pub async fn sleep(duration: Duration) {
tokio::time::sleep(duration).await
}

pub struct Interval {
#[cfg(target_arch = "wasm32")]
duration: Duration,

#[cfg(not(target_arch = "wasm32"))]
inner: time::Interval,
}

impl Interval {
/// Creates a new interval that ticks every `duration`
pub fn new(duration: Duration) -> Self {
#[cfg(target_arch = "wasm32")]
{
Self { duration }
}

#[cfg(not(target_arch = "wasm32"))]
{
Self {
inner: time::interval(duration),
}
}
}

/// Waits for the next tick of the interval
pub async fn tick(&mut self) {
#[cfg(target_arch = "wasm32")]
{
sleep(self.duration).await;
}

#[cfg(not(target_arch = "wasm32"))]
{
self.inner.tick().await;
}
}
}
27 changes: 14 additions & 13 deletions xmtp_mls/src/groups/disappearing_messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@ use crate::Client;
use std::time::Duration;
use thiserror::Error;
use tokio::sync::OnceCell;
use xmtp_common::time::Interval;
use xmtp_id::scw_verifier::SmartContractSignatureVerifier;
use xmtp_proto::api_client::trait_impls::XmtpApi;

/// Restart the DisappearingMessagesCleanerWorker every 1 sec to delete the expired messages
/// Duration to wait before restarting the worker in case of an error.
pub const WORKER_RESTART_DELAY: Duration = Duration::from_secs(1);

/// Interval at which the DisappearingMessagesCleanerWorker runs to delete expired messages.
pub const INTERVAL_DURATION: Duration = Duration::from_secs(1);

#[derive(Debug, Error)]
pub enum DisappearingMessagesCleanerError {
#[error("storage error: {0}")]
Expand Down Expand Up @@ -68,21 +72,18 @@ where
{
/// Iterate on the list of groups and delete expired messages
async fn delete_expired_messages(&mut self) -> Result<(), DisappearingMessagesCleanerError> {
let provider = self.client.mls_provider()?;
match provider.conn_ref().delete_expired_messages() {
Ok(deleted_count) => {
tracing::info!("Successfully deleted {} expired messages", deleted_count);
}
Err(e) => {
tracing::error!("Failed to delete expired messages, error: {:?}", e);
}
}
self.client
.mls_provider()?
.conn_ref()
.delete_expired_messages()?;
Ok(())
}
async fn run(&mut self) -> Result<(), DisappearingMessagesCleanerError> {
if let Err(err) = self.delete_expired_messages().await {
tracing::error!("Error during deletion of expired messages: {:?}", err);
let mut interval = Interval::new(INTERVAL_DURATION);

loop {
interval.tick().await;
self.delete_expired_messages().await?;
}
Ok(())
}
}

0 comments on commit 41cba8c

Please sign in to comment.