Skip to content

Commit

Permalink
Merge branch '0.4'
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Franzl committed Apr 24, 2018
2 parents 0ba0bae + 0df381e commit 5298534
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 117 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Janus rtpforward plugin

This plugin for the [Janus WebRTC gateway](https://github.com/meetecho/janus-gateway) (tested with revision dc81819 [2017-12-04]) takes RTP and RTCP packets from a WebRTC connection (Janus Session) and forwards/sends them to UDP ports for further processing or display by an external receiver/decoder (e.g. a GStreamer pipeline).
This plugin for the [Janus WebRTC gateway](https://github.com/meetecho/janus-gateway) (tested with 0.4.0, revision 673ac3fdc2c683c5 [2018-04-23]) takes RTP and RTCP packets from a WebRTC connection (Janus Session) and forwards/sends them to UDP ports for further processing or display by an external receiver/decoder (e.g. a GStreamer pipeline).

Four destination UDP addresses/ports are used:

Expand Down Expand Up @@ -91,7 +91,7 @@ Note: The following two use cases use the UDP multicast IP address 225.0.0.37. T

## GStreamer display

The following example GStreamer pipeline will output the WebRTC audio and video emitted by this plugin (if the port numbers, the payload numbers, and encoding names match). The payload numbers are negotiated dynamically in the SDP exchange, and may differ from browser to browser, and even from session to session. You need to inspect each SDP exchange to find them on a per-session basis. Such a pipeline is thus best launched programmatically.
The following example GStreamer pipeline will output the WebRTC audio and video emitted by this plugin (if the port numbers and encoding names match).

You probably also need to send a PLI to the browser to request a keyframe if the GStreamer pipeline is launched mid-stream. The following pipeline will start running only after a keyframe has been received.

Expand All @@ -102,9 +102,9 @@ Note that you can lauch the same pipeline several times when you're multicasting
````shell
gst-launch-1.0 -v \
rtpbin name=rtpbin latency=100 \
udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60000 caps="application/x-rtp, media=audio, payload=111, encoding-name=OPUS, clock-rate=48000" ! rtpbin.recv_rtp_sink_0 \
udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60000 caps="application/x-rtp, media=audio, encoding-name=OPUS, clock-rate=48000" ! rtpbin.recv_rtp_sink_0 \
udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60001 caps="application/x-rtcp" ! rtpbin.recv_rtcp_sink_0 \
udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60002 caps="application/x-rtp, media=video, payload=96, encoding-name=VP8, clock-rate=90000" ! rtpbin.recv_rtp_sink_1 \
udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60002 caps="application/x-rtp, media=video, encoding-name=VP8, clock-rate=90000" ! rtpbin.recv_rtp_sink_1 \
udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60003 caps="application/x-rtcp" ! rtpbin.recv_rtcp_sink_1 \
rtpbin. ! rtpvp8depay ! vp8dec ! autovideosink \
rtpbin. ! rtpopusdepay ! queue ! opusdec ! pulsesink
Expand All @@ -120,9 +120,9 @@ The following GStreamer pipeline simply dumps the synchronized (by `rtpbin`) and
gst-launch-1.0 -v -e \
matroskamux name=mux streamable=1 ! filesink location=/tmp/dump.mkv \
rtpbin name=rtpbin latency=100 \
udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60000 caps="application/x-rtp, media=audio, payload=111, encoding-name=OPUS, clock-rate=48000" ! rtpbin.recv_rtp_sink_0 \
udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60000 caps="application/x-rtp, media=audio, encoding-name=OPUS, clock-rate=48000" ! rtpbin.recv_rtp_sink_0 \
udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60001 caps="application/x-rtcp" ! rtpbin.recv_rtcp_sink_0 \
udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60002 caps="application/x-rtp, media=video, payload=96, encoding-name=VP8, clock-rate=90000" ! rtpbin.recv_rtp_sink_1 \
udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60002 caps="application/x-rtp, media=video, encoding-name=VP8, clock-rate=90000" ! rtpbin.recv_rtp_sink_1 \
udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60003 caps="application/x-rtcp" ! rtpbin.recv_rtcp_sink_1 \
rtpbin. ! rtpopusdepay ! mux.audio_0 \
rtpbin. ! rtpvp8depay ! mux.video_0
Expand Down
154 changes: 43 additions & 111 deletions janus_rtpforward.c
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
#include "utils.h"

#define RTPFORWARD_VERSION 1
#define RTPFORWARD_VERSION_STRING "0.2.4" // use with Janus commit dc81819 [2017-12-04]
#define RTPFORWARD_VERSION_STRING "0.4.0" // convention: always matching Janus version
#define RTPFORWARD_DESCRIPTION "Forwards RTP and RTCP to an external UDP receiver/decoder"
#define RTPFORWARD_NAME "rtpforward"
#define RTPFORWARD_AUTHOR "Michael Karl Franzl"
Expand Down Expand Up @@ -92,7 +92,6 @@ static GThread *handler_thread;
static GThread *watchdog_thread;

static void *rtpforward_handler_thread(void *data);
static void rtpforward_hangup_media_internal(janus_plugin_session *handle);


typedef struct rtpforward_message {
Expand Down Expand Up @@ -143,15 +142,30 @@ typedef struct rtpforward_session {

janus_rtp_switching_context context;
volatile gint hangingup;
gint64 destroyed; /* Time at which this session was marked as destroyed */
volatile gint destroyed;
janus_refcount ref;
} rtpforward_session;


static GHashTable *sessions;
static GList *old_sessions;
static janus_mutex sessions_mutex = JANUS_MUTEX_INITIALIZER;



static void rtpforward_session_destroy(rtpforward_session *session) {
if(session && g_atomic_int_compare_and_exchange(&session->destroyed, 0, 1))
janus_refcount_decrease(&session->ref);
}


static void rtpforward_session_free(const janus_refcount *session_ref) {
rtpforward_session *session = janus_refcount_containerof(session_ref, rtpforward_session, ref);
/* Remove the reference to the core plugin session */
janus_refcount_decrease(&session->handle->ref);
/* This session can be destroyed, free all the resources */
g_free(session);
}

static void rtpforward_message_free(rtpforward_message *msg) {
if(!msg || msg == &exit_message)
return;
Expand All @@ -166,7 +180,7 @@ static void rtpforward_message_free(rtpforward_message *msg) {
if(msg->jsep)
json_decref(msg->jsep);
msg->jsep = NULL;

g_free(msg);
}

Expand All @@ -179,54 +193,6 @@ static void rtpforward_message_free(rtpforward_message *msg) {
#define RTPFORWARD_ERROR_UNKNOWN_ERROR 416


static void *rtpforward_watchdog_thread(void *data) {
JANUS_LOG(LOG_INFO, "%s watchdog started\n", RTPFORWARD_NAME);
gint64 now = 0;
while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
janus_mutex_lock(&sessions_mutex);
/* Iterate on all the sessions */
now = janus_get_monotonic_time();
if(old_sessions != NULL) {
GList *sl = old_sessions;
JANUS_LOG(LOG_HUGE, "%s Watchdog: Checking %d old sessions...\n", RTPFORWARD_NAME, g_list_length(old_sessions));
while(sl) {
rtpforward_session *session = (rtpforward_session *)sl->data;
if(!session) {
sl = sl->next;
continue;
}
if(now - session->destroyed >= 5*G_USEC_PER_SEC) {
JANUS_LOG(LOG_INFO, "%s Watchdog: Freeing old session\n", RTPFORWARD_NAME);
GList *rm = sl->next;
old_sessions = g_list_delete_link(old_sessions, sl);
sl = rm;

close(session->sendsockfd);
session->sendsockfd = -1;

if(session->relay_thread != NULL) {
JANUS_LOG(LOG_INFO, "%s Watchdog: Joining session's relay thread\n", RTPFORWARD_NAME);
g_thread_join(session->relay_thread); // blocking
session->relay_thread = NULL;
JANUS_LOG(LOG_INFO, "%s Watchdog: Session's relay thread joined\n", RTPFORWARD_NAME);
}

session->handle = NULL;
g_free(session);
session = NULL;
continue;
}
sl = sl->next;
}
}
janus_mutex_unlock(&sessions_mutex);
g_usleep(500000);
}
JANUS_LOG(LOG_INFO, "%s Leaving watchdog thread\n", RTPFORWARD_NAME);
return NULL;
}


int rtpforward_init(janus_callbacks *callback, const char *config_path) {
if(g_atomic_int_get(&stopping)) {
return -1;
Expand All @@ -237,18 +203,12 @@ int rtpforward_init(janus_callbacks *callback, const char *config_path) {
return -1;
}

sessions = g_hash_table_new(NULL, NULL);
sessions = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)rtpforward_session_destroy);
messages = g_async_queue_new_full((GDestroyNotify) rtpforward_message_free);
gateway = callback;

GError *error = NULL;

watchdog_thread = g_thread_try_new("rtpforward watchdog thread", &rtpforward_watchdog_thread, NULL, &error);
if(error != NULL) {
JANUS_LOG(LOG_ERR, "%s Got error %d (%s) trying to launch the watchdog thread...\n", RTPFORWARD_NAME, error->code, error->message ? error->message : "??");
return -1;
}

handler_thread = g_thread_try_new("rtpforward message handler thread", rtpforward_handler_thread, NULL, &error);
if(error != NULL) {
JANUS_LOG(LOG_ERR, "%s Got error %d (%s) trying to launch the message handler thread...\n", RTPFORWARD_NAME, error->code, error->message ? error->message : "??");
Expand Down Expand Up @@ -327,6 +287,7 @@ void rtpforward_create_session(janus_plugin_session *handle, int *error) {

rtpforward_session *session = (rtpforward_session *)g_malloc0(sizeof(rtpforward_session));
session->handle = handle;
janus_refcount_init(&session->ref, rtpforward_session_free);

session->sendport_video_rtp = 0;
session->sendport_video_rtcp = 0;
Expand All @@ -351,12 +312,10 @@ void rtpforward_create_session(janus_plugin_session *handle, int *error) {
session->drop_permille = 0;
session->drop_video_packets = 0;
session->drop_audio_packets = 0;


session->destroyed = 0;


janus_rtp_switching_context_reset(&session->context);

g_atomic_int_set(&session->destroyed, 0);
g_atomic_int_set(&session->hangingup, 0);

handle->plugin_handle = session;
Expand All @@ -374,7 +333,7 @@ void rtpforward_destroy_session(janus_plugin_session *handle, int *error) {
if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) {
*error = -1;
return;
}
}
janus_mutex_lock(&sessions_mutex);
rtpforward_session *session = (rtpforward_session *)handle->plugin_handle;
if(!session) {
Expand All @@ -383,15 +342,19 @@ void rtpforward_destroy_session(janus_plugin_session *handle, int *error) {
*error = -2;
return;
}
if(!session->destroyed) {
JANUS_LOG(LOG_INFO, "%s Destroy session...\n", RTPFORWARD_NAME);
rtpforward_hangup_media_internal(handle);
session->destroyed = janus_get_monotonic_time();

g_hash_table_remove(sessions, handle);
old_sessions = g_list_append(old_sessions, session);


JANUS_LOG(LOG_INFO, "%s Destroy session...\n", RTPFORWARD_NAME);
close(session->sendsockfd);

if(session->relay_thread != NULL) {
JANUS_LOG(LOG_INFO, "%s Watchdog: Joining session's relay thread\n", RTPFORWARD_NAME);
g_thread_join(session->relay_thread); // blocking
session->relay_thread = NULL;
JANUS_LOG(LOG_INFO, "%s Watchdog: Session's relay thread joined\n", RTPFORWARD_NAME);
}

g_hash_table_remove(sessions, handle);

janus_mutex_unlock(&sessions_mutex);

JANUS_LOG(LOG_INFO, "%s Session destroyed.\n", RTPFORWARD_NAME);
Expand Down Expand Up @@ -568,7 +531,7 @@ struct janus_plugin_result *rtpforward_handle_message(janus_plugin_session *hand
goto respond;
}
if (IN_MULTICAST(ntohl(inet_addr(sendipv4)))) {
u_int ttl = 0; // do not route UDP packets outside of local host
uint8_t ttl = 0; // do not route UDP packets outside of local host
setsockopt(session->sendsockfd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl));

struct in_addr mcast_iface_addr;
Expand Down Expand Up @@ -707,8 +670,8 @@ void rtpforward_incoming_rtp(janus_plugin_session *handle, int video, char *buf,
if (missed) {
JANUS_LOG(LOG_WARN, "%s Missed %d packets before sequence number %d\n", RTPFORWARD_NAME, missed, seqn_current);

// We have missined at least one packet.
// Some downstream decoders could be very sensitive to packet loss.
// We have missed at least one packet.
// Some downstream decoders could be sensitive to packet loss.
// In this case, it is recommended to stop video forwarding, and only
// re-start it at the next keyframe.
if (session->disable_video_on_packetloss && session->video_enabled) {
Expand Down Expand Up @@ -736,15 +699,11 @@ void rtpforward_incoming_rtp(janus_plugin_session *handle, int video, char *buf,
}
}



session->seqnr_video_last = seqn_current;

if (!session->video_enabled)
return;

//JANUS_LOG(LOG_INFO, "%s Video %d\n", RTPFORWARD_NAME, seqn_current);

addr.sin_port = htons(session->sendport_video_rtp);


Expand Down Expand Up @@ -794,28 +753,6 @@ void rtpforward_hangup_media(janus_plugin_session *handle) {





static void rtpforward_hangup_media_internal(janus_plugin_session *handle) {
JANUS_LOG(LOG_INFO, "%s rtpforward_hangup_media_internal\n", RTPFORWARD_NAME);
if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized))
return;
rtpforward_session *session = (rtpforward_session *)handle->plugin_handle;
if(!session) {
JANUS_LOG(LOG_ERR, "%s rtpforward_hangup_media_internal: No session associated with this handle...\n", RTPFORWARD_NAME);
return;
}
if(session->destroyed) {
return;
}
if(g_atomic_int_add(&session->hangingup, 1)) {
return;
}
}




/* Thread to handle incoming messages */
static void *rtpforward_handler_thread(void *data) {
JANUS_LOG(LOG_VERB, "%s Starting msg handler thread\n", RTPFORWARD_NAME);
Expand All @@ -824,7 +761,7 @@ static void *rtpforward_handler_thread(void *data) {
char *error_cause = g_malloc0(512);
json_t *body = NULL;


while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) {
msg = g_async_queue_pop(messages);

Expand Down Expand Up @@ -891,8 +828,7 @@ static void *rtpforward_handler_thread(void *data) {
JANUS_SDP_OA_DATA, FALSE,
JANUS_SDP_OA_DONE
);

janus_sdp_free(offer);
janus_sdp_destroy(offer);

const char *negotiated_acodec, *negotiated_vcodec;

Expand All @@ -915,9 +851,7 @@ static void *rtpforward_handler_thread(void *data) {
}

char *sdp_answer = janus_sdp_write(answer);
janus_sdp_free(answer);

//JANUS_LOG(LOG_INFO, "%s SDP ANSWER ASYNC: %s\n", RTPFORWARD_NAME, sdp_answer);
janus_sdp_destroy(answer);

const char *type = "answer";
json_t *jsep = json_pack("{ssss}", "type", type, "sdp", sdp_answer);
Expand All @@ -928,10 +862,8 @@ static void *rtpforward_handler_thread(void *data) {

// How long will the gateway take to push the reply?
g_atomic_int_set(&session->hangingup, 0);
gint64 start = janus_get_monotonic_time();
int res = gateway->push_event(msg->handle, &rtpforward_plugin, msg->transaction, response, jsep);
JANUS_LOG(LOG_VERB, " >> Pushing event: %d (took %"SCNu64" us)\n",
res, janus_get_monotonic_time()-start);
JANUS_LOG(LOG_VERB, " >> Pushing event: %d\n", res);
g_free(sdp_answer);

// The Janus core increases the references to both the message and jsep *json_t objects.
Expand Down

0 comments on commit 5298534

Please sign in to comment.