From dc20d6c3aad85afa6119e2bfbf833db9b3ca2153 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 6 Jan 2025 16:13:32 +0100 Subject: [PATCH 1/8] feat: return is express val when encoding network message --- include/zenoh-pico/protocol/codec/network.h | 10 +++---- .../zenoh-pico/protocol/definitions/network.h | 2 ++ src/protocol/codec/network.c | 30 +++++++++++++------ src/protocol/codec/transport.c | 2 +- src/transport/raweth/tx.c | 5 ++-- tests/z_msgcodec_test.c | 8 ++--- 6 files changed, 36 insertions(+), 21 deletions(-) diff --git a/include/zenoh-pico/protocol/codec/network.h b/include/zenoh-pico/protocol/codec/network.h index 661ee9de2..9569cc36f 100644 --- a/include/zenoh-pico/protocol/codec/network.h +++ b/include/zenoh-pico/protocol/codec/network.h @@ -24,20 +24,20 @@ extern "C" { #endif -z_result_t _z_push_encode(_z_wbuf_t *wbf, const _z_n_msg_push_t *msg); +z_result_t _z_push_encode(_z_wbuf_t *wbf, const _z_n_msg_push_t *msg, bool *is_express); z_result_t _z_push_decode(_z_n_msg_push_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_t *arcs); -z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg); +z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg, bool *is_express); z_result_t _z_request_decode(_z_n_msg_request_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_t *arcs); -z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg); +z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg, bool *is_express); z_result_t _z_response_decode(_z_n_msg_response_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_t *arcs); z_result_t _z_response_final_encode(_z_wbuf_t *wbf, const _z_n_msg_response_final_t *msg); z_result_t _z_response_final_decode(_z_n_msg_response_final_t *msg, _z_zbuf_t *zbf, uint8_t header); -z_result_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl); +z_result_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl, bool *is_express); z_result_t _z_declare_decode(_z_n_msg_declare_t *decl, _z_zbuf_t *zbf, uint8_t header); z_result_t _z_n_interest_encode(_z_wbuf_t *wbf, const _z_n_msg_interest_t *interest); z_result_t _z_n_interest_decode(_z_n_msg_interest_t *interest, _z_zbuf_t *zbf, uint8_t header); -z_result_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg); +z_result_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg, bool *is_express); z_result_t _z_network_message_decode(_z_network_message_t *msg, _z_zbuf_t *zbf, _z_arc_slice_t *arcs); #ifdef __cplusplus diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 2af9d97ef..2bc7eb0ba 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -80,6 +80,8 @@ extern "C" { typedef _z_qos_t _z_n_qos_t; +#define _Z_N_QOS_IS_EXPRESS_FLAG (1 << 4) + static inline _z_qos_t _z_n_qos_create(bool express, z_congestion_control_t congestion_control, z_priority_t priority) { _z_n_qos_t ret; bool nodrop = congestion_control == Z_CONGESTION_CONTROL_DROP ? 0 : 1; diff --git a/src/protocol/codec/network.c b/src/protocol/codec/network.c index 44cf9cde9..93d829449 100644 --- a/src/protocol/codec/network.c +++ b/src/protocol/codec/network.c @@ -38,11 +38,14 @@ /*------------------ Push Message ------------------*/ -z_result_t _z_push_encode(_z_wbuf_t *wbf, const _z_n_msg_push_t *msg) { +z_result_t _z_push_encode(_z_wbuf_t *wbf, const _z_n_msg_push_t *msg, bool *is_express) { uint8_t header = _Z_MID_N_PUSH | (_z_keyexpr_is_local(&msg->_key) ? _Z_FLAG_N_REQUEST_M : 0); bool has_suffix = _z_keyexpr_has_suffix(&msg->_key); bool has_qos_ext = msg->_qos._val != _Z_N_QOS_DEFAULT._val; bool has_timestamp_ext = _z_timestamp_check(&msg->_timestamp); + if (is_express != NULL) { + *is_express = _Z_HAS_FLAG(msg->_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); + } if (has_suffix) { header |= _Z_FLAG_N_REQUEST_N; } @@ -111,7 +114,7 @@ z_result_t _z_push_decode(_z_n_msg_push_t *msg, _z_zbuf_t *zbf, uint8_t header, } /*------------------ Request Message ------------------*/ -z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg) { +z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg, bool *is_express) { z_result_t ret = _Z_RES_OK; uint8_t header = _Z_MID_N_REQUEST | (_z_keyexpr_is_local(&msg->_key) ? _Z_FLAG_N_REQUEST_M : 0); bool has_suffix = _z_keyexpr_has_suffix(&msg->_key); @@ -128,6 +131,9 @@ z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg) { uint8_t extheader = 0x01 | _Z_MSG_EXT_ENC_ZINT | (exts.n ? _Z_FLAG_Z_Z : 0); _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_ext_qos._val)); + if (is_express != NULL) { + *is_express = _Z_HAS_FLAG(msg->_ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); + } } if (exts.ext_tstamp) { exts.n -= 1; @@ -238,7 +244,7 @@ z_result_t _z_request_decode(_z_n_msg_request_t *msg, _z_zbuf_t *zbf, const uint } /*------------------ Response Message ------------------*/ -z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) { +z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg, bool *is_express) { z_result_t ret = _Z_RES_OK; uint8_t header = _Z_MID_N_RESPONSE; _Z_DEBUG("Encoding _Z_MID_N_RESPONSE"); @@ -247,6 +253,9 @@ z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) { bool has_responder_ext = _z_id_check(msg->_ext_responder._zid) || msg->_ext_responder._eid != 0; int n_ext = (has_qos_ext ? 1 : 0) + (has_ts_ext ? 1 : 0) + (has_responder_ext ? 1 : 0); bool has_suffix = _z_keyexpr_has_suffix(&msg->_key); + if (is_express != NULL) { + *is_express = _Z_HAS_FLAG(msg->_ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); + } if (_z_keyexpr_is_local(&msg->_key)) { _Z_SET_FLAG(header, _Z_FLAG_N_RESPONSE_M); } @@ -389,10 +398,13 @@ z_result_t _z_response_final_decode(_z_n_msg_response_final_t *msg, _z_zbuf_t *z return ret; } -z_result_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl) { +z_result_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl, bool *is_express) { uint8_t header = _Z_MID_N_DECLARE; bool has_qos_ext = decl->_ext_qos._val != _Z_N_QOS_DEFAULT._val; bool has_timestamp_ext = _z_timestamp_check(&decl->_ext_timestamp); + if (is_express != NULL) { + *is_express = _Z_HAS_FLAG(decl->_ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); + } int n_ext = (has_qos_ext ? 1 : 0) + (has_timestamp_ext ? 1 : 0); if (n_ext != 0) { header |= _Z_FLAG_N_Z; @@ -489,19 +501,19 @@ z_result_t _z_n_interest_decode(_z_n_msg_interest_t *interest, _z_zbuf_t *zbf, u return _z_interest_decode(&interest->_interest, zbf, is_final, has_ext); } -z_result_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg) { +z_result_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg, bool *is_express) { switch (msg->_tag) { case _Z_N_DECLARE: { - return _z_declare_encode(wbf, &msg->_body._declare); + return _z_declare_encode(wbf, &msg->_body._declare, is_express); } break; case _Z_N_PUSH: { - return _z_push_encode(wbf, &msg->_body._push); + return _z_push_encode(wbf, &msg->_body._push, is_express); } break; case _Z_N_REQUEST: { - return _z_request_encode(wbf, &msg->_body._request); + return _z_request_encode(wbf, &msg->_body._request, is_express); } break; case _Z_N_RESPONSE: { - return _z_response_encode(wbf, &msg->_body._response); + return _z_response_encode(wbf, &msg->_body._response, is_express); } break; case _Z_N_RESPONSE_FINAL: { return _z_response_final_encode(wbf, &msg->_body._response_final); diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 894fb41d8..3bc17d24b 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -388,7 +388,7 @@ z_result_t _z_frame_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_frame_ if (ret == _Z_RES_OK) { size_t len = _z_network_message_svec_len(&msg->_messages); for (size_t i = 0; i < len; i++) { - _Z_RETURN_IF_ERR(_z_network_message_encode(wbf, _z_network_message_svec_get(&msg->_messages, i))) + _Z_RETURN_IF_ERR(_z_network_message_encode(wbf, _z_network_message_svec_get(&msg->_messages, i), NULL)) } } diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index e8f9be4c4..1e61d6909 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -252,7 +252,7 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ _Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_common._wbuf, &t_msg), _z_transport_tx_mutex_unlock(&ztm->_common)); // Encode the network message - if (_z_network_message_encode(&ztm->_common._wbuf, n_msg) == _Z_RES_OK) { + if (_z_network_message_encode(&ztm->_common._wbuf, n_msg, NULL) == _Z_RES_OK) { // Write the eth header _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_common._link, &ztm->_common._wbuf), _z_transport_tx_mutex_unlock(&ztm->_common)); @@ -266,7 +266,8 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ // Create an expandable wbuf for fragmentation _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); // Encode the message on the expandable wbuf - _Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _z_transport_tx_mutex_unlock(&ztm->_common)); + _Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _z_transport_tx_mutex_unlock(&ztm->_common), + NULL); // Fragment and send the message bool is_first = true; while (_z_wbuf_len(&fbf) > 0) { diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index a87c258e4..b1c288a2f 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -1163,7 +1163,7 @@ void declare_message(void) { _z_network_message_t n_msg = gen_declare_message(); // Encode - z_result_t res = _z_network_message_encode(&wbf, &n_msg); + z_result_t res = _z_network_message_encode(&wbf, &n_msg, NULL); assert(res == _Z_RES_OK); (void)(res); @@ -1433,7 +1433,7 @@ void push_message(void) { printf("\n>> Push message\n"); _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_n_msg_push_t expected = gen_push(); - assert(_z_push_encode(&wbf, &expected) == _Z_RES_OK); + assert(_z_push_encode(&wbf, &expected, NULL) == _Z_RES_OK); _z_n_msg_push_t decoded = {0}; _z_arc_slice_t arcs = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); @@ -1505,7 +1505,7 @@ void request_message(void) { printf("\n>> Request message\n"); _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_n_msg_request_t expected = gen_request(); - assert(_z_request_encode(&wbf, &expected) == _Z_RES_OK); + assert(_z_request_encode(&wbf, &expected, NULL) == _Z_RES_OK); _z_n_msg_request_t decoded = {0}; _z_arc_slice_t arcs = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); @@ -1564,7 +1564,7 @@ void response_message(void) { printf("\n>> Response message\n"); _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_n_msg_response_t expected = gen_response(); - assert(_z_response_encode(&wbf, &expected) == _Z_RES_OK); + assert(_z_response_encode(&wbf, &expected, NULL) == _Z_RES_OK); _z_n_msg_response_t decoded = {0}; _z_arc_slice_t arcs = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); From b8c525c25b33bf1f03bde988aeb5d110d8b25da3 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 6 Jan 2025 16:14:30 +0100 Subject: [PATCH 2/8] feat: send batch immediately in some cases --- src/transport/common/tx.c | 52 +++++++++++++++++++++++++-------------- 1 file changed, 34 insertions(+), 18 deletions(-) diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 9479059f3..aef42a8e5 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -45,7 +45,7 @@ static z_result_t _z_transport_tx_send_fragment_inner(_z_transport_common_t *ztc bool is_first = true; _z_zint_t sn = first_sn; // Encode message on temp buffer - _Z_RETURN_IF_ERR(_z_network_message_encode(frag_buff, n_msg)); + _Z_RETURN_IF_ERR(_z_network_message_encode(frag_buff, n_msg, NULL)); // Fragment message while (_z_wbuf_len(frag_buff) > 0) { // Get fragment sequence number @@ -139,13 +139,19 @@ static z_result_t _z_transport_tx_batch_overflow(_z_transport_common_t *ztc, con _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztc->_wbuf, &t_msg)); // Retry encode - z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg); + bool is_express = false; + z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg, &is_express); if (ret != _Z_RES_OK) { // Message still doesn't fit in buffer, send as fragments return _z_transport_tx_send_fragment(ztc, n_msg, reliability, sn); } else { - // Increment batch - ztc->_batch_count++; + if (is_express) { + // Send immediately + return _z_transport_tx_flush_buffer(ztc); + } else { + // Increment batch + ztc->_batch_count++; + } } return _Z_RES_OK; #else @@ -180,10 +186,16 @@ static z_result_t _z_transport_tx_send_n_msg_inner(_z_transport_common_t *ztc, c } // Try encoding the network message size_t prev_wpos = _z_transport_tx_save_wpos(&ztc->_wbuf); - z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg); + bool is_express = false; + z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg, &is_express); if (ret == _Z_RES_OK) { - // Flush buffer or increase batch - return _z_transport_tx_flush_or_incr_batch(ztc); + if (is_express) { + // Send immediately + return _z_transport_tx_flush_buffer(ztc); + } else { + // Flush buffer or increase batch + return _z_transport_tx_flush_or_incr_batch(ztc); + } } else if (!batch_has_data) { // Message doesn't fit in buffer, send as fragments return _z_transport_tx_send_fragment(ztc, n_msg, reliability, sn); @@ -193,22 +205,26 @@ static z_result_t _z_transport_tx_send_n_msg_inner(_z_transport_common_t *ztc, c } } +static z_result_t _z_transport_tx_send_t_msg_inner(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg) { + // Send batch if needed + bool batch_has_data = _z_transport_tx_batch_has_data(ztc); + if (batch_has_data) { + _Z_RETURN_IF_ERR(_z_transport_tx_flush_buffer(ztc)); + } + // Encode transport message + __unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); + _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztc->_wbuf, t_msg)); + // Send message + return _z_transport_tx_flush_buffer(ztc); +} + z_result_t _z_transport_tx_send_t_msg(_z_transport_common_t *ztc, const _z_transport_message_t *t_msg) { z_result_t ret = _Z_RES_OK; _Z_DEBUG("Send session message"); _z_transport_tx_mutex_lock(ztc, true); - // Encode transport message - __unsafe_z_prepare_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); - ret = _z_transport_message_encode(&ztc->_wbuf, t_msg); - if (ret == _Z_RES_OK) { - // Send message - __unsafe_z_finalize_wbuf(&ztc->_wbuf, ztc->_link._cap._flow); - ret = _z_link_send_wbuf(&ztc->_link, &ztc->_wbuf); - if (ret == _Z_RES_OK) { - ztc->_transmitted = true; // Tell session we transmitted data - } - } + ret = _z_transport_tx_send_t_msg_inner(ztc, t_msg); + _z_transport_tx_mutex_unlock(ztc); return ret; } From 0815ef308ac5efa5e32f09db322d776eafe6b45e Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 6 Jan 2025 16:14:52 +0100 Subject: [PATCH 3/8] feat: remove batch api from unstable --- include/zenoh-pico/api/primitives.h | 2 -- src/api/api.c | 2 -- 2 files changed, 4 deletions(-) diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index af4d45c21..5fa001a46 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -2077,7 +2077,6 @@ z_result_t z_declare_background_subscriber(const z_loaned_session_t *zs, const z const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subscriber); #endif -#ifdef Z_FEATURE_UNSTABLE_API #if Z_FEATURE_BATCHING == 1 /** * Activate the batching mechanism, any message that would have been sent on the network by a subsequent api call (e.g @@ -2114,7 +2113,6 @@ z_result_t zp_batch_flush(const z_loaned_session_t *zs); */ z_result_t zp_batch_stop(const z_loaned_session_t *zs); #endif -#endif /************* Multi Thread Tasks helpers **************/ /** diff --git a/src/api/api.c b/src/api/api.c index 334ccfee2..f6a209ac9 100644 --- a/src/api/api.c +++ b/src/api/api.c @@ -1490,7 +1490,6 @@ const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *sub) } #endif -#ifdef Z_FEATURE_UNSTABLE_API #if Z_FEATURE_BATCHING == 1 z_result_t zp_batch_start(const z_loaned_session_t *zs) { if (_Z_RC_IS_NULL(zs)) { @@ -1519,7 +1518,6 @@ z_result_t zp_batch_stop(const z_loaned_session_t *zs) { return _z_send_n_batch(session, Z_CONGESTION_CONTROL_DEFAULT); } #endif -#endif /**************** Tasks ****************/ void zp_task_read_options_default(zp_task_read_options_t *options) { From 0bb4cac0b959c61a9efc08d1b9cf2b80215b55f5 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Mon, 6 Jan 2025 16:33:37 +0100 Subject: [PATCH 4/8] fix: misplaced raweth argument --- src/transport/raweth/tx.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index 1e61d6909..9116930c5 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -266,8 +266,8 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ // Create an expandable wbuf for fragmentation _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); // Encode the message on the expandable wbuf - _Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _z_transport_tx_mutex_unlock(&ztm->_common), - NULL); + _Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg, NULL), + _z_transport_tx_mutex_unlock(&ztm->_common)); // Fragment and send the message bool is_first = true; while (_z_wbuf_len(&fbf) > 0) { From fb34c35659d8f7cc3b78f748e2efd5cffa89cefb Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 7 Jan 2025 10:11:37 +0100 Subject: [PATCH 5/8] Revert "feat: return is express val when encoding network message" This reverts commit dc20d6c3aad85afa6119e2bfbf833db9b3ca2153. --- include/zenoh-pico/protocol/codec/network.h | 10 +++---- .../zenoh-pico/protocol/definitions/network.h | 2 -- src/protocol/codec/network.c | 30 ++++++------------- src/protocol/codec/transport.c | 2 +- src/transport/raweth/tx.c | 5 ++-- tests/z_msgcodec_test.c | 8 ++--- 6 files changed, 21 insertions(+), 36 deletions(-) diff --git a/include/zenoh-pico/protocol/codec/network.h b/include/zenoh-pico/protocol/codec/network.h index 9569cc36f..661ee9de2 100644 --- a/include/zenoh-pico/protocol/codec/network.h +++ b/include/zenoh-pico/protocol/codec/network.h @@ -24,20 +24,20 @@ extern "C" { #endif -z_result_t _z_push_encode(_z_wbuf_t *wbf, const _z_n_msg_push_t *msg, bool *is_express); +z_result_t _z_push_encode(_z_wbuf_t *wbf, const _z_n_msg_push_t *msg); z_result_t _z_push_decode(_z_n_msg_push_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_t *arcs); -z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg, bool *is_express); +z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg); z_result_t _z_request_decode(_z_n_msg_request_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_t *arcs); -z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg, bool *is_express); +z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg); z_result_t _z_response_decode(_z_n_msg_response_t *msg, _z_zbuf_t *zbf, uint8_t header, _z_arc_slice_t *arcs); z_result_t _z_response_final_encode(_z_wbuf_t *wbf, const _z_n_msg_response_final_t *msg); z_result_t _z_response_final_decode(_z_n_msg_response_final_t *msg, _z_zbuf_t *zbf, uint8_t header); -z_result_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl, bool *is_express); +z_result_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl); z_result_t _z_declare_decode(_z_n_msg_declare_t *decl, _z_zbuf_t *zbf, uint8_t header); z_result_t _z_n_interest_encode(_z_wbuf_t *wbf, const _z_n_msg_interest_t *interest); z_result_t _z_n_interest_decode(_z_n_msg_interest_t *interest, _z_zbuf_t *zbf, uint8_t header); -z_result_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg, bool *is_express); +z_result_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg); z_result_t _z_network_message_decode(_z_network_message_t *msg, _z_zbuf_t *zbf, _z_arc_slice_t *arcs); #ifdef __cplusplus diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 2bc7eb0ba..2af9d97ef 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -80,8 +80,6 @@ extern "C" { typedef _z_qos_t _z_n_qos_t; -#define _Z_N_QOS_IS_EXPRESS_FLAG (1 << 4) - static inline _z_qos_t _z_n_qos_create(bool express, z_congestion_control_t congestion_control, z_priority_t priority) { _z_n_qos_t ret; bool nodrop = congestion_control == Z_CONGESTION_CONTROL_DROP ? 0 : 1; diff --git a/src/protocol/codec/network.c b/src/protocol/codec/network.c index 93d829449..44cf9cde9 100644 --- a/src/protocol/codec/network.c +++ b/src/protocol/codec/network.c @@ -38,14 +38,11 @@ /*------------------ Push Message ------------------*/ -z_result_t _z_push_encode(_z_wbuf_t *wbf, const _z_n_msg_push_t *msg, bool *is_express) { +z_result_t _z_push_encode(_z_wbuf_t *wbf, const _z_n_msg_push_t *msg) { uint8_t header = _Z_MID_N_PUSH | (_z_keyexpr_is_local(&msg->_key) ? _Z_FLAG_N_REQUEST_M : 0); bool has_suffix = _z_keyexpr_has_suffix(&msg->_key); bool has_qos_ext = msg->_qos._val != _Z_N_QOS_DEFAULT._val; bool has_timestamp_ext = _z_timestamp_check(&msg->_timestamp); - if (is_express != NULL) { - *is_express = _Z_HAS_FLAG(msg->_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); - } if (has_suffix) { header |= _Z_FLAG_N_REQUEST_N; } @@ -114,7 +111,7 @@ z_result_t _z_push_decode(_z_n_msg_push_t *msg, _z_zbuf_t *zbf, uint8_t header, } /*------------------ Request Message ------------------*/ -z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg, bool *is_express) { +z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg) { z_result_t ret = _Z_RES_OK; uint8_t header = _Z_MID_N_REQUEST | (_z_keyexpr_is_local(&msg->_key) ? _Z_FLAG_N_REQUEST_M : 0); bool has_suffix = _z_keyexpr_has_suffix(&msg->_key); @@ -131,9 +128,6 @@ z_result_t _z_request_encode(_z_wbuf_t *wbf, const _z_n_msg_request_t *msg, bool uint8_t extheader = 0x01 | _Z_MSG_EXT_ENC_ZINT | (exts.n ? _Z_FLAG_Z_Z : 0); _Z_RETURN_IF_ERR(_z_uint8_encode(wbf, extheader)); _Z_RETURN_IF_ERR(_z_zsize_encode(wbf, msg->_ext_qos._val)); - if (is_express != NULL) { - *is_express = _Z_HAS_FLAG(msg->_ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); - } } if (exts.ext_tstamp) { exts.n -= 1; @@ -244,7 +238,7 @@ z_result_t _z_request_decode(_z_n_msg_request_t *msg, _z_zbuf_t *zbf, const uint } /*------------------ Response Message ------------------*/ -z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg, bool *is_express) { +z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg) { z_result_t ret = _Z_RES_OK; uint8_t header = _Z_MID_N_RESPONSE; _Z_DEBUG("Encoding _Z_MID_N_RESPONSE"); @@ -253,9 +247,6 @@ z_result_t _z_response_encode(_z_wbuf_t *wbf, const _z_n_msg_response_t *msg, bo bool has_responder_ext = _z_id_check(msg->_ext_responder._zid) || msg->_ext_responder._eid != 0; int n_ext = (has_qos_ext ? 1 : 0) + (has_ts_ext ? 1 : 0) + (has_responder_ext ? 1 : 0); bool has_suffix = _z_keyexpr_has_suffix(&msg->_key); - if (is_express != NULL) { - *is_express = _Z_HAS_FLAG(msg->_ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); - } if (_z_keyexpr_is_local(&msg->_key)) { _Z_SET_FLAG(header, _Z_FLAG_N_RESPONSE_M); } @@ -398,13 +389,10 @@ z_result_t _z_response_final_decode(_z_n_msg_response_final_t *msg, _z_zbuf_t *z return ret; } -z_result_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl, bool *is_express) { +z_result_t _z_declare_encode(_z_wbuf_t *wbf, const _z_n_msg_declare_t *decl) { uint8_t header = _Z_MID_N_DECLARE; bool has_qos_ext = decl->_ext_qos._val != _Z_N_QOS_DEFAULT._val; bool has_timestamp_ext = _z_timestamp_check(&decl->_ext_timestamp); - if (is_express != NULL) { - *is_express = _Z_HAS_FLAG(decl->_ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); - } int n_ext = (has_qos_ext ? 1 : 0) + (has_timestamp_ext ? 1 : 0); if (n_ext != 0) { header |= _Z_FLAG_N_Z; @@ -501,19 +489,19 @@ z_result_t _z_n_interest_decode(_z_n_msg_interest_t *interest, _z_zbuf_t *zbf, u return _z_interest_decode(&interest->_interest, zbf, is_final, has_ext); } -z_result_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg, bool *is_express) { +z_result_t _z_network_message_encode(_z_wbuf_t *wbf, const _z_network_message_t *msg) { switch (msg->_tag) { case _Z_N_DECLARE: { - return _z_declare_encode(wbf, &msg->_body._declare, is_express); + return _z_declare_encode(wbf, &msg->_body._declare); } break; case _Z_N_PUSH: { - return _z_push_encode(wbf, &msg->_body._push, is_express); + return _z_push_encode(wbf, &msg->_body._push); } break; case _Z_N_REQUEST: { - return _z_request_encode(wbf, &msg->_body._request, is_express); + return _z_request_encode(wbf, &msg->_body._request); } break; case _Z_N_RESPONSE: { - return _z_response_encode(wbf, &msg->_body._response, is_express); + return _z_response_encode(wbf, &msg->_body._response); } break; case _Z_N_RESPONSE_FINAL: { return _z_response_final_encode(wbf, &msg->_body._response_final); diff --git a/src/protocol/codec/transport.c b/src/protocol/codec/transport.c index 3bc17d24b..894fb41d8 100644 --- a/src/protocol/codec/transport.c +++ b/src/protocol/codec/transport.c @@ -388,7 +388,7 @@ z_result_t _z_frame_encode(_z_wbuf_t *wbf, uint8_t header, const _z_t_msg_frame_ if (ret == _Z_RES_OK) { size_t len = _z_network_message_svec_len(&msg->_messages); for (size_t i = 0; i < len; i++) { - _Z_RETURN_IF_ERR(_z_network_message_encode(wbf, _z_network_message_svec_get(&msg->_messages, i), NULL)) + _Z_RETURN_IF_ERR(_z_network_message_encode(wbf, _z_network_message_svec_get(&msg->_messages, i))) } } diff --git a/src/transport/raweth/tx.c b/src/transport/raweth/tx.c index 9116930c5..e8f9be4c4 100644 --- a/src/transport/raweth/tx.c +++ b/src/transport/raweth/tx.c @@ -252,7 +252,7 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ _Z_CLEAN_RETURN_IF_ERR(_z_transport_message_encode(&ztm->_common._wbuf, &t_msg), _z_transport_tx_mutex_unlock(&ztm->_common)); // Encode the network message - if (_z_network_message_encode(&ztm->_common._wbuf, n_msg, NULL) == _Z_RES_OK) { + if (_z_network_message_encode(&ztm->_common._wbuf, n_msg) == _Z_RES_OK) { // Write the eth header _Z_CLEAN_RETURN_IF_ERR(__unsafe_z_raweth_write_header(&ztm->_common._link, &ztm->_common._wbuf), _z_transport_tx_mutex_unlock(&ztm->_common)); @@ -266,8 +266,7 @@ z_result_t _z_raweth_send_n_msg(_z_session_t *zn, const _z_network_message_t *n_ // Create an expandable wbuf for fragmentation _z_wbuf_t fbf = _z_wbuf_make(_Z_FRAG_BUFF_BASE_SIZE, true); // Encode the message on the expandable wbuf - _Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg, NULL), - _z_transport_tx_mutex_unlock(&ztm->_common)); + _Z_CLEAN_RETURN_IF_ERR(_z_network_message_encode(&fbf, n_msg), _z_transport_tx_mutex_unlock(&ztm->_common)); // Fragment and send the message bool is_first = true; while (_z_wbuf_len(&fbf) > 0) { diff --git a/tests/z_msgcodec_test.c b/tests/z_msgcodec_test.c index b1c288a2f..a87c258e4 100644 --- a/tests/z_msgcodec_test.c +++ b/tests/z_msgcodec_test.c @@ -1163,7 +1163,7 @@ void declare_message(void) { _z_network_message_t n_msg = gen_declare_message(); // Encode - z_result_t res = _z_network_message_encode(&wbf, &n_msg, NULL); + z_result_t res = _z_network_message_encode(&wbf, &n_msg); assert(res == _Z_RES_OK); (void)(res); @@ -1433,7 +1433,7 @@ void push_message(void) { printf("\n>> Push message\n"); _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_n_msg_push_t expected = gen_push(); - assert(_z_push_encode(&wbf, &expected, NULL) == _Z_RES_OK); + assert(_z_push_encode(&wbf, &expected) == _Z_RES_OK); _z_n_msg_push_t decoded = {0}; _z_arc_slice_t arcs = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); @@ -1505,7 +1505,7 @@ void request_message(void) { printf("\n>> Request message\n"); _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_n_msg_request_t expected = gen_request(); - assert(_z_request_encode(&wbf, &expected, NULL) == _Z_RES_OK); + assert(_z_request_encode(&wbf, &expected) == _Z_RES_OK); _z_n_msg_request_t decoded = {0}; _z_arc_slice_t arcs = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); @@ -1564,7 +1564,7 @@ void response_message(void) { printf("\n>> Response message\n"); _z_wbuf_t wbf = gen_wbuf(UINT16_MAX); _z_n_msg_response_t expected = gen_response(); - assert(_z_response_encode(&wbf, &expected, NULL) == _Z_RES_OK); + assert(_z_response_encode(&wbf, &expected) == _Z_RES_OK); _z_n_msg_response_t decoded = {0}; _z_arc_slice_t arcs = {0}; _z_zbuf_t zbf = _z_wbuf_to_zbuf(&wbf); From 3843b906149a015bdd7ff886aec943f9a9d2dbd9 Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 7 Jan 2025 10:32:14 +0100 Subject: [PATCH 6/8] feat: get is express status directly in tx --- .../zenoh-pico/protocol/definitions/network.h | 2 ++ src/transport/common/tx.c | 29 +++++++++++++++---- 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/include/zenoh-pico/protocol/definitions/network.h b/include/zenoh-pico/protocol/definitions/network.h index 2af9d97ef..2bc7eb0ba 100644 --- a/include/zenoh-pico/protocol/definitions/network.h +++ b/include/zenoh-pico/protocol/definitions/network.h @@ -80,6 +80,8 @@ extern "C" { typedef _z_qos_t _z_n_qos_t; +#define _Z_N_QOS_IS_EXPRESS_FLAG (1 << 4) + static inline _z_qos_t _z_n_qos_create(bool express, z_congestion_control_t congestion_control, z_priority_t priority) { _z_n_qos_t ret; bool nodrop = congestion_control == Z_CONGESTION_CONTROL_DROP ? 0 : 1; diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index aef42a8e5..452a6269c 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -26,6 +26,25 @@ /*------------------ Transmission helper ------------------*/ +static bool _z_transport_tx_get_express_status(const _z_network_message_t *msg) { + switch (msg->_tag) { + case _Z_N_DECLARE: { + return _Z_HAS_FLAG(msg->_body._declare._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); + } break; + case _Z_N_PUSH: { + return _Z_HAS_FLAG(msg->_body._push._qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); + } break; + case _Z_N_REQUEST: { + return _Z_HAS_FLAG(msg->_body._request._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); + } break; + case _Z_N_RESPONSE: { + return _Z_HAS_FLAG(msg->_body._response._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); + } break; + default: + return false; + } +} + static _z_zint_t _z_transport_tx_get_sn(_z_transport_common_t *ztc, z_reliability_t reliability) { _z_zint_t sn; if (reliability == Z_RELIABILITY_RELIABLE) { @@ -45,7 +64,7 @@ static z_result_t _z_transport_tx_send_fragment_inner(_z_transport_common_t *ztc bool is_first = true; _z_zint_t sn = first_sn; // Encode message on temp buffer - _Z_RETURN_IF_ERR(_z_network_message_encode(frag_buff, n_msg, NULL)); + _Z_RETURN_IF_ERR(_z_network_message_encode(frag_buff, n_msg)); // Fragment message while (_z_wbuf_len(frag_buff) > 0) { // Get fragment sequence number @@ -139,8 +158,8 @@ static z_result_t _z_transport_tx_batch_overflow(_z_transport_common_t *ztc, con _z_transport_message_t t_msg = _z_t_msg_make_frame_header(sn, reliability); _Z_RETURN_IF_ERR(_z_transport_message_encode(&ztc->_wbuf, &t_msg)); // Retry encode - bool is_express = false; - z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg, &is_express); + bool is_express = _z_transport_tx_get_express_status(n_msg); + z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg); if (ret != _Z_RES_OK) { // Message still doesn't fit in buffer, send as fragments return _z_transport_tx_send_fragment(ztc, n_msg, reliability, sn); @@ -186,8 +205,8 @@ static z_result_t _z_transport_tx_send_n_msg_inner(_z_transport_common_t *ztc, c } // Try encoding the network message size_t prev_wpos = _z_transport_tx_save_wpos(&ztc->_wbuf); - bool is_express = false; - z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg, &is_express); + bool is_express = _z_transport_tx_get_express_status(n_msg); + z_result_t ret = _z_network_message_encode(&ztc->_wbuf, n_msg); if (ret == _Z_RES_OK) { if (is_express) { // Send immediately From 41cbec40bbca4ba117afa44f4d76c43a6481555a Mon Sep 17 00:00:00 2001 From: Jean-Roland Date: Tue, 7 Jan 2025 10:32:27 +0100 Subject: [PATCH 7/8] doc: update batch documentation --- include/zenoh-pico/api/primitives.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/include/zenoh-pico/api/primitives.h b/include/zenoh-pico/api/primitives.h index 5fa001a46..9cce79d99 100644 --- a/include/zenoh-pico/api/primitives.h +++ b/include/zenoh-pico/api/primitives.h @@ -2080,8 +2080,8 @@ const z_loaned_keyexpr_t *z_subscriber_keyexpr(const z_loaned_subscriber_t *subs #if Z_FEATURE_BATCHING == 1 /** * Activate the batching mechanism, any message that would have been sent on the network by a subsequent api call (e.g - * z_put, z_get) will be instead stored until the batch is full, flushed with :c:func:`zp_batch_flush` or batching is - * stopped with :c:func:`zp_batch_stop`. + * z_put, z_get) will be instead stored until either: the batch is full, flushed with :c:func:`zp_batch_flush`, batching + * is stopped with :c:func:`zp_batch_stop`, a message needs to be sent immediately. * * Parameters: * zs: Pointer to a :c:type:`z_loaned_session_t` that will start batching messages. From 6963753b9fb5419bc64c964d67f732cc0fdded3a Mon Sep 17 00:00:00 2001 From: Jean-Roland Gosse Date: Tue, 7 Jan 2025 14:19:04 +0100 Subject: [PATCH 8/8] fix: remove redundant break in switch Co-authored-by: Alexander Bushnev --- src/transport/common/tx.c | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/src/transport/common/tx.c b/src/transport/common/tx.c index 452a6269c..fc38fae5d 100644 --- a/src/transport/common/tx.c +++ b/src/transport/common/tx.c @@ -28,23 +28,18 @@ static bool _z_transport_tx_get_express_status(const _z_network_message_t *msg) { switch (msg->_tag) { - case _Z_N_DECLARE: { + case _Z_N_DECLARE: return _Z_HAS_FLAG(msg->_body._declare._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); - } break; - case _Z_N_PUSH: { + case _Z_N_PUSH: return _Z_HAS_FLAG(msg->_body._push._qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); - } break; - case _Z_N_REQUEST: { + case _Z_N_REQUEST: return _Z_HAS_FLAG(msg->_body._request._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); - } break; - case _Z_N_RESPONSE: { + case _Z_N_RESPONSE: return _Z_HAS_FLAG(msg->_body._response._ext_qos._val, _Z_N_QOS_IS_EXPRESS_FLAG); - } break; default: return false; } } - static _z_zint_t _z_transport_tx_get_sn(_z_transport_common_t *ztc, z_reliability_t reliability) { _z_zint_t sn; if (reliability == Z_RELIABILITY_RELIABLE) {