p2p robustness and scalability#1222
Conversation
Work in progress toward consistently starting interconnected peer zenoh-pico nodes at larger scales. - Make the listen connection limit configurable via CMake - Add connect timeout support to the C11 z_pub/z_sub examples - Update the peer mesh run script to use configurable connect timeout - Speed up run script log scanning and report time to full observed connectivity
| @@ -0,0 +1,79 @@ | |||
| nb=${1:-5} | |||
| connect_timeout=${3:--1} | ||
|
|
||
| echo "" > log | ||
| for i in $(seq -f "%03g" 1 1 $nb 2>/dev/null) |
| do | ||
| connect="$connect -e tcp/127.0.0.1:8$j -e tcp/127.0.0.1:9$j" | ||
| done | ||
| stdbuf -o0 ./build/examples/z_pub -m peer -l tcp/127.0.0.1:8$i -t $connect_timeout $connect -k demo/example/$i | while read line; do echo "[pub $i][$(date +%s.%N)] $line"; done >> log & |
| do | ||
| connect="$connect -e tcp/127.0.0.1:8$j -e tcp/127.0.0.1:9$j" | ||
| done | ||
| stdbuf -o0 ./build/examples/z_pub -m peer -l tcp/127.0.0.1:8$i -t $connect_timeout $connect -k demo/example/$i | while read line; do echo "[pub $i][$(date +%s.%N)] $line"; done >> log & |
| connect="$connect -e tcp/127.0.0.1:8$j -e tcp/127.0.0.1:9$j" | ||
| done | ||
| stdbuf -o0 ./build/examples/z_pub -m peer -l tcp/127.0.0.1:8$i -t $connect_timeout $connect -k demo/example/$i | while read line; do echo "[pub $i][$(date +%s.%N)] $line"; done >> log & | ||
| stdbuf -o0 ./build/examples/z_sub -m peer -l tcp/127.0.0.1:9$i -t $connect_timeout -e tcp/127.0.0.1:8$i $connect | while read line; do echo "[sub $i][$(date +%s.%N)] $line"; done >> log & |
| connect="$connect -e tcp/127.0.0.1:8$j -e tcp/127.0.0.1:9$j" | ||
| done | ||
| stdbuf -o0 ./build/examples/z_pub -m peer -l tcp/127.0.0.1:8$i -t $connect_timeout $connect -k demo/example/$i | while read line; do echo "[pub $i][$(date +%s.%N)] $line"; done >> log & | ||
| stdbuf -o0 ./build/examples/z_sub -m peer -l tcp/127.0.0.1:9$i -t $connect_timeout -e tcp/127.0.0.1:8$i $connect | while read line; do echo "[sub $i][$(date +%s.%N)] $line"; done >> log & |
| stdbuf -o0 ./build/examples/z_sub -m peer -l tcp/127.0.0.1:9$i -t $connect_timeout -e tcp/127.0.0.1:8$i $connect | while read line; do echo "[sub $i][$(date +%s.%N)] $line"; done >> log & | ||
| done | ||
|
|
||
| sleep $duration |
|
|
||
| if [[ $stop != "" ]] | ||
| then | ||
| echo OK $(($stop - $start)) seconds |
| then | ||
| echo OK $(($stop - $start)) seconds | ||
| else | ||
| echo KO $failure |
| bool *add_matching_listener) { | ||
| int opt; | ||
| while ((opt = getopt(argc, argv, "k:v:e:m:l:n:a")) != -1) { | ||
| while ((opt = getopt(argc, argv, "k:v:e:m:l:n:at:")) != -1) { |
| static int parse_args(int argc, char **argv, z_owned_config_t *config, char **ke, int *n) { | ||
| int opt; | ||
| while ((opt = getopt(argc, argv, "k:e:m:l:n:")) != -1) { | ||
| while ((opt = getopt(argc, argv, "k:e:m:l:n:t:")) != -1) { |
| bool *add_matching_listener) { | ||
| int opt; | ||
| while ((opt = getopt(argc, argv, "k:v:e:m:l:n:a")) != -1) { | ||
| while ((opt = getopt(argc, argv, "k:v:e:m:l:n:at:")) != -1) { |
| bool *add_matching_listener) { | ||
| int opt; | ||
| while ((opt = getopt(argc, argv, "k:v:e:m:l:n:a")) != -1) { | ||
| while ((opt = getopt(argc, argv, "k:v:e:m:l:n:at:")) != -1) { |
| break; | ||
| case 't': | ||
| #if defined(Z_FEATURE_UNSTABLE_API) | ||
| zp_config_insert(z_loan_mut(*config), Z_CONFIG_CONNECT_TIMEOUT_KEY, optarg); |
| case '?': | ||
| if (optopt == 'k' || optopt == 'v' || optopt == 'e' || optopt == 'm' || optopt == 'l' || | ||
| optopt == 'n') { | ||
| optopt == 'n' || optopt == 't') { |
| static int parse_args(int argc, char **argv, z_owned_config_t *config, char **ke, int *n) { | ||
| int opt; | ||
| while ((opt = getopt(argc, argv, "k:e:m:l:n:")) != -1) { | ||
| while ((opt = getopt(argc, argv, "k:e:m:l:n:t:")) != -1) { |
| static int parse_args(int argc, char **argv, z_owned_config_t *config, char **ke, int *n) { | ||
| int opt; | ||
| while ((opt = getopt(argc, argv, "k:e:m:l:n:")) != -1) { | ||
| while ((opt = getopt(argc, argv, "k:e:m:l:n:t:")) != -1) { |
| break; | ||
| case 't': | ||
| #if defined(Z_FEATURE_UNSTABLE_API) | ||
| zp_config_insert(z_loan_mut(*config), Z_CONFIG_CONNECT_TIMEOUT_KEY, optarg); |
| break; | ||
| case '?': | ||
| if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l' || optopt == 'n') { | ||
| if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l' || optopt == 'n' || |
| case '?': | ||
| if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l' || optopt == 'n') { | ||
| if (optopt == 'k' || optopt == 'e' || optopt == 'm' || optopt == 'l' || optopt == 'n' || | ||
| optopt == 't') { |
| const _z_config_t *config) { | ||
| if (!_z_pending_peers_has_pending(pending_peers)) { | ||
| const _z_config_t *config, size_t max_attempts) { | ||
| size_t pending_count = _z_pending_peers_count_pending(pending_peers); |
|
|
||
| for (size_t attempt = 0; attempt < loop_count; attempt++) { | ||
| size_t i = 0; | ||
| if (!_z_pending_peers_next_pending_idx(pending_peers, &i)) { |
In single-thread mode, spin the async-opened session from only one test thread while waiting for peer discovery, avoiding concurrent zp_spin_once() calls on the same session.
Limit the background add-peers task to a small number of peer attempts per executor tick, rotating through pending locators before applying backoff. This prevents peer retry work from monopolizing single-thread runtimes while still retrying the full pending set before sleeping.
| } | ||
|
|
||
| #if Z_FEATURE_UNICAST_PEER == 1 | ||
| #define _Z_ADD_PEERS_ALL_PENDING 0 |
|
|
||
| #if Z_FEATURE_UNICAST_PEER == 1 | ||
| #define _Z_ADD_PEERS_ALL_PENDING 0 | ||
| #define _Z_ADD_PEERS_TASK_MAX_ATTEMPTS 1 |
| // Non-retryable error | ||
| bool has_pending = _z_pending_peers_has_pending(pending_peers); | ||
|
|
||
| if (result._last_non_retryable_ret != _Z_RES_OK) { |
|
|
||
| if (result._last_non_retryable_ret != _Z_RES_OK) { | ||
| // Non-retryable error. | ||
| if (exit_on_failure) { |
| _z_pending_peers_clear(pending_peers); | ||
| return _Z_RES_OK; | ||
| } | ||
|
|
| if (pending_peers->_remaining_attempts == 0) { | ||
| pending_peers->_remaining_attempts = _z_pending_peers_count_pending(pending_peers); | ||
| } | ||
|
|
| if (pending_peers->_remaining_attempts > 0) { | ||
| return _z_fut_fn_result_continue(); | ||
| } | ||
|
|
Investigation update: 50-node peer mesh startupCurrent testing still shows that a 50-process peer mesh does not reliably converge ( Latest local run: This suggests the mesh is not simply failing to attempt peer connections. Connections are being established, but peers begin expiring while peer addition is still ongoing. In this run, lease expiry starts around The key implication is that peer connection establishment and lease/keepalive management need to be aligned. As soon as a peer connection is established, the transport must have the lease/keepalive machinery in place to service it. Long synchronous peer-add work during startup can leave already-established peers live before the background tasks that maintain their leases are able to run. Older behavior around zenoh-pico 1.9.0 was also checked. The same broad design existed there: peer lease expiry removes peers from the peer transport, but expired configured peers are not requeued as desired peer connections. So this appears to be an existing limitation rather than a regression introduced by the recent retry work. Current understanding
Potential next steps
|
Description
Work in progress toward consistently starting interconnected peer zenoh-pico nodes at larger scales.
What does this PR do?
z_pub/z_subexamplesWhy is this change needed?
The peer-to-peer stress script needs to exercise larger interconnected zenoh-pico meshes, ideally up to 50 nodes, without relying on hardcoded connection limits or slow log parsing. This PR starts improving that workflow by making the listen limit configurable, allowing examples to keep retrying peer connects, and making the script report the actual time until full observed connectivity.
Related Issues
🏷️ Label-Based Checklist
Based on the labels applied to this PR, please complete these additional requirements:
Labels:
internal🏠 Internal Change
This PR is marked as internal (not user-facing):
Lighter review: Internal changes may have lighter review requirements.
Instructions:
- [ ]to- [x])This checklist updates automatically when labels change, but preserves your checked boxes.