From 3a38f52132744d68f3cd96314db1377af5529196 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Tue, 28 Feb 2023 09:53:28 +0800 Subject: [PATCH 1/2] Worked for merge --- src/api/SignalClient.ts | 7 + src/proto/livekit_models.ts | 50 +++--- src/proto/livekit_rtc.ts | 206 +++++++++++++++++----- src/room/RTCEngine.ts | 11 ++ src/room/Room.ts | 46 +++++ src/room/participant/RemoteParticipant.ts | 32 ++++ 6 files changed, 280 insertions(+), 72 deletions(-) diff --git a/src/api/SignalClient.ts b/src/api/SignalClient.ts index 4df1a4fd22..ee91c25ff2 100644 --- a/src/api/SignalClient.ts +++ b/src/api/SignalClient.ts @@ -12,6 +12,7 @@ import { } from '../proto/livekit_models'; import { AddTrackRequest, + AudioTrackMuxUpdate, ConnectionQualityUpdate, JoinResponse, LeaveRequest, @@ -134,6 +135,8 @@ export class SignalClient { onLeave?: (leave: LeaveRequest) => void; + onAudioMuxUpdate?: (update: AudioTrackMuxUpdate) => void; + connectOptions?: ConnectOpts; ws?: WebSocket; @@ -588,6 +591,10 @@ export class SignalClient { } } else if (msg.$case === 'pong') { this.resetPingTimeout(); + } else if (msg.$case === 'audioMuxUpdate') { + if (this.onAudioMuxUpdate) { + this.onAudioMuxUpdate(msg.audioMuxUpdate); + } } else if (msg.$case === 'pongResp') { this.rtt = Date.now() - msg.pongResp.lastPingTimestamp; this.resetPingTimeout(); diff --git a/src/proto/livekit_models.ts b/src/proto/livekit_models.ts index 6fa957c2ef..090364e4b0 100644 --- a/src/proto/livekit_models.ts +++ b/src/proto/livekit_models.ts @@ -288,31 +288,31 @@ export function disconnectReasonToJSON(object: DisconnectReason): string { } export enum ReconnectReason { - REASON_UNKOWN = 0, - REASON_SIGNAL_DISCONNECTED = 1, - REASON_PUBLISHER_FAILED = 2, - REASON_SUBSCRIBER_FAILED = 3, - REASON_SWITCH_CANDIDATE = 4, + RR_UNKOWN = 0, + RR_SIGNAL_DISCONNECTED = 1, + RR_PUBLISHER_FAILED = 2, + RR_SUBSCRIBER_FAILED = 3, + RR_SWITCH_CANDIDATE = 4, UNRECOGNIZED = -1, } export function reconnectReasonFromJSON(object: any): ReconnectReason { switch (object) { case 0: - case "REASON_UNKOWN": - return ReconnectReason.REASON_UNKOWN; + case "RR_UNKOWN": + return ReconnectReason.RR_UNKOWN; case 1: - case "REASON_SIGNAL_DISCONNECTED": - return ReconnectReason.REASON_SIGNAL_DISCONNECTED; + case "RR_SIGNAL_DISCONNECTED": + return ReconnectReason.RR_SIGNAL_DISCONNECTED; case 2: - case "REASON_PUBLISHER_FAILED": - return ReconnectReason.REASON_PUBLISHER_FAILED; + case "RR_PUBLISHER_FAILED": + return ReconnectReason.RR_PUBLISHER_FAILED; case 3: - case "REASON_SUBSCRIBER_FAILED": - return ReconnectReason.REASON_SUBSCRIBER_FAILED; + case "RR_SUBSCRIBER_FAILED": + return ReconnectReason.RR_SUBSCRIBER_FAILED; case 4: - case "REASON_SWITCH_CANDIDATE": - return ReconnectReason.REASON_SWITCH_CANDIDATE; + case "RR_SWITCH_CANDIDATE": + return ReconnectReason.RR_SWITCH_CANDIDATE; case -1: case "UNRECOGNIZED": default: @@ -322,16 +322,16 @@ export function reconnectReasonFromJSON(object: any): ReconnectReason { export function reconnectReasonToJSON(object: ReconnectReason): string { switch (object) { - case ReconnectReason.REASON_UNKOWN: - return "REASON_UNKOWN"; - case ReconnectReason.REASON_SIGNAL_DISCONNECTED: - return "REASON_SIGNAL_DISCONNECTED"; - case ReconnectReason.REASON_PUBLISHER_FAILED: - return "REASON_PUBLISHER_FAILED"; - case ReconnectReason.REASON_SUBSCRIBER_FAILED: - return "REASON_SUBSCRIBER_FAILED"; - case ReconnectReason.REASON_SWITCH_CANDIDATE: - return "REASON_SWITCH_CANDIDATE"; + case ReconnectReason.RR_UNKOWN: + return "RR_UNKOWN"; + case ReconnectReason.RR_SIGNAL_DISCONNECTED: + return "RR_SIGNAL_DISCONNECTED"; + case ReconnectReason.RR_PUBLISHER_FAILED: + return "RR_PUBLISHER_FAILED"; + case ReconnectReason.RR_SUBSCRIBER_FAILED: + return "RR_SUBSCRIBER_FAILED"; + case ReconnectReason.RR_SWITCH_CANDIDATE: + return "RR_SWITCH_CANDIDATE"; case ReconnectReason.UNRECOGNIZED: default: return "UNRECOGNIZED"; diff --git a/src/proto/livekit_rtc.ts b/src/proto/livekit_rtc.ts index 4683a50625..a10584579e 100644 --- a/src/proto/livekit_rtc.ts +++ b/src/proto/livekit_rtc.ts @@ -176,7 +176,8 @@ export interface SignalResponse { | { $case: "trackUnpublished"; trackUnpublished: TrackUnpublishedResponse } | { $case: "pong"; pong: number } | { $case: "reconnect"; reconnect: ReconnectResponse } - | { $case: "pongResp"; pongResp: Pong }; + | { $case: "pongResp"; pongResp: Pong } + | { $case: "audioMuxUpdate"; audioMuxUpdate: AudioTrackMuxUpdate }; } export interface SimulcastCodec { @@ -243,9 +244,6 @@ export interface JoinResponse { export interface ReconnectResponse { iceServers: ICEServer[]; clientConfiguration?: ClientConfiguration; - room?: Room; - participant?: ParticipantInfo; - otherParticipants: ParticipantInfo[]; } export interface TrackPublishedResponse { @@ -409,6 +407,16 @@ export interface SimulateScenario { | { $case: "switchCandidateProtocol"; switchCandidateProtocol: CandidateProtocol }; } +export interface AudioTrackMuxUpdate { + audioTrackMuxes: AudioTrackMuxInfo[]; +} + +export interface AudioTrackMuxInfo { + sdpTrackId: string; + trackSid: string; + participantSid: string; +} + export interface Ping { timestamp: number; /** rtt in milliseconds calculated by client */ @@ -789,6 +797,9 @@ export const SignalResponse = { if (message.message?.$case === "pongResp") { Pong.encode(message.message.pongResp, writer.uint32(162).fork()).ldelim(); } + if (message.message?.$case === "audioMuxUpdate") { + AudioTrackMuxUpdate.encode(message.message.audioMuxUpdate, writer.uint32(170).fork()).ldelim(); + } return writer; }, @@ -877,6 +888,12 @@ export const SignalResponse = { case 20: message.message = { $case: "pongResp", pongResp: Pong.decode(reader, reader.uint32()) }; break; + case 21: + message.message = { + $case: "audioMuxUpdate", + audioMuxUpdate: AudioTrackMuxUpdate.decode(reader, reader.uint32()), + }; + break; default: reader.skipType(tag & 7); break; @@ -931,6 +948,8 @@ export const SignalResponse = { ? { $case: "reconnect", reconnect: ReconnectResponse.fromJSON(object.reconnect) } : isSet(object.pongResp) ? { $case: "pongResp", pongResp: Pong.fromJSON(object.pongResp) } + : isSet(object.audioMuxUpdate) + ? { $case: "audioMuxUpdate", audioMuxUpdate: AudioTrackMuxUpdate.fromJSON(object.audioMuxUpdate) } : undefined, }; }, @@ -982,6 +1001,9 @@ export const SignalResponse = { (obj.reconnect = message.message?.reconnect ? ReconnectResponse.toJSON(message.message?.reconnect) : undefined); message.message?.$case === "pongResp" && (obj.pongResp = message.message?.pongResp ? Pong.toJSON(message.message?.pongResp) : undefined); + message.message?.$case === "audioMuxUpdate" && (obj.audioMuxUpdate = message.message?.audioMuxUpdate + ? AudioTrackMuxUpdate.toJSON(message.message?.audioMuxUpdate) + : undefined); return obj; }, @@ -1113,6 +1135,16 @@ export const SignalResponse = { ) { message.message = { $case: "pongResp", pongResp: Pong.fromPartial(object.message.pongResp) }; } + if ( + object.message?.$case === "audioMuxUpdate" && + object.message?.audioMuxUpdate !== undefined && + object.message?.audioMuxUpdate !== null + ) { + message.message = { + $case: "audioMuxUpdate", + audioMuxUpdate: AudioTrackMuxUpdate.fromPartial(object.message.audioMuxUpdate), + }; + } return message; }, }; @@ -1675,13 +1707,7 @@ export const JoinResponse = { }; function createBaseReconnectResponse(): ReconnectResponse { - return { - iceServers: [], - clientConfiguration: undefined, - room: undefined, - participant: undefined, - otherParticipants: [], - }; + return { iceServers: [], clientConfiguration: undefined }; } export const ReconnectResponse = { @@ -1692,15 +1718,6 @@ export const ReconnectResponse = { if (message.clientConfiguration !== undefined) { ClientConfiguration.encode(message.clientConfiguration, writer.uint32(18).fork()).ldelim(); } - if (message.room !== undefined) { - Room.encode(message.room, writer.uint32(26).fork()).ldelim(); - } - if (message.participant !== undefined) { - ParticipantInfo.encode(message.participant, writer.uint32(34).fork()).ldelim(); - } - for (const v of message.otherParticipants) { - ParticipantInfo.encode(v!, writer.uint32(42).fork()).ldelim(); - } return writer; }, @@ -1717,15 +1734,6 @@ export const ReconnectResponse = { case 2: message.clientConfiguration = ClientConfiguration.decode(reader, reader.uint32()); break; - case 3: - message.room = Room.decode(reader, reader.uint32()); - break; - case 4: - message.participant = ParticipantInfo.decode(reader, reader.uint32()); - break; - case 5: - message.otherParticipants.push(ParticipantInfo.decode(reader, reader.uint32())); - break; default: reader.skipType(tag & 7); break; @@ -1740,11 +1748,6 @@ export const ReconnectResponse = { clientConfiguration: isSet(object.clientConfiguration) ? ClientConfiguration.fromJSON(object.clientConfiguration) : undefined, - room: isSet(object.room) ? Room.fromJSON(object.room) : undefined, - participant: isSet(object.participant) ? ParticipantInfo.fromJSON(object.participant) : undefined, - otherParticipants: Array.isArray(object?.otherParticipants) - ? object.otherParticipants.map((e: any) => ParticipantInfo.fromJSON(e)) - : [], }; }, @@ -1758,14 +1761,6 @@ export const ReconnectResponse = { message.clientConfiguration !== undefined && (obj.clientConfiguration = message.clientConfiguration ? ClientConfiguration.toJSON(message.clientConfiguration) : undefined); - message.room !== undefined && (obj.room = message.room ? Room.toJSON(message.room) : undefined); - message.participant !== undefined && - (obj.participant = message.participant ? ParticipantInfo.toJSON(message.participant) : undefined); - if (message.otherParticipants) { - obj.otherParticipants = message.otherParticipants.map((e) => e ? ParticipantInfo.toJSON(e) : undefined); - } else { - obj.otherParticipants = []; - } return obj; }, @@ -1775,11 +1770,6 @@ export const ReconnectResponse = { message.clientConfiguration = (object.clientConfiguration !== undefined && object.clientConfiguration !== null) ? ClientConfiguration.fromPartial(object.clientConfiguration) : undefined; - message.room = (object.room !== undefined && object.room !== null) ? Room.fromPartial(object.room) : undefined; - message.participant = (object.participant !== undefined && object.participant !== null) - ? ParticipantInfo.fromPartial(object.participant) - : undefined; - message.otherParticipants = object.otherParticipants?.map((e) => ParticipantInfo.fromPartial(e)) || []; return message; }, }; @@ -3479,6 +3469,128 @@ export const SimulateScenario = { }, }; +function createBaseAudioTrackMuxUpdate(): AudioTrackMuxUpdate { + return { audioTrackMuxes: [] }; +} + +export const AudioTrackMuxUpdate = { + encode(message: AudioTrackMuxUpdate, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + for (const v of message.audioTrackMuxes) { + AudioTrackMuxInfo.encode(v!, writer.uint32(10).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): AudioTrackMuxUpdate { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseAudioTrackMuxUpdate(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.audioTrackMuxes.push(AudioTrackMuxInfo.decode(reader, reader.uint32())); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): AudioTrackMuxUpdate { + return { + audioTrackMuxes: Array.isArray(object?.audioTrackMuxes) + ? object.audioTrackMuxes.map((e: any) => AudioTrackMuxInfo.fromJSON(e)) + : [], + }; + }, + + toJSON(message: AudioTrackMuxUpdate): unknown { + const obj: any = {}; + if (message.audioTrackMuxes) { + obj.audioTrackMuxes = message.audioTrackMuxes.map((e) => e ? AudioTrackMuxInfo.toJSON(e) : undefined); + } else { + obj.audioTrackMuxes = []; + } + return obj; + }, + + fromPartial, I>>(object: I): AudioTrackMuxUpdate { + const message = createBaseAudioTrackMuxUpdate(); + message.audioTrackMuxes = object.audioTrackMuxes?.map((e) => AudioTrackMuxInfo.fromPartial(e)) || []; + return message; + }, +}; + +function createBaseAudioTrackMuxInfo(): AudioTrackMuxInfo { + return { sdpTrackId: "", trackSid: "", participantSid: "" }; +} + +export const AudioTrackMuxInfo = { + encode(message: AudioTrackMuxInfo, writer: _m0.Writer = _m0.Writer.create()): _m0.Writer { + if (message.sdpTrackId !== "") { + writer.uint32(10).string(message.sdpTrackId); + } + if (message.trackSid !== "") { + writer.uint32(18).string(message.trackSid); + } + if (message.participantSid !== "") { + writer.uint32(26).string(message.participantSid); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): AudioTrackMuxInfo { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = createBaseAudioTrackMuxInfo(); + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.sdpTrackId = reader.string(); + break; + case 2: + message.trackSid = reader.string(); + break; + case 3: + message.participantSid = reader.string(); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): AudioTrackMuxInfo { + return { + sdpTrackId: isSet(object.sdpTrackId) ? String(object.sdpTrackId) : "", + trackSid: isSet(object.trackSid) ? String(object.trackSid) : "", + participantSid: isSet(object.participantSid) ? String(object.participantSid) : "", + }; + }, + + toJSON(message: AudioTrackMuxInfo): unknown { + const obj: any = {}; + message.sdpTrackId !== undefined && (obj.sdpTrackId = message.sdpTrackId); + message.trackSid !== undefined && (obj.trackSid = message.trackSid); + message.participantSid !== undefined && (obj.participantSid = message.participantSid); + return obj; + }, + + fromPartial, I>>(object: I): AudioTrackMuxInfo { + const message = createBaseAudioTrackMuxInfo(); + message.sdpTrackId = object.sdpTrackId ?? ""; + message.trackSid = object.trackSid ?? ""; + message.participantSid = object.participantSid ?? ""; + return message; + }, +}; + function createBasePing(): Ping { return { timestamp: 0, rtt: 0 }; } diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index a2d6f8b0fa..11bf716cad 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -61,6 +61,12 @@ enum PCState { Closed, } +interface AudioMuxTrack { + track: MediaStreamTrack; + stream: MediaStream; + receiver: RTCRtpReceiver; +} + /** @internal */ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmitter) { publisher?: PCTransport; @@ -73,6 +79,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit peerConnectionTimeout: number = roomConnectOptionDefaults.peerConnectionTimeout; + audioMuxTracks: Map = new Map(); + get isClosed() { return this._isClosed; } @@ -357,6 +365,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit }; this.subscriber.pc.ontrack = (ev: RTCTrackEvent) => { + if (ev.track.kind === 'audio' && ev.track.id.includes('TR_AX')) { + this.audioMuxTracks.set(ev.track.id, {track: ev.track, stream: ev.streams[0], receiver: ev.receiver}) + } this.emit(EngineEvent.MediaTrackAdded, ev.track, ev.streams[0], ev.receiver); }; diff --git a/src/room/Room.ts b/src/room/Room.ts index b3b3080b38..e724ec61df 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -23,6 +23,7 @@ import { UserPacket, } from '../proto/livekit_models'; import { + AudioTrackMuxUpdate, ConnectionQualityUpdate, JoinResponse, SimulateScenario, @@ -64,6 +65,7 @@ import { supportsSetSinkId, unpackStreamId, } from './utils'; +import RemoteAudioTrack from './track/RemoteAudioTrack'; export enum ConnectionState { Disconnected = 'disconnected', @@ -133,6 +135,12 @@ class Room extends (EventEmitter as new () => TypedEmitter) private disconnectLock: Mutex; + /** mapping of sdp track id -> RemoteTrackPublication */ + private audioTrackMux: Map = new Map(); + + /** mapping of sdp track id -> RemoteAudioTrack */ + private roomAudioTracks: Map = new Map(); + /** * Creates a new Room, the primary construct for a LiveKit session. * @param options @@ -177,6 +185,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.engine.client.onStreamStateUpdate = this.handleStreamStateUpdate; this.engine.client.onSubscriptionPermissionUpdate = this.handleSubscriptionPermissionUpdate; this.engine.client.onConnectionQuality = this.handleConnectionQualityUpdate; + this.engine.client.onAudioMuxUpdate = this.handleAudioMuxUpdate; this.engine .on( @@ -650,6 +659,8 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.options.audioOutput.deviceId = prevDeviceId; throw e; } + + // TODO: set audio output for room audio tracks } } @@ -1042,6 +1053,41 @@ class Room extends (EventEmitter as new () => TypedEmitter) }); }; + private handleAudioMuxUpdate = (update: AudioTrackMuxUpdate) => { + // if track is not playing, detach from audio element + this.audioTrackMux.forEach((pub: RemoteTrackPublication) => { + if (!update.audioTrackMuxes.some((newPub) => newPub.trackSid === pub.trackSid)) { + pub.setTrack(undefined); + } + }) + + this.audioTrackMux.clear(); + update.audioTrackMuxes.forEach(info => { + const muxTrack = this.engine.audioMuxTracks.get(info.sdpTrackId); + if (!muxTrack) { + log.error(`can't find mux track for sdp track id ${info.sdpTrackId}`); + return; + } + + let remoteTrack = this.roomAudioTracks.get(info.sdpTrackId); + if (!remoteTrack) { + remoteTrack = new RemoteAudioTrack(muxTrack.track, info.trackSid, muxTrack.receiver, this.audioContext, this.options.audioOutput); + this.roomAudioTracks.set(info.sdpTrackId, remoteTrack); + } + + const participant = this.participants.get(info.participantSid); + if (!participant) { + log.error(`can't find participant for ${info.sdpTrackId}`); + return; + } + + const p = participant.addMuxAudioTrack(remoteTrack, muxTrack.track, info.trackSid, muxTrack.stream); + if (p) { + this.audioTrackMux.set(info.sdpTrackId, p) + } + }) + } + private async acquireAudioContext() { if ( typeof this.options.expWebAudioMix !== 'boolean' && diff --git a/src/room/participant/RemoteParticipant.ts b/src/room/participant/RemoteParticipant.ts index 38d37e9d27..1372c5d85a 100644 --- a/src/room/participant/RemoteParticipant.ts +++ b/src/room/participant/RemoteParticipant.ts @@ -119,6 +119,38 @@ export default class RemoteParticipant extends Participant { return this.volume; } + /** @internal */ + addMuxAudioTrack( + track: RemoteAudioTrack, + mediaTrack: MediaStreamTrack, + sid: Track.SID, + mediaStream: MediaStream, + ) { + let publication = this.getTrackPublication(sid); + if (!publication) { + log.error('could not find published track', { participant: this.sid, trackSid: sid }); + this.emit(ParticipantEvent.TrackSubscriptionFailed, sid); + return; + } + track.source = publication.source; + // keep publication's muted status + track.isMuted = publication.isMuted; + track.setMediaStream(mediaStream); + track.start(); + + publication.setTrack(track); + // set participant volume on new microphone tracks + if ( + this.volume !== undefined && + track instanceof RemoteAudioTrack && + track.source === Track.Source.Microphone + ) { + track.setVolume(this.volume); + } + + return publication; + } + /** @internal */ addSubscribedMediaTrack( mediaTrack: MediaStreamTrack, From 25efb6e9253d0f0c90a69627fcbe6c7654a97245 Mon Sep 17 00:00:00 2001 From: cnderrauber Date: Fri, 10 Mar 2023 11:36:46 +0800 Subject: [PATCH 2/2] work with webaudio --- src/room/RTCEngine.ts | 19 ++--- src/room/Room.ts | 75 +++++++++++-------- src/room/participant/RemoteParticipant.ts | 3 - src/room/track/MuxedRemoteAudioTrack.ts | 87 +++++++++++++++++++++++ 4 files changed, 144 insertions(+), 40 deletions(-) create mode 100644 src/room/track/MuxedRemoteAudioTrack.ts diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 11bf716cad..17599359c8 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -344,8 +344,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit 'primary peerconnection', false, subscriberPrimary - ? ReconnectReason.REASON_SUBSCRIBER_FAILED - : ReconnectReason.REASON_PUBLISHER_FAILED, + ? ReconnectReason.RR_SUBSCRIBER_FAILED + : ReconnectReason.RR_PUBLISHER_FAILED, ); } } @@ -358,13 +358,14 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit 'secondary peerconnection', false, subscriberPrimary - ? ReconnectReason.REASON_PUBLISHER_FAILED - : ReconnectReason.REASON_SUBSCRIBER_FAILED, + ? ReconnectReason.RR_PUBLISHER_FAILED + : ReconnectReason.RR_SUBSCRIBER_FAILED, ); } }; this.subscriber.pc.ontrack = (ev: RTCTrackEvent) => { + // todo-mux: firefox can't get TR_AX... track id if (ev.track.kind === 'audio' && ev.track.id.includes('TR_AX')) { this.audioMuxTracks.set(ev.track.id, {track: ev.track, stream: ev.streams[0], receiver: ev.receiver}) } @@ -430,7 +431,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit }; this.client.onClose = () => { - this.handleDisconnect('signal', false, ReconnectReason.REASON_SIGNAL_DISCONNECTED); + this.handleDisconnect('signal', false, ReconnectReason.RR_SIGNAL_DISCONNECTED); }; this.client.onLeave = (leave?: LeaveRequest) => { @@ -795,7 +796,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } if (recoverable) { - this.handleDisconnect('reconnect', requireSignalEvents, ReconnectReason.REASON_UNKOWN); + this.handleDisconnect('reconnect', requireSignalEvents, ReconnectReason.RR_UNKOWN); } else { log.info( `could not recover connection after ${this.reconnectAttempts} attempts, ${ @@ -1033,7 +1034,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit const negotiationTimeout = setTimeout(() => { reject('negotiation timed out'); - this.handleDisconnect('negotiation', false, ReconnectReason.REASON_SIGNAL_DISCONNECTED); + this.handleDisconnect('negotiation', false, ReconnectReason.RR_SIGNAL_DISCONNECTED); }, this.peerConnectionTimeout); const cleanup = () => { @@ -1054,7 +1055,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if (e instanceof NegotiationError) { this.fullReconnectOnNext = true; } - this.handleDisconnect('negotiation', false, ReconnectReason.REASON_UNKOWN); + this.handleDisconnect('negotiation', false, ReconnectReason.RR_UNKOWN); }); }); } @@ -1092,7 +1093,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit // in case the engine is currently reconnecting, attempt a reconnect immediately after the browser state has changed to 'onLine' if (this.client.isReconnecting) { this.clearReconnectTimeout(); - this.attemptReconnect(true, ReconnectReason.REASON_SIGNAL_DISCONNECTED); + this.attemptReconnect(true, ReconnectReason.RR_SIGNAL_DISCONNECTED); } }; diff --git a/src/room/Room.ts b/src/room/Room.ts index e724ec61df..b3bc6037ac 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -51,6 +51,7 @@ import LocalTrackPublication from './track/LocalTrackPublication'; import LocalVideoTrack from './track/LocalVideoTrack'; import type RemoteTrack from './track/RemoteTrack'; import RemoteTrackPublication from './track/RemoteTrackPublication'; +import MuxedRemoteAudioTrack from './track/MuxedRemoteAudioTrack'; import { Track } from './track/Track'; import type { TrackPublication } from './track/TrackPublication'; import type { AdaptiveStreamSettings } from './track/types'; @@ -65,7 +66,6 @@ import { supportsSetSinkId, unpackStreamId, } from './utils'; -import RemoteAudioTrack from './track/RemoteAudioTrack'; export enum ConnectionState { Disconnected = 'disconnected', @@ -135,11 +135,10 @@ class Room extends (EventEmitter as new () => TypedEmitter) private disconnectLock: Mutex; - /** mapping of sdp track id -> RemoteTrackPublication */ - private audioTrackMux: Map = new Map(); + private lastMuxUpdate?: AudioTrackMuxUpdate; - /** mapping of sdp track id -> RemoteAudioTrack */ - private roomAudioTracks: Map = new Map(); + /** mapping of track id -> MuxedRemoteAudioTrack */ + private muxedTracks: Map = new Map(); /** * Creates a new Room, the primary construct for a LiveKit session. @@ -1054,38 +1053,58 @@ class Room extends (EventEmitter as new () => TypedEmitter) }; private handleAudioMuxUpdate = (update: AudioTrackMuxUpdate) => { - // if track is not playing, detach from audio element - this.audioTrackMux.forEach((pub: RemoteTrackPublication) => { - if (!update.audioTrackMuxes.some((newPub) => newPub.trackSid === pub.trackSid)) { - pub.setTrack(undefined); - } - }) - this.audioTrackMux.clear(); - update.audioTrackMuxes.forEach(info => { - const muxTrack = this.engine.audioMuxTracks.get(info.sdpTrackId); - if (!muxTrack) { - log.error(`can't find mux track for sdp track id ${info.sdpTrackId}`); - return; - } + const newlySubTracks = update.audioTrackMuxes.filter(val => !(this.lastMuxUpdate?.audioTrackMuxes.some(lval => lval.trackSid === val.trackSid))); + const newlyUnsubTracks = this.lastMuxUpdate?.audioTrackMuxes.filter(lval => !(update.audioTrackMuxes.some(val => val.trackSid === lval.trackSid))); + this.lastMuxUpdate = update; - let remoteTrack = this.roomAudioTracks.get(info.sdpTrackId); - if (!remoteTrack) { - remoteTrack = new RemoteAudioTrack(muxTrack.track, info.trackSid, muxTrack.receiver, this.audioContext, this.options.audioOutput); - this.roomAudioTracks.set(info.sdpTrackId, remoteTrack); + newlySubTracks.forEach(track => { + const muxTrack = this.engine.audioMuxTracks.get(track.sdpTrackId); + if (track.sdpTrackId !== '' && !muxTrack) { + log.error(`can't find mux track for sdp track id ${track.sdpTrackId}`); } - const participant = this.participants.get(info.participantSid); + const remoteTrack = new MuxedRemoteAudioTrack(track.trackSid, muxTrack?.track,muxTrack?.stream, muxTrack?.receiver, this.audioContext, this.options.audioOutput); + + const participant = this.participants.get(track.participantSid); if (!participant) { - log.error(`can't find participant for ${info.sdpTrackId}`); + log.error(`can't find participant for ${track.participantSid}`); return; } - const p = participant.addMuxAudioTrack(remoteTrack, muxTrack.track, info.trackSid, muxTrack.stream); - if (p) { - this.audioTrackMux.set(info.sdpTrackId, p) + participant.addMuxAudioTrack(remoteTrack, track.trackSid); + this.muxedTracks.set(track.trackSid, remoteTrack); + }); + + newlyUnsubTracks?.forEach(track => { + log.info(`unsub track ${track.trackSid}`); + const muxedTrack = this.muxedTracks.get(track.trackSid); + if (!muxedTrack) { + log.error(`can't find muxed track for participant ${track.participantSid}, track ${track.trackSid}`); + return + } + muxedTrack.close(); + this.muxedTracks.delete(track.trackSid); + }); + + update.audioTrackMuxes.forEach(track => { + const muxedTrack = this.muxedTracks.get(track.trackSid); + if(!muxedTrack) { + log.error(`can't find muxed track for participant ${track.participantSid}, track ${track.trackSid}`); + return } - }) + if (track.sdpTrackId === '') { + muxedTrack.unbind(); + } else { + const muxTrack = this.engine.audioMuxTracks.get(track.sdpTrackId) + if (!muxTrack) { + log.error(`can't find mux track for sdp track id ${track.sdpTrackId}`); + return + } + log.info(`track ${track.trackSid} muxed to ${track.sdpTrackId}`); + muxedTrack.bind(muxTrack.track, muxTrack.stream,muxTrack.receiver); + } + }); } private async acquireAudioContext() { diff --git a/src/room/participant/RemoteParticipant.ts b/src/room/participant/RemoteParticipant.ts index 1372c5d85a..f9773e82c2 100644 --- a/src/room/participant/RemoteParticipant.ts +++ b/src/room/participant/RemoteParticipant.ts @@ -122,9 +122,7 @@ export default class RemoteParticipant extends Participant { /** @internal */ addMuxAudioTrack( track: RemoteAudioTrack, - mediaTrack: MediaStreamTrack, sid: Track.SID, - mediaStream: MediaStream, ) { let publication = this.getTrackPublication(sid); if (!publication) { @@ -135,7 +133,6 @@ export default class RemoteParticipant extends Participant { track.source = publication.source; // keep publication's muted status track.isMuted = publication.isMuted; - track.setMediaStream(mediaStream); track.start(); publication.setTrack(track); diff --git a/src/room/track/MuxedRemoteAudioTrack.ts b/src/room/track/MuxedRemoteAudioTrack.ts new file mode 100644 index 0000000000..840bd5d8ca --- /dev/null +++ b/src/room/track/MuxedRemoteAudioTrack.ts @@ -0,0 +1,87 @@ +import log from '../../logger'; +import RemoteAudioTrack from "./RemoteAudioTrack"; +import type { AudioOutputOptions } from './options'; + +export default class MuxedRemoteAudioTrack extends RemoteAudioTrack { + private audioCtx: AudioContext; + + private streamDst: MediaStreamAudioDestinationNode; + + private streamDstTrack: MediaStreamTrack; + + private muxingTrack?: MediaStreamTrack; + + private muxingSourceNode?: AudioNode; + + private audioForWebRTCStream?: HTMLAudioElement; + + constructor( + sid: string, + mediaTrack?: MediaStreamTrack, + mediaStream?: MediaStream, + receiver?: RTCRtpReceiver, + audioContext?: AudioContext, + audioOutput?: AudioOutputOptions, + ) { + if (!audioContext) { + audioContext = new AudioContext() + } + const dst = audioContext.createMediaStreamDestination(); + // const dst = audioContext.destination; + const [streamDstTrack] = dst.stream.getAudioTracks(); + if (!streamDstTrack) { + throw Error('Could not get media stream audio track'); + } + super(streamDstTrack, sid, receiver, audioContext, audioOutput); + this.audioCtx = audioContext; + this.streamDst = dst; + this.streamDstTrack = streamDstTrack; + this.audioForWebRTCStream = new Audio(); + // this.streamDst.connect(audioContext.destination); + + if (mediaTrack && mediaStream) { + this.bind(mediaTrack, mediaStream, receiver); + } + this.setMediaStream(dst.stream); + } + + bind(track: MediaStreamTrack, stream: MediaStream, receiver?: RTCRtpReceiver) { + if (track === this.muxingTrack) { + return; + } + if (this.muxingSourceNode) { + this.muxingSourceNode.disconnect(); + } + log.info(`bind ${this.sid}`); + + // for chrome known bug: webrtc stream must attach to an element to play. + // https://bugs.chromium.org/p/chromium/issues/detail?id=933677&q=webrtc%20silent&can=2 + if (this.audioForWebRTCStream) { + this.audioForWebRTCStream.srcObject = stream; + } + + const srcNode = this.audioCtx.createMediaStreamSource(stream); + srcNode.connect(this.streamDst); + this.muxingSourceNode = srcNode; + this.muxingTrack = track; + this.receiver = receiver; + // this.streamDstTrack.enabled = true; + } + + unbind() { + if (this.muxingSourceNode) { + log.info(`unbind ${this.sid}`); + this.muxingSourceNode?.disconnect(); + this.muxingSourceNode = undefined; + this.muxingTrack = undefined; + this.receiver = undefined; + } + // this.streamDstTrack.enabled = false; + } + + close() { + this.unbind(); + // fire onremovetrack for Track Ended event + this.streamDst.stream.removeTrack(this.streamDstTrack); + } +} \ No newline at end of file