Skip to content

Commit fee6247

Browse files
improve trigger subscription performance
1 parent 05ab981 commit fee6247

File tree

7 files changed

+36
-53
lines changed

7 files changed

+36
-53
lines changed

include/zenoh-pico/collections/bytes.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,11 @@ z_result_t _z_bytes_append_slice(_z_bytes_t *dst, _z_arc_slice_t *s);
5858
z_result_t _z_bytes_copy(_z_bytes_t *dst, const _z_bytes_t *src);
5959
_z_bytes_t _z_bytes_duplicate(const _z_bytes_t *src);
6060
z_result_t _z_bytes_move(_z_bytes_t *dst, _z_bytes_t *src);
61-
_z_bytes_t _z_bytes_steal(_z_bytes_t *src);
61+
static inline _z_bytes_t _z_bytes_steal(_z_bytes_t *src) {
62+
_z_bytes_t b = *src;
63+
*src = _z_bytes_null();
64+
return b;
65+
}
6266
void _z_bytes_drop(_z_bytes_t *bytes);
6367
void _z_bytes_free(_z_bytes_t **bs);
6468
size_t _z_bytes_num_slices(const _z_bytes_t *bs);

include/zenoh-pico/net/encoding.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ void _z_encoding_clear(_z_encoding_t *encoding);
4242
z_result_t _z_encoding_copy(_z_encoding_t *dst, const _z_encoding_t *src);
4343
_z_encoding_t _z_encoding_alias(_z_encoding_t src);
4444
z_result_t _z_encoding_move(_z_encoding_t *dst, _z_encoding_t *src);
45-
_z_encoding_t _z_encoding_steal(_z_encoding_t *val);
45+
static inline _z_encoding_t _z_encoding_steal(_z_encoding_t *val) {
46+
_z_encoding_t ret = *val;
47+
*val = _z_encoding_null();
48+
return ret;
49+
}
4650

4751
#ifdef __cplusplus
4852
}

include/zenoh-pico/protocol/keyexpr.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,12 @@ _z_keyexpr_t *_z_keyexpr_clone(const _z_keyexpr_t *src);
5050
/// or keyexpr defined by its suffix only, with 0 id and no mapping. This is to be used only when forwarding
5151
/// keyexpr in user api to properly separate declared keyexpr from its suffix.
5252
_z_keyexpr_t _z_keyexpr_alias_from_user_defined(_z_keyexpr_t src, bool try_declared);
53-
_z_keyexpr_t _z_keyexpr_steal(_Z_MOVE(_z_keyexpr_t) src);
53+
static inline _z_keyexpr_t _z_keyexpr_steal(_Z_MOVE(_z_keyexpr_t) src) {
54+
_z_keyexpr_t stolen = *src;
55+
*src = _z_keyexpr_null();
56+
return stolen;
57+
}
58+
5459
bool _z_keyexpr_equals(const _z_keyexpr_t *left, const _z_keyexpr_t *right);
5560
z_result_t _z_keyexpr_move(_z_keyexpr_t *dst, _z_keyexpr_t *src);
5661
void _z_keyexpr_clear(_z_keyexpr_t *rk);

src/collections/bytes.c

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -174,12 +174,6 @@ z_result_t _z_bytes_move(_z_bytes_t *dst, _z_bytes_t *src) {
174174
return _Z_RES_OK;
175175
}
176176

177-
_z_bytes_t _z_bytes_steal(_z_bytes_t *src) {
178-
_z_bytes_t b = *src;
179-
*src = _z_bytes_null();
180-
return b;
181-
}
182-
183177
_z_bytes_reader_t _z_bytes_get_reader(const _z_bytes_t *bytes) {
184178
_z_bytes_reader_t r;
185179
r.bytes = bytes;

src/net/encoding.c

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,3 @@ z_result_t _z_encoding_move(_z_encoding_t *dst, _z_encoding_t *src) {
7575
}
7676
return _Z_RES_OK;
7777
}
78-
79-
_z_encoding_t _z_encoding_steal(_z_encoding_t *val) {
80-
_z_encoding_t ret = {
81-
.id = val->id,
82-
.schema = _z_string_steal(&val->schema),
83-
};
84-
val->id = _Z_ENCODING_ID_DEFAULT;
85-
return ret;
86-
}

src/protocol/keyexpr.c

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -102,12 +102,6 @@ _z_keyexpr_t *_z_keyexpr_clone(const _z_keyexpr_t *src) {
102102
return dst;
103103
}
104104

105-
_z_keyexpr_t _z_keyexpr_steal(_Z_MOVE(_z_keyexpr_t) src) {
106-
_z_keyexpr_t stolen = *src;
107-
*src = _z_keyexpr_null();
108-
return stolen;
109-
}
110-
111105
z_result_t _z_keyexpr_move(_z_keyexpr_t *dst, _z_keyexpr_t *src) {
112106
dst->_id = src->_id;
113107
dst->_mapping = src->_mapping;

src/session/subscription.c

Lines changed: 20 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -246,54 +246,45 @@ static z_result_t _z_trigger_subscriptions_inner(_z_session_t *zn, _z_subscriber
246246
// Retrieve sub infos
247247
_z_subscription_cache_data_t sub_infos = _z_subscription_cache_data_null();
248248
sub_infos.ke_in = _z_keyexpr_alias(keyexpr);
249-
_Z_RETURN_IF_ERR(_z_subscription_get_infos(zn, sub_kind, &sub_infos));
249+
_Z_CLEAN_RETURN_IF_ERR(_z_subscription_get_infos(zn, sub_kind, &sub_infos), _z_keyexpr_clear(keyexpr);
250+
_z_encoding_clear(encoding); _z_bytes_drop(payload); _z_bytes_drop(attachment););
250251
// Check if there are subs
251252
_Z_DEBUG("Triggering %ju subs for key %d - %.*s", (uintmax_t)sub_infos.sub_nb, sub_infos.ke_out._id,
252253
(int)_z_string_len(&sub_infos.ke_out._suffix), _z_string_data(&sub_infos.ke_out._suffix));
253-
if (sub_infos.sub_nb == 0) {
254-
_z_keyexpr_clear(&sub_infos.ke_out);
255-
#if Z_FEATURE_RX_CACHE == 0
256-
_z_subscription_infos_svec_release(&sub_infos.infos); // Otherwise it's released with cache
257-
#endif
258-
return _Z_RES_OK;
259-
}
260-
z_result_t ret = _Z_RES_OK;
261254
// Create sample
262255
_z_sample_t sample =
263256
_z_sample_alias(&sub_infos.ke_out, payload, timestamp, encoding, sample_kind, qos, attachment, reliability);
257+
258+
z_result_t ret = _Z_RES_OK;
264259
// Parse subscription infos svec
265-
for (size_t i = 1; i < sub_infos.sub_nb; i++) {
266-
_z_sample_t sample_copy;
267-
ret = _z_sample_copy(&sample_copy, &sample);
268-
if (ret != _Z_RES_OK) {
269-
break;
270-
}
260+
for (size_t i = 0; i < sub_infos.sub_nb; i++) {
271261
_z_subscription_infos_t *sub_info = _z_subscription_infos_svec_get(&sub_infos.infos, i);
272-
sub_info->callback(&sample_copy, sub_info->arg);
273-
_z_sample_clear(&sample_copy);
262+
if (i + 1 == sub_infos.sub_nb) {
263+
sub_info->callback(&sample, sub_info->arg);
264+
} else {
265+
_z_sample_t sample_copy;
266+
ret = _z_sample_copy(&sample_copy, &sample);
267+
if (ret != _Z_RES_OK) {
268+
break;
269+
}
270+
sub_info->callback(&sample_copy, sub_info->arg);
271+
_z_sample_clear(&sample_copy);
272+
}
274273
}
275-
_z_subscription_infos_t *sub_info = _z_subscription_infos_svec_get(&sub_infos.infos, 0);
276-
sub_info->callback(&sample, sub_info->arg);
277274
_z_sample_clear(&sample);
278275
#if Z_FEATURE_RX_CACHE == 0
279-
_z_subscription_infos_svec_release(&sub_infos.infos); // Otherwise it's released with cache
276+
_z_subscription_infos_svec_release(&sub_infos.infos);
280277
#endif
281-
278+
_z_keyexpr_clear(keyexpr);
282279
return ret;
283280
}
284281

285282
z_result_t _z_trigger_subscriptions_impl(_z_session_t *zn, _z_subscriber_kind_t sub_kind, _z_keyexpr_t *keyexpr,
286283
_z_bytes_t *payload, _z_encoding_t *encoding, const _z_zint_t sample_kind,
287284
const _z_timestamp_t *timestamp, const _z_n_qos_t qos, _z_bytes_t *attachment,
288285
z_reliability_t reliability) {
289-
z_result_t ret = _z_trigger_subscriptions_inner(zn, sub_kind, keyexpr, payload, encoding, sample_kind, timestamp,
290-
qos, attachment, reliability);
291-
// Clean up
292-
_z_keyexpr_clear(keyexpr);
293-
_z_encoding_clear(encoding);
294-
_z_bytes_drop(payload);
295-
_z_bytes_drop(attachment);
296-
return ret;
286+
return _z_trigger_subscriptions_inner(zn, sub_kind, keyexpr, payload, encoding, sample_kind, timestamp, qos,
287+
attachment, reliability);
297288
}
298289

299290
void _z_unregister_subscription(_z_session_t *zn, _z_subscriber_kind_t kind, _z_subscription_rc_t *sub) {

0 commit comments

Comments
 (0)