Skip to content

Commit

Permalink
Handle it better
Browse files Browse the repository at this point in the history
  • Loading branch information
noituri committed Oct 19, 2024
1 parent 937480a commit 8d01d72
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 91 deletions.
14 changes: 10 additions & 4 deletions compositor_pipeline/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,9 +379,12 @@ impl Pipeline {
.get(&output_id)
.ok_or_else(|| UpdateSceneError::OutputNotRegistered(output_id.clone()))?;

if output.output.is_video_finished() {
return Err(UpdateSceneError::UpdateAfterEOS(output_id));
if let Some(cond) = &output.video_end_condition {
if cond.did_output_end() {
return Err(UpdateSceneError::UpdateAfterEOS(output_id.clone()));
}
}

let (Some(resolution), Some(frame_format)) = (
output.output.resolution(),
output.output.output_frame_format(),
Expand All @@ -404,8 +407,11 @@ impl Pipeline {
.outputs
.get(output_id)
.ok_or_else(|| UpdateSceneError::OutputNotRegistered(output_id.clone()))?;
if output.output.is_audio_finished() {
return Err(UpdateSceneError::UpdateAfterEOS(output_id.clone()));

if let Some(cond) = &output.audio_end_condition {
if cond.did_output_end() {
return Err(UpdateSceneError::UpdateAfterEOS(output_id.clone()));
}
}

info!(?output_id, "Update audio mixer {:#?}", audio);
Expand Down
13 changes: 0 additions & 13 deletions compositor_pipeline/src/pipeline/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,6 @@ impl VideoEncoder {
Self::H264(encoder) => encoder.request_keyframe(),
}
}

pub fn is_finished(&self) -> bool {
match self {
Self::H264(encoder) => encoder.is_finished(),
}
}
}

impl AudioEncoder {
Expand All @@ -174,13 +168,6 @@ impl AudioEncoder {
Self::Aac(encoder) => encoder.samples_batch_sender(),
}
}

pub fn is_finished(&self) -> bool {
match self {
AudioEncoder::Opus(encoder) => encoder.is_finished(),
AudioEncoder::Aac(encoder) => encoder.is_finished(),
}
}
}

impl AudioEncoderOptions {
Expand Down
13 changes: 0 additions & 13 deletions compositor_pipeline/src/pipeline/encoder/fdk_aac.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,6 @@ use std::{
mem::{self, MaybeUninit},
os::raw::{c_int, c_void},
ptr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};

Expand All @@ -27,7 +23,6 @@ use crate::{
/// https://github.com/mstorsjo/fdk-aac/blob/master/documentation/aacEncoder.pdf
pub struct AacEncoder {
samples_batch_sender: Sender<PipelineEvent<OutputSamples>>,
is_finished: Arc<AtomicBool>,
}

#[derive(Debug, Clone)]
Expand All @@ -43,12 +38,10 @@ impl AacEncoder {
packets_sender: Sender<EncoderOutputEvent>,
) -> Result<Self, EncoderInitError> {
let (samples_batch_sender, samples_batch_receiver) = bounded(5);
let is_finished = Arc::new(AtomicBool::new(false));

// Since AAC encoder holds ref to internal structure (handler), it's unsafe to send it between threads.
let (init_result_sender, init_result_receiver) = bounded(0);
let output_id = output_id.to_string();
let is_finished_clone = is_finished.clone();

std::thread::Builder::new()
.name("AAC encoder thread".to_string())
Expand All @@ -62,7 +55,6 @@ impl AacEncoder {
samples_batch_receiver,
packets_sender,
);
is_finished_clone.store(true, Ordering::Relaxed);
debug!("Closing AAC encoder thread.");
})
.unwrap();
Expand All @@ -74,17 +66,12 @@ impl AacEncoder {

Ok(Self {
samples_batch_sender,
is_finished,
})
}

pub fn samples_batch_sender(&self) -> &Sender<PipelineEvent<OutputSamples>> {
&self.samples_batch_sender
}

pub fn is_finished(&self) -> bool {
self.is_finished.load(Ordering::Relaxed)
}
}

struct AacEncoderInner {
Expand Down
18 changes: 1 addition & 17 deletions compositor_pipeline/src/pipeline/encoder/ffmpeg_h264.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
time::Duration,
};
use std::time::Duration;

use compositor_render::{Frame, FrameData, OutputId, Resolution};
use crossbeam_channel::{Receiver, Sender};
Expand Down Expand Up @@ -95,7 +89,6 @@ pub struct LibavH264Encoder {
resolution: Resolution,
frame_sender: Sender<PipelineEvent<Frame>>,
keyframe_req_sender: Sender<()>,
is_finished: Arc<AtomicBool>,
}

impl LibavH264Encoder {
Expand All @@ -107,11 +100,9 @@ impl LibavH264Encoder {
let (frame_sender, frame_receiver) = crossbeam_channel::bounded(5);
let (result_sender, result_receiver) = crossbeam_channel::bounded(0);
let (keyframe_req_sender, keyframe_req_receiver) = crossbeam_channel::unbounded();
let is_finished = Arc::new(AtomicBool::new(false));

let options_clone = options.clone();
let output_id = output_id.clone();
let is_finished_clone = is_finished.clone();

std::thread::Builder::new()
.name(format!("Encoder thread for output {}", output_id))
Expand All @@ -130,8 +121,6 @@ impl LibavH264Encoder {
&result_sender,
);

is_finished_clone.store(true, Ordering::Relaxed);

if let Err(err) = encoder_result {
warn!(%err, "Encoder thread finished with an error.");
if let Err(err) = result_sender.send(Err(err)) {
Expand All @@ -148,7 +137,6 @@ impl LibavH264Encoder {
frame_sender,
resolution: options.resolution,
keyframe_req_sender,
is_finished,
})
}

Expand All @@ -165,10 +153,6 @@ impl LibavH264Encoder {
debug!(%err, "Failed to send keyframe request to the encoder.");
}
}

pub fn is_finished(&self) -> bool {
self.is_finished.load(Ordering::Relaxed)
}
}

fn run_encoder_thread(
Expand Down
14 changes: 0 additions & 14 deletions compositor_pipeline/src/pipeline/encoder/opus.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,3 @@
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};

use crossbeam_channel::{bounded, Receiver, Sender};
use log::error;
use tracing::{span, trace, warn, Level};
Expand All @@ -27,7 +22,6 @@ pub struct OpusEncoderOptions {

pub struct OpusEncoder {
samples_batch_sender: Sender<PipelineEvent<OutputSamples>>,
is_finished: Arc<AtomicBool>,
}

impl OpusEncoder {
Expand All @@ -37,34 +31,26 @@ impl OpusEncoder {
packets_sender: Sender<EncoderOutputEvent>,
) -> Result<Self, EncoderInitError> {
let (samples_batch_sender, samples_batch_receiver) = bounded(2);
let is_finished = Arc::new(AtomicBool::new(false));

let encoder =
opus::Encoder::new(sample_rate, options.channels.into(), options.preset.into())?;
let is_finished_clone = is_finished.clone();

std::thread::Builder::new()
.name("Opus encoder thread".to_string())
.spawn(move || {
let _span = span!(Level::INFO, "Opus encoder thread",).entered();
run_encoder_thread(encoder, samples_batch_receiver, packets_sender);
is_finished_clone.store(true, Ordering::Relaxed);
})
.unwrap();

Ok(Self {
samples_batch_sender,
is_finished,
})
}

pub fn samples_batch_sender(&self) -> &Sender<PipelineEvent<OutputSamples>> {
&self.samples_batch_sender
}

pub fn is_finished(&self) -> bool {
self.is_finished.load(Ordering::Relaxed)
}
}

fn run_encoder_thread(
Expand Down
30 changes: 0 additions & 30 deletions compositor_pipeline/src/pipeline/output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,36 +215,6 @@ impl Output {
Ok(())
}

pub fn is_video_finished(&self) -> bool {
let encoder = match self {
Output::Rtp { encoder, .. } => encoder,
Output::Mp4 { encoder, .. } => encoder,
Output::EncodedData { encoder, .. } => encoder,
Output::RawData { .. } => return false,
};

encoder
.video
.as_ref()
.map(|v| v.is_finished())
.unwrap_or(true)
}

pub fn is_audio_finished(&self) -> bool {
let encoder = match self {
Output::Rtp { encoder, .. } => encoder,
Output::Mp4 { encoder, .. } => encoder,
Output::EncodedData { encoder, .. } => encoder,
Output::RawData { .. } => return false,
};

encoder
.audio
.as_ref()
.map(|a| a.is_finished())
.unwrap_or(true)
}

pub(super) fn output_frame_format(&self) -> Option<OutputFrameFormat> {
match &self {
Output::Rtp { encoder, .. } => encoder
Expand Down
4 changes: 4 additions & 0 deletions compositor_pipeline/src/pipeline/pipeline_output.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ impl PipelineOutputEndConditionState {
EosStatus::None
}

pub(super) fn did_output_end(&self) -> bool {
self.did_end
}

pub(super) fn on_input_registered(&mut self, input_id: &InputId) {
self.on_event(StateChange::AddInput(input_id))
}
Expand Down

0 comments on commit 8d01d72

Please sign in to comment.