From e056bbdd063f0827f306b6799ac2960e84fa1af9 Mon Sep 17 00:00:00 2001 From: Wojciech Barczynski Date: Mon, 19 Aug 2024 17:28:36 +0200 Subject: [PATCH 1/4] WIP --- compositor_api/src/types.rs | 1 + .../src/types/from_register_output.rs | 64 +++++ compositor_api/src/types/register_output.rs | 10 + compositor_pipeline/src/pipeline/output.rs | 26 ++ .../src/pipeline/output/mp4.rs | 15 +- .../src/pipeline/output/rtmp.rs | 246 ++++++++++++++++++ src/routes/register_request.rs | 8 +- 7 files changed, 354 insertions(+), 16 deletions(-) create mode 100644 compositor_pipeline/src/pipeline/output/rtmp.rs diff --git a/compositor_api/src/types.rs b/compositor_api/src/types.rs index 3c9c4ed84..fd58ff4fb 100644 --- a/compositor_api/src/types.rs +++ b/compositor_api/src/types.rs @@ -35,6 +35,7 @@ pub use component::WebView; pub use register_input::Mp4Input; pub use register_output::Mp4Output; +pub use register_output::RtmpOutput; pub use register_output::RtpOutput; pub use register_input::DeckLink; diff --git a/compositor_api/src/types/from_register_output.rs b/compositor_api/src/types/from_register_output.rs index 0aa92ca48..3881d59bd 100644 --- a/compositor_api/src/types/from_register_output.rs +++ b/compositor_api/src/types/from_register_output.rs @@ -9,6 +9,7 @@ use compositor_pipeline::pipeline::{ output::{ self, mp4::{Mp4AudioTrack, Mp4OutputOptions, Mp4VideoTrack}, + rtmp::{RtmpAudioTrack, RtmpSenderOptions, RtmpVideoTrack}, }, }; @@ -173,6 +174,69 @@ impl TryFrom for pipeline::RegisterOutputOptions for pipeline::RegisterOutputOptions { + type Error = TypeError; + + fn try_from(value: RtmpOutput) -> Result { + let RtmpOutput { url, video, audio } = value; + + if video.is_none() && audio.is_none() { + return Err(TypeError::new( + "At least one of \"video\" and \"audio\" fields have to be specified.", + )); + } + + let rtmp_video = video.as_ref().map(|v| match v.encoder { + VideoEncoderOptions::FfmpegH264 { .. } => RtmpVideoTrack { + width: v.resolution.width as u32, + height: v.resolution.height as u32, + }, + }); + let rtmp_audio = audio.as_ref().map(|a| match &a.encoder { + Mp4AudioEncoderOptions::Aac { channels } => RtmpAudioTrack { + channels: channels.clone().into(), + }, + }); + + let (video_encoder_options, output_video_options) = maybe_video_options(video)?; + let (audio_encoder_options, output_audio_options) = match audio { + Some(OutputMp4AudioOptions { + mixing_strategy, + send_eos_when, + encoder, + initial, + }) => { + let audio_encoder_options: AudioEncoderOptions = encoder.into(); + let output_audio_options = pipeline::OutputAudioOptions { + initial: initial.try_into()?, + end_condition: send_eos_when.unwrap_or_default().try_into()?, + mixing_strategy: mixing_strategy.unwrap_or(MixingStrategy::SumClip).into(), + channels: audio_encoder_options.channels(), + }; + + (Some(audio_encoder_options), Some(output_audio_options)) + } + None => (None, None), + }; + + let output_options = output::OutputOptions { + output_protocol: output::OutputProtocolOptions::Rtmp(RtmpSenderOptions { + url, + video: rtmp_video, + audio: rtmp_audio, + }), + video: video_encoder_options, + audio: audio_encoder_options, + }; + + Ok(Self { + output_options, + video: output_video_options, + audio: output_audio_options, + }) + } +} + fn maybe_video_options( options: Option, ) -> Result< diff --git a/compositor_api/src/types/register_output.rs b/compositor_api/src/types/register_output.rs index aebb1e203..2c9b6ddcb 100644 --- a/compositor_api/src/types/register_output.rs +++ b/compositor_api/src/types/register_output.rs @@ -27,6 +27,16 @@ pub struct RtpOutput { pub audio: Option, } +#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct RtmpOutput { + pub url: String, + /// Video stream configuration. + pub video: Option, + /// Audio stream configuration. + pub audio: Option, +} + #[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)] #[serde(deny_unknown_fields)] pub struct Mp4Output { diff --git a/compositor_pipeline/src/pipeline/output.rs b/compositor_pipeline/src/pipeline/output.rs index f16806217..c0bf1bb63 100644 --- a/compositor_pipeline/src/pipeline/output.rs +++ b/compositor_pipeline/src/pipeline/output.rs @@ -3,6 +3,7 @@ use compositor_render::{ }; use crossbeam_channel::{bounded, Receiver, Sender}; use mp4::{Mp4FileWriter, Mp4OutputOptions}; +use rtmp::RtmpSenderOptions; use crate::{audio_mixer::OutputSamples, error::RegisterOutputError, queue::PipelineEvent}; @@ -15,6 +16,7 @@ use super::{ }; pub mod mp4; +pub mod rtmp; pub mod rtp; /// Options to configure public outputs that can be constructed via REST API @@ -28,6 +30,7 @@ pub struct OutputOptions { #[derive(Debug, Clone)] pub enum OutputProtocolOptions { Rtp(RtpSenderOptions), + Rtmp(RtmpSenderOptions), Mp4(Mp4OutputOptions), } @@ -64,6 +67,10 @@ pub enum Output { sender: RtpSender, encoder: Encoder, }, + Rtmp { + sender: rtmp::RmtpSender, + encoder: Encoder, + }, Mp4 { writer: Mp4FileWriter, encoder: Encoder, @@ -108,6 +115,17 @@ impl OutputOptionsExt> for OutputOptions { Ok((Output::Rtp { sender, encoder }, Some(port))) } + OutputProtocolOptions::Rtmp(rtmp_options) => { + let sender = rtmp::RmtpSender::new( + output_id, + rtmp_options.clone(), + packets, + ctx.output_sample_rate, + ) + .map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?; + + Ok((Output::Rtmp { sender, encoder }, None)) + } OutputProtocolOptions::Mp4(mp4_opt) => { let writer = Mp4FileWriter::new( output_id.clone(), @@ -179,6 +197,7 @@ impl Output { pub fn frame_sender(&self) -> Option<&Sender>> { match &self { Output::Rtp { encoder, .. } => encoder.frame_sender(), + Output::Rtmp { encoder, .. } => encoder.frame_sender(), Output::Mp4 { encoder, .. } => encoder.frame_sender(), Output::EncodedData { encoder } => encoder.frame_sender(), Output::RawData { video, .. } => video.as_ref(), @@ -188,6 +207,7 @@ impl Output { pub fn samples_batch_sender(&self) -> Option<&Sender>> { match &self { Output::Rtp { encoder, .. } => encoder.samples_batch_sender(), + Output::Rtmp { encoder, .. } => encoder.samples_batch_sender(), Output::Mp4 { encoder, .. } => encoder.samples_batch_sender(), Output::EncodedData { encoder } => encoder.samples_batch_sender(), Output::RawData { audio, .. } => audio.as_ref(), @@ -197,6 +217,7 @@ impl Output { pub fn resolution(&self) -> Option { match &self { Output::Rtp { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()), + Output::Rtmp { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()), Output::Mp4 { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()), Output::EncodedData { encoder } => encoder.video.as_ref().map(|v| v.resolution()), Output::RawData { resolution, .. } => *resolution, @@ -206,6 +227,7 @@ impl Output { pub fn request_keyframe(&self, output_id: OutputId) -> Result<(), RequestKeyframeError> { let encoder = match &self { Output::Rtp { encoder, .. } => encoder, + Output::Rtmp { encoder, .. } => encoder, Output::Mp4 { encoder, .. } => encoder, Output::EncodedData { encoder } => encoder, Output::RawData { .. } => return Err(RequestKeyframeError::RawOutput(output_id)), @@ -226,6 +248,10 @@ impl Output { .video .as_ref() .map(|_| OutputFrameFormat::PlanarYuv420Bytes), + Output::Rtmp { encoder, .. } => encoder + .video + .as_ref() + .map(|_| OutputFrameFormat::PlanarYuv420Bytes), Output::EncodedData { encoder } => encoder .video .as_ref() diff --git a/compositor_pipeline/src/pipeline/output/mp4.rs b/compositor_pipeline/src/pipeline/output/mp4.rs index 808839464..cdf2eb9e8 100644 --- a/compositor_pipeline/src/pipeline/output/mp4.rs +++ b/compositor_pipeline/src/pipeline/output/mp4.rs @@ -32,15 +32,6 @@ pub struct Mp4AudioTrack { pub channels: AudioChannels, } -pub enum Mp4OutputVideoTrack { - H264 { width: u32, height: u32 }, -} - -pub struct Mp4WriterOptions { - pub output_path: PathBuf, - pub video: Option, -} - pub struct Mp4FileWriter; impl Mp4FileWriter { @@ -95,10 +86,6 @@ fn init_ffmpeg_output( .map(|v| { const VIDEO_TIME_BASE: i32 = 90000; - let codec = match v.codec { - VideoCodec::H264 => ffmpeg::codec::Id::H264, - }; - let mut stream = output_ctx .add_stream(ffmpeg::codec::Id::H264) .map_err(OutputInitError::FfmpegMp4Error)?; @@ -106,7 +93,7 @@ fn init_ffmpeg_output( stream.set_time_base(ffmpeg::Rational::new(1, VIDEO_TIME_BASE)); let codecpar = unsafe { &mut *(*stream.as_mut_ptr()).codecpar }; - codecpar.codec_id = codec.into(); + codecpar.codec_id = ffmpeg::codec::Id::H264.into(); codecpar.codec_type = ffmpeg::ffi::AVMediaType::AVMEDIA_TYPE_VIDEO; codecpar.width = v.width as i32; codecpar.height = v.height as i32; diff --git a/compositor_pipeline/src/pipeline/output/rtmp.rs b/compositor_pipeline/src/pipeline/output/rtmp.rs new file mode 100644 index 000000000..ed2ccb806 --- /dev/null +++ b/compositor_pipeline/src/pipeline/output/rtmp.rs @@ -0,0 +1,246 @@ +use std::ptr; + +use compositor_render::{event_handler::emit_event, OutputId}; +use crossbeam_channel::Receiver; +use ffmpeg_next as ffmpeg; +use tracing::{debug, error}; + +use crate::{ + audio_mixer::AudioChannels, + error::OutputInitError, + event::Event, + pipeline::{EncodedChunk, EncodedChunkKind, EncoderOutputEvent}, +}; + +#[derive(Debug, Clone)] +pub struct RtmpSenderOptions { + pub url: String, + pub video: Option, + pub audio: Option, +} + +#[derive(Debug, Clone)] +pub struct RtmpVideoTrack { + pub width: u32, + pub height: u32, +} + +#[derive(Debug, Clone)] +pub struct RtmpAudioTrack { + pub channels: AudioChannels, +} + +pub struct RmtpSender; + +impl RmtpSender { + pub fn new( + output_id: &OutputId, + options: RtmpSenderOptions, + packets_receiver: Receiver, + sample_rate: u32, + ) -> Result { + let (output_ctx, video_stream, audio_stream) = init_ffmpeg_output(options, sample_rate)?; + + let output_id = output_id.clone(); + std::thread::Builder::new() + .name(format!("RTMP sender thread for output {}", output_id)) + .spawn(move || { + let _span = + tracing::info_span!("RTMP sender writer", output_id = output_id.to_string()) + .entered(); + + run_ffmpeg_output_thread(output_ctx, video_stream, audio_stream, packets_receiver); + emit_event(Event::OutputDone(output_id)); + debug!("Closing RTMP sender thread."); + }) + .unwrap(); + Ok(Self) + } +} + +fn init_ffmpeg_output( + options: RtmpSenderOptions, + sample_rate: u32, +) -> Result< + ( + ffmpeg::format::context::Output, + Option, + Option, + ), + OutputInitError, +> { + let mut output_ctx = + ffmpeg::format::output_as(&options.url, "flv").map_err(OutputInitError::FfmpegMp4Error)?; + + let mut stream_count = 0; + + let video_stream = options + .video + .map(|v| { + const VIDEO_TIME_BASE: i32 = 90000; + + let mut stream = output_ctx + .add_stream(ffmpeg::codec::Id::H264) + .map_err(OutputInitError::FfmpegMp4Error)?; + + stream.set_time_base(ffmpeg::Rational::new(1, VIDEO_TIME_BASE)); + + let codecpar = unsafe { &mut *(*stream.as_mut_ptr()).codecpar }; + codecpar.codec_id = ffmpeg::codec::Id::H264.into(); + codecpar.codec_type = ffmpeg::ffi::AVMediaType::AVMEDIA_TYPE_VIDEO; + codecpar.width = v.width as i32; + codecpar.height = v.height as i32; + + let id = stream_count; + stream_count += 1; + + Ok::(Stream { + id, + time_base: VIDEO_TIME_BASE as f64, + }) + }) + .transpose()?; + + let audio_stream = options + .audio + .map(|a| { + let channels = match a.channels { + AudioChannels::Mono => 1, + AudioChannels::Stereo => 2, + }; + + let mut stream = output_ctx + .add_stream(ffmpeg::codec::Id::AAC) + .map_err(OutputInitError::FfmpegMp4Error)?; + + // If audio time base doesn't match sample rate, ffmpeg muxer produces incorrect timestamps. + stream.set_time_base(ffmpeg::Rational::new(1, sample_rate as i32)); + + let codecpar = unsafe { &mut *(*stream.as_mut_ptr()).codecpar }; + codecpar.codec_id = ffmpeg::codec::Id::AAC.into(); + codecpar.codec_type = ffmpeg::ffi::AVMediaType::AVMEDIA_TYPE_AUDIO; + codecpar.sample_rate = sample_rate as i32; + codecpar.ch_layout = ffmpeg::ffi::AVChannelLayout { + nb_channels: channels, + order: ffmpeg::ffi::AVChannelOrder::AV_CHANNEL_ORDER_UNSPEC, + // This value is ignored when order is AV_CHANNEL_ORDER_UNSPEC + u: ffmpeg::ffi::AVChannelLayout__bindgen_ty_1 { mask: 0 }, + // Field doc: "For some private data of the user." + opaque: ptr::null_mut(), + }; + + let id = stream_count; + stream_count += 1; + + Ok::(Stream { + id, + time_base: sample_rate as f64, + }) + }) + .transpose()?; + + output_ctx + .write_header() + .map_err(OutputInitError::FfmpegMp4Error)?; + + Ok((output_ctx, video_stream, audio_stream)) +} + +fn run_ffmpeg_output_thread( + mut output_ctx: ffmpeg::format::context::Output, + video_stream: Option, + audio_stream: Option, + packets_receiver: Receiver, +) { + let mut received_video_eos = video_stream.as_ref().map(|_| false); + let mut received_audio_eos = audio_stream.as_ref().map(|_| false); + + for packet in packets_receiver { + match packet { + EncoderOutputEvent::Data(chunk) => { + write_chunk(chunk, &video_stream, &audio_stream, &mut output_ctx); + } + EncoderOutputEvent::VideoEOS => match received_video_eos { + Some(false) => received_video_eos = Some(true), + Some(true) => { + error!("Received multiple video EOS events."); + } + None => { + error!("Received video EOS event on non video output."); + } + }, + EncoderOutputEvent::AudioEOS => match received_audio_eos { + Some(false) => received_audio_eos = Some(true), + Some(true) => { + error!("Received multiple audio EOS events."); + } + None => { + error!("Received audio EOS event on non audio output."); + } + }, + }; + + if received_video_eos.unwrap_or(true) && received_audio_eos.unwrap_or(true) { + if let Err(err) = output_ctx.write_trailer() { + error!("Failed to write trailer to RTMP stream: {}.", err); + }; + break; + } + } +} + +fn write_chunk( + chunk: EncodedChunk, + video_stream: &Option, + audio_stream: &Option, + output_ctx: &mut ffmpeg::format::context::Output, +) { + let packet = create_packet(chunk, video_stream, audio_stream); + if let Some(packet) = packet { + if let Err(err) = packet.write(output_ctx) { + error!("Failed to write packet to RTMP stream: {}.", err); + } + } +} + +fn create_packet( + chunk: EncodedChunk, + video_stream: &Option, + audio_stream: &Option, +) -> Option { + let (stream_id, timebase) = match chunk.kind { + EncodedChunkKind::Video(_) => { + match video_stream { + Some(Stream { id, time_base }) => Some((*id, *time_base)), + None => { + error!("Failed to create packet for video chunk. No video stream registered on init."); + None + } + } + } + EncodedChunkKind::Audio(_) => { + match audio_stream { + Some(Stream { id, time_base }) => Some((*id, *time_base)), + None => { + error!("Failed to create packet for audio chunk. No audio stream registered on init."); + None + } + } + } + }?; + + let mut packet = ffmpeg::Packet::copy(&chunk.data); + packet.set_pts(Some((chunk.pts.as_secs_f64() * timebase) as i64)); + let dts = chunk.dts.unwrap_or(chunk.pts); + packet.set_dts(Some((dts.as_secs_f64() * timebase) as i64)); + packet.set_time_base(ffmpeg::Rational::new(1, timebase as i32)); + packet.set_stream(stream_id); + + Some(packet) +} + +#[derive(Debug, Clone)] +struct Stream { + id: usize, + time_base: f64, +} diff --git a/src/routes/register_request.rs b/src/routes/register_request.rs index 1d4683f27..386c62cf0 100644 --- a/src/routes/register_request.rs +++ b/src/routes/register_request.rs @@ -10,8 +10,8 @@ use crate::{ use compositor_api::{ error::ApiError, types::{ - DeckLink, ImageSpec, InputId, Mp4Input, Mp4Output, OutputId, RendererId, RtpInput, - RtpOutput, ShaderSpec, WebRendererSpec, + DeckLink, ImageSpec, InputId, Mp4Input, Mp4Output, OutputId, RendererId, RtmpOutput, + RtpInput, RtpOutput, ShaderSpec, WebRendererSpec, }, }; @@ -30,6 +30,7 @@ pub enum RegisterInput { #[serde(tag = "type", rename_all = "snake_case")] pub enum RegisterOutput { RtpStream(RtpOutput), + RtmpStream(RtmpOutput), Mp4(Mp4Output), } @@ -75,6 +76,9 @@ pub(super) async fn handle_output( RegisterOutput::Mp4(mp4) => { Pipeline::register_output(&mut api.pipeline(), output_id.into(), mp4.try_into()?)? } + RegisterOutput::RtmpStream(rtmp) => { + Pipeline::register_output(&mut api.pipeline(), output_id.into(), rtmp.try_into()?)? + } }; match response { Some(Port(port)) => Ok(Response::RegisteredPort { port }), From 53a381f102737dfb163b44575776fd0700930fd7 Mon Sep 17 00:00:00 2001 From: Wojciech Barczynski Date: Wed, 21 Aug 2024 15:36:39 +0200 Subject: [PATCH 2/4] Fix RTMP time bases --- .../src/pipeline/output/rtmp.rs | 27 ++++++------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/compositor_pipeline/src/pipeline/output/rtmp.rs b/compositor_pipeline/src/pipeline/output/rtmp.rs index ed2ccb806..d7d3b6b9b 100644 --- a/compositor_pipeline/src/pipeline/output/rtmp.rs +++ b/compositor_pipeline/src/pipeline/output/rtmp.rs @@ -77,14 +77,10 @@ fn init_ffmpeg_output( let video_stream = options .video .map(|v| { - const VIDEO_TIME_BASE: i32 = 90000; - let mut stream = output_ctx .add_stream(ffmpeg::codec::Id::H264) .map_err(OutputInitError::FfmpegMp4Error)?; - stream.set_time_base(ffmpeg::Rational::new(1, VIDEO_TIME_BASE)); - let codecpar = unsafe { &mut *(*stream.as_mut_ptr()).codecpar }; codecpar.codec_id = ffmpeg::codec::Id::H264.into(); codecpar.codec_type = ffmpeg::ffi::AVMediaType::AVMEDIA_TYPE_VIDEO; @@ -94,10 +90,7 @@ fn init_ffmpeg_output( let id = stream_count; stream_count += 1; - Ok::(Stream { - id, - time_base: VIDEO_TIME_BASE as f64, - }) + Ok::(Stream { id }) }) .transpose()?; @@ -132,10 +125,7 @@ fn init_ffmpeg_output( let id = stream_count; stream_count += 1; - Ok::(Stream { - id, - time_base: sample_rate as f64, - }) + Ok::(Stream { id }) }) .transpose()?; @@ -208,10 +198,10 @@ fn create_packet( video_stream: &Option, audio_stream: &Option, ) -> Option { - let (stream_id, timebase) = match chunk.kind { + let stream_id = match chunk.kind { EncodedChunkKind::Video(_) => { match video_stream { - Some(Stream { id, time_base }) => Some((*id, *time_base)), + Some(Stream { id }) => Some(*id), None => { error!("Failed to create packet for video chunk. No video stream registered on init."); None @@ -220,7 +210,7 @@ fn create_packet( } EncodedChunkKind::Audio(_) => { match audio_stream { - Some(Stream { id, time_base }) => Some((*id, *time_base)), + Some(Stream { id }) => Some(*id), None => { error!("Failed to create packet for audio chunk. No audio stream registered on init."); None @@ -230,10 +220,10 @@ fn create_packet( }?; let mut packet = ffmpeg::Packet::copy(&chunk.data); - packet.set_pts(Some((chunk.pts.as_secs_f64() * timebase) as i64)); + packet.set_pts(Some((chunk.pts.as_secs_f64() * 1000.0) as i64)); let dts = chunk.dts.unwrap_or(chunk.pts); - packet.set_dts(Some((dts.as_secs_f64() * timebase) as i64)); - packet.set_time_base(ffmpeg::Rational::new(1, timebase as i32)); + packet.set_dts(Some((dts.as_secs_f64() * 1000.0) as i64)); + packet.set_time_base(ffmpeg::Rational::new(1, 1000)); packet.set_stream(stream_id); Some(packet) @@ -242,5 +232,4 @@ fn create_packet( #[derive(Debug, Clone)] struct Stream { id: usize, - time_base: f64, } From fa7ef022dfa0135d5927e40796b3fe1302f6a88d Mon Sep 17 00:00:00 2001 From: Wojciech Barczynski Date: Fri, 6 Sep 2024 16:23:46 +0200 Subject: [PATCH 3/4] Add rtmp example --- .../src/pipeline/output/rtmp.rs | 60 ++++++++----- integration_tests/examples/rtmp.rs | 86 +++++++++++++++++++ 2 files changed, 124 insertions(+), 22 deletions(-) create mode 100644 integration_tests/examples/rtmp.rs diff --git a/compositor_pipeline/src/pipeline/output/rtmp.rs b/compositor_pipeline/src/pipeline/output/rtmp.rs index d7d3b6b9b..5ea4556ec 100644 --- a/compositor_pipeline/src/pipeline/output/rtmp.rs +++ b/compositor_pipeline/src/pipeline/output/rtmp.rs @@ -1,4 +1,4 @@ -use std::ptr; +use std::{ptr, time::Duration}; use compositor_render::{event_handler::emit_event, OutputId}; use crossbeam_channel::Receiver; @@ -64,8 +64,8 @@ fn init_ffmpeg_output( ) -> Result< ( ffmpeg::format::context::Output, - Option, - Option, + Option, + Option, ), OutputInitError, > { @@ -90,7 +90,10 @@ fn init_ffmpeg_output( let id = stream_count; stream_count += 1; - Ok::(Stream { id }) + Ok::(StreamState { + id, + timestamp_offset: None, + }) }) .transpose()?; @@ -125,7 +128,10 @@ fn init_ffmpeg_output( let id = stream_count; stream_count += 1; - Ok::(Stream { id }) + Ok::(StreamState { + id, + timestamp_offset: None, + }) }) .transpose()?; @@ -138,8 +144,8 @@ fn init_ffmpeg_output( fn run_ffmpeg_output_thread( mut output_ctx: ffmpeg::format::context::Output, - video_stream: Option, - audio_stream: Option, + mut video_stream: Option, + mut audio_stream: Option, packets_receiver: Receiver, ) { let mut received_video_eos = video_stream.as_ref().map(|_| false); @@ -148,7 +154,7 @@ fn run_ffmpeg_output_thread( for packet in packets_receiver { match packet { EncoderOutputEvent::Data(chunk) => { - write_chunk(chunk, &video_stream, &audio_stream, &mut output_ctx); + write_chunk(chunk, &mut video_stream, &mut audio_stream, &mut output_ctx); } EncoderOutputEvent::VideoEOS => match received_video_eos { Some(false) => received_video_eos = Some(true), @@ -181,8 +187,8 @@ fn run_ffmpeg_output_thread( fn write_chunk( chunk: EncodedChunk, - video_stream: &Option, - audio_stream: &Option, + video_stream: &mut Option, + audio_stream: &mut Option, output_ctx: &mut ffmpeg::format::context::Output, ) { let packet = create_packet(chunk, video_stream, audio_stream); @@ -195,13 +201,13 @@ fn write_chunk( fn create_packet( chunk: EncodedChunk, - video_stream: &Option, - audio_stream: &Option, + video_stream: &mut Option, + audio_stream: &mut Option, ) -> Option { - let stream_id = match chunk.kind { + let stream_state = match chunk.kind { EncodedChunkKind::Video(_) => { - match video_stream { - Some(Stream { id }) => Some(*id), + match video_stream.as_mut() { + Some(stream_state) => Some(stream_state), None => { error!("Failed to create packet for video chunk. No video stream registered on init."); None @@ -209,8 +215,8 @@ fn create_packet( } } EncodedChunkKind::Audio(_) => { - match audio_stream { - Some(Stream { id }) => Some(*id), + match audio_stream.as_mut() { + Some(stream_state) => Some(stream_state), None => { error!("Failed to create packet for audio chunk. No audio stream registered on init."); None @@ -218,18 +224,28 @@ fn create_packet( } } }?; + let timestamp_offset = stream_state.timestamp_offset(&chunk); + let pts = chunk.pts - timestamp_offset; + // let dts = chunk.dts.map(|dts| dts - timestamp_offset).unwrap_or(pts); + let dts = chunk.dts.map(|dts| dts - timestamp_offset); let mut packet = ffmpeg::Packet::copy(&chunk.data); - packet.set_pts(Some((chunk.pts.as_secs_f64() * 1000.0) as i64)); - let dts = chunk.dts.unwrap_or(chunk.pts); - packet.set_dts(Some((dts.as_secs_f64() * 1000.0) as i64)); + packet.set_pts(Some((pts.as_secs_f64() * 1000.0) as i64)); + packet.set_dts(dts.map(|dts| (dts.as_secs_f64() * 1000.0) as i64)); packet.set_time_base(ffmpeg::Rational::new(1, 1000)); - packet.set_stream(stream_id); + packet.set_stream(stream_state.id); Some(packet) } #[derive(Debug, Clone)] -struct Stream { +struct StreamState { id: usize, + timestamp_offset: Option, +} + +impl StreamState { + fn timestamp_offset(&mut self, chunk: &EncodedChunk) -> Duration { + *self.timestamp_offset.get_or_insert(chunk.pts) + } } diff --git a/integration_tests/examples/rtmp.rs b/integration_tests/examples/rtmp.rs new file mode 100644 index 000000000..a5926ed54 --- /dev/null +++ b/integration_tests/examples/rtmp.rs @@ -0,0 +1,86 @@ +use anyhow::Result; +use compositor_api::types::Resolution; +use serde_json::json; +use std::{process::Command, time::Duration}; + +use integration_tests::examples::{self, run_example}; + +const BUNNY_URL: &str = + "https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4"; + +const VIDEO_RESOLUTION: Resolution = Resolution { + width: 1280, + height: 720, +}; + +const OUTPUT_URL: &str = "rtmp://0.0.0.0:8002/liveapp/stream"; + +fn main() { + run_example(client_code); +} + +fn client_code() -> Result<()> { + Command::new("ffplay") + .args(["-listen", "1", OUTPUT_URL]) + .spawn()?; + + examples::post( + "input/input_1/register", + &json!({ + "type": "mp4", + "url": BUNNY_URL, + "required": true, + "offset_ms": 0, + }), + )?; + + let shader_source = include_str!("./silly.wgsl"); + examples::post( + "shader/shader_example_1/register", + &json!({ + "source": shader_source, + }), + )?; + + examples::post( + "output/output_1/register", + &json!({ + "type": "rtmp_stream", + "url": OUTPUT_URL, + "video": { + "resolution": { + "width": VIDEO_RESOLUTION.width, + "height": VIDEO_RESOLUTION.height, + }, + "encoder": { + "type": "ffmpeg_h264", + "preset": "ultrafast" + }, + "initial": { + "root": { + "id": "input_1", + "type": "input_stream", + "input_id": "input_1", + } + } + }, + "audio": { + "encoder": { + "type": "aac", + "channels": "stereo" + }, + "initial": { + "inputs": [ + {"input_id": "input_1"} + ] + } + } + }), + )?; + + std::thread::sleep(Duration::from_millis(500)); + + examples::post("start", &json!({}))?; + + Ok(()) +} From 7bfbf5ada8c65a47de6750a3188887c8d8e9a4a8 Mon Sep 17 00:00:00 2001 From: Wojciech Barczynski Date: Tue, 17 Sep 2024 11:34:09 +0200 Subject: [PATCH 4/4] Update example --- integration_tests/examples/rtmp.rs | 38 ++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/integration_tests/examples/rtmp.rs b/integration_tests/examples/rtmp.rs index a5926ed54..c13b79737 100644 --- a/integration_tests/examples/rtmp.rs +++ b/integration_tests/examples/rtmp.rs @@ -13,7 +13,7 @@ const VIDEO_RESOLUTION: Resolution = Resolution { height: 720, }; -const OUTPUT_URL: &str = "rtmp://0.0.0.0:8002/liveapp/stream"; +const OUTPUT_URL: &str = "rtmp://a.rtmp.youtube.com/live2/appkey"; fn main() { run_example(client_code); @@ -58,9 +58,39 @@ fn client_code() -> Result<()> { }, "initial": { "root": { - "id": "input_1", - "type": "input_stream", - "input_id": "input_1", + "type": "view", + "children": [ + { + "type": "rescaler", + "width": VIDEO_RESOLUTION.width, + "height": VIDEO_RESOLUTION.height, + "top": 0, + "left": 0, + "child": { + "type": "input_stream", + "input_id": "input_1", + } + }, + { + "type": "view", + "bottom": 0, + "left": 0, + "width": VIDEO_RESOLUTION.width, + "height": 100, + "background_color_rgba": "#00000088", + "children": [ + { "type": "view" }, + { + "type": "text", + "text": "LiveCompositor 😃😍", + "font_size": 80, + "color_rgba": "#40E0D0FF", + "weight": "bold", + }, + { "type": "view" } + ] + } + ] } } },