diff --git a/pjmedia/src/pjmedia/conference.c b/pjmedia/src/pjmedia/conference.c index 80bfa66c84..0e6277a219 100644 --- a/pjmedia/src/pjmedia/conference.c +++ b/pjmedia/src/pjmedia/conference.c @@ -340,34 +340,53 @@ static op_entry* get_free_op_entry(pjmedia_conf *conf) static void handle_op_queue(pjmedia_conf *conf) { - op_entry *op, *next_op; + /* The queue may grow while mutex is released, better put a limit? */ + enum { MAX_PROCESSED_OP = 100 }; + int i = 0; - op = conf->op_queue->next; - while (op != conf->op_queue) { - next_op = op->next; + while (i++ < MAX_PROCESSED_OP) { + op_entry *op; + op_type type; + op_param param; + + pj_mutex_lock(conf->mutex); + + /* Stop when queue empty */ + if (pj_list_empty(conf->op_queue)) { + pj_mutex_unlock(conf->mutex); + break; + } + + /* Copy op */ + op = conf->op_queue->next; + type = op->type; + param = op->param; + + /* Free op */ pj_list_erase(op); + op->type = OP_UNKNOWN; + pj_list_push_back(conf->op_queue_free, op); - switch(op->type) { + pj_mutex_unlock(conf->mutex); + + /* Process op */ + switch(type) { case OP_ADD_PORT: - op_add_port(conf, &op->param); + op_add_port(conf, ¶m); break; case OP_REMOVE_PORT: - op_remove_port(conf, &op->param); + op_remove_port(conf, ¶m); break; case OP_CONNECT_PORTS: - op_connect_ports(conf, &op->param); + op_connect_ports(conf, ¶m); break; case OP_DISCONNECT_PORTS: - op_disconnect_ports(conf, &op->param); + op_disconnect_ports(conf, ¶m); break; default: pj_assert(!"Invalid sync-op in conference"); break; } - - op->type = OP_UNKNOWN; - pj_list_push_back(conf->op_queue_free, op); - op = next_op; } } @@ -1722,7 +1741,10 @@ static void op_remove_port(pjmedia_conf *conf, const op_param *prm) } /* Remove the port. */ + pj_mutex_lock(conf->mutex); conf->ports[port] = NULL; + pj_mutex_unlock(conf->mutex); + if (!conf_port->is_new) --conf->port_cnt; @@ -2418,9 +2440,7 @@ static pj_status_t get_frame(pjmedia_port *this_port, */ if (!pj_list_empty(conf->op_queue)) { pj_log_push_indent(); - pj_mutex_lock(conf->mutex); handle_op_queue(conf); - pj_mutex_unlock(conf->mutex); pj_log_pop_indent(); } diff --git a/pjmedia/src/pjmedia/vid_conf.c b/pjmedia/src/pjmedia/vid_conf.c index 9634c39267..81ae52501c 100644 --- a/pjmedia/src/pjmedia/vid_conf.c +++ b/pjmedia/src/pjmedia/vid_conf.c @@ -202,37 +202,56 @@ static op_entry* get_free_op_entry(pjmedia_vid_conf *conf) static void handle_op_queue(pjmedia_vid_conf *conf) { - op_entry *op, *next_op; - - op = conf->op_queue->next; - while (op != conf->op_queue) { - next_op = op->next; + /* The queue may grow while mutex is released, better put a limit? */ + enum { MAX_PROCESSED_OP = 100 }; + int i = 0; + + while (i++ < MAX_PROCESSED_OP) { + op_entry* op; + op_type type; + op_param param; + + pj_mutex_lock(conf->mutex); + + /* Stop when queue empty */ + if (pj_list_empty(conf->op_queue)) { + pj_mutex_unlock(conf->mutex); + break; + } + + /* Copy op */ + op = conf->op_queue->next; + type = op->type; + param = op->param; + + /* Free op */ pj_list_erase(op); + op->type = OP_UNKNOWN; + pj_list_push_back(conf->op_queue_free, op); + + pj_mutex_unlock(conf->mutex); - switch(op->type) { + /* Process op */ + switch (type) { case OP_ADD_PORT: - op_add_port(conf, &op->param); + op_add_port(conf, ¶m); break; case OP_REMOVE_PORT: - op_remove_port(conf, &op->param); + op_remove_port(conf, ¶m); break; case OP_CONNECT_PORTS: - op_connect_ports(conf, &op->param); + op_connect_ports(conf, ¶m); break; case OP_DISCONNECT_PORTS: - op_disconnect_ports(conf, &op->param); + op_disconnect_ports(conf, ¶m); break; case OP_UPDATE_PORT: - op_update_port(conf, &op->param); + op_update_port(conf, ¶m); break; default: pj_assert(!"Invalid sync-op in video conference"); break; } - - op->type = OP_UNKNOWN; - pj_list_push_back(conf->op_queue_free, op); - op = next_op; } } @@ -708,7 +727,10 @@ static void op_remove_port(pjmedia_vid_conf *vid_conf, } /* Remove the port. */ + pj_mutex_lock(vid_conf->mutex); vid_conf->ports[slot] = NULL; + pj_mutex_unlock(vid_conf->mutex); + if (!cport->is_new) --vid_conf->port_cnt; @@ -1086,9 +1108,9 @@ static void on_clock_tick(const pj_timestamp *now, void *user_data) * the clock such as connect, disonnect, remove, update. */ if (!pj_list_empty(vid_conf->op_queue)) { - pj_mutex_lock(vid_conf->mutex); + pj_log_push_indent(); handle_op_queue(vid_conf); - pj_mutex_unlock(vid_conf->mutex); + pj_log_pop_indent(); } /* No mutex from this point! Otherwise it may cause deadlock as