Skip to content

Commit

Permalink
Merge pull request #1956 from famedly/karthi/register-listener-callback
Browse files Browse the repository at this point in the history
fix: BREAKING! missed initial updates for stream listener callbacks in P2P & mesh calls
  • Loading branch information
td-famedly authored Dec 17, 2024
2 parents 4249cf1 + f3e2559 commit 9ae2384
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 34 deletions.
25 changes: 18 additions & 7 deletions lib/src/voip/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -119,14 +119,25 @@ class MyVoipApp implements WebRTCDelegate {
VideoRenderer createRenderer() => RTCVideoRenderer();
@override
void playRingtone(){
Future<void> playRingtone() async {
// play ringtone
}
void stopRingtone() {
Future<void> stopRingtone() async {
// stop ringtone
}
void handleNewCall(CallSession session) {
Future<void> registerListeners(CallSession session) async {
// register all listeners here
session.onCallStreamsChanged.stream.listen((CallStateChange event) async {});
session.onCallReplaced.stream.listen((CallStateChange event) async {});
session.onCallHangupNotifierForGroupCalls.stream.listen((CallStateChange event) async {});
session.onCallStateChanged.stream.listen((CallStateChange event) async {});
session.onCallEventChanged.stream.listen((CallStateChange event) async {});
session.onStreamAdd.stream.listen((CallStateChange event) async {});
session.onStreamRemoved.stream.listen((CallStateChange event) async {});
}
Future<void> handleNewCall(CallSession session) async {
// handle new call incoming or outgoing
switch(session.direction) {
case CallDirection.kIncoming:
Expand All @@ -138,7 +149,7 @@ class MyVoipApp implements WebRTCDelegate {
}
}
void handleCallEnded(CallSession session) {
Future<void> handleCallEnded(CallSession session) async {
// handle call ended by local or remote
}
}
Expand Down Expand Up @@ -170,7 +181,7 @@ newCall.onCallStateChanged.stream.listen((state) {
/// Then you can pop up the incoming call window at MyVoipApp.handleNewCall.
class MyVoipApp implements WebRTCDelegate {
...
void handleNewCall(CallSession session) {
Future<void> handleNewCall(CallSession session) async {
switch(session.direction) {
case CallDirection.kOutgoing:
// show outgoing call window
Expand All @@ -185,13 +196,13 @@ newCall.hangup();

### 4.Answer a incoming call

When a new incoming call comes in, handleNewCall will be called, and the answering interface can pop up at this time, and use `onCallStateChanged` to listen to the call state.
When a new incoming call comes in, registerListeners will be called right before handleNewCall is called, and the answering interface can pop up at this time, and use `onCallStateChanged` to listen to the call state.

The incoming call window need display `answer` and `reject` buttons, by calling `newCall.answer();` or `newCall.reject();` to decide whether to connect the call.

```dart
...
void handleNewCall(CallSession newCall) {
Future<void> registerListeners(CallSession newCall) async {
switch(newCall.direction) {
case CallDirection.kIncoming:
/// show incoming call window
Expand Down
111 changes: 87 additions & 24 deletions lib/src/voip/backend/mesh_backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ class MeshBackend extends CallBackend {
/// participant:volume
final Map<CallParticipant, double> _audioLevelsMap = {};

StreamSubscription<CallSession>? _callSubscription;
/// The stream is used to prepare for incoming peer calls like registering listeners
StreamSubscription<CallSession>? _callSetupSubscription;

/// The stream is used to signal the start of an incoming peer call
StreamSubscription<CallSession>? _callStartSubscription;

Timer? _activeSpeakerLoopTimeout;

Expand Down Expand Up @@ -109,14 +113,32 @@ class MeshBackend extends CallBackend {
);
}

/// Register listeners for a peer call to use for the group calls, that is
/// needed before even call is added to `_callSessions`.
/// We do this here for onStreamAdd and onStreamRemoved to make sure we don't
/// miss any events that happen before the call is completely started.
void _registerListenersBeforeCallAdd(CallSession call) {
call.onStreamAdd.stream.listen((stream) {
if (!stream.isLocal()) {
onStreamAdd.add(stream);
}
});

call.onStreamRemoved.stream.listen((stream) {
if (!stream.isLocal()) {
onStreamRemoved.add(stream);
}
});
}

Future<void> _addCall(GroupCallSession groupCall, CallSession call) async {
_callSessions.add(call);
await _initCall(groupCall, call);
_initCall(groupCall, call);
groupCall.onGroupCallEvent.add(GroupCallStateChange.callsChanged);
}

/// init a peer call from group calls.
Future<void> _initCall(GroupCallSession groupCall, CallSession call) async {
void _initCall(GroupCallSession groupCall, CallSession call) {
if (call.remoteUserId == null) {
throw MatrixSDKVoipException(
'Cannot init call without proper invitee user and device Id',
Expand All @@ -141,18 +163,6 @@ class MeshBackend extends CallBackend {
call.onCallHangupNotifierForGroupCalls.stream.listen((event) async {
await _onCallHangup(groupCall, call);
});

call.onStreamAdd.stream.listen((stream) {
if (!stream.isLocal()) {
onStreamAdd.add(stream);
}
});

call.onStreamRemoved.stream.listen((stream) {
if (!stream.isLocal()) {
onStreamRemoved.add(stream);
}
});
}

Future<void> _replaceCall(
Expand All @@ -171,7 +181,8 @@ class MeshBackend extends CallBackend {
_callSessions.add(replacementCall);

await _disposeCall(groupCall, existingCall, CallErrorCode.replaced);
await _initCall(groupCall, replacementCall);
_registerListenersBeforeCallAdd(replacementCall);
_initCall(groupCall, replacementCall);

groupCall.onGroupCallEvent.add(GroupCallStateChange.callsChanged);
}
Expand Down Expand Up @@ -657,7 +668,49 @@ class MeshBackend extends CallBackend {
return;
}

Future<void> _onIncomingCall(
void _onIncomingCallInMeshSetup(
GroupCallSession groupCall,
CallSession newCall,
) {
// The incoming calls may be for another room, which we will ignore.
if (newCall.room.id != groupCall.room.id) return;

if (newCall.state != CallState.kRinging) {
Logs().v(
'[_onIncomingCallInMeshSetup] Incoming call no longer in ringing state. Ignoring.',
);
return;
}

if (newCall.groupCallId == null ||
newCall.groupCallId != groupCall.groupCallId) {
Logs().v(
'[_onIncomingCallInMeshSetup] Incoming call with groupCallId ${newCall.groupCallId} ignored because it doesn\'t match the current group call',
);
return;
}

final existingCall = _getCallForParticipant(
groupCall,
CallParticipant(
groupCall.voip,
userId: newCall.remoteUserId!,
deviceId: newCall.remoteDeviceId,
),
);

// if it's an existing call, `_registerListenersForCall` will be called in
// `_replaceCall` that is used in `_onIncomingCallStart`.
if (existingCall != null) return;

Logs().v(
'[_onIncomingCallInMeshSetup] GroupCallSession: incoming call from: ${newCall.remoteUserId}${newCall.remoteDeviceId}${newCall.remotePartyId}',
);

_registerListenersBeforeCallAdd(newCall);
}

Future<void> _onIncomingCallInMeshStart(
GroupCallSession groupCall,
CallSession newCall,
) async {
Expand All @@ -667,14 +720,16 @@ class MeshBackend extends CallBackend {
}

if (newCall.state != CallState.kRinging) {
Logs().w('Incoming call no longer in ringing state. Ignoring.');
Logs().v(
'[_onIncomingCallInMeshStart] Incoming call no longer in ringing state. Ignoring.',
);
return;
}

if (newCall.groupCallId == null ||
newCall.groupCallId != groupCall.groupCallId) {
Logs().v(
'Incoming call with groupCallId ${newCall.groupCallId} ignored because it doesn\'t match the current group call',
'[_onIncomingCallInMeshStart] Incoming call with groupCallId ${newCall.groupCallId} ignored because it doesn\'t match the current group call',
);
await newCall.reject();
return;
Expand All @@ -694,7 +749,7 @@ class MeshBackend extends CallBackend {
}

Logs().v(
'GroupCallSession: incoming call from: ${newCall.remoteUserId}${newCall.remoteDeviceId}${newCall.remotePartyId}',
'[_onIncomingCallInMeshStart] GroupCallSession: incoming call from: ${newCall.remoteUserId}${newCall.remoteDeviceId}${newCall.remotePartyId}',
);

// Check if the user calling has an existing call and use this call instead.
Expand Down Expand Up @@ -800,7 +855,8 @@ class MeshBackend extends CallBackend {

_activeSpeaker = null;
_activeSpeakerLoopTimeout?.cancel();
await _callSubscription?.cancel();
await _callSetupSubscription?.cancel();
await _callStartSubscription?.cancel();
}

@override
Expand All @@ -826,11 +882,16 @@ class MeshBackend extends CallBackend {
GroupCallSession groupCall,
) async {
for (final call in _callSessions) {
await _onIncomingCall(groupCall, call);
_onIncomingCallInMeshSetup(groupCall, call);
await _onIncomingCallInMeshStart(groupCall, call);
}

_callSubscription = groupCall.voip.onIncomingCall.stream.listen(
(newCall) => _onIncomingCall(groupCall, newCall),
_callSetupSubscription = groupCall.voip.onIncomingCallSetup.stream.listen(
(newCall) => _onIncomingCallInMeshSetup(groupCall, newCall),
);

_callStartSubscription = groupCall.voip.onIncomingCallStart.stream.listen(
(newCall) => _onIncomingCallInMeshStart(groupCall, newCall),
);

_onActiveSpeakerLoop(groupCall);
Expand Down Expand Up @@ -883,6 +944,8 @@ class MeshBackend extends CallBackend {
// party id set to when answered
newCall.remoteSessionId = mem.membershipId;

_registerListenersBeforeCallAdd(newCall);

await newCall.placeCallWithStreams(
_getLocalStreams(),
requestScreenSharing: mem.feeds?.any(
Expand Down
1 change: 1 addition & 0 deletions lib/src/voip/models/webrtc_delegate.dart
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ abstract class WebRTCDelegate {
]);
Future<void> playRingtone();
Future<void> stopRingtone();
Future<void> registerListeners(CallSession session);
Future<void> handleNewCall(CallSession session);
Future<void> handleCallEnded(CallSession session);
Future<void> handleMissedCall(CallSession session);
Expand Down
19 changes: 16 additions & 3 deletions lib/src/voip/voip.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ class VoIP {
Map<VoipId, GroupCallSession> get groupCalls => _groupCalls;
final Map<VoipId, GroupCallSession> _groupCalls = {};

final CachedStreamController<CallSession> onIncomingCall =
/// The stream is used to prepare for incoming peer calls in a mesh call
/// For example, registering listeners
final CachedStreamController<CallSession> onIncomingCallSetup =
CachedStreamController();

/// The stream is used to signal the start of an incoming peer call in a mesh call
final CachedStreamController<CallSession> onIncomingCallStart =
CachedStreamController();

VoipId? currentCID;
Expand Down Expand Up @@ -479,6 +485,12 @@ class VoIP {
// by terminate.
currentCID = VoipId(roomId: room.id, callId: callId);

if (confId == null) {
await delegate.registerListeners(newCall);
} else {
onIncomingCallSetup.add(newCall);
}

await newCall.initWithInvite(
callType,
offer,
Expand All @@ -493,8 +505,7 @@ class VoIP {
}

if (confId != null) {
// the stream is used to monitor incoming peer calls in a mesh call
onIncomingCall.add(newCall);
onIncomingCallStart.add(newCall);
}
}

Expand Down Expand Up @@ -768,6 +779,8 @@ class VoIP {
newCall.remoteUserId = userId;
newCall.remoteDeviceId = deviceId;

await delegate.registerListeners(newCall);

currentCID = VoipId(roomId: roomId, callId: callId);
await newCall.initOutboundCall(type).then((_) {
delegate.handleNewCall(newCall);
Expand Down
5 changes: 5 additions & 0 deletions test/webrtc_stub.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ class MockWebRTCDelegate implements WebRTCDelegate {
]) async =>
MockRTCPeerConnection();

@override
Future<void> registerListeners(CallSession session) async {
Logs().i('registerListeners called in MockWebRTCDelegate');
}

@override
Future<void> handleCallEnded(CallSession session) async {
Logs().i('handleCallEnded called in MockWebRTCDelegate');
Expand Down

0 comments on commit 9ae2384

Please sign in to comment.