diff --git a/src/main/java/com/amazonaws/kinesisvideo/demoapp/activity/WebRtcActivity.java b/src/main/java/com/amazonaws/kinesisvideo/demoapp/activity/WebRtcActivity.java index ba5e223..38be986 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/demoapp/activity/WebRtcActivity.java +++ b/src/main/java/com/amazonaws/kinesisvideo/demoapp/activity/WebRtcActivity.java @@ -11,8 +11,12 @@ import static com.amazonaws.kinesisvideo.demoapp.fragment.StreamWebRtcConfigurationFragment.KEY_REGION; import static com.amazonaws.kinesisvideo.demoapp.fragment.StreamWebRtcConfigurationFragment.KEY_SEND_AUDIO; import static com.amazonaws.kinesisvideo.demoapp.fragment.StreamWebRtcConfigurationFragment.KEY_STREAM_ARN; +import static com.amazonaws.kinesisvideo.demoapp.fragment.StreamWebRtcConfigurationFragment.KEY_USE_STUN; import static com.amazonaws.kinesisvideo.demoapp.fragment.StreamWebRtcConfigurationFragment.KEY_WEBRTC_ENDPOINT; import static com.amazonaws.kinesisvideo.demoapp.fragment.StreamWebRtcConfigurationFragment.KEY_WSS_ENDPOINT; +import static com.amazonaws.kinesisvideo.utils.Constants.EXPONENTIAL_BACKOFF_CAP_MILLISECONDS; +import static com.amazonaws.kinesisvideo.utils.Constants.LOG_STATS_INTERVAL_SECONDS; +import static com.amazonaws.kinesisvideo.utils.Constants.WEBSOCKET_MESSAGE_DELIVERY_TIMEOUT_MILLISECONDS; import android.annotation.SuppressLint; import android.app.NotificationChannel; @@ -40,6 +44,9 @@ import androidx.core.app.NotificationCompat; import androidx.core.app.NotificationManagerCompat; +import com.amazonaws.AmazonClientException; +import com.amazonaws.AmazonServiceException; +import com.amazonaws.ClientConfiguration; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSSessionCredentials; import com.amazonaws.kinesisvideo.demoapp.KinesisVideoWebRtcDemoApp; @@ -54,6 +61,7 @@ import com.amazonaws.kinesisvideo.webrtc.KinesisVideoSdpObserver; import com.amazonaws.regions.Region; import com.amazonaws.services.kinesisvideowebrtcstorage.AWSKinesisVideoWebRTCStorageClient; +import com.amazonaws.services.kinesisvideowebrtcstorage.model.ClientLimitExceededException; import com.amazonaws.services.kinesisvideowebrtcstorage.model.JoinStorageSessionRequest; import com.google.common.base.Strings; @@ -84,7 +92,9 @@ import org.webrtc.VideoTrack; import org.webrtc.audio.JavaAudioDeviceModule; +import java.net.SocketTimeoutException; import java.net.URI; +import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; @@ -97,9 +107,15 @@ import java.util.Optional; import java.util.Queue; import java.util.UUID; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; public class WebRtcActivity extends AppCompatActivity { private static final String TAG = "KVSWebRtcActivity"; @@ -114,7 +130,11 @@ public class WebRtcActivity extends AppCompatActivity { private static final boolean ENABLE_H264_HIGH_PROFILE = true; private static volatile SignalingServiceWebSocketClient client; + private AWSKinesisVideoWebRTCStorageClient storageClient; + private final AtomicInteger runIdentifier = new AtomicInteger(); + private final Queue joinStorageSessionFailureDates = new ConcurrentLinkedQueue<>(); // Holds epoch milliseconds of joinStorageSession failures private PeerConnectionFactory peerConnectionFactory; + private ScheduledFuture printStatsTask; private VideoSource videoSource; private VideoTrack localVideoTrack; @@ -129,6 +149,7 @@ public class WebRtcActivity extends AppCompatActivity { private SurfaceViewRenderer remoteView; private PeerConnection localPeer; + private final ReentrantLock localPeerLock = new ReentrantLock(true); private EglBase rootEglBase = null; private VideoCapturer videoCapturer; @@ -136,6 +157,9 @@ public class WebRtcActivity extends AppCompatActivity { private final List peerIceServers = new ArrayList<>(); private boolean gotException = false; + private final AtomicBoolean sdpOfferReceived = new AtomicBoolean(false); + + private SignalingListener signalingListener; private String recipientClientId; @@ -185,7 +209,7 @@ private void initWsConnection() { final String masterEndpoint = mWssEndpoint + "?" + Constants.CHANNEL_ARN_QUERY_PARAM + "=" + mChannelArn; // See https://docs.aws.amazon.com/kinesisvideostreams-webrtc-dg/latest/devguide/kvswebrtc-websocket-apis-1.html - final String viewerEndpoint = mWssEndpoint + "?" + Constants.CHANNEL_ARN_QUERY_PARAM + "=" + mChannelArn + "&" + Constants.CLIENT_ID_QUERY_PARAM+ "=" + mClientId; + final String viewerEndpoint = mWssEndpoint + "?" + Constants.CHANNEL_ARN_QUERY_PARAM + "=" + mChannelArn + "&" + Constants.CLIENT_ID_QUERY_PARAM + "=" + mClientId; runOnUiThread(() -> mCreds = KinesisVideoWebRtcDemoApp.getCredentialsProvider().getCredentials()); @@ -206,26 +230,49 @@ private void initWsConnection() { } final String wsHost = signedUri.toString(); + if (wsHost == null) { + gotException = true; + return; + } // Step 10. Create Signaling Client Event Listeners. // When we receive messages, we need to take the appropriate action. - final SignalingListener signalingListener = new SignalingListener() { + signalingListener = new SignalingListener() { @Override - public void onSdpOffer(final Event offerEvent) { - Log.d(TAG, "Received SDP Offer: Setting Remote Description "); + public synchronized void onSdpOffer(final Event offerEvent) { + Log.d(TAG, "Received SDP Offer: Setting Remote Description"); + + // TODO: Change this from a single variable to another data structure + // (e.g. map of clientId -> PeerConnection) to be able to have multiple simultaneous + // peer connections + localPeerLock.lock(); + try { + if (localPeer != null && localPeer.getRemoteDescription() != null) { + Log.w(TAG, "Peer already exists, resetting."); + peerConnectionFoundMap.remove(offerEvent.getSenderClientId()); + localPeer.dispose(); + localPeer = null; + createLocalPeerConnection(); + } + sdpOfferReceived.set(true); + + final String sdp = Event.parseOfferEvent(offerEvent); - final String sdp = Event.parseOfferEvent(offerEvent); + localPeer.setRemoteDescription(new KinesisVideoSdpObserver(), new SessionDescription(SessionDescription.Type.OFFER, sdp)); - localPeer.setRemoteDescription(new KinesisVideoSdpObserver(), new SessionDescription(SessionDescription.Type.OFFER, sdp)); - recipientClientId = offerEvent.getSenderClientId(); - Log.d(TAG, "Received SDP offer for client ID: " + recipientClientId + ". Creating answer"); + recipientClientId = offerEvent.getSenderClientId(); - createSdpAnswer(); + Log.d(TAG, "Received SDP offer for client ID: " + recipientClientId + ". Creating answer"); + + createSdpAnswer(); + } finally { + localPeerLock.unlock(); + } if (master && webrtcEndpoint != null) { - runOnUiThread(() -> Toast.makeText(getApplicationContext(), "Media is being recorded to " + mStreamArn, Toast.LENGTH_LONG).show()); - Log.i(TAG, "Media is being recorded to " + mStreamArn); + runOnUiThread(() -> Toast.makeText(getApplicationContext(), "Received offer from the storage session.", Toast.LENGTH_SHORT).show()); + Log.i(TAG, "Offer received from the storage session."); } } @@ -238,14 +285,20 @@ public void onSdpAnswer(final Event answerEvent) { final SessionDescription sdpAnswer = new SessionDescription(SessionDescription.Type.ANSWER, sdp); - localPeer.setRemoteDescription(new KinesisVideoSdpObserver() { - @Override - public void onCreateFailure(final String error) { - super.onCreateFailure(error); - } - }, sdpAnswer); - Log.d(TAG, "Answer Client ID: " + answerEvent.getSenderClientId()); - peerConnectionFoundMap.put(answerEvent.getSenderClientId(), localPeer); + localPeerLock.lock(); + try { + localPeer.setRemoteDescription(new KinesisVideoSdpObserver() { + @Override + public void onCreateFailure(final String error) { + super.onCreateFailure(error); + } + }, sdpAnswer); + + Log.d(TAG, "Answer Client ID: " + answerEvent.getSenderClientId()); + peerConnectionFoundMap.put(answerEvent.getSenderClientId(), localPeer); + } finally { + localPeerLock.unlock(); + } // Check if ICE candidates are available in the queue and add the candidate handlePendingIceCandidates(answerEvent.getSenderClientId()); @@ -278,54 +331,56 @@ public void onException(final Exception e) { // Step 11. Create SignalingServiceWebSocketClient. // This is the actual client that is used to send messages over the signaling channel. // SignalingServiceWebSocketClient will attempt to open the connection in its constructor. - if (wsHost != null) { - try { - client = new SignalingServiceWebSocketClient(wsHost, signalingListener, Executors.newFixedThreadPool(10)); - - Log.d(TAG, "Client connection " + (client.isOpen() ? "Successful" : "Failed")); - } catch (final Exception e) { - Log.e(TAG, "Exception with websocket client: " + e); - gotException = true; - return; + try { + long pingIntervalSeconds = 0; + if (webrtcEndpoint != null) { + // Send a ping message every 4 min, 30s, since the WebSocket closes after being idle for 10 minutes. + // 4 minutes 30 seconds will allow a ping to be missed + pingIntervalSeconds = TimeUnit.MINUTES.toSeconds(4L) + TimeUnit.SECONDS.toSeconds(30L); } + final ExecutorService executor = Executors.newFixedThreadPool(10); + client = new SignalingServiceWebSocketClient(wsHost, signalingListener, executor, pingIntervalSeconds); - if (isValidClient()) { + Log.d(TAG, "Client connection " + (client.isOpen() ? "Successful" : "Failed")); + } catch (final Exception e) { + Log.e(TAG, "Exception with websocket client: " + e); + gotException = true; + return; + } - Log.d(TAG, "Client connected to Signaling service " + client.isOpen()); + if (!isValidClient()) { + Log.e(TAG, "Error in connecting to signaling service"); + gotException = true; + } - if (master) { + Log.d(TAG, "Client connected to Signaling service " + client.isOpen()); - // If webrtc endpoint is non-null ==> Ingest media was checked - if (webrtcEndpoint != null) { - new Thread(() -> { - try { - final AWSKinesisVideoWebRTCStorageClient storageClient = - new AWSKinesisVideoWebRTCStorageClient( - KinesisVideoWebRtcDemoApp.getCredentialsProvider().getCredentials()); - storageClient.setRegion(Region.getRegion(mRegion)); - storageClient.setSignerRegionOverride(mRegion); - storageClient.setServiceNameIntern("kinesisvideo"); - storageClient.setEndpoint(webrtcEndpoint); - - Log.i(TAG, "Channel ARN is: " + mChannelArn); - storageClient.joinStorageSession(new JoinStorageSessionRequest() - .withChannelArn(mChannelArn)); - Log.i(TAG, "Join storage session request sent!"); - } catch (Exception ex) { - Log.e(TAG, "Error sending join storage session request!", ex); - } - }).start(); + if (master) { + // If webrtc endpoint is non-null ==> We want to use media ingestion feature + if (webrtcEndpoint != null) { + new Thread(() -> { + try { + final ClientConfiguration storageClientConfiguration = new ClientConfiguration() + .withMaxErrorRetry(0); + storageClient = new AWSKinesisVideoWebRTCStorageClient( + KinesisVideoWebRtcDemoApp.getCredentialsProvider().getCredentials(), + storageClientConfiguration); + storageClient.setRegion(Region.getRegion(mRegion)); + storageClient.setSignerRegionOverride(mRegion); + storageClient.setServiceNameIntern("kinesisvideo"); + storageClient.setEndpoint(webrtcEndpoint); + + callJoinStorageSessionUntilSdpOfferReceived(runIdentifier.incrementAndGet()); + } catch (Exception ex) { + Log.e(TAG, "Error sending join storage session request!", ex); } - } else { - Log.d(TAG, "Signaling service is connected: " + - "Sending offer as viewer to remote peer"); // Viewer - - createSdpOffer(); - } - } else { - Log.e(TAG, "Error in connecting to signaling service"); - gotException = true; + }).start(); } + } else { + Log.d(TAG, "Signaling service is connected: " + + "Sending offer as viewer to remote peer"); // Viewer + + createSdpOffer(); } } @@ -383,8 +438,17 @@ private void checkAndAddIceCandidate(final Event message, final IceCandidate ice else { Log.d(TAG, "Peer connection found already"); // Remote sent us ICE candidates, add to local peer connection - final PeerConnection peer = peerConnectionFoundMap.get(message.getSenderClientId()); - final boolean addIce = peer.addIceCandidate(iceCandidate); + + boolean addIce = false; + localPeerLock.lock(); + try { + final PeerConnection peer = peerConnectionFoundMap.get(message.getSenderClientId()); + if (peer != null) { + addIce = peer.addIceCandidate(iceCandidate); + } + } finally { + localPeerLock.unlock(); + } Log.d(TAG, "Added ice candidate " + iceCandidate + " " + (addIce ? "Successfully" : "Failed")); } @@ -394,6 +458,8 @@ private void checkAndAddIceCandidate(final Event message, final IceCandidate ice protected void onDestroy() { Thread.setDefaultUncaughtExceptionHandler(null); printStatsExecutor.shutdownNow(); + sdpOfferReceived.set(true); + runIdentifier.incrementAndGet(); audioManager.setMode(originalAudioMode); audioManager.setSpeakerphoneOn(originalSpeakerphoneOn); @@ -408,9 +474,14 @@ protected void onDestroy() { remoteView = null; } - if (localPeer != null) { - localPeer.dispose(); - localPeer = null; + localPeerLock.lock(); + try { + if (localPeer != null) { + localPeer.dispose(); + localPeer = null; + } + } finally { + localPeerLock.unlock(); } if (videoSource != null) { @@ -452,7 +523,7 @@ protected void onPostCreate(@Nullable Bundle savedInstanceState) { initWsConnection(); if (!gotException && isValidClient()) { - Toast.makeText(this, "Signaling Connected", Toast.LENGTH_LONG).show(); + Toast.makeText(this, "Signaling Connected", Toast.LENGTH_SHORT).show(); } else { notifySignalingConnectionFailed(); } @@ -460,7 +531,7 @@ protected void onPostCreate(@Nullable Bundle savedInstanceState) { private void notifySignalingConnectionFailed() { finish(); - Toast.makeText(this, "Connection error to signaling", Toast.LENGTH_LONG).show(); + Toast.makeText(this, "Connection error to signaling", Toast.LENGTH_SHORT).show(); } @Override @@ -491,14 +562,16 @@ protected void onCreate(final Bundle savedInstanceState) { rootEglBase = EglBase.create(); - //TODO: add ui to control TURN only option + if (intent.getBooleanExtra(KEY_USE_STUN, true)) { + final PeerConnection.IceServer stun = PeerConnection + .IceServer + .builder(Collections.singletonList(String.format("stun:stun.kinesisvideo.%s.amazonaws.com:443", mRegion))) + .createIceServer(); - final PeerConnection.IceServer stun = PeerConnection - .IceServer - .builder(String.format("stun:stun.kinesisvideo.%s.amazonaws.com:443", mRegion)) - .createIceServer(); - - peerIceServers.add(stun); + peerIceServers.add(stun); + } else { + Log.i(TAG, "Not using STUN."); + } if (mUrisList != null) { for (int i = 0; i < mUrisList.size(); i++) { @@ -653,9 +726,19 @@ public void onAddStream(final MediaStream mediaStream) { public void onIceConnectionChange(final PeerConnection.IceConnectionState iceConnectionState) { super.onIceConnectionChange(iceConnectionState); if (iceConnectionState == PeerConnection.IceConnectionState.FAILED) { - runOnUiThread(() -> Toast.makeText(getApplicationContext(), "Connection to peer failed!", Toast.LENGTH_LONG).show()); + if (mStreamArn == null) { + runOnUiThread(() -> Toast.makeText(getApplicationContext(), "Connection to peer failed!", Toast.LENGTH_SHORT).show()); + } else { + onPeerConnectionFailed(); + } } else if (iceConnectionState == PeerConnection.IceConnectionState.CONNECTED) { - runOnUiThread(() -> Toast.makeText(getApplicationContext(), "Connected to peer!", Toast.LENGTH_LONG).show()); + if (master && mStreamArn != null) { + runOnUiThread(() -> Toast.makeText(getApplicationContext(), "Joined storage session. Media is being recorded to " + mStreamArn, Toast.LENGTH_SHORT).show()); + Log.i(TAG, "Success! Media is being recorded to " + mStreamArn); + } else { + runOnUiThread(() -> Toast.makeText(getApplicationContext(), "Connected to peer!", Toast.LENGTH_SHORT).show()); + Log.i(TAG, "Successfully connected to peer!"); + } } } @@ -705,21 +788,59 @@ public void onMessage(DataChannel.Buffer buffer) { } }); - if (localPeer != null) { - printStatsExecutor.scheduleWithFixedDelay(() -> { - localPeer.getStats(rtcStatsReport -> { - final Map statsMap = rtcStatsReport.getStatsMap(); - for (final Map.Entry entry : statsMap.entrySet()) { - Log.d(TAG, "Stats: " + entry.getKey() + ", " + entry.getValue()); + // Print stats periodically + if (LOG_STATS_INTERVAL_SECONDS > 0) { + if (printStatsTask != null) { + printStatsTask.cancel(false); + } + + printStatsTask = printStatsExecutor.scheduleWithFixedDelay(() -> { + localPeerLock.lock(); + try { + if (localPeer != null) { + localPeer.getStats(rtcStatsReport -> { + final Map statsMap = rtcStatsReport.getStatsMap(); + for (final Map.Entry entry : statsMap.entrySet()) { + Log.d(TAG, "Stats: " + entry.getKey() + ", " + entry.getValue()); + } + }); } - }); - }, 0, 10, TimeUnit.SECONDS); + } finally { + localPeerLock.unlock(); + } + }, 0, LOG_STATS_INTERVAL_SECONDS, TimeUnit.SECONDS); } addDataChannelToLocalPeer(); addStreamToLocalPeer(); } + private void onPeerConnectionFailed() { + sdpOfferReceived.set(false); + joinStorageSessionFailureDates.add(new Date().getTime()); + if (shouldStopRetryingJoinStorageSession()) { + Log.e(TAG, "Failed to join the storage session " + Constants.MAX_CONNECTION_FAILURES_WITHIN_INTERVAL_FOR_JOIN_STORAGE_SESSION_RETRIES + " times within a " + TimeUnit.MILLISECONDS.toMinutes(Constants.JOIN_STORAGE_SESSION_RETRIES_INTERVAL_MILLIS) + "-minute interval! (" + joinStorageSessionFailureDates + ")"); + runOnUiThread(() -> { + Toast.makeText(getApplicationContext(), "Failed to connect to the storage session too many times within a short interval!", Toast.LENGTH_SHORT).show(); + onBackPressed(); + }); + return; + } + runOnUiThread(() -> Toast.makeText(getApplicationContext(), "Attempting to connect to the storage session again!", Toast.LENGTH_SHORT).show()); + Log.i(TAG, "Attempting to reconnect to media service."); + new Thread(() -> { + try { + if (!client.isOpen()) { + Log.i(TAG, "The WebSocket isn't open. Need to re-open."); + reopenWebsocket(); + } + callJoinStorageSessionUntilSdpOfferReceived(runIdentifier.incrementAndGet()); + } catch (Exception ex) { + Log.e(TAG, "Error sending join storage session request!", ex); + } + }).start(); + } + private Message createIceCandidateMessage(final IceCandidate iceCandidate) { final String sdpMid = iceCandidate.sdpMid; final int sdpMLineIndex = iceCandidate.sdpMLineIndex; @@ -742,7 +863,6 @@ private Message createIceCandidateMessage(final IceCandidate iceCandidate) { } private void addStreamToLocalPeer() { - final MediaStream stream = peerConnectionFactory.createLocalMediaStream(LOCAL_MEDIA_STREAM_LABEL); if (!stream.addTrack(localVideoTrack)) { @@ -762,7 +882,6 @@ private void addStreamToLocalPeer() { Log.d(TAG, "Sending audio track"); } } - } private void addDataChannelToLocalPeer() { @@ -814,29 +933,33 @@ private void createSdpOffer() { sdpMediaConstraints.mandatory.add(new MediaConstraints.KeyValuePair("OfferToReceiveVideo", "true")); sdpMediaConstraints.mandatory.add(new MediaConstraints.KeyValuePair("OfferToReceiveAudio", "true")); - if (localPeer == null) { - - createLocalPeerConnection(); - } + localPeerLock.lock(); + try { + if (localPeer == null) { + createLocalPeerConnection(); + } - localPeer.createOffer(new KinesisVideoSdpObserver() { + localPeer.createOffer(new KinesisVideoSdpObserver() { - @Override - public void onCreateSuccess(SessionDescription sessionDescription) { + @Override + public void onCreateSuccess(SessionDescription sessionDescription) { - super.onCreateSuccess(sessionDescription); + super.onCreateSuccess(sessionDescription); - localPeer.setLocalDescription(new KinesisVideoSdpObserver(), sessionDescription); + localPeer.setLocalDescription(new KinesisVideoSdpObserver(), sessionDescription); - final Message sdpOfferMessage = Message.createOfferMessage(sessionDescription, mClientId); + final Message sdpOfferMessage = Message.createOfferMessage(sessionDescription, mClientId); - if (isValidClient()) { - client.sendSdpOffer(sdpOfferMessage); - } else { - notifySignalingConnectionFailed(); + if (isValidClient()) { + client.sendSdpOffer(sdpOfferMessage); + } else { + notifySignalingConnectionFailed(); + } } - } - }, sdpMediaConstraints); + }, sdpMediaConstraints); + } finally { + localPeerLock.unlock(); + } } @@ -860,6 +983,27 @@ public void onCreateSuccess(final SessionDescription sessionDescription) { peerConnectionFoundMap.put(recipientClientId, localPeer); handlePendingIceCandidates(recipientClientId); + + if (mStreamArn != null) { + new Thread() { + @Override + public void run() { + final PeerConnection pc = localPeer; + try { + Thread.sleep(TimeUnit.SECONDS.toMillis(Constants.TIMEOUT_TO_ESTABLISH_CONNECTION_WITH_MEDIA_SERVER_SECONDS)); + if (pc == localPeer && + localPeer.iceConnectionState() != PeerConnection.IceConnectionState.CONNECTED && + localPeer.iceConnectionState() != PeerConnection.IceConnectionState.DISCONNECTED && + localPeer.iceConnectionState() != PeerConnection.IceConnectionState.CLOSED) { + Log.e(TAG, "Couldn't connect to media server within " + Constants.TIMEOUT_TO_ESTABLISH_CONNECTION_WITH_MEDIA_SERVER_SECONDS + " seconds!"); + onPeerConnectionFailed(); + } + } catch (final Exception ex) { + Log.e(TAG, "Exception while waiting!", ex); + } + } + }.start(); + } } @Override @@ -921,7 +1065,7 @@ private URI getSignedUri(final String endpoint) { .orElse(""); if (accessKey.isEmpty() || secretKey.isEmpty()) { - Toast.makeText(this, "Failed to fetch credentials!", Toast.LENGTH_LONG).show(); + Toast.makeText(this, "Failed to fetch credentials!", Toast.LENGTH_SHORT).show(); return null; } @@ -1017,4 +1161,105 @@ private void createNotificationChannel() { notificationManager.createNotificationChannel(channel); } } + + private void reopenWebsocket() { + Log.i(TAG, "Reconnecting to the WebSocket!"); + // See https://docs.aws.amazon.com/kinesisvideostreams-webrtc-dg/latest/devguide/kvswebrtc-websocket-apis-2.html + final String masterEndpoint = mWssEndpoint + "?" + Constants.CHANNEL_ARN_QUERY_PARAM + "=" + mChannelArn; + + // See https://docs.aws.amazon.com/kinesisvideostreams-webrtc-dg/latest/devguide/kvswebrtc-websocket-apis-1.html + final String viewerEndpoint = mWssEndpoint + "?" + Constants.CHANNEL_ARN_QUERY_PARAM + "=" + mChannelArn + "&" + Constants.CLIENT_ID_QUERY_PARAM + "=" + mClientId; + + mCreds = KinesisVideoWebRtcDemoApp.getCredentialsProvider().getCredentials(); + + final URI signedUri; + if (master) { + signedUri = getSignedUri(masterEndpoint); + } else { + signedUri = getSignedUri(viewerEndpoint); + } + + if (signedUri == null) { + Log.e(TAG, "There was an error generating the URI"); + return; + } + + if (client != null && client.isOpen()) { + client.disconnect(); + } + + long pingIntervalSeconds = 0; + if (webrtcEndpoint != null) { + // Send a ping message every 4 min, 30s, since the WebSocket closes after being idle for 10 minutes. + // 4 minutes 30 seconds will allow a ping to be missed + pingIntervalSeconds = TimeUnit.MINUTES.toSeconds(4L) + TimeUnit.SECONDS.toSeconds(30L); + } + final ExecutorService executor = Executors.newFixedThreadPool(10); + + client = new SignalingServiceWebSocketClient(signedUri.toString(), signalingListener, executor, pingIntervalSeconds); + } + + private boolean shouldStopRetryingJoinStorageSession() { + // It should stop retrying if there are too many failures within the past INTERVAL. + final long intervalAgoEpochMillis = new Date().getTime() - Constants.JOIN_STORAGE_SESSION_RETRIES_INTERVAL_MILLIS; + + Long front = joinStorageSessionFailureDates.peek(); + while (front != null && front < intervalAgoEpochMillis) { + joinStorageSessionFailureDates.remove(); + front = joinStorageSessionFailureDates.peek(); + } + + joinStorageSessionFailureDates.removeIf(failureEpochMillis -> failureEpochMillis < intervalAgoEpochMillis); + + return joinStorageSessionFailureDates.size() >= Constants.MAX_CONNECTION_FAILURES_WITHIN_INTERVAL_FOR_JOIN_STORAGE_SESSION_RETRIES; + } + + private void callJoinStorageSessionUntilSdpOfferReceived(final int currentIteration) { + boolean doRetry = true; + int currentRetryCount = 1; + while (doRetry && currentIteration == runIdentifier.get() && currentRetryCount < LOG_STATS_INTERVAL_SECONDS) { + if (currentRetryCount == 1) { + Log.i(TAG, "Retrying joinStorageSession."); + } + Log.i(TAG, "Channel ARN is: " + mChannelArn); + try { + storageClient.joinStorageSession(new JoinStorageSessionRequest() + .withChannelArn(mChannelArn)); + + Thread.sleep(WEBSOCKET_MESSAGE_DELIVERY_TIMEOUT_MILLISECONDS); + + doRetry = !sdpOfferReceived.get(); + } catch (final AmazonServiceException ex) { + doRetry = ex.getStatusCode() == 500 || ex instanceof ClientLimitExceededException; + if (doRetry) { + Log.e(TAG, "Encountered retryable service exception!", ex); + } else { + Log.e(TAG, "Encountered non-retryable service exception!", ex); + } + } catch (final AmazonClientException ex) { + doRetry = ex.getCause() instanceof UnknownHostException || ex.getCause() instanceof SocketTimeoutException; + if (doRetry) { + Log.e(TAG, "Encountered retryable client exception!", ex); + } else { + Log.e(TAG, "Encountered non-retryable client exception!", ex); + } + } catch (final Exception ex) { + Log.e(TAG, "Encountered non-retryable exception!", ex); + doRetry = false; + } + Log.i(TAG, "Join storage session request sent!"); + currentRetryCount++; + + try { + Thread.sleep(calculateExponentialBackoff(currentRetryCount)); + } catch (final InterruptedException e) { + Log.e(TAG, "Interrupted while sleeping!", e); + return; + } + } + } + + private long calculateExponentialBackoff(final int retryCount) { + return (long) (Math.random() * Math.min(Constants.EXPONENTIAL_BACKOFF_COEFFICIENT_MILLISECONDS * Math.pow(2, retryCount - 1), EXPONENTIAL_BACKOFF_CAP_MILLISECONDS)); + } } diff --git a/src/main/java/com/amazonaws/kinesisvideo/demoapp/fragment/StreamWebRtcConfigurationFragment.java b/src/main/java/com/amazonaws/kinesisvideo/demoapp/fragment/StreamWebRtcConfigurationFragment.java index 3b4f8e7..b81c702 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/demoapp/fragment/StreamWebRtcConfigurationFragment.java +++ b/src/main/java/com/amazonaws/kinesisvideo/demoapp/fragment/StreamWebRtcConfigurationFragment.java @@ -1,7 +1,6 @@ package com.amazonaws.kinesisvideo.demoapp.fragment; import android.Manifest; -import android.app.Activity; import android.content.Intent; import android.content.pm.PackageManager; import android.os.AsyncTask; @@ -11,7 +10,6 @@ import android.view.LayoutInflater; import android.view.View; import android.view.ViewGroup; -import android.view.inputmethod.InputMethodManager; import android.widget.ArrayAdapter; import android.widget.Button; import android.widget.CheckBox; @@ -19,7 +17,6 @@ import android.widget.EditText; import android.widget.ListView; import android.widget.Spinner; -import android.widget.Toast; import androidx.annotation.NonNull; import androidx.annotation.Nullable; @@ -32,6 +29,7 @@ import com.amazonaws.kinesisvideo.demoapp.R; import com.amazonaws.kinesisvideo.demoapp.activity.SimpleNavActivity; import com.amazonaws.kinesisvideo.demoapp.activity.WebRtcActivity; +import com.amazonaws.kinesisvideo.utils.Constants; import com.amazonaws.regions.Region; import com.amazonaws.services.kinesisvideo.AWSKinesisVideoClient; import com.amazonaws.services.kinesisvideo.model.ChannelRole; @@ -55,7 +53,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.Optional; public class StreamWebRtcConfigurationFragment extends Fragment { private static final String TAG = StreamWebRtcConfigurationFragment.class.getSimpleName(); @@ -76,6 +73,8 @@ public class StreamWebRtcConfigurationFragment extends Fragment { private static final String KEY_SEND_VIDEO = "sendVideo"; public static final String KEY_SEND_AUDIO = "sendAudio"; + private static final String KEY_USE_TURN = "useTurn"; + public static final String KEY_USE_STUN = "useStun"; private static final String[] WEBRTC_OPTIONS = { "Send Video", @@ -92,7 +91,9 @@ public class StreamWebRtcConfigurationFragment extends Fragment { private EditText mClientId; private EditText mRegion; private Spinner mCameras; - private CheckBox mIngestMedia; + private CheckBox mMediaIngestionFeature; + private CheckBox mUseTurn; + private CheckBox mUseStun; private final List mEndpointList = new ArrayList<>(); private final List mIceServerList = new ArrayList<>(); private String mChannelArn = null; @@ -133,7 +134,9 @@ public void onViewCreated(final View view, Bundle savedInstanceState) { mChannelName = view.findViewById(R.id.channel_name); mClientId = view.findViewById(R.id.client_id); mRegion = view.findViewById(R.id.region); - mIngestMedia = view.findViewById(R.id.ingest_media); + mMediaIngestionFeature = view.findViewById(R.id.ingest_media); + mUseTurn = view.findViewById(R.id.use_turn); + mUseStun = view.findViewById(R.id.use_stun); setRegionFromCognito(); mOptions = view.findViewById(R.id.webrtc_options); @@ -195,22 +198,6 @@ public void onClick(final View view) { } private void startMasterActivity() { - - if (mIngestMedia.isChecked()) { - // Check that the "Send Audio" and "Send Video" boxes are enabled. - final SparseBooleanArray checked = mOptions.getCheckedItemPositions(); - for (int i = 0; i < mOptions.getCount(); i++) { - if (!checked.get(i)) { - new AlertDialog.Builder(getActivity()) - .setPositiveButton("OK", null) - .setMessage("Audio and video must be sent to ingest media!") - .create() - .show(); - return; - } - } - } - if (!updateSignalingChannelInfo(mRegion.getText().toString(), mChannelName.getText().toString(), ChannelRole.MASTER)) { @@ -298,6 +285,9 @@ private Bundle setExtras(boolean isMaster) { extras.putBoolean(KEY_OF_OPTIONS[i], checked.get(i)); } + extras.putBoolean(KEY_USE_TURN, mUseTurn.isChecked()); + extras.putBoolean(KEY_USE_STUN, mUseStun.isChecked()); + extras.putBoolean(KEY_CAMERA_FRONT_FACING, mCameras.getSelectedItem().equals("Front Camera")); return extras; @@ -367,6 +357,13 @@ protected String doInBackground(final Object... objects) { final String channelName = (String) objects[1]; final ChannelRole role = (ChannelRole) objects[2]; + final Region regionObj = Region.getRegion(region); + + // Validate region + if (regionObj == null) { + return "The region: " + region + " is invalid or not supported!"; + } + // Step 1. Create Kinesis Video Client final AWSKinesisVideoClient awsKinesisVideoClient; try { @@ -407,27 +404,60 @@ protected String doInBackground(final Object... objects) { // Step 3. If we are ingesting media, we need to check if the Signaling Channel has a Kinesis Video // Stream configured to write media to. We can call the DescribeMediaStorageConfiguration API // to determine this. - if (role == ChannelRole.MASTER && mFragment.get().mIngestMedia.isChecked()) { + if (role == ChannelRole.MASTER && mFragment.get().mMediaIngestionFeature.isChecked()) { + + if (!Constants.INGESTION_PREVIEW_REGIONS.contains(regionObj)) { + return "The media ingestion feature is not supported in " + regionObj.getName() + ". It is only supported in " + Constants.INGESTION_PREVIEW_REGIONS; + } + + + try { + final DescribeMediaStorageConfigurationResult describeMediaStorageConfigurationResult = awsKinesisVideoClient.describeMediaStorageConfiguration( + new DescribeMediaStorageConfigurationRequest() + .withChannelARN(mFragment.get().mChannelArn)); + + if ("ENABLED".equalsIgnoreCase(describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStatus())) { + Log.i(TAG, "Media storage is enabled for this channel."); + + // Check that the "Send Audio" and "Send Video" boxes are enabled. + final SparseBooleanArray checked = mFragment.get().mOptions.getCheckedItemPositions(); + for (int i = 0; i < mFragment.get().mOptions.getCount(); i++) { + if (!checked.get(i)) { + return "Audio and video must be sent to ingest media!"; + } + } + + mFragment.get().mStreamArn = describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStreamARN(); + } else { + Log.i(TAG, "Media storage is not enabled for this channel!"); + } + } catch (Exception ex) { + return "Describe Media Storage Configuration failed with exception " + ex.getLocalizedMessage(); + } + } else if (role == ChannelRole.VIEWER && mFragment.get().mMediaIngestionFeature.isChecked() && Constants.INGESTION_PREVIEW_REGIONS.contains(regionObj)) { + Log.i(TAG, "Checking if the signaling channel is in media service mode."); try { final DescribeMediaStorageConfigurationResult describeMediaStorageConfigurationResult = awsKinesisVideoClient.describeMediaStorageConfiguration( new DescribeMediaStorageConfigurationRequest() .withChannelARN(mFragment.get().mChannelArn)); - if (!"ENABLED".equalsIgnoreCase(describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStatus())) { - Log.e(TAG, "Media storage is not enabled for this channel!"); - return "Media Storage is DISABLED for this channel!"; + if ("ENABLED".equalsIgnoreCase(describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStatus())) { + return "This signaling channel is configured for WebRTC Ingestion. Regular peer-to-peer connections can no longer occur."; } - mFragment.get().mStreamArn = describeMediaStorageConfigurationResult.getMediaStorageConfiguration().getStreamARN(); } catch (Exception ex) { - return "Describe Media Storage Configuration failed with Exception " + ex.getLocalizedMessage(); + return "Describe Media Storage Configuration failed with exception " + ex.getLocalizedMessage(); } + } else { + mFragment.get().mStreamArn = null; } final String[] protocols; - if (mFragment.get().mIngestMedia.isChecked()) { - protocols = new String[]{"WSS", "HTTPS", "WEBRTC"}; - } else { + if (mFragment.get().mStreamArn == null) { + // Regular WebRTC protocols = new String[]{"WSS", "HTTPS"}; + } else { + // Media ingestion mode + protocols = new String[]{"WSS", "HTTPS", "WEBRTC"}; } // Step 4. Use the Kinesis Video Client to call GetSignalingChannelEndpoint. @@ -451,25 +481,29 @@ protected String doInBackground(final Object... objects) { return "Get Signaling Endpoint failed with Exception " + e.getLocalizedMessage(); } - String dataEndpoint = null; - for (ResourceEndpointListItem endpoint : mFragment.get().mEndpointList) { - if (endpoint.getProtocol().equals("HTTPS")) { - dataEndpoint = endpoint.getResourceEndpoint(); + if (mFragment.get().mUseTurn.isChecked()) { + String dataEndpoint = null; + for (ResourceEndpointListItem endpoint : mFragment.get().mEndpointList) { + if (endpoint.getProtocol().equals("HTTPS")) { + dataEndpoint = endpoint.getResourceEndpoint(); + } } - } - // Step 5. Construct the Kinesis Video Signaling Client. The HTTPS endpoint from the - // GetSignalingChannelEndpoint response above is used with this client. This - // client is just used for getting ICE servers, not for actual signaling. - // Step 6. Call GetIceServerConfig in order to obtain TURN ICE server info. - // Note: the STUN endpoint will be `stun:stun.kinesisvideo.${region}.amazonaws.com:443` - try { - final AWSKinesisVideoSignalingClient awsKinesisVideoSignalingClient = mFragment.get().getAwsKinesisVideoSignalingClient(region, dataEndpoint); - GetIceServerConfigResult getIceServerConfigResult = awsKinesisVideoSignalingClient.getIceServerConfig( - new GetIceServerConfigRequest().withChannelARN(mFragment.get().mChannelArn).withClientId(role.name())); - mFragment.get().mIceServerList.addAll(getIceServerConfigResult.getIceServerList()); - } catch (Exception e) { - return "Get Ice Server Config failed with Exception " + e.getLocalizedMessage(); + // Step 5. Construct the Kinesis Video Signaling Client. The HTTPS endpoint from the + // GetSignalingChannelEndpoint response above is used with this client. This + // client is just used for getting ICE servers, not for actual signaling. + // Step 6. Call GetIceServerConfig in order to obtain TURN ICE server info. + // Note: the STUN endpoint will be `stun:stun.kinesisvideo.${region}.amazonaws.com:443` + try { + final AWSKinesisVideoSignalingClient awsKinesisVideoSignalingClient = mFragment.get().getAwsKinesisVideoSignalingClient(region, dataEndpoint); + GetIceServerConfigResult getIceServerConfigResult = awsKinesisVideoSignalingClient.getIceServerConfig( + new GetIceServerConfigRequest().withChannelARN(mFragment.get().mChannelArn).withClientId(role.name())); + mFragment.get().mIceServerList.addAll(getIceServerConfigResult.getIceServerList()); + } catch (Exception e) { + return "Get Ice Server Config failed with Exception " + e.getLocalizedMessage(); + } + } else { + Log.i(TAG, "Not fetching TURN servers."); } return null; diff --git a/src/main/java/com/amazonaws/kinesisvideo/signaling/SignalingListener.java b/src/main/java/com/amazonaws/kinesisvideo/signaling/SignalingListener.java index 29be647..e76f7a5 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/signaling/SignalingListener.java +++ b/src/main/java/com/amazonaws/kinesisvideo/signaling/SignalingListener.java @@ -38,7 +38,7 @@ public void onMessage(final String message) { switch (evt.getMessageType().toUpperCase()) { case "SDP_OFFER": Log.d(TAG, "Offer received: SenderClientId=" + evt.getSenderClientId()); - Log.d(TAG, new String(Base64.decode(evt.getMessagePayload(), 0))); + Log.d(TAG, new String(Base64.decode(evt.getMessagePayload(), Base64.DEFAULT))); onSdpOffer(evt); break; @@ -49,7 +49,7 @@ public void onMessage(final String message) { break; case "ICE_CANDIDATE": Log.d(TAG, "Ice Candidate received: SenderClientId=" + evt.getSenderClientId()); - Log.d(TAG, new String(Base64.decode(evt.getMessagePayload(), 0))); + Log.d(TAG, new String(Base64.decode(evt.getMessagePayload(), Base64.DEFAULT))); onIceCandidate(evt); break; diff --git a/src/main/java/com/amazonaws/kinesisvideo/signaling/tyrus/SignalingServiceWebSocketClient.java b/src/main/java/com/amazonaws/kinesisvideo/signaling/tyrus/SignalingServiceWebSocketClient.java index 908718d..a9b990f 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/signaling/tyrus/SignalingServiceWebSocketClient.java +++ b/src/main/java/com/amazonaws/kinesisvideo/signaling/tyrus/SignalingServiceWebSocketClient.java @@ -28,8 +28,13 @@ public class SignalingServiceWebSocketClient { public SignalingServiceWebSocketClient(final String uri, final SignalingListener signalingListener, final ExecutorService executorService) { + this(uri, signalingListener, executorService, 0L); + } + + public SignalingServiceWebSocketClient(final String uri, final SignalingListener signalingListener, + final ExecutorService executorService, final long pingIntervalSeconds) { Log.d(TAG, "Connecting to URI " + uri + " as master"); - websocketClient = new WebSocketClient(uri, new ClientManager(), signalingListener, executorService); + websocketClient = new WebSocketClient(uri, new ClientManager(), signalingListener, executorService, pingIntervalSeconds); this.executorService = executorService; } diff --git a/src/main/java/com/amazonaws/kinesisvideo/signaling/tyrus/WebSocketClient.java b/src/main/java/com/amazonaws/kinesisvideo/signaling/tyrus/WebSocketClient.java index 0fa49f6..c529286 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/signaling/tyrus/WebSocketClient.java +++ b/src/main/java/com/amazonaws/kinesisvideo/signaling/tyrus/WebSocketClient.java @@ -15,10 +15,14 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import javax.websocket.ClientEndpointConfig; @@ -40,9 +44,12 @@ class WebSocketClient { private final ExecutorService executorService; + private final ScheduledExecutorService pingService; + WebSocketClient(final String uri, final ClientManager clientManager, final SignalingListener signalingListener, - final ExecutorService executorService) { + final ExecutorService executorService, + final long pingIntervalSeconds) { this.executorService = executorService; final ClientEndpointConfig cec = ClientEndpointConfig.Builder.create() @@ -99,6 +106,38 @@ public void onError(final Session session, final Throwable thr) { }); await().atMost(10, TimeUnit.SECONDS).until(WebSocketClient.this::isOpen); + + if (pingIntervalSeconds > 0) { + pingService = Executors.newSingleThreadScheduledExecutor(); + + pingService.scheduleAtFixedRate(() -> { + try { + if (session == null || !session.isOpen()) { + Log.e(TAG, "Unable to send ping. Session has been closed."); + if (!pingService.isShutdown()) { + pingService.shutdown(); + } + return; + } + session.getAsyncRemote().sendPing(ByteBuffer.wrap("".getBytes(StandardCharsets.UTF_8))); + Log.i(TAG, "Sent ping! Sending another in " + pingIntervalSeconds + " seconds."); + } catch (final Exception ex) { + Log.e(TAG, "Exception sending ping message", ex); + } + }, 0, pingIntervalSeconds, TimeUnit.SECONDS); + + Log.i(TAG, "Sending a ping keep-alive message every " + pingIntervalSeconds + " seconds."); + } else { + pingService = null; + Log.d(TAG, "Not sending any pings."); + } + } + + WebSocketClient(final String uri, final ClientManager clientManager, + final SignalingListener signalingListener, + final ExecutorService executorService) { + + this(uri, clientManager, signalingListener, executorService, 0L); } boolean isOpen() { @@ -129,14 +168,19 @@ void disconnect() { return; } - if (!session.isOpen()) { - Log.w(TAG, "Connection already closed for " + session.getRequestURI()); - return; + if (pingService != null && !pingService.isShutdown()) { + pingService.shutdownNow(); } try { - session.close(); - executorService.shutdownNow(); + if (!session.isOpen()) { + Log.w(TAG, "Connection already closed for " + session.getRequestURI()); + } else { + session.close(); + } + if (!executorService.isShutdown()) { + executorService.shutdownNow(); + } Log.i(TAG, "Disconnected from " + session.getRequestURI() + " successfully!"); } catch (final IOException e) { Log.e(TAG, "Exception closing: " + e.getMessage()); diff --git a/src/main/java/com/amazonaws/kinesisvideo/utils/Constants.java b/src/main/java/com/amazonaws/kinesisvideo/utils/Constants.java index f9d7d0e..ca7ea7f 100644 --- a/src/main/java/com/amazonaws/kinesisvideo/utils/Constants.java +++ b/src/main/java/com/amazonaws/kinesisvideo/utils/Constants.java @@ -1,5 +1,12 @@ package com.amazonaws.kinesisvideo.utils; +import com.amazonaws.regions.Region; +import com.amazonaws.regions.Regions; +import com.google.common.collect.Sets; + +import java.util.Set; +import java.util.concurrent.TimeUnit; + public class Constants { /** * SDK identifier @@ -19,4 +26,27 @@ public class Constants { * Query parameter for Client Id. Only used for viewers. Used for calling Kinesis Video Websocket APIs. */ public static final String CLIENT_ID_QUERY_PARAM = "X-Amz-ClientId"; + + /** + * Regions for WebRTC Ingestion & Storage feature. + */ + public static final Set INGESTION_PREVIEW_REGIONS = Sets.newHashSet(Region.getRegion("us-west-2")); + + public static final int MAX_CONNECTION_FAILURES_WITHIN_INTERVAL_FOR_JOIN_STORAGE_SESSION_RETRIES = 5; + + public static final long JOIN_STORAGE_SESSION_RETRIES_INTERVAL_MILLIS = TimeUnit.MINUTES.toMillis(15); + + public static final long WEBSOCKET_MESSAGE_DELIVERY_TIMEOUT_MILLISECONDS = TimeUnit.SECONDS.toMillis(6); + + /** + * Interval (in seconds) in which to print the peer connection stats. + * 0 to disable. + */ + public static final int LOG_STATS_INTERVAL_SECONDS = 30; + + public static final long EXPONENTIAL_BACKOFF_CAP_MILLISECONDS = TimeUnit.SECONDS.toMillis(10); + + public static final long EXPONENTIAL_BACKOFF_COEFFICIENT_MILLISECONDS = TimeUnit.MILLISECONDS.toMillis(200); + + public static final int TIMEOUT_TO_ESTABLISH_CONNECTION_WITH_MEDIA_SERVER_SECONDS = 5; } diff --git a/src/main/res/layout/fragment_stream_webrtc_configuration.xml b/src/main/res/layout/fragment_stream_webrtc_configuration.xml index cf28afc..416d19b 100644 --- a/src/main/res/layout/fragment_stream_webrtc_configuration.xml +++ b/src/main/res/layout/fragment_stream_webrtc_configuration.xml @@ -89,20 +89,51 @@ android:layout_width="match_parent" android:layout_height="20dp" /> + + + + - + android:layout_height="wrap_content" + android:text="Use STUN" + android:layoutDirection="rtl" + android:id="@+id/use_stun" + android:layout_marginLeft="16dp" + android:layout_marginRight="16dp" + android:textSize="16sp" + android:checked="true" + tools:ignore="HardcodedText" /> + +