diff --git a/src/core/main/worker/index.ts b/src/core/main/worker/index.ts index a6a0fc8f2a..00378b2bd1 100644 --- a/src/core/main/worker/index.ts +++ b/src/core/main/worker/index.ts @@ -1,2 +1,4 @@ +import type { IMessageReceiverCallback } from "./worker_main"; import initializeWorkerMain from "./worker_main"; export default initializeWorkerMain; +export type { IMessageReceiverCallback }; diff --git a/src/core/types.ts b/src/core/types.ts index e4a0e61e3d..685fd0c19a 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -7,6 +7,7 @@ import type { IManifestFetcherSettings, ISegmentQueueCreatorBackoffOptions, } from "./fetchers"; +import type { IMessageReceiverCallback } from "./main/worker"; import type { IBufferedChunk, IBufferType, @@ -48,4 +49,7 @@ export type { IStreamOrchestratorPlaybackObservation, IRepresentationsChoice, ITrackSwitchingMode, + + // CoreMain + IMessageReceiverCallback, }; diff --git a/src/core_interface.ts b/src/core_interface.ts deleted file mode 100644 index 8d16ca883b..0000000000 --- a/src/core_interface.ts +++ /dev/null @@ -1,119 +0,0 @@ -import type { IMessageReceiverCallback } from "./core/main/worker/worker_main"; -import log from "./log"; -import type { IMainThreadMessage, IWorkerMessage } from "./multithread_types"; -import noop from "./utils/noop"; - -export default abstract class CoreInterface { - protected listeners: Array<(evt: IWorkerMessage) => void> = []; - protected listenersError: Array<() => void> = []; - - public abstract sendMessage(msg: IMainThreadMessage): void; - - public addMessageListener(cb: (evt: IWorkerMessage) => void): void { - this.listeners.push(cb); - } - - public removeMessageListener(cb: (evt: IWorkerMessage) => void): void { - const index = this.listeners.indexOf(cb); - if (index >= 0) { - this.listeners.splice(index, 1); - } - } - - public addErrorListener(cb: () => void): void { - this.listenersError.push(cb); - } - - public removeErrorListener(cb: () => void): void { - const index = this.listenersError.indexOf(cb); - if (index >= 0) { - this.listenersError.splice(index, 1); - } - } - - public dispose(): void { - this.listeners.length = 0; - this.listenersError.length = 0; - } -} - -export class MonoThreadCoreInterface extends CoreInterface { - private _currentCoreListener: IMessageReceiverCallback; - - constructor() { - super(); - this._currentCoreListener = noop; - } - - public sendMessage(msg: IMainThreadMessage) { - log.debug("---> Sending to Core:", msg.type); - queueMicrotask(() => { - // NOTE: We don't clone for performance reasons - this._currentCoreListener({ data: msg }); - }); - } - - public getCallbacks(): { - setCoreMessageReceiver: (handler: IMessageReceiverCallback) => void; - sendCoreMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void; - } { - const setCoreMessageReceiver = (handler: IMessageReceiverCallback): void => { - this._currentCoreListener = handler; - }; - const sendCoreMessage = (msg: IWorkerMessage, _transferables?: Transferable[]) => { - queueMicrotask(() => { - log.debug("<--- Receiving from Core:", msg.type); - this.listeners.forEach((listener) => { - listener(msg); - }); - }); - }; - return { setCoreMessageReceiver, sendCoreMessage }; - } -} - -/** - * `CoreInterface` implementation for when the core will run in a WebWorker. - */ -export class WorkerCoreInterface extends CoreInterface { - private _worker: Worker; - - /** - * Initialize a `WorkerCoreInterface` for the given `WebWorker` instance. - * - * The `addMessageListener` and `addMessageListener` will then register - * listeners respectively for the `onmessage` and `onmessageerror` events - * from this `WebWorker`. - * The `sendMessage` method will allow to send messages to the `WebWorker`. - * @param {Worker} worker - */ - constructor(worker: Worker) { - super(); - this._worker = worker; - this._worker.onmessageerror = () => { - this.listenersError.forEach((listener) => { - listener(); - }); - }; - this._worker.onmessage = (evt) => { - this.listeners.forEach((listener) => { - // eslint-disable-next-line @typescript-eslint/no-unsafe-argument - listener(evt.data); - }); - }; - } - - /** - * Send given message to the `WebWorker`. - * @param {Object} msg - * @param {Array.} [transferables] - */ - public sendMessage(msg: IMainThreadMessage, transferables?: Transferable[]): void { - log.debug("---> Sending to Worker:", msg.type); - if (transferables === undefined) { - this._worker.postMessage(msg); - } else { - this._worker.postMessage(msg, transferables); - } - } -} diff --git a/src/experimental/features/__tests__/local.test.ts b/src/experimental/features/__tests__/local.test.ts index da452d767d..2898e8fa7c 100644 --- a/src/experimental/features/__tests__/local.test.ts +++ b/src/experimental/features/__tests__/local.test.ts @@ -1,5 +1,7 @@ import { describe, it, expect } from "vitest"; +import initializeWorkerMain from "../../../core/main/worker"; import type { IFeaturesObject } from "../../../features/types"; +import { MonoThreadCoreInterface } from "../../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import local from "../../../transports/local"; import addLocalManifestFeature from "../local"; @@ -10,9 +12,16 @@ describe("Features list - LOCAL_MANIFEST", () => { addLocalManifestFeature(featureObject); expect(featureObject).toEqual({ transports: { local }, - mainThreadMediaSourceInit: MultiThreadContentInitializer, + monothread: { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }, }); expect(featureObject.transports.local).toBe(local); - expect(featureObject.mainThreadMediaSourceInit).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread).not.toBe(null); + expect(featureObject.monothread?.init).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread?.coreInterface).toBe(MonoThreadCoreInterface); + expect(featureObject.monothread?.workerMain).toBe(initializeWorkerMain); }); }); diff --git a/src/experimental/features/__tests__/metaplaylist.test.ts b/src/experimental/features/__tests__/metaplaylist.test.ts index 3ac6f131f9..bb4ba3c281 100644 --- a/src/experimental/features/__tests__/metaplaylist.test.ts +++ b/src/experimental/features/__tests__/metaplaylist.test.ts @@ -1,5 +1,7 @@ import { describe, it, expect } from "vitest"; +import initializeWorkerMain from "../../../core/main/worker"; import type { IFeaturesObject } from "../../../features/types"; +import { MonoThreadCoreInterface } from "../../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import metaplaylist from "../../../transports/metaplaylist"; import addLocalManifestFeature from "../metaplaylist"; @@ -10,9 +12,16 @@ describe("Features list - METAPLAYLIST", () => { addLocalManifestFeature(featureObject); expect(featureObject).toEqual({ transports: { metaplaylist }, - mainThreadMediaSourceInit: MultiThreadContentInitializer, + monothread: { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }, }); expect(featureObject.transports.metaplaylist).toBe(metaplaylist); - expect(featureObject.mainThreadMediaSourceInit).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread).not.toBe(null); + expect(featureObject.monothread?.init).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread?.coreInterface).toBe(MonoThreadCoreInterface); + expect(featureObject.monothread?.workerMain).toBe(initializeWorkerMain); }); }); diff --git a/src/experimental/features/__tests__/multi_thread.test.ts b/src/experimental/features/__tests__/multi_thread.test.ts index 0f426b5ec7..21c2f3895f 100644 --- a/src/experimental/features/__tests__/multi_thread.test.ts +++ b/src/experimental/features/__tests__/multi_thread.test.ts @@ -1,5 +1,6 @@ import { describe, it, expect } from "vitest"; import type { IFeaturesObject } from "../../../features/types"; +import { WorkerCoreInterface } from "../../../main_thread/core_interface/multithread"; import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import addMultiThreadFeature from "../multi_thread"; @@ -8,10 +9,14 @@ describe("Features list - EME", () => { const featureObject: IFeaturesObject = {} as IFeaturesObject; addMultiThreadFeature(featureObject); expect(featureObject).toEqual({ - multithread: { init: MultiThreadContentInitializer }, + multithread: { + init: MultiThreadContentInitializer, + coreInterface: WorkerCoreInterface, + }, }); expect(featureObject.multithread).not.toBe(null); expect(featureObject.multithread).not.toBe(undefined); expect(featureObject.multithread?.init).toBe(MultiThreadContentInitializer); + expect(featureObject.multithread?.coreInterface).toBe(WorkerCoreInterface); }); }); diff --git a/src/experimental/features/local.ts b/src/experimental/features/local.ts index e6b49e2684..9de3005bab 100644 --- a/src/experimental/features/local.ts +++ b/src/experimental/features/local.ts @@ -14,13 +14,19 @@ * limitations under the License. */ +import initializeWorkerMain from "../../core/main/worker"; import type { IFeaturesObject } from "../../features/types"; +import { MonoThreadCoreInterface } from "../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import local from "../../transports/local"; function addLocalManifestFeature(features: IFeaturesObject): void { features.transports.local = local; - features.mainThreadMediaSourceInit = MultiThreadContentInitializer; + features.monothread = { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }; } export { addLocalManifestFeature as LOCAL_MANIFEST }; diff --git a/src/experimental/features/metaplaylist.ts b/src/experimental/features/metaplaylist.ts index b3f8781aab..1f24a87bdc 100644 --- a/src/experimental/features/metaplaylist.ts +++ b/src/experimental/features/metaplaylist.ts @@ -14,13 +14,19 @@ * limitations under the License. */ +import initializeWorkerMain from "../../core/main/worker"; import type { IFeaturesObject } from "../../features/types"; +import { MonoThreadCoreInterface } from "../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import metaplaylist from "../../transports/metaplaylist"; function addMetaPlaylistFeature(features: IFeaturesObject): void { features.transports.metaplaylist = metaplaylist; - features.mainThreadMediaSourceInit = MultiThreadContentInitializer; + features.monothread = { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }; } export { addMetaPlaylistFeature as METAPLAYLIST }; diff --git a/src/experimental/features/multi_thread.ts b/src/experimental/features/multi_thread.ts index 2443c96acb..23e7fdcb1a 100644 --- a/src/experimental/features/multi_thread.ts +++ b/src/experimental/features/multi_thread.ts @@ -1,4 +1,5 @@ import type { IFeaturesObject } from "../../features/types"; +import { WorkerCoreInterface } from "../../main_thread/core_interface/multithread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; /** @@ -6,7 +7,10 @@ import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_c * @param {Object} features */ function addMultiThreadFeature(features: IFeaturesObject): void { - features.multithread = { init: MultiThreadContentInitializer }; + features.multithread = { + init: MultiThreadContentInitializer, + coreInterface: WorkerCoreInterface, + }; } export { addMultiThreadFeature as MULTI_THREAD }; diff --git a/src/features/features_object.ts b/src/features/features_object.ts index 3adfdbdc04..6b2f74550e 100644 --- a/src/features/features_object.ts +++ b/src/features/features_object.ts @@ -27,7 +27,7 @@ const features: IFeaturesObject = { decrypt: null, htmlTextDisplayer: null, htmlTextTracksParsers: {}, - mainThreadMediaSourceInit: null, + monothread: null, multithread: null, nativeTextDisplayer: null, nativeTextTracksParsers: {}, diff --git a/src/features/list/__tests__/dash.test.ts b/src/features/list/__tests__/dash.test.ts index 700e09be45..4cfd830655 100644 --- a/src/features/list/__tests__/dash.test.ts +++ b/src/features/list/__tests__/dash.test.ts @@ -1,4 +1,6 @@ import { describe, it, expect } from "vitest"; +import initializeWorkerMain from "../../../core/main/worker"; +import { MonoThreadCoreInterface } from "../../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import nativeDashParser from "../../../parsers/manifest/dash/native-parser"; import DASHFeature from "../../../transports/dash"; @@ -10,15 +12,22 @@ describe("Features list - DASH", () => { const featureObject = { transports: {}, dashParsers: { fastJs: null, native: null, wasm: null }, - mainThreadMediaSourceInit: null, + monothread: null, } as unknown as IFeaturesObject; addDASHFeature(featureObject); expect(featureObject).toEqual({ transports: { dash: DASHFeature }, dashParsers: { native: nativeDashParser, fastJs: null, wasm: null }, - mainThreadMediaSourceInit: MultiThreadContentInitializer, + monothread: { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }, }); expect(featureObject.transports.dash).toBe(DASHFeature); - expect(featureObject.mainThreadMediaSourceInit).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread).not.toBe(null); + expect(featureObject.monothread?.init).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread?.coreInterface).toBe(MonoThreadCoreInterface); + expect(featureObject.monothread?.workerMain).toBe(initializeWorkerMain); }); }); diff --git a/src/features/list/__tests__/smooth.test.ts b/src/features/list/__tests__/smooth.test.ts index 8bf5b4a7d0..ff62dc64cd 100644 --- a/src/features/list/__tests__/smooth.test.ts +++ b/src/features/list/__tests__/smooth.test.ts @@ -1,4 +1,6 @@ import { describe, it, expect } from "vitest"; +import initializeWorkerMain from "../../../core/main/worker"; +import { MonoThreadCoreInterface } from "../../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../../main_thread/init/multi_thread_content_initializer"; import SmoothFeature from "../../../transports/smooth"; import type { IFeaturesObject } from "../../types"; @@ -10,9 +12,16 @@ describe("Features list - Smooth", () => { addSmoothFeature(featureObject); expect(featureObject).toEqual({ transports: { smooth: SmoothFeature }, - mainThreadMediaSourceInit: MultiThreadContentInitializer, + monothread: { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }, }); expect(featureObject.transports.smooth).toBe(SmoothFeature); - expect(featureObject.mainThreadMediaSourceInit).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread).not.toBe(null); + expect(featureObject.monothread?.init).toBe(MultiThreadContentInitializer); + expect(featureObject.monothread?.coreInterface).toBe(MonoThreadCoreInterface); + expect(featureObject.monothread?.workerMain).toBe(initializeWorkerMain); }); }); diff --git a/src/features/list/dash.ts b/src/features/list/dash.ts index d02e6d7cb1..6caecb8f7e 100644 --- a/src/features/list/dash.ts +++ b/src/features/list/dash.ts @@ -14,6 +14,8 @@ * limitations under the License. */ +import initializeWorkerMain from "../../core/main/worker"; +import { MonoThreadCoreInterface } from "../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import dashJsParser from "../../parsers/manifest/dash/native-parser"; import dash from "../../transports/dash"; @@ -28,7 +30,11 @@ function addDASHFeature(features: IFeaturesObject): void { features.transports.dash = dash; } features.dashParsers.native = dashJsParser; - features.mainThreadMediaSourceInit = MultiThreadContentInitializer; + features.monothread = { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }; } export { addDASHFeature as DASH }; diff --git a/src/features/list/dash_wasm.ts b/src/features/list/dash_wasm.ts index 9bd6821aec..ab781862df 100644 --- a/src/features/list/dash_wasm.ts +++ b/src/features/list/dash_wasm.ts @@ -14,7 +14,9 @@ * limitations under the License. */ +import initializeWorkerMain from "../../core/main/worker"; import type { IFeaturesObject } from "../../features/types"; +import { MonoThreadCoreInterface } from "../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import type { IDashWasmParserOptions } from "../../parsers/manifest/dash/wasm-parser"; import DashWasmParser from "../../parsers/manifest/dash/wasm-parser"; @@ -27,7 +29,11 @@ const dashWasmFeature = { features.transports.dash = dash; } features.dashParsers.wasm = dashWasmParser; - features.mainThreadMediaSourceInit = MultiThreadContentInitializer; + features.monothread = { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }; }, initialize(opts: IDashWasmParserOptions): Promise { diff --git a/src/features/list/media_source_main.ts b/src/features/list/media_source_main.ts index 0245c4bf52..051cf84972 100644 --- a/src/features/list/media_source_main.ts +++ b/src/features/list/media_source_main.ts @@ -1,3 +1,5 @@ +import initializeWorkerMain from "../../core/main/worker"; +import { MonoThreadCoreInterface } from "../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import type { IFeaturesObject } from "../types"; @@ -6,7 +8,11 @@ import type { IFeaturesObject } from "../types"; * @param {Object} features */ function addMediaSourceMainFeature(features: IFeaturesObject): void { - features.mainThreadMediaSourceInit = MultiThreadContentInitializer; + features.monothread = { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }; } export { addMediaSourceMainFeature as MEDIA_SOURCE_MAIN }; diff --git a/src/features/list/smooth.ts b/src/features/list/smooth.ts index ba5a847d78..017b908b1a 100644 --- a/src/features/list/smooth.ts +++ b/src/features/list/smooth.ts @@ -14,6 +14,8 @@ * limitations under the License. */ +import initializeWorkerMain from "../../core/main/worker"; +import { MonoThreadCoreInterface } from "../../main_thread/core_interface/monothread"; import MultiThreadContentInitializer from "../../main_thread/init/multi_thread_content_initializer"; import smooth from "../../transports/smooth"; import type { IFeaturesObject } from "../types"; @@ -26,7 +28,11 @@ function addSmoothFeature(features: IFeaturesObject): void { if (features.transports.smooth === undefined) { features.transports.smooth = smooth; } - features.mainThreadMediaSourceInit = MultiThreadContentInitializer; + features.monothread = { + init: MultiThreadContentInitializer, + coreInterface: MonoThreadCoreInterface, + workerMain: initializeWorkerMain, + }; } export { addSmoothFeature as SMOOTH }; diff --git a/src/features/types.ts b/src/features/types.ts index 91405a65a6..84f664c648 100644 --- a/src/features/types.ts +++ b/src/features/types.ts @@ -15,7 +15,12 @@ */ import type { IMediaElement } from "../compat/browser_compatibility_types"; +import type IInitializeWorkerMain from "../core/main/worker"; import type { SegmentSink } from "../core/segment_sinks"; +import type { + WorkerCoreInterface, + MonoThreadCoreInterface, +} from "../main_thread/core_interface/types"; import type ContentDecryptor from "../main_thread/decrypt"; import type DirectFileContentInitializer from "../main_thread/init/directfile_content_initializer"; import type MultiThreadContentInitializer from "../main_thread/init/multi_thread_content_initializer"; @@ -116,14 +121,29 @@ export interface IFeaturesObject { * Feature allowing to load contents through MediaSource API through the * main thread. */ - mainThreadMediaSourceInit: typeof MultiThreadContentInitializer | null; + monothread: { + /** Class to load a content through the MediaSource API. */ + init: typeof MultiThreadContentInitializer; + /** The RxPlayer's core logic. */ + workerMain: typeof IInitializeWorkerMain; + /** + * Class allowing to exchange messages with the RxPlayer's `core`, here + * running in `workerMain` + */ + coreInterface: typeof MonoThreadCoreInterface; + } | null; /** * Features allowing to load contents through MediaSource API through * a WebWorker. */ multithread: { - /** Class to load a content through MediaSource API via a WebWorker. */ + /** Class to load a content through the MediaSource API. */ init: typeof MultiThreadContentInitializer; + /** + * Class allowing to exchange messages with the RxPlayer's `core`, here + * running in another thread. + */ + coreInterface: typeof WorkerCoreInterface; } | null; /** * Function for loading and parsing contents through various protocols, by diff --git a/src/main_thread/api/public_api.ts b/src/main_thread/api/public_api.ts index 0ec235c2e3..445fc05a97 100644 --- a/src/main_thread/api/public_api.ts +++ b/src/main_thread/api/public_api.ts @@ -33,7 +33,6 @@ import hasMseInWorker from "../../compat/has_mse_in_worker"; import hasWorkerApi from "../../compat/has_worker_api"; import isDebugModeEnabled from "../../compat/is_debug_mode_enabled"; import config from "../../config"; -import initializeWorkerMain from "../../core/main/worker"; import type { ISegmentSinkMetrics } from "../../core/segment_sinks/segment_sinks_store"; import type { IAdaptationChoice, @@ -41,7 +40,6 @@ import type { IABRThrottlers, IBufferType, } from "../../core/types"; -import { MonoThreadCoreInterface, WorkerCoreInterface } from "../../core_interface"; import type { IDefaultConfig } from "../../default_config"; import type { IErrorCode, IErrorType } from "../../errors"; import { ErrorCodes, ErrorTypes, formatError, MediaError } from "../../errors"; @@ -912,16 +910,16 @@ class Player extends EventEmitter { ) && typeof options.representationFilter !== "function"; if (mode === "main" || (mode === "auto" && !canRunInMultiThread)) { - if (features.mainThreadMediaSourceInit === null) { + if (features.monothread === null) { throw new Error( "Cannot load video, neither in a WebWorker nor with the " + "`MEDIA_SOURCE_MAIN` feature", ); } log.info("API: Initializing MediaSource mode in the main thread"); - const coreInterface = new MonoThreadCoreInterface(); + const coreInterface = new features.monothread.coreInterface(); const coreInterfaceCallbacks = coreInterface.getCallbacks(); - initializeWorkerMain( + features.monothread.workerMain( coreInterfaceCallbacks.setCoreMessageReceiver, coreInterfaceCallbacks.sendCoreMessage, ); @@ -938,7 +936,7 @@ class Player extends EventEmitter { timestamp: getMonotonicTimeStamp(), }, }); - initializer = new features.mainThreadMediaSourceInit({ + initializer = new features.monothread.init({ coreInterface, adaptiveOptions, autoPlay, @@ -971,7 +969,7 @@ class Player extends EventEmitter { useWorker = true; log.info("API: Initializing MediaSource mode in a WebWorker"); initializer = new features.multithread.init({ - coreInterface: new WorkerCoreInterface(this._priv_worker), + coreInterface: new features.multithread.coreInterface(this._priv_worker), adaptiveOptions, autoPlay, bufferOptions, diff --git a/src/main_thread/core_interface/README.md b/src/main_thread/core_interface/README.md new file mode 100644 index 0000000000..42681c3740 --- /dev/null +++ b/src/main_thread/core_interface/README.md @@ -0,0 +1,22 @@ +# CoreInterface + +| Consideration | Status | +| ----------------------- | ---------------------------------------------------- | +| Preferred import style | Only through the `features` object | +| Multithread environment | May run in WebWorker depending on the implementation | + +## Overview + +This directory defines `CoreInterface` implementations, which is the link between the +RxPlayer's "main_thread" logic (which implements logic running always in main thread: the +API, decryption, subtitles rendering, track management...) to the RxPlayer's "core" logic +(which implements logic which may run in a WebWorker: manifest loading, segment loading +and parsing, adaptive...). + +Depending on if playback for the current content currently relies on a single thread or +multiple ones, a different `CoreInterface` implementation will be selected. + +This directory should not be imported directly by the main logic (beside types). Rather, +the RxPlayer's global `features` object should refer to the implementations currently +available instead. This is to ensure that the `CoreInterface` logic is only imported if +the application has explicitly asked for the corresponding feature. diff --git a/src/main_thread/core_interface/base.ts b/src/main_thread/core_interface/base.ts new file mode 100644 index 0000000000..e800f7d3c5 --- /dev/null +++ b/src/main_thread/core_interface/base.ts @@ -0,0 +1,35 @@ +import type { IMainThreadMessage, IWorkerMessage } from "../../multithread_types"; + +export default abstract class CoreInterface { + protected listeners: Array<(evt: IWorkerMessage) => void> = []; + protected listenersError: Array<() => void> = []; + + public abstract sendMessage(msg: IMainThreadMessage): void; + + public addMessageListener(cb: (evt: IWorkerMessage) => void): void { + this.listeners.push(cb); + } + + public removeMessageListener(cb: (evt: IWorkerMessage) => void): void { + const index = this.listeners.indexOf(cb); + if (index >= 0) { + this.listeners.splice(index, 1); + } + } + + public addErrorListener(cb: () => void): void { + this.listenersError.push(cb); + } + + public removeErrorListener(cb: () => void): void { + const index = this.listenersError.indexOf(cb); + if (index >= 0) { + this.listenersError.splice(index, 1); + } + } + + public dispose(): void { + this.listeners.length = 0; + this.listenersError.length = 0; + } +} diff --git a/src/main_thread/core_interface/monothread.ts b/src/main_thread/core_interface/monothread.ts new file mode 100644 index 0000000000..6e2885204e --- /dev/null +++ b/src/main_thread/core_interface/monothread.ts @@ -0,0 +1,40 @@ +import type { IMessageReceiverCallback } from "../../core/types"; +import log from "../../log"; +import type { IMainThreadMessage, IWorkerMessage } from "../../multithread_types"; +import noop from "../../utils/noop"; +import CoreInterface from "./base"; + +export class MonoThreadCoreInterface extends CoreInterface { + private _currentCoreListener: IMessageReceiverCallback; + + constructor() { + super(); + this._currentCoreListener = noop; + } + + public sendMessage(msg: IMainThreadMessage) { + log.debug("---> Sending to Core:", msg.type); + queueMicrotask(() => { + // NOTE: We don't clone for performance reasons + this._currentCoreListener({ data: msg }); + }); + } + + public getCallbacks(): { + setCoreMessageReceiver: (handler: IMessageReceiverCallback) => void; + sendCoreMessage: (msg: IWorkerMessage, transferables?: Transferable[]) => void; + } { + const setCoreMessageReceiver = (handler: IMessageReceiverCallback): void => { + this._currentCoreListener = handler; + }; + const sendCoreMessage = (msg: IWorkerMessage, _transferables?: Transferable[]) => { + queueMicrotask(() => { + log.debug("<--- Receiving from Core:", msg.type); + this.listeners.forEach((listener) => { + listener(msg); + }); + }); + }; + return { setCoreMessageReceiver, sendCoreMessage }; + } +} diff --git a/src/main_thread/core_interface/multithread.ts b/src/main_thread/core_interface/multithread.ts new file mode 100644 index 0000000000..631f6219f5 --- /dev/null +++ b/src/main_thread/core_interface/multithread.ts @@ -0,0 +1,49 @@ +import log from "../../log"; +import type { IMainThreadMessage } from "../../multithread_types"; +import CoreInterface from "./base"; + +/** + * `CoreInterface` implementation for when the core will run in a WebWorker. + */ +export class WorkerCoreInterface extends CoreInterface { + private _worker: Worker; + + /** + * Initialize a `WorkerCoreInterface` for the given `WebWorker` instance. + * + * The `addMessageListener` and `addMessageListener` will then register + * listeners respectively for the `onmessage` and `onmessageerror` events + * from this `WebWorker`. + * The `sendMessage` method will allow to send messages to the `WebWorker`. + * @param {Worker} worker + */ + constructor(worker: Worker) { + super(); + this._worker = worker; + this._worker.onmessageerror = () => { + this.listenersError.forEach((listener) => { + listener(); + }); + }; + this._worker.onmessage = (evt) => { + this.listeners.forEach((listener) => { + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + listener(evt.data); + }); + }; + } + + /** + * Send given message to the `WebWorker`. + * @param {Object} msg + * @param {Array.} [transferables] + */ + public sendMessage(msg: IMainThreadMessage, transferables?: Transferable[]): void { + log.debug("---> Sending to Worker:", msg.type); + if (transferables === undefined) { + this._worker.postMessage(msg); + } else { + this._worker.postMessage(msg, transferables); + } + } +} diff --git a/src/main_thread/core_interface/types.ts b/src/main_thread/core_interface/types.ts new file mode 100644 index 0000000000..37527a94b5 --- /dev/null +++ b/src/main_thread/core_interface/types.ts @@ -0,0 +1,5 @@ +import type CoreInterface from "./base"; +import type { MonoThreadCoreInterface } from "./monothread"; +import type { WorkerCoreInterface } from "./multithread"; +export type { MonoThreadCoreInterface, WorkerCoreInterface }; +export default CoreInterface; diff --git a/src/main_thread/init/multi_thread_content_initializer.ts b/src/main_thread/init/multi_thread_content_initializer.ts index 27126b51fc..ce35aafea1 100644 --- a/src/main_thread/init/multi_thread_content_initializer.ts +++ b/src/main_thread/init/multi_thread_content_initializer.ts @@ -7,7 +7,6 @@ import type { IAdaptationChoice, IResolutionInfo, } from "../../core/types"; -import type CoreInterface from "../../core_interface"; import { EncryptedMediaError, MediaError, @@ -53,6 +52,7 @@ import SharedReference from "../../utils/reference"; import { RequestError } from "../../utils/request"; import type { CancellationSignal } from "../../utils/task_canceller"; import TaskCanceller, { CancellationError } from "../../utils/task_canceller"; +import type CoreInterface from "../core_interface/types"; import type { IContentProtection } from "../decrypt"; import type IContentDecryptor from "../decrypt"; import { ContentDecryptorState, getKeySystemConfiguration } from "../decrypt";