Skip to content

Commit

Permalink
feat: chunk role mentions
Browse files Browse the repository at this point in the history
  • Loading branch information
IAmTomahawkx committed Jan 31, 2025
1 parent acb6a9e commit 162f156
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 28 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ Revolt.*.toml
target
.data
.env
.venv/

.vercel
.DS_Store
Expand Down
9 changes: 8 additions & 1 deletion crates/core/database/src/models/server_members/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::{FieldsMember, Member, MemberCompositeKey, PartialMember};
mod mongodb;
mod reference;

#[derive(Debug)]
pub enum ChunkedServerMembersGenerator {
#[cfg(feature = "mongodb")]
MongoDb {
Expand Down Expand Up @@ -85,9 +86,15 @@ pub trait AbstractServerMembers: Sync + Send {
async fn fetch_all_members_with_roles(
&self,
server_id: &str,
roles: &Vec<String>,
roles: &[String],
) -> Result<Vec<Member>>;

async fn fetch_all_members_with_roles_chunked(
&self,
server_id: &str,
roles: &[String],
) -> Result<ChunkedServerMembersGenerator>;

/// Fetch all memberships for a user
async fn fetch_all_memberships<'a>(&self, user_id: &str) -> Result<Vec<Member>>;

Expand Down
34 changes: 33 additions & 1 deletion crates/core/database/src/models/server_members/ops/mongodb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl AbstractServerMembers for MongoDb {
async fn fetch_all_members_with_roles(
&self,
server_id: &str,
roles: &Vec<String>,
roles: &[String],
) -> Result<Vec<Member>> {
Ok(self
.col::<Member>(COL)
Expand All @@ -106,6 +106,38 @@ impl AbstractServerMembers for MongoDb {
.await)
}

async fn fetch_all_members_with_roles_chunked(
&self,
server_id: &str,
roles: &[String],
) -> Result<ChunkedServerMembersGenerator> {
let config = revolt_config::config().await;

let mut session = self
.start_session()
.await
.map_err(|_| create_database_error!("start_session", COL))?;

session
.start_transaction()
.read_concern(ReadConcern::snapshot())
.await
.map_err(|_| create_database_error!("start_transaction", COL))?;

let cursor = self
.col::<Member>(COL)
.find(doc! {
"_id.server": server_id,
"roles": {"$in": roles}
})
.session(&mut session)
.batch_size(config.pushd.mass_mention_chunk_size as u32)
.await
.map_err(|_| create_database_error!("find", COL))?;

return Ok(ChunkedServerMembersGenerator::new_mongo(session, cursor));
}

/// Fetch all memberships for a user
async fn fetch_all_memberships<'a>(&self, user_id: &str) -> Result<Vec<Member>> {
Ok(self
Expand Down
26 changes: 25 additions & 1 deletion crates/core/database/src/models/server_members/ops/reference.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl AbstractServerMembers for ReferenceDb {
async fn fetch_all_members_with_roles(
&self,
server_id: &str,
roles: &Vec<String>,
roles: &[String],
) -> Result<Vec<Member>> {
let server_members = self.server_members.lock().await;

Expand All @@ -80,6 +80,30 @@ impl AbstractServerMembers for ReferenceDb {
.collect())
}

async fn fetch_all_members_with_roles_chunked(
&self,
server_id: &str,
roles: &[String],
) -> Result<ChunkedServerMembersGenerator> {
let server_members = self.server_members.lock().await;

let resp = server_members
.clone()
.into_values()
.filter(|member| {
member.id.server == server_id
&& !member
.roles
.iter()
.filter(|p| roles.contains(*p))
.collect::<Vec<&String>>()
.is_empty()
})
.collect();

return Ok(ChunkedServerMembersGenerator::new_reference(resp));
}

/// Fetch all memberships for a user
async fn fetch_all_memberships<'a>(&self, user_id: &str) -> Result<Vec<Member>> {
let server_members = self.server_members.lock().await;
Expand Down
69 changes: 45 additions & 24 deletions crates/daemons/pushd/src/consumers/inbound/mass_mention.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ impl AsyncConsumer for MassMessageConsumer {
let userids: Vec<String> =
chunk.iter().map(|member| member.id.user.clone()).collect();

debug!("Userids in chunk: {:?}", userids);

if let Err(err) = self
.db
.add_mention_to_many_unreads(push.channel.id(), &userids, &ack_chnl)
Expand All @@ -196,6 +198,11 @@ impl AsyncConsumer for MassMessageConsumer {
.cloned()
.collect();

debug!(
"Userids after filter: {:?} (online: {:?}",
target_users, online_users
);

self.fire_notification_for_users(&push, &target_users).await;

if exhausted {
Expand All @@ -206,7 +213,7 @@ impl AsyncConsumer for MassMessageConsumer {
// role mentions
let _role_members = self
.db
.fetch_all_members_with_roles(&payload.server_id, roles)
.fetch_all_members_with_roles_chunked(&payload.server_id, roles)
.await;

debug!("role members: {:?}", _role_members);
Expand All @@ -216,37 +223,51 @@ impl AsyncConsumer for MassMessageConsumer {
return;
}

let role_members = _role_members.unwrap();
let mut role_members = _role_members.unwrap();
let mut chunk = vec![];
let mut exhausted = false;

// TODO: this should probably be chunked in the future
let mut q = query.clone().members(&role_members);
let viewing_members: Vec<String> = q
.members_can_see_channel()
.await
.iter()
.filter_map(|(uid, viewable)| {
if *viewable && !existing_mentions.contains(uid) {
Some(uid.clone())
while !exhausted {
chunk.clear();

for _ in 0..config.pushd.mass_mention_chunk_size {
if let Some(member) = role_members.next().await {
chunk.push(member);
} else {
None
exhausted = true;
break;
}
})
.collect();
}

debug!("viewing members: {:?}", viewing_members);
let mut q = query.clone().members(&chunk);
let viewing_members: Vec<String> = q
.members_can_see_channel()
.await
.iter()
.filter_map(|(uid, viewable)| {
if *viewable && !existing_mentions.contains(uid) {
Some(uid.clone())
} else {
None
}
})
.collect();

let online = revolt_presence::filter_online(&viewing_members).await;
debug!("online: {:?}", online);
debug!("viewing members: {:?}", viewing_members);

let targets: Vec<String> = viewing_members
.iter()
.filter(|m| !online.contains(*m))
.cloned()
.collect();
let online = revolt_presence::filter_online(&viewing_members).await;
debug!("online: {:?}", online);

debug!("targets: {:?}", targets);
let targets: Vec<String> = viewing_members
.iter()
.filter(|m| !online.contains(*m))
.cloned()
.collect();

self.fire_notification_for_users(&push, &targets).await;
debug!("targets: {:?}", targets);

self.fire_notification_for_users(&push, &targets).await;
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/daemons/pushd/src/consumers/outbound/apn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ impl ApnsOutboundConsumer {
}
}

println!("Got badge count for APN: {}", mention_count);
debug!("Got badge count for APN: {}", mention_count);

return Some(mention_count);
}
Expand Down

0 comments on commit 162f156

Please sign in to comment.