Skip to content

Commit

Permalink
Harness input.http set_url
Browse files Browse the repository at this point in the history
  • Loading branch information
toots committed Apr 29, 2024
1 parent 485d746 commit 00ec611
Showing 1 changed file with 111 additions and 82 deletions.
193 changes: 111 additions & 82 deletions src/core/io/ffmpeg_io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,13 @@ let () =
Lifecycle.before_core_shutdown ~name:"input.ffmpeg shutdown" (fun () ->
Atomic.set shutdown true)

let string_of_source_status = function
| `Stopped -> "stopped"
| `Starting -> "starting"
| `Polling -> "polling"
| `Connected _ -> "connected"
| `Stopping -> "stopping"

class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug
~max_buffer ~on_error ~on_stop ~on_start ~on_connect ~metadata_filter
~on_disconnect ~new_track_on_metadata ?format ~opts ~trim_url url =
Expand All @@ -70,6 +77,14 @@ class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug
~name ~fallible:true ~on_start ~on_stop ~autostart () as super

val connect_task = Atomic.make None

initializer
let t =
Duppy.Async.add ~priority:`Blocking Tutils.scheduler self#do_connect
in
Atomic.set connect_task (Some t)

method connect_task = Option.get (Atomic.get connect_task)
method seek_source = (self :> Source.source)
method remaining = -1
method abort_track = Generator.add_track_mark self#buffer
Expand Down Expand Up @@ -104,77 +119,102 @@ class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug
let u = url () in
if trim_url then String.trim u else u

method set_url u = url <- u
method set_url u =
let old_url = url in
url <- u;
if u () <> old_url () then self#disconnect

method buffer_length = Frame.seconds_of_audio (Generator.length self#buffer)

method private connect_task () =
method private do_connect () =
Generator.set_max_length self#buffer max_length;
try
if self#source_status = `Stopping then raise Stopped;
assert (self#source_status = `Starting);
Atomic.set source_status `Polling;
let opts = Hashtbl.copy opts in
let url = self#url in
let closed = Atomic.make false in
let input =
Av.open_input
~interrupt:(fun () -> Atomic.get shutdown || Atomic.get closed)
?format ~opts url
in
if Hashtbl.length opts > 0 then
failwith
(Printf.sprintf "Unrecognized options: %s"
(Ffmpeg_format.string_of_options opts));
let content_type =
Ffmpeg_decoder.get_type ~format ~ctype:self#content_type ~url input
in
if not (Decoder.can_decode_type content_type self#content_type) then
failwith
(Printf.sprintf "url %S cannot produce content of type %s" url
(Frame.string_of_content_type self#content_type));
let streams =
Ffmpeg_decoder.mk_streams ~ctype:self#content_type
~decode_first_metadata:true input
in
let decoder =
Ffmpeg_decoder.mk_decoder ~streams ~target_position:(ref None) input
in
let buffer = Decoder.mk_buffer ~ctype:self#content_type self#buffer in
(* FFmpeg has memory leaks with chained ogg stream so we manually
reset the metadata after fetching it. *)
let get_metadata stream =
let m = Av.get_metadata stream in
Av.set_metadata stream [];
m
in
let get_metadata () =
normalize_metadata
(Ffmpeg_decoder.Streams.fold
(fun _ stream m ->
m
@
match stream with
| `Audio_frame (stream, _) -> get_metadata stream
| `Audio_packet (stream, _) -> get_metadata stream
| `Video_frame (stream, _) -> get_metadata stream
| `Video_packet (stream, _) -> get_metadata stream
| `Data_packet _ -> [])
streams
(Av.get_input_metadata input))
in
let last_meta = ref [] in
let get_metadata () =
let m = get_metadata () in
if m <> !last_meta then (
last_meta := m;
m)
else []
in
on_connect input;
Generator.add_track_mark self#buffer;
let container = { input; decoder; buffer; get_metadata; closed } in
Atomic.set source_status (`Connected (url, container));
-1.
match self#source_status with
| `Stopping -> raise Stopped
| `Stopped | `Polling | `Connected _ -> -1.
| `Starting ->
Atomic.set source_status `Polling;
let opts = Hashtbl.copy opts in
let url = self#url in
let closed = Atomic.make false in
let input =
Av.open_input
~interrupt:(fun () ->
Atomic.get shutdown || Atomic.get closed)
?format ~opts url
in
if Hashtbl.length opts > 0 then
failwith
(Printf.sprintf "Unrecognized options: %s"
(Ffmpeg_format.string_of_options opts));
let content_type =
Ffmpeg_decoder.get_type ~format ~ctype:self#content_type ~url
input
in
if not (Decoder.can_decode_type content_type self#content_type)
then
failwith
(Printf.sprintf "url %S cannot produce content of type %s" url
(Frame.string_of_content_type self#content_type));
let streams =
Ffmpeg_decoder.mk_streams ~ctype:self#content_type
~decode_first_metadata:true input
in
let decoder =
Ffmpeg_decoder.mk_decoder ~streams ~target_position:(ref None)
input
in
let buffer =
Decoder.mk_buffer ~ctype:self#content_type self#buffer
in
(* FFmpeg has memory leaks with chained ogg stream so we manually
reset the metadata after fetching it. *)
let get_metadata stream =
let m = Av.get_metadata stream in
Av.set_metadata stream [];
m
in
let get_metadata () =
normalize_metadata
(Ffmpeg_decoder.Streams.fold
(fun _ stream m ->
m
@
match stream with
| `Audio_frame (stream, _) -> get_metadata stream
| `Audio_packet (stream, _) -> get_metadata stream
| `Video_frame (stream, _) -> get_metadata stream
| `Video_packet (stream, _) -> get_metadata stream
| `Data_packet _ -> [])
streams
(Av.get_input_metadata input))
in
let last_meta = ref [] in
let get_metadata () =
let m = get_metadata () in
if m <> !last_meta then (
last_meta := m;
m)
else []
in
let container =
{ input; decoder; buffer; get_metadata; closed }
in
if
Atomic.compare_and_set source_status `Polling
(`Connected (url, container))
then (
on_connect input;
Generator.add_track_mark self#buffer)
else (
Atomic.set closed true;
Av.close input;
match self#source_status with
| `Stopping -> raise Stopped
| v ->
self#log#important "Inconsistent source status: %s"
(string_of_source_status v));
-1.
with
| Stopped ->
Atomic.set source_status `Stopped;
Expand All @@ -192,25 +232,14 @@ class input ?(name = "input.ffmpeg") ~autostart ~self_sync ~poll_delay ~debug
method private connect =
match self#source_status with
| `Starting | `Polling | `Connected _ -> ()
| `Stopping | `Stopped -> (
| `Stopping | `Stopped ->
Atomic.set source_status `Starting;
match Atomic.get connect_task with
| Some t -> Duppy.Async.wake_up t
| None ->
let t =
Duppy.Async.add ~priority:`Blocking Tutils.scheduler
self#connect_task
in
Atomic.set connect_task (Some t);
Duppy.Async.wake_up t)
Duppy.Async.wake_up self#connect_task

method private disconnect =
let stop_task () =
match Atomic.get connect_task with
| None -> ()
| Some t ->
Atomic.set source_status `Stopping;
Duppy.Async.wake_up t
Atomic.set source_status `Stopping;
Duppy.Async.wake_up self#connect_task
in
match self#source_status with
| `Stopping | `Stopped -> ()
Expand Down

0 comments on commit 00ec611

Please sign in to comment.