Skip to content

Commit 5ac5ddc

Browse files
add synchronization for callback drop for subscribers, queryables, queriers and sessions
1 parent bb3f9a8 commit 5ac5ddc

File tree

24 files changed

+281
-148
lines changed

24 files changed

+281
-148
lines changed

include/zenoh-pico/collections/sync_group.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,12 @@ static inline _z_sync_group_notifier_t _z_sync_group_notifier_null(void) {
5151
return n;
5252
}
5353

54+
static inline _z_sync_group_notifier_t _z_sync_group_notifier_steal(_z_sync_group_notifier_t* notifier) {
55+
_z_sync_group_notifier_t n = *notifier;
56+
*notifier = _z_sync_group_notifier_null();
57+
return n;
58+
}
59+
5460
z_result_t _z_sync_group_create(_z_sync_group_t* sync_group);
5561
z_result_t _z_sync_group_wait(_z_sync_group_t* sync_group);
5662
void _z_sync_group_close(_z_sync_group_t* sync_group);
@@ -60,7 +66,7 @@ static inline bool _z_sync_group_check(const _z_sync_group_t* sync_group) {
6066
}
6167
bool _z_sync_group_is_closed(const _z_sync_group_t* sync_group);
6268
void _z_sync_group_drop(_z_sync_group_t* sync_group);
63-
z_result_t _z_sync_group_create_notifier(_z_sync_group_t* sync_group, _z_sync_group_notifier_t* notifier);
69+
z_result_t _z_sync_group_create_notifier(const _z_sync_group_t* sync_group, _z_sync_group_notifier_t* notifier);
6470
void _z_sync_group_notifier_drop(_z_sync_group_notifier_t* notifier);
6571
static inline bool _z_sync_group_notifier_check(const _z_sync_group_notifier_t* notifier) {
6672
return !_Z_RC_IS_NULL(&notifier->_state);

include/zenoh-pico/net/liveliness.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ z_result_t _z_declare_liveliness_subscriber(_z_subscriber_t *subscriber, const _
4848
const _z_declared_keyexpr_t *keyexpr, _z_closure_sample_callback_t callback,
4949
_z_drop_handler_t dropper, bool history, void *arg);
5050

51+
z_result_t _z_register_liveliness_subscriber(uint32_t *out_sub_id, const _z_session_rc_t *zn,
52+
const _z_declared_keyexpr_t *keyexpr,
53+
_z_closure_sample_callback_t callback, _z_drop_handler_t dropper,
54+
bool history, void *arg,
55+
const _z_sync_group_t *opt_callback_drop_sync_group);
56+
5157
/**
5258
* Undeclare a liveliness :c:type:`_z_subscriber_t`.
5359
*

include/zenoh-pico/net/matching.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,10 @@ typedef struct _z_matching_listener_t {
2727
} _z_matching_listener_t;
2828

2929
#if Z_FEATURE_MATCHING == 1
30-
_z_matching_listener_t _z_matching_listener_declare(const _z_write_filter_ctx_rc_t *ctx, uint32_t id,
31-
_z_closure_matching_status_t *callback);
30+
z_result_t _z_matching_listener_declare(_z_matching_listener_t *listener, _z_session_t *s,
31+
const _z_write_filter_ctx_rc_t *ctx, _z_closure_matching_status_t *callback);
32+
z_result_t _z_matching_listener_register(uint32_t *listener_id, _z_session_t *s, const _z_write_filter_ctx_rc_t *ctx,
33+
_z_closure_matching_status_t *callback);
3234
z_result_t _z_matching_listener_undeclare(_z_matching_listener_t *listener);
3335
// Warning: None of the sub-types require a non-0 initialization. Add a init function if it changes.
3436
static inline _z_matching_listener_t _z_matching_listener_null(void) { return (_z_matching_listener_t){0}; }

include/zenoh-pico/net/primitives.h

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,9 @@ z_result_t _z_write(_z_session_t *zn, const _z_declared_keyexpr_t *keyexpr, _z_b
161161
* zn: The zenoh-net session. The caller keeps its ownership.
162162
* keyexpr: The resource key to subscribe.
163163
* callback: The callback function that will be called each time a data matching the subscribed resource is
164-
* received. arg: A pointer that will be passed to the **callback** on each call.
164+
* received.
165+
* dropper: A function that will be called once subscriber is undeclared.
166+
* arg: A pointer that will be passed to the **callback** on each call.
165167
*
166168
* Returns:
167169
* 0 in case of success, negative error code otherwise.
@@ -170,6 +172,10 @@ z_result_t _z_declare_subscriber(_z_subscriber_t *subscriber, const _z_session_r
170172
const _z_declared_keyexpr_t *keyexpr, _z_closure_sample_callback_t callback,
171173
_z_drop_handler_t dropper, void *arg, z_locality_t allowed_origin);
172174

175+
z_result_t _z_register_subscriber(uint32_t *out_sub_id, const _z_session_rc_t *zn, const _z_declared_keyexpr_t *keyexpr,
176+
_z_closure_sample_callback_t callback, _z_drop_handler_t dropper, void *arg,
177+
z_locality_t allowed_origin, const _z_sync_group_t *opt_callback_drop_sync_group);
178+
173179
/**
174180
* Undeclare a :c:type:`_z_subscriber_t`.
175181
*
@@ -202,6 +208,11 @@ z_result_t _z_declare_queryable(_z_queryable_t *queryable, const _z_session_rc_t
202208
_z_closure_query_callback_t callback, _z_drop_handler_t dropper, void *arg,
203209
z_locality_t allowed_origin);
204210

211+
z_result_t _z_register_queryable(uint32_t *queryable_id, const _z_session_rc_t *zn,
212+
const _z_declared_keyexpr_t *keyexpr, bool complete,
213+
_z_closure_query_callback_t callback, _z_drop_handler_t dropper, void *arg,
214+
z_locality_t allowed_origin, const _z_sync_group_t *sync_group);
215+
205216
/**
206217
* Undeclare a :c:type:`_z_queryable_t`.
207218
*
@@ -292,6 +303,7 @@ z_result_t _z_undeclare_querier(_z_querier_t *querier);
292303
*
293304
* Parameters:
294305
* session: The zenoh-net session.
306+
* querier_id: Optional id of querier.
295307
* keyexpr: The resource key to query.
296308
* parameters: An indication to matching queryables about the queried data.
297309
* parameters_len: Length of the parameters string.
@@ -309,12 +321,12 @@ z_result_t _z_undeclare_querier(_z_querier_t *querier);
309321
* opt_cancellation_token: Optional cancellation token to cancel the query, can be null.
310322
*
311323
*/
312-
z_result_t _z_query(const _z_session_rc_t *session, const _z_declared_keyexpr_t *keyexpr, const char *parameters,
313-
size_t parameters_len, z_query_target_t target, z_consolidation_mode_t consolidation,
314-
_z_bytes_t *payload, _z_encoding_t *encoding, _z_closure_reply_callback_t callback,
315-
_z_drop_handler_t dropper, void *arg, uint64_t timeout_ms, _z_bytes_t *attachment, _z_n_qos_t qos,
316-
_z_source_info_t *source_info, z_locality_t allowed_destination,
317-
_z_cancellation_token_rc_t *opt_cancellation_token);
324+
z_result_t _z_query(const _z_session_rc_t *session, _z_optional_id_t querier_id, const _z_declared_keyexpr_t *keyexpr,
325+
const char *parameters, size_t parameters_len, z_query_target_t target,
326+
z_consolidation_mode_t consolidation, _z_bytes_t *payload, _z_encoding_t *encoding,
327+
_z_closure_reply_callback_t callback, _z_drop_handler_t dropper, void *arg, uint64_t timeout_ms,
328+
_z_bytes_t *attachment, _z_n_qos_t qos, _z_source_info_t *source_info,
329+
z_locality_t allowed_destination, _z_cancellation_token_rc_t *opt_cancellation_token);
318330
#endif
319331

320332
#if Z_FEATURE_INTEREST == 1

include/zenoh-pico/net/query.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ _Z_REFCOUNT_DEFINE(_z_query, _z_query)
6060
*/
6161
typedef struct _z_querier_t {
6262
_z_declared_keyexpr_t _key;
63-
_z_zint_t _id;
63+
uint32_t _id;
6464
_z_session_weak_t _zn;
6565
_z_encoding_t _encoding;
6666
z_consolidation_mode_t _consolidation_mode;
@@ -86,6 +86,7 @@ static inline bool _z_querier_check(const _z_querier_t *querier) { return !_Z_RC
8686
typedef struct {
8787
uint32_t _entity_id;
8888
_z_session_weak_t _zn;
89+
_z_sync_group_t _callback_drop_sync_group;
8990
} _z_queryable_t;
9091

9192
#if Z_FEATURE_QUERYABLE == 1

include/zenoh-pico/net/session.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ typedef struct _z_session_t {
126126
uint32_t _admin_space_queryable_id;
127127
#endif
128128
#endif
129+
_z_sync_group_t _callback_drop_sync_group;
129130
} _z_session_t;
130131

131132
/**

include/zenoh-pico/net/subscribe.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ extern "C" {
3030
typedef struct {
3131
uint32_t _entity_id;
3232
_z_session_weak_t _zn;
33+
_z_sync_group_t _callback_drop_sync_group;
3334
} _z_subscriber_t;
3435

3536
#if Z_FEATURE_SUBSCRIPTION == 1

include/zenoh-pico/session/query.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ z_result_t _z_trigger_query_reply_err(_z_session_t *zn, _z_zint_t id, _z_msg_err
3434
_z_entity_global_id_t *replier_id);
3535
z_result_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id);
3636
void _z_unregister_pending_query(_z_session_t *zn, _z_zint_t query_id);
37+
void _z_unregister_pending_queries_from_querier(_z_session_t *zn, uint32_t querier_id);
3738
void _z_flush_pending_queries(_z_session_t *zn);
3839
#endif
3940

include/zenoh-pico/session/session.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ typedef struct {
8282
_z_closure_sample_callback_t _callback;
8383
_z_drop_handler_t _dropper;
8484
void *_arg;
85+
_z_sync_group_notifier_t _session_callback_drop_notifier;
86+
_z_sync_group_notifier_t _subscriber_callback_drop_notifier;
8587
} _z_subscription_t;
8688

8789
bool _z_subscription_eq(const _z_subscription_t *one, const _z_subscription_t *two);
@@ -115,6 +117,8 @@ typedef struct {
115117
void *_arg;
116118
bool _complete;
117119
z_locality_t _allowed_origin;
120+
_z_sync_group_notifier_t _session_callback_drop_notifier;
121+
_z_sync_group_notifier_t _queryable_callback_drop_notifier;
118122
} _z_session_queryable_t;
119123

120124
bool _z_session_queryable_eq(const _z_session_queryable_t *one, const _z_session_queryable_t *two);
@@ -166,6 +170,7 @@ void _z_pending_query_cancellation_data_clear(_z_pending_query_cancellation_data
166170
#endif
167171
struct _z_pending_query_t {
168172
_z_keyexpr_t _key;
173+
_z_optional_id_t _querier_id;
169174
_z_zint_t _id;
170175
_z_closure_reply_callback_t _callback;
171176
_z_drop_handler_t _dropper;

src/api/api.c

Lines changed: 61 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -737,6 +737,7 @@ static z_result_t _z_session_rc_init(z_owned_session_t *zs, _z_id_t *zid) {
737737
}
738738

739739
z_result_t z_open(z_owned_session_t *zs, z_moved_config_t *config, const z_open_options_t *options) {
740+
z_internal_session_null(zs);
740741
#if Z_FEATURE_MULTI_THREAD == 1
741742
z_open_options_t opts;
742743
if (options == NULL) {
@@ -1337,26 +1338,32 @@ z_entity_global_id_t z_publisher_id(const z_loaned_publisher_t *publisher) {
13371338
#if Z_FEATURE_MATCHING == 1
13381339
z_result_t z_publisher_declare_background_matching_listener(const z_loaned_publisher_t *publisher,
13391340
z_moved_closure_matching_status_t *callback) {
1340-
z_owned_matching_listener_t listener;
1341-
_Z_RETURN_IF_ERR(z_publisher_declare_matching_listener(publisher, &listener, callback));
1342-
_z_matching_listener_clear(&listener._val);
1343-
return _Z_RES_OK;
1341+
return z_publisher_declare_matching_listener(publisher, NULL, callback);
13441342
}
13451343

13461344
z_result_t z_publisher_declare_matching_listener(const z_loaned_publisher_t *publisher,
13471345
z_owned_matching_listener_t *matching_listener,
13481346
z_moved_closure_matching_status_t *callback) {
1349-
matching_listener->_val = _z_matching_listener_null();
13501347
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&publisher->_zn);
1348+
z_result_t ret = _Z_RES_OK;
13511349
if (!_Z_RC_IS_NULL(&sess_rc)) {
1352-
matching_listener->_val = _z_matching_listener_declare(
1353-
&publisher->_filter.ctx, _z_get_entity_id(_Z_RC_IN_VAL(&sess_rc)), &callback->_this._val);
1350+
if (matching_listener != NULL) {
1351+
matching_listener->_val = _z_matching_listener_null();
1352+
ret = _z_matching_listener_declare(&matching_listener->_val, _Z_RC_IN_VAL(&sess_rc),
1353+
&publisher->_filter.ctx, &callback->_this._val);
1354+
} else {
1355+
uint32_t _listener_id;
1356+
ret = _z_matching_listener_register(&_listener_id, _Z_RC_IN_VAL(&sess_rc), &publisher->_filter.ctx,
1357+
&callback->_this._val);
1358+
}
13541359
_z_session_rc_drop(&sess_rc);
13551360
} else {
1356-
matching_listener->_val = _z_matching_listener_null();
1357-
z_internal_closure_matching_status_null(&callback->_this);
1361+
if (matching_listener != NULL) {
1362+
matching_listener->_val = _z_matching_listener_null();
1363+
}
1364+
ret = _Z_ERR_GENERIC;
13581365
}
1359-
return _z_matching_listener_check(&matching_listener->_val) ? _Z_RES_OK : _Z_ERR_GENERIC;
1366+
return ret;
13601367
}
13611368

13621369
z_result_t z_publisher_get_matching_status(const z_loaned_publisher_t *publisher,
@@ -1433,10 +1440,10 @@ z_result_t z_get_with_parameters_substr(const z_loaned_session_t *zs, const z_lo
14331440
#if Z_FEATURE_LOCAL_QUERYABLE == 1
14341441
allowed_destination = opt.allowed_destination;
14351442
#endif
1436-
ret = _z_query(zs, keyexpr, parameters, parameters_len, opt.target, opt.consolidation.mode,
1437-
_z_bytes_from_moved(opt.payload), _z_encoding_from_moved(opt.encoding), closure.call, closure.drop,
1438-
closure.context, opt.timeout_ms, _z_bytes_from_moved(opt.attachment), qos, source_info,
1439-
allowed_destination, cancellation_token);
1443+
ret = _z_query(zs, _z_optional_id_make_none(), keyexpr, parameters, parameters_len, opt.target,
1444+
opt.consolidation.mode, _z_bytes_from_moved(opt.payload), _z_encoding_from_moved(opt.encoding),
1445+
closure.call, closure.drop, closure.context, opt.timeout_ms, _z_bytes_from_moved(opt.attachment),
1446+
qos, source_info, allowed_destination, cancellation_token);
14401447
// Clean-up
14411448
#ifdef Z_FEATURE_UNSTABLE_API
14421449
z_cancellation_token_drop(opt.cancellation_token);
@@ -1557,10 +1564,11 @@ z_result_t z_querier_get_with_parameters_substr(const z_loaned_querier_t *querie
15571564
if (should_proceed) {
15581565
_z_n_qos_t qos = _z_n_qos_make(querier->_is_express, querier->_congestion_control == Z_CONGESTION_CONTROL_BLOCK,
15591566
querier->_priority);
1560-
ret = _z_query(&sess_rc, &querier->_key, parameters, parameters_len, querier->_target,
1561-
querier->_consolidation_mode, _z_bytes_from_moved(opt.payload), &encoding, closure.call,
1562-
closure.drop, closure.context, querier->_timeout_ms, _z_bytes_from_moved(opt.attachment), qos,
1563-
source_info, querier->_allowed_destination, cancellation_token);
1567+
ret = _z_query(&sess_rc, _z_optional_id_make_some(querier->_id), &querier->_key, parameters, parameters_len,
1568+
querier->_target, querier->_consolidation_mode, _z_bytes_from_moved(opt.payload), &encoding,
1569+
closure.call, closure.drop, closure.context, querier->_timeout_ms,
1570+
_z_bytes_from_moved(opt.attachment), qos, source_info, querier->_allowed_destination,
1571+
cancellation_token);
15641572
} else if (closure.drop != NULL) {
15651573
closure.drop(closure.context);
15661574
}
@@ -1613,25 +1621,31 @@ z_entity_global_id_t z_querier_id(const z_loaned_querier_t *querier) {
16131621
#if Z_FEATURE_MATCHING == 1
16141622
z_result_t z_querier_declare_background_matching_listener(const z_loaned_querier_t *querier,
16151623
z_moved_closure_matching_status_t *callback) {
1616-
z_owned_matching_listener_t listener;
1617-
_Z_RETURN_IF_ERR(z_querier_declare_matching_listener(querier, &listener, callback));
1618-
_z_matching_listener_clear(&listener._val);
1619-
return _Z_RES_OK;
1624+
return z_querier_declare_matching_listener(querier, NULL, callback);
16201625
}
16211626
z_result_t z_querier_declare_matching_listener(const z_loaned_querier_t *querier,
16221627
z_owned_matching_listener_t *matching_listener,
16231628
z_moved_closure_matching_status_t *callback) {
1624-
matching_listener->_val = _z_matching_listener_null();
16251629
_z_session_rc_t sess_rc = _z_session_weak_upgrade_if_open(&querier->_zn);
1630+
z_result_t ret = _Z_RES_OK;
16261631
if (!_Z_RC_IS_NULL(&sess_rc)) {
1627-
matching_listener->_val = _z_matching_listener_declare(
1628-
&querier->_filter.ctx, _z_get_entity_id(_Z_RC_IN_VAL(&sess_rc)), &callback->_this._val);
1632+
if (matching_listener != NULL) {
1633+
matching_listener->_val = _z_matching_listener_null();
1634+
ret = _z_matching_listener_declare(&matching_listener->_val, _Z_RC_IN_VAL(&sess_rc), &querier->_filter.ctx,
1635+
&callback->_this._val);
1636+
} else {
1637+
uint32_t _listener_id;
1638+
ret = _z_matching_listener_register(&_listener_id, _Z_RC_IN_VAL(&sess_rc), &querier->_filter.ctx,
1639+
&callback->_this._val);
1640+
}
16291641
_z_session_rc_drop(&sess_rc);
16301642
} else {
1631-
matching_listener->_val = _z_matching_listener_null();
1632-
z_internal_closure_matching_status_null(&callback->_this);
1643+
if (matching_listener != NULL) {
1644+
matching_listener->_val = _z_matching_listener_null();
1645+
}
1646+
ret = _Z_ERR_GENERIC;
16331647
}
1634-
return _z_matching_listener_check(&matching_listener->_val) ? _Z_RES_OK : _Z_ERR_GENERIC;
1648+
return ret;
16351649
}
16361650

16371651
z_result_t z_querier_get_matching_status(const z_loaned_querier_t *querier, z_matching_status_t *matching_status) {
@@ -1691,10 +1705,7 @@ void z_queryable_options_default(z_queryable_options_t *options) {
16911705

16921706
z_result_t z_declare_background_queryable(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr,
16931707
z_moved_closure_query_t *callback, const z_queryable_options_t *options) {
1694-
z_owned_queryable_t qle;
1695-
_Z_RETURN_IF_ERR(z_declare_queryable(zs, &qle, keyexpr, callback, options));
1696-
_z_queryable_clear(&qle._val);
1697-
return _Z_RES_OK;
1708+
return z_declare_queryable(zs, NULL, keyexpr, callback, options);
16981709
}
16991710

17001711
z_result_t z_declare_queryable(const z_loaned_session_t *zs, z_owned_queryable_t *queryable,
@@ -1713,8 +1724,14 @@ z_result_t z_declare_queryable(const z_loaned_session_t *zs, z_owned_queryable_t
17131724
#if Z_FEATURE_LOCAL_QUERYABLE == 1
17141725
allowed_origin = opt.allowed_origin;
17151726
#endif
1716-
return _z_declare_queryable(&queryable->_val, zs, keyexpr, opt.complete, closure.call, closure.drop,
1717-
closure.context, allowed_origin);
1727+
if (queryable != NULL) {
1728+
return _z_declare_queryable(&queryable->_val, zs, keyexpr, opt.complete, closure.call, closure.drop,
1729+
closure.context, allowed_origin);
1730+
} else {
1731+
uint32_t _queryable_id;
1732+
return _z_register_queryable(&_queryable_id, zs, keyexpr, opt.complete, closure.call, closure.drop,
1733+
closure.context, allowed_origin, NULL);
1734+
}
17181735
}
17191736

17201737
z_result_t z_undeclare_queryable(z_moved_queryable_t *queryable) {
@@ -1979,18 +1996,14 @@ void z_subscriber_options_default(z_subscriber_options_t *options) {
19791996

19801997
z_result_t z_declare_background_subscriber(const z_loaned_session_t *zs, const z_loaned_keyexpr_t *keyexpr,
19811998
z_moved_closure_sample_t *callback, const z_subscriber_options_t *options) {
1982-
z_owned_subscriber_t sub;
1983-
_Z_RETURN_IF_ERR(z_declare_subscriber(zs, &sub, keyexpr, callback, options));
1984-
_z_subscriber_clear(&sub._val);
1985-
return _Z_RES_OK;
1999+
return z_declare_subscriber(zs, NULL, keyexpr, callback, options);
19862000
}
19872001

19882002
z_result_t z_declare_subscriber(const z_loaned_session_t *zs, z_owned_subscriber_t *sub,
19892003
const z_loaned_keyexpr_t *keyexpr, z_moved_closure_sample_t *callback,
19902004
const z_subscriber_options_t *options) {
19912005
_z_closure_sample_t closure = callback->_this._val;
19922006
z_internal_closure_sample_null(&callback->_this);
1993-
sub->_val = _z_subscriber_null();
19942007

19952008
z_subscriber_options_t opt;
19962009
z_subscriber_options_default(&opt);
@@ -2002,7 +2015,15 @@ z_result_t z_declare_subscriber(const z_loaned_session_t *zs, z_owned_subscriber
20022015
#if Z_FEATURE_LOCAL_SUBSCRIBER == 1
20032016
allowed_origin = opt.allowed_origin;
20042017
#endif
2005-
return _z_declare_subscriber(&sub->_val, zs, keyexpr, closure.call, closure.drop, closure.context, allowed_origin);
2018+
if (sub != NULL) {
2019+
sub->_val = _z_subscriber_null();
2020+
return _z_declare_subscriber(&sub->_val, zs, keyexpr, closure.call, closure.drop, closure.context,
2021+
allowed_origin);
2022+
} else {
2023+
uint32_t _sub_id;
2024+
return _z_register_subscriber(&_sub_id, zs, keyexpr, closure.call, closure.drop, closure.context,
2025+
allowed_origin, NULL);
2026+
}
20062027
}
20072028

20082029
z_result_t z_undeclare_subscriber(z_moved_subscriber_t *sub) {

0 commit comments

Comments
 (0)