diff --git a/compositor_pipeline/src/error.rs b/compositor_pipeline/src/error.rs index bfd5439b1..91cecfdab 100644 --- a/compositor_pipeline/src/error.rs +++ b/compositor_pipeline/src/error.rs @@ -70,6 +70,12 @@ pub enum OutputInitError { #[error(transparent)] SocketError(#[from] std::io::Error), + + #[error("Failed to register output. Port: {0} is already used or not available.")] + PortAlreadyInUse(u16), + + #[error("Failed to register output. All ports in range {lower_bound} to {upper_bound} are already used or not available.")] + AllPortsAlreadyInUse { lower_bound: u16, upper_bound: u16 }, } #[derive(Debug, thiserror::Error)] diff --git a/compositor_pipeline/src/pipeline.rs b/compositor_pipeline/src/pipeline.rs index 2b14dfd7e..0d5fe8bc1 100644 --- a/compositor_pipeline/src/pipeline.rs +++ b/compositor_pipeline/src/pipeline.rs @@ -35,6 +35,7 @@ pub mod input; pub mod output; mod pipeline_input; mod pipeline_output; +pub mod rtp; mod structs; use self::pipeline_input::new_pipeline_input; @@ -42,15 +43,9 @@ use self::pipeline_output::new_pipeline_output; pub use self::structs::AudioCodec; pub use self::structs::VideoCodec; -#[derive(Debug)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct Port(pub u16); -#[derive(Debug, Clone, Copy)] -pub enum RequestedPort { - Exact(u16), - Range((u16, u16)), -} - pub struct RegisterInputOptions { pub input_options: InputOptions, pub queue_options: queue::InputOptions, diff --git a/compositor_pipeline/src/pipeline/input/rtp.rs b/compositor_pipeline/src/pipeline/input/rtp.rs index e07bb6492..dbfc9372b 100644 --- a/compositor_pipeline/src/pipeline/input/rtp.rs +++ b/compositor_pipeline/src/pipeline/input/rtp.rs @@ -1,5 +1,4 @@ use std::{ - net, sync::{atomic::AtomicBool, Arc}, thread, }; @@ -7,8 +6,9 @@ use std::{ use crate::pipeline::{ decoder::{self, DecoderOptions}, encoder, + rtp::{bind_to_requested_port, BindToPortError, RequestedPort, TransportProtocol}, structs::EncodedChunkKind, - Port, RequestedPort, + Port, }; use compositor_render::InputId; use crossbeam_channel::{unbounded, Receiver}; @@ -52,12 +52,6 @@ pub struct RtpReceiverOptions { pub stream: RtpStream, } -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum TransportProtocol { - Udp, - TcpServer, -} - #[derive(Debug, Clone, PartialEq, Eq)] pub struct InputVideoStream { pub options: decoder::VideoDecoderOptions, @@ -137,7 +131,7 @@ impl RtpReceiver { ) .map_err(RtpReceiverError::SocketOptions)?; - let port = Self::bind_to_port(opts.port, &socket)?; + let port = bind_to_requested_port(opts.port, &socket)?; socket.listen(1).map_err(RtpReceiverError::SocketBind)?; @@ -174,7 +168,7 @@ impl RtpReceiver { } } - let port = Self::bind_to_port(opts.port, &socket)?; + let port = bind_to_requested_port(opts.port, &socket)?; socket .set_read_timeout(Some(std::time::Duration::from_millis(50))) @@ -189,53 +183,6 @@ impl RtpReceiver { Ok((port, receiver_thread, packets_rx)) } - - fn bind_to_port( - requested_port: RequestedPort, - socket: &socket2::Socket, - ) -> Result { - let port = match requested_port { - RequestedPort::Exact(port) => { - socket - .bind( - &net::SocketAddr::V4(net::SocketAddrV4::new( - net::Ipv4Addr::UNSPECIFIED, - port, - )) - .into(), - ) - .map_err(|err| match err.kind() { - std::io::ErrorKind::AddrInUse => RtpReceiverError::PortAlreadyInUse(port), - _ => RtpReceiverError::SocketBind(err), - })?; - port - } - RequestedPort::Range((lower_bound, upper_bound)) => { - let port = (lower_bound..upper_bound).find(|port| { - let bind_res = socket.bind( - &net::SocketAddr::V4(net::SocketAddrV4::new( - net::Ipv4Addr::UNSPECIFIED, - *port, - )) - .into(), - ); - - bind_res.is_ok() - }); - - match port { - Some(port) => port, - None => { - return Err(RtpReceiverError::AllPortsAlreadyInUse { - lower_bound, - upper_bound, - }) - } - } - } - }; - Ok(Port(port)) - } } impl Drop for RtpReceiver { @@ -357,3 +304,19 @@ pub enum DepayloadingError { #[error(transparent)] Rtp(#[from] rtp::Error), } + +impl From for RtpReceiverError { + fn from(value: BindToPortError) -> Self { + match value { + BindToPortError::SocketBind(err) => RtpReceiverError::SocketBind(err), + BindToPortError::PortAlreadyInUse(port) => RtpReceiverError::PortAlreadyInUse(port), + BindToPortError::AllPortsAlreadyInUse { + lower_bound, + upper_bound, + } => RtpReceiverError::AllPortsAlreadyInUse { + lower_bound, + upper_bound, + }, + } + } +} diff --git a/compositor_pipeline/src/pipeline/input/rtp/tcp_server.rs b/compositor_pipeline/src/pipeline/input/rtp/tcp_server.rs index 853ad943c..a1ba849d8 100644 --- a/compositor_pipeline/src/pipeline/input/rtp/tcp_server.rs +++ b/compositor_pipeline/src/pipeline/input/rtp/tcp_server.rs @@ -1,4 +1,5 @@ use std::{ + collections::VecDeque, io::{self, Read}, net::TcpStream, sync::{atomic::AtomicBool, Arc}, @@ -9,14 +10,13 @@ use std::{ use bytes::BytesMut; use compositor_render::error::ErrorStack; use crossbeam_channel::Sender; -use log::info; +use log::error; pub(super) fn run_tcp_server_receiver( socket: std::net::TcpListener, packets_tx: Sender, should_close: Arc, ) { - let mut buffer = BytesMut::zeroed(65536); // make accept non blocking so we have a chance to handle should_close value socket .set_nonblocking(true) @@ -28,73 +28,79 @@ pub(super) fn run_tcp_server_receiver( } // accept only one connection at the time - let accept_result = socket.accept(); - let Ok((socket, _)) = accept_result else { + let Ok((socket, _)) = socket.accept() else { thread::sleep(Duration::from_millis(50)); continue; }; - info!("Connection accepted"); - - socket - .set_read_timeout(Some(Duration::from_millis(50))) - .expect("Cannot set read timeout"); + let mut socket = TcpReadPacketStream::new(socket, should_close.clone()); loop { - let mut len_bytes = [0u8; 2]; - if let Err(err) = (&socket).read_exact_with_should_close(&mut len_bytes, &should_close) - { - maybe_log_err(err); - break; - }; - let len = u16::from_be_bytes(len_bytes) as usize; - - if let Err(err) = - (&socket).read_exact_with_should_close(&mut buffer[..len], &should_close) - { - maybe_log_err(err); - break; - }; - packets_tx - .send(bytes::Bytes::copy_from_slice(&buffer[..len])) - .unwrap(); + match socket.read_packet() { + Ok(packet) => { + packets_tx.send(packet).unwrap(); + } + Err(err) => { + error!( + "Error while reading from TCP socket: {}", + ErrorStack::new(&err).into_string() + ); + break; + } + } } } } -fn maybe_log_err(err: io::Error) { - if err.kind() != io::ErrorKind::WouldBlock { - log::error!( - "Unknown error when reading from TCP socket. {}", - ErrorStack::new(&err).into_string() - ); - } +struct TcpReadPacketStream { + socket: TcpStream, + buf: VecDeque, + read_buf: Vec, + should_close: Arc, } -trait TcpStreamExt { - fn read_exact_with_should_close( - &mut self, - buf: &mut [u8], - should_close: &Arc, - ) -> io::Result<()>; -} +impl TcpReadPacketStream { + fn new(socket: TcpStream, should_close: Arc) -> Self { + socket + .set_read_timeout(Some(Duration::from_millis(50))) + .expect("Cannot set read timeout"); + Self { + socket, + buf: VecDeque::new(), + read_buf: vec![0; 65536], + should_close, + } + } + fn read_packet(&mut self) -> io::Result { + self.read_until_buffer_size(2)?; + + let mut len_bytes = [0u8; 2]; + self.buf.read_exact(&mut len_bytes)?; + let len = u16::from_be_bytes(len_bytes) as usize; + + self.read_until_buffer_size(len)?; + let mut packet = BytesMut::zeroed(len); + self.buf.read_exact(&mut packet[..])?; + Ok(packet.freeze()) + } -impl TcpStreamExt for &TcpStream { - fn read_exact_with_should_close( - &mut self, - buf: &mut [u8], - should_close: &Arc, - ) -> io::Result<()> { + fn read_until_buffer_size(&mut self, buf_size: usize) -> io::Result<()> { loop { - match self.read_exact(buf) { - Ok(val) => return Ok(val), - Err(err) => match err.kind() { - std::io::ErrorKind::WouldBlock - if should_close.load(std::sync::atomic::Ordering::Relaxed) => - { - continue; + if self.buf.len() >= buf_size { + return Ok(()); + } + match self.socket.read(&mut self.read_buf) { + Ok(read_bytes) => { + self.buf.extend(self.read_buf[0..read_bytes].iter()); + } + Err(err) => { + let should_close = self.should_close.load(std::sync::atomic::Ordering::Relaxed); + match err.kind() { + std::io::ErrorKind::WouldBlock if !should_close => { + continue; + } + _ => return io::Result::Err(err), } - _ => return io::Result::Err(err), - }, + } }; } } diff --git a/compositor_pipeline/src/pipeline/input/rtp/udp.rs b/compositor_pipeline/src/pipeline/input/rtp/udp.rs index b7f4400a3..3e7428e6a 100644 --- a/compositor_pipeline/src/pipeline/input/rtp/udp.rs +++ b/compositor_pipeline/src/pipeline/input/rtp/udp.rs @@ -1,6 +1,6 @@ use std::sync::{atomic::AtomicBool, Arc}; -use bytes::BytesMut; +use bytes::{Bytes, BytesMut}; use crossbeam_channel::Sender; pub(super) fn run_udp_receiver( @@ -28,7 +28,7 @@ pub(super) fn run_udp_receiver( }; packets_tx - .send(bytes::Bytes::copy_from_slice(&buffer[..received_bytes])) + .send(Bytes::copy_from_slice(&buffer[..received_bytes])) .unwrap(); } } diff --git a/compositor_pipeline/src/pipeline/output/rtp.rs b/compositor_pipeline/src/pipeline/output/rtp.rs index c96bac447..6ec5973d5 100644 --- a/compositor_pipeline/src/pipeline/output/rtp.rs +++ b/compositor_pipeline/src/pipeline/output/rtp.rs @@ -1,6 +1,12 @@ use compositor_render::OutputId; use log::{debug, error}; -use std::sync::Arc; +use std::{ + io::{self, Write}, + sync::{atomic::AtomicBool, Arc}, + thread, + time::Duration, + u16, +}; use rand::Rng; use rtp::packetizer::Payloader; @@ -8,30 +14,38 @@ use webrtc_util::Marshal; use crate::{ error::OutputInitError, - pipeline::{structs::EncodedChunk, OutputAudioOptions, OutputVideoOptions}, + pipeline::{ + rtp::{bind_to_requested_port, BindToPortError, RequestedPort}, + structs::EncodedChunk, + Port, + }, }; #[derive(Debug)] pub struct RtpSender { - pub port: u16, - pub ip: Arc, + pub connection_options: RtpConnectionOptions, sender_thread: Option>, + should_close: Arc, } -pub struct RtpContext { +struct RtpContext { ssrc: u32, next_sequence_number: u16, payloader: rtp::codecs::h264::H264Payloader, - socket: std::net::UdpSocket, + socket: socket2::Socket, + should_close: Arc, } #[derive(Debug, Clone)] pub struct RtpSenderOptions { - pub port: u16, - pub ip: Arc, pub output_id: OutputId, - pub video: Option, - pub audio: Option, + pub connection_options: RtpConnectionOptions, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum RtpConnectionOptions { + Udp { port: Port, ip: Arc }, + TcpServer { port: RequestedPort }, } impl RtpSender { @@ -44,42 +58,142 @@ impl RtpSender { let next_sequence_number = rng.gen::(); let payloader = rtp::codecs::h264::H264Payloader::default(); - let socket = std::net::UdpSocket::bind(std::net::SocketAddrV4::new( - std::net::Ipv4Addr::UNSPECIFIED, - 0, - ))?; - - socket.connect((options.ip.as_ref(), options.port))?; + let (socket, _port) = match &options.connection_options { + RtpConnectionOptions::Udp { port, ip } => Self::udp_socket(ip, *port)?, + RtpConnectionOptions::TcpServer { port } => Self::tcp_socket(*port)?, + }; + let should_close = Arc::new(AtomicBool::new(false)); let mut ctx = RtpContext { ssrc, next_sequence_number, payloader, - socket, + socket: socket.try_clone()?, + should_close: should_close.clone(), }; + let connection_options = options.connection_options.clone(); let sender_thread = std::thread::Builder::new() .name(format!("RTP sender for output {}", options.output_id)) - .spawn(move || { - for packet in packets { - Self::send_data(&mut ctx, packet); - } + .spawn(move || match connection_options { + RtpConnectionOptions::Udp { .. } => run_udp_sender_thread(&mut ctx, packets), + RtpConnectionOptions::TcpServer { .. } => run_tcp_sender_thread(&mut ctx, packets), }) .unwrap(); Ok(Self { - port: options.port, - ip: options.ip, + connection_options: options.connection_options, sender_thread: Some(sender_thread), + should_close, }) } - /// this assumes, that a "packet" contains data about a single frame (access unit) - fn send_data(context: &mut RtpContext, packet: EncodedChunk) { + fn udp_socket(ip: &str, port: Port) -> Result<(socket2::Socket, Port), OutputInitError> { + let socket = std::net::UdpSocket::bind(std::net::SocketAddrV4::new( + std::net::Ipv4Addr::UNSPECIFIED, + 0, + ))?; + + socket.connect((ip, port.0))?; + Ok((socket.into(), port)) + } + + fn tcp_socket(port: RequestedPort) -> Result<(socket2::Socket, Port), OutputInitError> { + let socket = socket2::Socket::new( + socket2::Domain::IPV4, + socket2::Type::STREAM, + Some(socket2::Protocol::TCP), + ) + .map_err(OutputInitError::SocketError)?; + + let port = bind_to_requested_port(port, &socket)?; + + socket.listen(1).map_err(OutputInitError::SocketError)?; + Ok((socket, port)) + } +} + +fn run_tcp_sender_thread( + context: &mut RtpContext, + mut packets: Box + Send>, +) { + // make accept non blocking so we have a chance to handle should_close value + context + .socket + .set_nonblocking(true) + .expect("Cannot set non-blocking"); + loop { + if context + .should_close + .load(std::sync::atomic::Ordering::Relaxed) + { + return; + } + + // accept only one connection at the time + let Ok((socket, _)) = context.socket.accept() else { + thread::sleep(Duration::from_millis(50)); + continue; + }; + + let mut socket = TcpWritePacketStream::new(socket, context.should_close.clone()); + loop { + let Some(EncodedChunk { data, pts, .. }) = packets.next() else { + return; + }; + + let payloads = match context.payloader.payload(64000, &data) { + Ok(p) => p, + Err(e) => { + error!("Failed to payload a packet: {}", e); + return; + } + }; + let packets_amount = payloads.len(); + + for (i, payload) in payloads.into_iter().enumerate() { + let header = rtp::header::Header { + version: 2, + padding: false, + extension: false, + marker: i == packets_amount - 1, // marker needs to be set on the last packet of each frame + payload_type: 96, + sequence_number: context.next_sequence_number, + timestamp: (pts.as_secs_f64() * 90000.0) as u32, + ssrc: context.ssrc, + ..Default::default() + }; + + let packet = rtp::packet::Packet { header, payload }; + + let packet = match packet.marshal() { + Ok(p) => p, + Err(e) => { + error!("Failed to marshal a packet: {}", e); + return; + } + }; + + if let Err(err) = socket.write_packet(packet) { + debug!("Failed to send packet: {err}"); + } + + context.next_sequence_number = context.next_sequence_number.wrapping_add(1); + } + } + } +} + +/// this assumes, that a "packet" contains data about a single frame (access unit) +fn run_udp_sender_thread( + context: &mut RtpContext, + packets: Box + Send>, +) { + for packet in packets { // TODO: check if this is h264 let EncodedChunk { data, pts, .. } = packet; - let payloads = match context.payloader.payload(1500, &data) { + let payloads = match context.payloader.payload(1400, &data) { Ok(p) => p, Err(e) => { error!("Failed to payload a packet: {}", e); @@ -122,9 +236,73 @@ impl RtpSender { impl Drop for RtpSender { fn drop(&mut self) { + self.should_close + .store(true, std::sync::atomic::Ordering::Relaxed); match self.sender_thread.take() { Some(handle) => handle.join().unwrap(), None => error!("RTP sender thread was already joined."), } } } + +impl From for OutputInitError { + fn from(value: BindToPortError) -> Self { + match value { + BindToPortError::SocketBind(err) => OutputInitError::SocketError(err), + BindToPortError::PortAlreadyInUse(port) => OutputInitError::PortAlreadyInUse(port), + BindToPortError::AllPortsAlreadyInUse { + lower_bound, + upper_bound, + } => OutputInitError::AllPortsAlreadyInUse { + lower_bound, + upper_bound, + }, + } + } +} + +struct TcpWritePacketStream { + socket: socket2::Socket, + should_close: Arc, +} + +impl TcpWritePacketStream { + fn new(socket: socket2::Socket, should_close: Arc) -> Self { + socket + .set_write_timeout(Some(Duration::from_millis(50))) + .expect("Cannot set write timeout"); + Self { + socket, + should_close, + } + } + + fn write_packet(&mut self, data: bytes::Bytes) -> io::Result<()> { + self.write_bytes(&u16::to_be_bytes(data.len() as u16))?; + self.write_bytes(&data[..])?; + io::Result::Ok(()) + } + + fn write_bytes(&mut self, data: &[u8]) -> io::Result<()> { + let mut written_bytes = 0; + loop { + if written_bytes >= data.len() { + return Ok(()); + } + match self.socket.write(&data[written_bytes..]) { + Ok(bytes) => { + written_bytes += bytes; + } + Err(err) => { + let should_close = self.should_close.load(std::sync::atomic::Ordering::Relaxed); + match err.kind() { + std::io::ErrorKind::WouldBlock if !should_close => { + continue; + } + _ => return io::Result::Err(err), + } + } + }; + } + } +} diff --git a/compositor_pipeline/src/pipeline/rtp.rs b/compositor_pipeline/src/pipeline/rtp.rs new file mode 100644 index 000000000..b67941d11 --- /dev/null +++ b/compositor_pipeline/src/pipeline/rtp.rs @@ -0,0 +1,62 @@ +use std::net; + +use super::Port; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TransportProtocol { + Udp, + TcpServer, +} + +pub(super) enum BindToPortError { + SocketBind(std::io::Error), + PortAlreadyInUse(u16), + AllPortsAlreadyInUse { lower_bound: u16, upper_bound: u16 }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RequestedPort { + Exact(u16), + Range((u16, u16)), +} + +pub(super) fn bind_to_requested_port( + requested_port: RequestedPort, + socket: &socket2::Socket, +) -> Result { + let port = match requested_port { + RequestedPort::Exact(port) => { + socket + .bind( + &net::SocketAddr::V4(net::SocketAddrV4::new(net::Ipv4Addr::UNSPECIFIED, port)) + .into(), + ) + .map_err(|err| match err.kind() { + std::io::ErrorKind::AddrInUse => BindToPortError::PortAlreadyInUse(port), + _ => BindToPortError::SocketBind(err), + })?; + port + } + RequestedPort::Range((lower_bound, upper_bound)) => { + let port = (lower_bound..upper_bound).find(|port| { + let bind_res = socket.bind( + &net::SocketAddr::V4(net::SocketAddrV4::new(net::Ipv4Addr::UNSPECIFIED, *port)) + .into(), + ); + + bind_res.is_ok() + }); + + match port { + Some(port) => port, + None => { + return Err(BindToPortError::AllPortsAlreadyInUse { + lower_bound, + upper_bound, + }) + } + } + } + }; + Ok(Port(port)) +} diff --git a/docs/pages/api/routes.md b/docs/pages/api/routes.md index fca3b52ae..18bdcf5d7 100644 --- a/docs/pages/api/routes.md +++ b/docs/pages/api/routes.md @@ -71,8 +71,9 @@ type RegisterOutputStream = { type: "register"; entity_type: "output_stream"; output_id: string; + transport_protocol?: "udp" | "tcp_server"; port: u16; - ip: string; + ip?: string; video: Video } @@ -82,10 +83,10 @@ type Video = { height: number }, initial: Component - encoder_preset?: EncoderPreset, + encoder_preset?: VideoEncoderPreset, } -type EncoderPreset = +type VideoEncoderPreset = | "ultrafast" | "superfast" | "veryfast" @@ -101,7 +102,13 @@ type EncoderPreset = Register a new RTP output stream. - `output_id` - An identifier for the output stream. It can be used in the `UpdateOutput` request to define what to render for the output stream. -- `port` / `ip` - UDP port and IP where compositor should send the stream. +- `transport_protocol` - (**default=`"udp"`**) Transport layer protocol that will be used to send RTP packets. + - `udp` - UDP protocol. + - `tcp_server` - TCP protocol where LiveCompositor is the server side of the connection. +- `port` - Depends on the value of the `transport_protocol` field: + - `udp` - An UDP port number that RTP packets will be sent to. + - `tcp_server` - A local TCP port number or a port range that LiveCompositor will listen for incoming connections. +- `ip` - Only valid if `transport_protocol="udp"`. IP address where RTP packets should be sent to. - `video.resolution` - Output resolution in pixels. - `video.initial` - Root of a component tree/scene that should be rendered for the output. Use [`update_output` request](#update-output) to update this value after registration. [Learn more](../concept/component). - `video.encoder_preset` - (**default=`"fast"`**) Preset for an encoder. See `FFmpeg` [docs](https://trac.ffmpeg.org/wiki/Encode/H.264#Preset) to learn more. diff --git a/examples/common/common.rs b/examples/common/common.rs index b055ea9f1..8a82db5ba 100644 --- a/examples/common/common.rs +++ b/examples/common/common.rs @@ -13,6 +13,7 @@ use video_compositor::config::config; use serde::Serialize; /// The SDP file will describe an RTP session on localhost with H264 encoding. +#[allow(dead_code)] pub fn write_example_sdp_file(ip: &str, port: u16) -> Result { let sdp_filepath = PathBuf::from(format!("/tmp/example_sdp_input_{}.sdp", port)); let mut file = File::create(&sdp_filepath)?; diff --git a/examples/rtp_tcp.rs b/examples/rtp_tcp.rs index 4129848b8..631bb929f 100644 --- a/examples/rtp_tcp.rs +++ b/examples/rtp_tcp.rs @@ -9,8 +9,6 @@ use std::{ }; use video_compositor::{config::config, http, logger, types::Resolution}; -use crate::common::write_example_sdp_file; - #[path = "./common/common.rs"] mod common; @@ -36,15 +34,6 @@ fn main() { } fn start_example_client_code() -> Result<()> { - info!("[example] Start listening on output port."); - let output_sdp = write_example_sdp_file("127.0.0.1", 8002)?; - Command::new("ffplay") - .args(["-protocol_whitelist", "file,rtp,udp", &output_sdp]) - .stdout(Stdio::null()) - .stderr(Stdio::null()) - .spawn()?; - thread::sleep(Duration::from_secs(2)); - info!("[example] Download sample."); let sample_path = env::current_dir()?.join(SAMPLE_FILE_PATH); fs::create_dir_all(sample_path.parent().unwrap())?; @@ -76,28 +65,34 @@ fn start_example_client_code() -> Result<()> { "type": "register", "entity_type": "output_stream", "output_id": "output_1", + "transport_protocol": "tcp_server", "port": 8002, - "ip": "127.0.0.1", - "resolution": { - "width": VIDEO_RESOLUTION.width, - "height": VIDEO_RESOLUTION.height, - }, - "encoder_preset": "medium", - "initial_scene": { - "type": "shader", - "id": "shader_node_1", - "shader_id": "shader_example_1", - "children": [ - { - "id": "input_1", - "type": "input_stream", - "input_id": "input_1", - } - ], - "resolution": { "width": VIDEO_RESOLUTION.width, "height": VIDEO_RESOLUTION.height }, + "video": { + "resolution": { + "width": VIDEO_RESOLUTION.width, + "height": VIDEO_RESOLUTION.height, + }, + "encoder_preset": "medium", + "initial": { + "type": "shader", + "id": "shader_node_1", + "shader_id": "shader_example_1", + "children": [ + { + "id": "input_1", + "type": "input_stream", + "input_id": "input_1", + } + ], + "resolution": { "width": VIDEO_RESOLUTION.width, "height": VIDEO_RESOLUTION.height }, + } } }))?; - + let gst_output_command = "gst-launch-1.0 -v tcpclientsrc host=127.0.0.1 port=8002 ! \"application/x-rtp-stream\" ! rtpstreamdepay ! rtph264depay ! decodebin ! videoconvert ! autovideosink".to_string(); + Command::new("bash") + .arg("-c") + .arg(gst_output_command) + .spawn()?; std::thread::sleep(Duration::from_millis(500)); info!("[example] Start pipeline"); @@ -106,8 +101,13 @@ fn start_example_client_code() -> Result<()> { }))?; let sample_path_str = sample_path.to_string_lossy().to_string(); - let gst_command = format!("gst-launch-1.0 -v funnel name=fn filesrc location={sample_path_str} ! qtdemux ! h264parse ! rtph264pay config-interval=1 pt=96 ! .send_rtp_sink rtpsession name=session .send_rtp_src ! fn. session.send_rtcp_src ! fn. fn. ! rtpstreampay ! tcpclientsink host=127.0.0.1 port=8004"); - Command::new("bash").arg("-c").arg(gst_command).spawn()?; + let gst_input_command = format!("gst-launch-1.0 -v funnel name=fn filesrc location={sample_path_str} ! qtdemux ! h264parse ! rtph264pay config-interval=1 pt=96 ! .send_rtp_sink rtpsession name=session .send_rtp_src ! fn. session.send_rtcp_src ! fn. fn. ! rtpstreampay ! tcpclientsink host=127.0.0.1 port=8004"); + Command::new("bash") + .arg("-c") + .stdout(Stdio::null()) + .stderr(Stdio::null()) + .arg(gst_input_command) + .spawn()?; Ok(()) } diff --git a/schemas/register.schema.json b/schemas/register.schema.json index c7b303a7f..c2a02e9ff 100644 --- a/schemas/register.schema.json +++ b/schemas/register.schema.json @@ -140,7 +140,6 @@ "type": "object", "required": [ "entity_type", - "ip", "output_id", "port" ], @@ -155,12 +154,23 @@ "$ref": "#/definitions/OutputId" }, "port": { - "type": "integer", - "format": "uint16", - "minimum": 0.0 + "$ref": "#/definitions/Port" }, "ip": { - "type": "string" + "type": [ + "string", + "null" + ] + }, + "transport_protocol": { + "anyOf": [ + { + "$ref": "#/definitions/TransportProtocol" + }, + { + "type": "null" + } + ] }, "video": { "anyOf": [ @@ -433,7 +443,7 @@ ] }, { - "description": "TCP protocol where LiveCompositor is a server side of the connection.", + "description": "TCP protocol where LiveCompositor is the server side of the connection.", "type": "string", "enum": [ "tcp_server" diff --git a/src/api.rs b/src/api.rs index e276b7d53..9e6b02b1d 100644 --- a/src/api.rs +++ b/src/api.rs @@ -64,16 +64,12 @@ pub enum UnregisterRequest { #[serde(tag = "query", rename_all = "snake_case")] pub enum QueryRequest { WaitForNextFrame { input_id: InputId }, - Inputs, - Outputs, } #[derive(Serialize, Debug)] #[serde(untagged)] pub enum Response { Ok {}, - Inputs { inputs: Vec }, - Outputs { outputs: Vec }, RegisteredPort { port: u16 }, } @@ -157,36 +153,6 @@ impl Api { ); Ok(ResponseHandler::DeferredResponse(receiver)) } - QueryRequest::Inputs => { - let inputs = self - .pipeline() - .inputs() - .map(|(id, node)| match node.input { - pipeline::input::Input::Rtp(ref rtp) => InputInfo::Rtp { - id: id.clone().into(), - port: rtp.port, - }, - - pipeline::input::Input::Mp4(ref mp4) => InputInfo::Mp4 { - id: mp4.input_id.clone().into(), - }, - }) - .collect(); - Ok(ResponseHandler::Response(Response::Inputs { inputs })) - } - QueryRequest::Outputs => { - let outputs = self.pipeline().with_outputs(|iter| { - iter.map(|(id, output)| match output.output { - pipeline::output::Output::Rtp(ref rtp) => OutputInfo { - id: id.clone().into(), - port: rtp.port, - ip: rtp.ip.clone(), - }, - }) - .collect() - }); - Ok(ResponseHandler::Response(Response::Outputs { outputs })) - } } } diff --git a/src/api/register_request.rs b/src/api/register_request.rs index ba0f0b4f9..62a19b698 100644 --- a/src/api/register_request.rs +++ b/src/api/register_request.rs @@ -1,10 +1,7 @@ -use compositor_pipeline::pipeline::{self, Port, RegisterInputOptions}; +use compositor_pipeline::pipeline::{Port, RegisterInputOptions}; use compositor_render::InputId; -use crate::{ - error::ApiError, - types::{RegisterOutputRequest, RegisterRequest}, -}; +use crate::{error::ApiError, types::RegisterRequest}; use super::{Api, ResponseHandler}; @@ -35,7 +32,10 @@ pub fn handle_register_request( let (input_id, register_options) = mp4.try_into()?; handle_register_input(api, input_id, register_options) } - RegisterRequest::OutputStream(output_stream) => handle_register_output(api, output_stream), + RegisterRequest::OutputStream(output_options) => { + api.pipeline().register_output(output_options.try_into()?)?; + Ok(ResponseHandler::Ok) + } RegisterRequest::Shader(spec) => { let spec = spec.try_into()?; api.pipeline().register_renderer(spec)?; @@ -53,34 +53,3 @@ pub fn handle_register_request( } } } - -fn handle_register_output( - api: &mut Api, - request: RegisterOutputRequest, -) -> Result { - let RegisterOutputRequest { - output_id, - port, - ip, - .. - } = request.clone(); - - api.pipeline().with_outputs(|mut iter| { - if let Some((node_id, _)) = iter.find(|(_, output)| match &output.output { - pipeline::output::Output::Rtp(rtp) => rtp.port == port && rtp.ip == ip, - }) { - return Err(ApiError::new( - "PORT_AND_IP_ALREADY_IN_USE", - format!("Failed to register output stream \"{output_id}\". Combination of port {port} and IP {ip} is already used by node \"{node_id}\""), - tiny_http::StatusCode(400) - )); - }; - Ok(()) - })?; - - api.pipeline - .lock() - .unwrap() - .register_output(request.try_into()?)?; - Ok(ResponseHandler::Ok) -} diff --git a/src/types.rs b/src/types.rs index 57d4c8601..e6dcd1bcd 100644 --- a/src/types.rs +++ b/src/types.rs @@ -38,7 +38,6 @@ pub use component::WebView; #[allow(unused_imports)] pub use register_request::Mp4; -pub use register_request::RegisterOutputRequest; pub use register_request::RegisterRequest; #[allow(unused_imports)] pub use register_request::RtpInputStream; diff --git a/src/types/from_register_request.rs b/src/types/from_register_request.rs index 816205a2b..b977992e7 100644 --- a/src/types/from_register_request.rs +++ b/src/types/from_register_request.rs @@ -7,11 +7,10 @@ use compositor_pipeline::{ encoder::{ self, ffmpeg_h264::{self, Options}, - VideoEncoderOptions, }, input, output::{self, rtp::RtpSenderOptions}, - OutputVideoOptions, + rtp, }, queue, }; @@ -68,8 +67,8 @@ impl TryFrom for (compositor_render::InputId, pipeline::Register input_id: input_id.clone().into(), stream: rtp_stream, transport_protocol: match transport_protocol.unwrap_or(TransportProtocol::Udp) { - TransportProtocol::Udp => input::rtp::TransportProtocol::Udp, - TransportProtocol::TcpServer => input::rtp::TransportProtocol::TcpServer, + TransportProtocol::Udp => rtp::TransportProtocol::Udp, + TransportProtocol::TcpServer => rtp::TransportProtocol::TcpServer, }, }); @@ -130,14 +129,14 @@ impl TryFrom for (compositor_render::InputId, pipeline::RegisterInputOption } } -impl TryFrom for pipeline::RequestedPort { +impl TryFrom for rtp::RequestedPort { type Error = TypeError; fn try_from(value: Port) -> Result { const PORT_CONVERSION_ERROR_MESSAGE: &str = "Port needs to be a number between 1 and 65535 or a string in the \"START:END\" format, where START and END represent a range of ports."; match value { Port::U16(0) => Err(TypeError::new(PORT_CONVERSION_ERROR_MESSAGE)), - Port::U16(v) => Ok(pipeline::RequestedPort::Exact(v)), + Port::U16(v) => Ok(rtp::RequestedPort::Exact(v)), Port::String(s) => { let (start, end) = s .split_once(':') @@ -158,7 +157,7 @@ impl TryFrom for pipeline::RequestedPort { return Err(TypeError::new(PORT_CONVERSION_ERROR_MESSAGE)); } - Ok(pipeline::RequestedPort::Range((start, end))) + Ok(rtp::RequestedPort::Range((start, end))) } } } @@ -189,69 +188,80 @@ impl From for audio_mixer::types::AudioChannels { } } -impl TryFrom for output::OutputOptions { +impl TryFrom for pipeline::RegisterOutputOptions { type Error = TypeError; - fn try_from(value: RegisterOutputRequest) -> Result { - let video = match value.video { - Some(v) => Some(OutputVideoOptions { - encoder_opts: VideoEncoderOptions::H264(ffmpeg_h264::Options { - preset: v.encoder_preset.into(), - resolution: v.resolution.into(), - output_id: value.output_id.clone().into(), - }), - initial: v.initial.try_into()?, - }), - None => None, - }; - - let audio = value.audio.map(|a| pipeline::OutputAudioOptions { - initial: a.initial.into(), - channels: a.channels.into(), - forward_error_correction: a.forward_error_correction.unwrap_or(false), - }); - - Ok(output::OutputOptions::Rtp(RtpSenderOptions { - port: value.port, - ip: value.ip, - output_id: value.output_id.clone().into(), + fn try_from(request: RegisterOutputRequest) -> Result { + let RegisterOutputRequest { + output_id, + port, + ip, + transport_protocol, video, audio, - })) - } -} - -impl TryFrom for pipeline::RegisterOutputOptions { - type Error = TypeError; - - fn try_from(value: RegisterOutputRequest) -> Result { - const NO_VIDEO_OR_AUDIO: &str = - "At least one of \"video\" and \"audio\" fields have to be specified."; + } = request; - if value.video.is_none() && value.audio.is_none() { - return Err(TypeError::new(NO_VIDEO_OR_AUDIO)); + if video.is_none() && audio.is_none() { + return Err(TypeError::new( + "At least one of \"video\" and \"audio\" fields have to be specified.", + )); } - let output_options = value.clone().try_into()?; - let video = match value.video { + + let video = match video { Some(v) => Some(pipeline::OutputVideoOptions { initial: v.initial.try_into()?, encoder_opts: pipeline::encoder::VideoEncoderOptions::H264(Options { preset: v.encoder_preset.into(), resolution: v.resolution.into(), - output_id: value.output_id.clone().into(), + output_id: output_id.clone().into(), }), }), None => None, }; - let audio = value.audio.map(|a| pipeline::OutputAudioOptions { + let audio = audio.map(|a| pipeline::OutputAudioOptions { initial: a.initial.into(), channels: a.channels.into(), forward_error_correction: a.forward_error_correction.unwrap_or(false), }); + let connection_options = match transport_protocol.unwrap_or(TransportProtocol::Udp) { + TransportProtocol::Udp => { + let rtp::RequestedPort::Exact(port) = port.try_into()? else { + return Err(TypeError::new( + "Port range can not be used with UDP output stream (transport_protocol=\"udp\").", + )); + }; + let Some(ip) = ip else { + return Err(TypeError::new( + "\"ip\" field is required when registering output UDP stream (transport_protocol=\"udp\").", + )); + }; + output::rtp::RtpConnectionOptions::Udp { + port: pipeline::Port(port), + ip, + } + } + TransportProtocol::TcpServer => { + if ip.is_some() { + return Err(TypeError::new( + "\"ip\" field is not allowed when registering TCP server connection (transport_protocol=\"tcp_server\").", + )); + } + + output::rtp::RtpConnectionOptions::TcpServer { + port: port.try_into()?, + } + } + }; + + let output_options = output::OutputOptions::Rtp(RtpSenderOptions { + output_id: output_id.clone().into(), + connection_options, + }); + Ok(Self { - output_id: value.output_id.into(), + output_id: output_id.into(), output_options, video, audio, @@ -262,16 +272,16 @@ impl TryFrom for pipeline::RegisterOutputOptions { impl From for encoder::ffmpeg_h264::EncoderPreset { fn from(value: EncoderPreset) -> Self { match value { - EncoderPreset::Ultrafast => pipeline::encoder::ffmpeg_h264::EncoderPreset::Ultrafast, - EncoderPreset::Superfast => pipeline::encoder::ffmpeg_h264::EncoderPreset::Superfast, - EncoderPreset::Veryfast => pipeline::encoder::ffmpeg_h264::EncoderPreset::Veryfast, - EncoderPreset::Faster => pipeline::encoder::ffmpeg_h264::EncoderPreset::Faster, - EncoderPreset::Fast => pipeline::encoder::ffmpeg_h264::EncoderPreset::Fast, - EncoderPreset::Medium => pipeline::encoder::ffmpeg_h264::EncoderPreset::Medium, - EncoderPreset::Slow => pipeline::encoder::ffmpeg_h264::EncoderPreset::Slow, - EncoderPreset::Slower => pipeline::encoder::ffmpeg_h264::EncoderPreset::Slower, - EncoderPreset::Veryslow => pipeline::encoder::ffmpeg_h264::EncoderPreset::Veryslow, - EncoderPreset::Placebo => pipeline::encoder::ffmpeg_h264::EncoderPreset::Placebo, + EncoderPreset::Ultrafast => ffmpeg_h264::EncoderPreset::Ultrafast, + EncoderPreset::Superfast => ffmpeg_h264::EncoderPreset::Superfast, + EncoderPreset::Veryfast => ffmpeg_h264::EncoderPreset::Veryfast, + EncoderPreset::Faster => ffmpeg_h264::EncoderPreset::Faster, + EncoderPreset::Fast => ffmpeg_h264::EncoderPreset::Fast, + EncoderPreset::Medium => ffmpeg_h264::EncoderPreset::Medium, + EncoderPreset::Slow => ffmpeg_h264::EncoderPreset::Slow, + EncoderPreset::Slower => ffmpeg_h264::EncoderPreset::Slower, + EncoderPreset::Veryslow => ffmpeg_h264::EncoderPreset::Veryslow, + EncoderPreset::Placebo => ffmpeg_h264::EncoderPreset::Placebo, } } } diff --git a/src/types/register_request.rs b/src/types/register_request.rs index c2d16b29d..82cd6e6ad 100644 --- a/src/types/register_request.rs +++ b/src/types/register_request.rs @@ -109,7 +109,7 @@ pub struct InputRtpAudioOptions { pub enum TransportProtocol { /// UDP protocol. Udp, - /// TCP protocol where LiveCompositor is a server side of the connection. + /// TCP protocol where LiveCompositor is the server side of the connection. TcpServer, } @@ -160,8 +160,9 @@ pub struct OutputAudioOptions { #[serde(deny_unknown_fields)] pub struct RegisterOutputRequest { pub output_id: OutputId, - pub port: u16, - pub ip: Arc, + pub port: Port, + pub ip: Option>, + pub transport_protocol: Option, pub video: Option, pub audio: Option, }