diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index af4d45c21..9cce79d99 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -2077,12 +2077,11 @@ z_result_t z_declare_background_subscriber(const z_loaned_session_t *zs, const z const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subscriber); #endif -#ifdef Z_FEATURE_UNSTABLE_API #if Z_FEATURE_BATCHING == 1 /** * Activate the batching mechanism, any message that would have been sent on the network by a subsequent api call (e.g - * z_put, z_get) will be instead stored until the batch is full, flushed with :c:func:`zp_batch_flush` or batching is - * stopped with :c:func:`zp_batch_stop`. + * z_put, z_get) will be instead stored until either: the batch is full, flushed with :c:func:`zp_batch_flush`, batching + * is stopped with :c:func:`zp_batch_stop`, a message needs to be sent immediately. * * Parameters: * zs: Pointer to a :c:type:`z_loaned_session_t` that will start batching messages. @@ -2114,7 +2113,6 @@ z_result_t zp_batch_flush(const z_loaned_session_t *zs); */ z_result_t zp_batch_stop(const z_loaned_session_t *zs); #endif -#endif /************* Multi Thread Tasks helpers **************/ /** diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 2af9d97ef..2bc7eb0ba 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -80,6 +80,8 @@ extern "C" { typedef _z_qos_t _z_n_qos_t; +#define _Z_N_QOS_IS_EXPRESS_FLAG (1 << 4) + static inline _z_qos_t _z_n_qos_create(bool express, z_congestion_control_t congestion_control, z_priority_t priority) { _z_n_qos_t ret; bool nodrop = congestion_control == Z_CONGESTION_CONTROL_DROP ? 0 : 1; diff --git a/src/api/api.c b/src/api/api.c index 334ccfee2..f6a209ac9 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1490,7 +1490,6 @@ const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *sub) } #endif -#ifdef Z_FEATURE_UNSTABLE_API #if Z_FEATURE_BATCHING == 1 z_result_t zp_batch_start(const z_loaned_session_t *zs) { if (_Z_RC_IS_NULL(zs)) { @@ -1519,7 +1518,6 @@ z_result_t zp_batch_stop(const z_loaned_session_t *zs) { return _z_send_n_batch(session, Z_CONGESTION_CONTROL_DEFAULT); } #endif -#endif /**************** Tasks ****************/ void zp_task_read_options_default(zp_task_read_options_t *options) { diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 9479059f3..fc38fae5d 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -26,6 +26,20 @@ /*------------------ Transmission helper ------------------*/ +static bool _z_transport_tx_get_express_status(const _z_network_message_t *msg) { + switch (msg->_tag) { + case _Z_N_DECLARE: + return _Z_HAS_FLAG(msg->_body._declare._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); + case _Z_N_PUSH: + return _Z_HAS_FLAG(msg->_body._push._qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); + case _Z_N_REQUEST: + return _Z_HAS_FLAG(msg->_body._request._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); + case _Z_N_RESPONSE: + return _Z_HAS_FLAG(msg->_body._response._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); + default: + return false; + } +} static _z_zint_t _z_transport_tx_get_sn(_z_transport_common_t *ztc, z_reliability_t reliability) { _z_zint_t sn; if (reliability == Z_RELIABILITY_RELIABLE) { @@ -139,13 +153,19 @@ static z_result_t _z_transport_tx_batch_overflow(_z_transport_common_t *ztc, con _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztc->_wbuf, &t_msg)); // Retry encode + bool is_express = _z_transport_tx_get_express_status(n_msg); z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg); if (ret != _Z_RES_OK) { // Message still doesn't fit in buffer, send as fragments return _z_transport_tx_send_fragment(ztc, n_msg, reliability, sn); } else { - // Increment batch - ztc->_batch_count++; + if (is_express) { + // Send immediately + return _z_transport_tx_flush_buffer(ztc); + } else { + // Increment batch + ztc->_batch_count++; + } } return _Z_RES_OK; #else @@ -180,10 +200,16 @@ static z_result_t _z_transport_tx_send_n_msg_inner(_z_transport_common_t *ztc, c } // Try encoding the network message size_t prev_wpos = _z_transport_tx_save_wpos(&ztc->_wbuf); + bool is_express = _z_transport_tx_get_express_status(n_msg); z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg); if (ret == _Z_RES_OK) { - // Flush buffer or increase batch - return _z_transport_tx_flush_or_incr_batch(ztc); + if (is_express) { + // Send immediately + return _z_transport_tx_flush_buffer(ztc); + } else { + // Flush buffer or increase batch + return _z_transport_tx_flush_or_incr_batch(ztc); + } } else if (!batch_has_data) { // Message doesn't fit in buffer, send as fragments return _z_transport_tx_send_fragment(ztc, n_msg, reliability, sn); @@ -193,22 +219,26 @@ static z_result_t _z_transport_tx_send_n_msg_inner(_z_transport_common_t *ztc, c } } +static z_result_t _z_transport_tx_send_t_msg_inner(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg) { + // Send batch if needed + bool batch_has_data = _z_transport_tx_batch_has_data(ztc); + if (batch_has_data) { + _Z_RETURN_IF_ERR(_z_transport_tx_flush_buffer(ztc)); + } + // Encode transport message + __unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); + _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztc->_wbuf, t_msg)); + // Send message + return _z_transport_tx_flush_buffer(ztc); +} + z_result_t _z_transport_tx_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg) { z_result_t ret = _Z_RES_OK; _Z_DEBUG("Send session message"); _z_transport_tx_mutex_lock(ztc, true); - // Encode transport message - __unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); - ret = _z_transport_message_encode(&ztc->_wbuf, t_msg); - if (ret == _Z_RES_OK) { - // Send message - __unsafe_z_finalize_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); - ret = _z_link_send_wbuf(&ztc->_link, &ztc->_wbuf); - if (ret == _Z_RES_OK) { - ztc->_transmitted = true; // Tell session we transmitted data - } - } + ret = _z_transport_tx_send_t_msg_inner(ztc, t_msg); + _z_transport_tx_mutex_unlock(ztc); return ret; }