Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -492,8 +492,11 @@ jobs:

connection_restore_test:
needs: zenoh_build
name: Connection restore test
name: Connection restore test (multi-thread=${{ matrix.multi_thread }})
runs-on: ubuntu-latest
strategy:
matrix:
multi_thread: [1, 0]
steps:
- name: Checkout code
uses: actions/checkout@v4
Expand All @@ -510,8 +513,14 @@ jobs:
run: |
sudo apt install -y ninja-build
CMAKE_GENERATOR=Ninja ASAN=ON CMAKE_BUILD_TYPE=Debug ZENOH_DEBUG=3 make
RUST_LOG=debug sudo python3 ./build/tests/connection_restore.py ./zenoh-standalone/zenohd
if [ "$Z_FEATURE_MULTI_THREAD" = "0" ]; then
RUST_LOG=debug sudo python3 ./build/tests/connection_restore.py ./zenoh-standalone/zenohd --single-thread
else
RUST_LOG=debug sudo python3 ./build/tests/connection_restore.py ./zenoh-standalone/zenohd
fi
timeout-minutes: 20
env:
Z_FEATURE_MULTI_THREAD: ${{ matrix.multi_thread }}

routed_peer_client_test:
needs: zenoh_build
Expand Down
1 change: 1 addition & 0 deletions include/zenoh-pico/net/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ typedef struct _z_session_t {

#if Z_FEATURE_AUTO_RECONNECT == 1
_z_network_message_slist_t *_declaration_cache;
_z_string_t _last_connect_locator;
#endif

// Session subscriptions
Expand Down
2 changes: 2 additions & 0 deletions include/zenoh-pico/transport/unicast/transport.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ z_result_t _z_unicast_open_peer(_z_transport_unicast_establish_param_t *param, c
const _z_id_t *local_zid, int peer_op, _z_sys_net_socket_t *socket);
z_result_t _z_unicast_send_close(_z_transport_unicast_t *ztu, uint8_t reason, bool link_only);
z_result_t _z_unicast_transport_close(_z_transport_unicast_t *ztu, uint8_t reason);
void _z_unicast_transport_clear_connection(_z_transport_unicast_t *ztu);
void _z_unicast_transport_replace_connection(_z_transport_unicast_t *dst, _z_transport_unicast_t *src);
void _z_unicast_transport_clear(_z_transport_unicast_t *ztu);

#ifdef __cplusplus
Expand Down
300 changes: 245 additions & 55 deletions src/net/session.c

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions src/session/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ z_result_t _z_session_init(_z_session_t *zn, const _z_id_t *zid) {
_z_config_init(&zn->_config);
#if Z_FEATURE_AUTO_RECONNECT == 1
zn->_declaration_cache = NULL;
zn->_last_connect_locator = _z_string_null();
#endif

// Initialize the data structs
Expand Down Expand Up @@ -189,6 +190,7 @@ z_result_t _z_session_close(_z_session_t *zn) {
_Z_RETURN_IF_ERR(_z_session_mutex_lock(zn));
#if Z_FEATURE_AUTO_RECONNECT == 1
_z_network_message_slist_free(&zn->_declaration_cache);
_z_string_clear(&zn->_last_connect_locator);
#endif
_z_session_mutex_unlock(zn);
_z_flush_local_resources(zn);
Expand Down
31 changes: 30 additions & 1 deletion src/transport/common/tx.c
Original file line number Diff line number Diff line change
Expand Up @@ -273,8 +273,12 @@ z_result_t _z_transport_tx_send_t_msg(_z_transport_common_t *ztc, const _z_trans
// If sending to a peer list, make sure the peer mutex is locked
_z_transport_tx_mutex_lock(ztc, true);

ret = _z_transport_tx_send_t_msg_inner(ztc, t_msg, peers);
if (ztc->_state != _Z_TRANSPORT_STATE_OPEN) {
_z_transport_tx_mutex_unlock(ztc);
return _Z_ERR_TRANSPORT_NOT_AVAILABLE;
}

ret = _z_transport_tx_send_t_msg_inner(ztc, t_msg, peers);
_z_transport_tx_mutex_unlock(ztc);
return ret;
}
Expand All @@ -298,6 +302,13 @@ static z_result_t _z_transport_tx_send_n_msg(_z_transport_common_t *ztc, const _
return ret;
}
// Process message
if (ztc->_state != _Z_TRANSPORT_STATE_OPEN) {
if (!_z_transport_batch_hold_tx_mutex()) {
_z_transport_tx_mutex_unlock(ztc);
}
return _Z_ERR_TRANSPORT_NOT_AVAILABLE;
}

ret = _z_transport_tx_send_n_msg_inner(ztc, n_msg, reliability, peers);
if (!_z_transport_batch_hold_tx_mutex()) {
_z_transport_tx_mutex_unlock(ztc);
Expand All @@ -309,6 +320,9 @@ static z_result_t _z_transport_tx_send_n_batch(_z_transport_common_t *ztc, z_con
_z_transport_peer_unicast_slist_t *peers) {
#if Z_FEATURE_BATCHING == 1
z_result_t ret = _Z_RES_OK;
if (ztc->_state != _Z_TRANSPORT_STATE_OPEN) {
return _Z_ERR_TRANSPORT_NOT_AVAILABLE;
}
// Check batch size
if (ztc->_batch_count > 0) {
// Acquire the lock and drop the message if needed
Expand All @@ -320,6 +334,13 @@ static z_result_t _z_transport_tx_send_n_batch(_z_transport_common_t *ztc, z_con
return ret;
}
// Send batch
if (ztc->_state != _Z_TRANSPORT_STATE_OPEN) {
if (!_z_transport_batch_hold_tx_mutex()) {
_z_transport_tx_mutex_unlock(ztc);
}
return _Z_ERR_TRANSPORT_NOT_AVAILABLE;
}

_Z_DEBUG("Send network batch");
ret = _z_transport_tx_flush_buffer(ztc, peers);
if (!_z_transport_batch_hold_tx_mutex()) {
Expand Down Expand Up @@ -391,6 +412,10 @@ z_result_t _z_send_t_msg(_z_transport_t *zt, const _z_transport_message_t *t_msg
ret = _z_transport_tx_send_t_msg(&zt->_transport._multicast._common, t_msg, NULL);
break;
case _Z_TRANSPORT_RAWETH_TYPE:
if (zt->_transport._raweth._common._state != _Z_TRANSPORT_STATE_OPEN) {
ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE;
break;
}
ret = _z_raweth_send_t_msg(&zt->_transport._raweth._common, t_msg);
break;
default:
Expand Down Expand Up @@ -530,6 +555,10 @@ z_result_t _z_send_n_msg(_z_session_t *zn, const _z_network_message_t *z_msg, z_
_z_transport_tx_send_n_msg(&zn->_tp._transport._multicast._common, z_msg, reliability, cong_ctrl, NULL);
break;
case _Z_TRANSPORT_RAWETH_TYPE:
if (zn->_tp._transport._raweth._common._state != _Z_TRANSPORT_STATE_OPEN) {
ret = _Z_ERR_TRANSPORT_NOT_AVAILABLE;
break;
}
ret = _z_raweth_send_n_msg(zn, z_msg, reliability, cong_ctrl);
break;
default:
Expand Down
3 changes: 2 additions & 1 deletion src/transport/manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include <stddef.h>
#include <stdlib.h>
#include <string.h>

#include "zenoh-pico/link/transport/socket.h"
#if Z_FEATURE_LINK_TLS == 1
Expand Down Expand Up @@ -83,7 +84,7 @@ static z_result_t _z_new_transport_client(_z_transport_t *zt, const _z_string_t
switch (zl->_cap._transport) {
// Unicast transport
case Z_LINK_CAP_TRANSPORT_UNICAST: {
_z_transport_unicast_establish_param_t tp_param;
_z_transport_unicast_establish_param_t tp_param = {0};
ret = _z_unicast_open_client(&tp_param, zl, local_zid);
if (ret != _Z_RES_OK) {
_z_link_free(&zl);
Expand Down
6 changes: 5 additions & 1 deletion src/transport/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void _z_transport_free(_z_transport_t **zt) {
#if Z_FEATURE_BATCHING == 1
z_result_t _z_transport_start_batching(_z_transport_t *zt) {
_z_transport_common_t *ztc = _z_transport_get_common(zt);
if (ztc == NULL) {
if ((ztc == NULL) || (ztc->_state != _Z_TRANSPORT_STATE_OPEN)) {
_Z_ERROR_RETURN(_Z_ERR_TRANSPORT_NOT_AVAILABLE);
}
if (ztc->_batch_state == _Z_BATCHING_ACTIVE) {
Expand All @@ -133,6 +133,10 @@ z_result_t _z_transport_stop_batching(_z_transport_t *zt) {
_Z_ERROR_RETURN(_Z_ERR_TRANSPORT_NOT_AVAILABLE);
}

if (ztc->_batch_state != _Z_BATCHING_ACTIVE) {
return _Z_RES_OK;
}

#if Z_FEATURE_BATCH_TX_MUTEX == 1
_z_transport_tx_mutex_unlock(ztc);
#endif
Expand Down
35 changes: 27 additions & 8 deletions src/transport/unicast/lease.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ z_result_t _zp_unicast_send_keep_alive(_z_transport_unicast_t *ztu) {
}

_z_fut_fn_result_t _zp_unicast_failed_result(_z_transport_unicast_t *ztu, _z_executor_t *executor) {
if (ztu->_common._state == _Z_TRANSPORT_STATE_CLOSED) {
return _z_fut_fn_result_ready();
}
if (ztu->_common._state == _Z_TRANSPORT_STATE_RECONNECTING) {
return _z_fut_fn_result_suspend();
}

_z_session_t *zs = _z_transport_common_get_session(&ztu->_common);
#if Z_FEATURE_LIVELINESS == 1 && Z_FEATURE_SUBSCRIPTION == 1
_z_liveliness_subscription_undeclare_all(zs);
Expand All @@ -97,29 +104,41 @@ _z_fut_fn_result_t _zp_unicast_failed_result(_z_transport_unicast_t *ztu, _z_exe
}
#endif
_z_unicast_transport_close(ztu, _Z_CLOSE_EXPIRED);
_z_session_transport_mutex_lock(zs);
#if Z_FEATURE_AUTO_RECONNECT == 1
// Store weak session to reuse for reconnection.
_z_session_weak_t weak_session_clone = _z_session_weak_clone(&ztu->_common._session);
_z_session_weak_t weak_session_clone = _z_session_weak_null();
#endif
_z_session_transport_mutex_lock(zs);
#if Z_FEATURE_AUTO_RECONNECT == 1
if (zs->_mode == Z_WHATAMI_CLIENT) {
_z_unicast_transport_clear_connection(ztu);
} else {
// Keep the legacy peer-mode reopen path: clear the transport storage and let _z_open() rebuild it.
weak_session_clone = _z_session_weak_clone(&ztu->_common._session);
_z_transport_clear(&zs->_tp);
}
#else
_z_transport_clear(&zs->_tp);
#endif
_z_session_transport_mutex_unlock(zs);

#if Z_FEATURE_AUTO_RECONNECT == 1
ztu->_common._state = _Z_TRANSPORT_STATE_RECONNECTING;
ztu->_common._session = weak_session_clone;
if (zs->_mode != Z_WHATAMI_CLIENT) {
ztu->_common._state = _Z_TRANSPORT_STATE_RECONNECTING;
ztu->_common._session = weak_session_clone;
}
_z_fut_t f = _z_fut_null();
f._fut_arg = &ztu->_common;
f._fut_fn = _z_client_reopen_task_fn;
f._destroy_fn = _z_client_reopen_task_drop;
if (_z_fut_handle_is_null(_z_executor_spawn(executor, &f))) {
_Z_ERROR("Failed to spawn client reopen task after transport failure.");
ztu->_common._state = _Z_TRANSPORT_STATE_CLOSED;
_z_session_weak_drop(&ztu->_common._session);
if (zs->_mode != Z_WHATAMI_CLIENT) {
_z_session_weak_drop(&ztu->_common._session);
}
return _z_fut_fn_result_ready();
} else {
return _z_fut_fn_result_suspend();
}
return _z_fut_fn_result_suspend();
#else
return _z_fut_fn_result_ready();
#endif
Expand Down
67 changes: 67 additions & 0 deletions src/transport/unicast/transport.c
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
ztu->_common._batch_count = 0;
#endif

#if Z_FEATURE_MULTI_THREAD == 1

Check warning

Code scanning / Cppcheck (reported by Codacy)

misra violation 2009 with no text in the supplied rule-texts-file Warning

misra violation 2009 with no text in the supplied rule-texts-file
// Initialize the mutexes
_Z_RETURN_IF_ERR(_z_mutex_init(&ztu->_common._mutex_tx));
_Z_RETURN_IF_ERR(_z_mutex_rec_init(&ztu->_common._mutex_peer));
Expand Down Expand Up @@ -95,8 +95,8 @@
static z_result_t _z_unicast_handshake_open(_z_transport_unicast_establish_param_t *param, const _z_link_t *zl,
const _z_id_t *local_zid, z_whatami_t mode, _z_sys_net_socket_t *socket) {
z_clock_t recv_deadline = z_clock_now();
z_clock_advance_ms(&recv_deadline, Z_TRANSPORT_CONNECT_TIMEOUT);

Check warning

Code scanning / Cppcheck (reported by Codacy)

misra violation 1703 with no text in the supplied rule-texts-file Warning

misra violation 1703 with no text in the supplied rule-texts-file

Check warning

Code scanning / Cppcheck (reported by Codacy)

misra violation 1703 with no text in the supplied rule-texts-file Warning

misra violation 1703 with no text in the supplied rule-texts-file
_z_transport_message_t ism = _z_t_msg_make_init_syn(mode, *local_zid);
param->_seq_num_res = ism._body._init._seq_num_res; // The announced sn resolution
param->_req_id_res = ism._body._init._req_id_res; // The announced req id resolution
Expand Down Expand Up @@ -324,6 +324,68 @@
return _z_unicast_send_close(ztu, reason, false);
}

static void __unsafe_z_unicast_transport_clear_connection(_z_transport_unicast_t *ztu) {
_z_transport_peer_unicast_slist_free(&ztu->_peers);
_z_wbuf_clear(&ztu->_common._wbuf);
_z_zbuf_clear(&ztu->_common._zbuf);
_z_link_free(&ztu->_common._link);

Check warning

Code scanning / Cppcheck (reported by Codacy)

misra violation 1703 with no text in the supplied rule-texts-file Warning

misra violation 1703 with no text in the supplied rule-texts-file

Check warning

Code scanning / Cppcheck (reported by Codacy)

misra violation 1703 with no text in the supplied rule-texts-file Warning

misra violation 1703 with no text in the supplied rule-texts-file
ztu->_common._sn_res = 0;
ztu->_common._sn_tx_reliable = 0;
ztu->_common._sn_tx_best_effort = 0;
ztu->_common._lease = 0;
ztu->_common._transmitted = false;
#if Z_FEATURE_BATCHING == 1

Check warning

Code scanning / Cppcheck (reported by Codacy)

misra violation 2009 with no text in the supplied rule-texts-file Warning

misra violation 2009 with no text in the supplied rule-texts-file
ztu->_common._batch_state = _Z_BATCHING_IDLE;
ztu->_common._batch_count = 0;
#endif
}

void _z_unicast_transport_clear_connection(_z_transport_unicast_t *ztu) {

Check warning

Code scanning / Cppcheck (reported by Codacy)

misra violation 804 with no text in the supplied rule-texts-file Warning

misra violation 804 with no text in the supplied rule-texts-file
_z_transport_tx_mutex_lock(&ztu->_common, true);
ztu->_common._state = _Z_TRANSPORT_STATE_RECONNECTING;
_z_transport_peer_mutex_lock(&ztu->_common);
__unsafe_z_unicast_transport_clear_connection(ztu);
_z_transport_peer_mutex_unlock(&ztu->_common);
_z_transport_tx_mutex_unlock(&ztu->_common);
}

void _z_unicast_transport_replace_connection(_z_transport_unicast_t *dst, _z_transport_unicast_t *src) {
_z_transport_tx_mutex_lock(&dst->_common, true);
_z_transport_peer_mutex_lock(&dst->_common);

__unsafe_z_unicast_transport_clear_connection(dst);

dst->_common._link = src->_common._link;
dst->_common._wbuf = src->_common._wbuf;
dst->_common._zbuf = src->_common._zbuf;
dst->_common._sn_res = src->_common._sn_res;
dst->_common._sn_tx_reliable = src->_common._sn_tx_reliable;
dst->_common._sn_tx_best_effort = src->_common._sn_tx_best_effort;
dst->_common._lease = src->_common._lease;
dst->_common._transmitted = src->_common._transmitted;
#if Z_FEATURE_BATCHING == 1
dst->_common._batch_state = src->_common._batch_state;
dst->_common._batch_count = src->_common._batch_count;
#endif
dst->_peers = src->_peers;
dst->_common._state = _Z_TRANSPORT_STATE_OPEN;

src->_common._link = NULL;
src->_common._wbuf = _z_wbuf_null();
src->_common._zbuf = _z_zbuf_null();
src->_peers = NULL;

_z_transport_peer_mutex_unlock(&dst->_common);
_z_transport_tx_mutex_unlock(&dst->_common);

#if Z_FEATURE_MULTI_THREAD == 1
_z_mutex_drop(&src->_common._mutex_tx);
_z_mutex_rec_drop(&src->_common._mutex_peer);
#endif
_z_session_weak_drop(&src->_common._session);
}

void _z_unicast_transport_clear(_z_transport_unicast_t *ztu) {
_z_transport_peer_unicast_slist_free(&ztu->_peers);
_z_pending_peers_clear(&ztu->_pending_peers);
Expand Down Expand Up @@ -372,6 +434,11 @@
_Z_ERROR_RETURN(_Z_ERR_TRANSPORT_NOT_AVAILABLE);
}

void _z_unicast_transport_clear_connection(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); }
void _z_unicast_transport_replace_connection(_z_transport_unicast_t *dst, _z_transport_unicast_t *src) {
_ZP_UNUSED(dst);
_ZP_UNUSED(src);
}
void _z_unicast_transport_clear(_z_transport_unicast_t *ztu) { _ZP_UNUSED(ztu); }

#endif // Z_FEATURE_UNICAST_TRANSPORT == 1
27 changes: 26 additions & 1 deletion tests/connection_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@
LIVELINESS_CLIENT_COMMAND = STDBUF_CMD + [f'{DIR_EXAMPLES}/z_liveliness', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}']
LIVELINESS_SUB_CLIENT_COMMAND = STDBUF_CMD + [f'{DIR_EXAMPLES}/z_sub_liveliness', '-h', '-e', f'tcp/127.0.0.1:{ZENOH_PORT}']

SINGLE_THREAD_ZENOH_PORT = "7448"
SINGLE_THREAD_ROUTER_ARGS = ['-l', f'tcp/0.0.0.0:{SINGLE_THREAD_ZENOH_PORT}', '--no-multicast-scouting']

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Line too long (104/100) Warning test

Line too long (104/100)
SINGLE_THREAD_ACTIVE_CLIENT_COMMAND = STDBUF_CMD + [
f'{DIR_EXAMPLES}/z_pub_st',
'-e',
f'tcp/127.0.0.1:{SINGLE_THREAD_ZENOH_PORT}',
]
SINGLE_THREAD_PASSIVE_CLIENT_COMMAND = STDBUF_CMD + [
f'{DIR_EXAMPLES}/z_sub_st',
'-e',
f'tcp/127.0.0.1:{SINGLE_THREAD_ZENOH_PORT}',
]

LIBASAN_PATH = subprocess.run(["gcc", "-print-file-name=libasan.so"], stdout=subprocess.PIPE, text=True, check=True).stdout.strip()


Expand Down Expand Up @@ -383,8 +396,20 @@


def main():
if len(sys.argv) == 3 and sys.argv[2] == "--single-thread":
router_command = STDBUF_CMD + [sys.argv[1]] + SINGLE_THREAD_ROUTER_ARGS
test_pub_sub_survive_router_restart(router_command,
SINGLE_THREAD_ACTIVE_CLIENT_COMMAND,
SINGLE_THREAD_PASSIVE_CLIENT_COMMAND,
8)
test_pub_before_restart_then_new_sub(router_command,
SINGLE_THREAD_ACTIVE_CLIENT_COMMAND,
SINGLE_THREAD_PASSIVE_CLIENT_COMMAND,
8)
return

if len(sys.argv) != 2:
print("Usage: sudo python3 ./connection_restore.py /path/to/zenohd")
print("Usage: sudo python3 ./connection_restore.py /path/to/zenohd [--single-thread]")
sys.exit(1)

router_command = STDBUF_CMD + [sys.argv[1]] + ROUTER_ARGS
Expand Down
Loading