From e8267f222c946ad75711dad8b59e7da9467e2871 Mon Sep 17 00:00:00 2001 From: Michael Franzl Date: Tue, 24 Apr 2018 13:07:50 +0200 Subject: [PATCH 1/3] upgraded to Janus API 0.4.0 --- janus_rtpforward.c | 154 +++++++++++++-------------------------------- 1 file changed, 43 insertions(+), 111 deletions(-) diff --git a/janus_rtpforward.c b/janus_rtpforward.c index 84e9bca..9677be3 100644 --- a/janus_rtpforward.c +++ b/janus_rtpforward.c @@ -29,7 +29,7 @@ #include "utils.h" #define RTPFORWARD_VERSION 1 -#define RTPFORWARD_VERSION_STRING "0.2.3" +#define RTPFORWARD_VERSION_STRING "0.4.0" // convention: always matching Janus version #define RTPFORWARD_DESCRIPTION "Forwards RTP and RTCP to an external UDP receiver/decoder" #define RTPFORWARD_NAME "rtpforward" #define RTPFORWARD_AUTHOR "Michael Karl Franzl" @@ -92,7 +92,6 @@ static GThread *handler_thread; static GThread *watchdog_thread; static void *rtpforward_handler_thread(void *data); -static void rtpforward_hangup_media_internal(janus_plugin_session *handle); typedef struct rtpforward_message { @@ -143,15 +142,30 @@ typedef struct rtpforward_session { janus_rtp_switching_context context; volatile gint hangingup; - gint64 destroyed; /* Time at which this session was marked as destroyed */ + volatile gint destroyed; + janus_refcount ref; } rtpforward_session; static GHashTable *sessions; -static GList *old_sessions; static janus_mutex sessions_mutex = JANUS_MUTEX_INITIALIZER; + +static void rtpforward_session_destroy(rtpforward_session *session) { + if(session && g_atomic_int_compare_and_exchange(&session->destroyed, 0, 1)) + janus_refcount_decrease(&session->ref); +} + + +static void rtpforward_session_free(const janus_refcount *session_ref) { + rtpforward_session *session = janus_refcount_containerof(session_ref, rtpforward_session, ref); + /* Remove the reference to the core plugin session */ + janus_refcount_decrease(&session->handle->ref); + /* This session can be destroyed, free all the resources */ + g_free(session); +} + static void rtpforward_message_free(rtpforward_message *msg) { if(!msg || msg == &exit_message) return; @@ -166,7 +180,7 @@ static void rtpforward_message_free(rtpforward_message *msg) { if(msg->jsep) json_decref(msg->jsep); msg->jsep = NULL; - + g_free(msg); } @@ -179,54 +193,6 @@ static void rtpforward_message_free(rtpforward_message *msg) { #define RTPFORWARD_ERROR_UNKNOWN_ERROR 416 -static void *rtpforward_watchdog_thread(void *data) { - JANUS_LOG(LOG_INFO, "%s watchdog started\n", RTPFORWARD_NAME); - gint64 now = 0; - while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) { - janus_mutex_lock(&sessions_mutex); - /* Iterate on all the sessions */ - now = janus_get_monotonic_time(); - if(old_sessions != NULL) { - GList *sl = old_sessions; - JANUS_LOG(LOG_HUGE, "%s Watchdog: Checking %d old sessions...\n", RTPFORWARD_NAME, g_list_length(old_sessions)); - while(sl) { - rtpforward_session *session = (rtpforward_session *)sl->data; - if(!session) { - sl = sl->next; - continue; - } - if(now - session->destroyed >= 5*G_USEC_PER_SEC) { - JANUS_LOG(LOG_INFO, "%s Watchdog: Freeing old session\n", RTPFORWARD_NAME); - GList *rm = sl->next; - old_sessions = g_list_delete_link(old_sessions, sl); - sl = rm; - - close(session->sendsockfd); - session->sendsockfd = -1; - - if(session->relay_thread != NULL) { - JANUS_LOG(LOG_INFO, "%s Watchdog: Joining session's relay thread\n", RTPFORWARD_NAME); - g_thread_join(session->relay_thread); // blocking - session->relay_thread = NULL; - JANUS_LOG(LOG_INFO, "%s Watchdog: Session's relay thread joined\n", RTPFORWARD_NAME); - } - - session->handle = NULL; - g_free(session); - session = NULL; - continue; - } - sl = sl->next; - } - } - janus_mutex_unlock(&sessions_mutex); - g_usleep(500000); - } - JANUS_LOG(LOG_INFO, "%s Leaving watchdog thread\n", RTPFORWARD_NAME); - return NULL; -} - - int rtpforward_init(janus_callbacks *callback, const char *config_path) { if(g_atomic_int_get(&stopping)) { return -1; @@ -237,18 +203,12 @@ int rtpforward_init(janus_callbacks *callback, const char *config_path) { return -1; } - sessions = g_hash_table_new(NULL, NULL); + sessions = g_hash_table_new_full(NULL, NULL, NULL, (GDestroyNotify)rtpforward_session_destroy); messages = g_async_queue_new_full((GDestroyNotify) rtpforward_message_free); gateway = callback; GError *error = NULL; - watchdog_thread = g_thread_try_new("rtpforward watchdog thread", &rtpforward_watchdog_thread, NULL, &error); - if(error != NULL) { - JANUS_LOG(LOG_ERR, "%s Got error %d (%s) trying to launch the watchdog thread...\n", RTPFORWARD_NAME, error->code, error->message ? error->message : "??"); - return -1; - } - handler_thread = g_thread_try_new("rtpforward message handler thread", rtpforward_handler_thread, NULL, &error); if(error != NULL) { JANUS_LOG(LOG_ERR, "%s Got error %d (%s) trying to launch the message handler thread...\n", RTPFORWARD_NAME, error->code, error->message ? error->message : "??"); @@ -327,6 +287,7 @@ void rtpforward_create_session(janus_plugin_session *handle, int *error) { rtpforward_session *session = (rtpforward_session *)g_malloc0(sizeof(rtpforward_session)); session->handle = handle; + janus_refcount_init(&session->ref, rtpforward_session_free); session->sendport_video_rtp = 0; session->sendport_video_rtcp = 0; @@ -351,12 +312,10 @@ void rtpforward_create_session(janus_plugin_session *handle, int *error) { session->drop_permille = 0; session->drop_video_packets = 0; session->drop_audio_packets = 0; - - - session->destroyed = 0; - + janus_rtp_switching_context_reset(&session->context); + g_atomic_int_set(&session->destroyed, 0); g_atomic_int_set(&session->hangingup, 0); handle->plugin_handle = session; @@ -374,7 +333,7 @@ void rtpforward_destroy_session(janus_plugin_session *handle, int *error) { if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) { *error = -1; return; - } + } janus_mutex_lock(&sessions_mutex); rtpforward_session *session = (rtpforward_session *)handle->plugin_handle; if(!session) { @@ -383,15 +342,19 @@ void rtpforward_destroy_session(janus_plugin_session *handle, int *error) { *error = -2; return; } - if(!session->destroyed) { - JANUS_LOG(LOG_INFO, "%s Destroy session...\n", RTPFORWARD_NAME); - rtpforward_hangup_media_internal(handle); - session->destroyed = janus_get_monotonic_time(); - - g_hash_table_remove(sessions, handle); - old_sessions = g_list_append(old_sessions, session); - + + JANUS_LOG(LOG_INFO, "%s Destroy session...\n", RTPFORWARD_NAME); + close(session->sendsockfd); + + if(session->relay_thread != NULL) { + JANUS_LOG(LOG_INFO, "%s Watchdog: Joining session's relay thread\n", RTPFORWARD_NAME); + g_thread_join(session->relay_thread); // blocking + session->relay_thread = NULL; + JANUS_LOG(LOG_INFO, "%s Watchdog: Session's relay thread joined\n", RTPFORWARD_NAME); } + + g_hash_table_remove(sessions, handle); + janus_mutex_unlock(&sessions_mutex); JANUS_LOG(LOG_INFO, "%s Session destroyed.\n", RTPFORWARD_NAME); @@ -568,7 +531,7 @@ struct janus_plugin_result *rtpforward_handle_message(janus_plugin_session *hand goto respond; } if (IN_MULTICAST(ntohl(inet_addr(sendipv4)))) { - u_int ttl = 0; // do not route UDP packets outside of local host + uint8_t ttl = 0; // do not route UDP packets outside of local host setsockopt(session->sendsockfd, IPPROTO_IP, IP_MULTICAST_TTL, &ttl, sizeof(ttl)); struct in_addr mcast_iface_addr; @@ -707,8 +670,8 @@ void rtpforward_incoming_rtp(janus_plugin_session *handle, int video, char *buf, if (missed) { JANUS_LOG(LOG_WARN, "%s Missed %d packets before sequence number %d\n", RTPFORWARD_NAME, missed, seqn_current); - // We have missined at least one packet. - // Some downstream decoders could be very sensitive to packet loss. + // We have missed at least one packet. + // Some downstream decoders could be sensitive to packet loss. // In this case, it is recommended to stop video forwarding, and only // re-start it at the next keyframe. if (session->disable_video_on_packetloss && session->video_enabled) { @@ -736,15 +699,11 @@ void rtpforward_incoming_rtp(janus_plugin_session *handle, int video, char *buf, } } - - session->seqnr_video_last = seqn_current; if (!session->video_enabled) return; - //JANUS_LOG(LOG_INFO, "%s Video %d\n", RTPFORWARD_NAME, seqn_current); - addr.sin_port = htons(session->sendport_video_rtp); @@ -794,28 +753,6 @@ void rtpforward_hangup_media(janus_plugin_session *handle) { - - -static void rtpforward_hangup_media_internal(janus_plugin_session *handle) { - JANUS_LOG(LOG_INFO, "%s rtpforward_hangup_media_internal\n", RTPFORWARD_NAME); - if(g_atomic_int_get(&stopping) || !g_atomic_int_get(&initialized)) - return; - rtpforward_session *session = (rtpforward_session *)handle->plugin_handle; - if(!session) { - JANUS_LOG(LOG_ERR, "%s rtpforward_hangup_media_internal: No session associated with this handle...\n", RTPFORWARD_NAME); - return; - } - if(session->destroyed) { - return; - } - if(g_atomic_int_add(&session->hangingup, 1)) { - return; - } -} - - - - /* Thread to handle incoming messages */ static void *rtpforward_handler_thread(void *data) { JANUS_LOG(LOG_VERB, "%s Starting msg handler thread\n", RTPFORWARD_NAME); @@ -824,7 +761,7 @@ static void *rtpforward_handler_thread(void *data) { char *error_cause = g_malloc0(512); json_t *body = NULL; - + while(g_atomic_int_get(&initialized) && !g_atomic_int_get(&stopping)) { msg = g_async_queue_pop(messages); @@ -891,8 +828,7 @@ static void *rtpforward_handler_thread(void *data) { JANUS_SDP_OA_DATA, FALSE, JANUS_SDP_OA_DONE ); - - janus_sdp_free(offer); + janus_sdp_destroy(offer); const char *negotiated_acodec, *negotiated_vcodec; @@ -915,9 +851,7 @@ static void *rtpforward_handler_thread(void *data) { } char *sdp_answer = janus_sdp_write(answer); - janus_sdp_free(answer); - - //JANUS_LOG(LOG_INFO, "%s SDP ANSWER ASYNC: %s\n", RTPFORWARD_NAME, sdp_answer); + janus_sdp_destroy(answer); const char *type = "answer"; json_t *jsep = json_pack("{ssss}", "type", type, "sdp", sdp_answer); @@ -928,10 +862,8 @@ static void *rtpforward_handler_thread(void *data) { // How long will the gateway take to push the reply? g_atomic_int_set(&session->hangingup, 0); - gint64 start = janus_get_monotonic_time(); int res = gateway->push_event(msg->handle, &rtpforward_plugin, msg->transaction, response, jsep); - JANUS_LOG(LOG_VERB, " >> Pushing event: %d (took %"SCNu64" us)\n", - res, janus_get_monotonic_time()-start); + JANUS_LOG(LOG_VERB, " >> Pushing event: %d\n", res); g_free(sdp_answer); // The Janus core increases the references to both the message and jsep *json_t objects. From 21c537c44ceed94a70e2cba42df5b688cb73d65d Mon Sep 17 00:00:00 2001 From: Michael Franzl Date: Tue, 24 Apr 2018 13:13:47 +0200 Subject: [PATCH 2/3] README simplification: gstreamer RTP payload numbers do not need to be specified --- README.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index e7753dc..593b39e 100644 --- a/README.md +++ b/README.md @@ -91,7 +91,7 @@ Note: The following two use cases use the UDP multicast IP address 225.0.0.37. T ## GStreamer display -The following example GStreamer pipeline will output the WebRTC audio and video emitted by this plugin (if the port numbers, the payload numbers, and encoding names match). The payload numbers are negotiated dynamically in the SDP exchange, and may differ from browser to browser, and even from session to session. You need to inspect each SDP exchange to find them on a per-session basis. Such a pipeline is thus best launched programmatically. +The following example GStreamer pipeline will output the WebRTC audio and video emitted by this plugin (if the port numbers and encoding names match). You probably also need to send a PLI to the browser to request a keyframe if the GStreamer pipeline is launched mid-stream. The following pipeline will start running only after a keyframe has been received. @@ -102,9 +102,9 @@ Note that you can lauch the same pipeline several times when you're multicasting ````shell gst-launch-1.0 -v \ rtpbin name=rtpbin latency=100 \ -udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60000 caps="application/x-rtp, media=audio, payload=111, encoding-name=OPUS, clock-rate=48000" ! rtpbin.recv_rtp_sink_0 \ +udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60000 caps="application/x-rtp, media=audio, encoding-name=OPUS, clock-rate=48000" ! rtpbin.recv_rtp_sink_0 \ udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60001 caps="application/x-rtcp" ! rtpbin.recv_rtcp_sink_0 \ -udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60002 caps="application/x-rtp, media=video, payload=96, encoding-name=VP8, clock-rate=90000" ! rtpbin.recv_rtp_sink_1 \ +udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60002 caps="application/x-rtp, media=video, encoding-name=VP8, clock-rate=90000" ! rtpbin.recv_rtp_sink_1 \ udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60003 caps="application/x-rtcp" ! rtpbin.recv_rtcp_sink_1 \ rtpbin. ! rtpvp8depay ! vp8dec ! autovideosink \ rtpbin. ! rtpopusdepay ! queue ! opusdec ! pulsesink @@ -120,9 +120,9 @@ The following GStreamer pipeline simply dumps the synchronized (by `rtpbin`) and gst-launch-1.0 -v -e \ matroskamux name=mux streamable=1 ! filesink location=/tmp/dump.mkv \ rtpbin name=rtpbin latency=100 \ -udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60000 caps="application/x-rtp, media=audio, payload=111, encoding-name=OPUS, clock-rate=48000" ! rtpbin.recv_rtp_sink_0 \ +udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60000 caps="application/x-rtp, media=audio, encoding-name=OPUS, clock-rate=48000" ! rtpbin.recv_rtp_sink_0 \ udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60001 caps="application/x-rtcp" ! rtpbin.recv_rtcp_sink_0 \ -udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60002 caps="application/x-rtp, media=video, payload=96, encoding-name=VP8, clock-rate=90000" ! rtpbin.recv_rtp_sink_1 \ +udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60002 caps="application/x-rtp, media=video, encoding-name=VP8, clock-rate=90000" ! rtpbin.recv_rtp_sink_1 \ udpsrc address=225.0.0.37 auto-multicast=true multicast-iface=lo port=60003 caps="application/x-rtcp" ! rtpbin.recv_rtcp_sink_1 \ rtpbin. ! rtpopusdepay ! mux.audio_0 \ rtpbin. ! rtpvp8depay ! mux.video_0 From 0df381e975f2672785287d757371ba34126bbe7b Mon Sep 17 00:00:00 2001 From: Michael Franzl Date: Tue, 24 Apr 2018 14:39:20 +0200 Subject: [PATCH 3/3] README update --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 593b39e..75c8e1a 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Janus rtpforward plugin -This plugin for the [Janus WebRTC gateway](https://github.com/meetecho/janus-gateway) (tested with revision dc81819 [2017-12-04]) takes RTP and RTCP packets from a WebRTC connection (Janus Session) and forwards/sends them to UDP ports for further processing or display by an external receiver/decoder (e.g. a GStreamer pipeline). +This plugin for the [Janus WebRTC gateway](https://github.com/meetecho/janus-gateway) (tested with 0.4.0, revision 673ac3fdc2c683c5 [2018-04-23]) takes RTP and RTCP packets from a WebRTC connection (Janus Session) and forwards/sends them to UDP ports for further processing or display by an external receiver/decoder (e.g. a GStreamer pipeline). Four destination UDP addresses/ports are used: