Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Video MP4 output #646

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions compositor_api/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ pub use component::View;
pub use component::WebView;

pub use register_input::Mp4;
pub use register_output::Mp4Output;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is a user-facing value, use Mp4 too. If it not user-facing, I would rename input mp4 for consistency

pub use register_output::RtpOutputStream;

pub use register_input::DeckLink;
Expand Down
173 changes: 128 additions & 45 deletions compositor_api/src/types/from_register_output.rs
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll refactor types once I know what is needed for AAC decoding.

Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use compositor_pipeline::pipeline::{
self,
ffmpeg_h264::{self},
},
output,
output::{
self,
mp4::{Mp4AudioTrack, Mp4OutputOptions, Mp4VideoTrack},
},
};

use super::register_output::*;
Expand All @@ -22,7 +25,126 @@ impl TryFrom<RtpOutputStream> for pipeline::RegisterOutputOptions<output::Output
video,
audio,
} = request;
let video_codec = video.as_ref().map(|v| match v.encoder {
VideoEncoderOptions::FfmpegH264 { .. } => pipeline::VideoCodec::H264,
});
let audio_codec = audio.as_ref().map(|a| match a.encoder {
AudioEncoderOptions::Opus { .. } => pipeline::AudioCodec::Opus,
});

let ConvertedOptions {
video_encoder_options,
video_options,
audio_encoder_options,
audio_options,
} = (video, audio).try_into()?;

let connection_options = match transport_protocol.unwrap_or(TransportProtocol::Udp) {
TransportProtocol::Udp => {
let pipeline::rtp::RequestedPort::Exact(port) = port.try_into()? else {
return Err(TypeError::new(
"Port range can not be used with UDP output stream (transport_protocol=\"udp\").",
));
};
let Some(ip) = ip else {
return Err(TypeError::new(
"\"ip\" field is required when registering output UDP stream (transport_protocol=\"udp\").",
));
};
output::rtp::RtpConnectionOptions::Udp {
port: pipeline::Port(port),
ip,
}
}
TransportProtocol::TcpServer => {
if ip.is_some() {
return Err(TypeError::new(
"\"ip\" field is not allowed when registering TCP server connection (transport_protocol=\"tcp_server\").",
));
}

output::rtp::RtpConnectionOptions::TcpServer {
port: port.try_into()?,
}
}
};

let output_options = output::OutputOptions {
output_protocol: output::OutputProtocolOptions::Rtp(output::rtp::RtpSenderOptions {
connection_options,
video: video_codec,
audio: audio_codec,
}),
video: video_encoder_options,
audio: audio_encoder_options,
};

Ok(Self {
output_options,
video: video_options,
audio: audio_options,
})
}
}

impl TryFrom<Mp4Output> for pipeline::RegisterOutputOptions<output::OutputOptions> {
type Error = TypeError;

fn try_from(request: Mp4Output) -> Result<Self, Self::Error> {
let Mp4Output { path, video, audio } = request;

let mp4_video = video.as_ref().map(|v| match v.encoder {
VideoEncoderOptions::FfmpegH264 { .. } => Mp4VideoTrack {
codec: pipeline::VideoCodec::H264,
width: v.resolution.width as u32,
height: v.resolution.height as u32,
},
});
let mp4_audio = audio.as_ref().map(|a| match a.encoder {
AudioEncoderOptions::Opus { .. } => Mp4AudioTrack {
codec: pipeline::AudioCodec::Opus,
},
});

let ConvertedOptions {
video_encoder_options,
video_options,
audio_encoder_options,
audio_options,
} = (video, audio).try_into()?;

let output_options = output::OutputOptions {
output_protocol: output::OutputProtocolOptions::Mp4(Mp4OutputOptions {
output_path: path.into(),
video: mp4_video,
audio: mp4_audio,
}),
video: video_encoder_options,
audio: audio_encoder_options,
};

Ok(Self {
output_options,
video: video_options,
audio: audio_options,
})
}
}

struct ConvertedOptions {
video_encoder_options: Option<pipeline::encoder::VideoEncoderOptions>,
video_options: Option<pipeline::OutputVideoOptions>,
audio_encoder_options: Option<pipeline::encoder::AudioEncoderOptions>,
audio_options: Option<pipeline::OutputAudioOptions>,
}

impl TryFrom<(Option<OutputVideoOptions>, Option<OutputAudioOptions>)> for ConvertedOptions {
type Error = TypeError;

fn try_from(
value: (Option<OutputVideoOptions>, Option<OutputAudioOptions>),
) -> Result<Self, Self::Error> {
let (video, audio) = value;
if video.is_none() && audio.is_none() {
return Err(TypeError::new(
"At least one of \"video\" and \"audio\" fields have to be specified.",
Expand Down Expand Up @@ -84,50 +206,11 @@ impl TryFrom<RtpOutputStream> for pipeline::RegisterOutputOptions<output::Output
None => (None, None),
};

let connection_options = match transport_protocol.unwrap_or(TransportProtocol::Udp) {
TransportProtocol::Udp => {
let pipeline::rtp::RequestedPort::Exact(port) = port.try_into()? else {
return Err(TypeError::new(
"Port range can not be used with UDP output stream (transport_protocol=\"udp\").",
));
};
let Some(ip) = ip else {
return Err(TypeError::new(
"\"ip\" field is required when registering output UDP stream (transport_protocol=\"udp\").",
));
};
output::rtp::RtpConnectionOptions::Udp {
port: pipeline::Port(port),
ip,
}
}
TransportProtocol::TcpServer => {
if ip.is_some() {
return Err(TypeError::new(
"\"ip\" field is not allowed when registering TCP server connection (transport_protocol=\"tcp_server\").",
));
}

output::rtp::RtpConnectionOptions::TcpServer {
port: port.try_into()?,
}
}
};

let output_options = output::OutputOptions {
output_protocol: output::OutputProtocolOptions::Rtp(output::rtp::RtpSenderOptions {
connection_options,
video: video.map(|_| pipeline::VideoCodec::H264),
audio: audio.map(|_| pipeline::AudioCodec::Opus),
}),
video: video_encoder_options,
audio: audio_encoder_options,
};

Ok(Self {
output_options,
video: video_options,
audio: audio_options,
Ok(ConvertedOptions {
video_encoder_options,
video_options,
audio_encoder_options,
audio_options,
})
}
}
Expand Down
16 changes: 12 additions & 4 deletions compositor_api/src/types/register_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,21 @@ pub struct RtpOutputStream {
pub ip: Option<Arc<str>>,
/// (**default=`"udp"`**) Transport layer protocol that will be used to send RTP packets.
pub transport_protocol: Option<TransportProtocol>,
pub video: Option<OutputRtpVideoOptions>,
pub audio: Option<OutputRtpAudioOptions>,
pub video: Option<OutputVideoOptions>,
pub audio: Option<OutputAudioOptions>,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct OutputRtpVideoOptions {
pub struct Mp4Output {
pub path: String,
pub video: Option<OutputVideoOptions>,
pub audio: Option<OutputAudioOptions>,
}

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct OutputVideoOptions {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can't unify those values, after you add AAC here it will be incorrect for RTP. Start by duplicating and unifying when you know you can support unified interface everywhere.

Copy link
Member Author

@WojciechBarczynski WojciechBarczynski Jul 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. After implementing AAC I want to add support from AAC audio output in both: RTP and MP4
  2. I purposefully left types refactoring for later. AAC encoder has a lot of options and I need to test which of them should be available.
  3. Since writing opus audio into MP4 via FFmpeg didn't work and I couldn't easily debug it, I want to implement AAC encoding first, then debug what's wrong / test if it works with AAC, and modify types later on (maybe writing opus into MP4 won't be available).

/// Output resolution in pixels.
pub resolution: Resolution,
/// Defines when output stream should end if some of the input streams are finished. If output includes both audio and video streams, then EOS needs to be sent on both.
Expand All @@ -39,7 +47,7 @@ pub struct OutputRtpVideoOptions {

#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[serde(deny_unknown_fields)]
pub struct OutputRtpAudioOptions {
pub struct OutputAudioOptions {
/// (**default="sum_clip"**) Specifies how audio should be mixed.
pub mixing_strategy: Option<MixingStrategy>,
/// Condition for termination of output stream based on the input streams states.
Expand Down
6 changes: 6 additions & 0 deletions compositor_pipeline/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ pub enum OutputInitError {

#[error("Failed to register output. All ports in range {lower_bound} to {upper_bound} are already used or not available.")]
AllPortsAlreadyInUse { lower_bound: u16, upper_bound: u16 },

#[error("Path {path} already exist. Can't create a new mp4 file under that path.")]
Mp4PathExist { path: String },

#[error("Failed to register output. FFmpeg error: {0}.")]
FfmpegMp4Error(ffmpeg_next::Error),
}

#[derive(Debug, thiserror::Error)]
Expand Down
2 changes: 1 addition & 1 deletion compositor_pipeline/src/pipeline/encoder/opus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ fn run_encoder_thread(
let chunk = EncodedChunk {
data,
pts: batch.start_pts,
dts: None,
dts: Some(batch.start_pts),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

kind: EncodedChunkKind::Audio(AudioCodec::Opus),
};

Expand Down
39 changes: 30 additions & 9 deletions compositor_pipeline/src/pipeline/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use compositor_render::{
error::RequestKeyframeError, Frame, OutputFrameFormat, OutputId, Resolution,
};
use crossbeam_channel::{bounded, Receiver, Sender};
use mp4::{Mp4FileWriter, Mp4OutputOptions};

use crate::{audio_mixer::OutputSamples, error::RegisterOutputError, queue::PipelineEvent};

Expand All @@ -13,6 +14,7 @@ use super::{
PipelineCtx, Port, RawDataReceiver,
};

pub mod mp4;
pub mod rtp;

/// Options to configure public outputs that can be constructed via REST API
Expand All @@ -26,6 +28,7 @@ pub struct OutputOptions {
#[derive(Debug, Clone)]
pub enum OutputProtocolOptions {
Rtp(RtpSenderOptions),
Mp4(Mp4OutputOptions),
}

/// Options to configure output that sends h264 and opus audio via channel
Expand Down Expand Up @@ -61,6 +64,10 @@ pub enum Output {
sender: RtpSender,
encoder: Encoder,
},
Mp4 {
writer: Mp4FileWriter,
encoder: Encoder,
},
EncodedData {
encoder: Encoder,
},
Expand Down Expand Up @@ -99,7 +106,13 @@ impl OutputOptionsExt<Option<Port>> for OutputOptions {
rtp::RtpSender::new(output_id, rtp_options.clone(), packets)
.map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?;

Ok((Output::Rtp { sender, encoder }, port))
Ok((Output::Rtp { sender, encoder }, Some(port)))
}
OutputProtocolOptions::Mp4(mp4_opt) => {
let writer = Mp4FileWriter::new(output_id, mp4_opt.clone(), packets)
.map_err(|e| RegisterOutputError::OutputError(output_id.clone(), e))?;

Ok((Output::Mp4 { writer, encoder }, None))
}
}
}
Expand Down Expand Up @@ -160,32 +173,36 @@ impl OutputOptionsExt<RawDataReceiver> for RawDataOutputOptions {
impl Output {
pub fn frame_sender(&self) -> Option<&Sender<PipelineEvent<Frame>>> {
match &self {
Output::Rtp { encoder, .. } => encoder.frame_sender(),
Output::EncodedData { encoder } => encoder.frame_sender(),
Output::Rtp { encoder, .. }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

personal opinion(here and below), but I think is less readable than the previous version and very annoying when you need to change anything

| Output::Mp4 { encoder, .. }
| Output::EncodedData { encoder } => encoder.frame_sender(),
Output::RawData { video, .. } => video.as_ref(),
}
}

pub fn samples_batch_sender(&self) -> Option<&Sender<PipelineEvent<OutputSamples>>> {
match &self {
Output::Rtp { encoder, .. } => encoder.samples_batch_sender(),
Output::EncodedData { encoder } => encoder.samples_batch_sender(),
Output::Rtp { encoder, .. }
| Output::Mp4 { encoder, .. }
| Output::EncodedData { encoder } => encoder.samples_batch_sender(),
Output::RawData { audio, .. } => audio.as_ref(),
}
}

pub fn resolution(&self) -> Option<Resolution> {
match &self {
Output::Rtp { encoder, .. } => encoder.video.as_ref().map(|v| v.resolution()),
Output::EncodedData { encoder } => encoder.video.as_ref().map(|v| v.resolution()),
Output::Rtp { encoder, .. }
| Output::Mp4 { encoder, .. }
| Output::EncodedData { encoder } => encoder.video.as_ref().map(|v| v.resolution()),
Output::RawData { resolution, .. } => *resolution,
}
}

pub fn request_keyframe(&self, output_id: OutputId) -> Result<(), RequestKeyframeError> {
let encoder = match &self {
Output::Rtp { encoder, .. } => encoder,
Output::EncodedData { encoder } => encoder,
Output::Rtp { encoder, .. }
| Output::Mp4 { encoder, .. }
| Output::EncodedData { encoder } => encoder,
Output::RawData { .. } => return Err(RequestKeyframeError::RawOutput(output_id)),
};

Expand All @@ -211,6 +228,10 @@ impl Output {
Output::RawData { video, .. } => {
video.as_ref().map(|_| OutputFrameFormat::RgbaWgpuTexture)
}
Output::Mp4 { encoder, .. } => encoder
.video
.as_ref()
.map(|_| OutputFrameFormat::PlanarYuv420Bytes),
}
}
}
Loading