Skip to content

Commit

Permalink
[web-wasm] Decouple frame production from input implementation (#906)
Browse files Browse the repository at this point in the history
  • Loading branch information
noituri authored Jan 7, 2025
1 parent b7d80d7 commit 55cddb9
Show file tree
Hide file tree
Showing 10 changed files with 1,258 additions and 909 deletions.
168 changes: 38 additions & 130 deletions ts/@live-compositor/web-wasm/src/input/input.ts
Original file line number Diff line number Diff line change
@@ -1,41 +1,35 @@
import type { InputId } from '@live-compositor/browser-render';
import { CompositorEventType, type EventSender } from '../eventSender';
import type InputSource from './source';
import { Queue } from '@datastructures-js/queue';
import { H264Decoder } from './decoder/h264Decoder';
import { FrameRef } from './frame';
import { assert, framerateToDurationMs } from '../utils';
import type { FrameRef } from './frame';
import { assert } from '../utils';
import type InputFrameProducer from './inputFrameProducer';

export type InputState = 'waiting_for_start' | 'buffering' | 'playing' | 'finished';

const MAX_BUFFERING_SIZE = 3;

export class Input {
private id: InputId;
private state: InputState;
private source: InputSource;
private decoder: H264Decoder;
private frameProducer: InputFrameProducer;
private eventSender: EventSender;
private frames: Queue<FrameRef>;
/**
* Queue PTS of the first frame
*/
private startPtsMs?: number;

public constructor(id: InputId, source: InputSource, eventSender: EventSender) {
public constructor(id: InputId, frameProducer: InputFrameProducer, eventSender: EventSender) {
this.id = id;
this.state = 'waiting_for_start';
this.source = source;
this.frameProducer = frameProducer;
this.eventSender = eventSender;
this.frames = new Queue();
this.decoder = new H264Decoder({
onFrame: frame => {
this.frames.push(new FrameRef(frame));
},
});

this.source.registerCallbacks({
onDecoderConfig: config => this.decoder.configure(config),
this.frameProducer.registerCallbacks({
onReady: () => {
this.state = 'playing';
this.eventSender.sendEvent({
type: CompositorEventType.VIDEO_INPUT_PLAYING,
inputId: this.id,
});
},
});
}

Expand All @@ -45,140 +39,54 @@ export class Input {
return;
}

this.source.start();
this.frameProducer.start();
this.state = 'buffering';
this.eventSender.sendEvent({
type: CompositorEventType.VIDEO_INPUT_DELIVERED,
inputId: this.id,
});
}

public async getFrameRef(currentQueuePts: number): Promise<FrameRef | undefined> {
if (this.state === 'buffering') {
this.handleBuffering();
return;
}
if (this.state !== 'playing') {
return;
}
if (this.startPtsMs === undefined) {
this.startPtsMs = currentQueuePts;
}

this.dropOldFrames(currentQueuePts);
this.enqueueChunks(currentQueuePts);

// No more chunks will be produced. Flush all the remaining frames from the decoder
if (this.source.isFinished() && this.decoder.decodeQueueSize() !== 0) {
await this.decoder.flush();
/**
* Called on every queue tick. Produces frames for given `currentQueuePts` & handles EOS.
*/
public async onQueueTick(currentQueuePts: number): Promise<void> {
let targetPts: number | undefined;
if (this.startPtsMs !== undefined) {
targetPts = this.queuePtsToInputPts(currentQueuePts);
}

let frame: FrameRef | undefined;
if (this.source.isFinished() && this.frames.size() == 1) {
// Last frame is not poped by `dropOldFrames`
frame = this.frames.pop();
} else {
frame = this.getLatestFrame();
}
await this.frameProducer.produce(targetPts);

if (frame) {
return frame;
}
if (this.state === 'playing' && this.frameProducer.isFinished()) {
// EOS received and no more frames will be produced.
this.state = 'finished';
this.eventSender.sendEvent({
type: CompositorEventType.VIDEO_INPUT_EOS,
inputId: this.id,
});

// Source received EOS & there is no more frames
if (this.source.isFinished()) {
this.handleEos();
return;
this.frameProducer.close();
}

return undefined;
}

/**
* Retrieves latest frame and increments its reference count
* Retrieves reference of a frame closest to the provided `currentQueuePts`.
*/
private getLatestFrame(): FrameRef | undefined {
const frame = this.frames.front();
if (frame) {
frame.incrementRefCount();
return frame;
}

return undefined;
}

/**
* Finds frame with PTS closest to `currentQueuePts` and removes frames older than it
*/
private dropOldFrames(currentQueuePts: number): void {
if (this.frames.isEmpty()) {
public getFrameRef(currentQueuePts: number): FrameRef | undefined {
if (this.state !== 'playing') {
return;
}

const frames = this.frames.toArray();
const targetPts = this.queuePtsToInputPts(currentQueuePts);

const targetFrame = frames.reduce((prevFrame, frame) => {
const prevPtsDiff = Math.abs(prevFrame.getPtsMs() - targetPts);
const currPtsDiff = Math.abs(frame.getPtsMs() - targetPts);
return prevPtsDiff < currPtsDiff ? prevFrame : frame;
});

for (const frame of frames) {
if (frame.getPtsMs() < targetFrame.getPtsMs()) {
frame.decrementRefCount();
this.frames.pop();
}
}
}

private handleBuffering() {
if (this.frames.size() < MAX_BUFFERING_SIZE) {
this.tryEnqueueChunk();
return;
if (this.startPtsMs === undefined) {
this.startPtsMs = currentQueuePts;
}

this.state = 'playing';
this.eventSender.sendEvent({
type: CompositorEventType.VIDEO_INPUT_PLAYING,
inputId: this.id,
});
}

private handleEos() {
this.state = 'finished';
this.eventSender.sendEvent({
type: CompositorEventType.VIDEO_INPUT_EOS,
inputId: this.id,
});

this.decoder.close();
const framePts = this.queuePtsToInputPts(currentQueuePts);
return this.frameProducer.getFrameRef(framePts);
}

private queuePtsToInputPts(queuePts: number): number {
assert(this.startPtsMs !== undefined);
return queuePts - this.startPtsMs;
}

private tryEnqueueChunk() {
const chunk = this.source.nextChunk();
if (chunk) {
this.decoder.decode(chunk.data);
}
}

private enqueueChunks(currentQueuePts: number) {
const framrate = this.source.getFramerate();
assert(framrate);

const frameDuration = framerateToDurationMs(framrate);
const targetPts = this.queuePtsToInputPts(currentQueuePts) + frameDuration * MAX_BUFFERING_SIZE;

let chunk = this.source.peekChunk();
while (chunk && chunk.ptsMs < targetPts) {
this.decoder.decode(chunk.data);
this.source.nextChunk();
chunk = this.source.peekChunk();
}
}
}
36 changes: 36 additions & 0 deletions ts/@live-compositor/web-wasm/src/input/inputFrameProducer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import type { RegisterInputRequest } from '@live-compositor/core';
import type { FrameRef } from './frame';
import DecodingFrameProducer from './producer/decodingFrameProducer';
import MP4Source from './mp4/source';

export type InputFrameProducerCallbacks = {
onReady(): void;
};

export default interface InputFrameProducer {
init(): Promise<void>;
/**
* Starts resources required for producing frames. `init()` has to be called beforehand.
*/
start(): void;
registerCallbacks(callbacks: InputFrameProducerCallbacks): void;
/**
* Produce next frame.
* @param framePts - Desired PTS of the frame in milliseconds.
*/
produce(framePts?: number): Promise<void>;
getFrameRef(framePts: number): FrameRef | undefined;
/**
* if `true` no more frames will be produced.
*/
isFinished(): boolean;
close(): void;
}

export function producerFromRequest(request: RegisterInputRequest): InputFrameProducer {
if (request.type === 'mp4') {
return new DecodingFrameProducer(new MP4Source(request.url!));
} else {
throw new Error(`Unknown input type ${(request as any).type}`);
}
}
10 changes: 8 additions & 2 deletions ts/@live-compositor/web-wasm/src/input/mp4/demuxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export class MP4Demuxer {
private fileOffset: number;
private callbacks: MP4DemuxerCallbacks;
private samplesCount?: number;
private ptsOffset?: number;

public constructor(callbacks: MP4DemuxerCallbacks) {
this.file = MP4Box.createFile();
Expand Down Expand Up @@ -77,10 +78,15 @@ export class MP4Demuxer {
assert(this.samplesCount !== undefined);

for (const sample of samples) {
const pts = (sample.cts * 1_000) / sample.timescale;
if (this.ptsOffset === undefined) {
this.ptsOffset = -pts;
}

const chunk = new EncodedVideoChunk({
type: sample.is_sync ? 'key' : 'delta',
timestamp: (sample.cts * 1_000_000) / sample.timescale,
duration: (sample.duration * 1_000_000) / sample.timescale,
timestamp: pts + this.ptsOffset,
duration: (sample.duration * 1_000) / sample.timescale,
data: sample.data,
});

Expand Down
26 changes: 5 additions & 21 deletions ts/@live-compositor/web-wasm/src/input/mp4/source.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import type { Mp4ReadyData } from './demuxer';
import { MP4Demuxer } from './demuxer';
import type InputSource from '../source';
import type { InputSourceCallbacks, SourcePayload, VideoChunk } from '../source';
import type { InputSourceCallbacks, SourcePayload } from '../source';
import { Queue } from '@datastructures-js/queue';
import { assert } from '../../utils';
import type { Framerate } from '../../compositor';

export default class MP4Source implements InputSource {
Expand All @@ -13,7 +12,6 @@ export default class MP4Source implements InputSource {
private callbacks?: InputSourceCallbacks;
private chunks: Queue<EncodedVideoChunk>;
private eosReceived: boolean = false;
private ptsOffset?: number;
private framerate?: Framerate;

public constructor(fileUrl: string) {
Expand Down Expand Up @@ -51,14 +49,12 @@ export default class MP4Source implements InputSource {
return this.framerate;
}

public nextChunk(): VideoChunk | undefined {
const chunk = this.chunks.pop();
return chunk && this.intoVideoChunk(chunk);
public nextChunk(): EncodedVideoChunk | undefined {
return this.chunks.pop();
}

public peekChunk(): VideoChunk | undefined {
const chunk = this.chunks.front();
return chunk && this.intoVideoChunk(chunk);
public peekChunk(): EncodedVideoChunk | undefined {
return this.chunks.front();
}

private handleOnReady(data: Mp4ReadyData) {
Expand All @@ -68,21 +64,9 @@ export default class MP4Source implements InputSource {

private handlePayload(payload: SourcePayload) {
if (payload.type === 'chunk') {
if (this.ptsOffset === undefined) {
this.ptsOffset = -payload.chunk.timestamp;
}
this.chunks.push(payload.chunk);
} else if (payload.type === 'eos') {
this.eosReceived = true;
}
}

private intoVideoChunk(chunk: EncodedVideoChunk): VideoChunk {
assert(this.ptsOffset !== undefined);

return {
data: chunk,
ptsMs: (this.ptsOffset + chunk.timestamp) / 1000,
};
}
}
Loading

0 comments on commit 55cddb9

Please sign in to comment.