Skip to content

Commit

Permalink
[test] Add offline processing integration test (#661)
Browse files Browse the repository at this point in the history
  • Loading branch information
WojciechBarczynski authored Aug 13, 2024
1 parent 27d6ef0 commit 638e530
Show file tree
Hide file tree
Showing 11 changed files with 241 additions and 19 deletions.
5 changes: 3 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,4 @@ signal-hook = { workspace = true }
tokio-tungstenite = "0.21.0"
wgpu = { workspace = true }
image = { workspace = true }
regex = "1.10.6"
7 changes: 4 additions & 3 deletions integration_tests/src/compositor_instance.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::{anyhow, Result};
use crossbeam_channel::Sender;
use live_compositor::{
config::{read_config, LoggerConfig, LoggerFormat},
config::{read_config, Config, LoggerConfig, LoggerFormat},
logger::{self, FfmpegLogLevel},
server::run_api,
state::ApiState,
Expand Down Expand Up @@ -31,10 +31,11 @@ impl Drop for CompositorInstance {
}

impl CompositorInstance {
pub fn start() -> Self {
/// api port in config is overwritten
pub fn start(config: Option<Config>) -> Self {
init_compositor_prerequisites();
let api_port = get_free_port();
let mut config = read_config();
let mut config = config.unwrap_or(read_config());
config.api_port = api_port;

info!("Starting LiveCompositor Integration Test with config:\n{config:#?}",);
Expand Down
70 changes: 70 additions & 0 deletions integration_tests/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,76 @@
mod audio_only;
mod offline_processing;
mod push_input_before_start;
mod required_inputs;
mod schedule_update;
mod unregistering;
mod video_audio;

use crossbeam_channel::Sender;
use futures_util::{SinkExt as _, StreamExt as _};
use tokio_tungstenite::tungstenite;

pub fn start_server_msg_listener(port: u16, event_sender: Sender<tungstenite::Message>) {
std::thread::Builder::new()
.name("Websocket Thread".to_string())
.spawn(move || {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async { server_msg_listener(port, event_sender).await });
})
.unwrap();
}

async fn server_msg_listener(port: u16, event_sender: Sender<tungstenite::Message>) {
let url = format!("ws://127.0.0.1:{}/ws", port);

let (ws_stream, _) = tokio_tungstenite::connect_async(url)
.await
.expect("Failed to connect");

let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let (mut outgoing, mut incoming) = ws_stream.split();

let sender_task = tokio::spawn(async move {
while let Some(msg) = rx.recv().await {
if let tungstenite::Message::Close(None) = &msg {
let _ = outgoing.send(msg).await;
return;
}
match outgoing.send(msg).await {
Ok(()) => (),
Err(e) => {
println!("Send Loop: {:?}", e);
let _ = outgoing.send(tungstenite::Message::Close(None)).await;
return;
}
}
}
});

let receiver_task = tokio::spawn(async move {
while let Some(result) = incoming.next().await {
match result {
Ok(tungstenite::Message::Close(_)) => {
let _ = tx.send(tungstenite::Message::Close(None));
return;
}
Ok(tungstenite::Message::Ping(data)) => {
if tx.send(tungstenite::Message::Pong(data)).is_err() {
return;
}
}
Err(_) => {
let _ = tx.send(tungstenite::Message::Close(None));
return;
}
Ok(msg) => {
event_sender.send(msg).unwrap();
}
}
}
});

sender_task.await.unwrap();
receiver_task.await.unwrap();
}
8 changes: 4 additions & 4 deletions integration_tests/src/tests/audio_only.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::{
#[test]
pub fn audio_mixing_with_offset() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "audio_mixing_with_offset_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let input_2_port = instance.get_port();
let output_port = instance.get_port();
Expand Down Expand Up @@ -112,7 +112,7 @@ pub fn audio_mixing_with_offset() -> Result<()> {
#[test]
pub fn audio_mixing_no_offset() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "audio_mixing_no_offset_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let input_2_port = instance.get_port();
let output_port = instance.get_port();
Expand Down Expand Up @@ -213,7 +213,7 @@ pub fn audio_mixing_no_offset() -> Result<()> {
#[test]
pub fn single_input_opus() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "single_input_opus_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let output_port = instance.get_port();

Expand Down Expand Up @@ -295,7 +295,7 @@ pub fn single_input_opus() -> Result<()> {
#[test]
pub fn single_input_aac() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "single_input_aac_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let output_port = instance.get_port();

Expand Down
149 changes: 149 additions & 0 deletions integration_tests/src/tests/offline_processing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::{
fs::{self},
path::Path,
process::Command,
};

use anyhow::{anyhow, Result};
use live_compositor::config::read_config;
use log::info;
use regex::Regex;
use serde_json::json;
use tokio_tungstenite::tungstenite;

use crate::{tests::start_server_msg_listener, CompositorInstance};

const BUNNY_URL: &str =
"https://commondatastorage.googleapis.com/gtv-videos-bucket/sample/BigBuckBunny.mp4";

#[test]
pub fn offline_processing() -> Result<()> {
const OUTPUT_FILE: &str = "/tmp/offline_processing_output.mp4";
if Path::new(OUTPUT_FILE).exists() {
fs::remove_file(OUTPUT_FILE)?;
};

let mut config = read_config();
config.queue_options.ahead_of_time_processing = true;
config.queue_options.never_drop_output_frames = true;
let instance = CompositorInstance::start(Some(config));
let (msg_sender, msg_receiver) = crossbeam_channel::unbounded();
start_server_msg_listener(instance.api_port, msg_sender);

instance.send_request(
"input/input_1/register",
json!({
"type": "mp4",
"url": BUNNY_URL,
"required": true
}),
)?;

instance.send_request(
"output/output_1/register",
json!({
"type": "mp4",
"path": OUTPUT_FILE,
"video": {
"resolution": {
"width": 640,
"height": 320
},
"encoder": {
"type": "ffmpeg_h264",
"preset": "ultrafast",
},
"initial": {
"root": {
"type": "view",
"children": [{
"type": "rescaler",
"child": {
"type": "input_stream",
"input_id": "input_1"
}
}]
}
},
"send_eos_when": { "all_inputs": true }
},
"audio": {
"encoder": {
"type": "aac",
"channels": "stereo"
},
"initial": {
"inputs": [{ "input_id": "input_1" }]
},
"send_eos_when": { "all_inputs": true }
}
}),
)?;

instance.send_request(
"input/input_1/unregister",
json!({
"schedule_time_ms": 2000
}),
)?;
instance.send_request(
"output/output_1/unregister",
json!({
"schedule_time_ms": 2000
}),
)?;

instance.send_request("start", json!({}))?;

for msg in msg_receiver.iter() {
if let tungstenite::Message::Text(msg) = msg {
if msg.contains("\"type\":\"OUTPUT_DONE\",\"output_id\":\"output_1\"") {
info!("breaking");
break;
}
}
}

let command_output = Command::new("ffprobe")
.args(["-v", "error", "-show_format", OUTPUT_FILE])
.output()
.map_err(|e| anyhow!("Invalid mp4 file. FFprobe error: {}", e))?;

if !command_output.status.success() {
return Err(anyhow!(
"Invalid mp4 file. FFprobe error: {}",
String::from_utf8_lossy(&command_output.stderr)
));
}

let output_str = String::from_utf8_lossy(&command_output.stdout);
let (duration, bit_rate) = extract_ffprobe_info(&output_str)?;

if !(1.9..=2.1).contains(&duration) {
return Err(anyhow!("Invalid duration: {}", duration));
}
if !(950_000..=980_000).contains(&bit_rate) {
return Err(anyhow!("Invalid bit rate: {}", bit_rate));
}

Ok(())
}

fn extract_ffprobe_info(output: &str) -> Result<(f64, u64)> {
let re_duration = Regex::new(r"duration=(\d+\.\d+)").unwrap();
let re_bit_rate = Regex::new(r"bit_rate=(\d+)").unwrap();

let duration: f64 = re_duration
.captures(output)
.and_then(|caps| caps.get(1))
.map(|m| m.as_str().parse().unwrap_or(0.0))
.ok_or_else(|| anyhow!("Failed to extract duration"))?;

let bit_rate: u64 = re_bit_rate
.captures(output)
.and_then(|caps| caps.get(1))
.map(|m| m.as_str().parse().unwrap_or(0))
.ok_or_else(|| anyhow!("Failed to extract bit rate"))?;

Ok((duration, bit_rate))
}
8 changes: 4 additions & 4 deletions integration_tests/src/tests/push_input_before_start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::{
#[test]
pub fn push_input_before_start_tcp() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "push_entire_input_before_start_tcp.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_port = instance.get_port();
let output_port = instance.get_port();

Expand Down Expand Up @@ -103,7 +103,7 @@ pub fn push_input_before_start_tcp() -> Result<()> {
#[test]
pub fn push_input_before_start_udp() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "push_entire_input_before_start_udp.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_port = instance.get_port();
let output_port = instance.get_port();

Expand Down Expand Up @@ -188,7 +188,7 @@ pub fn push_input_before_start_udp() -> Result<()> {
#[test]
pub fn push_input_before_start_tcp_no_offset() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "push_entire_input_before_start_tcp_without_offset.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_port = instance.get_port();
let output_port = instance.get_port();

Expand Down Expand Up @@ -273,7 +273,7 @@ pub fn push_input_before_start_tcp_no_offset() -> Result<()> {
#[test]
pub fn push_input_before_start_udp_no_offset() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "push_entire_input_before_start_udp_without_offset.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_port = instance.get_port();
let output_port = instance.get_port();

Expand Down
6 changes: 3 additions & 3 deletions integration_tests/src/tests/required_inputs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use serde_json::json;
#[test]
pub fn required_inputs_no_offset() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "required_inputs_no_offset_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let input_2_port = instance.get_port();
let output_port = instance.get_port();
Expand Down Expand Up @@ -131,7 +131,7 @@ pub fn required_inputs_no_offset() -> Result<()> {
#[test]
pub fn required_inputs_with_offset() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "required_inputs_with_offset_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let input_2_port = instance.get_port();
let output_port = instance.get_port();
Expand Down Expand Up @@ -252,7 +252,7 @@ pub fn required_inputs_with_offset() -> Result<()> {
#[test]
pub fn optional_inputs_no_offset_flaky() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "optional_inputs_no_offset_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let input_2_port = instance.get_port();
let output_port = instance.get_port();
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/src/tests/schedule_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use serde_json::json;
#[test]
pub fn schedule_update() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "schedule_update_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_1_port = instance.get_port();
let input_2_port = instance.get_port();
let output_port = instance.get_port();
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/src/tests/unregistering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use serde_json::json;
#[test]
pub fn unregistering() -> Result<()> {
const OUTPUT_DUMP_FILE: &str = "unregistering_test_output.rtp";
let instance = CompositorInstance::start();
let instance = CompositorInstance::start(None);
let input_port = instance.get_port();
let output_port = instance.get_port();

Expand Down
Loading

0 comments on commit 638e530

Please sign in to comment.