Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement connection restoring #799

Merged
merged 1 commit into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,29 @@ jobs:
CMAKE_GENERATOR=Ninja ASAN=ON make
python3 ./build/tests/no_router.py
timeout-minutes: 5

connection_restore_test:
needs: zenoh_build
name: Connection restore test
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Download Zenoh artifacts
uses: actions/download-artifact@v4
with:
name: ${{ needs.zenoh_build.outputs.artifact-name }}

- name: Unzip Zenoh artifacts
run: unzip ${{ needs.zenoh_build.outputs.artifact-name }} -d zenoh-standalone

- name: Build project and run test
run: |
sudo apt install -y ninja-build
CMAKE_GENERATOR=Ninja ASAN=ON CMAKE_BUILD_TYPE=Debug ZENOH_DEBUG=3 make
RUST_LOG=debug sudo python3 ./build/tests/connection_restore.py ./zenoh-standalone/zenohd
timeout-minutes: 15

markdown_lint:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -331,7 +354,7 @@ jobs:
ci:
name: CI status checks
runs-on: ubuntu-latest
needs: [run_tests, check_format, c99_build, raweth_build, zenoh_build, modular_build, unstable_build, st_build, fragment_test, attachment_test, memory_leak_test, no_router, markdown_lint, build_shared, build_static, integration, multicast]
needs: [run_tests, check_format, c99_build, raweth_build, zenoh_build, modular_build, unstable_build, st_build, fragment_test, attachment_test, memory_leak_test, no_router, connection_restore_test, markdown_lint, build_shared, build_static, integration, multicast]
if: always()
steps:
- name: Check whether all jobs pass
Expand Down
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ set(Z_FEATURE_LOCAL_SUBSCRIBER 0 CACHE STRING "Toggle local subscriptions")
set(Z_FEATURE_PUBLISHER_SESSION_CHECK 1 CACHE STRING "Toggle publisher session check")
set(Z_FEATURE_BATCHING 1 CACHE STRING "Toggle batching")
set(Z_FEATURE_RX_CACHE 0 CACHE STRING "Toggle RX_CACHE")
set(Z_FEATURE_AUTO_RECONNECT 1 CACHE STRING "Toggle automatic reconnection")

# Add a warning message if someone tries to enable Z_FEATURE_LINK_SERIAL_USB directly
if(Z_FEATURE_LINK_SERIAL_USB AND NOT Z_FEATURE_UNSTABLE_API)
Expand All @@ -261,6 +262,7 @@ message(STATUS "Building with feature confing:\n\
* QUERYABLE: ${Z_FEATURE_QUERYABLE}\n\
* LIVELINESS: ${Z_FEATURE_LIVELINESS}\n\
* INTEREST: ${Z_FEATURE_INTEREST}\n\
* AUTO_RECONNECT: ${Z_FEATURE_AUTO_RECONNECT}\n\
* RAWETH: ${Z_FEATURE_RAWETH_TRANSPORT}")

configure_file(
Expand Down Expand Up @@ -527,6 +529,7 @@ if(UNIX OR MSVC)
configure_file(${PROJECT_SOURCE_DIR}/tests/attachment.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/attachment.py COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/no_router.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/no_router.py COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/memory_leak.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/memory_leak.py COPYONLY)
configure_file(${PROJECT_SOURCE_DIR}/tests/connection_restore.py ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/connection_restore.py COPYONLY)

enable_testing()
add_test(z_data_struct_test ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/z_data_struct_test)
Expand Down
41 changes: 21 additions & 20 deletions include/zenoh-pico/collections/list.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,31 +49,32 @@ _z_list_t *_z_list_push(_z_list_t *xs, void *x);
_z_list_t *_z_list_push_back(_z_list_t *xs, void *x);
_z_list_t *_z_list_pop(_z_list_t *xs, z_element_free_f f_f, void **x);

_z_list_t *_z_list_find(const _z_list_t *xs, z_element_eq_f f_f, void *e);
_z_list_t *_z_list_find(const _z_list_t *xs, z_element_eq_f f_f, const void *e);

_z_list_t *_z_list_drop_filter(_z_list_t *xs, z_element_free_f f_f, z_element_eq_f c_f, void *left);
_z_list_t *_z_list_drop_filter(_z_list_t *xs, z_element_free_f f_f, z_element_eq_f c_f, const void *left);

_z_list_t *_z_list_clone(const _z_list_t *xs, z_element_clone_f d_f);
void _z_list_free(_z_list_t **xs, z_element_free_f f_f);

#define _Z_LIST_DEFINE(name, type) \
typedef _z_list_t name##_list_t; \
static inline name##_list_t *name##_list_new(void) { return NULL; } \
static inline size_t name##_list_len(const name##_list_t *l) { return _z_list_len(l); } \
static inline bool name##_list_is_empty(const name##_list_t *l) { return _z_list_is_empty(l); } \
static inline type *name##_list_head(const name##_list_t *l) { return (type *)_z_list_head(l); } \
static inline name##_list_t *name##_list_tail(const name##_list_t *l) { return _z_list_tail(l); } \
static inline name##_list_t *name##_list_push(name##_list_t *l, type *e) { return _z_list_push(l, e); } \
static inline name##_list_t *name##_list_pop(name##_list_t *l, type **x) { \
return _z_list_pop(l, name##_elem_free, (void **)x); \
} \
static inline name##_list_t *name##_list_find(const name##_list_t *l, name##_eq_f c_f, type *e) { \
return _z_list_find(l, (z_element_eq_f)c_f, e); \
} \
static inline name##_list_t *name##_list_drop_filter(name##_list_t *l, name##_eq_f c_f, type *e) { \
return _z_list_drop_filter(l, name##_elem_free, (z_element_eq_f)c_f, e); \
} \
static inline name##_list_t *name##_list_clone(name##_list_t *l) { return _z_list_clone(l, name##_elem_clone); } \
#define _Z_LIST_DEFINE(name, type) \
typedef _z_list_t name##_list_t; \
static inline name##_list_t *name##_list_new(void) { return NULL; } \
static inline size_t name##_list_len(const name##_list_t *l) { return _z_list_len(l); } \
static inline bool name##_list_is_empty(const name##_list_t *l) { return _z_list_is_empty(l); } \
static inline type *name##_list_head(const name##_list_t *l) { return (type *)_z_list_head(l); } \
static inline name##_list_t *name##_list_tail(const name##_list_t *l) { return _z_list_tail(l); } \
static inline name##_list_t *name##_list_push(name##_list_t *l, type *e) { return _z_list_push(l, e); } \
static inline name##_list_t *name##_list_push_back(name##_list_t *l, type *e) { return _z_list_push_back(l, e); } \
static inline name##_list_t *name##_list_pop(name##_list_t *l, type **x) { \
return _z_list_pop(l, name##_elem_free, (void **)x); \
} \
static inline name##_list_t *name##_list_find(const name##_list_t *l, name##_eq_f c_f, const type *e) { \
return _z_list_find(l, (z_element_eq_f)c_f, e); \
} \
static inline name##_list_t *name##_list_drop_filter(name##_list_t *l, name##_eq_f c_f, const type *e) { \
return _z_list_drop_filter(l, name##_elem_free, (z_element_eq_f)c_f, e); \
} \
static inline name##_list_t *name##_list_clone(name##_list_t *l) { return _z_list_clone(l, name##_elem_clone); } \
static inline void name##_list_free(name##_list_t **l) { _z_list_free(l, name##_elem_free); }

#ifdef __cplusplus
Expand Down
3 changes: 2 additions & 1 deletion include/zenoh-pico/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
#define Z_FEATURE_SUBSCRIPTION 1
#define Z_FEATURE_QUERY 1
#define Z_FEATURE_QUERYABLE 1
#define Z_FEATURE_LIVELINESS 0
#define Z_FEATURE_LIVELINESS 1
#define Z_FEATURE_RAWETH_TRANSPORT 0
#define Z_FEATURE_INTEREST 1
#define Z_FEATURE_DYNAMIC_MEMORY_ALLOCATION 0
Expand All @@ -48,6 +48,7 @@
#define Z_FEATURE_PUBLISHER_SESSION_CHECK 1
#define Z_FEATURE_BATCHING 1
#define Z_FEATURE_RX_CACHE 0
#define Z_FEATURE_AUTO_RECONNECT 1
// End of CMake generation

/*------------------ Runtime configuration properties ------------------*/
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
#define Z_FEATURE_PUBLISHER_SESSION_CHECK @Z_FEATURE_PUBLISHER_SESSION_CHECK@
#define Z_FEATURE_BATCHING @Z_FEATURE_BATCHING@
#define Z_FEATURE_RX_CACHE @Z_FEATURE_RX_CACHE@
#define Z_FEATURE_AUTO_RECONNECT @Z_FEATURE_AUTO_RECONNECT@
// End of CMake generation

/*------------------ Runtime configuration properties ------------------*/
Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/link/endpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ bool _z_locator_eq(const _z_locator_t *left, const _z_locator_t *right);

void _z_locator_init(_z_locator_t *locator);
_z_string_t _z_locator_to_string(const _z_locator_t *loc);
z_result_t _z_locator_from_string(_z_locator_t *lc, _z_string_t *s);
z_result_t _z_locator_from_string(_z_locator_t *lc, const _z_string_t *s);

size_t _z_locator_size(_z_locator_t *lc);
void _z_locator_clear(_z_locator_t *lc);
Expand All @@ -72,7 +72,7 @@ typedef struct {
} _z_endpoint_t;

_z_string_t _z_endpoint_to_string(const _z_endpoint_t *e);
z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, _z_string_t *s);
z_result_t _z_endpoint_from_string(_z_endpoint_t *ep, const _z_string_t *s);
void _z_endpoint_clear(_z_endpoint_t *ep);
void _z_endpoint_free(_z_endpoint_t **ep);

Expand Down
4 changes: 2 additions & 2 deletions include/zenoh-pico/link/link.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,8 @@ typedef struct _z_link_t {

void _z_link_clear(_z_link_t *zl);
void _z_link_free(_z_link_t **zl);
z_result_t _z_open_link(_z_link_t *zl, _z_string_t *locator);
z_result_t _z_listen_link(_z_link_t *zl, _z_string_t *locator);
z_result_t _z_open_link(_z_link_t *zl, const _z_string_t *locator);
z_result_t _z_listen_link(_z_link_t *zl, const _z_string_t *locator);

z_result_t _z_link_send_wbuf(const _z_link_t *zl, const _z_wbuf_t *wbf);
size_t _z_link_recv_zbuf(const _z_link_t *zl, _z_zbuf_t *zbf, _z_slice_t *addr);
Expand Down
42 changes: 40 additions & 2 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "zenoh-pico/collections/list.h"
#include "zenoh-pico/config.h"
#include "zenoh-pico/protocol/core.h"
#include "zenoh-pico/protocol/definitions/network.h"
#include "zenoh-pico/session/liveliness.h"
#include "zenoh-pico/session/queryable.h"
#include "zenoh-pico/session/session.h"
Expand Down Expand Up @@ -55,6 +56,14 @@ typedef struct _z_session_t {
_z_resource_list_t *_local_resources;
_z_resource_list_t *_remote_resources;

#if Z_FEATURE_AUTO_RECONNECT == 1
// Information for session restoring
_z_config_t _config;
_z_network_message_list_t *_decalaration_cache;
z_task_attr_t *_lease_task_attr;
z_task_attr_t *_read_task_attr;
#endif

// Session subscriptions
#if Z_FEATURE_SUBSCRIPTION == 1
_z_subscription_rc_list_t *_subscriptions;
Expand Down Expand Up @@ -99,14 +108,43 @@ _Z_REFCOUNT_DEFINE(_z_session, _z_session)
* Open a zenoh-net session
*
* Parameters:
* zn: A pointer of A :c:type:`_z_session_rc_t` used as a return value.
* config: A set of properties. The caller keeps its ownership.
* zn: A pointer of A :c:type:`_z_session_t` used as a return value.
* zid: A pointer to Zenoh ID.
*
* Returns:
* ``0`` in case of success, or a ``negative value`` in case of failure.
*/
z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config, const _z_id_t *zid);

/**
* Reopen a disconnected zenoh-net session
*
* Parameters:
* zn: Existing zenoh-net session.
*
* Returns:
* ``0`` in case of success, or a ``negative value`` in case of failure.
*/
z_result_t _z_reopen(_z_session_rc_t *zn);

/**
* Store declaration network message to cache for resend it after session restore
*
* Parameters:
* zs: A zenoh-net session.
* z_msg: Network message with declaration
*/
void _z_cache_declaration(_z_session_t *zs, const _z_network_message_t *n_msg);

/**
* Remove corresponding declaration from the cache
*
* Parameters:
* zs: A zenoh-net session.
* z_msg: Network message with undeclaration
*/
z_result_t _z_open(_z_session_rc_t *zn, _z_config_t *config);
void _z_prune_declaration(_z_session_t *zs, const _z_network_message_t *n_msg);

/**
* Close a zenoh-net session.
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/protocol/definitions/network.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ inline static void _z_msg_clear(_z_zenoh_message_t *msg) { _z_n_msg_clear(msg);
inline static void _z_msg_free(_z_zenoh_message_t **msg) { _z_n_msg_free(msg); }
_Z_ELEM_DEFINE(_z_network_message, _z_network_message_t, _z_noop_size, _z_n_msg_clear, _z_noop_copy, _z_noop_move)
_Z_SVEC_DEFINE(_z_network_message, _z_network_message_t)
_Z_LIST_DEFINE(_z_network_message, _z_network_message_t)

void _z_msg_fix_mapping(_z_zenoh_message_t *msg, uint16_t mapping);
_z_network_message_t _z_msg_make_query(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_slice_t) parameters, _z_zint_t qid,
Expand All @@ -315,6 +316,7 @@ _z_network_message_t _z_n_msg_make_declare(_z_declaration_t declaration, bool ha
_z_network_message_t _z_n_msg_make_push(_Z_MOVE(_z_keyexpr_t) key, _Z_MOVE(_z_push_body_t) body);
_z_network_message_t _z_n_msg_make_interest(_z_interest_t interest);
z_result_t _z_n_msg_copy(_z_network_message_t *dst, const _z_network_message_t *src);
_z_network_message_t *_z_n_msg_clone(const _z_network_message_t *src);

#ifdef __cplusplus
}
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/session/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ extern "C" {
_z_hello_list_t *_z_scout_inner(const z_what_t what, _z_id_t id, _z_string_t *locator, const uint32_t timeout,
const bool exit_on_first);

z_result_t _z_session_init(_z_session_rc_t *zsrc, _z_id_t *zid);
z_result_t _z_session_init(_z_session_t *zn, const _z_id_t *zid);
void _z_session_clear(_z_session_t *zn);
z_result_t _z_session_close(_z_session_t *zn, uint8_t reason);

Expand Down
30 changes: 30 additions & 0 deletions include/zenoh-pico/transport/common/transport.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
//
// Copyright (c) 2024 ZettaScale Technology
//
// This program and the accompanying materials are made available under the
// terms of the Eclipse Public License 2.0 which is available at
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
// which is available at https://www.apache.org/licenses/LICENSE-2.0.
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
//
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//

#ifndef ZENOH_PICO_COMMON_TRANSPORT_H
#define ZENOH_PICO_COMMON_TRANSPORT_H

#include "zenoh-pico/transport/transport.h"

#ifdef __cplusplus
extern "C" {
#endif

void _z_common_transport_clear(_z_transport_common_t *ztc, bool detach_tasks);

#ifdef __cplusplus
}
#endif

#endif /* ZENOH_PICO_COMMON_TRANSPORT_H*/
3 changes: 2 additions & 1 deletion include/zenoh-pico/transport/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ enum _z_peer_op_e {
_Z_PEER_OP_LISTEN = 1,
};

z_result_t _z_new_transport(_z_transport_t *zt, _z_id_t *bs, _z_string_t *locator, z_whatami_t mode, int peer_op);
z_result_t _z_new_transport(_z_transport_t *zt, const _z_id_t *bs, const _z_string_t *locator, z_whatami_t mode,
int peer_op);
void _z_free_transport(_z_transport_t **zt);

#ifdef __cplusplus
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/transport/multicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ z_result_t _z_multicast_open_client(_z_transport_multicast_establish_param_t *pa
const _z_id_t *local_zid);
z_result_t _z_multicast_send_close(_z_transport_multicast_t *ztm, uint8_t reason, bool link_only);
z_result_t _z_multicast_transport_close(_z_transport_multicast_t *ztm, uint8_t reason);
void _z_multicast_transport_clear(_z_transport_t *zt);
void _z_multicast_transport_clear(_z_transport_multicast_t *ztm, bool detach_tasks);

#if (Z_FEATURE_MULTICAST_TRANSPORT == 1 || Z_FEATURE_RAWETH_TRANSPORT == 1) && Z_FEATURE_MULTI_THREAD == 1
static inline void _z_multicast_peer_mutex_lock(_z_transport_multicast_t *ztm) { _z_mutex_lock(&ztm->_mutex_peer); }
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/transport/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ typedef struct {
uint8_t _seq_num_res;
} _z_transport_multicast_establish_param_t;

_z_transport_common_t *_z_transport_get_common(_z_transport_t *zt);
z_result_t _z_transport_close(_z_transport_t *zt, uint8_t reason);
void _z_transport_clear(_z_transport_t *zt);
void _z_transport_free(_z_transport_t **zt);
Expand Down
2 changes: 1 addition & 1 deletion include/zenoh-pico/transport/unicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ z_result_t _z_unicast_open_peer(_z_transport_unicast_establish_param_t *param, c
const _z_id_t *local_zid, int peer_op);
z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bool link_only);
z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason);
void _z_unicast_transport_clear(_z_transport_t *zt);
void _z_unicast_transport_clear(_z_transport_unicast_t *ztu, bool detach_tasks);

#ifdef __cplusplus
}
Expand Down
Loading
Loading