diff --git a/CHANGELOG.md b/CHANGELOG.md index 7cb350a7e..43ca2dd7a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -29,6 +29,7 @@ All user visible changes to this project will be documented in this file. This p - gRPC Control API callbacks ([#63]): - `on_join`; - `on_leave`. + - Configuration of `Member`'s Client API RPC settings ([#95]). - Signalling: - Dynamic `Peer`s creation when client connects ([#28]); - Auto-removing `Peer`s when `Member` disconnects ([#28]); @@ -64,6 +65,7 @@ All user visible changes to this project will be documented in this file. This p [#84]: /../../pull/84 [#86]: /../../pull/86 [#94]: /../../pull/94 +[#95]: /../../pull/95 diff --git a/Cargo.lock b/Cargo.lock index 5f7de1fca..04b3b40bb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -416,9 +416,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.26" +version = "0.1.27" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21a03abb7c9b93ae229356151a083d26218c0358866a2a59d4280c856e9482e6" +checksum = "991d0a1a3e790c835fd54ab41742a59251338d8c7577fe7d7f0170c7072be708" dependencies = [ "proc-macro2", "quote", @@ -559,9 +559,9 @@ checksum = "130aac562c0dd69c56b3b1cc8ffd2e17be31d0b6c25b61c96b76231aa23e39e1" [[package]] name = "bytestring" -version = "0.1.4" +version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fc267467f58ef6cc8874064c62a0423eb0d099362c8a23edd1c6d044f46eead4" +checksum = "fc7c05fa5172da78a62d9949d662d2ac89d4cc7355d7b49adee5163f1fb3f363" dependencies = [ "bytes", ] @@ -848,9 +848,9 @@ dependencies = [ [[package]] name = "derive_more" -version = "0.99.3" +version = "0.99.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a806e96c59a76a5ba6e18735b6cf833344671e61e7863f2edb5c518ea2cac95c" +checksum = "e2323f3f47db9a0e77ce7a300605d8d2098597fc451ed1a97bb1f6411bb550a7" dependencies = [ "proc-macro2", "quote", @@ -1173,9 +1173,9 @@ dependencies = [ [[package]] name = "hermit-abi" -version = "0.1.8" +version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1010591b26bbfe835e9faeabeb11866061cc7dcebffd56ad7d0942d0e61aefd8" +checksum = "725cf19794cf90aa94e65050cb4191ff5d8fa87a498383774c47b332e3af952e" dependencies = [ "libc", ] @@ -1503,6 +1503,7 @@ dependencies = [ "actix-web", "clap", "dotenv", + "humantime-serde", "medea-control-api-proto", "protobuf", "serde 1.0.105", @@ -1520,6 +1521,7 @@ name = "medea-control-api-proto" version = "0.1.0-dev" dependencies = [ "prost", + "prost-types", "tonic", "tonic-build", ] @@ -1691,9 +1693,9 @@ dependencies = [ [[package]] name = "multimap" -version = "0.8.0" +version = "0.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a97fbd5d00e0e37bfb10f433af8f5aaf631e739368dc9fc28286ca81ca4948dc" +checksum = "d8883adfde9756c1d30b0f519c9b8c502a94b41ac62f696453c37c7fc0a958ce" [[package]] name = "net2" @@ -2295,9 +2297,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.48" +version = "1.0.50" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9371ade75d4c2d6cb154141b9752cf3781ec9c05e0e5cf35060e1e70ee7b9c25" +checksum = "78a7a12c167809363ec3bd7329fc0a3369056996de43c4b37ef3cd54a6ce4867" dependencies = [ "itoa", "ryu", diff --git a/_dev/specs/pub-pub-video-call.yml b/_dev/specs/pub-pub-video-call.yml index 4793c1822..63da76706 100644 --- a/_dev/specs/pub-pub-video-call.yml +++ b/_dev/specs/pub-pub-video-call.yml @@ -8,6 +8,9 @@ spec: credentials: test on_join: "grpc://127.0.0.1:9099" on_leave: "grpc://127.0.0.1:9099" + idle_timeout: 1m + reconnect_timeout: 1m + ping_interval: 10s spec: pipeline: # Media element which is able to receive media data from client @@ -28,6 +31,9 @@ spec: credentials: test on_join: "grpc://127.0.0.1:9099" on_leave: "grpc://127.0.0.1:9099" + idle_timeout: 1m + reconnect_timeout: 1m + ping_interval: 10s spec: pipeline: publish: diff --git a/config.toml b/config.toml index 1f9ee963a..eee3111f8 100644 --- a/config.toml +++ b/config.toml @@ -49,6 +49,9 @@ # Duration, after which remote RPC client will be considered idle if no # heartbeat messages received. # +# It applies to all related pipelines as default value, but can be overridden +# for each specific case via Control API. +# # Env var: MEDEA_RPC__IDLE_TIMEOUT # Default: # idle_timeout = "10s" @@ -56,12 +59,18 @@ # Duration, after which the server deletes client session if remote RPC client # does not reconnect after it is idle. # +# It applies to all related pipelines as default value, but can be overridden +# for each specific case via Control API. +# # Env var: MEDEA_RPC__RECONNECT_TIMEOUT # Default: # reconnect_timeout = "10s" # Interval between pings that server sends to clients. # +# It applies to all related pipelines as default value, but can be overridden +# for each specific case via Control API. +# # Env var: MEDEA_RPC__PING_INTERVAL # Default: # ping_interval = "3s" diff --git a/docs/rfc/0001-control-api.md b/docs/rfc/0001-control-api.md index 86d69c47d..a5a66ceaf 100644 --- a/docs/rfc/0001-control-api.md +++ b/docs/rfc/0001-control-api.md @@ -59,6 +59,14 @@ spec: on_join: "grpc://127.0.0.1:9091" # Fires when "caller" client disconnects from media server via WebSocket. on_leave: "grpc://127.0.0.1:9091" + # Timeout of receiving heartbeat messages from the member via WebSocket. + # Once reached, the member is considered being idle. + idle_timeout: 1m + # Timeout of the member reconnecting via WebSocket. + # Once reached, the member is considered disconnected. + reconnect_timeout: 3m + # Interval of sending pings from media server to the member via WebSocket. + ping_interval: 10s pipeline: # Media element which is able to receive media data from client via WebRTC. publish: @@ -780,7 +788,11 @@ message Room { message Member { optional string on_join = 1; optional string on_leave = 2; - map pipeline = 3; + optional string credentials = 3; + optional uint64 idle_timeout = 4; + optional uint64 reconnect_timeout = 5; + optional uint64 ping_interval = 6; + map pipeline = 7; message Element { oneof el { diff --git a/jason/e2e-demo/js/index.js b/jason/e2e-demo/js/index.js index 0bb3006dc..bf4ba3cc9 100644 --- a/jason/e2e-demo/js/index.js +++ b/jason/e2e-demo/js/index.js @@ -194,7 +194,25 @@ const controlDebugWindows = { let memberId = container.getElementsByClassName('control-debug__id_member')[0].value; let credentials = container.getElementsByClassName('member-spec__credentials')[0].value; - await controlApi.createMember(roomId, memberId, credentials); + let idleTimeout = container.getElementsByClassName('member-spec__idle-timeout')[0].value; + let reconnectTimeout = container.getElementsByClassName('member-spec__reconnect-timeout')[0].value; + let pingInterval = container.getElementsByClassName('member-spec__ping-interval')[0].value; + + let spec = {}; + if (credentials.length > 0) { + spec.credentials = credentials; + } + if (idleTimeout.length > 0) { + spec.idle_timeout = idleTimeout; + } + if (reconnectTimeout.length > 0) { + spec.reconnect_timeout = reconnectTimeout; + } + if (pingInterval.length > 0) { + spec.ping_interval = pingInterval; + } + + await controlApi.createMember(roomId, memberId, spec); }); }, @@ -546,16 +564,15 @@ const controlApi = { } }, - createMember: async function(roomId, memberId, credentials) { + createMember: async function(roomId, memberId, spec) { + spec.kind = 'Member'; + spec.pipeline = {}; + try { await axios({ method: 'post', url: controlUrl + roomId + '/' + memberId, - data: { - kind: 'Member', - credentials: credentials, - pipeline: {} - } + data: spec }); } catch (e) { alert(JSON.stringify(e.response.data)); diff --git a/jason/e2e-demo/video-call.html b/jason/e2e-demo/video-call.html index 74d36ff2c..27cf0bd1d 100644 --- a/jason/e2e-demo/video-call.html +++ b/jason/e2e-demo/video-call.html @@ -330,6 +330,11 @@ +
+ + + + diff --git a/mock/control-api/Cargo.toml b/mock/control-api/Cargo.toml index e145fb150..cc2eb02b8 100644 --- a/mock/control-api/Cargo.toml +++ b/mock/control-api/Cargo.toml @@ -11,12 +11,12 @@ publish = false [dependencies] actix = "0.9" -actix-rt = "1.0" actix-cors = "0.2" +actix-rt = "1.0" actix-web = "2.0" clap = "2.33" dotenv = "0.15" -tonic = "0.1" +humantime-serde = "1.0" medea-control-api-proto = { path = "../../proto/control-api" } protobuf = "2.11" serde = { version = "1.0", features = ["derive"] } @@ -26,3 +26,4 @@ slog-envlogger = "2.2" slog-scope = "4.3" slog-stdlog = "4.0" slog-term = "2.5" +tonic = "0.1" diff --git a/mock/control-api/src/api/member.rs b/mock/control-api/src/api/member.rs index b9df010b6..d69d6084c 100644 --- a/mock/control-api/src/api/member.rs +++ b/mock/control-api/src/api/member.rs @@ -1,6 +1,6 @@ //! `Member` element related methods and entities. -use std::collections::HashMap; +use std::{collections::HashMap, convert::TryInto as _, time::Duration}; use medea_control_api_proto::grpc::api as proto; use serde::{Deserialize, Serialize}; @@ -33,6 +33,20 @@ pub struct Member { /// URL to which `OnLeave` Control API callback will be sent. #[serde(skip_serializing_if = "Option::is_none")] on_leave: Option, + + /// Timeout of receiving heartbeat messages from the `Member` via Client + /// API. Once reached, the `Member` is considered being idle. + #[serde(default, with = "humantime_serde")] + idle_timeout: Option, + + /// Timeout of the `Member` reconnecting via Client API. + /// Once reached, the `Member` is considered disconnected. + #[serde(default, with = "humantime_serde")] + reconnect_timeout: Option, + + /// Interval of sending pings from Medea to the `Member` via Client API. + #[serde(default, with = "humantime_serde")] + ping_interval: Option, } impl Member { @@ -51,6 +65,9 @@ impl Member { credentials: self.credentials.unwrap_or_default(), on_join: self.on_join.unwrap_or_default(), on_leave: self.on_leave.unwrap_or_default(), + idle_timeout: self.idle_timeout.map(Into::into), + reconnect_timeout: self.reconnect_timeout.map(Into::into), + ping_interval: self.ping_interval.map(Into::into), } } @@ -77,6 +94,13 @@ impl From for Member { credentials: Some(proto.credentials), on_join: Some(proto.on_join).filter(|s| !s.is_empty()), on_leave: Some(proto.on_leave).filter(|s| !s.is_empty()), + idle_timeout: proto.idle_timeout.map(|dur| dur.try_into().unwrap()), + reconnect_timeout: proto + .reconnect_timeout + .map(|dur| dur.try_into().unwrap()), + ping_interval: proto + .ping_interval + .map(|dur| dur.try_into().unwrap()), } } } diff --git a/proto/control-api/Cargo.toml b/proto/control-api/Cargo.toml index 20ea026b5..52846e442 100644 --- a/proto/control-api/Cargo.toml +++ b/proto/control-api/Cargo.toml @@ -14,11 +14,12 @@ categories = ["api-bindings", "network-programming"] [features] default = ["grpc"] -grpc = ["prost", "tonic-build", "tonic"] +grpc = ["prost", "prost-types", "tonic", "tonic-build"] [dependencies] prost = { version = "0.6", optional = true } +prost-types = { version = "0.6", optional = true } tonic = { version = "0.1", optional = true } [build-dependencies] -tonic-build = { version = "0.1", optional = true} +tonic-build = { version = "0.1", optional = true } diff --git a/proto/control-api/build.rs b/proto/control-api/build.rs index 53f722bb4..fb651c264 100644 --- a/proto/control-api/build.rs +++ b/proto/control-api/build.rs @@ -45,6 +45,21 @@ mod grpc { .build_client(true) .build_server(true) .compile(&grpc_spec_files, &[GRPC_DIR.to_string()])?; + + // Remove empty `google.protobuf.rs` file generated by + // `prost-build`. This file doesn't affect to any + // functionality. It's generated only because of + // `prost-build` bug. + // + // Read instrumentisto/medea#95, hyperium/tonic#314 and + // danburkert/prost#228 for more info. + fs::remove_file(format!("{}/google.protobuf.rs", GRPC_DIR)) + .expect( + "`google.protobuf.rs` file isn't generated. This \ + is good news, because hyperium/tonic#314 issue \ + was probably really fixed. Check it and if so, \ + then just remove this line of code.", + ); break; } else { panic!("{}", e); @@ -97,9 +112,6 @@ mod grpc { self.0 .iter() .map(|filename| format!("{}/{}.rs", GRPC_DIR, filename)) - .chain(self.0.iter().map(|filename| { - format!("{}/{}_grpc.rs", GRPC_DIR, filename) - })) .collect() } } diff --git a/proto/control-api/src/grpc/api.proto b/proto/control-api/src/grpc/api.proto index 3703b47e6..5c1faaa62 100644 --- a/proto/control-api/src/grpc/api.proto +++ b/proto/control-api/src/grpc/api.proto @@ -4,6 +4,8 @@ syntax = "proto3"; package api; +import "google/protobuf/duration.proto"; + // Media server's Control API service. service ControlApi { // Creates new Element with a given ID. @@ -140,8 +142,16 @@ message Member { string on_leave = 3; // Credentials of the Member to authorize via Client API with. string credentials = 4; + // Timeout of receiving heartbeat messages from the Member via Client API. + // Once reached, the Member is considered being idle. + google.protobuf.Duration idle_timeout = 5; + // Timeout of the Member reconnecting via Client API. + // Once reached, the Member is considered disconnected. + google.protobuf.Duration reconnect_timeout = 6; + // Interval of sending pings from a media server to the Member via Client API. + google.protobuf.Duration ping_interval = 7; // Pipeline of this Member. - map pipeline = 5; + map pipeline = 8; // Elements which Member's pipeline can contain. message Element { diff --git a/proto/control-api/src/grpc/api.rs b/proto/control-api/src/grpc/api.rs index c0f98373b..b7ccb5322 100644 --- a/proto/control-api/src/grpc/api.rs +++ b/proto/control-api/src/grpc/api.rs @@ -162,8 +162,19 @@ pub struct Member { /// Credentials of the Member to authorize via Client API with. #[prost(string, tag="4")] pub credentials: std::string::String, + /// Timeout of receiving heartbeat messages from the Member via Client API. + /// Once reached, the Member is considered being idle. + #[prost(message, optional, tag="5")] + pub idle_timeout: ::std::option::Option<::prost_types::Duration>, + /// Timeout of the Member reconnecting via Client API. + /// Once reached, the Member is considered disconnected. + #[prost(message, optional, tag="6")] + pub reconnect_timeout: ::std::option::Option<::prost_types::Duration>, + /// Interval of sending pings from a media server to the Member via Client API. + #[prost(message, optional, tag="7")] + pub ping_interval: ::std::option::Option<::prost_types::Duration>, /// Pipeline of this Member. - #[prost(map="string, message", tag="5")] + #[prost(map="string, message", tag="8")] pub pipeline: ::std::collections::HashMap, } pub mod member { diff --git a/src/api/client/rpc_connection.rs b/src/api/client/rpc_connection.rs index a3ca3ea0b..54bb51b89 100644 --- a/src/api/client/rpc_connection.rs +++ b/src/api/client/rpc_connection.rs @@ -2,7 +2,7 @@ //! //! [`RpcConnection`]: crate::api::client::rpc_connection::RpcConnection -use std::fmt; +use std::{fmt, time::Duration}; use actix::Message; use derive_more::{From, Into}; @@ -60,14 +60,28 @@ pub trait RpcConnection: fmt::Debug + Send { -> LocalBoxFuture<'static, Result<(), ()>>; } +/// Settings of [`WsSession`]. +#[derive(Clone, Copy, Debug)] +pub struct RpcConnectionSettings { + /// [`Duration`], after which [`WsSession`] will be considered idle if no + /// heartbeat messages were received. + pub idle_timeout: Duration, + + /// Interval of sending `Ping`s to remote [`Member`]. + /// + /// [`Member`]: crate::signalling::elements::member::Member + pub ping_interval: Duration, +} + /// Signal for authorizing new [`RpcConnection`] before establishing. #[derive(Debug, Message)] -#[rtype(result = "Result<(), AuthorizationError>")] +#[rtype(result = "Result")] pub struct Authorize { /// ID of [`Member`] to authorize [`RpcConnection`] for. /// /// [`Member`]: crate::signalling::elements::member::Member pub member_id: MemberId, + /// Credentials to authorize [`RpcConnection`] with. pub credentials: String, // TODO: &str when futures will allow references } @@ -82,6 +96,7 @@ pub enum AuthorizationError { /// [`Member`]: crate::signalling::elements::member::Member /// [`Room`]: crate::signalling::Room MemberNotExists, + /// Provided credentials are invalid. InvalidCredentials, } @@ -97,6 +112,7 @@ pub struct RpcConnectionEstablished { /// /// [`Member`]: crate::signalling::elements::member::Member pub member_id: MemberId, + /// Established [`RpcConnection`]. pub connection: Box, } @@ -110,6 +126,7 @@ pub struct RpcConnectionClosed { /// /// [`Member`]: crate::signalling::elements::member::Member pub member_id: MemberId, + /// Reason of why [`RpcConnection`] is closed. pub reason: ClosedReason, } @@ -130,6 +147,7 @@ pub enum ClosedReason { /// [`Away`]: actix_http::ws::CloseCode::Away normal: bool, }, + /// [`RpcConnection`] was lost, but may be reestablished. Lost, } diff --git a/src/api/client/server.rs b/src/api/client/server.rs index ab2cd776b..db32abbb0 100644 --- a/src/api/client/server.rs +++ b/src/api/client/server.rs @@ -65,13 +65,13 @@ async fn ws_index( }) .await?; match auth_result { - Ok(_) => ws::start( + Ok(settings) => ws::start( WsSession::new( member_id, room_id, Box::new(room), - state.config.idle_timeout, - state.config.ping_interval, + settings.idle_timeout, + settings.ping_interval, ), &request, payload, @@ -113,7 +113,7 @@ impl Server { let server = HttpServer::new(move || { App::new() - .app_data(Self::app_data(rooms.clone(), config.rpc.clone())) + .app_data(Self::app_data(rooms.clone(), config.rpc)) .configure(Self::configure) .wrap(middleware::Logger::default()) }) @@ -195,7 +195,7 @@ mod test { App::new() .app_data(Server::app_data( build_room_repo(conf.clone()), - conf.rpc.clone(), + conf.rpc, )) .configure(Server::configure) }) diff --git a/src/api/control/error_codes.rs b/src/api/control/error_codes.rs index f7d215325..f3947f1e5 100644 --- a/src/api/control/error_codes.rs +++ b/src/api/control/error_codes.rs @@ -278,6 +278,12 @@ pub enum ErrorCode { #[display(fmt = "Invalid callback URL.")] InvalidCallbackUrl = 1022, + /// Encountered negative duration. + /// + /// Code: __1023__. + #[display(fmt = "Encountered negative duration")] + NegativeDuration = 1023, + /// Unexpected server error. /// /// Use this [`ErrorCode`] only with [`ErrorResponse::unexpected`] @@ -337,6 +343,14 @@ impl From for ErrorResponse { String::from("No element was provided"), Some(id), ), + NegativeDuration(id, field) => Self::with_explanation( + ErrorCode::NegativeDuration, + format!( + "Element [id = {}] contains negative duration field `{}`", + id, field + ), + Some(id), + ), } } } diff --git a/src/api/control/member.rs b/src/api/control/member.rs index 94daa9e5c..cdef215f1 100644 --- a/src/api/control/member.rs +++ b/src/api/control/member.rs @@ -2,7 +2,11 @@ //! //! [Control API]: https://tinyurl.com/yxsqplq7 -use std::{collections::HashMap, convert::TryFrom}; +use std::{ + collections::HashMap, + convert::{TryFrom, TryInto}, + time::Duration, +}; use derive_more::{Display, From}; use medea_control_api_proto::grpc::api as proto; @@ -61,6 +65,20 @@ pub struct MemberSpec { /// URL to which `OnLeave` Control API callback will be sent. on_leave: Option, + + /// Timeout of receiving heartbeat messages from the `Member` via Client + /// API. + /// + /// Once reached, the `Member` is considered being idle. + idle_timeout: Option, + + /// Timeout of the `Member` reconnecting via Client API. + /// + /// Once reached, the `Member` is considered disconnected. + reconnect_timeout: Option, + + /// Interval of sending `Ping`s to the `Member` via Client API. + ping_interval: Option, } impl Into for MemberSpec { @@ -70,6 +88,9 @@ impl Into for MemberSpec { credentials: self.credentials, on_join: self.on_join, on_leave: self.on_leave, + idle_timeout: self.idle_timeout, + reconnect_timeout: self.reconnect_timeout, + ping_interval: self.ping_interval, } } } @@ -82,12 +103,18 @@ impl MemberSpec { credentials: String, on_join: Option, on_leave: Option, + idle_timeout: Option, + reconnect_timeout: Option, + ping_interval: Option, ) -> Self { Self { pipeline, credentials, on_join, on_leave, + idle_timeout, + reconnect_timeout, + ping_interval, } } @@ -142,6 +169,26 @@ impl MemberSpec { pub fn on_leave(&self) -> &Option { &self.on_leave } + + /// Returns timeout of receiving heartbeat messages from the `Member` via + /// Client API. + /// + /// Once reached, the `Member` is considered being idle. + pub fn idle_timeout(&self) -> Option { + self.idle_timeout + } + + /// Returns timeout of the `Member` reconnecting via Client API. + /// + /// Once reached, the `Member` is considered disconnected. + pub fn reconnect_timeout(&self) -> Option { + self.reconnect_timeout + } + + /// Returns interval of sending `Ping`s to the `Member` via Client API. + pub fn ping_interval(&self) -> Option { + self.ping_interval + } } /// Generates alphanumeric credentials for [`Member`] with @@ -163,6 +210,16 @@ impl TryFrom for MemberSpec { type Error = TryFromProtobufError; fn try_from(member: proto::Member) -> Result { + fn parse_duration>( + duration: Option, + member_id: &str, + field: &'static str, + ) -> Result, TryFromProtobufError> { + duration.map(TryInto::try_into).transpose().map_err(|_| { + TryFromProtobufError::NegativeDuration(member_id.into(), field) + }) + } + let mut pipeline = HashMap::new(); for (id, member_element) in member.pipeline { if let Some(elem) = member_element.el { @@ -196,11 +253,24 @@ impl TryFrom for MemberSpec { } }; + let idle_timeout = + parse_duration(member.idle_timeout, &member.id, "idle_timeout")?; + let reconnect_timeout = parse_duration( + member.reconnect_timeout, + &member.id, + "reconnect_timeout", + )?; + let ping_interval = + parse_duration(member.ping_interval, &member.id, "ping_interval")?; + Ok(Self { pipeline: Pipeline::new(pipeline), credentials, on_join, on_leave, + idle_timeout, + reconnect_timeout, + ping_interval, }) } } @@ -241,11 +311,17 @@ impl TryFrom<&RoomElement> for MemberSpec { credentials, on_leave, on_join, + idle_timeout, + reconnect_timeout, + ping_interval, } => Ok(Self { pipeline: spec.clone(), credentials: credentials.clone(), on_leave: on_leave.clone(), on_join: on_join.clone(), + idle_timeout: *idle_timeout, + reconnect_timeout: *reconnect_timeout, + ping_interval: *ping_interval, }), _ => Err(TryFromElementError::NotMember), } diff --git a/src/api/control/mod.rs b/src/api/control/mod.rs index 91fd5a6ac..e648b35ee 100644 --- a/src/api/control/mod.rs +++ b/src/api/control/mod.rs @@ -65,14 +65,26 @@ pub enum TryFromProtobufError { #[display(fmt = "Expected element of type [{}]. Id [{}]", _0, _1)] ExpectedOtherElement(String, String), + /// Element is `None`, but expected `Some`. #[display(fmt = "Element is None, expected Some. Id [{}]", _0)] EmptyElement(String), + /// Provided `Endpoint` is unimplemented. #[display(fmt = "Endpoint is unimplemented. Id [{}]", _0)] UnimplementedEndpoint(String), + /// Error while [`CallbackUrl`] parsing. #[display(fmt = "Error while parsing callback URL. {:?}", _0)] CallbackUrlParseErr(CallbackUrlParseError), + + /// Some element from a spec contains negative [`Duration`], but it's not + /// supported. + #[display( + fmt = "Element [id = {}] contains negative duration field `{}`", + _0, + _1 + )] + NegativeDuration(String, &'static str), } impl From for TryFromProtobufError { diff --git a/src/api/control/room.rs b/src/api/control/room.rs index 901989a2f..ae64a1146 100644 --- a/src/api/control/room.rs +++ b/src/api/control/room.rs @@ -2,7 +2,7 @@ //! //! [Control API]: https://tinyurl.com/yxsqplq7 -use std::{collections::HashMap, convert::TryFrom}; +use std::{collections::HashMap, convert::TryFrom, time::Duration}; use derive_more::{Display, From}; use medea_control_api_proto::grpc::api as proto; @@ -38,6 +38,12 @@ pub enum RoomElement { credentials: String, on_leave: Option, on_join: Option, + #[serde(default, with = "humantime_serde")] + idle_timeout: Option, + #[serde(default, with = "humantime_serde")] + reconnect_timeout: Option, + #[serde(default, with = "humantime_serde")] + ping_interval: Option, }, } diff --git a/src/conf/rpc.rs b/src/conf/rpc.rs index 0ee4c787d..f3caa7ca9 100644 --- a/src/conf/rpc.rs +++ b/src/conf/rpc.rs @@ -6,22 +6,37 @@ use serde::{Deserialize, Serialize}; use smart_default::SmartDefault; /// RPC connection settings. -#[derive(Clone, Debug, Deserialize, Serialize, SmartDefault)] +#[derive(Clone, Copy, Debug, Deserialize, Serialize, SmartDefault)] #[serde(default)] pub struct Rpc { - /// Duration, after which remote RPC client will be considered idle if no - /// heartbeat messages received. Defaults to `10s`. + /// Duration, after which remote RPC client will be considered idle + /// if no heartbeat messages received. + /// + /// It applies to all related pipelines as default value, but can be + /// overridden for each specific case via Control API. + /// + /// Defaults to `10s`. #[default(Duration::from_secs(10))] #[serde(with = "humantime_serde")] pub idle_timeout: Duration, /// Duration, after which the server deletes the client session if /// the remote RPC client does not reconnect after it is idle. + /// + /// It applies to all related pipelines as default value, but can be + /// overridden for each specific case via Control API. + /// + /// Defaults to `10s`. #[default(Duration::from_secs(10))] #[serde(with = "humantime_serde")] pub reconnect_timeout: Duration, /// Interval of sending `Ping`s from the server to the client. + /// + /// It applies to all related pipelines as default value, but can be + /// overridden for each specific case via Control API. + /// + /// Defaults to `3s`. #[default(Duration::from_secs(3))] #[serde(with = "humantime_serde")] pub ping_interval: Duration, diff --git a/src/signalling/elements/member.rs b/src/signalling/elements/member.rs index 743402d13..fbe5c6676 100644 --- a/src/signalling/elements/member.rs +++ b/src/signalling/elements/member.rs @@ -7,6 +7,7 @@ use std::{ collections::HashMap, convert::TryFrom as _, rc::{Rc, Weak}, + time::Duration, }; use derive_more::Display; @@ -22,6 +23,7 @@ use crate::{ EndpointId, MemberId, MemberSpec, RoomId, RoomSpec, TryFromElementError, WebRtcPlayId, WebRtcPublishId, }, + conf::Rpc as RpcConf, log::prelude::*, }; @@ -86,6 +88,20 @@ struct MemberInner { /// URL to which `on_leave` Control API callback will be sent. on_leave: Option, + + /// Timeout of receiving heartbeat messages from the [`Member`] via Client + /// API. + /// + /// Once reached, the [`Member`] is considered being idle. + idle_timeout: Duration, + + /// Timeout of the [`Member`] reconnecting via Client API. + /// + /// Once reached, the [`Member`] is considered disconnected. + reconnect_timeout: Duration, + + /// Interval of sending heartbeat `Ping`s to the [`Member`] via Client API. + ping_interval: Duration, } impl Member { @@ -93,7 +109,14 @@ impl Member { /// /// To fill this [`Member`], you need to call [`Member::load`] /// function. - pub fn new(id: MemberId, credentials: String, room_id: RoomId) -> Self { + pub fn new( + id: MemberId, + credentials: String, + room_id: RoomId, + idle_timeout: Duration, + reconnect_timeout: Duration, + ping_interval: Duration, + ) -> Self { Self(Rc::new(RefCell::new(MemberInner { id, srcs: HashMap::new(), @@ -102,6 +125,9 @@ impl Member { room_id, on_leave: None, on_join: None, + idle_timeout, + reconnect_timeout, + ping_interval, }))) } @@ -412,6 +438,27 @@ impl Member { self.0.borrow().on_leave.clone() } + /// Returns timeout of receiving heartbeat messages from the [`Member`] via + /// Client API. + /// + /// Once reached, the [`Member`] is considered being idle. + pub fn get_idle_timeout(&self) -> Duration { + self.0.borrow().idle_timeout + } + + /// Returns timeout of the [`Member`] reconnecting via Client API. + /// + /// Once reached, the [`Member`] is considered disconnected. + pub fn get_reconnect_timeout(&self) -> Duration { + self.0.borrow().reconnect_timeout + } + + /// Returns interval of sending heartbeat `Ping`s to the [`Member`] via + /// Client API. + pub fn get_ping_interval(&self) -> Duration { + self.0.borrow().ping_interval + } + /// Sets all [`CallbackUrl`]'s from [`MemberSpec`]. pub fn set_callback_urls(&self, spec: &MemberSpec) { self.0.borrow_mut().on_leave = spec.on_leave().clone(); @@ -450,6 +497,7 @@ impl WeakMember { /// Errors with [`MembersLoadError`] if loading [`Member`] fails. pub fn parse_members( room_spec: &RoomSpec, + rpc_conf: RpcConf, ) -> Result, MembersLoadError> { let members_spec = room_spec.members().map_err(|e| { MembersLoadError::TryFromError( @@ -465,6 +513,11 @@ pub fn parse_members( id.clone(), member.credentials().to_string(), room_spec.id.clone(), + member.idle_timeout().unwrap_or(rpc_conf.idle_timeout), + member + .reconnect_timeout() + .unwrap_or(rpc_conf.reconnect_timeout), + member.ping_interval().unwrap_or(rpc_conf.ping_interval), ); (id.clone(), new_member) }) @@ -522,6 +575,9 @@ impl Into for Member { .get_on_join() .map(|c| c.to_string()) .unwrap_or_default(), + reconnect_timeout: Some(self.get_reconnect_timeout().into()), + idle_timeout: Some(self.get_idle_timeout().into()), + ping_interval: Some(self.get_ping_interval().into()), pipeline: member_pipeline, } } @@ -596,7 +652,7 @@ mod tests { let room_element: RootElement = serde_yaml::from_str(TEST_SPEC).unwrap(); let room_spec = RoomSpec::try_from(&room_element).unwrap(); - parse_members(&room_spec).unwrap() + parse_members(&room_spec, RpcConf::default()).unwrap() } #[test] diff --git a/src/signalling/participants.rs b/src/signalling/participants.rs index c1e173bfa..9fa48f477 100644 --- a/src/signalling/participants.rs +++ b/src/signalling/participants.rs @@ -7,10 +7,7 @@ //! [`RpcConnection`]: crate::api::client::rpc_connection::RpcConnection //! [`ParticipantService`]: crate::signalling::participants::ParticipantService -use std::{ - collections::HashMap, - time::{Duration, Instant}, -}; +use std::{collections::HashMap, time::Instant}; use actix::{ fut::wrap_future, AsyncContext, Context, ContextFutureSpawner as _, @@ -31,13 +28,16 @@ use crate::{ }, control::{ refs::{Fid, ToEndpoint, ToMember}, - MemberId, RoomId, RoomSpec, + MemberId, MemberSpec, RoomId, RoomSpec, }, }, + conf::Rpc as RpcConf, log::prelude::*, signalling::{ elements::{ - member::MemberError, parse_members, Member, MembersLoadError, + endpoints::webrtc::{WebRtcPlayEndpoint, WebRtcPublishEndpoint}, + member::MemberError, + parse_members, Member, MembersLoadError, }, room::{ActFuture, RoomError}, Room, @@ -90,9 +90,11 @@ pub struct ParticipantService { /// before dropping it irrevocably in case it gets reestablished. drop_connection_tasks: HashMap, - /// Duration, after which the server deletes the client session if - /// the remote RPC client does not reconnect after it is idle. - rpc_reconnect_timeout: Duration, + /// Default values for the RPC connection settings. + /// + /// If nothing provided into [`Member`] element spec then this values will + /// be used. + rpc_conf: RpcConf, } impl ParticipantService { @@ -107,10 +109,10 @@ impl ParticipantService { ) -> Result { Ok(Self { room_id: room_spec.id().clone(), - members: parse_members(room_spec)?, + members: parse_members(room_spec, context.config.rpc)?, connections: HashMap::new(), drop_connection_tasks: HashMap::new(), - rpc_reconnect_timeout: context.config.rpc.reconnect_timeout, + rpc_conf: context.config.rpc, }) } @@ -266,19 +268,26 @@ impl ParticipantService { // now. } ClosedReason::Lost => { - self.drop_connection_tasks.insert( - member_id.clone(), - ctx.run_later(self.rpc_reconnect_timeout, move |_, ctx| { - info!( - "Member [id = {}] connection lost at {:?}.", - member_id, closed_at, - ); - ctx.notify(RpcConnectionClosed { - member_id, - reason: ClosedReason::Closed { normal: false }, - }) - }), - ); + if let Some(member) = self.get_member_by_id(&member_id) { + self.drop_connection_tasks.insert( + member_id.clone(), + ctx.run_later( + member.get_reconnect_timeout(), + move |_, ctx| { + info!( + "Member [id = {}] connection lost at {:?}.", + member_id, closed_at, + ); + ctx.notify(RpcConnectionClosed { + member_id, + reason: ClosedReason::Closed { + normal: false, + }, + }) + }, + ), + ); + } } } } @@ -345,4 +354,170 @@ impl ParticipantService { pub fn iter_members(&self) -> impl Iterator { self.members.iter() } + + /// Creates new [`Member`] in this [`ParticipantService`]. + /// + /// This function will check that new [`Member`]'s ID is not present in + /// [`ParticipantService`]. + /// + /// # Errors + /// + /// Errors with [`RoomError::MemberAlreadyExists`] if [`Member`] with + /// provided [`MemberId`] already exists in [`ParticipantService`]. + pub fn create_member( + &mut self, + id: MemberId, + spec: &MemberSpec, + ) -> Result<(), RoomError> { + if self.get_member_by_id(&id).is_some() { + return Err(RoomError::MemberAlreadyExists( + self.get_fid_to_member(id), + )); + } + let signalling_member = Member::new( + id.clone(), + spec.credentials().to_string(), + self.room_id.clone(), + spec.idle_timeout().unwrap_or(self.rpc_conf.idle_timeout), + spec.reconnect_timeout() + .unwrap_or(self.rpc_conf.reconnect_timeout), + spec.ping_interval().unwrap_or(self.rpc_conf.ping_interval), + ); + + signalling_member.set_callback_urls(spec); + + for (id, publish) in spec.publish_endpoints() { + let signalling_publish = WebRtcPublishEndpoint::new( + id.clone(), + publish.p2p, + signalling_member.downgrade(), + publish.force_relay, + ); + signalling_member.insert_src(signalling_publish); + } + + for (id, play) in spec.play_endpoints() { + let partner_member = self.get_member(&play.src.member_id)?; + let src = partner_member + .get_src_by_id(&play.src.endpoint_id) + .ok_or_else(|| { + MemberError::EndpointNotFound( + partner_member.get_fid_to_endpoint( + play.src.endpoint_id.clone().into(), + ), + ) + })?; + + let sink = WebRtcPlayEndpoint::new( + id.clone(), + play.src.clone(), + src.downgrade(), + signalling_member.downgrade(), + play.force_relay, + ); + + signalling_member.insert_sink(sink); + } + + // This is needed for atomicity. + for (_, sink) in signalling_member.sinks() { + let src = sink.src(); + src.add_sink(sink.downgrade()); + } + + self.insert_member(id, signalling_member); + + Ok(()) + } +} + +#[cfg(test)] +mod test { + use std::time::Duration; + + use crate::{api::control::pipeline::Pipeline, conf::Conf}; + + use super::*; + + pub fn empty_participants_service() -> ParticipantService { + let room_spec = RoomSpec { + id: RoomId::from("test"), + pipeline: Pipeline::new(HashMap::new()), + }; + let ctx = AppContext::new( + Conf::default(), + crate::turn::new_turn_auth_service_mock(), + ); + + ParticipantService::new(&room_spec, &ctx).unwrap() + } + + /// Tests that when no RPC settings is provided in the `Member` element + /// spec, default RPC settings from config will be used. + #[test] + fn use_conf_when_no_rpc_settings_in_member_spec() { + let mut members = empty_participants_service(); + + let test_member_spec = MemberSpec::new( + Pipeline::new(HashMap::new()), + String::from("w/e"), + None, + None, + None, + None, + None, + ); + + let test_member_id = MemberId(String::from("test-member")); + members + .create_member(test_member_id.clone(), &test_member_spec) + .unwrap(); + + let test_member = members.get_member_by_id(&test_member_id).unwrap(); + let default_rpc_conf = Conf::default().rpc; + + assert_eq!( + test_member.get_ping_interval(), + default_rpc_conf.ping_interval + ); + assert_eq!( + test_member.get_idle_timeout(), + default_rpc_conf.idle_timeout + ); + assert_eq!( + test_member.get_reconnect_timeout(), + default_rpc_conf.reconnect_timeout + ); + } + + /// Tests that when RPC settings is provided in the `Member` element spec, + /// this RPC settings will be used. + #[test] + fn use_rpc_settings_from_member_spec() { + let mut members = empty_participants_service(); + + let idle_timeout = Duration::from_secs(60); + let ping_interval = Duration::from_secs(61); + let reconnect_timeout = Duration::from_secs(62); + + let test_member_spec = MemberSpec::new( + Pipeline::new(HashMap::new()), + String::from("w/e"), + None, + None, + Some(idle_timeout), + Some(reconnect_timeout), + Some(ping_interval), + ); + + let test_member_id = MemberId(String::from("test-member")); + members + .create_member(test_member_id.clone(), &test_member_spec) + .unwrap(); + + let test_member = members.get_member_by_id(&test_member_id).unwrap(); + assert_eq!(test_member.get_ping_interval(), ping_interval); + assert_eq!(test_member.get_idle_timeout(), idle_timeout); + assert_eq!(test_member.get_reconnect_timeout(), reconnect_timeout); + } } diff --git a/src/signalling/room.rs b/src/signalling/room.rs index 840c87326..b693c52cf 100644 --- a/src/signalling/room.rs +++ b/src/signalling/room.rs @@ -23,6 +23,7 @@ use crate::{ client::rpc_connection::{ AuthorizationError, Authorize, ClosedReason, CommandMessage, RpcConnection, RpcConnectionClosed, RpcConnectionEstablished, + RpcConnectionSettings, }, control::{ callback::{ @@ -621,78 +622,6 @@ impl Room { Ok(()) } - /// Creates new [`Member`] in this [`ParticipantService`]. - /// - /// This function will check that new [`Member`]'s ID is not present in - /// [`ParticipantService`]. - /// - /// # Errors - /// - /// Errors with [`RoomError::MemberAlreadyExists`] if [`Member`] with - /// provided [`MemberId`] already exists in [`ParticipantService`]. - pub fn create_member( - &mut self, - id: MemberId, - spec: &MemberSpec, - ) -> Result<(), RoomError> { - if self.members.get_member_by_id(&id).is_some() { - return Err(RoomError::MemberAlreadyExists( - self.members.get_fid_to_member(id), - )); - } - let signalling_member = Member::new( - id.clone(), - spec.credentials().to_string(), - self.id.clone(), - ); - - signalling_member.set_callback_urls(spec); - - for (id, publish) in spec.publish_endpoints() { - let signalling_publish = WebRtcPublishEndpoint::new( - id.clone(), - publish.p2p, - signalling_member.downgrade(), - publish.force_relay, - ); - signalling_member.insert_src(signalling_publish); - } - - for (id, play) in spec.play_endpoints() { - let partner_member = - self.members.get_member(&play.src.member_id)?; - let src = partner_member - .get_src_by_id(&play.src.endpoint_id) - .ok_or_else(|| { - MemberError::EndpointNotFound( - partner_member.get_fid_to_endpoint( - play.src.endpoint_id.clone().into(), - ), - ) - })?; - - let sink = WebRtcPlayEndpoint::new( - id.clone(), - play.src.clone(), - src.downgrade(), - signalling_member.downgrade(), - play.force_relay, - ); - - signalling_member.insert_sink(sink); - } - - // This is needed for atomicity. - for (_, sink) in signalling_member.sinks() { - let src = sink.src(); - src.add_sink(sink.downgrade()); - } - - self.members.insert_member(id, signalling_member); - - Ok(()) - } - /// Validates given [`CommandMessage`]. /// /// Two assertions are made: @@ -1032,7 +961,7 @@ impl Handler for Room { } impl Handler for Room { - type Result = Result<(), AuthorizationError>; + type Result = Result; /// Responses with `Ok` if `RpcConnection` is authorized, otherwise `Err`s. fn handle( @@ -1042,7 +971,10 @@ impl Handler for Room { ) -> Self::Result { self.members .get_member_by_id_and_credentials(&msg.member_id, &msg.credentials) - .map(|_| ()) + .map(move |member| RpcConnectionSettings { + idle_timeout: member.get_idle_timeout(), + ping_interval: member.get_ping_interval(), + }) } } @@ -1279,7 +1211,7 @@ impl Handler for Room { msg: CreateMember, _: &mut Self::Context, ) -> Self::Result { - self.create_member(msg.0.clone(), &msg.1)?; + self.members.create_member(msg.0.clone(), &msg.1)?; debug!( "Member [id = {}] created in Room [id = {}].", msg.0, self.id @@ -1355,9 +1287,13 @@ mod test { String::from("w/e"), None, None, + None, + None, + None, ); - room.create_member(MemberId(String::from("member1")), &member1) + room.members + .create_member(MemberId(String::from("member1")), &member1) .unwrap(); let no_such_peer = CommandMessage::new( @@ -1389,9 +1325,13 @@ mod test { String::from("w/e"), None, None, + None, + None, + None, ); - room.create_member(MemberId(String::from("member1")), &member1) + room.members + .create_member(MemberId(String::from("member1")), &member1) .unwrap(); let no_such_peer = CommandMessage::new( diff --git a/tests/e2e/callbacks/member.rs b/tests/e2e/callbacks/member.rs index 409102d0f..59831112d 100644 --- a/tests/e2e/callbacks/member.rs +++ b/tests/e2e/callbacks/member.rs @@ -56,7 +56,8 @@ async fn callback_test(name: &'static str, port: u16) -> CallbackTestItem { let deadline = Some(Duration::from_secs(5)); let client = TestMember::connect( create_response.get(name).unwrap(), - Box::new(on_event), + Some(Box::new(on_event)), + None, deadline, ) .await; diff --git a/tests/e2e/grpc_control_api/mod.rs b/tests/e2e/grpc_control_api/mod.rs index b5082a0aa..278fff14b 100644 --- a/tests/e2e/grpc_control_api/mod.rs +++ b/tests/e2e/grpc_control_api/mod.rs @@ -5,9 +5,10 @@ mod create; mod delete; +mod rpc_settings; mod signaling; -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use derive_builder::*; use medea_control_api_proto::grpc::api::{ @@ -209,6 +210,12 @@ pub struct Member { #[builder(default = "None")] #[builder(setter(strip_option))] on_leave: Option, + #[builder(default = "None")] + ping_interval: Option, + #[builder(default = "None")] + idle_timeout: Option, + #[builder(default = "None")] + reconnect_timeout: Option, } impl Into for Member { @@ -225,6 +232,9 @@ impl Into for Member { on_leave: self.on_leave.unwrap_or_default(), on_join: self.on_join.unwrap_or_default(), credentials: self.credentials.unwrap_or_default(), + ping_interval: self.ping_interval.map(Into::into), + idle_timeout: self.idle_timeout.map(Into::into), + reconnect_timeout: self.reconnect_timeout.map(Into::into), } } } diff --git a/tests/e2e/grpc_control_api/rpc_settings.rs b/tests/e2e/grpc_control_api/rpc_settings.rs new file mode 100644 index 000000000..513c162d5 --- /dev/null +++ b/tests/e2e/grpc_control_api/rpc_settings.rs @@ -0,0 +1,72 @@ +//! Tests for the RPC settings in `Member` element spec. + +use std::time::{Duration, Instant}; + +use futures::channel::oneshot; + +use crate::{ + grpc_control_api::{ControlClient, MemberBuilder, RoomBuilder}, + signalling::{ConnectionEvent, TestMember}, +}; + +/// Tests that RPC settings in `Member` element spec works. +/// +/// # Algorithm +/// +/// 1. Create `Room` with `Member` with `ping_interval: 10`, `idle_timeout: 3`, +/// `reconnect_timeout: 0`; +/// +/// 2. Connect with [`TestMember`] as created `Member`; +/// +/// 3. When connection will be started, store [`Instant`]; +/// +/// 4. Wait for connection drop because idle, and verify that diff between +/// connection open and drop if >3 and <4. +#[actix_rt::test] +async fn rpc_settings_from_spec_works() { + const ROOM_ID: &str = "rpc_settings_from_spec_works"; + + let mut control_client = ControlClient::new().await; + let create_room = RoomBuilder::default() + .id(ROOM_ID) + .add_member( + MemberBuilder::default() + .id("member") + .credentials("test") + .ping_interval(Some(Duration::from_secs(10))) + .idle_timeout(Some(Duration::from_secs(3))) + .reconnect_timeout(Some(Duration::from_secs(0))) + .build() + .unwrap(), + ) + .build() + .unwrap() + .build_request(String::new()); + control_client.create(create_room).await; + + let (test_end_tx, test_end_rx) = oneshot::channel(); + let mut test_end_tx = Some(test_end_tx); + + let mut opened = None; + TestMember::start( + format!("ws://127.0.0.1:8080/ws/{}/member/test", ROOM_ID), + None, + Some(Box::new(move |event| match event { + ConnectionEvent::Started => { + opened = Some(Instant::now()); + } + ConnectionEvent::Stopped => { + let diff = Instant::now() - opened.unwrap(); + + assert!(diff > Duration::from_secs(3)); + assert!(diff < Duration::from_secs(4)); + + test_end_tx.take().unwrap().send(()).unwrap(); + } + _ => {} + })), + Some(Duration::from_secs(10)), + ); + + test_end_rx.await.unwrap(); +} diff --git a/tests/e2e/grpc_control_api/signaling.rs b/tests/e2e/grpc_control_api/signaling.rs index 622044148..5b43d914e 100644 --- a/tests/e2e/grpc_control_api/signaling.rs +++ b/tests/e2e/grpc_control_api/signaling.rs @@ -83,7 +83,8 @@ async fn signalling_starts_when_create_play_member_after_pub_member() { TestMember::connect( &format!("ws://127.0.0.1:8080/ws/{}/publisher/test", TEST_NAME), - Box::new(on_event.clone()), + Some(Box::new(on_event.clone())), + None, deadline, ) .await; @@ -105,7 +106,8 @@ async fn signalling_starts_when_create_play_member_after_pub_member() { control_client.create(create_play_member).await; TestMember::connect( &format!("ws://127.0.0.1:8080/ws/{}/responder/qwerty", TEST_NAME), - Box::new(on_event), + Some(Box::new(on_event)), + None, deadline, ) .await; @@ -147,7 +149,8 @@ async fn signalling_starts_when_create_play_endpoint_after_pub_member() { TestMember::connect( &format!("ws://127.0.0.1:8080/ws/{}/publisher/test", TEST_NAME), - Box::new(on_event.clone()), + Some(Box::new(on_event.clone())), + None, deadline, ) .await; @@ -171,7 +174,8 @@ async fn signalling_starts_when_create_play_endpoint_after_pub_member() { TestMember::connect( &format!("ws://127.0.0.1:8080/ws/{}/responder/qwerty", TEST_NAME), - Box::new(on_event), + Some(Box::new(on_event)), + None, deadline, ) .await; @@ -213,7 +217,8 @@ async fn signalling_starts_in_loopback_scenario() { TestMember::connect( &format!("ws://127.0.0.1:8080/ws/{}/publisher/test", TEST_NAME), - Box::new(on_event), + Some(Box::new(on_event)), + None, deadline, ) .await; @@ -299,12 +304,14 @@ async fn peers_removed_on_delete_member() { let deadline = Some(Duration::from_secs(5)); TestMember::start( format!("ws://127.0.0.1:8080/ws/{}/publisher/test", TEST_NAME), - Box::new(on_event.clone()), + Some(Box::new(on_event.clone())), + None, deadline, ); TestMember::start( format!("ws://127.0.0.1:8080/ws/{}/responder/test", TEST_NAME), - Box::new(on_event), + Some(Box::new(on_event)), + None, deadline, ); } diff --git a/tests/e2e/signalling/command_validation.rs b/tests/e2e/signalling/command_validation.rs index a4a311033..f42c45bd0 100644 --- a/tests/e2e/signalling/command_validation.rs +++ b/tests/e2e/signalling/command_validation.rs @@ -61,13 +61,14 @@ async fn command_validation() { let deadline = Some(std::time::Duration::from_secs(5)); let member1 = TestMember::connect( &format!("ws://127.0.0.1:8080/ws/{}/publisher/test", TEST_NAME), - Box::new( + Some(Box::new( move |event: &Event, _: &mut Context, _: Vec<&Event>| { tx1.unbounded_send(event.clone()).unwrap(); }, - ), + )), + None, deadline, ) .await; @@ -75,13 +76,14 @@ async fn command_validation() { let (tx2, mut rx2) = unbounded(); TestMember::start( format!("ws://127.0.0.1:8080/ws/{}/responder/test", TEST_NAME), - Box::new( + Some(Box::new( move |event: &Event, _: &mut Context, _: Vec<&Event>| { tx2.unbounded_send(event.clone()).unwrap(); }, - ), + )), + None, deadline, ); diff --git a/tests/e2e/signalling/mod.rs b/tests/e2e/signalling/mod.rs index e8ca13c4b..3229bdc29 100644 --- a/tests/e2e/signalling/mod.rs +++ b/tests/e2e/signalling/mod.rs @@ -2,12 +2,14 @@ mod command_validation; mod pub_sub_signallng; +mod rpc_settings; mod three_pubs; use std::time::Duration; use actix::{ - Actor, Addr, Arbiter, AsyncContext, Context, Handler, StreamHandler, + Actor, ActorContext, Addr, Arbiter, AsyncContext, Context, Handler, + StreamHandler, }; use actix_codec::Framed; use actix_http::ws; @@ -18,12 +20,27 @@ use awc::{ }; use futures::{executor, stream::SplitSink, SinkExt as _, StreamExt as _}; use medea_client_api_proto::{ - ClientMsg, Command, Event, IceCandidate, PeerId, ServerMsg, + ClientMsg, Command, Event, IceCandidate, PeerId, RpcSettings, ServerMsg, }; pub type MessageHandler = Box, Vec<&Event>)>; +pub type ConnectionEventHandler = Box; + +/// Event which will be provided into [`ConnectionEventHandler`] when connection +/// will be established or disconnected. +pub enum ConnectionEvent { + /// Connection established. + Started, + + /// [`RpcSettings`] [`ServerMsg`] received. + SettingsReceived(RpcSettings), + + /// Connection disconnected. + Stopped, +} + /// Medea client for testing purposes. pub struct TestMember { /// Writer to WebSocket. @@ -43,7 +60,11 @@ pub struct TestMember { /// Function which will be called at every received [`Event`] /// by this [`TestMember`]. - on_message: MessageHandler, + on_message: Option, + + /// Function which will be called when connection will be established and + /// disconnected. + on_connection_event: Option, } impl TestMember { @@ -69,7 +90,8 @@ impl TestMember { /// [`TestMember`] actor. pub async fn connect( uri: &str, - on_message: MessageHandler, + on_message: Option, + on_connection_event: Option, deadline: Option, ) -> Addr { let (_, framed) = awc::Client::new().ws(uri).connect().await.unwrap(); @@ -84,20 +106,27 @@ impl TestMember { known_peers: vec![], deadline, on_message, + on_connection_event, } }) } /// Starts test member on current thread by given URI. + /// /// `on_message` - is function which will be called at every [`Event`] /// received from server. + /// + /// `on_connection_event` - is function which will be called when connection + /// will be established and disconnected. pub fn start( uri: String, - on_message: MessageHandler, + on_message: Option, + on_connection_event: Option, deadline: Option, ) { Arbiter::spawn(async move { - Self::connect(&uri, on_message, deadline).await; + Self::connect(&uri, on_message, on_connection_event, deadline) + .await; }) } } @@ -220,11 +249,31 @@ impl StreamHandler> for TestMember { } let mut events: Vec<&Event> = self.events.iter().collect(); events.push(&event); - (self.on_message)(&event, ctx, events); + if let Some(func) = self.on_message.as_mut() { + func(&event, ctx, events); + } self.events.push(event); } - ServerMsg::RpcSettings(_) => {} + ServerMsg::RpcSettings(settings) => { + if let Some(func) = self.on_connection_event.as_mut() { + func(ConnectionEvent::SettingsReceived(settings)) + }; + } } } } + + fn started(&mut self, _: &mut Self::Context) { + if let Some(func) = self.on_connection_event.as_mut() { + func(ConnectionEvent::Started) + }; + } + + fn finished(&mut self, ctx: &mut Self::Context) { + if let Some(func) = self.on_connection_event.as_mut() { + func(ConnectionEvent::Stopped) + }; + + ctx.stop() + } } diff --git a/tests/e2e/signalling/pub_sub_signallng.rs b/tests/e2e/signalling/pub_sub_signallng.rs index d8a29c25b..2b6161937 100644 --- a/tests/e2e/signalling/pub_sub_signallng.rs +++ b/tests/e2e/signalling/pub_sub_signallng.rs @@ -98,12 +98,14 @@ fn pub_sub_video_call() { let deadline = Some(std::time::Duration::from_secs(5)); TestMember::start( format!("{}/caller/test", base_url), - Box::new(test_fn), + Some(Box::new(test_fn)), + None, deadline, ); TestMember::start( format!("{}/responder/test", base_url), - Box::new(test_fn), + Some(Box::new(test_fn)), + None, deadline, ); }) diff --git a/tests/e2e/signalling/rpc_settings.rs b/tests/e2e/signalling/rpc_settings.rs new file mode 100644 index 000000000..4ff0d67a1 --- /dev/null +++ b/tests/e2e/signalling/rpc_settings.rs @@ -0,0 +1,57 @@ +//! Tests for the RPC settings in `Member` element spec. + +use std::time::Duration; + +use futures::channel::oneshot; + +use crate::{ + grpc_control_api::{ControlClient, MemberBuilder, RoomBuilder}, + signalling::{ConnectionEvent, TestMember}, +}; + +/// Tests that RPC settings configured via Control API request are propagated in +/// [`ServerMsg::RpcSettings`] server message. +#[actix_rt::test] +async fn rpc_settings_server_msg() { + const ROOM_ID: &str = "rpc_settings_server_msg"; + const PING_INTERVAL_SECS: u64 = 111; + const IDLE_TIMEOUT_SECS: u64 = 222; + + let mut control_client = ControlClient::new().await; + let create_room = RoomBuilder::default() + .id(ROOM_ID) + .add_member( + MemberBuilder::default() + .id("member") + .credentials("test") + .ping_interval(Some(Duration::from_secs(PING_INTERVAL_SECS))) + .idle_timeout(Some(Duration::from_secs(IDLE_TIMEOUT_SECS))) + .reconnect_timeout(Some(Duration::from_secs(0))) + .build() + .unwrap(), + ) + .build() + .unwrap() + .build_request(String::new()); + control_client.create(create_room).await; + + let (end_tx, end_rx) = oneshot::channel(); + let mut end_tx = Some(end_tx); + TestMember::start( + format!("ws://127.0.0.1:8080/ws/{}/member/test", ROOM_ID), + None, + Some(Box::new(move |event| { + if let ConnectionEvent::SettingsReceived(settings) = event { + assert_eq!(settings.idle_timeout_ms, IDLE_TIMEOUT_SECS * 1000); + assert_eq!( + settings.ping_interval_ms, + PING_INTERVAL_SECS * 1000 + ); + end_tx.take().unwrap().send(()).unwrap(); + } + })), + Some(Duration::from_secs(10)), + ); + + end_rx.await.unwrap(); +} diff --git a/tests/e2e/signalling/three_pubs.rs b/tests/e2e/signalling/three_pubs.rs index 04acb2540..4a2f3f4a0 100644 --- a/tests/e2e/signalling/three_pubs.rs +++ b/tests/e2e/signalling/three_pubs.rs @@ -122,17 +122,20 @@ fn three_members_p2p_video_call() { let deadline = Some(std::time::Duration::from_secs(5)); TestMember::start( format!("{}/member-1/test", base_url), - Box::new(test_fn.clone()), + Some(Box::new(test_fn.clone())), + None, deadline, ); TestMember::start( format!("{}/member-2/test", base_url), - Box::new(test_fn.clone()), + Some(Box::new(test_fn.clone())), + None, deadline, ); TestMember::start( format!("{}/member-3/test", base_url), - Box::new(test_fn), + Some(Box::new(test_fn)), + None, deadline, ); })