From 2a9dcda141eff0fb898b82624aef0d2817780dac Mon Sep 17 00:00:00 2001 From: Paul Berberian Date: Mon, 13 Jan 2025 16:44:25 +0100 Subject: [PATCH 1/4] Remove MediaSourceContentInitializer, add CoreInterface This is a proof-of-concept where I try to put in common the "content initialization" logic for our "multithread" mode and our monothreaded mode. The idea is to replace that mode-specific code to a very thin layer (here called `CoreInterface`) between our "init" code (always running in main thread) and our "core" code (in a WebWorker in "multithread" mode) which would handle both modes: - in multithreaded mode, it would be the part doing `postmessage` calls and `onmessage` registering for thread communication - in monothreaded mode, it would just do the same thing through a very simple EventEmitter-like approach Or written another way: "multithread" and single-threaded mode would now share the same logic beside the communication system used at the frontier between the main-thread and potential worker (respectively `src/main_thread` code and `src/core` code). The end goal is to remove a lot of code, and to reduce the difference between the multithreaded and monothreaded logic, so our tests (integration, manual tests etc.) actually almost test the two in one go. There might be some performance lost due to steps we are now performing unnecessarily when in monothreaded mode (e.g. we serialize the Manifest structure even though it's not needed when a single thread is used, we create another PlaybackObserver on the core even though the main thread one could be re-used etc.). To see if that lead to a visible difference and if it does, it shouldn't be that hard to work-around. --- src/core/main/worker/content_preparer.ts | 34 +- src/core/main/worker/send_message.ts | 28 - src/core/main/worker/utils.ts | 11 + src/core/main/worker/worker_main.ts | 127 +- src/core_interface.ts | 119 ++ .../features/__tests__/local.test.ts | 6 +- .../features/__tests__/metaplaylist.test.ts | 6 +- src/experimental/features/local.ts | 4 +- src/experimental/features/metaplaylist.ts | 4 +- src/features/list/__tests__/dash.test.ts | 6 +- src/features/list/__tests__/smooth.test.ts | 6 +- src/features/list/dash.ts | 4 +- src/features/list/dash_wasm.ts | 4 +- src/features/list/media_source_main.ts | 4 +- src/features/list/smooth.ts | 4 +- src/features/types.ts | 3 +- src/main_thread/api/public_api.ts | 110 +- .../init/media_source_content_initializer.ts | 1367 ----------------- .../init/multi_thread_content_initializer.ts | 178 +-- src/main_thread/init/send_message.ts | 15 - src/multithread_types.ts | 21 +- src/worker_entry_point.ts | 54 +- tests/integration/scenarios/dash_live.test.js | 8 + .../dash_live_SegmentTemplate.test.js | 3 + .../scenarios/dash_multi-track.test.js | 1 + .../integration/scenarios/dash_static.test.js | 1 + .../scenarios/initial_playback.test.js | 2 + .../utils/launch_tests_for_content.js | 1 + 28 files changed, 487 insertions(+), 1644 deletions(-) delete mode 100644 src/core/main/worker/send_message.ts create mode 100644 src/core/main/worker/utils.ts create mode 100644 src/core_interface.ts delete mode 100644 src/main_thread/init/media_source_content_initializer.ts delete mode 100644 src/main_thread/init/send_message.ts diff --git a/src/core/main/worker/content_preparer.ts b/src/core/main/worker/content_preparer.ts index 92a12af54d..55a90b018f 100644 --- a/src/core/main/worker/content_preparer.ts +++ b/src/core/main/worker/content_preparer.ts @@ -8,10 +8,10 @@ import WorkerMediaSourceInterface from "../../../mse/worker_media_source_interfa import type { IAttachMediaSourceWorkerMessagePayload, IContentInitializationData, + IWorkerMessage, } from "../../../multithread_types"; import { WorkerMessageType } from "../../../multithread_types"; import type { IPlayerError } from "../../../public_types"; -import assert from "../../../utils/assert"; import idGenerator from "../../../utils/id_generator"; import objectAssign from "../../../utils/object_assign"; import type { @@ -28,8 +28,8 @@ import SegmentSinksStore from "../../segment_sinks"; import type { INeedsMediaSourceReloadPayload } from "../../stream"; import DecipherabilityFreezeDetector from "../common/DecipherabilityFreezeDetector"; import { limitVideoResolution, throttleVideoBitrate } from "./globals"; -import sendMessage, { formatErrorForSender } from "./send_message"; import TrackChoiceSetter from "./track_choice_setter"; +import { formatErrorForSender } from "./utils"; import WorkerTextDisplayerInterface from "./worker_text_displayer_interface"; const generateMediaSourceId = idGenerator(); @@ -74,6 +74,7 @@ export default class ContentPreparer { } public initializeNewContent( + sendMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void, context: IContentInitializationData, ): Promise { return new Promise((res, rej) => { @@ -84,19 +85,20 @@ export default class ContentPreparer { currentMediaSourceCanceller.linkToSignal(contentCanceller.signal); - const { contentId, url, hasText, transportOptions } = context; + const { contentId, url, hasText, transport, transportOptions } = context; let manifest: IManifest | null = null; - // TODO better way - assert( - features.transports.dash !== undefined, - "Multithread RxPlayer should have access to the DASH feature", - ); + const transportFn = features.transports[transport]; + if (typeof transportFn !== "function") { + // Stop previous content and reset its state + // XXX TODO: send fatal error + throw new Error(`transport "${transport}" not supported`); + } const representationFilter = typeof transportOptions.representationFilter === "string" ? createRepresentationFilterFromFnString(transportOptions.representationFilter) - : undefined; - const dashPipelines = features.transports.dash({ + : transportOptions.representationFilter; + const transportPipelines = transportFn({ ...transportOptions, representationFilter, }); @@ -105,7 +107,7 @@ export default class ContentPreparer { context.cmcd === undefined ? null : new CmcdDataBuilder(context.cmcd); const manifestFetcher = new ManifestFetcher( url === undefined ? undefined : [url], - dashPipelines, + transportPipelines, { cmcdDataBuilder, ...context.manifestRetryOptions, @@ -130,7 +132,7 @@ export default class ContentPreparer { ); const segmentQueueCreator = new SegmentQueueCreator( - dashPipelines, + transportPipelines, cmcdDataBuilder, context.segmentRetryOptions, contentCanceller.signal, @@ -140,6 +142,7 @@ export default class ContentPreparer { const [mediaSource, segmentSinksStore, workerTextSender] = createMediaSourceAndBuffersStore( + sendMessage, contentId, { hasMseInWorker: this._hasMseInWorker, @@ -255,7 +258,10 @@ export default class ContentPreparer { this._currentContent?.manifestFetcher.scheduleManualRefresh(settings); } - public reloadMediaSource(reloadInfo: INeedsMediaSourceReloadPayload): Promise { + public reloadMediaSource( + sendMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void, + reloadInfo: INeedsMediaSourceReloadPayload, + ): Promise { this._currentMediaSourceCanceller.cancel(); if (this._currentContent === null) { return Promise.reject(new Error("CP: No content anymore")); @@ -274,6 +280,7 @@ export default class ContentPreparer { const [mediaSource, segmentSinksStore, workerTextSender] = createMediaSourceAndBuffersStore( + sendMessage, this._currentContent.contentId, { hasMseInWorker: this._hasMseInWorker, @@ -380,6 +387,7 @@ export interface IPreparedContentData { * @returns {Array.} */ function createMediaSourceAndBuffersStore( + sendMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void, contentId: string, capabilities: { hasMseInWorker: boolean; diff --git a/src/core/main/worker/send_message.ts b/src/core/main/worker/send_message.ts deleted file mode 100644 index df1d969c35..0000000000 --- a/src/core/main/worker/send_message.ts +++ /dev/null @@ -1,28 +0,0 @@ -import { formatError } from "../../../errors"; -import log from "../../../log"; -import type { ISentError, IWorkerMessage } from "../../../multithread_types"; - -export default function sendMessage( - msg: IWorkerMessage, - transferables?: Transferable[], -): void { - log.debug("<--- Sending to Main:", msg.type); - if (transferables === undefined) { - postMessage(msg); - } else { - // TypeScript made a mistake here, and 2busy2fix - (postMessage as (msg: IWorkerMessage, transferables: Transferable[]) => void)( - msg, - transferables, - ); - } -} - -export function formatErrorForSender(error: unknown): ISentError { - const formattedError = formatError(error, { - defaultCode: "NONE", - defaultReason: "An unknown error stopped content playback.", - }); - - return formattedError.serialize(); -} diff --git a/src/core/main/worker/utils.ts b/src/core/main/worker/utils.ts new file mode 100644 index 0000000000..f0d4ea2ef9 --- /dev/null +++ b/src/core/main/worker/utils.ts @@ -0,0 +1,11 @@ +import { formatError } from "../../../errors"; +import type { ISentError } from "../../../multithread_types"; + +export function formatErrorForSender(error: unknown): ISentError { + const formattedError = formatError(error, { + defaultCode: "NONE", + defaultReason: "An unknown error stopped content playback.", + }); + + return formattedError.serialize(); +} diff --git a/src/core/main/worker/worker_main.ts b/src/core/main/worker/worker_main.ts index 75003dc352..4dc245cd3a 100644 --- a/src/core/main/worker/worker_main.ts +++ b/src/core/main/worker/worker_main.ts @@ -8,18 +8,15 @@ import type { IDiscontinuityUpdateWorkerMessagePayload, IMainThreadMessage, IReferenceUpdateMessage, + IWorkerMessage, } from "../../../multithread_types"; import { MainThreadMessageType, WorkerMessageType } from "../../../multithread_types"; -import DashFastJsParser from "../../../parsers/manifest/dash/fast-js-parser"; -import DashWasmParser from "../../../parsers/manifest/dash/wasm-parser"; import { ObservationPosition } from "../../../playback_observer"; import type { IWorkerPlaybackObservation } from "../../../playback_observer/worker_playback_observer"; import WorkerPlaybackObserver from "../../../playback_observer/worker_playback_observer"; import type { IPlayerError, ITrackType } from "../../../public_types"; -import createDashPipelines from "../../../transports/dash"; import arrayFind from "../../../utils/array_find"; import assert, { assertUnreachable } from "../../../utils/assert"; -import globalScope from "../../../utils/global_scope"; import type { ILogFormat, ILoggerLevel } from "../../../utils/logger"; import { scaleTimestamp } from "../../../utils/monotonic_timestamp"; import objectAssign from "../../../utils/object_assign"; @@ -33,20 +30,34 @@ import type { IStreamStatusPayload, } from "../../stream"; import StreamOrchestrator from "../../stream"; +import type { IResolutionInfo } from "../../types"; import createContentTimeBoundariesObserver from "../common/create_content_time_boundaries_observer"; import getBufferedDataPerMediaBuffer from "../common/get_buffered_data_per_media_buffer"; import ContentPreparer from "./content_preparer"; -import { - limitVideoResolution, - maxBufferAhead, - maxBufferBehind, - maxVideoBufferSize, - throttleVideoBitrate, - wantedBufferAhead, -} from "./globals"; -import sendMessage, { formatErrorForSender } from "./send_message"; - -export default function initializeWorkerMain() { +import { formatErrorForSender } from "./utils"; + +export type IMessageReceiverCallback = (evt: { data: IMainThreadMessage }) => void; + +/** + * Initialize a `WorkerMain`, which is the part of the RxPlayer acting as an + * entry point to all its "core" code. + * + * Its role is to receive and react to messages coming from "main thead", which + * may include loading and playing a content, and to send back messages for the main + * thread. + * @param {Function} setMessageReceiver - Declares the function that will + * receive messages coming from the "main thread" part of the RxPlayer logic. + * @param {Function} sendMessage - Function allowing to send messages to the + * "main thread" part of the RxPlayer logic. + * @param {Object} refs - Collection of so-called "references": values + * configuring playback that may be updated at any time and that the WorkerMain + * should react on. + */ +export default function initializeWorkerMain( + setMessageReceiver: (cb: IMessageReceiverCallback) => void, + sendMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void, + refs: ICoreReferences, +): void { /** * `true` once the worker has been initialized. * Allow to enforce the fact that it is only initialized once. @@ -69,22 +80,11 @@ export default function initializeWorkerMain() { */ let currentLoadedContentTaskCanceller: TaskCanceller | null = null; - // Initialize Manually a `DashWasmParser` and add the feature. - // TODO allow worker-side feature-switching? Not sure how - const dashWasmParser = new DashWasmParser(); - features.dashParsers.wasm = dashWasmParser; - features.dashParsers.fastJs = DashFastJsParser; - features.transports.dash = createDashPipelines; - /** * When set, emit playback observation made on the main thread. */ let playbackObservationRef: SharedReference | null = null; - - globalScope.onmessageerror = (_msg: MessageEvent) => { - log.error("MTCI: Error when receiving message from main thread."); - }; - onmessage = function (e: MessageEvent) { + setMessageReceiver((e) => { log.debug("Worker: received message", e.data.type); const msg = e.data; @@ -98,7 +98,12 @@ export default function initializeWorkerMain() { msg.value.logFormat, msg.value.sendBackLogs, ); - if (msg.value.dashWasmUrl !== undefined && dashWasmParser.isCompatible()) { + const dashWasmParser = features.dashParsers.wasm; + if ( + dashWasmParser !== null && + msg.value.dashWasmUrl !== undefined && + dashWasmParser.isCompatible() + ) { dashWasmParser.initialize({ wasmUrl: msg.value.dashWasmUrl }).catch((err) => { const error = err instanceof Error ? err.toString() : "Unknown Error"; log.error("Worker: Could not initialize DASH_WASM parser", error); @@ -125,7 +130,7 @@ export default function initializeWorkerMain() { break; case MainThreadMessageType.PrepareContent: - prepareNewContent(contentPreparer, msg.value); + prepareNewContent(sendMessage, contentPreparer, msg.value); break; case MainThreadMessageType.StartPreparedContent: { @@ -151,9 +156,11 @@ export default function initializeWorkerMain() { currentContentObservationRef.finish(); }); loadOrReloadPreparedContent( + sendMessage, msg.value, contentPreparer, currentContentObservationRef, + refs, currentCanceller.signal, ); break; @@ -185,7 +192,7 @@ export default function initializeWorkerMain() { } case MainThreadMessageType.ReferenceUpdate: - updateGlobalReference(msg); + updateCoreReference(msg, refs); break; case MainThreadMessageType.StopContent: @@ -401,7 +408,7 @@ export default function initializeWorkerMain() { } case MainThreadMessageType.PullSegmentSinkStoreInfos: { - sendSegmentSinksStoreInfos(contentPreparer, msg.value.messageId); + sendSegmentSinksStoreInfos(sendMessage, contentPreparer, msg.value.messageId); break; } @@ -413,14 +420,27 @@ export default function initializeWorkerMain() { default: assertUnreachable(msg); } - }; + }); } +/** + * Performs steps needed to prepare a future content to be played: + * - Load its Manifest file + * - Create MSE `MediaSource` for that content. + * - Initialize all modules that will follow that content + * - etc. + * @param {Function} sendMessage - Function allowing to send messages to the + * "main thread" part of the RxPlayer logic. + * @param {ContentPreparer} contentPreparer + * @param {Object} contentInitData - Configuration wanted for the content to + * load. + */ function prepareNewContent( + sendMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void, contentPreparer: ContentPreparer, contentInitData: IContentInitializationData, ): void { - contentPreparer.initializeNewContent(contentInitData).then( + contentPreparer.initializeNewContent(sendMessage, contentInitData).then( (manifest) => { sendMessage({ type: WorkerMessageType.ManifestReady, @@ -438,25 +458,25 @@ function prepareNewContent( ); } -function updateGlobalReference(msg: IReferenceUpdateMessage): void { +function updateCoreReference(msg: IReferenceUpdateMessage, refs: ICoreReferences): void { switch (msg.value.name) { case "wantedBufferAhead": - wantedBufferAhead.setValueIfChanged(msg.value.newVal); + refs.wantedBufferAhead.setValueIfChanged(msg.value.newVal); break; case "maxVideoBufferSize": - maxVideoBufferSize.setValueIfChanged(msg.value.newVal); + refs.maxVideoBufferSize.setValueIfChanged(msg.value.newVal); break; case "maxBufferBehind": - maxBufferBehind.setValueIfChanged(msg.value.newVal); + refs.maxBufferBehind.setValueIfChanged(msg.value.newVal); break; case "maxBufferAhead": - maxBufferAhead.setValueIfChanged(msg.value.newVal); + refs.maxBufferAhead.setValueIfChanged(msg.value.newVal); break; case "limitVideoResolution": - limitVideoResolution.setValueIfChanged(msg.value.newVal); + refs.limitVideoResolution.setValueIfChanged(msg.value.newVal); break; case "throttleVideoBitrate": - throttleVideoBitrate.setValueIfChanged(msg.value.newVal); + refs.throttleVideoBitrate.setValueIfChanged(msg.value.newVal); break; default: assertUnreachable(msg.value); @@ -482,9 +502,11 @@ interface IBufferingInitializationInformation { } function loadOrReloadPreparedContent( + sendMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void, val: IBufferingInitializationInformation, contentPreparer: ContentPreparer, playbackObservationRef: IReadOnlySharedReference, + refs: ICoreReferences, parentCancelSignal: CancellationSignal, ) { const currentLoadCanceller = new TaskCanceller(); @@ -600,10 +622,10 @@ function loadOrReloadPreparedContent( segmentSinksStore, segmentQueueCreator, { - wantedBufferAhead, - maxVideoBufferSize, - maxBufferAhead, - maxBufferBehind, + wantedBufferAhead: refs.wantedBufferAhead, + maxVideoBufferSize: refs.maxVideoBufferSize, + maxBufferAhead: refs.maxBufferAhead, + maxBufferBehind: refs.maxBufferBehind, drmSystemId, enableFastSwitching, onCodecSwitch, @@ -881,9 +903,10 @@ function loadOrReloadPreparedContent( if (currentLoadCanceller !== null) { currentLoadCanceller.cancel(); } - contentPreparer.reloadMediaSource(payload).then( + contentPreparer.reloadMediaSource(sendMessage, payload).then( () => { loadOrReloadPreparedContent( + sendMessage, { initialTime: newInitialTime, drmSystemId: val.drmSystemId, @@ -892,6 +915,7 @@ function loadOrReloadPreparedContent( }, contentPreparer, playbackObservationRef, + refs, parentCancelSignal, ); }, @@ -942,10 +966,12 @@ function updateLoggerLevel( /** * Send a message `SegmentSinkStoreUpdate` to the main thread with * a serialized object that represents the segmentSinksStore state. + * @param {Function} sendMessage - Function allowing to send messages to the + * "main thread" part of the RxPlayer logic. * @param {ContentPreparer} contentPreparer - * @returns {void} */ function sendSegmentSinksStoreInfos( + sendMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void, contentPreparer: ContentPreparer, messageId: number, ): void { @@ -960,3 +986,12 @@ function sendSegmentSinksStoreInfos( value: { segmentSinkMetrics: segmentSinksMetrics, messageId }, }); } + +export interface ICoreReferences { + limitVideoResolution: SharedReference; + maxBufferAhead: SharedReference; + maxBufferBehind: SharedReference; + maxVideoBufferSize: SharedReference; + throttleVideoBitrate: SharedReference; + wantedBufferAhead: SharedReference; +} diff --git a/src/core_interface.ts b/src/core_interface.ts new file mode 100644 index 0000000000..8d16ca883b --- /dev/null +++ b/src/core_interface.ts @@ -0,0 +1,119 @@ +import type { IMessageReceiverCallback } from "./core/main/worker/worker_main"; +import log from "./log"; +import type { IMainThreadMessage, IWorkerMessage } from "./multithread_types"; +import noop from "./utils/noop"; + +export default abstract class CoreInterface { + protected listeners: Array<(evt: IWorkerMessage) => void> = []; + protected listenersError: Array<() => void> = []; + + public abstract sendMessage(msg: IMainThreadMessage): void; + + public addMessageListener(cb: (evt: IWorkerMessage) => void): void { + this.listeners.push(cb); + } + + public removeMessageListener(cb: (evt: IWorkerMessage) => void): void { + const index = this.listeners.indexOf(cb); + if (index >= 0) { + this.listeners.splice(index, 1); + } + } + + public addErrorListener(cb: () => void): void { + this.listenersError.push(cb); + } + + public removeErrorListener(cb: () => void): void { + const index = this.listenersError.indexOf(cb); + if (index >= 0) { + this.listenersError.splice(index, 1); + } + } + + public dispose(): void { + this.listeners.length = 0; + this.listenersError.length = 0; + } +} + +export class MonoThreadCoreInterface extends CoreInterface { + private _currentCoreListener: IMessageReceiverCallback; + + constructor() { + super(); + this._currentCoreListener = noop; + } + + public sendMessage(msg: IMainThreadMessage) { + log.debug("---> Sending to Core:", msg.type); + queueMicrotask(() => { + // NOTE: We don't clone for performance reasons + this._currentCoreListener({ data: msg }); + }); + } + + public getCallbacks(): { + setCoreMessageReceiver: (handler: IMessageReceiverCallback) => void; + sendCoreMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void; + } { + const setCoreMessageReceiver = (handler: IMessageReceiverCallback): void => { + this._currentCoreListener = handler; + }; + const sendCoreMessage = (msg: IWorkerMessage, _transferables?: Transferable[]) => { + queueMicrotask(() => { + log.debug("<--- Receiving from Core:", msg.type); + this.listeners.forEach((listener) => { + listener(msg); + }); + }); + }; + return { setCoreMessageReceiver, sendCoreMessage }; + } +} + +/** + * `CoreInterface` implementation for when the core will run in a WebWorker. + */ +export class WorkerCoreInterface extends CoreInterface { + private _worker: Worker; + + /** + * Initialize a `WorkerCoreInterface` for the given `WebWorker` instance. + * + * The `addMessageListener` and `addMessageListener` will then register + * listeners respectively for the `onmessage` and `onmessageerror` events + * from this `WebWorker`. + * The `sendMessage` method will allow to send messages to the `WebWorker`. + * @param {Worker} worker + */ + constructor(worker: Worker) { + super(); + this._worker = worker; + this._worker.onmessageerror = () => { + this.listenersError.forEach((listener) => { + listener(); + }); + }; + this._worker.onmessage = (evt) => { + this.listeners.forEach((listener) => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + listener(evt.data); + }); + }; + } + + /** + * Send given message to the `WebWorker`. + * @param {Object} msg + * @param {Array.} [transferables] + */ + public sendMessage(msg: IMainThreadMessage, transferables?: Transferable[]): void { + log.debug("---> Sending to Worker:", msg.type); + if (transferables === undefined) { + this._worker.postMessage(msg); + } else { + this._worker.postMessage(msg, transferables); + } + } +} diff --git a/src/experimental/features/__tests__/local.test.ts b/src/experimental/features/__tests__/local.test.ts index c1a4901a44..da452d767d 100644 --- a/src/experimental/features/__tests__/local.test.ts +++ b/src/experimental/features/__tests__/local.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect } from "vitest"; import type { IFeaturesObject } from "../../../features/types"; -import MediaSourceContentInitializer from "../../../main_thread/init/media_source_content_initializer"; +import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import local from "../../../transports/local"; import addLocalManifestFeature from "../local"; @@ -10,9 +10,9 @@ describe("Features list - LOCAL_MANIFEST", () => { addLocalManifestFeature(featureObject); expect(featureObject).toEqual({ transports: { local }, - mainThreadMediaSourceInit: MediaSourceContentInitializer, + mainThreadMediaSourceInit: MultiThreadContentInitializer, }); expect(featureObject.transports.local).toBe(local); - expect(featureObject.mainThreadMediaSourceInit).toBe(MediaSourceContentInitializer); + expect(featureObject.mainThreadMediaSourceInit).toBe(MultiThreadContentInitializer); }); }); diff --git a/src/experimental/features/__tests__/metaplaylist.test.ts b/src/experimental/features/__tests__/metaplaylist.test.ts index e6421be305..3ac6f131f9 100644 --- a/src/experimental/features/__tests__/metaplaylist.test.ts +++ b/src/experimental/features/__tests__/metaplaylist.test.ts @@ -1,6 +1,6 @@ import { describe, it, expect } from "vitest"; import type { IFeaturesObject } from "../../../features/types"; -import MediaSourceContentInitializer from "../../../main_thread/init/media_source_content_initializer"; +import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import metaplaylist from "../../../transports/metaplaylist"; import addLocalManifestFeature from "../metaplaylist"; @@ -10,9 +10,9 @@ describe("Features list - METAPLAYLIST", () => { addLocalManifestFeature(featureObject); expect(featureObject).toEqual({ transports: { metaplaylist }, - mainThreadMediaSourceInit: MediaSourceContentInitializer, + mainThreadMediaSourceInit: MultiThreadContentInitializer, }); expect(featureObject.transports.metaplaylist).toBe(metaplaylist); - expect(featureObject.mainThreadMediaSourceInit).toBe(MediaSourceContentInitializer); + expect(featureObject.mainThreadMediaSourceInit).toBe(MultiThreadContentInitializer); }); }); diff --git a/src/experimental/features/local.ts b/src/experimental/features/local.ts index 8a1e393d8c..e6b49e2684 100644 --- a/src/experimental/features/local.ts +++ b/src/experimental/features/local.ts @@ -15,12 +15,12 @@ */ import type { IFeaturesObject } from "../../features/types"; -import MediaSourceContentInitializer from "../../main_thread/init/media_source_content_initializer"; +import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import local from "../../transports/local"; function addLocalManifestFeature(features: IFeaturesObject): void { features.transports.local = local; - features.mainThreadMediaSourceInit = MediaSourceContentInitializer; + features.mainThreadMediaSourceInit = MultiThreadContentInitializer; } export { addLocalManifestFeature as LOCAL_MANIFEST }; diff --git a/src/experimental/features/metaplaylist.ts b/src/experimental/features/metaplaylist.ts index 1f062bd30f..b3f8781aab 100644 --- a/src/experimental/features/metaplaylist.ts +++ b/src/experimental/features/metaplaylist.ts @@ -15,12 +15,12 @@ */ import type { IFeaturesObject } from "../../features/types"; -import MediaSourceContentInitializer from "../../main_thread/init/media_source_content_initializer"; +import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import metaplaylist from "../../transports/metaplaylist"; function addMetaPlaylistFeature(features: IFeaturesObject): void { features.transports.metaplaylist = metaplaylist; - features.mainThreadMediaSourceInit = MediaSourceContentInitializer; + features.mainThreadMediaSourceInit = MultiThreadContentInitializer; } export { addMetaPlaylistFeature as METAPLAYLIST }; diff --git a/src/features/list/__tests__/dash.test.ts b/src/features/list/__tests__/dash.test.ts index 8f996ffc72..700e09be45 100644 --- a/src/features/list/__tests__/dash.test.ts +++ b/src/features/list/__tests__/dash.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from "vitest"; -import MediaSourceContentInitializer from "../../../main_thread/init/media_source_content_initializer"; +import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import nativeDashParser from "../../../parsers/manifest/dash/native-parser"; import DASHFeature from "../../../transports/dash"; import type { IFeaturesObject } from "../../types"; @@ -16,9 +16,9 @@ describe("Features list - DASH", () => { expect(featureObject).toEqual({ transports: { dash: DASHFeature }, dashParsers: { native: nativeDashParser, fastJs: null, wasm: null }, - mainThreadMediaSourceInit: MediaSourceContentInitializer, + mainThreadMediaSourceInit: MultiThreadContentInitializer, }); expect(featureObject.transports.dash).toBe(DASHFeature); - expect(featureObject.mainThreadMediaSourceInit).toBe(MediaSourceContentInitializer); + expect(featureObject.mainThreadMediaSourceInit).toBe(MultiThreadContentInitializer); }); }); diff --git a/src/features/list/__tests__/smooth.test.ts b/src/features/list/__tests__/smooth.test.ts index 586bacc9e3..8bf5b4a7d0 100644 --- a/src/features/list/__tests__/smooth.test.ts +++ b/src/features/list/__tests__/smooth.test.ts @@ -1,5 +1,5 @@ import { describe, it, expect } from "vitest"; -import MediaSourceContentInitializer from "../../../main_thread/init/media_source_content_initializer"; +import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import SmoothFeature from "../../../transports/smooth"; import type { IFeaturesObject } from "../../types"; import addSmoothFeature from "../smooth"; @@ -10,9 +10,9 @@ describe("Features list - Smooth", () => { addSmoothFeature(featureObject); expect(featureObject).toEqual({ transports: { smooth: SmoothFeature }, - mainThreadMediaSourceInit: MediaSourceContentInitializer, + mainThreadMediaSourceInit: MultiThreadContentInitializer, }); expect(featureObject.transports.smooth).toBe(SmoothFeature); - expect(featureObject.mainThreadMediaSourceInit).toBe(MediaSourceContentInitializer); + expect(featureObject.mainThreadMediaSourceInit).toBe(MultiThreadContentInitializer); }); }); diff --git a/src/features/list/dash.ts b/src/features/list/dash.ts index 4f1f70b338..d02e6d7cb1 100644 --- a/src/features/list/dash.ts +++ b/src/features/list/dash.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import MediaSourceContentInitializer from "../../main_thread/init/media_source_content_initializer"; +import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import dashJsParser from "../../parsers/manifest/dash/native-parser"; import dash from "../../transports/dash"; import type { IFeaturesObject } from "../types"; @@ -28,7 +28,7 @@ function addDASHFeature(features: IFeaturesObject): void { features.transports.dash = dash; } features.dashParsers.native = dashJsParser; - features.mainThreadMediaSourceInit = MediaSourceContentInitializer; + features.mainThreadMediaSourceInit = MultiThreadContentInitializer; } export { addDASHFeature as DASH }; diff --git a/src/features/list/dash_wasm.ts b/src/features/list/dash_wasm.ts index 0dc8ae348b..9bd6821aec 100644 --- a/src/features/list/dash_wasm.ts +++ b/src/features/list/dash_wasm.ts @@ -15,7 +15,7 @@ */ import type { IFeaturesObject } from "../../features/types"; -import MediaSourceContentInitializer from "../../main_thread/init/media_source_content_initializer"; +import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import type { IDashWasmParserOptions } from "../../parsers/manifest/dash/wasm-parser"; import DashWasmParser from "../../parsers/manifest/dash/wasm-parser"; import dash from "../../transports/dash"; @@ -27,7 +27,7 @@ const dashWasmFeature = { features.transports.dash = dash; } features.dashParsers.wasm = dashWasmParser; - features.mainThreadMediaSourceInit = MediaSourceContentInitializer; + features.mainThreadMediaSourceInit = MultiThreadContentInitializer; }, initialize(opts: IDashWasmParserOptions): Promise { diff --git a/src/features/list/media_source_main.ts b/src/features/list/media_source_main.ts index fc509a3d09..0245c4bf52 100644 --- a/src/features/list/media_source_main.ts +++ b/src/features/list/media_source_main.ts @@ -1,4 +1,4 @@ -import MediaSourceContentInitializer from "../../main_thread/init/media_source_content_initializer"; +import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import type { IFeaturesObject } from "../types"; /** @@ -6,7 +6,7 @@ import type { IFeaturesObject } from "../types"; * @param {Object} features */ function addMediaSourceMainFeature(features: IFeaturesObject): void { - features.mainThreadMediaSourceInit = MediaSourceContentInitializer; + features.mainThreadMediaSourceInit = MultiThreadContentInitializer; } export { addMediaSourceMainFeature as MEDIA_SOURCE_MAIN }; diff --git a/src/features/list/smooth.ts b/src/features/list/smooth.ts index 23588e691d..ba5a847d78 100644 --- a/src/features/list/smooth.ts +++ b/src/features/list/smooth.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import MediaSourceContentInitializer from "../../main_thread/init/media_source_content_initializer"; +import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import smooth from "../../transports/smooth"; import type { IFeaturesObject } from "../types"; @@ -26,7 +26,7 @@ function addSmoothFeature(features: IFeaturesObject): void { if (features.transports.smooth === undefined) { features.transports.smooth = smooth; } - features.mainThreadMediaSourceInit = MediaSourceContentInitializer; + features.mainThreadMediaSourceInit = MultiThreadContentInitializer; } export { addSmoothFeature as SMOOTH }; diff --git a/src/features/types.ts b/src/features/types.ts index 26fcacafe1..91405a65a6 100644 --- a/src/features/types.ts +++ b/src/features/types.ts @@ -18,7 +18,6 @@ import type { IMediaElement } from "../compat/browser_compatibility_types"; import type { SegmentSink } from "../core/segment_sinks"; import type ContentDecryptor from "../main_thread/decrypt"; import type DirectFileContentInitializer from "../main_thread/init/directfile_content_initializer"; -import type MediaSourceContentInitializer from "../main_thread/init/media_source_content_initializer"; import type MultiThreadContentInitializer from "../main_thread/init/multi_thread_content_initializer"; import type HTMLTextDisplayer from "../main_thread/text_displayer/html"; import type NativeTextDisplayer from "../main_thread/text_displayer/native/native_text_displayer"; @@ -117,7 +116,7 @@ export interface IFeaturesObject { * Feature allowing to load contents through MediaSource API through the * main thread. */ - mainThreadMediaSourceInit: typeof MediaSourceContentInitializer | null; + mainThreadMediaSourceInit: typeof MultiThreadContentInitializer | null; /** * Features allowing to load contents through MediaSource API through * a WebWorker. diff --git a/src/main_thread/api/public_api.ts b/src/main_thread/api/public_api.ts index 00cc94bc86..ef0f5495c4 100644 --- a/src/main_thread/api/public_api.ts +++ b/src/main_thread/api/public_api.ts @@ -33,13 +33,16 @@ import hasMseInWorker from "../../compat/has_mse_in_worker"; import hasWorkerApi from "../../compat/has_worker_api"; import isDebugModeEnabled from "../../compat/is_debug_mode_enabled"; import config from "../../config"; +import initializeWorkerMain from "../../core/main/worker"; import type { ISegmentSinkMetrics } from "../../core/segment_sinks/segment_sinks_store"; import type { IAdaptationChoice, IInbandEvent, IABRThrottlers, IBufferType, + IResolutionInfo, } from "../../core/types"; +import { MonoThreadCoreInterface, WorkerCoreInterface } from "../../core_interface"; import type { IDefaultConfig } from "../../default_config"; import type { IErrorCode, IErrorType } from "../../errors"; import { ErrorCodes, ErrorTypes, formatError, MediaError } from "../../errors"; @@ -61,7 +64,6 @@ import { getMaximumSafePosition, getMinimumSafePosition, ManifestMetadataFormat, - createRepresentationFilterFromFnString, } from "../../manifest"; import type { IWorkerMessage } from "../../multithread_types"; import { MainThreadMessageType, WorkerMessageType } from "../../multithread_types"; @@ -821,6 +823,10 @@ class Player extends EventEmitter { limitResolution: {}, }; + let throttleVideoBitrate: IReadOnlySharedReference = new SharedReference( + Infinity, + ); + if (this._priv_throttleVideoBitrateWhenHidden) { if (!relyOnVideoVisibilityAndSize) { log.warn( @@ -828,15 +834,16 @@ class Player extends EventEmitter { "browser can't be trusted for visibility.", ); } else { - throttlers.throttleBitrate = { - video: createMappedReference( - getVideoVisibilityRef( - this._priv_pictureInPictureRef, - currentContentCanceller.signal, - ), - (isActive) => (isActive ? Infinity : 0), + throttleVideoBitrate = createMappedReference( + getVideoVisibilityRef( + this._priv_pictureInPictureRef, currentContentCanceller.signal, ), + (isActive) => (isActive ? Infinity : 0), + currentContentCanceller.signal, + ); + throttlers.throttleBitrate = { + video: throttleVideoBitrate, }; } } @@ -889,6 +896,19 @@ class Player extends EventEmitter { connectionTimeout: requestConfig.segment?.connectionTimeout, }; + const transportOptions = { + lowLatencyMode, + checkMediaSegmentIntegrity, + checkManifestIntegrity, + referenceDateTime, + serverSyncInfos, + manifestLoader, + segmentLoader, + representationFilter: options.representationFilter, + __priv_manifestUpdateUrl, + __priv_patchLastSegmentInSidx, + }; + const canRunInMultiThread = features.multithread !== null && this._priv_worker !== null && @@ -904,40 +924,49 @@ class Player extends EventEmitter { "`MEDIA_SOURCE_MAIN` feature", ); } - const transportFn = features.transports[transport]; - if (typeof transportFn !== "function") { - // Stop previous content and reset its state - this.stop(); - this._priv_currentError = null; - throw new Error(`transport "${transport}" not supported`); - } - - const representationFilter = - typeof options.representationFilter === "string" - ? createRepresentationFilterFromFnString(options.representationFilter) - : options.representationFilter; - log.info("API: Initializing MediaSource mode in the main thread"); - const transportPipelines = transportFn({ - lowLatencyMode, - checkMediaSegmentIntegrity, - checkManifestIntegrity, - manifestLoader, - referenceDateTime, - representationFilter, - segmentLoader, - serverSyncInfos, - __priv_manifestUpdateUrl, - __priv_patchLastSegmentInSidx, + const coreInterface = new MonoThreadCoreInterface(); + const coreInterfaceCallbacks = coreInterface.getCallbacks(); + initializeWorkerMain( + coreInterfaceCallbacks.setCoreMessageReceiver, + coreInterfaceCallbacks.sendCoreMessage, + { + // XXX TODO: + limitVideoResolution: new SharedReference({ + height: undefined, + width: undefined, + pixelRatio: 1, + }), + maxBufferAhead: bufferOptions.maxBufferAhead, + maxBufferBehind: bufferOptions.maxBufferBehind, + maxVideoBufferSize: bufferOptions.maxVideoBufferSize, + throttleVideoBitrate: new SharedReference(Infinity), + wantedBufferAhead: bufferOptions.wantedBufferAhead, + }, + ); + coreInterface.sendMessage({ + type: MainThreadMessageType.Init, + value: { + dashWasmUrl: undefined, + hasVideo: this.videoElement?.nodeName.toLowerCase() === "video", + hasMseInWorker: false, + logLevel: log.getLevel(), + logFormat: log.getFormat(), + sendBackLogs: false, + date: Date.now(), + timestamp: getMonotonicTimeStamp(), + }, }); initializer = new features.mainThreadMediaSourceInit({ + coreInterface, adaptiveOptions, autoPlay, bufferOptions, cmcd, keySystems, lowLatencyMode, - transport: transportPipelines, + transport, + transportOptions, manifestRequestSettings, segmentRequestOptions, speed: this._priv_speed, @@ -960,32 +989,21 @@ class Player extends EventEmitter { assert(typeof options.representationFilter !== "function"); useWorker = true; log.info("API: Initializing MediaSource mode in a WebWorker"); - const transportOptions = { - lowLatencyMode, - checkMediaSegmentIntegrity, - checkManifestIntegrity, - referenceDateTime, - serverSyncInfos, - manifestLoader: undefined, - segmentLoader: undefined, - representationFilter: options.representationFilter, - __priv_manifestUpdateUrl, - __priv_patchLastSegmentInSidx, - }; initializer = new features.multithread.init({ + coreInterface: new WorkerCoreInterface(this._priv_worker), adaptiveOptions, autoPlay, bufferOptions, cmcd, keySystems, lowLatencyMode, + transport, transportOptions, manifestRequestSettings, segmentRequestOptions, speed: this._priv_speed, startAt, textTrackOptions, - worker: this._priv_worker, url, }); } diff --git a/src/main_thread/init/media_source_content_initializer.ts b/src/main_thread/init/media_source_content_initializer.ts deleted file mode 100644 index 81767de45c..0000000000 --- a/src/main_thread/init/media_source_content_initializer.ts +++ /dev/null @@ -1,1367 +0,0 @@ -/** - * Copyright 2015 CANAL+ Group - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import type { IMediaElement } from "../../compat/browser_compatibility_types"; -import isCodecSupported from "../../compat/is_codec_supported"; -import mayMediaElementFailOnUndecipherableData from "../../compat/may_media_element_fail_on_undecipherable_data"; -import shouldReloadMediaSourceOnDecipherabilityUpdate from "../../compat/should_reload_media_source_on_decipherability_update"; -import config from "../../config"; -import type { - IAdaptiveRepresentationSelectorArguments, - IRepresentationEstimator, -} from "../../core/adaptive"; -import AdaptiveRepresentationSelector from "../../core/adaptive"; -import CmcdDataBuilder from "../../core/cmcd"; -import { ManifestFetcher, SegmentQueueCreator } from "../../core/fetchers"; -import createContentTimeBoundariesObserver from "../../core/main/common/create_content_time_boundaries_observer"; -import DecipherabilityFreezeDetector from "../../core/main/common/DecipherabilityFreezeDetector"; -import SegmentSinksStore from "../../core/segment_sinks"; -import type { - IStreamOrchestratorOptions, - IStreamOrchestratorCallbacks, - INeedsBufferFlushPayload, -} from "../../core/stream"; -import StreamOrchestrator from "../../core/stream"; -import type { ITextDisplayerInterface } from "../../core/types"; -import type { EncryptedMediaError } from "../../errors"; -import { MediaError } from "../../errors"; -import features from "../../features"; -import log from "../../log"; -import type { IManifest, IPeriodMetadata, ICodecSupportInfo } from "../../manifest"; -import type MainMediaSourceInterface from "../../mse/main_media_source_interface"; -import type { IMediaElementPlaybackObserver } from "../../playback_observer"; -import type { - ICmcdOptions, - IInitialManifest, - IKeySystemOption, - IPlayerError, -} from "../../public_types"; -import type { ITransportPipelines } from "../../transports"; -import areArraysOfNumbersEqual from "../../utils/are_arrays_of_numbers_equal"; -import assert from "../../utils/assert"; -import createCancellablePromise from "../../utils/create_cancellable_promise"; -import isNullOrUndefined from "../../utils/is_null_or_undefined"; -import noop from "../../utils/noop"; -import objectAssign from "../../utils/object_assign"; -import type { IReadOnlySharedReference } from "../../utils/reference"; -import type { ISyncOrAsyncValue } from "../../utils/sync_or_async"; -import SyncOrAsync from "../../utils/sync_or_async"; -import type { CancellationSignal } from "../../utils/task_canceller"; -import TaskCanceller from "../../utils/task_canceller"; -import { ContentDecryptorState, getKeySystemConfiguration } from "../decrypt"; -import type { IProcessedProtectionData } from "../decrypt"; -import type ContentDecryptor from "../decrypt"; -import type { ITextDisplayer } from "../text_displayer"; -import type { ITextDisplayerOptions } from "./types"; -import { ContentInitializer } from "./types"; -import createCorePlaybackObserver from "./utils/create_core_playback_observer"; -import createMediaSource from "./utils/create_media_source"; -import type { IInitialTimeOptions } from "./utils/get_initial_time"; -import getInitialTime from "./utils/get_initial_time"; -import getLoadedReference from "./utils/get_loaded_reference"; -import performInitialSeekAndPlay from "./utils/initial_seek_and_play"; -import initializeContentDecryption from "./utils/initialize_content_decryption"; -import MainThreadTextDisplayerInterface from "./utils/main_thread_text_displayer_interface"; -import RebufferingController from "./utils/rebuffering_controller"; -import StreamEventsEmitter from "./utils/stream_events_emitter"; -import listenToMediaError from "./utils/throw_on_media_error"; - -/** - * Allows to load a new content thanks to the MediaSource Extensions (a.k.a. MSE) - * Web APIs. - * - * Through this `ContentInitializer`, a Manifest will be fetched (and depending - * on the situation, refreshed), a `MediaSource` instance will be linked to the - * wanted `HTMLMediaElement` and chunks of media data, called segments, will be - * pushed on buffers associated to this `MediaSource` instance. - * - * @class MediaSourceContentInitializer - */ -export default class MediaSourceContentInitializer extends ContentInitializer { - /** Constructor settings associated to this `MediaSourceContentInitializer`. */ - private _initSettings: IInitializeArguments; - /** - * `TaskCanceller` allowing to abort everything that the - * `MediaSourceContentInitializer` is doing. - */ - private _initCanceller: TaskCanceller; - /** Interface allowing to fetch and refresh the Manifest. */ - private _manifestFetcher: ManifestFetcher; - /** - * Reference to the `Manifest` Object: - * - as an asynchronous value if it is still in the process of being loaded. - * - as an synchronous value if it has been loaded - * - `null` if the load task has not started yet. - */ - private _manifest: ISyncOrAsyncValue | null; - - private _cmcdDataBuilder: CmcdDataBuilder | null; - - /** - * Describes the decryption capabilities on the current content, discriminated - * by a `status` property: - * - * - If set to `"uninitialized"`, decryption capabilities have not been - * set up yet. - * - * - If set to `"disabled"`, decryption capabilities are explicitely - * disabled. If encrypted content needs to be decrypted, the accompanying - * error `value` describes the reason why decryption is not enabled. - * - * - If set to `"enabled"`, decryption capabilities are available, and - * `value` points to the corresponding `ContentDecryptor`. - */ - private _decryptionCapabilities: - | { - status: "uninitialized"; - value: null; - } - | { - status: "disabled"; - value: EncryptedMediaError; - } - | { - status: "enabled"; - value: ContentDecryptor; - }; - - /** - * Create a new `MediaSourceContentInitializer`, associated to the given - * settings. - * @param {Object} settings - */ - constructor(settings: IInitializeArguments) { - super(); - this._initSettings = settings; - this._initCanceller = new TaskCanceller(); - this._manifest = null; - this._decryptionCapabilities = { status: "uninitialized", value: null }; - const urls = settings.url === undefined ? undefined : [settings.url]; - this._cmcdDataBuilder = - settings.cmcd === undefined ? null : new CmcdDataBuilder(settings.cmcd); - this._manifestFetcher = new ManifestFetcher(urls, settings.transport, { - ...settings.manifestRequestSettings, - lowLatencyMode: settings.lowLatencyMode, - cmcdDataBuilder: this._cmcdDataBuilder, - }); - } - - /** - * Perform non-destructive preparation steps, to prepare a future content. - * For now, this mainly mean loading the Manifest document. - */ - public prepare(): void { - if (this._manifest !== null) { - return; - } - this._manifest = SyncOrAsync.createAsync( - createCancellablePromise(this._initCanceller.signal, (res, rej) => { - this._manifestFetcher.addEventListener("warning", (err: IPlayerError) => - this.trigger("warning", err), - ); - this._manifestFetcher.addEventListener("error", (err: unknown) => { - this.trigger("error", err); - rej(err); - }); - this._manifestFetcher.addEventListener("manifestReady", (manifest) => { - res(manifest); - }); - }), - ); - this._manifestFetcher.start(); - this._initCanceller.signal.register(() => { - this._manifestFetcher.dispose(); - }); - } - - /** - * @param {HTMLMediaElement} mediaElement - * @param {Object} playbackObserver - */ - public start( - mediaElement: IMediaElement, - playbackObserver: IMediaElementPlaybackObserver, - ): void { - this.prepare(); // Load Manifest if not already done - - /** Translate errors coming from the media element into RxPlayer errors. */ - listenToMediaError( - mediaElement, - (error: MediaError) => this._onFatalError(error), - this._initCanceller.signal, - ); - - this._setupInitialMediaSourceAndDecryption(mediaElement) - .then((initResult) => - this._onInitialMediaSourceReady( - mediaElement, - initResult.mediaSource, - playbackObserver, - initResult.drmSystemId, - initResult.unlinkMediaSource, - ), - ) - .catch((err) => { - this._onFatalError(err); - }); - } - - /** - * Update URL of the Manifest. - * @param {Array.|undefined} urls - URLs to reach that Manifest from - * the most prioritized URL to the least prioritized URL. - * @param {boolean} refreshNow - If `true` the resource in question (e.g. - * DASH's MPD) will be refreshed immediately. - */ - public updateContentUrls(urls: string[] | undefined, refreshNow: boolean): void { - this._manifestFetcher.updateContentUrls(urls, refreshNow); - } - - /** - * Stop content and free all resources linked to this - * `MediaSourceContentInitializer`. - */ - public dispose(): void { - this._initCanceller.cancel(); - } - - /** - * Callback called when an error interrupting playback arised. - * @param {*} err - */ - private _onFatalError(err: unknown) { - if (this._initCanceller.isUsed()) { - return; - } - this._initCanceller.cancel(); - this.trigger("error", err); - } - - /** - * Initialize decryption mechanisms if needed and begin creating and relying - * on the initial `MediaSourceInterface` for this content. - * @param {HTMLMediaElement|null} mediaElement - * @returns {Promise.} - */ - private _setupInitialMediaSourceAndDecryption(mediaElement: IMediaElement): Promise<{ - mediaSource: MainMediaSourceInterface; - drmSystemId: string | undefined; - unlinkMediaSource: TaskCanceller; - }> { - const initCanceller = this._initCanceller; - return createCancellablePromise(initCanceller.signal, (resolve) => { - const { keySystems } = this._initSettings; - - /** Initialize decryption capabilities. */ - const { statusRef: drmInitRef, contentDecryptor } = initializeContentDecryption( - mediaElement, - keySystems, - { - onWarning: (err: IPlayerError) => this.trigger("warning", err), - onError: (err: Error) => this._onFatalError(err), - onBlackListProtectionData: (val) => { - // Ugly IIFE workaround to allow async event listener - (async () => { - if (this._manifest === null) { - return; - } - const manifest = - this._manifest.syncValue ?? (await this._manifest.getValueAsAsync()); - blackListProtectionDataOnManifest(manifest, val); - })().catch(noop); - }, - onKeyIdsCompatibilityUpdate: (updates) => { - // Ugly IIFE workaround to allow async event listener - (async () => { - if (this._manifest === null) { - return; - } - const manifest = - this._manifest.syncValue ?? (await this._manifest.getValueAsAsync()); - updateKeyIdsDecipherabilityOnManifest( - manifest, - updates.whitelistedKeyIds, - updates.blacklistedKeyIds, - updates.delistedKeyIds, - ); - })().catch(noop); - }, - - onCodecSupportUpdate: () => { - const syncManifest = this._manifest?.syncValue; - if (isNullOrUndefined(syncManifest)) { - // The Manifest is not yet fetched, but we will be able to check - // the codecs once it is the case - this._manifest?.getValueAsAsync().then((loadedManifest) => { - if (this._initCanceller.isUsed()) { - return; - } - this._refreshManifestCodecSupport(loadedManifest); - }, noop); - } else { - this._refreshManifestCodecSupport(syncManifest); - } - }, - }, - initCanceller.signal, - ); - - if (contentDecryptor.enabled) { - this._decryptionCapabilities = { - status: "enabled", - value: contentDecryptor.value, - }; - } else { - this._decryptionCapabilities = { - status: "disabled", - value: contentDecryptor.value, - }; - } - - drmInitRef.onUpdate( - (drmStatus, stopListeningToDrmUpdates) => { - if (drmStatus.initializationState.type === "uninitialized") { - return; - } - stopListeningToDrmUpdates(); - - const mediaSourceCanceller = new TaskCanceller(); - mediaSourceCanceller.linkToSignal(initCanceller.signal); - createMediaSource(mediaElement, mediaSourceCanceller.signal) - .then((mediaSource) => { - const lastDrmStatus = drmInitRef.getValue(); - if (lastDrmStatus.initializationState.type === "awaiting-media-link") { - lastDrmStatus.initializationState.value.isMediaLinked.setValue(true); - drmInitRef.onUpdate( - (newDrmStatus, stopListeningToDrmUpdatesAgain) => { - if (newDrmStatus.initializationState.type === "initialized") { - stopListeningToDrmUpdatesAgain(); - resolve({ - mediaSource, - drmSystemId: newDrmStatus.drmSystemId, - unlinkMediaSource: mediaSourceCanceller, - }); - return; - } - }, - { emitCurrentValue: true, clearSignal: initCanceller.signal }, - ); - } else if (drmStatus.initializationState.type === "initialized") { - resolve({ - mediaSource, - drmSystemId: drmStatus.drmSystemId, - unlinkMediaSource: mediaSourceCanceller, - }); - return; - } - }) - .catch((err) => { - if (mediaSourceCanceller.isUsed()) { - return; - } - this._onFatalError(err); - }); - }, - { emitCurrentValue: true, clearSignal: initCanceller.signal }, - ); - }); - } - - private async _onInitialMediaSourceReady( - mediaElement: IMediaElement, - initialMediaSource: MainMediaSourceInterface, - playbackObserver: IMediaElementPlaybackObserver, - drmSystemId: string | undefined, - initialMediaSourceCanceller: TaskCanceller, - ): Promise { - const { - adaptiveOptions, - autoPlay, - bufferOptions, - lowLatencyMode, - segmentRequestOptions, - speed, - startAt, - textTrackOptions, - transport, - } = this._initSettings; - const initCanceller = this._initCanceller; - assert(this._manifest !== null); - let manifest: IManifest; - try { - manifest = this._manifest.syncValue ?? (await this._manifest.getValueAsAsync()); - } catch (_e) { - return; // The error should already have been processed through an event listener - } - - manifest.addEventListener( - "manifestUpdate", - (updates) => { - this.trigger("manifestUpdate", updates); - this._refreshManifestCodecSupport(manifest); - }, - initCanceller.signal, - ); - - manifest.addEventListener( - "decipherabilityUpdate", - (elts) => { - this.trigger("decipherabilityUpdate", elts); - }, - initCanceller.signal, - ); - - manifest.addEventListener( - "supportUpdate", - () => { - this.trigger("codecSupportUpdate", null); - }, - initCanceller.signal, - ); - - log.debug("Init: Calculating initial time"); - const initialTime = getInitialTime(manifest, lowLatencyMode, startAt); - log.debug("Init: Initial time calculated:", initialTime); - - /** Choose the right "Representation" for a given "Adaptation". */ - const representationEstimator = AdaptiveRepresentationSelector(adaptiveOptions); - const subBufferOptions = objectAssign( - { textTrackOptions, drmSystemId }, - bufferOptions, - ); - - const segmentQueueCreator = new SegmentQueueCreator( - transport, - this._cmcdDataBuilder, - segmentRequestOptions, - initCanceller.signal, - ); - - this._refreshManifestCodecSupport(manifest); - this.trigger("manifestReady", manifest); - if (initCanceller.isUsed()) { - return; - } - - // handle initial load and reloads - this._setupContentWithNewMediaSource( - { - mediaElement, - playbackObserver, - mediaSource: initialMediaSource, - initialTime, - autoPlay, - manifest, - representationEstimator, - segmentQueueCreator, - speed, - bufferOptions: subBufferOptions, - }, - initialMediaSourceCanceller, - ); - } - - /** - * Load the content defined by the Manifest in the mediaSource given at the - * given position and playing status. - * This function recursively re-call itself when a MediaSource reload is - * wanted. - * @param {Object} args - * @param {Object} currentCanceller - */ - private _setupContentWithNewMediaSource( - args: IBufferingMediaSettings, - currentCanceller: TaskCanceller, - ): void { - this._startLoadingContentOnMediaSource( - args, - this._createReloadMediaSourceCallback(args, currentCanceller), - currentCanceller.signal, - ); - } - - /** - * Create `IReloadMediaSourceCallback` allowing to handle reload orders. - * @param {Object} args - * @param {Object} currentCanceller - */ - private _createReloadMediaSourceCallback( - args: IBufferingMediaSettings, - currentCanceller: TaskCanceller, - ): IReloadMediaSourceCallback { - const initCanceller = this._initCanceller; - return (reloadOrder: { position: number; autoPlay: boolean }): void => { - currentCanceller.cancel(); - if (initCanceller.isUsed()) { - return; - } - this.trigger("reloadingMediaSource", reloadOrder); - if (initCanceller.isUsed()) { - return; - } - - const newCanceller = new TaskCanceller(); - newCanceller.linkToSignal(initCanceller.signal); - createMediaSource(args.mediaElement, newCanceller.signal) - .then((newMediaSource) => { - this._setupContentWithNewMediaSource( - { - ...args, - mediaSource: newMediaSource, - initialTime: reloadOrder.position, - autoPlay: reloadOrder.autoPlay, - }, - newCanceller, - ); - }) - .catch((err) => { - if (newCanceller.isUsed()) { - return; - } - this._onFatalError(err); - }); - }; - } - - /** - * Buffer the content on the given MediaSource. - * @param {Object} args - * @param {function} onReloadOrder - * @param {Object} cancelSignal - */ - private _startLoadingContentOnMediaSource( - args: IBufferingMediaSettings, - onReloadOrder: IReloadMediaSourceCallback, - cancelSignal: CancellationSignal, - ): void { - const { - autoPlay, - bufferOptions, - initialTime, - manifest, - mediaElement, - mediaSource, - playbackObserver, - representationEstimator, - segmentQueueCreator, - speed, - } = args; - - const initialPeriod = - manifest.getPeriodForTime(initialTime) ?? manifest.getNextPeriod(initialTime); - if (initialPeriod === undefined) { - const error = new MediaError( - "MEDIA_STARTING_TIME_NOT_FOUND", - "Wanted starting time not found in the Manifest.", - ); - return this._onFatalError(error); - } - - let textDisplayerInterface: ITextDisplayerInterface | null = null; - const textDisplayer = createTextDisplayer( - mediaElement, - this._initSettings.textTrackOptions, - ); - if (textDisplayer !== null) { - const sender = new MainThreadTextDisplayerInterface(textDisplayer); - textDisplayerInterface = sender; - cancelSignal.register(() => { - sender.stop(); - textDisplayer?.stop(); - }); - } - - /** Interface to create media buffers. */ - const segmentSinksStore = new SegmentSinksStore( - mediaSource, - mediaElement.nodeName === "VIDEO", - textDisplayerInterface, - ); - - cancelSignal.register(() => { - segmentSinksStore.disposeAll(); - }); - - const { autoPlayResult, initialPlayPerformed } = performInitialSeekAndPlay( - { - mediaElement, - playbackObserver, - startTime: initialTime, - mustAutoPlay: autoPlay, - onWarning: (err) => { - this.trigger("warning", err); - }, - isDirectfile: false, - }, - cancelSignal, - ); - - if (cancelSignal.isCancelled()) { - return; - } - - initialPlayPerformed.onUpdate( - (isPerformed, stopListening) => { - if (isPerformed) { - stopListening(); - const streamEventsEmitter = new StreamEventsEmitter( - manifest, - mediaElement, - playbackObserver, - ); - manifest.addEventListener( - "manifestUpdate", - () => { - streamEventsEmitter.onManifestUpdate(manifest); - }, - cancelSignal, - ); - streamEventsEmitter.addEventListener( - "event", - (payload) => { - this.trigger("streamEvent", payload); - }, - cancelSignal, - ); - streamEventsEmitter.addEventListener( - "eventSkip", - (payload) => { - this.trigger("streamEventSkip", payload); - }, - cancelSignal, - ); - streamEventsEmitter.start(); - cancelSignal.register(() => { - streamEventsEmitter.stop(); - }); - } - }, - { clearSignal: cancelSignal, emitCurrentValue: true }, - ); - - const coreObserver = createCorePlaybackObserver( - playbackObserver, - { - autoPlay, - manifest, - mediaSource, - textDisplayer, - initialPlayPerformed, - speed, - }, - cancelSignal, - ); - - this._cmcdDataBuilder?.startMonitoringPlayback(coreObserver); - cancelSignal.register(() => { - this._cmcdDataBuilder?.stopMonitoringPlayback(); - }); - - const rebufferingController = this._createRebufferingController( - playbackObserver, - manifest, - speed, - cancelSignal, - ); - const decipherabilityFreezeDetector = new DecipherabilityFreezeDetector( - segmentSinksStore, - ); - - if (mayMediaElementFailOnUndecipherableData) { - // On some devices, just reload immediately when data become undecipherable - manifest.addEventListener( - "decipherabilityUpdate", - (elts) => { - if (elts.some((e) => e.representation.decipherable !== true)) { - reloadMediaSource(0, undefined, undefined); - } - }, - cancelSignal, - ); - } - - playbackObserver.listen( - (observation) => { - if (decipherabilityFreezeDetector.needToReload(observation)) { - let position: number; - const lastObservation = playbackObserver.getReference().getValue(); - if (lastObservation.position.isAwaitingFuturePosition()) { - position = lastObservation.position.getWanted(); - } else { - position = playbackObserver.getCurrentTime(); - } - - const autoplay = initialPlayPerformed.getValue() - ? !playbackObserver.getIsPaused() - : autoPlay; - onReloadOrder({ position, autoPlay: autoplay }); - } - }, - { clearSignal: cancelSignal }, - ); - - // Synchronize SegmentSinks with what has been buffered. - coreObserver.listen( - (observation) => { - ["video" as const, "audio" as const, "text" as const].forEach((tType) => { - const segmentSinkStatus = segmentSinksStore.getStatus(tType); - if (segmentSinkStatus.type === "initialized") { - segmentSinkStatus.value.synchronizeInventory( - observation.buffered[tType] ?? [], - ); - } - }); - }, - { clearSignal: cancelSignal }, - ); - - const contentTimeBoundariesObserver = createContentTimeBoundariesObserver( - manifest, - mediaSource, - coreObserver, - segmentSinksStore, - { - onWarning: (err: IPlayerError) => this.trigger("warning", err), - onPeriodChanged: (period: IPeriodMetadata) => - this.trigger("activePeriodChanged", { period }), - }, - cancelSignal, - ); - - /** - * Emit a "loaded" events once the initial play has been performed and the - * media can begin playback. - * Also emits warning events if issues arise when doing so. - */ - autoPlayResult - .then(() => { - getLoadedReference(playbackObserver, mediaElement, false, cancelSignal).onUpdate( - (isLoaded, stopListening) => { - if (isLoaded) { - stopListening(); - this.trigger("loaded", { - getSegmentSinkMetrics: async () => { - return new Promise((resolve) => - resolve(segmentSinksStore.getSegmentSinksMetrics()), - ); - }, - }); - } - }, - { emitCurrentValue: true, clearSignal: cancelSignal }, - ); - }) - .catch((err) => { - if (cancelSignal.isCancelled()) { - return; // Current loading cancelled, no need to trigger the error - } - this._onFatalError(err); - }); - - // eslint-disable-next-line @typescript-eslint/no-this-alias - const self = this; - StreamOrchestrator( - { manifest, initialPeriod }, - coreObserver, - representationEstimator, - segmentSinksStore, - segmentQueueCreator, - bufferOptions, - handleStreamOrchestratorCallbacks(), - cancelSignal, - ); - - /** - * Returns Object handling the callbacks from a `StreamOrchestrator`, which - * are basically how it communicates about events. - * @returns {Object} - */ - function handleStreamOrchestratorCallbacks(): IStreamOrchestratorCallbacks { - return { - needsBufferFlush: (payload?: INeedsBufferFlushPayload) => { - let wantedSeekingTime: number; - const currentTime = playbackObserver.getCurrentTime(); - const relativeResumingPosition = payload?.relativeResumingPosition ?? 0; - const canBeApproximateSeek = Boolean(payload?.relativePosHasBeenDefaulted); - - if (relativeResumingPosition === 0 && canBeApproximateSeek) { - // in case relativeResumingPosition is 0, we still perform - // a tiny seek to be sure that the browser will correclty reload the video. - wantedSeekingTime = currentTime + 0.001; - } else { - wantedSeekingTime = currentTime + relativeResumingPosition; - } - playbackObserver.setCurrentTime(wantedSeekingTime); - - // Seek again once data begins to be buffered. - // This is sadly necessary on some browsers to avoid decoding - // issues after a flush. - // - // NOTE: there's in theory a potential race condition in the following - // logic as the callback could be called when media data is still - // being removed by the browser - which is an asynchronous process. - // The following condition checking for buffered data could thus lead - // to a false positive where we're actually checking previous data. - // For now, such scenario is avoided by setting the - // `includeLastObservation` option to `false` and calling - // `needsBufferFlush` once MSE media removal operations have been - // explicitely validated by the browser, but that's a complex and easy - // to break system. - playbackObserver.listen( - (obs, stopListening) => { - if ( - // Data is buffered around the current position - obs.currentRange !== null || - // Or, for whatever reason, we have no buffer but we're already advancing - obs.position.getPolled() > wantedSeekingTime + 0.1 - ) { - stopListening(); - playbackObserver.setCurrentTime(obs.position.getWanted() + 0.001); - } - }, - { includeLastObservation: false, clearSignal: cancelSignal }, - ); - }, - - streamStatusUpdate(value) { - // Announce discontinuities if found - const { period, bufferType, imminentDiscontinuity, position } = value; - rebufferingController.updateDiscontinuityInfo({ - period, - bufferType, - discontinuity: imminentDiscontinuity, - position, - }); - if (cancelSignal.isCancelled()) { - return; // Previous call has stopped streams due to a side-effect - } - - // If the status for the last Period indicates that segments are all loaded - // or on the contrary that the loading resumed, announce it to the - // ContentTimeBoundariesObserver. - if ( - manifest.isLastPeriodKnown && - value.period.id === manifest.periods[manifest.periods.length - 1].id - ) { - const hasFinishedLoadingLastPeriod = - value.hasFinishedLoading || value.isEmptyStream; - if (hasFinishedLoadingLastPeriod) { - contentTimeBoundariesObserver.onLastSegmentFinishedLoading( - value.bufferType, - ); - } else { - contentTimeBoundariesObserver.onLastSegmentLoadingResume(value.bufferType); - } - } - }, - - needsManifestRefresh: () => - self._manifestFetcher.scheduleManualRefresh({ - enablePartialRefresh: true, - canUseUnsafeMode: true, - }), - - manifestMightBeOufOfSync: () => { - const { OUT_OF_SYNC_MANIFEST_REFRESH_DELAY } = config.getCurrent(); - self._manifestFetcher.scheduleManualRefresh({ - enablePartialRefresh: false, - canUseUnsafeMode: false, - delay: OUT_OF_SYNC_MANIFEST_REFRESH_DELAY, - }); - }, - - lockedStream: (value) => - rebufferingController.onLockedStream(value.bufferType, value.period), - - adaptationChange: (value) => { - self.trigger("adaptationChange", value); - if (cancelSignal.isCancelled()) { - return; // Previous call has stopped streams due to a side-effect - } - contentTimeBoundariesObserver.onAdaptationChange( - value.type, - value.period, - value.adaptation, - ); - }, - - representationChange: (value) => { - self.trigger("representationChange", value); - if (cancelSignal.isCancelled()) { - return; // Previous call has stopped streams due to a side-effect - } - contentTimeBoundariesObserver.onRepresentationChange(value.type, value.period); - }, - - inbandEvent: (value) => self.trigger("inbandEvents", value), - - warning: (value) => self.trigger("warning", value), - - periodStreamReady: (value) => self.trigger("periodStreamReady", value), - - periodStreamCleared: (value) => { - contentTimeBoundariesObserver.onPeriodCleared(value.type, value.period); - if (cancelSignal.isCancelled()) { - return; // Previous call has stopped streams due to a side-effect - } - self.trigger("periodStreamCleared", value); - }, - - bitrateEstimateChange: (value) => { - self._cmcdDataBuilder?.updateThroughput(value.type, value.bitrate); - self.trigger("bitrateEstimateChange", value); - }, - - needsMediaSourceReload: (payload) => { - reloadMediaSource( - payload.timeOffset, - payload.minimumPosition, - payload.maximumPosition, - ); - }, - - needsDecipherabilityFlush() { - const keySystem = getKeySystemConfiguration(mediaElement); - if (shouldReloadMediaSourceOnDecipherabilityUpdate(keySystem?.[0])) { - const lastObservation = coreObserver.getReference().getValue(); - const position = lastObservation.position.isAwaitingFuturePosition() - ? lastObservation.position.getWanted() - : (coreObserver.getCurrentTime() ?? lastObservation.position.getPolled()); - const isPaused = - lastObservation.paused.pending ?? - coreObserver.getIsPaused() ?? - lastObservation.paused.last; - onReloadOrder({ position, autoPlay: !isPaused }); - } else { - const lastObservation = coreObserver.getReference().getValue(); - const position = lastObservation.position.isAwaitingFuturePosition() - ? lastObservation.position.getWanted() - : (coreObserver.getCurrentTime() ?? lastObservation.position.getPolled()); - // simple seek close to the current position - // to flush the buffers - if (position + 0.001 < lastObservation.duration) { - playbackObserver.setCurrentTime(mediaElement.currentTime + 0.001); - } else { - playbackObserver.setCurrentTime(position); - } - } - }, - - encryptionDataEncountered: (value) => { - if (self._decryptionCapabilities.status === "disabled") { - self._onFatalError(self._decryptionCapabilities.value); - return; - } else if (self._decryptionCapabilities.status === "uninitialized") { - // Should never happen - log.error( - "Init: received encryption data without known decryption capabilities", - ); - return; - } - for (const protectionData of value) { - self._decryptionCapabilities.value.onInitializationData(protectionData); - if (cancelSignal.isCancelled()) { - return; // Previous call has stopped streams due to a side-effect - } - } - }, - - error: (err) => self._onFatalError(err), - }; - } - - /** - * Callback allowing to reload the current content. - * @param {number} deltaPosition - Position you want to seek to after - * reloading, as a delta in seconds from the last polled playing position. - * @param {number|undefined} minimumPosition - If set, minimum time bound - * in seconds after `deltaPosition` has been applied. - * @param {number|undefined} maximumPosition - If set, minimum time bound - * in seconds after `deltaPosition` has been applied. - */ - function reloadMediaSource( - deltaPosition: number, - minimumPosition: number | undefined, - maximumPosition: number | undefined, - ): void { - const lastObservation = coreObserver.getReference().getValue(); - const currentPosition = lastObservation.position.isAwaitingFuturePosition() - ? lastObservation.position.getWanted() - : (coreObserver.getCurrentTime() ?? lastObservation.position.getPolled()); - const isPaused = - lastObservation.paused.pending ?? - coreObserver.getIsPaused() ?? - lastObservation.paused.last; - let position = currentPosition + deltaPosition; - if (minimumPosition !== undefined) { - position = Math.max(minimumPosition, position); - } - if (maximumPosition !== undefined) { - position = Math.min(maximumPosition, position); - } - onReloadOrder({ position, autoPlay: !isPaused }); - } - } - - /** - * Creates a `RebufferingController`, a class trying to avoid various stalling - * situations (such as rebuffering periods), and returns it. - * - * Various methods from that class need then to be called at various events - * (see `RebufferingController` definition). - * - * This function also handles the `RebufferingController`'s events: - * - emit "stalled" events when stalling situations cannot be prevented, - * - emit "unstalled" events when we could get out of one, - * - emit "warning" on various rebuffering-related minor issues - * like discontinuity skipping. - * @param {Object} playbackObserver - * @param {Object} manifest - * @param {Object} speed - * @param {Object} cancelSignal - * @returns {Object} - */ - private _createRebufferingController( - playbackObserver: IMediaElementPlaybackObserver, - manifest: IManifest, - speed: IReadOnlySharedReference, - cancelSignal: CancellationSignal, - ): RebufferingController { - const rebufferingController = new RebufferingController( - playbackObserver, - manifest, - speed, - ); - // Bubble-up events - rebufferingController.addEventListener("stalled", (evt) => - this.trigger("stalled", evt), - ); - rebufferingController.addEventListener("unstalled", () => - this.trigger("unstalled", null), - ); - rebufferingController.addEventListener("warning", (err) => - this.trigger("warning", err), - ); - cancelSignal.register(() => rebufferingController.destroy()); - rebufferingController.start(); - return rebufferingController; - } - - /** - * Evaluates a list of codecs to determine their support status. - * - * @param {Array} codecsToCheck - The list of codecs to check. - * @returns {Array} - The list of evaluated codecs with their support status updated. - */ - private getCodecsSupportInfo( - codecsToCheck: Array<{ mimeType: string; codec: string }>, - ): ICodecSupportInfo[] { - const codecsSupportInfo: ICodecSupportInfo[] = codecsToCheck.map((codecToCheck) => { - const inputCodec = `${codecToCheck.mimeType};codecs="${codecToCheck.codec}"`; - const isSupported = isCodecSupported(inputCodec); - if (!isSupported) { - return { - mimeType: codecToCheck.mimeType, - codec: codecToCheck.codec, - supported: false, - supportedIfEncrypted: false, - }; - } - /** - * `true` if the codec is supported when encrypted, `false` if it is not - * supported, or `undefined` if we cannot obtain that information. - */ - let supportedIfEncrypted: boolean | undefined; - if (this._decryptionCapabilities.status === "uninitialized") { - supportedIfEncrypted = undefined; - } else if (this._decryptionCapabilities.status === "disabled") { - // It's ambiguous here, but let's say that no ContentDecryptor means that - // the codec is supported by it. - supportedIfEncrypted = true; - } else { - const contentDecryptor = this._decryptionCapabilities.value; - if (contentDecryptor.getState() !== ContentDecryptorState.Initializing) { - // No information is available regarding the support status. - // Defaulting to assume the codec is supported. - supportedIfEncrypted = - contentDecryptor.isCodecSupported( - codecToCheck.mimeType, - codecToCheck.codec, - ) ?? true; - } - } - return { - mimeType: codecToCheck.mimeType, - codec: codecToCheck.codec, - supported: isSupported, - supportedIfEncrypted, - }; - }); - return codecsSupportInfo; - } - - /** - * Update the support status of all Representations in the Manifest. - * - * To call anytime either the Manifest is linked to new codecs or new means - * to test for codec support are available. - * @param {Object} manifest - */ - private _refreshManifestCodecSupport(manifest: IManifest): void { - const codecsToTest = manifest.getCodecsWithUnknownSupport(); - const codecsSupportInfo = this.getCodecsSupportInfo(codecsToTest); - if (codecsSupportInfo.length > 0) { - try { - manifest.updateCodecSupport(codecsSupportInfo); - } catch (err) { - this._onFatalError(err); - } - } - } -} - -function createTextDisplayer( - mediaElement: IMediaElement, - textTrackOptions: ITextDisplayerOptions, -): ITextDisplayer | null { - if (textTrackOptions.textTrackMode === "html" && features.htmlTextDisplayer !== null) { - return new features.htmlTextDisplayer( - mediaElement, - textTrackOptions.textTrackElement, - ); - } else if (features.nativeTextDisplayer !== null) { - return new features.nativeTextDisplayer(mediaElement); - } - return null; -} - -/** Arguments to give to the `InitializeOnMediaSource` function. */ -export interface IInitializeArguments { - /** Options concerning the ABR logic. */ - adaptiveOptions: IAdaptiveRepresentationSelectorArguments; - /** `true` if we should play when loaded. */ - autoPlay: boolean; - /** Options concerning the media buffers. */ - bufferOptions: { - /** Buffer "goal" at which we stop downloading new segments. */ - wantedBufferAhead: IReadOnlySharedReference; - /** Buffer maximum size in kiloBytes at which we stop downloading */ - maxVideoBufferSize: IReadOnlySharedReference; - /** Max buffer size after the current position, in seconds (we GC further up). */ - maxBufferAhead: IReadOnlySharedReference; - /** Max buffer size before the current position, in seconds (we GC further down). */ - maxBufferBehind: IReadOnlySharedReference; - /** - * Enable/Disable fastSwitching: allow to replace lower-quality segments by - * higher-quality ones to have a faster transition. - */ - enableFastSwitching: boolean; - /** Behavior when a new video and/or audio codec is encountered. */ - onCodecSwitch: "continue" | "reload"; - }; - /** - * When set to an object, enable "Common Media Client Data", or "CMCD". - */ - cmcd?: ICmcdOptions | undefined; - /** Every encryption configuration set. */ - keySystems: IKeySystemOption[]; - /** `true` to play low-latency contents optimally. */ - lowLatencyMode: boolean; - /** Settings linked to Manifest requests. */ - manifestRequestSettings: { - /** Maximum number of time a request on error will be retried. */ - maxRetry: number | undefined; - /** - * Timeout after which request are aborted and, depending on other options, - * retried. - * To set to `-1` for no timeout. - * `undefined` will lead to a default, large, timeout being used. - */ - requestTimeout: number | undefined; - /** - * Connection timeout, in milliseconds, after which the request is canceled - * if the responses headers has not being received. - * Do not set or set to "undefined" to disable it. - */ - connectionTimeout: number | undefined; - /** Limit the frequency of Manifest updates. */ - minimumManifestUpdateInterval: number; - /** - * Potential first Manifest to rely on, allowing to skip the initial Manifest - * request. - */ - initialManifest: IInitialManifest | undefined; - }; - /** Logic linked Manifest and segment loading and parsing. */ - transport: ITransportPipelines; - /** Configuration for the segment requesting logic. */ - segmentRequestOptions: { - lowLatencyMode: boolean; - /** - * Amount of time after which a request should be aborted. - * `undefined` indicates that a default value is wanted. - * `-1` indicates no timeout. - */ - requestTimeout: number | undefined; - /** - * Amount of time, in milliseconds, after which a request that hasn't receive - * the headers and status code should be aborted and optionnaly retried, - * depending on the maxRetry configuration. - */ - connectionTimeout: number | undefined; - /** Maximum number of time a request on error will be retried. */ - maxRetry: number | undefined; - }; - /** Emit the playback rate (speed) set by the user. */ - speed: IReadOnlySharedReference; - /** The configured starting position. */ - startAt?: IInitialTimeOptions | undefined; - /** Configuration specific to the text track. */ - textTrackOptions: ITextDisplayerOptions; - /** URL of the Manifest. `undefined` if unknown or not pertinent. */ - url: string | undefined; -} - -/** Arguments needed when starting to buffer media on a specific MediaSource. */ -interface IBufferingMediaSettings { - /** Various stream-related options. */ - bufferOptions: IStreamOrchestratorOptions; - /* Manifest of the content we want to play. */ - manifest: IManifest; - /** Media Element on which the content will be played. */ - mediaElement: IMediaElement; - /** Emit playback conditions regularly. */ - playbackObserver: IMediaElementPlaybackObserver; - /** Estimate the right Representation. */ - representationEstimator: IRepresentationEstimator; - /** Module to facilitate segment fetching. */ - segmentQueueCreator: SegmentQueueCreator; - /** Last wanted playback rate. */ - speed: IReadOnlySharedReference; - /** `MediaSource` element on which the media will be buffered. */ - mediaSource: MainMediaSourceInterface; - /** The initial position to seek to in media time, in seconds. */ - initialTime: number; - /** If `true` it should automatically play once enough data is loaded. */ - autoPlay: boolean; -} - -/** - * Change the decipherability of Representations which have their key id in one - * of the given Arrays: - * - * - Those who have a key id listed in `whitelistedKeyIds` will have their - * decipherability updated to `true` - * - * - Those who have a key id listed in `blacklistedKeyIds` will have their - * decipherability updated to `false` - * - * - Those who have a key id listed in `delistedKeyIds` will have their - * decipherability updated to `undefined`. - * - * @param {Object} manifest - * @param {Array.} whitelistedKeyIds - * @param {Array.} blacklistedKeyIds - * @param {Array.} delistedKeyIds - */ -function updateKeyIdsDecipherabilityOnManifest( - manifest: IManifest, - whitelistedKeyIds: Uint8Array[], - blacklistedKeyIds: Uint8Array[], - delistedKeyIds: Uint8Array[], -): void { - manifest.updateRepresentationsDeciperability((ctx) => { - const { representation } = ctx; - if (representation.contentProtections === undefined) { - return representation.decipherable; - } - const contentKIDs = representation.contentProtections.keyIds; - if (contentKIDs !== undefined) { - for (const elt of contentKIDs) { - for (const blacklistedKeyId of blacklistedKeyIds) { - if (areArraysOfNumbersEqual(blacklistedKeyId, elt)) { - return false; - } - } - for (const whitelistedKeyId of whitelistedKeyIds) { - if (areArraysOfNumbersEqual(whitelistedKeyId, elt)) { - return true; - } - } - for (const delistedKeyId of delistedKeyIds) { - if (areArraysOfNumbersEqual(delistedKeyId, elt)) { - return undefined; - } - } - } - } - return representation.decipherable; - }); -} - -/** - * Update decipherability to `false` to any Representation which is linked to - * the given initialization data. - * @param {Object} manifest - * @param {Object} initData - */ -function blackListProtectionDataOnManifest( - manifest: IManifest, - initData: IProcessedProtectionData, -) { - manifest.updateRepresentationsDeciperability((ctx) => { - const rep = ctx.representation; - if (rep.decipherable === false) { - return false; - } - const segmentProtections = rep.contentProtections?.initData ?? []; - for (const protection of segmentProtections) { - if (initData.type === undefined || protection.type === initData.type) { - const containedInitData = initData.values - .getFormattedValues() - .every((undecipherableVal) => { - return protection.values.some((currVal) => { - return ( - (undecipherableVal.systemId === undefined || - currVal.systemId === undecipherableVal.systemId) && - areArraysOfNumbersEqual(currVal.data, undecipherableVal.data) - ); - }); - }); - if (containedInitData) { - return false; - } - } - } - return rep.decipherable; - }); -} - -/** - * Function to call when you want to "reload" the MediaSource: basically - * restarting playback on a new MediaSource for the same content (it may - * be for varied reasons, such as ensuring data buffers are empty, or - * restarting after some kind of fatal error). - * @param {Object} reloadOrder - * @param {number} reloadOrder.position - Position in seconds at which we - * should restart from when playback restarts. - * @param {boolean} reloadOrder.autoPlay - If `true` we will directly play - * once enough data is re-loaded. - */ -type IReloadMediaSourceCallback = (reloadOrder: { - position: number; - autoPlay: boolean; -}) => void; diff --git a/src/main_thread/init/multi_thread_content_initializer.ts b/src/main_thread/init/multi_thread_content_initializer.ts index 89ca5a1e56..27126b51fc 100644 --- a/src/main_thread/init/multi_thread_content_initializer.ts +++ b/src/main_thread/init/multi_thread_content_initializer.ts @@ -7,6 +7,7 @@ import type { IAdaptationChoice, IResolutionInfo, } from "../../core/types"; +import type CoreInterface from "../../core_interface"; import { EncryptedMediaError, MediaError, @@ -39,6 +40,7 @@ import type { IInitialManifest, IKeySystemOption, IPlayerError, + IRepresentationFilter, } from "../../public_types"; import type { ITransportOptions } from "../../transports"; import arrayFind from "../../utils/array_find"; @@ -55,7 +57,6 @@ import type { IContentProtection } from "../decrypt"; import type IContentDecryptor from "../decrypt"; import { ContentDecryptorState, getKeySystemConfiguration } from "../decrypt"; import type { ITextDisplayer } from "../text_displayer"; -import sendMessage from "./send_message"; import type { ITextDisplayerOptions } from "./types"; import { ContentInitializer } from "./types"; import createCorePlaybackObserver from "./utils/create_core_playback_observer"; @@ -82,18 +83,18 @@ export default class MultiThreadContentInitializer extends ContentInitializer { private _settings: IInitializeArguments; /** - * The WebWorker may be sending messages as soon as we're preparing the - * content but the `MultiThreadContentInitializer` is only able to handle all of - * them only once `start`ed. + * The Core may be sending messages as soon as we're preparing the content but + * the `MultiThreadContentInitializer` is only able to handle all of them only + * once `start`ed. * - * As such `_queuedWorkerMessages` is set to an Array when `prepare` has been - * called but not `start` yet, and contains all worker messages that have to + * As such `_queuedCoreMessages` is set to an Array when `prepare` has been + * called but not `start` yet, and contains all core messages that have to * be processed when `start` is called. * * It is set to `null` when there's no need to rely on that queue (either not * yet `prepare`d or already `start`ed). */ - private _queuedWorkerMessages: MessageEvent[] | null; + private _queuedCoreMessages: IWorkerMessage[] | null; /** * Information relative to the current loaded content. @@ -116,7 +117,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { private _currentMediaSourceCanceller: TaskCanceller; /** - * Stores the resolvers and the current messageId that is sent to the web worker to + * Stores the resolvers and the current messageId that is sent to the core to * receive segment sink metrics. * The purpose of collecting metrics is for monitoring and debugging. */ @@ -141,7 +142,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { lastMessageId: 0, resolvers: new Map(), }; - this._queuedWorkerMessages = null; + this._queuedCoreMessages = null; } /** @@ -152,7 +153,8 @@ export default class MultiThreadContentInitializer extends ContentInitializer { return; } const contentId = generateContentId(); - const { adaptiveOptions, transportOptions, worker } = this._settings; + const { adaptiveOptions, transport, transportOptions, coreInterface } = + this._settings; const { wantedBufferAhead, maxVideoBufferSize, maxBufferAhead, maxBufferBehind } = this._settings.bufferOptions; const initialVideoBitrate = adaptiveOptions.initialBitrates.video; @@ -168,13 +170,14 @@ export default class MultiThreadContentInitializer extends ContentInitializer { autoPlay: undefined, initialPlayPerformed: null, }; - sendMessage(worker, { + coreInterface.sendMessage({ type: MainThreadMessageType.PrepareContent, value: { contentId, cmcd: this._settings.cmcd, url: this._settings.url, hasText: this._hasTextBufferFeature(), + transport, transportOptions, initialVideoBitrate, initialAudioBitrate, @@ -186,7 +189,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { }, }); this._initCanceller.signal.register(() => { - sendMessage(worker, { + coreInterface.sendMessage({ type: MainThreadMessageType.StopContent, contentId, value: null, @@ -195,10 +198,9 @@ export default class MultiThreadContentInitializer extends ContentInitializer { if (this._initCanceller.isUsed()) { return; } - this._queuedWorkerMessages = []; - log.debug("MTCI: addEventListener prepare buffering worker messages"); - const onmessage = (evt: MessageEvent): void => { - const msgData = evt.data as unknown as IWorkerMessage; + this._queuedCoreMessages = []; + log.debug("MTCI: addEventListener prepare buffering core messages"); + const onmessage = (msgData: IWorkerMessage): void => { const type = msgData.type; switch (type) { case WorkerMessageType.LogMessage: { @@ -213,7 +215,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { if (l === null) { return null; } - return formatWorkerError(l); + return formatCoreError(l); default: assertUnreachable(l); } @@ -239,29 +241,29 @@ export default class MultiThreadContentInitializer extends ContentInitializer { break; } default: - if (this._queuedWorkerMessages !== null) { - this._queuedWorkerMessages.push(evt); + if (this._queuedCoreMessages !== null) { + this._queuedCoreMessages.push(msgData); } break; } }; - this._settings.worker.addEventListener("message", onmessage); - const onmessageerror = (_msg: MessageEvent) => { - log.error("MTCI: Error when receiving message from worker."); + this._settings.coreInterface.addMessageListener(onmessage); + const onmessageerror = () => { + log.error("MTCI: Error when receiving message from core."); }; - this._settings.worker.addEventListener("messageerror", onmessageerror); + this._settings.coreInterface.addErrorListener(onmessageerror); this._initCanceller.signal.register(() => { - log.debug("MTCI: removeEventListener prepare for worker message"); - this._settings.worker.removeEventListener("message", onmessage); - this._settings.worker.removeEventListener("messageerror", onmessageerror); + log.debug("MTCI: removeEventListener prepare for core message"); + this._settings.coreInterface.removeMessageListener(onmessage); + this._settings.coreInterface.removeErrorListener(onmessageerror); }); // Also bind all `SharedReference` objects: const throttleVideoBitrate = adaptiveOptions.throttlers.throttleBitrate.video ?? new SharedReference(Infinity); - bindNumberReferencesToWorker( - worker, + bindNumberReferencesToCore( + coreInterface, this._initCanceller.signal, [wantedBufferAhead, "wantedBufferAhead"], [maxVideoBufferSize, "maxVideoBufferSize"], @@ -279,7 +281,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { }); limitVideoResolution.onUpdate( (newVal) => { - sendMessage(worker, { + coreInterface.sendMessage({ type: MainThreadMessageType.ReferenceUpdate, value: { name: "limitVideoResolution", newVal }, }); @@ -299,7 +301,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { if (this._currentContentInfo === null) { return; } - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.ContentUrlsUpdate, contentId: this._currentContentInfo.contentId, value: { urls, refreshNow }, @@ -439,8 +441,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { ); }; - const onmessage = (msg: MessageEvent) => { - const msgData = msg.data as unknown as IWorkerMessage; + const onmessage = (msgData: IWorkerMessage) => { switch (msgData.type) { case WorkerMessageType.AttachMediaSource: { if (this._currentContentInfo?.contentId !== msgData.contentId) { @@ -479,14 +480,14 @@ export default class MultiThreadContentInitializer extends ContentInitializer { if (this._currentContentInfo?.contentId !== msgData.contentId) { return; } - this.trigger("warning", formatWorkerError(msgData.value)); + this.trigger("warning", formatCoreError(msgData.value)); break; case WorkerMessageType.Error: if (this._currentContentInfo?.contentId !== msgData.contentId) { return; } - this._onFatalError(formatWorkerError(msgData.value)); + this._onFatalError(formatCoreError(msgData.value)); break; case WorkerMessageType.CreateMediaSource: @@ -494,7 +495,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { msgData, mediaElement, mediaSourceStatus, - this._settings.worker, + this._settings.coreInterface, ); break; @@ -533,7 +534,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { sourceBuffer .appendBuffer(msgData.value.data, msgData.value.params) .then((buffered) => { - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.SourceBufferSuccess, mediaSourceId: mediaSource.id, sourceBufferType: sourceBuffer.type, @@ -542,7 +543,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { }); }) .catch((error) => { - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.SourceBufferError, mediaSourceId: mediaSource.id, sourceBufferType: sourceBuffer.type, @@ -575,7 +576,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { sourceBuffer .remove(msgData.value.start, msgData.value.end) .then((buffered) => { - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.SourceBufferSuccess, mediaSourceId: mediaSource.id, sourceBufferType: sourceBuffer.type, @@ -584,7 +585,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { }); }) .catch((error) => { - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.SourceBufferError, mediaSourceId: mediaSource.id, sourceBufferType: sourceBuffer.type, @@ -921,7 +922,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { stopListening(); return; } - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.RepresentationUpdate, contentId: this._currentContentInfo.contentId, value: { @@ -935,7 +936,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { { clearSignal: this._initCanceller.signal }, ); } - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.TrackUpdate, contentId: this._currentContentInfo.contentId, value: { @@ -1016,14 +1017,14 @@ export default class MultiThreadContentInitializer extends ContentInitializer { } else { try { const ranges = textDisplayer.pushTextData(msgData.value); - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.PushTextDataSuccess, contentId: msgData.contentId, value: { ranges }, }); } catch (err) { const message = err instanceof Error ? err.message : "Unknown error"; - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.PushTextDataError, contentId: msgData.contentId, value: { message }, @@ -1047,14 +1048,14 @@ export default class MultiThreadContentInitializer extends ContentInitializer { msgData.value.start, msgData.value.end, ); - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.RemoveTextDataSuccess, contentId: msgData.contentId, value: { ranges }, }); } catch (err) { const message = err instanceof Error ? err.message : "Unknown error"; - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.RemoveTextDataError, contentId: msgData.contentId, value: { message }, @@ -1157,19 +1158,19 @@ export default class MultiThreadContentInitializer extends ContentInitializer { } }; - log.debug("MTCI: addEventListener for worker message"); - if (this._queuedWorkerMessages !== null) { - const bufferedMessages = this._queuedWorkerMessages.slice(); + log.debug("MTCI: addEventListener for core message"); + if (this._queuedCoreMessages !== null) { + const bufferedMessages = this._queuedCoreMessages.slice(); log.debug("MTCI: Processing buffered messages", bufferedMessages.length); for (const message of bufferedMessages) { onmessage(message); } - this._queuedWorkerMessages = null; + this._queuedCoreMessages = null; } - this._settings.worker.addEventListener("message", onmessage); + this._settings.coreInterface.addMessageListener(onmessage); this._initCanceller.signal.register(() => { - log.debug("MTCI: removeEventListener for worker message"); - this._settings.worker.removeEventListener("message", onmessage); + log.debug("MTCI: removeEventListener for core message"); + this._settings.coreInterface.removeMessageListener(onmessage); }); } @@ -1280,7 +1281,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { ) { reloadMediaSource(); } else { - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.DecipherabilityStatusUpdate, contentId: this._currentContentInfo.contentId, value: manUpdates.map((s) => ({ @@ -1308,7 +1309,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { ) { reloadMediaSource(); } else { - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.DecipherabilityStatusUpdate, contentId: this._currentContentInfo.contentId, value: manUpdates.map((s) => ({ @@ -1370,7 +1371,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { /** * Retrieves all unknown codecs from the current manifest, checks these unknown codecs * to determine if they are supported, updates the manifest with the support - * status of these codecs, and forwards the list of supported codecs to the web worker. + * status of these codecs, and forwards the list of supported codecs to core. * @param manifest */ private _updateCodecSupport(manifest: IManifestMetadata) { @@ -1380,11 +1381,11 @@ export default class MultiThreadContentInitializer extends ContentInitializer { this._currentContentInfo?.contentDecryptor ?? null, ); if (updatedCodecs.length > 0) { - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.CodecSupportUpdate, value: updatedCodecs, }); - // TODO what if one day the worker updates codec support by itself? + // TODO what if one day the core updates codec support by itself? // We wouldn't know... this.trigger("codecSupportUpdate", null); } @@ -1440,7 +1441,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { const contentId = this._currentContentInfo.contentId; corePlaybackObserver.listen( (obs) => { - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.PlaybackObservation, contentId, value: objectAssign(obs, { @@ -1467,7 +1468,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { * time a content is loaded AND re-loaded on a `HTMLMediaElement`, when the * manifest is known. * - * Note that this does not include reacting to incoming worker messages nor + * Note that this does not include reacting to incoming core messages nor * sending them, those actions have to be handled separately. * * @param {Object} parameters @@ -1597,7 +1598,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { > = async () => { this._segmentMetrics.lastMessageId++; const messageId = this._segmentMetrics.lastMessageId; - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.PullSegmentSinkStoreInfos, value: { messageId }, }); @@ -1709,7 +1710,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { const sentInitialObservation = objectAssign(initialObservation, { position: initialObservation.position.serialize(), }); - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.StartPreparedContent, contentId, value: { @@ -1723,7 +1724,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { corePlaybackObserver.listen( (obs) => { - sendMessage(this._settings.worker, { + this._settings.coreInterface.sendMessage({ type: MainThreadMessageType.PlaybackObservation, contentId, value: objectAssign(obs, { position: obs.position.serialize() }), @@ -1739,18 +1740,17 @@ export default class MultiThreadContentInitializer extends ContentInitializer { } /** - * Handles Worker messages asking to create a MediaSource. - * @param {Object} msg - The worker's message received. + * Handles core messages asking to create a MediaSource. + * @param {Object} msg - The core's message received. * @param {HTMLMediaElement} mediaElement - HTMLMediaElement on which the * content plays. - * @param {Worker} worker - The WebWorker concerned, messages may be sent back - * to it. + * @param {Object} coreInterface - The interface to the core. */ private _onCreateMediaSourceMessage( msg: ICreateMediaSourceWorkerMessage, mediaElement: IMediaElement, mediaSourceStatus: SharedReference, - worker: Worker, + coreInterface: CoreInterface, ): void { if (this._currentContentInfo?.contentId !== msg.contentId) { log.info("MTCI: Ignoring MediaSource attachment due to wrong `contentId`"); @@ -1768,21 +1768,21 @@ export default class MultiThreadContentInitializer extends ContentInitializer { const mediaSource = new MainMediaSourceInterface(mediaSourceId); this._currentContentInfo.mainThreadMediaSource = mediaSource; mediaSource.addEventListener("mediaSourceOpen", () => { - sendMessage(worker, { + coreInterface.sendMessage({ type: MainThreadMessageType.MediaSourceReadyStateChange, mediaSourceId, value: "open", }); }); mediaSource.addEventListener("mediaSourceEnded", () => { - sendMessage(worker, { + coreInterface.sendMessage({ type: MainThreadMessageType.MediaSourceReadyStateChange, mediaSourceId, value: "ended", }); }); mediaSource.addEventListener("mediaSourceClose", () => { - sendMessage(worker, { + coreInterface.sendMessage({ type: MainThreadMessageType.MediaSourceReadyStateChange, mediaSourceId, value: "closed", @@ -1825,7 +1825,7 @@ export default class MultiThreadContentInitializer extends ContentInitializer { export interface IMultiThreadContentInitializerContentInfos { /** * "contentId", which is the identifier for the currently loaded content. - * Allows to ensure that the WebWorker is referencing the current content, not + * Allows to ensure that the Core is referencing the current content, not * a previously stopped one. */ contentId: string; @@ -1882,7 +1882,16 @@ export interface IMultiThreadContentInitializerContentInfos { /** Arguments to give to the `InitializeOnMediaSource` function. */ export interface IInitializeArguments { - worker: Worker; + /** + * The `MultiThreadContentInitializer` will interact with the RxPlayer's core + * logic (the one loading media data) by exchanging messages through an + * interface called the `CoreInterface`. + * + * This `CoreInterface` allows to abstract its actual current implementation. + * E.g., the core logic could be running in a WebWorker or in main thread, in + * which cases message exchanging mechanisms would be different. + */ + coreInterface: CoreInterface; /** Options concerning the ABR logic. */ adaptiveOptions: IAdaptiveRepresentationSelectorArguments; /** `true` if we should play when loaded. */ @@ -1913,18 +1922,13 @@ export interface IInitializeArguments { keySystems: IKeySystemOption[]; /** `true` to play low-latency contents optimally. */ lowLatencyMode: boolean; + /** + * The type of "transport" wanted, e.g. "dash" or "smooth". + */ + transport: string; /** Options relative to the streaming protocol. */ - transportOptions: Omit< - ITransportOptions, - "manifestLoader" | "segmentLoader" | "representationFilter" - > & { - // Unsupported features have to be disabled explicitely - // TODO support them - manifestLoader: undefined; - segmentLoader: undefined; - - // Option which has to be set as a Funtion string to work. - representationFilter: string | undefined; + transportOptions: Omit & { + representationFilter?: IRepresentationFilter | string | undefined; }; /** Settings linked to Manifest requests. */ manifestRequestSettings: { @@ -1979,8 +1983,8 @@ export interface IInitializeArguments { url: string | undefined; } -function bindNumberReferencesToWorker( - worker: Worker, +function bindNumberReferencesToCore( + coreInterface: CoreInterface, cancellationSignal: CancellationSignal, ...refs: Array< [ @@ -2000,7 +2004,7 @@ function bindNumberReferencesToWorker( (newVal) => { // NOTE: The TypeScript checks have already been made by this function's // overload, but the body here is not aware of that. - sendMessage(worker, { + coreInterface.sendMessage({ type: MainThreadMessageType.ReferenceUpdate, // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any, @typescript-eslint/no-unsafe-call, @typescript-eslint/no-unsafe-member-access value: { name: ref[1] as any, newVal: newVal as any }, @@ -2011,7 +2015,7 @@ function bindNumberReferencesToWorker( } } -function formatWorkerError(sentError: ISentError): IPlayerError { +function formatCoreError(sentError: ISentError): IPlayerError { switch (sentError.name) { case "NetworkError": return new NetworkError( diff --git a/src/main_thread/init/send_message.ts b/src/main_thread/init/send_message.ts deleted file mode 100644 index 42165f232b..0000000000 --- a/src/main_thread/init/send_message.ts +++ /dev/null @@ -1,15 +0,0 @@ -import log from "../../log"; -import type { IMainThreadMessage } from "../../multithread_types"; - -export default function sendMessage( - worker: Worker, - msg: IMainThreadMessage, - transferables?: Transferable[], -): void { - log.debug("---> Sending to Worker:", msg.type); - if (transferables === undefined) { - worker.postMessage(msg); - } else { - worker.postMessage(msg, transferables); - } -} diff --git a/src/multithread_types.ts b/src/multithread_types.ts index e7e4b926a7..9f96241e9b 100644 --- a/src/multithread_types.ts +++ b/src/multithread_types.ts @@ -29,7 +29,7 @@ import type { SourceBufferType, } from "./mse"; import type { IFreezingStatus, IRebufferingStatus } from "./playback_observer"; -import type { ICmcdOptions, ITrackType } from "./public_types"; +import type { ICmcdOptions, IRepresentationFilter, ITrackType } from "./public_types"; import type { ITransportOptions } from "./transports"; import type { ILogFormat, ILoggerLevel } from "./utils/logger"; import type { IRange } from "./utils/ranges"; @@ -116,21 +116,12 @@ export interface IContentInitializationData { /** If `true`, text buffer (e.g. for subtitles) is enabled. */ hasText: boolean; /** - * Options relative to the streaming protocol. - * - * Options not yet supported in a WebWorker environment are omitted. + * The type of "transport" wanted, e.g. "dash" or "smooth". */ - transportOptions: Omit< - ITransportOptions, - "manifestLoader" | "segmentLoader" | "representationFilter" - > & { - // Unsupported features have to be disabled explicitely - // TODO support them - manifestLoader: undefined; - segmentLoader: undefined; - - // Option which has to be set as a Funtion string to work. - representationFilter: string | undefined; + transport: string; + /** Options relative to the streaming protocol. */ + transportOptions: Omit & { + representationFilter?: IRepresentationFilter | string | undefined; }; /** Initial video bitrate on which the adaptive logic will base itself. */ initialVideoBitrate?: number | undefined; diff --git a/src/worker_entry_point.ts b/src/worker_entry_point.ts index a180b35ea4..b6f3aa0874 100644 --- a/src/worker_entry_point.ts +++ b/src/worker_entry_point.ts @@ -4,4 +4,56 @@ */ import initializeWorkerMain from "./core/main/worker"; -initializeWorkerMain(); +import { + limitVideoResolution, + maxBufferAhead, + maxBufferBehind, + maxVideoBufferSize, + throttleVideoBitrate, + wantedBufferAhead, +} from "./core/main/worker/globals"; +import log from "./experimental/tools/mediaCapabilitiesProber/log"; +import features from "./features"; +import type { IWorkerMessage } from "./multithread_types"; +import DashFastJsParser from "./parsers/manifest/dash/fast-js-parser"; +import DashWasmParser from "./parsers/manifest/dash/wasm-parser"; +import createDashPipelines from "./transports/dash"; +import globalScope from "./utils/global_scope"; + +// Initialize Manually a `DashWasmParser` and add the feature. +// TODO allow worker-side feature-switching? Not sure how +const dashWasmParser = new DashWasmParser(); +features.dashParsers.wasm = dashWasmParser; +features.dashParsers.fastJs = DashFastJsParser; +features.transports.dash = createDashPipelines; + +globalScope.onmessageerror = (_msg: MessageEvent) => { + log.error("Worker: Error when receiving message from main thread."); +}; +initializeWorkerMain( + (handler) => { + onmessage = handler; + }, + sendMessage, + { + limitVideoResolution, + maxBufferAhead, + maxBufferBehind, + maxVideoBufferSize, + throttleVideoBitrate, + wantedBufferAhead, + }, +); + +function sendMessage(msg: IWorkerMessage, transferables?: Transferable[]): void { + log.debug("<--- Sending to Main:", msg.type); + if (transferables === undefined) { + postMessage(msg); + } else { + // TypeScript made a mistake here, and 2busy2fix + (postMessage as (msg: IWorkerMessage, transferables: Transferable[]) => void)( + msg, + transferables, + ); + } +} diff --git a/tests/integration/scenarios/dash_live.test.js b/tests/integration/scenarios/dash_live.test.js index 01ba31aa64..353c898f17 100644 --- a/tests/integration/scenarios/dash_live.test.js +++ b/tests/integration/scenarios/dash_live.test.js @@ -183,6 +183,7 @@ describe("DASH live content (SegmentTimeline)", function () { }); expect(player.getAvailableVideoTracks()).to.eql([]); + await sleep(0); expect(player.getAvailableVideoTracks()).to.eql([]); expect(manifestLoaderCalledTimes).to.equal(1); await checkAfterSleepWithBackoff(null, () => { @@ -248,6 +249,7 @@ describe("DASH live content (SegmentTimeline)", function () { manifestLoader, segmentLoader, }); + await sleep(0); expect(manifestLoaderCalledTimes).to.equal(1); await checkAfterSleepWithBackoff({ maxTimeMs: 2000 }, () => { @@ -273,6 +275,7 @@ describe("DASH live content (SegmentTimeline)", function () { manifestLoader, segmentLoader, }); + await sleep(0); expect(manifestLoaderCalledTimes).to.equal(1); await checkAfterSleepWithBackoff(null, () => { @@ -341,6 +344,7 @@ describe("DASH live content with no timeShiftBufferDepth (SegmentTimeline)", fun segmentLoader, }); expect(player.getAvailableAudioTracks()).to.eql([]); + await sleep(0); expect(manifestLoaderCalledTimes).to.equal(1); await checkAfterSleepWithBackoff(null, () => { @@ -402,6 +406,7 @@ describe("DASH live content with no timeShiftBufferDepth (SegmentTimeline)", fun }); expect(player.getAvailableTextTracks()).to.eql([]); + await sleep(0); expect(player.getAvailableTextTracks()).to.eql([]); expect(manifestLoaderCalledTimes).to.equal(1); @@ -460,6 +465,7 @@ describe("DASH live content with no timeShiftBufferDepth (SegmentTimeline)", fun }); expect(player.getAvailableVideoTracks()).to.eql([]); + await sleep(0); expect(player.getAvailableVideoTracks()).to.eql([]); expect(manifestLoaderCalledTimes).to.equal(1); await checkAfterSleepWithBackoff(null, () => { @@ -528,6 +534,7 @@ describe("DASH live content with no timeShiftBufferDepth (SegmentTimeline)", fun segmentLoader, }); + await sleep(0); expect(manifestLoaderCalledTimes).to.equal(1); await checkAfterSleepWithBackoff(null, () => { expect(player.getMinimumPosition()).to.equal(6); @@ -555,6 +562,7 @@ describe("DASH live content with no timeShiftBufferDepth (SegmentTimeline)", fun segmentLoader, }); + await sleep(0); expect(manifestLoaderCalledTimes).to.equal(1); await checkAfterSleepWithBackoff(null, () => { expect(player.getMaximumPosition()).to.be.closeTo(1527508062, 1); diff --git a/tests/integration/scenarios/dash_live_SegmentTemplate.test.js b/tests/integration/scenarios/dash_live_SegmentTemplate.test.js index 516cbf178a..bd150e6b8c 100644 --- a/tests/integration/scenarios/dash_live_SegmentTemplate.test.js +++ b/tests/integration/scenarios/dash_live_SegmentTemplate.test.js @@ -5,6 +5,7 @@ import { noTimeShiftBufferDepthManifestInfos, } from "../../contents/DASH_dynamic_SegmentTemplate"; import { checkAfterSleepWithBackoff } from "../../utils/checkAfterSleepWithBackoff.js"; +import sleep from "../../utils/sleep.js"; describe("DASH live content (SegmentTemplate)", function () { let player; @@ -41,6 +42,7 @@ describe("DASH live content (SegmentTemplate)", function () { }, }, }); + await sleep(0); expect(manifestLoaderCalledTimes).to.equal(1); expect(player.getPlayerState()).to.equal("LOADING"); @@ -280,6 +282,7 @@ describe("DASH live content without timeShiftBufferDepth (SegmentTemplate)", fun }, }, }); + await sleep(0); expect(manifestLoaderCalledTimes).to.equal(1); expect(player.getPlayerState()).to.equal("LOADING"); diff --git a/tests/integration/scenarios/dash_multi-track.test.js b/tests/integration/scenarios/dash_multi-track.test.js index 8070630b69..49916602fc 100644 --- a/tests/integration/scenarios/dash_multi-track.test.js +++ b/tests/integration/scenarios/dash_multi-track.test.js @@ -302,6 +302,7 @@ describe("DASH multi-track content (SegmentTimeline)", function () { manifestLoader, segmentLoader, }); + await sleep(0); expect(manifestLoaderCalledTimes).to.equal(1); expect(requestedSegments).to.be.empty; diff --git a/tests/integration/scenarios/dash_static.test.js b/tests/integration/scenarios/dash_static.test.js index 41c69006e8..c0a7ae12af 100644 --- a/tests/integration/scenarios/dash_static.test.js +++ b/tests/integration/scenarios/dash_static.test.js @@ -283,6 +283,7 @@ describe("DASH non-linear content with number-based SegmentTimeline", function ( segmentLoader, }); + await sleep(0); expect(requestedManifests).to.have.length(1); expect(requestedSegments).to.be.empty; expect(requestedManifests[0]).to.equal(numberBasedTimelineManifestInfos.url); diff --git a/tests/integration/scenarios/initial_playback.test.js b/tests/integration/scenarios/initial_playback.test.js index 4df7f718d7..a27956de97 100644 --- a/tests/integration/scenarios/initial_playback.test.js +++ b/tests/integration/scenarios/initial_playback.test.js @@ -278,6 +278,7 @@ function runInitialPlaybackTests({ multithread } = {}) { manifestLoader, segmentLoader, }); + await sleep(0); expect(manifestLoaderCalledTimes).to.equal(1); await sleep(500); @@ -307,6 +308,7 @@ function runInitialPlaybackTests({ multithread } = {}) { manifestLoader, segmentLoader, }); + await sleep(0); expect(manifestLoaderCalledTimes).to.equal(1); await sleep(100); diff --git a/tests/integration/utils/launch_tests_for_content.js b/tests/integration/utils/launch_tests_for_content.js index 8651dca5dc..65a7ef67ef 100644 --- a/tests/integration/utils/launch_tests_for_content.js +++ b/tests/integration/utils/launch_tests_for_content.js @@ -113,6 +113,7 @@ export default function launchTestsForContent(manifestInfos, { multithread } = { manifestLoader, segmentLoader, }); + await sleep(0); // should only have the manifest for now expect(manifestLoaderCalledTimes).to.equal(1); From f3bce75c74c6903beff3539595f6bd569a5e1c83 Mon Sep 17 00:00:00 2001 From: Paul Berberian Date: Mon, 13 Jan 2025 19:42:04 +0100 Subject: [PATCH 2/4] Perform data-serialization as sendMessage-time --- src/core/main/worker/content_preparer.ts | 16 +++------- src/multithread_types.ts | 40 ++++++++++++++++++++++-- src/worker_entry_point.ts | 36 ++++++++++++++++++++- 3 files changed, 77 insertions(+), 15 deletions(-) diff --git a/src/core/main/worker/content_preparer.ts b/src/core/main/worker/content_preparer.ts index 55a90b018f..4ecb50cf7a 100644 --- a/src/core/main/worker/content_preparer.ts +++ b/src/core/main/worker/content_preparer.ts @@ -1,6 +1,6 @@ import features from "../../../features"; import log from "../../../log"; -import type { IManifest, IManifestMetadata } from "../../../manifest"; +import type { IManifest } from "../../../manifest"; import { createRepresentationFilterFromFnString } from "../../../manifest"; import type { IMediaSourceInterface } from "../../../mse"; import MainMediaSourceInterface from "../../../mse/main_media_source_interface"; @@ -13,7 +13,6 @@ import type { import { WorkerMessageType } from "../../../multithread_types"; import type { IPlayerError } from "../../../public_types"; import idGenerator from "../../../utils/id_generator"; -import objectAssign from "../../../utils/object_assign"; import type { CancellationError, CancellationSignal, @@ -76,7 +75,7 @@ export default class ContentPreparer { public initializeNewContent( sendMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void, context: IContentInitializationData, - ): Promise { + ): Promise { return new Promise((res, rej) => { this.disposeCurrentContent(); const contentCanceller = this._contentCanceller; @@ -222,7 +221,6 @@ export default class ContentPreparer { return; } - const sentManifest = manifest.getMetadataSnapshot(); manifest.addEventListener( "manifestUpdate", (updates) => { @@ -230,22 +228,16 @@ export default class ContentPreparer { // TODO log warn? return; } - - // Remove `periods` key to reduce cost of an unnecessary manifest - // clone. - const snapshot = objectAssign(manifest.getMetadataSnapshot(), { - periods: [], - }); sendMessage({ type: WorkerMessageType.ManifestUpdate, contentId, - value: { manifest: snapshot, updates }, + value: { manifest, updates }, }); }, contentCanceller.signal, ); unbindRejectOnCancellation(); - res(sentManifest); + res(manifest); } }); } diff --git a/src/multithread_types.ts b/src/multithread_types.ts index 9f96241e9b..b1f7248342 100644 --- a/src/multithread_types.ts +++ b/src/multithread_types.ts @@ -772,17 +772,53 @@ export interface IRepresentationChangeWorkerMessage { }; } +/** Message sent by the Worker when the Manifest is first loaded. */ export interface IManifestReadyWorkerMessage { + /** Identify this particular message. */ type: WorkerMessageType.ManifestReady; + /** The `contentId` linked to this Manifest. */ contentId: string; - value: { manifest: IManifestMetadata }; + value: { + /** + * The actual `Manifest` loaded. + * + * When possible, this should be a `Manifest` instance. + * + * Only if this is not possible (e.g. because the `Manifest` cannot be + * communicated as is between both Worker and main_thread) might you convert + * it to another object also respecting the `IManifestMetadata` interface. + * + * However doing this might lead to some loss of performance and minor + * features. + */ + manifest: IManifestMetadata; + }; } +/** Message sent by the Worker everytime the Manifest is updated. */ export interface IManifestUpdateWorkerMessage { + /** Identify this particular message. */ type: WorkerMessageType.ManifestUpdate; + /** The `contentId` linked to this Manifest. */ contentId: string | undefined; value: { - manifest: IManifestMetadata; // TODO only subpart that changed? + /** + * The new manifest once updated. + * + * When possible, this should be a `Manifest` instance to improve + * performance and allow some advanced features. + * + * Only if this is not possible (e.g. because the `Manifest` cannot be + * communicated as is between both Worker and main_thread) might you convert + * it to another object also respecting the `IManifestMetadata` interface. + * In that last case, you're also authorized to reset the `periods` property + * of that `IManifestMetadata` to an empty array to save up message-passing + * performance. + */ + manifest: IManifestMetadata; + /** + * Object describing what has changed in this update. + */ updates: IPeriodsUpdateResult; }; } diff --git a/src/worker_entry_point.ts b/src/worker_entry_point.ts index b6f3aa0874..3ee53b2f9c 100644 --- a/src/worker_entry_point.ts +++ b/src/worker_entry_point.ts @@ -14,7 +14,8 @@ import { } from "./core/main/worker/globals"; import log from "./experimental/tools/mediaCapabilitiesProber/log"; import features from "./features"; -import type { IWorkerMessage } from "./multithread_types"; +import Manifest from "./manifest/classes"; +import { WorkerMessageType, type IWorkerMessage } from "./multithread_types"; import DashFastJsParser from "./parsers/manifest/dash/fast-js-parser"; import DashWasmParser from "./parsers/manifest/dash/wasm-parser"; import createDashPipelines from "./transports/dash"; @@ -45,7 +46,15 @@ initializeWorkerMain( }, ); +/** + * Perform a `postMessage` to main thread with the given message. + * Arguments follow the `postMessage` API. + * @param {Object} msg + * @param {Array.} [transferables] + */ function sendMessage(msg: IWorkerMessage, transferables?: Transferable[]): void { + updateMessageFormat(msg); + log.debug("<--- Sending to Main:", msg.type); if (transferables === undefined) { postMessage(msg); @@ -57,3 +66,28 @@ function sendMessage(msg: IWorkerMessage, transferables?: Transferable[]): void ); } } + +/** + * Ensure that we're sending data that can be serialized, as this is a + * requirement for the `postMessage` browser API. + * + * If necessary, mutations are done in place. + * @param {Object} msg + */ +function updateMessageFormat(msg: IWorkerMessage): void { + if ( + msg.type === WorkerMessageType.ManifestReady || + msg.type === WorkerMessageType.ManifestUpdate + ) { + if (msg.value.manifest instanceof Manifest) { + msg.value.manifest = msg.value.manifest.getMetadataSnapshot(); + if (msg.type === WorkerMessageType.ManifestUpdate) { + // Remove `periods` key to reduce cost of an unnecessary manifest + // clone. + msg.value.manifest.periods = []; + } + } else { + log.warn("Worker: the Manifest instance should be communicated to `sendMessage`."); + } + } +} From a5968a35c8f288aaa0085053e6f645ac4d4fe6ab Mon Sep 17 00:00:00 2001 From: Paul Berberian Date: Tue, 21 Jan 2025 10:46:11 +0100 Subject: [PATCH 3/4] Fix remaining CoreInterface issues --- src/core/main/worker/content_preparer.ts | 19 +++--- src/core/main/worker/globals.ts | 38 ------------ src/core/main/worker/worker_main.ts | 74 +++++++++++++++++------- src/main_thread/api/public_api.ts | 33 +++-------- src/worker_entry_point.ts | 25 +------- 5 files changed, 74 insertions(+), 115 deletions(-) delete mode 100644 src/core/main/worker/globals.ts diff --git a/src/core/main/worker/content_preparer.ts b/src/core/main/worker/content_preparer.ts index 4ecb50cf7a..bf6ab31b81 100644 --- a/src/core/main/worker/content_preparer.ts +++ b/src/core/main/worker/content_preparer.ts @@ -20,13 +20,13 @@ import type { import TaskCanceller from "../../../utils/task_canceller"; import type { IRepresentationEstimator } from "../../adaptive"; import createAdaptiveRepresentationSelector from "../../adaptive"; +import type { IRepresentationEstimatorThrottlers } from "../../adaptive/adaptive_representation_selector"; import CmcdDataBuilder from "../../cmcd"; import type { IManifestRefreshSettings } from "../../fetchers"; import { ManifestFetcher, SegmentQueueCreator } from "../../fetchers"; import SegmentSinksStore from "../../segment_sinks"; import type { INeedsMediaSourceReloadPayload } from "../../stream"; import DecipherabilityFreezeDetector from "../common/DecipherabilityFreezeDetector"; -import { limitVideoResolution, throttleVideoBitrate } from "./globals"; import TrackChoiceSetter from "./track_choice_setter"; import { formatErrorForSender } from "./utils"; import WorkerTextDisplayerInterface from "./worker_text_displayer_interface"; @@ -75,6 +75,8 @@ export default class ContentPreparer { public initializeNewContent( sendMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void, context: IContentInitializationData, + /** Allows to filter which Representations can be choosen. */ + throttlers: IRepresentationEstimatorThrottlers, ): Promise { return new Promise((res, rej) => { this.disposeCurrentContent(); @@ -89,9 +91,13 @@ export default class ContentPreparer { const transportFn = features.transports[transport]; if (typeof transportFn !== "function") { - // Stop previous content and reset its state - // XXX TODO: send fatal error - throw new Error(`transport "${transport}" not supported`); + rej( + new Error( + `transport "${transport}" not supported. ` + + "Did you add the corresponding feature?", + ), + ); + return; } const representationFilter = typeof transportOptions.representationFilter === "string" @@ -118,10 +124,7 @@ export default class ContentPreparer { video: context.initialVideoBitrate ?? 0, }, lowLatencyMode: transportOptions.lowLatencyMode, - throttlers: { - limitResolution: { video: limitVideoResolution }, - throttleBitrate: { video: throttleVideoBitrate }, - }, + throttlers, }); const unbindRejectOnCancellation = currentMediaSourceCanceller.signal.register( diff --git a/src/core/main/worker/globals.ts b/src/core/main/worker/globals.ts deleted file mode 100644 index 8617de65b3..0000000000 --- a/src/core/main/worker/globals.ts +++ /dev/null @@ -1,38 +0,0 @@ -import config from "../../../config"; -import SharedReference from "../../../utils/reference"; -import type { IResolutionInfo } from "../../adaptive"; - -const { - DEFAULT_WANTED_BUFFER_AHEAD, - DEFAULT_MAX_VIDEO_BUFFER_SIZE, - DEFAULT_MAX_BUFFER_AHEAD, - DEFAULT_MAX_BUFFER_BEHIND, -} = config.getCurrent(); - -/** Buffer "goal" at which we stop downloading new segments. */ -const wantedBufferAhead = new SharedReference(DEFAULT_WANTED_BUFFER_AHEAD); - -/** Buffer maximum size in kiloBytes at which we stop downloading */ -const maxVideoBufferSize = new SharedReference(DEFAULT_MAX_VIDEO_BUFFER_SIZE); - -/** Max buffer size after the current position, in seconds (we GC further up). */ -const maxBufferAhead = new SharedReference(DEFAULT_MAX_BUFFER_AHEAD); - -/** Max buffer size before the current position, in seconds (we GC further down). */ -const maxBufferBehind = new SharedReference(DEFAULT_MAX_BUFFER_BEHIND); - -const limitVideoResolution = new SharedReference({ - height: undefined, - width: undefined, - pixelRatio: 1, -}); -const throttleVideoBitrate = new SharedReference(Infinity); - -export { - wantedBufferAhead, - maxVideoBufferSize, - maxBufferBehind, - maxBufferAhead, - limitVideoResolution, - throttleVideoBitrate, -}; diff --git a/src/core/main/worker/worker_main.ts b/src/core/main/worker/worker_main.ts index 4dc245cd3a..24a28505f1 100644 --- a/src/core/main/worker/worker_main.ts +++ b/src/core/main/worker/worker_main.ts @@ -49,15 +49,30 @@ export type IMessageReceiverCallback = (evt: { data: IMainThreadMessage }) => vo * receive messages coming from the "main thread" part of the RxPlayer logic. * @param {Function} sendMessage - Function allowing to send messages to the * "main thread" part of the RxPlayer logic. - * @param {Object} refs - Collection of so-called "references": values - * configuring playback that may be updated at any time and that the WorkerMain - * should react on. */ export default function initializeWorkerMain( setMessageReceiver: (cb: IMessageReceiverCallback) => void, sendMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void, - refs: ICoreReferences, ): void { + const { + DEFAULT_WANTED_BUFFER_AHEAD, + DEFAULT_MAX_VIDEO_BUFFER_SIZE, + DEFAULT_MAX_BUFFER_AHEAD, + DEFAULT_MAX_BUFFER_BEHIND, + } = config.getCurrent(); + const refs: ICoreReferences = { + wantedBufferAhead: new SharedReference(DEFAULT_WANTED_BUFFER_AHEAD), + maxVideoBufferSize: new SharedReference(DEFAULT_MAX_VIDEO_BUFFER_SIZE), + maxBufferAhead: new SharedReference(DEFAULT_MAX_BUFFER_AHEAD), + maxBufferBehind: new SharedReference(DEFAULT_MAX_BUFFER_BEHIND), + limitVideoResolution: new SharedReference({ + height: undefined, + width: undefined, + pixelRatio: 1, + }), + throttleVideoBitrate: new SharedReference(Infinity), + }; + /** * `true` once the worker has been initialized. * Allow to enforce the fact that it is only initialized once. @@ -130,7 +145,7 @@ export default function initializeWorkerMain( break; case MainThreadMessageType.PrepareContent: - prepareNewContent(sendMessage, contentPreparer, msg.value); + prepareNewContent(sendMessage, contentPreparer, msg.value, refs); break; case MainThreadMessageType.StartPreparedContent: { @@ -434,28 +449,37 @@ export default function initializeWorkerMain( * @param {ContentPreparer} contentPreparer * @param {Object} contentInitData - Configuration wanted for the content to * load. + * @param {Object} refs - Collection of so-called "references": values + * configuring playback that may be updated at any time and that the + * WorkerMain should react on. */ function prepareNewContent( sendMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void, contentPreparer: ContentPreparer, contentInitData: IContentInitializationData, + refs: ICoreReferences, ): void { - contentPreparer.initializeNewContent(sendMessage, contentInitData).then( - (manifest) => { - sendMessage({ - type: WorkerMessageType.ManifestReady, - contentId: contentInitData.contentId, - value: { manifest }, - }); - }, - (err: unknown) => { - sendMessage({ - type: WorkerMessageType.Error, - contentId: contentInitData.contentId, - value: formatErrorForSender(err), - }); - }, - ); + contentPreparer + .initializeNewContent(sendMessage, contentInitData, { + limitResolution: { video: refs.limitVideoResolution }, + throttleBitrate: { video: refs.throttleVideoBitrate }, + }) + .then( + (manifest) => { + sendMessage({ + type: WorkerMessageType.ManifestReady, + contentId: contentInitData.contentId, + value: { manifest }, + }); + }, + (err: unknown) => { + sendMessage({ + type: WorkerMessageType.Error, + contentId: contentInitData.contentId, + value: formatErrorForSender(err), + }); + }, + ); } function updateCoreReference(msg: IReferenceUpdateMessage, refs: ICoreReferences): void { @@ -987,11 +1011,19 @@ function sendSegmentSinksStoreInfos( }); } +/** + * Collection of so-called "references": values configuring playback that may + * be updated at any time and that the WorkerMain should react on. + */ export interface ICoreReferences { limitVideoResolution: SharedReference; + /** Max buffer size after the current position, in seconds (we GC further up). */ maxBufferAhead: SharedReference; + /** Max buffer size before the current position, in seconds (we GC further down). */ maxBufferBehind: SharedReference; + /** Buffer maximum size in kiloBytes at which we stop downloading */ maxVideoBufferSize: SharedReference; throttleVideoBitrate: SharedReference; + /** Buffer "goal" at which we stop downloading new segments. */ wantedBufferAhead: SharedReference; } diff --git a/src/main_thread/api/public_api.ts b/src/main_thread/api/public_api.ts index ef0f5495c4..0ec235c2e3 100644 --- a/src/main_thread/api/public_api.ts +++ b/src/main_thread/api/public_api.ts @@ -40,7 +40,6 @@ import type { IInbandEvent, IABRThrottlers, IBufferType, - IResolutionInfo, } from "../../core/types"; import { MonoThreadCoreInterface, WorkerCoreInterface } from "../../core_interface"; import type { IDefaultConfig } from "../../default_config"; @@ -823,10 +822,6 @@ class Player extends EventEmitter { limitResolution: {}, }; - let throttleVideoBitrate: IReadOnlySharedReference = new SharedReference( - Infinity, - ); - if (this._priv_throttleVideoBitrateWhenHidden) { if (!relyOnVideoVisibilityAndSize) { log.warn( @@ -834,16 +829,15 @@ class Player extends EventEmitter { "browser can't be trusted for visibility.", ); } else { - throttleVideoBitrate = createMappedReference( - getVideoVisibilityRef( - this._priv_pictureInPictureRef, + throttlers.throttleBitrate = { + video: createMappedReference( + getVideoVisibilityRef( + this._priv_pictureInPictureRef, + currentContentCanceller.signal, + ), + (isActive) => (isActive ? Infinity : 0), currentContentCanceller.signal, ), - (isActive) => (isActive ? Infinity : 0), - currentContentCanceller.signal, - ); - throttlers.throttleBitrate = { - video: throttleVideoBitrate, }; } } @@ -930,19 +924,6 @@ class Player extends EventEmitter { initializeWorkerMain( coreInterfaceCallbacks.setCoreMessageReceiver, coreInterfaceCallbacks.sendCoreMessage, - { - // XXX TODO: - limitVideoResolution: new SharedReference({ - height: undefined, - width: undefined, - pixelRatio: 1, - }), - maxBufferAhead: bufferOptions.maxBufferAhead, - maxBufferBehind: bufferOptions.maxBufferBehind, - maxVideoBufferSize: bufferOptions.maxVideoBufferSize, - throttleVideoBitrate: new SharedReference(Infinity), - wantedBufferAhead: bufferOptions.wantedBufferAhead, - }, ); coreInterface.sendMessage({ type: MainThreadMessageType.Init, diff --git a/src/worker_entry_point.ts b/src/worker_entry_point.ts index 3ee53b2f9c..58beb85d0d 100644 --- a/src/worker_entry_point.ts +++ b/src/worker_entry_point.ts @@ -4,14 +4,6 @@ */ import initializeWorkerMain from "./core/main/worker"; -import { - limitVideoResolution, - maxBufferAhead, - maxBufferBehind, - maxVideoBufferSize, - throttleVideoBitrate, - wantedBufferAhead, -} from "./core/main/worker/globals"; import log from "./experimental/tools/mediaCapabilitiesProber/log"; import features from "./features"; import Manifest from "./manifest/classes"; @@ -31,20 +23,9 @@ features.transports.dash = createDashPipelines; globalScope.onmessageerror = (_msg: MessageEvent) => { log.error("Worker: Error when receiving message from main thread."); }; -initializeWorkerMain( - (handler) => { - onmessage = handler; - }, - sendMessage, - { - limitVideoResolution, - maxBufferAhead, - maxBufferBehind, - maxVideoBufferSize, - throttleVideoBitrate, - wantedBufferAhead, - }, -); +initializeWorkerMain((handler) => { + onmessage = handler; +}, sendMessage); /** * Perform a `postMessage` to main thread with the given message. From 3126d2ea86bede6de091b33465f31f36cffe1710 Mon Sep 17 00:00:00 2001 From: Paul Berberian Date: Wed, 22 Jan 2025 10:00:05 +0100 Subject: [PATCH 4/4] CoreInterface and initializeWorkerMain are now part of the features object --- src/core/main/worker/index.ts | 2 + src/core/types.ts | 4 + src/core_interface.ts | 119 ------------------ .../features/__tests__/local.test.ts | 13 +- .../features/__tests__/metaplaylist.test.ts | 13 +- .../features/__tests__/multi_thread.test.ts | 7 +- src/experimental/features/local.ts | 8 +- src/experimental/features/metaplaylist.ts | 8 +- src/experimental/features/multi_thread.ts | 6 +- src/features/features_object.ts | 2 +- src/features/list/__tests__/dash.test.ts | 15 ++- src/features/list/__tests__/smooth.test.ts | 13 +- src/features/list/dash.ts | 8 +- src/features/list/dash_wasm.ts | 8 +- src/features/list/media_source_main.ts | 8 +- src/features/list/smooth.ts | 8 +- src/features/types.ts | 24 +++- src/main_thread/api/public_api.ts | 12 +- src/main_thread/core_interface/README.md | 22 ++++ src/main_thread/core_interface/base.ts | 35 ++++++ src/main_thread/core_interface/monothread.ts | 40 ++++++ src/main_thread/core_interface/multithread.ts | 49 ++++++++ src/main_thread/core_interface/types.ts | 5 + .../init/multi_thread_content_initializer.ts | 2 +- 24 files changed, 284 insertions(+), 147 deletions(-) delete mode 100644 src/core_interface.ts create mode 100644 src/main_thread/core_interface/README.md create mode 100644 src/main_thread/core_interface/base.ts create mode 100644 src/main_thread/core_interface/monothread.ts create mode 100644 src/main_thread/core_interface/multithread.ts create mode 100644 src/main_thread/core_interface/types.ts diff --git a/src/core/main/worker/index.ts b/src/core/main/worker/index.ts index a6a0fc8f2a..00378b2bd1 100644 --- a/src/core/main/worker/index.ts +++ b/src/core/main/worker/index.ts @@ -1,2 +1,4 @@ +import type { IMessageReceiverCallback } from "./worker_main"; import initializeWorkerMain from "./worker_main"; export default initializeWorkerMain; +export type { IMessageReceiverCallback }; diff --git a/src/core/types.ts b/src/core/types.ts index e4a0e61e3d..685fd0c19a 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -7,6 +7,7 @@ import type { IManifestFetcherSettings, ISegmentQueueCreatorBackoffOptions, } from "./fetchers"; +import type { IMessageReceiverCallback } from "./main/worker"; import type { IBufferedChunk, IBufferType, @@ -48,4 +49,7 @@ export type { IStreamOrchestratorPlaybackObservation, IRepresentationsChoice, ITrackSwitchingMode, + + // CoreMain + IMessageReceiverCallback, }; diff --git a/src/core_interface.ts b/src/core_interface.ts deleted file mode 100644 index 8d16ca883b..0000000000 --- a/src/core_interface.ts +++ /dev/null @@ -1,119 +0,0 @@ -import type { IMessageReceiverCallback } from "./core/main/worker/worker_main"; -import log from "./log"; -import type { IMainThreadMessage, IWorkerMessage } from "./multithread_types"; -import noop from "./utils/noop"; - -export default abstract class CoreInterface { - protected listeners: Array<(evt: IWorkerMessage) => void> = []; - protected listenersError: Array<() => void> = []; - - public abstract sendMessage(msg: IMainThreadMessage): void; - - public addMessageListener(cb: (evt: IWorkerMessage) => void): void { - this.listeners.push(cb); - } - - public removeMessageListener(cb: (evt: IWorkerMessage) => void): void { - const index = this.listeners.indexOf(cb); - if (index >= 0) { - this.listeners.splice(index, 1); - } - } - - public addErrorListener(cb: () => void): void { - this.listenersError.push(cb); - } - - public removeErrorListener(cb: () => void): void { - const index = this.listenersError.indexOf(cb); - if (index >= 0) { - this.listenersError.splice(index, 1); - } - } - - public dispose(): void { - this.listeners.length = 0; - this.listenersError.length = 0; - } -} - -export class MonoThreadCoreInterface extends CoreInterface { - private _currentCoreListener: IMessageReceiverCallback; - - constructor() { - super(); - this._currentCoreListener = noop; - } - - public sendMessage(msg: IMainThreadMessage) { - log.debug("---> Sending to Core:", msg.type); - queueMicrotask(() => { - // NOTE: We don't clone for performance reasons - this._currentCoreListener({ data: msg }); - }); - } - - public getCallbacks(): { - setCoreMessageReceiver: (handler: IMessageReceiverCallback) => void; - sendCoreMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void; - } { - const setCoreMessageReceiver = (handler: IMessageReceiverCallback): void => { - this._currentCoreListener = handler; - }; - const sendCoreMessage = (msg: IWorkerMessage, _transferables?: Transferable[]) => { - queueMicrotask(() => { - log.debug("<--- Receiving from Core:", msg.type); - this.listeners.forEach((listener) => { - listener(msg); - }); - }); - }; - return { setCoreMessageReceiver, sendCoreMessage }; - } -} - -/** - * `CoreInterface` implementation for when the core will run in a WebWorker. - */ -export class WorkerCoreInterface extends CoreInterface { - private _worker: Worker; - - /** - * Initialize a `WorkerCoreInterface` for the given `WebWorker` instance. - * - * The `addMessageListener` and `addMessageListener` will then register - * listeners respectively for the `onmessage` and `onmessageerror` events - * from this `WebWorker`. - * The `sendMessage` method will allow to send messages to the `WebWorker`. - * @param {Worker} worker - */ - constructor(worker: Worker) { - super(); - this._worker = worker; - this._worker.onmessageerror = () => { - this.listenersError.forEach((listener) => { - listener(); - }); - }; - this._worker.onmessage = (evt) => { - this.listeners.forEach((listener) => { - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - listener(evt.data); - }); - }; - } - - /** - * Send given message to the `WebWorker`. - * @param {Object} msg - * @param {Array.} [transferables] - */ - public sendMessage(msg: IMainThreadMessage, transferables?: Transferable[]): void { - log.debug("---> Sending to Worker:", msg.type); - if (transferables === undefined) { - this._worker.postMessage(msg); - } else { - this._worker.postMessage(msg, transferables); - } - } -} diff --git a/src/experimental/features/__tests__/local.test.ts b/src/experimental/features/__tests__/local.test.ts index da452d767d..2898e8fa7c 100644 --- a/src/experimental/features/__tests__/local.test.ts +++ b/src/experimental/features/__tests__/local.test.ts @@ -1,5 +1,7 @@ import { describe, it, expect } from "vitest"; +import initializeWorkerMain from "../../../core/main/worker"; import type { IFeaturesObject } from "../../../features/types"; +import { MonoThreadCoreInterface } from "../../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import local from "../../../transports/local"; import addLocalManifestFeature from "../local"; @@ -10,9 +12,16 @@ describe("Features list - LOCAL_MANIFEST", () => { addLocalManifestFeature(featureObject); expect(featureObject).toEqual({ transports: { local }, - mainThreadMediaSourceInit: MultiThreadContentInitializer, + monothread: { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }, }); expect(featureObject.transports.local).toBe(local); - expect(featureObject.mainThreadMediaSourceInit).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread).not.toBe(null); + expect(featureObject.monothread?.init).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread?.coreInterface).toBe(MonoThreadCoreInterface); + expect(featureObject.monothread?.workerMain).toBe(initializeWorkerMain); }); }); diff --git a/src/experimental/features/__tests__/metaplaylist.test.ts b/src/experimental/features/__tests__/metaplaylist.test.ts index 3ac6f131f9..bb4ba3c281 100644 --- a/src/experimental/features/__tests__/metaplaylist.test.ts +++ b/src/experimental/features/__tests__/metaplaylist.test.ts @@ -1,5 +1,7 @@ import { describe, it, expect } from "vitest"; +import initializeWorkerMain from "../../../core/main/worker"; import type { IFeaturesObject } from "../../../features/types"; +import { MonoThreadCoreInterface } from "../../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import metaplaylist from "../../../transports/metaplaylist"; import addLocalManifestFeature from "../metaplaylist"; @@ -10,9 +12,16 @@ describe("Features list - METAPLAYLIST", () => { addLocalManifestFeature(featureObject); expect(featureObject).toEqual({ transports: { metaplaylist }, - mainThreadMediaSourceInit: MultiThreadContentInitializer, + monothread: { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }, }); expect(featureObject.transports.metaplaylist).toBe(metaplaylist); - expect(featureObject.mainThreadMediaSourceInit).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread).not.toBe(null); + expect(featureObject.monothread?.init).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread?.coreInterface).toBe(MonoThreadCoreInterface); + expect(featureObject.monothread?.workerMain).toBe(initializeWorkerMain); }); }); diff --git a/src/experimental/features/__tests__/multi_thread.test.ts b/src/experimental/features/__tests__/multi_thread.test.ts index 0f426b5ec7..21c2f3895f 100644 --- a/src/experimental/features/__tests__/multi_thread.test.ts +++ b/src/experimental/features/__tests__/multi_thread.test.ts @@ -1,5 +1,6 @@ import { describe, it, expect } from "vitest"; import type { IFeaturesObject } from "../../../features/types"; +import { WorkerCoreInterface } from "../../../main_thread/core_interface/multithread"; import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import addMultiThreadFeature from "../multi_thread"; @@ -8,10 +9,14 @@ describe("Features list - EME", () => { const featureObject: IFeaturesObject = {} as IFeaturesObject; addMultiThreadFeature(featureObject); expect(featureObject).toEqual({ - multithread: { init: MultiThreadContentInitializer }, + multithread: { + init: MultiThreadContentInitializer, + coreInterface: WorkerCoreInterface, + }, }); expect(featureObject.multithread).not.toBe(null); expect(featureObject.multithread).not.toBe(undefined); expect(featureObject.multithread?.init).toBe(MultiThreadContentInitializer); + expect(featureObject.multithread?.coreInterface).toBe(WorkerCoreInterface); }); }); diff --git a/src/experimental/features/local.ts b/src/experimental/features/local.ts index e6b49e2684..9de3005bab 100644 --- a/src/experimental/features/local.ts +++ b/src/experimental/features/local.ts @@ -14,13 +14,19 @@ * limitations under the License. */ +import initializeWorkerMain from "../../core/main/worker"; import type { IFeaturesObject } from "../../features/types"; +import { MonoThreadCoreInterface } from "../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import local from "../../transports/local"; function addLocalManifestFeature(features: IFeaturesObject): void { features.transports.local = local; - features.mainThreadMediaSourceInit = MultiThreadContentInitializer; + features.monothread = { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }; } export { addLocalManifestFeature as LOCAL_MANIFEST }; diff --git a/src/experimental/features/metaplaylist.ts b/src/experimental/features/metaplaylist.ts index b3f8781aab..1f24a87bdc 100644 --- a/src/experimental/features/metaplaylist.ts +++ b/src/experimental/features/metaplaylist.ts @@ -14,13 +14,19 @@ * limitations under the License. */ +import initializeWorkerMain from "../../core/main/worker"; import type { IFeaturesObject } from "../../features/types"; +import { MonoThreadCoreInterface } from "../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import metaplaylist from "../../transports/metaplaylist"; function addMetaPlaylistFeature(features: IFeaturesObject): void { features.transports.metaplaylist = metaplaylist; - features.mainThreadMediaSourceInit = MultiThreadContentInitializer; + features.monothread = { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }; } export { addMetaPlaylistFeature as METAPLAYLIST }; diff --git a/src/experimental/features/multi_thread.ts b/src/experimental/features/multi_thread.ts index 2443c96acb..23e7fdcb1a 100644 --- a/src/experimental/features/multi_thread.ts +++ b/src/experimental/features/multi_thread.ts @@ -1,4 +1,5 @@ import type { IFeaturesObject } from "../../features/types"; +import { WorkerCoreInterface } from "../../main_thread/core_interface/multithread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; /** @@ -6,7 +7,10 @@ import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_c * @param {Object} features */ function addMultiThreadFeature(features: IFeaturesObject): void { - features.multithread = { init: MultiThreadContentInitializer }; + features.multithread = { + init: MultiThreadContentInitializer, + coreInterface: WorkerCoreInterface, + }; } export { addMultiThreadFeature as MULTI_THREAD }; diff --git a/src/features/features_object.ts b/src/features/features_object.ts index 3adfdbdc04..6b2f74550e 100644 --- a/src/features/features_object.ts +++ b/src/features/features_object.ts @@ -27,7 +27,7 @@ const features: IFeaturesObject = { decrypt: null, htmlTextDisplayer: null, htmlTextTracksParsers: {}, - mainThreadMediaSourceInit: null, + monothread: null, multithread: null, nativeTextDisplayer: null, nativeTextTracksParsers: {}, diff --git a/src/features/list/__tests__/dash.test.ts b/src/features/list/__tests__/dash.test.ts index 700e09be45..4cfd830655 100644 --- a/src/features/list/__tests__/dash.test.ts +++ b/src/features/list/__tests__/dash.test.ts @@ -1,4 +1,6 @@ import { describe, it, expect } from "vitest"; +import initializeWorkerMain from "../../../core/main/worker"; +import { MonoThreadCoreInterface } from "../../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import nativeDashParser from "../../../parsers/manifest/dash/native-parser"; import DASHFeature from "../../../transports/dash"; @@ -10,15 +12,22 @@ describe("Features list - DASH", () => { const featureObject = { transports: {}, dashParsers: { fastJs: null, native: null, wasm: null }, - mainThreadMediaSourceInit: null, + monothread: null, } as unknown as IFeaturesObject; addDASHFeature(featureObject); expect(featureObject).toEqual({ transports: { dash: DASHFeature }, dashParsers: { native: nativeDashParser, fastJs: null, wasm: null }, - mainThreadMediaSourceInit: MultiThreadContentInitializer, + monothread: { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }, }); expect(featureObject.transports.dash).toBe(DASHFeature); - expect(featureObject.mainThreadMediaSourceInit).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread).not.toBe(null); + expect(featureObject.monothread?.init).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread?.coreInterface).toBe(MonoThreadCoreInterface); + expect(featureObject.monothread?.workerMain).toBe(initializeWorkerMain); }); }); diff --git a/src/features/list/__tests__/smooth.test.ts b/src/features/list/__tests__/smooth.test.ts index 8bf5b4a7d0..ff62dc64cd 100644 --- a/src/features/list/__tests__/smooth.test.ts +++ b/src/features/list/__tests__/smooth.test.ts @@ -1,4 +1,6 @@ import { describe, it, expect } from "vitest"; +import initializeWorkerMain from "../../../core/main/worker"; +import { MonoThreadCoreInterface } from "../../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import SmoothFeature from "../../../transports/smooth"; import type { IFeaturesObject } from "../../types"; @@ -10,9 +12,16 @@ describe("Features list - Smooth", () => { addSmoothFeature(featureObject); expect(featureObject).toEqual({ transports: { smooth: SmoothFeature }, - mainThreadMediaSourceInit: MultiThreadContentInitializer, + monothread: { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }, }); expect(featureObject.transports.smooth).toBe(SmoothFeature); - expect(featureObject.mainThreadMediaSourceInit).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread).not.toBe(null); + expect(featureObject.monothread?.init).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread?.coreInterface).toBe(MonoThreadCoreInterface); + expect(featureObject.monothread?.workerMain).toBe(initializeWorkerMain); }); }); diff --git a/src/features/list/dash.ts b/src/features/list/dash.ts index d02e6d7cb1..6caecb8f7e 100644 --- a/src/features/list/dash.ts +++ b/src/features/list/dash.ts @@ -14,6 +14,8 @@ * limitations under the License. */ +import initializeWorkerMain from "../../core/main/worker"; +import { MonoThreadCoreInterface } from "../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import dashJsParser from "../../parsers/manifest/dash/native-parser"; import dash from "../../transports/dash"; @@ -28,7 +30,11 @@ function addDASHFeature(features: IFeaturesObject): void { features.transports.dash = dash; } features.dashParsers.native = dashJsParser; - features.mainThreadMediaSourceInit = MultiThreadContentInitializer; + features.monothread = { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }; } export { addDASHFeature as DASH }; diff --git a/src/features/list/dash_wasm.ts b/src/features/list/dash_wasm.ts index 9bd6821aec..ab781862df 100644 --- a/src/features/list/dash_wasm.ts +++ b/src/features/list/dash_wasm.ts @@ -14,7 +14,9 @@ * limitations under the License. */ +import initializeWorkerMain from "../../core/main/worker"; import type { IFeaturesObject } from "../../features/types"; +import { MonoThreadCoreInterface } from "../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import type { IDashWasmParserOptions } from "../../parsers/manifest/dash/wasm-parser"; import DashWasmParser from "../../parsers/manifest/dash/wasm-parser"; @@ -27,7 +29,11 @@ const dashWasmFeature = { features.transports.dash = dash; } features.dashParsers.wasm = dashWasmParser; - features.mainThreadMediaSourceInit = MultiThreadContentInitializer; + features.monothread = { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }; }, initialize(opts: IDashWasmParserOptions): Promise { diff --git a/src/features/list/media_source_main.ts b/src/features/list/media_source_main.ts index 0245c4bf52..051cf84972 100644 --- a/src/features/list/media_source_main.ts +++ b/src/features/list/media_source_main.ts @@ -1,3 +1,5 @@ +import initializeWorkerMain from "../../core/main/worker"; +import { MonoThreadCoreInterface } from "../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import type { IFeaturesObject } from "../types"; @@ -6,7 +8,11 @@ import type { IFeaturesObject } from "../types"; * @param {Object} features */ function addMediaSourceMainFeature(features: IFeaturesObject): void { - features.mainThreadMediaSourceInit = MultiThreadContentInitializer; + features.monothread = { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }; } export { addMediaSourceMainFeature as MEDIA_SOURCE_MAIN }; diff --git a/src/features/list/smooth.ts b/src/features/list/smooth.ts index ba5a847d78..017b908b1a 100644 --- a/src/features/list/smooth.ts +++ b/src/features/list/smooth.ts @@ -14,6 +14,8 @@ * limitations under the License. */ +import initializeWorkerMain from "../../core/main/worker"; +import { MonoThreadCoreInterface } from "../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import smooth from "../../transports/smooth"; import type { IFeaturesObject } from "../types"; @@ -26,7 +28,11 @@ function addSmoothFeature(features: IFeaturesObject): void { if (features.transports.smooth === undefined) { features.transports.smooth = smooth; } - features.mainThreadMediaSourceInit = MultiThreadContentInitializer; + features.monothread = { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }; } export { addSmoothFeature as SMOOTH }; diff --git a/src/features/types.ts b/src/features/types.ts index 91405a65a6..84f664c648 100644 --- a/src/features/types.ts +++ b/src/features/types.ts @@ -15,7 +15,12 @@ */ import type { IMediaElement } from "../compat/browser_compatibility_types"; +import type IInitializeWorkerMain from "../core/main/worker"; import type { SegmentSink } from "../core/segment_sinks"; +import type { + WorkerCoreInterface, + MonoThreadCoreInterface, +} from "../main_thread/core_interface/types"; import type ContentDecryptor from "../main_thread/decrypt"; import type DirectFileContentInitializer from "../main_thread/init/directfile_content_initializer"; import type MultiThreadContentInitializer from "../main_thread/init/multi_thread_content_initializer"; @@ -116,14 +121,29 @@ export interface IFeaturesObject { * Feature allowing to load contents through MediaSource API through the * main thread. */ - mainThreadMediaSourceInit: typeof MultiThreadContentInitializer | null; + monothread: { + /** Class to load a content through the MediaSource API. */ + init: typeof MultiThreadContentInitializer; + /** The RxPlayer's core logic. */ + workerMain: typeof IInitializeWorkerMain; + /** + * Class allowing to exchange messages with the RxPlayer's `core`, here + * running in `workerMain` + */ + coreInterface: typeof MonoThreadCoreInterface; + } | null; /** * Features allowing to load contents through MediaSource API through * a WebWorker. */ multithread: { - /** Class to load a content through MediaSource API via a WebWorker. */ + /** Class to load a content through the MediaSource API. */ init: typeof MultiThreadContentInitializer; + /** + * Class allowing to exchange messages with the RxPlayer's `core`, here + * running in another thread. + */ + coreInterface: typeof WorkerCoreInterface; } | null; /** * Function for loading and parsing contents through various protocols, by diff --git a/src/main_thread/api/public_api.ts b/src/main_thread/api/public_api.ts index 0ec235c2e3..445fc05a97 100644 --- a/src/main_thread/api/public_api.ts +++ b/src/main_thread/api/public_api.ts @@ -33,7 +33,6 @@ import hasMseInWorker from "../../compat/has_mse_in_worker"; import hasWorkerApi from "../../compat/has_worker_api"; import isDebugModeEnabled from "../../compat/is_debug_mode_enabled"; import config from "../../config"; -import initializeWorkerMain from "../../core/main/worker"; import type { ISegmentSinkMetrics } from "../../core/segment_sinks/segment_sinks_store"; import type { IAdaptationChoice, @@ -41,7 +40,6 @@ import type { IABRThrottlers, IBufferType, } from "../../core/types"; -import { MonoThreadCoreInterface, WorkerCoreInterface } from "../../core_interface"; import type { IDefaultConfig } from "../../default_config"; import type { IErrorCode, IErrorType } from "../../errors"; import { ErrorCodes, ErrorTypes, formatError, MediaError } from "../../errors"; @@ -912,16 +910,16 @@ class Player extends EventEmitter { ) && typeof options.representationFilter !== "function"; if (mode === "main" || (mode === "auto" && !canRunInMultiThread)) { - if (features.mainThreadMediaSourceInit === null) { + if (features.monothread === null) { throw new Error( "Cannot load video, neither in a WebWorker nor with the " + "`MEDIA_SOURCE_MAIN` feature", ); } log.info("API: Initializing MediaSource mode in the main thread"); - const coreInterface = new MonoThreadCoreInterface(); + const coreInterface = new features.monothread.coreInterface(); const coreInterfaceCallbacks = coreInterface.getCallbacks(); - initializeWorkerMain( + features.monothread.workerMain( coreInterfaceCallbacks.setCoreMessageReceiver, coreInterfaceCallbacks.sendCoreMessage, ); @@ -938,7 +936,7 @@ class Player extends EventEmitter { timestamp: getMonotonicTimeStamp(), }, }); - initializer = new features.mainThreadMediaSourceInit({ + initializer = new features.monothread.init({ coreInterface, adaptiveOptions, autoPlay, @@ -971,7 +969,7 @@ class Player extends EventEmitter { useWorker = true; log.info("API: Initializing MediaSource mode in a WebWorker"); initializer = new features.multithread.init({ - coreInterface: new WorkerCoreInterface(this._priv_worker), + coreInterface: new features.multithread.coreInterface(this._priv_worker), adaptiveOptions, autoPlay, bufferOptions, diff --git a/src/main_thread/core_interface/README.md b/src/main_thread/core_interface/README.md new file mode 100644 index 0000000000..42681c3740 --- /dev/null +++ b/src/main_thread/core_interface/README.md @@ -0,0 +1,22 @@ +# CoreInterface + +| Consideration | Status | +| ----------------------- | ---------------------------------------------------- | +| Preferred import style | Only through the `features` object | +| Multithread environment | May run in WebWorker depending on the implementation | + +## Overview + +This directory defines `CoreInterface` implementations, which is the link between the +RxPlayer's "main_thread" logic (which implements logic running always in main thread: the +API, decryption, subtitles rendering, track management...) to the RxPlayer's "core" logic +(which implements logic which may run in a WebWorker: manifest loading, segment loading +and parsing, adaptive...). + +Depending on if playback for the current content currently relies on a single thread or +multiple ones, a different `CoreInterface` implementation will be selected. + +This directory should not be imported directly by the main logic (beside types). Rather, +the RxPlayer's global `features` object should refer to the implementations currently +available instead. This is to ensure that the `CoreInterface` logic is only imported if +the application has explicitly asked for the corresponding feature. diff --git a/src/main_thread/core_interface/base.ts b/src/main_thread/core_interface/base.ts new file mode 100644 index 0000000000..e800f7d3c5 --- /dev/null +++ b/src/main_thread/core_interface/base.ts @@ -0,0 +1,35 @@ +import type { IMainThreadMessage, IWorkerMessage } from "../../multithread_types"; + +export default abstract class CoreInterface { + protected listeners: Array<(evt: IWorkerMessage) => void> = []; + protected listenersError: Array<() => void> = []; + + public abstract sendMessage(msg: IMainThreadMessage): void; + + public addMessageListener(cb: (evt: IWorkerMessage) => void): void { + this.listeners.push(cb); + } + + public removeMessageListener(cb: (evt: IWorkerMessage) => void): void { + const index = this.listeners.indexOf(cb); + if (index >= 0) { + this.listeners.splice(index, 1); + } + } + + public addErrorListener(cb: () => void): void { + this.listenersError.push(cb); + } + + public removeErrorListener(cb: () => void): void { + const index = this.listenersError.indexOf(cb); + if (index >= 0) { + this.listenersError.splice(index, 1); + } + } + + public dispose(): void { + this.listeners.length = 0; + this.listenersError.length = 0; + } +} diff --git a/src/main_thread/core_interface/monothread.ts b/src/main_thread/core_interface/monothread.ts new file mode 100644 index 0000000000..6e2885204e --- /dev/null +++ b/src/main_thread/core_interface/monothread.ts @@ -0,0 +1,40 @@ +import type { IMessageReceiverCallback } from "../../core/types"; +import log from "../../log"; +import type { IMainThreadMessage, IWorkerMessage } from "../../multithread_types"; +import noop from "../../utils/noop"; +import CoreInterface from "./base"; + +export class MonoThreadCoreInterface extends CoreInterface { + private _currentCoreListener: IMessageReceiverCallback; + + constructor() { + super(); + this._currentCoreListener = noop; + } + + public sendMessage(msg: IMainThreadMessage) { + log.debug("---> Sending to Core:", msg.type); + queueMicrotask(() => { + // NOTE: We don't clone for performance reasons + this._currentCoreListener({ data: msg }); + }); + } + + public getCallbacks(): { + setCoreMessageReceiver: (handler: IMessageReceiverCallback) => void; + sendCoreMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void; + } { + const setCoreMessageReceiver = (handler: IMessageReceiverCallback): void => { + this._currentCoreListener = handler; + }; + const sendCoreMessage = (msg: IWorkerMessage, _transferables?: Transferable[]) => { + queueMicrotask(() => { + log.debug("<--- Receiving from Core:", msg.type); + this.listeners.forEach((listener) => { + listener(msg); + }); + }); + }; + return { setCoreMessageReceiver, sendCoreMessage }; + } +} diff --git a/src/main_thread/core_interface/multithread.ts b/src/main_thread/core_interface/multithread.ts new file mode 100644 index 0000000000..631f6219f5 --- /dev/null +++ b/src/main_thread/core_interface/multithread.ts @@ -0,0 +1,49 @@ +import log from "../../log"; +import type { IMainThreadMessage } from "../../multithread_types"; +import CoreInterface from "./base"; + +/** + * `CoreInterface` implementation for when the core will run in a WebWorker. + */ +export class WorkerCoreInterface extends CoreInterface { + private _worker: Worker; + + /** + * Initialize a `WorkerCoreInterface` for the given `WebWorker` instance. + * + * The `addMessageListener` and `addMessageListener` will then register + * listeners respectively for the `onmessage` and `onmessageerror` events + * from this `WebWorker`. + * The `sendMessage` method will allow to send messages to the `WebWorker`. + * @param {Worker} worker + */ + constructor(worker: Worker) { + super(); + this._worker = worker; + this._worker.onmessageerror = () => { + this.listenersError.forEach((listener) => { + listener(); + }); + }; + this._worker.onmessage = (evt) => { + this.listeners.forEach((listener) => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + listener(evt.data); + }); + }; + } + + /** + * Send given message to the `WebWorker`. + * @param {Object} msg + * @param {Array.} [transferables] + */ + public sendMessage(msg: IMainThreadMessage, transferables?: Transferable[]): void { + log.debug("---> Sending to Worker:", msg.type); + if (transferables === undefined) { + this._worker.postMessage(msg); + } else { + this._worker.postMessage(msg, transferables); + } + } +} diff --git a/src/main_thread/core_interface/types.ts b/src/main_thread/core_interface/types.ts new file mode 100644 index 0000000000..37527a94b5 --- /dev/null +++ b/src/main_thread/core_interface/types.ts @@ -0,0 +1,5 @@ +import type CoreInterface from "./base"; +import type { MonoThreadCoreInterface } from "./monothread"; +import type { WorkerCoreInterface } from "./multithread"; +export type { MonoThreadCoreInterface, WorkerCoreInterface }; +export default CoreInterface; diff --git a/src/main_thread/init/multi_thread_content_initializer.ts b/src/main_thread/init/multi_thread_content_initializer.ts index 27126b51fc..ce35aafea1 100644 --- a/src/main_thread/init/multi_thread_content_initializer.ts +++ b/src/main_thread/init/multi_thread_content_initializer.ts @@ -7,7 +7,6 @@ import type { IAdaptationChoice, IResolutionInfo, } from "../../core/types"; -import type CoreInterface from "../../core_interface"; import { EncryptedMediaError, MediaError, @@ -53,6 +52,7 @@ import SharedReference from "../../utils/reference"; import { RequestError } from "../../utils/request"; import type { CancellationSignal } from "../../utils/task_canceller"; import TaskCanceller, { CancellationError } from "../../utils/task_canceller"; +import type CoreInterface from "../core_interface/types"; import type { IContentProtection } from "../decrypt"; import type IContentDecryptor from "../decrypt"; import { ContentDecryptorState, getKeySystemConfiguration } from "../decrypt";