Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ts-sdk] Ensure compositor instance cleanup (Node.js and browser) #907

Merged
merged 1 commit into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ts/@live-compositor/browser-render/src/wasm.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import init, * as wasm from './generated/compositor_web';
* @param wasmModuleUrl {string} - An URL for `live-compositor.wasm` file. The file is located in `dist` folder.
*/
export async function loadWasmModule(wasmModuleUrl: string) {
await init(wasmModuleUrl);
await init({ module_or_path: wasmModuleUrl });
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

passing string works, but it triggers warning

}

export { wasm };
1 change: 1 addition & 0 deletions ts/@live-compositor/core/src/compositorManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ export interface CompositorManager {
setupInstance(opts: SetupInstanceOptions): Promise<void>;
sendRequest(request: ApiRequest): Promise<object>;
registerEventListener(cb: (event: unknown) => void): void;
terminate(): Promise<void>;
}
9 changes: 8 additions & 1 deletion ts/@live-compositor/core/src/live/compositor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ export class LiveCompositor {

public async unregisterOutput(outputId: string): Promise<object> {
this.logger.info({ outputId }, 'Unregister output');
this.outputs[outputId].close();
await this.outputs[outputId].close();
delete this.outputs[outputId];
// TODO: wait for event
return this.api.unregisterOutput(outputId, {});
Expand Down Expand Up @@ -140,6 +140,13 @@ export class LiveCompositor {
this.startTime = startTime;
}

public async terminate(): Promise<void> {
for (const output of Object.values(this.outputs)) {
await output.close();
}
await this.manager.terminate();
}

private handleEvent(rawEvent: unknown) {
const event = parseEvent(rawEvent, this.logger);
if (!event) {
Expand Down
46 changes: 21 additions & 25 deletions ts/@live-compositor/core/src/live/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import type { ApiClient, Api } from '../api.js';
import Renderer from '../renderer.js';
import type { RegisterOutput } from '../api/output.js';
import { intoAudioInputsConfiguration } from '../api/output.js';
import { throttle } from '../utils.js';
import { OutputRootComponent, OutputShutdownStateStore } from '../rootComponent.js';
import { ThrottledFunction } from '../utils.js';
import { OutputRootComponent } from '../rootComponent.js';
import type { Logger } from 'pino';

type AudioContext = _liveCompositorInternals.AudioContext;
Expand All @@ -21,11 +21,10 @@ class Output {
audioContext: AudioContext;
timeContext: LiveTimeContext;
internalInputStreamStore: LiveInputStreamStore<number>;
outputShutdownStateStore: OutputShutdownStateStore;
logger: Logger;

shouldUpdateWhenReady: boolean = false;
throttledUpdate: () => void;
throttledUpdate: ThrottledFunction;

supportsAudio: boolean;
supportsVideo: boolean;
Expand All @@ -44,16 +43,21 @@ class Output {
this.api = api;
this.logger = logger;
this.outputId = outputId;
this.outputShutdownStateStore = new OutputShutdownStateStore();
this.shouldUpdateWhenReady = false;
this.throttledUpdate = () => {
this.shouldUpdateWhenReady = true;
};
this.throttledUpdate = new ThrottledFunction(
async () => {
this.shouldUpdateWhenReady = true;
},
{
timeoutMs: 30,
logger: this.logger,
}
);

this.supportsAudio = 'audio' in registerRequest && !!registerRequest.audio;
this.supportsVideo = 'video' in registerRequest && !!registerRequest.video;

const onUpdate = () => this.throttledUpdate();
const onUpdate = () => this.throttledUpdate.scheduleCall();
this.audioContext = new _liveCompositorInternals.AudioContext(onUpdate);
this.timeContext = new _liveCompositorInternals.LiveTimeContext();
this.internalInputStreamStore = new _liveCompositorInternals.LiveInputStreamStore(this.logger);
Expand All @@ -64,7 +68,6 @@ class Output {
const rootElement = createElement(OutputRootComponent, {
outputContext: new OutputContext(this, this.outputId, store),
outputRoot: root,
outputShutdownStateStore: this.outputShutdownStateStore,
childrenLifetimeContext: new _liveCompositorInternals.ChildrenLifetimeContext(() => {}),
});

Expand All @@ -87,25 +90,18 @@ class Output {
};
}

public close(): void {
this.throttledUpdate = () => {};
// close will switch a scene to just a <View />, so we need replace `throttledUpdate`
// callback before it is called
this.outputShutdownStateStore.close();
public async close(): Promise<void> {
this.throttledUpdate.setFn(async () => {});
this.renderer.stop();
await this.throttledUpdate.waitForPendingCalls();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed the throttledUpdate to class because I need to be able to wait for pending requests to finish, otherwise LiveCompositor.terminate() might kill the server before pending request are done

}

public async ready() {
this.throttledUpdate = throttle(
async () => {
await this.api.updateScene(this.outputId, this.scene());
},
{
timeoutMs: 30,
logger: this.logger,
}
);
this.throttledUpdate.setFn(async () => {
await this.api.updateScene(this.outputId, this.scene());
});
if (this.shouldUpdateWhenReady) {
this.throttledUpdate();
this.throttledUpdate.scheduleCall();
}
}

Expand Down
2 changes: 1 addition & 1 deletion ts/@live-compositor/core/src/offline/compositor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ export class OfflineCompositor {
await this.api.registerOutput(OFFLINE_OUTPUT_ID, apiRequest);
await output.scheduleAllUpdates();
// at this point all scene update requests should already be delivered
output.outputShutdownStateStore.close();

if (durationMs) {
await this.api.unregisterOutput(OFFLINE_OUTPUT_ID, { schedule_time_ms: durationMs });
Expand All @@ -78,6 +77,7 @@ export class OfflineCompositor {
await this.api.start();

await renderPromise;
await this.manager.terminate();
}

public async registerInput(inputId: string, request: RegisterInput): Promise<object> {
Expand Down
7 changes: 2 additions & 5 deletions ts/@live-compositor/core/src/offline/output.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import type { RegisterOutput } from '../api/output.js';
import { intoAudioInputsConfiguration } from '../api/output.js';
import { sleep } from '../utils.js';
import { OFFLINE_OUTPUT_ID } from './compositor.js';
import { OutputRootComponent, OutputShutdownStateStore } from '../rootComponent.js';
import { OutputRootComponent } from '../rootComponent.js';
import type { Logger } from 'pino';

type AudioContext = _liveCompositorInternals.AudioContext;
Expand All @@ -26,7 +26,6 @@ class OfflineOutput {
timeContext: OfflineTimeContext;
childrenLifetimeContext: ChildrenLifetimeContext;
internalInputStreamStore: OfflineInputStreamStore<number>;
outputShutdownStateStore: OutputShutdownStateStore;
logger: Logger;

durationMs?: number;
Expand All @@ -48,7 +47,6 @@ class OfflineOutput {
this.api = api;
this.logger = logger;
this.outputId = OFFLINE_OUTPUT_ID;
this.outputShutdownStateStore = new OutputShutdownStateStore();
this.durationMs = durationMs;

this.supportsAudio = 'audio' in registerRequest && !!registerRequest.audio;
Expand All @@ -70,7 +68,6 @@ class OfflineOutput {
const rootElement = createElement(OutputRootComponent, {
outputContext: new OutputContext(this, this.outputId, store),
outputRoot: root,
outputShutdownStateStore: this.outputShutdownStateStore,
childrenLifetimeContext: this.childrenLifetimeContext,
});

Expand Down Expand Up @@ -117,7 +114,7 @@ class OfflineOutput {

this.timeContext.setNextTimestamp();
}
this.outputShutdownStateStore.close();
this.renderer.stop();
}
}

Expand Down
11 changes: 11 additions & 0 deletions ts/@live-compositor/core/src/renderer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ class Renderer {
public readonly root: FiberRootNode;
public readonly onUpdate: () => void;
private logger: Logger;
private lastScene?: Api.Component;

constructor({ rootElement, onUpdate, idPrefix, logger }: RendererOptions) {
this.logger = logger;
Expand All @@ -256,6 +257,11 @@ class Renderer {
}

public scene(): Api.Component {
if (this.lastScene) {
// Renderer was already stopped just return old scene
return this.lastScene;
}

// When resetAfterCommit is called `this.root.current` is not updated yet, so we need to rely
// on `pendingChildren`. I'm not sure it is always populated, so there is a fallback to
// `root.current`.
Expand All @@ -264,6 +270,11 @@ class Renderer {
this.root.pendingChildren[0] ?? rootHostComponent(this.root.current, this.logger);
return rootComponent.scene();
}

public stop() {
this.lastScene = this.scene();
CompositorRenderer.updateContainer(null, this.root, null, () => {});
}
}

function rootHostComponent(root: any, logger: Logger): LiveCompositorHostComponent {
Expand Down
42 changes: 2 additions & 40 deletions ts/@live-compositor/core/src/rootComponent.ts
Original file line number Diff line number Diff line change
@@ -1,59 +1,21 @@
import { _liveCompositorInternals, useAfterTimestamp, View } from 'live-compositor';
import { createElement, useEffect, useSyncExternalStore, type ReactElement } from 'react';
import { _liveCompositorInternals, useAfterTimestamp } from 'live-compositor';
import { createElement, useEffect, type ReactElement } from 'react';

type CompositorOutputContext = _liveCompositorInternals.CompositorOutputContext;
type ChildrenLifetimeContext = _liveCompositorInternals.ChildrenLifetimeContext;

// External store to share shutdown information between React tree
// and external code that is managing it.
export class OutputShutdownStateStore {
private shutdown: boolean = false;
private onChangeCallbacks: Set<() => void> = new Set();

public close() {
this.shutdown = true;
this.onChangeCallbacks.forEach(cb => cb());
}

// callback for useSyncExternalStore
public getSnapshot = (): boolean => {
return this.shutdown;
};

// callback for useSyncExternalStore
public subscribe = (onStoreChange: () => void): (() => void) => {
this.onChangeCallbacks.add(onStoreChange);
return () => {
this.onChangeCallbacks.delete(onStoreChange);
};
};
}

const globalDelayRef = Symbol();

export function OutputRootComponent({
outputContext,
outputRoot,
outputShutdownStateStore,
childrenLifetimeContext,
}: {
outputContext: CompositorOutputContext;
outputRoot: ReactElement;
outputShutdownStateStore: OutputShutdownStateStore;
childrenLifetimeContext: ChildrenLifetimeContext;
}) {
const shouldShutdown = useSyncExternalStore(
outputShutdownStateStore.subscribe,
outputShutdownStateStore.getSnapshot
);

useMinimalStreamDuration(childrenLifetimeContext);

if (shouldShutdown) {
// replace root with view to stop all the dynamic code
return createElement(View, {});
}

return createElement(
_liveCompositorInternals.LiveCompositorContext.Provider,
{ value: outputContext },
Expand Down
56 changes: 36 additions & 20 deletions ts/@live-compositor/core/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,37 +5,53 @@ type ThrottleOptions = {
timeoutMs: number;
};

export function throttle(fn: () => Promise<void>, opts: ThrottleOptions): () => void {
let shouldCall: boolean = false;
let running: boolean = false;
export class ThrottledFunction {
private fn: () => Promise<void>;
private shouldCall: boolean = false;
private runningPromise?: Promise<void> = undefined;
private opts: ThrottleOptions;

const start = async () => {
while (shouldCall) {
constructor(fn: () => Promise<void>, opts: ThrottleOptions) {
this.opts = opts;
this.fn = fn;
}

public scheduleCall() {
this.shouldCall = true;
if (this.runningPromise) {
return;
}
this.runningPromise = this.doCall();
}

public async waitForPendingCalls(): Promise<void> {
while (this.runningPromise) {
await this.runningPromise;
}
}

public setFn(fn: () => Promise<void>) {
this.fn = fn;
}

private async doCall() {
while (this.shouldCall) {
const start = Date.now();
shouldCall = false;
this.shouldCall = false;

try {
await fn();
await this.fn();
} catch (error) {
opts.logger.error(error);
this.opts.logger.error(error);
}

const timeoutLeft = start + opts.timeoutMs - Date.now();
const timeoutLeft = start + this.opts.timeoutMs - Date.now();
if (timeoutLeft > 0) {
await sleep(timeoutLeft);
}
running = false;
}
};

return () => {
shouldCall = true;
if (running) {
return;
this.runningPromise = undefined;
}
running = true;
void start();
};
}
}

export async function sleep(timeoutMs: number): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion ts/@live-compositor/node/src/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export function compositorInstanceLoggerOptions(): {

if ([LoggerLevel.WARN, LoggerLevel.ERROR].includes(loggerLevel)) {
return {
level: loggerLevel,
level: LoggerLevel.ERROR,
format,
};
} else if (loggerLevel === LoggerLevel.INFO) {
Expand Down
6 changes: 5 additions & 1 deletion ts/@live-compositor/node/src/manager/existingInstance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,11 @@ class ExistingInstance implements CompositorManager {
}

public registerEventListener(cb: (event: object) => void): void {
this.wsConnection?.registerEventListener(cb);
this.wsConnection.registerEventListener(cb);
}

public async terminate(): Promise<void> {
await this.wsConnection.close();
}
}

Expand Down
Loading
Loading