diff --git a/lib/src/voip/README.md b/lib/src/voip/README.md index 74912690..fd2eb358 100644 --- a/lib/src/voip/README.md +++ b/lib/src/voip/README.md @@ -119,14 +119,25 @@ class MyVoipApp implements WebRTCDelegate { VideoRenderer createRenderer() => RTCVideoRenderer(); @override - void playRingtone(){ + Future playRingtone() async { // play ringtone } - void stopRingtone() { + Future stopRingtone() async { // stop ringtone } - void handleNewCall(CallSession session) { + Future registerListeners(CallSession session) async { + // register all listeners here + session.onCallStreamsChanged.stream.listen((CallStateChange event) async {}); + session.onCallReplaced.stream.listen((CallStateChange event) async {}); + session.onCallHangupNotifierForGroupCalls.stream.listen((CallStateChange event) async {}); + session.onCallStateChanged.stream.listen((CallStateChange event) async {}); + session.onCallEventChanged.stream.listen((CallStateChange event) async {}); + session.onStreamAdd.stream.listen((CallStateChange event) async {}); + session.onStreamRemoved.stream.listen((CallStateChange event) async {}); + } + + Future handleNewCall(CallSession session) async { // handle new call incoming or outgoing switch(session.direction) { case CallDirection.kIncoming: @@ -138,7 +149,7 @@ class MyVoipApp implements WebRTCDelegate { } } - void handleCallEnded(CallSession session) { + Future handleCallEnded(CallSession session) async { // handle call ended by local or remote } } @@ -170,7 +181,7 @@ newCall.onCallStateChanged.stream.listen((state) { /// Then you can pop up the incoming call window at MyVoipApp.handleNewCall. class MyVoipApp implements WebRTCDelegate { ... - void handleNewCall(CallSession session) { + Future handleNewCall(CallSession session) async { switch(session.direction) { case CallDirection.kOutgoing: // show outgoing call window @@ -185,13 +196,13 @@ newCall.hangup(); ### 4.Answer a incoming call -When a new incoming call comes in, handleNewCall will be called, and the answering interface can pop up at this time, and use `onCallStateChanged` to listen to the call state. +When a new incoming call comes in, registerListeners will be called right before handleNewCall is called, and the answering interface can pop up at this time, and use `onCallStateChanged` to listen to the call state. The incoming call window need display `answer` and `reject` buttons, by calling `newCall.answer();` or `newCall.reject();` to decide whether to connect the call. ```dart ... - void handleNewCall(CallSession newCall) { + Future registerListeners(CallSession newCall) async { switch(newCall.direction) { case CallDirection.kIncoming: /// show incoming call window diff --git a/lib/src/voip/backend/mesh_backend.dart b/lib/src/voip/backend/mesh_backend.dart index fd45a52d..1ea640e6 100644 --- a/lib/src/voip/backend/mesh_backend.dart +++ b/lib/src/voip/backend/mesh_backend.dart @@ -20,7 +20,11 @@ class MeshBackend extends CallBackend { /// participant:volume final Map _audioLevelsMap = {}; - StreamSubscription? _callSubscription; + /// The stream is used to prepare for incoming peer calls like registering listeners + StreamSubscription? _callSetupSubscription; + + /// The stream is used to signal the start of an incoming peer call + StreamSubscription? _callStartSubscription; Timer? _activeSpeakerLoopTimeout; @@ -109,14 +113,32 @@ class MeshBackend extends CallBackend { ); } + /// Register listeners for a peer call to use for the group calls, that is + /// needed before even call is added to `_callSessions`. + /// We do this here for onStreamAdd and onStreamRemoved to make sure we don't + /// miss any events that happen before the call is completely started. + void _registerListenersBeforeCallAdd(CallSession call) { + call.onStreamAdd.stream.listen((stream) { + if (!stream.isLocal()) { + onStreamAdd.add(stream); + } + }); + + call.onStreamRemoved.stream.listen((stream) { + if (!stream.isLocal()) { + onStreamRemoved.add(stream); + } + }); + } + Future _addCall(GroupCallSession groupCall, CallSession call) async { _callSessions.add(call); - await _initCall(groupCall, call); + _initCall(groupCall, call); groupCall.onGroupCallEvent.add(GroupCallStateChange.callsChanged); } /// init a peer call from group calls. - Future _initCall(GroupCallSession groupCall, CallSession call) async { + void _initCall(GroupCallSession groupCall, CallSession call) { if (call.remoteUserId == null) { throw MatrixSDKVoipException( 'Cannot init call without proper invitee user and device Id', @@ -141,18 +163,6 @@ class MeshBackend extends CallBackend { call.onCallHangupNotifierForGroupCalls.stream.listen((event) async { await _onCallHangup(groupCall, call); }); - - call.onStreamAdd.stream.listen((stream) { - if (!stream.isLocal()) { - onStreamAdd.add(stream); - } - }); - - call.onStreamRemoved.stream.listen((stream) { - if (!stream.isLocal()) { - onStreamRemoved.add(stream); - } - }); } Future _replaceCall( @@ -171,7 +181,8 @@ class MeshBackend extends CallBackend { _callSessions.add(replacementCall); await _disposeCall(groupCall, existingCall, CallErrorCode.replaced); - await _initCall(groupCall, replacementCall); + _registerListenersBeforeCallAdd(replacementCall); + _initCall(groupCall, replacementCall); groupCall.onGroupCallEvent.add(GroupCallStateChange.callsChanged); } @@ -657,7 +668,49 @@ class MeshBackend extends CallBackend { return; } - Future _onIncomingCall( + void _onIncomingCallInMeshSetup( + GroupCallSession groupCall, + CallSession newCall, + ) { + // The incoming calls may be for another room, which we will ignore. + if (newCall.room.id != groupCall.room.id) return; + + if (newCall.state != CallState.kRinging) { + Logs().v( + '[_onIncomingCallInMeshSetup] Incoming call no longer in ringing state. Ignoring.', + ); + return; + } + + if (newCall.groupCallId == null || + newCall.groupCallId != groupCall.groupCallId) { + Logs().v( + '[_onIncomingCallInMeshSetup] Incoming call with groupCallId ${newCall.groupCallId} ignored because it doesn\'t match the current group call', + ); + return; + } + + final existingCall = _getCallForParticipant( + groupCall, + CallParticipant( + groupCall.voip, + userId: newCall.remoteUserId!, + deviceId: newCall.remoteDeviceId, + ), + ); + + // if it's an existing call, `_registerListenersForCall` will be called in + // `_replaceCall` that is used in `_onIncomingCallStart`. + if (existingCall != null) return; + + Logs().v( + '[_onIncomingCallInMeshSetup] GroupCallSession: incoming call from: ${newCall.remoteUserId}${newCall.remoteDeviceId}${newCall.remotePartyId}', + ); + + _registerListenersBeforeCallAdd(newCall); + } + + Future _onIncomingCallInMeshStart( GroupCallSession groupCall, CallSession newCall, ) async { @@ -667,14 +720,16 @@ class MeshBackend extends CallBackend { } if (newCall.state != CallState.kRinging) { - Logs().w('Incoming call no longer in ringing state. Ignoring.'); + Logs().v( + '[_onIncomingCallInMeshStart] Incoming call no longer in ringing state. Ignoring.', + ); return; } if (newCall.groupCallId == null || newCall.groupCallId != groupCall.groupCallId) { Logs().v( - 'Incoming call with groupCallId ${newCall.groupCallId} ignored because it doesn\'t match the current group call', + '[_onIncomingCallInMeshStart] Incoming call with groupCallId ${newCall.groupCallId} ignored because it doesn\'t match the current group call', ); await newCall.reject(); return; @@ -694,7 +749,7 @@ class MeshBackend extends CallBackend { } Logs().v( - 'GroupCallSession: incoming call from: ${newCall.remoteUserId}${newCall.remoteDeviceId}${newCall.remotePartyId}', + '[_onIncomingCallInMeshStart] GroupCallSession: incoming call from: ${newCall.remoteUserId}${newCall.remoteDeviceId}${newCall.remotePartyId}', ); // Check if the user calling has an existing call and use this call instead. @@ -800,7 +855,8 @@ class MeshBackend extends CallBackend { _activeSpeaker = null; _activeSpeakerLoopTimeout?.cancel(); - await _callSubscription?.cancel(); + await _callSetupSubscription?.cancel(); + await _callStartSubscription?.cancel(); } @override @@ -826,11 +882,16 @@ class MeshBackend extends CallBackend { GroupCallSession groupCall, ) async { for (final call in _callSessions) { - await _onIncomingCall(groupCall, call); + _onIncomingCallInMeshSetup(groupCall, call); + await _onIncomingCallInMeshStart(groupCall, call); } - _callSubscription = groupCall.voip.onIncomingCall.stream.listen( - (newCall) => _onIncomingCall(groupCall, newCall), + _callSetupSubscription = groupCall.voip.onIncomingCallSetup.stream.listen( + (newCall) => _onIncomingCallInMeshSetup(groupCall, newCall), + ); + + _callStartSubscription = groupCall.voip.onIncomingCallStart.stream.listen( + (newCall) => _onIncomingCallInMeshStart(groupCall, newCall), ); _onActiveSpeakerLoop(groupCall); @@ -883,6 +944,8 @@ class MeshBackend extends CallBackend { // party id set to when answered newCall.remoteSessionId = mem.membershipId; + _registerListenersBeforeCallAdd(newCall); + await newCall.placeCallWithStreams( _getLocalStreams(), requestScreenSharing: mem.feeds?.any( diff --git a/lib/src/voip/models/webrtc_delegate.dart b/lib/src/voip/models/webrtc_delegate.dart index c499cdb8..9add8292 100644 --- a/lib/src/voip/models/webrtc_delegate.dart +++ b/lib/src/voip/models/webrtc_delegate.dart @@ -11,6 +11,7 @@ abstract class WebRTCDelegate { ]); Future playRingtone(); Future stopRingtone(); + Future registerListeners(CallSession session); Future handleNewCall(CallSession session); Future handleCallEnded(CallSession session); Future handleMissedCall(CallSession session); diff --git a/lib/src/voip/voip.dart b/lib/src/voip/voip.dart index 5c101600..ab073804 100644 --- a/lib/src/voip/voip.dart +++ b/lib/src/voip/voip.dart @@ -36,7 +36,13 @@ class VoIP { Map get groupCalls => _groupCalls; final Map _groupCalls = {}; - final CachedStreamController onIncomingCall = + /// The stream is used to prepare for incoming peer calls in a mesh call + /// For example, registering listeners + final CachedStreamController onIncomingCallSetup = + CachedStreamController(); + + /// The stream is used to signal the start of an incoming peer call in a mesh call + final CachedStreamController onIncomingCallStart = CachedStreamController(); VoipId? currentCID; @@ -479,6 +485,12 @@ class VoIP { // by terminate. currentCID = VoipId(roomId: room.id, callId: callId); + if (confId == null) { + await delegate.registerListeners(newCall); + } else { + onIncomingCallSetup.add(newCall); + } + await newCall.initWithInvite( callType, offer, @@ -493,8 +505,7 @@ class VoIP { } if (confId != null) { - // the stream is used to monitor incoming peer calls in a mesh call - onIncomingCall.add(newCall); + onIncomingCallStart.add(newCall); } } @@ -768,6 +779,8 @@ class VoIP { newCall.remoteUserId = userId; newCall.remoteDeviceId = deviceId; + await delegate.registerListeners(newCall); + currentCID = VoipId(roomId: roomId, callId: callId); await newCall.initOutboundCall(type).then((_) { delegate.handleNewCall(newCall); diff --git a/test/webrtc_stub.dart b/test/webrtc_stub.dart index c0f30631..76897198 100644 --- a/test/webrtc_stub.dart +++ b/test/webrtc_stub.dart @@ -15,6 +15,11 @@ class MockWebRTCDelegate implements WebRTCDelegate { ]) async => MockRTCPeerConnection(); + @override + Future registerListeners(CallSession session) async { + Logs().i('registerListeners called in MockWebRTCDelegate'); + } + @override Future handleCallEnded(CallSession session) async { Logs().i('handleCallEnded called in MockWebRTCDelegate');