From c55b5bf75f08e2aa4dd13f6211962bf2c3384c21 Mon Sep 17 00:00:00 2001 From: Zomatree Date: Mon, 27 Jan 2025 20:46:29 +0000 Subject: [PATCH] fix: include voice states in servercreate event feat: call started system message in dms --- Cargo.lock | 24 +-- crates/bonfire/Cargo.toml | 1 - crates/bonfire/src/config.rs | 1 + crates/bonfire/src/events/impl.rs | 33 +--- crates/bonfire/src/main.rs | 2 +- crates/core/database/Cargo.toml | 5 + crates/core/database/src/events/client.rs | 1 + crates/core/database/src/lib.rs | 3 + .../database/src/models/messages/model.rs | 5 + .../src/models/server_members/model.rs | 10 + crates/core/database/src/util/bridge/v0.rs | 1 + .../src/lib.rs => database/src/voice/mod.rs} | 173 ++++++------------ .../core/database/src/voice/voice_client.rs | 108 +++++++++++ crates/core/models/src/v0/messages.rs | 3 + crates/core/result/src/lib.rs | 2 + crates/core/voice/Cargo.toml | 33 ---- crates/daemons/voice-ingress/Cargo.toml | 1 - crates/daemons/voice-ingress/src/main.rs | 2 +- crates/delta/Cargo.toml | 1 - crates/delta/src/main.rs | 4 +- .../delta/src/routes/channels/voice_join.rs | 30 ++- crates/delta/src/routes/servers/ban_create.rs | 2 +- .../delta/src/routes/servers/member_edit.rs | 2 +- .../delta/src/routes/servers/roles_delete.rs | 5 +- crates/delta/src/routes/servers/roles_edit.rs | 5 +- 25 files changed, 237 insertions(+), 220 deletions(-) rename crates/core/{voice/src/lib.rs => database/src/voice/mod.rs} (53%) create mode 100644 crates/core/database/src/voice/voice_client.rs delete mode 100644 crates/core/voice/Cargo.toml diff --git a/Cargo.lock b/Cargo.lock index 51db43a61..9db589c30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6268,7 +6268,6 @@ dependencies = [ "revolt-permissions", "revolt-presence", "revolt-result", - "revolt-voice", "rmp-serde", "sentry", "serde", @@ -6313,6 +6312,9 @@ dependencies = [ "isahc", "iso8601-timestamp 0.2.17", "linkify 0.8.1", + "livekit-api", + "livekit-protocol", + "livekit-runtime", "log", "lru 0.11.1", "mongodb", @@ -6375,7 +6377,6 @@ dependencies = [ "revolt-permissions", "revolt-presence", "revolt-result", - "revolt-voice", "revolt_rocket_okapi", "rocket", "rocket_authifier", @@ -6538,24 +6539,6 @@ dependencies = [ "utoipa", ] -[[package]] -name = "revolt-voice" -version = "0.1.0" -dependencies = [ - "async-std", - "futures", - "livekit-api", - "livekit-protocol", - "redis-kiss", - "revolt-config", - "revolt-database", - "revolt-models", - "revolt-permissions", - "revolt-result", - "serde", - "serde_json", -] - [[package]] name = "revolt-voice-ingress" version = "0.7.1" @@ -6573,7 +6556,6 @@ dependencies = [ "revolt-models", "revolt-permissions", "revolt-result", - "revolt-voice", "rmp-serde", "rocket", "rocket_empty", diff --git a/crates/bonfire/Cargo.toml b/crates/bonfire/Cargo.toml index 64c6507de..86627e3cb 100644 --- a/crates/bonfire/Cargo.toml +++ b/crates/bonfire/Cargo.toml @@ -43,7 +43,6 @@ revolt-config = { path = "../core/config" } revolt-database = { path = "../core/database" } revolt-permissions = { version = "0.8.1", path = "../core/permissions" } revolt-presence = { path = "../core/presence", features = ["redis-is-patched"] } -revolt-voice = { path = "../core/voice" } # redis fred = { version = "8.0.1", features = ["subscriber-client"] } diff --git a/crates/bonfire/src/config.rs b/crates/bonfire/src/config.rs index 5642d3baa..04cfd5ca6 100644 --- a/crates/bonfire/src/config.rs +++ b/crates/bonfire/src/config.rs @@ -93,6 +93,7 @@ impl ProtocolConfiguration { ReadyPayloadFields::Channels, ReadyPayloadFields::Members, ReadyPayloadFields::Emoji, + ReadyPayloadFields::VoiceStates, ] } } diff --git a/crates/bonfire/src/events/impl.rs b/crates/bonfire/src/events/impl.rs index 714d9a8d7..5c974fb65 100644 --- a/crates/bonfire/src/events/impl.rs +++ b/crates/bonfire/src/events/impl.rs @@ -3,13 +3,13 @@ use std::collections::HashSet; use revolt_database::{ events::client::{EventV1, ReadyPayloadFields}, util::permissions::DatabasePermissionQuery, + voice::{delete_voice_state, get_voice_channel_members, get_voice_state, get_channel_voice_state}, Channel, Database, Member, MemberCompositeKey, Presence, RelationshipStatus, }; use revolt_models::v0; use revolt_permissions::{calculate_channel_permissions, ChannelPermission}; use revolt_presence::filter_online; use revolt_result::Result; -use revolt_voice::{delete_voice_state, get_voice_channel_members, get_voice_state}; use super::state::{Cache, State}; @@ -236,7 +236,7 @@ impl State { let mut voice_states = Vec::new(); for channel in &channels { - if let Ok(Some(voice_state)) = self.fetch_voice_state(channel).await { + if let Ok(Some(voice_state)) = get_channel_voice_state(channel).await { voice_states.push(voice_state) } } @@ -464,6 +464,7 @@ impl State { server, channels, emojis: _, + voice_states: _, } => { self.insert_subscription(id.clone()).await; @@ -637,32 +638,4 @@ impl State { true } - - async fn fetch_voice_state( - &self, - channel: &Channel, - ) -> Result> { - let members = get_voice_channel_members(&channel.id()).await?; - - if !members.is_empty() { - let mut participants = Vec::with_capacity(members.len()); - - for user_id in members { - if let Some(voice_state) = get_voice_state(&channel.id(), channel.server().as_deref(), &user_id).await? { - participants.push(voice_state); - } else { - log::info!("Voice state not found but member in voice channel members, removing."); - - delete_voice_state(&channel.id(), channel.server().as_deref(), &user_id).await?; - } - } - - Ok(Some(v0::ChannelVoiceState { - id: channel.id().to_string(), - participants, - })) - } else { - Ok(None) - } - } } diff --git a/crates/bonfire/src/main.rs b/crates/bonfire/src/main.rs index 2cfeac921..3f68e4729 100644 --- a/crates/bonfire/src/main.rs +++ b/crates/bonfire/src/main.rs @@ -3,7 +3,7 @@ use std::{env, sync::Arc}; use async_std::net::TcpListener; use revolt_presence::clear_region; use once_cell::sync::OnceCell; -use revolt_voice::VoiceClient; +use revolt_database::voice::VoiceClient; #[macro_use] extern crate log; diff --git a/crates/core/database/Cargo.toml b/crates/core/database/Cargo.toml index ba3de99ac..329217ffa 100644 --- a/crates/core/database/Cargo.toml +++ b/crates/core/database/Cargo.toml @@ -100,3 +100,8 @@ authifier = { version = "1.0.9", features = ["rocket_impl"] } # RabbitMQ amqprs = { version = "1.7.0" } + +# Voice +livekit-api = "0.4.1" +livekit-protocol = "0.3.6" +livekit-runtime = { version = "0.3.1", features = ["tokio"] } \ No newline at end of file diff --git a/crates/core/database/src/events/client.rs b/crates/core/database/src/events/client.rs index 9262d9996..6262f9da6 100644 --- a/crates/core/database/src/events/client.rs +++ b/crates/core/database/src/events/client.rs @@ -120,6 +120,7 @@ pub enum EventV1 { server: Server, channels: Vec, emojis: Vec, + voice_states: Vec }, /// Update existing server diff --git a/crates/core/database/src/lib.rs b/crates/core/database/src/lib.rs index 9e26336d1..8889f196c 100644 --- a/crates/core/database/src/lib.rs +++ b/crates/core/database/src/lib.rs @@ -108,6 +108,9 @@ pub mod tasks; mod amqp; pub use amqp::amqp::AMQP; +pub mod voice; + + /// Utility function to check if a boolean value is false pub fn if_false(t: &bool) -> bool { !t diff --git a/crates/core/database/src/models/messages/model.rs b/crates/core/database/src/models/messages/model.rs index 725d46d1d..d4991ed42 100644 --- a/crates/core/database/src/models/messages/model.rs +++ b/crates/core/database/src/models/messages/model.rs @@ -106,6 +106,8 @@ auto_derived!( MessagePinned { id: String, by: String }, #[serde(rename = "message_unpinned")] MessageUnpinned { id: String, by: String }, + #[serde(rename = "call_started")] + CallStarted { by: String }, } /// Name and / or avatar override information @@ -658,6 +660,9 @@ impl Message { v0::SystemMessage::MessageUnpinned { by, .. } => { users.push(by.clone()); } + v0::SystemMessage::CallStarted { by } => { + users.push(by.clone()) + } } } users diff --git a/crates/core/database/src/models/server_members/model.rs b/crates/core/database/src/models/server_members/model.rs index 0728c30a8..da33644f3 100644 --- a/crates/core/database/src/models/server_members/model.rs +++ b/crates/core/database/src/models/server_members/model.rs @@ -1,6 +1,7 @@ use iso8601_timestamp::Timestamp; use revolt_permissions::{calculate_channel_permissions, ChannelPermission}; use revolt_result::{create_error, Result}; +use crate::voice::get_channel_voice_state; use crate::{ events::client::EventV1, if_false, util::permissions::DatabasePermissionQuery, Channel, @@ -132,6 +133,14 @@ impl Member { let emojis = db.fetch_emoji_by_parent_id(&server.id).await?; + let mut voice_states = Vec::new(); + + for channel in &channels { + if let Ok(Some(voice_state)) = get_channel_voice_state(channel).await { + voice_states.push(voice_state) + } + } + EventV1::ServerMemberJoin { id: server.id.clone(), user: user.id.clone(), @@ -148,6 +157,7 @@ impl Member { .map(|channel| channel.into()) .collect(), emojis: emojis.into_iter().map(|emoji| emoji.into()).collect(), + voice_states } .private(user.id.clone()) .await; diff --git a/crates/core/database/src/util/bridge/v0.rs b/crates/core/database/src/util/bridge/v0.rs index c04af1554..1ad07a927 100644 --- a/crates/core/database/src/util/bridge/v0.rs +++ b/crates/core/database/src/util/bridge/v0.rs @@ -545,6 +545,7 @@ impl From for SystemMessage { crate::SystemMessage::UserRemove { id, by } => Self::UserRemove { id, by }, crate::SystemMessage::MessagePinned { id, by } => Self::MessagePinned { id, by }, crate::SystemMessage::MessageUnpinned { id, by } => Self::MessageUnpinned { id, by }, + crate::SystemMessage::CallStarted { by } => Self::CallStarted { by } } } } diff --git a/crates/core/voice/src/lib.rs b/crates/core/database/src/voice/mod.rs similarity index 53% rename from crates/core/voice/src/lib.rs rename to crates/core/database/src/voice/mod.rs index c5bfc0f8b..2e59ca335 100644 --- a/crates/core/voice/src/lib.rs +++ b/crates/core/database/src/voice/mod.rs @@ -1,28 +1,25 @@ -use livekit_api::{ - access_token::{AccessToken, VideoGrants}, - services::room::{CreateRoomOptions, RoomClient, UpdateParticipantOptions}, -}; -use livekit_protocol::{ParticipantInfo, ParticipantPermission, Room}; + use redis_kiss::{get_connection, redis::Pipeline, AsyncCommands}; -use revolt_config::config; -use revolt_database::{Channel, User}; +use crate::models::{Channel, User}; use revolt_models::v0::{self, PartialUserVoiceState, UserVoiceState}; use revolt_permissions::{ChannelPermission, PermissionValue}; use revolt_result::{create_error, Result, ToRevoltError}; -use std::{collections::HashMap, time::Duration}; + +mod voice_client; +pub use voice_client::VoiceClient; pub async fn raise_if_in_voice(user: &User, target: &str) -> Result<()> { let mut conn = get_connection().await.to_internal_error()?; if user.bot.is_some() // bots can be in as many voice channels as it wants so we just check if its already connected to the one its trying to connect to - && conn.sismember(format!("vc-{}", &user.id), target) + && conn.sismember(format!("vc:{}", &user.id), target) .await .to_internal_error()? { Err(create_error!(AlreadyConnected)) } else if conn - .scard::<_, u32>(format!("vc-{}", &user.id)) // check if the current vc set is empty + .scard::<_, u32>(format!("vc:{}", &user.id)) // check if the current vc set is empty .await .to_internal_error()? > 0 @@ -39,7 +36,7 @@ pub async fn get_user_voice_channel_in_server( ) -> Result> { let mut conn = get_connection().await.to_internal_error()?; - let unique_key = format!("{}-{}", user_id, server_id); + let unique_key = format!("{}:{}", user_id, server_id); conn.get::<&str, Option>(&unique_key) .await @@ -65,7 +62,7 @@ pub async fn create_voice_state( server_id: Option<&str>, user_id: &str, ) -> Result { - let unique_key = format!("{}-{}", &user_id, server_id.unwrap_or(channel_id)); + let unique_key = format!("{}:{}", &user_id, server_id.unwrap_or(channel_id)); let voice_state = UserVoiceState { id: user_id.to_string(), @@ -76,22 +73,22 @@ pub async fn create_voice_state( }; Pipeline::new() - .sadd(format!("vc-members-{channel_id}"), user_id) - .sadd(format!("vc-{user_id}"), channel_id) + .sadd(format!("vc_members:{channel_id}"), user_id) + .sadd(format!("vc:{user_id}"), channel_id) .set(&unique_key, channel_id) .set( - format!("is_publishing-{unique_key}"), + format!("is_publishing:{unique_key}"), voice_state.is_publishing, ) .set( - format!("is_receiving-{unique_key}"), + format!("is_receiving:{unique_key}"), voice_state.is_receiving, ) .set( - format!("screensharing-{unique_key}"), + format!("screensharing:{unique_key}"), voice_state.screensharing, ) - .set(format!("camera-{unique_key}"), voice_state.camera) + .set(format!("camera:{unique_key}"), voice_state.camera) .query_async(&mut get_connection().await.to_internal_error()?.into_inner()) .await .to_internal_error()?; @@ -104,16 +101,16 @@ pub async fn delete_voice_state( server_id: Option<&str>, user_id: &str, ) -> Result<()> { - let unique_key = format!("{}-{}", &user_id, server_id.unwrap_or(channel_id)); + let unique_key = format!("{}:{}", &user_id, server_id.unwrap_or(channel_id)); Pipeline::new() - .srem(format!("vc-members-{channel_id}"), user_id) - .srem(format!("vc-{user_id}"), channel_id) + .srem(format!("vc_members:{channel_id}"), user_id) + .srem(format!("vc:{user_id}"), channel_id) .del(&[ - format!("is_publishing-{unique_key}"), - format!("is_receiving-{unique_key}"), - format!("screensharing-{unique_key}"), - format!("camera-{unique_key}"), + format!("is_publishing:{unique_key}"), + format!("is_receiving:{unique_key}"), + format!("screensharing:{unique_key}"), + format!("camera:{unique_key}"), unique_key.clone(), ]) .query_async(&mut get_connection().await.to_internal_error()?.into_inner()) @@ -161,24 +158,24 @@ pub async fn update_voice_state( user_id: &str, partial: &PartialUserVoiceState, ) -> Result<()> { - let unique_key = format!("{}-{}", &user_id, server_id.unwrap_or(channel_id)); + let unique_key = format!("{}:{}", &user_id, server_id.unwrap_or(channel_id)); let mut pipeline = Pipeline::new(); if let Some(camera) = &partial.camera { - pipeline.set(format!("camera-{unique_key}"), camera); + pipeline.set(format!("camera:{unique_key}"), camera); }; if let Some(is_publishing) = &partial.is_publishing { - pipeline.set(format!("is_publishing-{unique_key}"), is_publishing); + pipeline.set(format!("is_publishing:{unique_key}"), is_publishing); } if let Some(is_receiving) = &partial.is_receiving { - pipeline.set(format!("is_receiving-{unique_key}"), is_receiving); + pipeline.set(format!("is_receiving:{unique_key}"), is_receiving); } if let Some(screensharing) = &partial.screensharing { - pipeline.set(format!("screensharing-{unique_key}"), screensharing); + pipeline.set(format!("screensharing:{unique_key}"), screensharing); } pipeline @@ -193,7 +190,7 @@ pub async fn get_voice_channel_members(channel_id: &str) -> Result> get_connection() .await .to_internal_error()? - .smembers::<_, Vec>(format!("vc-members-{}", channel_id)) + .smembers::<_, Vec>(format!("vc_members:{}", channel_id)) .await .to_internal_error() } @@ -203,16 +200,16 @@ pub async fn get_voice_state( server_id: Option<&str>, user_id: &str, ) -> Result> { - let unique_key = format!("{}-{user_id}", server_id.unwrap_or(channel_id)); + let unique_key = format!("{}:{user_id}", server_id.unwrap_or(channel_id)); let (is_publishing, is_receiving, screensharing, camera) = get_connection() .await .to_internal_error()? .mget::<_, (Option, Option, Option, Option)>(&[ - format!("is_publishing-{unique_key}"), - format!("is_receiving-{unique_key}"), - format!("screensharing-{unique_key}"), - format!("camera-{unique_key}"), + format!("is_publishing:{unique_key}"), + format!("is_receiving:{unique_key}"), + format!("screensharing:{unique_key}"), + format!("camera:{unique_key}"), ]) .await .to_internal_error()?; @@ -231,92 +228,32 @@ pub async fn get_voice_state( } } -#[derive(Debug)] -pub struct VoiceClient { - rooms: RoomClient, - api_key: String, - api_secret: String, -} +pub async fn get_channel_voice_state(channel: &Channel) -> Result> { + let members = get_voice_channel_members(channel.id()).await?; -impl VoiceClient { - pub fn new(url: String, api_key: String, api_secret: String) -> Self { - Self { - rooms: RoomClient::with_api_key(&url, &api_key, &api_secret), - api_key, - api_secret, - } - } + let server = match channel { + Channel::TextChannel { server, .. } | Channel::VoiceChannel { server, .. } => Some(server.as_str()), + _ => None + }; - pub async fn from_revolt_config() -> Self { - let config = config().await; + if !members.is_empty() { + let mut participants = Vec::with_capacity(members.len()); - Self::new( - config.hosts.livekit, - config.api.livekit.key, - config.api.livekit.secret, - ) - } + for user_id in members { + if let Some(voice_state) = get_voice_state(channel.id(), server, &user_id).await? { + participants.push(voice_state); + } else { + log::info!("Voice state not found but member in voice channel members, removing."); - pub fn create_token( - &self, - user: &User, - permissions: PermissionValue, - channel: &Channel, - ) -> Result { - let allowed_sources = get_allowed_sources(permissions); - - AccessToken::with_api_key(&self.api_key, &self.api_secret) - .with_name(&format!("{}#{}", user.username, user.discriminator)) - .with_identity(&user.id) - .with_metadata(&serde_json::to_string(&user).to_internal_error()?) - .with_ttl(Duration::from_secs(10)) - .with_grants(VideoGrants { - room_join: true, - can_publish_sources: allowed_sources.into_iter().map(ToString::to_string).collect(), - can_subscribe: permissions.has_channel_permission(ChannelPermission::Listen), - room: channel.id().to_string(), - ..Default::default() - }) - .to_jwt() - .to_internal_error() - } - - pub async fn create_room(&self, channel: &Channel) -> Result { - let voice = channel - .voice() - .ok_or_else(|| create_error!(NotAVoiceChannel))?; - - self.rooms - .create_room( - channel.id(), - CreateRoomOptions { - max_participants: voice.max_users.unwrap_or(u32::MAX), - empty_timeout: 5 * 60, // 5 minutes - ..Default::default() - }, - ) - .await - .to_internal_error() - } + delete_voice_state(channel.id(), server, &user_id).await?; + } + } - pub async fn update_permissions( - &self, - user: &User, - channel_id: &str, - new_permissions: ParticipantPermission, - ) -> Result { - self.rooms - .update_participant( - channel_id, - &user.id, - UpdateParticipantOptions { - permission: Some(new_permissions), - attributes: HashMap::new(), - name: "".to_string(), - metadata: "".to_string(), - }, - ) - .await - .to_internal_error() + Ok(Some(v0::ChannelVoiceState { + id: channel.id().to_string(), + participants, + })) + } else { + Ok(None) } } diff --git a/crates/core/database/src/voice/voice_client.rs b/crates/core/database/src/voice/voice_client.rs new file mode 100644 index 000000000..0aa78ec20 --- /dev/null +++ b/crates/core/database/src/voice/voice_client.rs @@ -0,0 +1,108 @@ +use livekit_api::{ + access_token::{AccessToken, VideoGrants}, + services::room::{CreateRoomOptions, RoomClient, UpdateParticipantOptions}, +}; +use livekit_protocol::{ParticipantInfo, ParticipantPermission, Room}; +use revolt_config::config; +use crate::models::{Channel, User}; +use revolt_models::v0; +use revolt_permissions::{ChannelPermission, PermissionValue}; +use revolt_result::{create_error, Result, ToRevoltError}; +use std::{borrow::Cow, collections::HashMap, time::Duration}; + +use super::get_allowed_sources; + +#[derive(Debug)] +pub struct VoiceClient { + rooms: RoomClient, + api_key: String, + api_secret: String, +} + +impl VoiceClient { + pub fn new(url: String, api_key: String, api_secret: String) -> Self { + Self { + rooms: RoomClient::with_api_key(&url, &api_key, &api_secret), + api_key, + api_secret, + } + } + + pub async fn from_revolt_config() -> Self { + let config = config().await; + + Self::new( + config.hosts.livekit, + config.api.livekit.key, + config.api.livekit.secret, + ) + } + + pub fn create_token( + &self, + user: &User, + permissions: PermissionValue, + channel: &Channel, + ) -> Result { + let allowed_sources = get_allowed_sources(permissions); + + AccessToken::with_api_key(&self.api_key, &self.api_secret) + .with_name(&format!("{}#{}", user.username, user.discriminator)) + .with_identity(&user.id) + .with_metadata(&serde_json::to_string(&user).to_internal_error()?) + .with_ttl(Duration::from_secs(10)) + .with_grants(VideoGrants { + room_join: true, + can_publish: true, + can_publish_sources: vec![], // allowed_sources.into_iter().map(ToString::to_string).collect(), + can_subscribe: permissions.has_channel_permission(ChannelPermission::Listen), + room: channel.id().to_string(), + ..Default::default() + }) + .to_jwt() + .to_internal_error() + } + + pub async fn create_room(&self, channel: &Channel) -> Result { + let voice = match channel { + Channel::DirectMessage { .. } | Channel::VoiceChannel { .. } => Some(Cow::Owned(v0::VoiceInformation::default())), + Channel::TextChannel { voice: Some(voice), .. } => Some(Cow::Borrowed(voice)), + _ => None + } + + .ok_or_else(|| create_error!(NotAVoiceChannel))?; + + self.rooms + .create_room( + channel.id(), + CreateRoomOptions { + max_participants: voice.max_users.unwrap_or(u32::MAX), + empty_timeout: 5 * 60, // 5 minutes + ..Default::default() + }, + ) + .await + .to_internal_error() + } + + pub async fn update_permissions( + &self, + user: &User, + channel_id: &str, + new_permissions: ParticipantPermission, + ) -> Result { + self.rooms + .update_participant( + channel_id, + &user.id, + UpdateParticipantOptions { + permission: Some(new_permissions), + attributes: HashMap::new(), + name: "".to_string(), + metadata: "".to_string(), + }, + ) + .await + .to_internal_error() + } +} diff --git a/crates/core/models/src/v0/messages.rs b/crates/core/models/src/v0/messages.rs index b8c53aeed..21541984b 100644 --- a/crates/core/models/src/v0/messages.rs +++ b/crates/core/models/src/v0/messages.rs @@ -134,6 +134,8 @@ auto_derived!( MessagePinned { id: String, by: String }, #[serde(rename = "message_unpinned")] MessageUnpinned { id: String, by: String }, + #[serde(rename = "call_started")] + CallStarted { by: String }, } /// Name and / or avatar override information @@ -438,6 +440,7 @@ impl From for String { } SystemMessage::MessagePinned { .. } => "Message pinned.".to_string(), SystemMessage::MessageUnpinned { .. } => "Message unpinned.".to_string(), + SystemMessage::CallStarted { .. } => "Call started.".to_string(), } } } diff --git a/crates/core/result/src/lib.rs b/crates/core/result/src/lib.rs index 36dda5ac0..8c04b7098 100644 --- a/crates/core/result/src/lib.rs +++ b/crates/core/result/src/lib.rs @@ -218,6 +218,7 @@ pub trait ToRevoltError { } impl ToRevoltError for Result { + #[track_caller] fn to_internal_error(self) -> Result { self .inspect_err(|e| log::error!("{e:?}")) @@ -233,6 +234,7 @@ impl ToRevoltError for Result { } impl ToRevoltError for Option { + #[track_caller] fn to_internal_error(self) -> Result { self.ok_or_else(|| { let loc = Location::caller(); diff --git a/crates/core/voice/Cargo.toml b/crates/core/voice/Cargo.toml deleted file mode 100644 index b43e4bf47..000000000 --- a/crates/core/voice/Cargo.toml +++ /dev/null @@ -1,33 +0,0 @@ -[package] -name = "revolt-voice" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -# voice -livekit-api = "0.4.1" -livekit-protocol = "0.3.6" - -# core -revolt-result = { path = "../result" } -revolt-models = { path = "../models" } -revolt-config = { path = "../config" } -revolt-database = { path = "../database" } -revolt-permissions = { path = "../permissions" } - -# async -futures = "0.3.21" -async-std = { version = "1.8.0", features = [ - "tokio1", - "tokio02", - "attributes", -] } - -# util -redis-kiss = "0.1.4" - -# serde -serde_json = "1.0.79" -serde = "1.0.136" diff --git a/crates/daemons/voice-ingress/Cargo.toml b/crates/daemons/voice-ingress/Cargo.toml index a2aea7aa5..93eee95bd 100644 --- a/crates/daemons/voice-ingress/Cargo.toml +++ b/crates/daemons/voice-ingress/Cargo.toml @@ -37,7 +37,6 @@ revolt-models = { path = "../../core/models" } revolt-config = { path = "../../core/config" } revolt-database = { path = "../../core/database" } revolt-permissions = { path = "../../core/permissions" } -revolt-voice = { path = "../../core/voice" } # voice livekit-api = "0.4.1" diff --git a/crates/daemons/voice-ingress/src/main.rs b/crates/daemons/voice-ingress/src/main.rs index f4c9f8770..91abe3215 100644 --- a/crates/daemons/voice-ingress/src/main.rs +++ b/crates/daemons/voice-ingress/src/main.rs @@ -4,8 +4,8 @@ use std::env; use livekit_protocol::WebhookEvent; use revolt_database::{ events::client::EventV1, util::reference::Reference, Database, DatabaseInfo, + voice::{create_voice_state, delete_voice_state, update_voice_state_tracks, VoiceClient} }; -use revolt_voice::{create_voice_state, delete_voice_state, update_voice_state_tracks, VoiceClient}; use rocket::{build, post, routes, serde::json::Json, Config, State}; use rocket_empty::EmptyResponse; diff --git a/crates/delta/Cargo.toml b/crates/delta/Cargo.toml index 0039f8ff9..5de0a2c03 100644 --- a/crates/delta/Cargo.toml +++ b/crates/delta/Cargo.toml @@ -80,7 +80,6 @@ revolt-models = { path = "../core/models", features = [ revolt-presence = { path = "../core/presence" } revolt-result = { path = "../core/result", features = ["rocket", "okapi"] } revolt-permissions = { path = "../core/permissions", features = ["schemas"] } -revolt-voice = { path = "../core/voice" } # voice livekit-api = "0.4.1" diff --git a/crates/delta/src/main.rs b/crates/delta/src/main.rs index 245d77860..f9feb40b3 100644 --- a/crates/delta/src/main.rs +++ b/crates/delta/src/main.rs @@ -24,7 +24,7 @@ use amqprs::{ use async_std::channel::unbounded; use authifier::AuthifierEvent; use rocket::data::ToByteUnit; -use livekit_api::services::room::RoomClient; +use revolt_database::voice::VoiceClient; pub async fn web() -> Rocket { // Get settings @@ -84,7 +84,7 @@ pub async fn web() -> Rocket { .into(); // Voice handler - let voice_client = revolt_voice::VoiceClient::new(config.api.livekit.url.clone(), config.api.livekit.key.clone(), config.api.livekit.secret.clone()); + let voice_client = VoiceClient::new(config.api.livekit.url.clone(), config.api.livekit.key.clone(), config.api.livekit.secret.clone()); // Configure Rabbit let connection = Connection::open(&OpenConnectionArguments::new( &config.rabbit.host, diff --git a/crates/delta/src/routes/channels/voice_join.rs b/crates/delta/src/routes/channels/voice_join.rs index 460b362da..c2f9f3096 100644 --- a/crates/delta/src/routes/channels/voice_join.rs +++ b/crates/delta/src/routes/channels/voice_join.rs @@ -1,8 +1,7 @@ use revolt_models::v0; -use revolt_database::{util::{permissions::perms, reference::Reference}, Database, User}; +use revolt_database::{util::{permissions::perms, reference::Reference}, voice::{raise_if_in_voice, VoiceClient}, Channel, Database, SystemMessage, User, AMQP}; use revolt_permissions::{calculate_channel_permissions, ChannelPermission}; use revolt_result::Result; -use revolt_voice::{VoiceClient, raise_if_in_voice}; use rocket::{serde::json::Json, State}; @@ -11,10 +10,10 @@ use rocket::{serde::json::Json, State}; /// Asks the voice server for a token to join the call. #[openapi(tag = "Voice")] #[post("//join_call")] -pub async fn call(db: &State, voice: &State, user: User, target: Reference) -> Result> { +pub async fn call(db: &State, amqp: &State, voice: &State, user: User, target: Reference) -> Result> { let channel = target.as_channel(db).await?; - raise_if_in_voice(&user, &channel.id()).await?; + raise_if_in_voice(&user, channel.id()).await?; let mut permissions = perms(db, &user).channel(&channel); @@ -24,7 +23,28 @@ pub async fn call(db: &State, voice: &State, user: User, let token = voice.create_token(&user, current_permissions, &channel)?; let room = voice.create_room(&channel).await?; - log::debug!("created room {}", room.name); + log::debug!("Created room {}", room.name); + + match &channel { + Channel::DirectMessage { .. } | Channel::Group { .. } => { + SystemMessage::CallStarted { + by: user.id.clone() + } + .into_message(channel.id().to_string()) + .send( + db, + Some(amqp), + v0::MessageAuthor::System { + username: &user.username, + avatar: user.avatar.as_ref().map(|file| file.id.as_ref()), + }, + None, + None, + &channel, false + ).await?; + } + _ => {} + }; Ok(Json(v0::CreateVoiceUserResponse { token })) } diff --git a/crates/delta/src/routes/servers/ban_create.rs b/crates/delta/src/routes/servers/ban_create.rs index 09666ac48..309b53f69 100644 --- a/crates/delta/src/routes/servers/ban_create.rs +++ b/crates/delta/src/routes/servers/ban_create.rs @@ -1,5 +1,6 @@ use revolt_database::{ util::{permissions::DatabasePermissionQuery, reference::Reference}, + voice::{delete_voice_state, get_user_voice_channel_in_server, VoiceClient}, Database, RemovalIntention, ServerBan, User, }; use revolt_models::v0; @@ -8,7 +9,6 @@ use revolt_permissions::{ calculate_server_permissions, ChannelPermission, }; use revolt_result::{create_error, Result}; -use revolt_voice::{delete_voice_state, get_user_voice_channel_in_server, VoiceClient}; use rocket::{serde::json::Json, State}; use validator::Validate; diff --git a/crates/delta/src/routes/servers/member_edit.rs b/crates/delta/src/routes/servers/member_edit.rs index 5fb71a14d..e20fa8460 100644 --- a/crates/delta/src/routes/servers/member_edit.rs +++ b/crates/delta/src/routes/servers/member_edit.rs @@ -7,13 +7,13 @@ use redis_kiss::{get_connection, redis::Pipeline, AsyncCommands}; use revolt_database::{ events::client::EventV1, util::{permissions::DatabasePermissionQuery, reference::Reference}, + voice::VoiceClient, Database, File, PartialMember, User, }; use revolt_models::v0::{self, FieldsMember, PartialUserVoiceState}; use revolt_permissions::{calculate_server_permissions, ChannelPermission}; use revolt_result::{create_error, Result, ToRevoltError}; -use revolt_voice::VoiceClient; use rocket::{form::validate::Contains, serde::json::Json, State}; use validator::Validate; diff --git a/crates/delta/src/routes/servers/roles_delete.rs b/crates/delta/src/routes/servers/roles_delete.rs index 3f1e5b509..881ef6cb5 100644 --- a/crates/delta/src/routes/servers/roles_delete.rs +++ b/crates/delta/src/routes/servers/roles_delete.rs @@ -1,11 +1,12 @@ use livekit_protocol::ParticipantPermission; use revolt_database::{ - util::{permissions::DatabasePermissionQuery, reference::Reference}, Channel, Database, User + util::{permissions::DatabasePermissionQuery, reference::Reference}, + voice::{get_allowed_sources, get_voice_channel_members, get_voice_state, update_voice_state, VoiceClient}, + Channel, Database, User }; use revolt_models::v0::PartialUserVoiceState; use revolt_permissions::{calculate_channel_permissions, calculate_server_permissions, ChannelPermission}; use revolt_result::{create_error, Result}; -use revolt_voice::{get_allowed_sources, get_voice_channel_members, get_voice_state, update_voice_state, VoiceClient}; use rocket::State; use rocket_empty::EmptyResponse; diff --git a/crates/delta/src/routes/servers/roles_edit.rs b/crates/delta/src/routes/servers/roles_edit.rs index 46d22314a..7b211d0f2 100644 --- a/crates/delta/src/routes/servers/roles_edit.rs +++ b/crates/delta/src/routes/servers/roles_edit.rs @@ -1,11 +1,12 @@ use livekit_protocol::ParticipantPermission; use revolt_database::{ - util::{permissions::DatabasePermissionQuery, reference::Reference}, Channel, Database, PartialRole, User + util::{permissions::DatabasePermissionQuery, reference::Reference}, + voice::{get_allowed_sources, get_voice_channel_members, get_voice_state, update_voice_state, update_voice_state_tracks, VoiceClient}, + Channel, Database, PartialRole, User }; use revolt_models::v0::{self, PartialUserVoiceState}; use revolt_permissions::{calculate_channel_permissions, calculate_server_permissions, ChannelPermission, PermissionQuery}; use revolt_result::{create_error, Result}; -use revolt_voice::{get_allowed_sources, get_voice_channel_members, get_voice_state, update_voice_state, update_voice_state_tracks, VoiceClient}; use rocket::{serde::json::Json, State}; use serde::{Deserialize, Serialize}; use validator::Validate;