Skip to content

Commit 17d1655

Browse files
add synchronization for callback drop for advanced subscribers
1 parent 5ac5ddc commit 17d1655

File tree

10 files changed

+243
-262
lines changed

10 files changed

+243
-262
lines changed

include/zenoh-pico/api/advanced_subscriber.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,22 @@ _Z_HASHMAP_DEFINE(_z_entity_global_id, _ze_advanced_subscriber_sequenced_state,
8282
_ze_advanced_subscriber_sequenced_state_t)
8383
_Z_HASHMAP_DEFINE(_z_id, _ze_advanced_subscriber_timestamped_state, z_id_t, _ze_advanced_subscriber_timestamped_state_t)
8484

85-
static void _ze_closure_miss_copy(_ze_closure_miss_t *dst, const _ze_closure_miss_t *src) { *dst = *src; }
85+
static inline _ze_closure_miss_t _ze_closure_miss_null(void) {
86+
_ze_closure_miss_t miss = {0};
87+
return miss;
88+
}
89+
static inline void _ze_closure_miss_drop(_ze_closure_miss_t *closure) {
90+
if (closure->drop != NULL) {
91+
closure->drop(closure->context);
92+
}
93+
*closure = _ze_closure_miss_null();
94+
}
95+
96+
static inline void _ze_closure_miss_copy(_ze_closure_miss_t *dst, const _ze_closure_miss_t *src) { *dst = *src; }
97+
static inline void _ze_closure_miss_move(_ze_closure_miss_t *dst, _ze_closure_miss_t *src) {
98+
*dst = *src;
99+
*src = _ze_closure_miss_null();
100+
}
86101

87102
_Z_ELEM_DEFINE(_ze_closure_miss, _ze_closure_miss_t, _z_noop_size, _z_noop_clear, _ze_closure_miss_copy, _z_noop_move,
88103
_z_noop_eq, _z_noop_cmp, _z_noop_hash)
@@ -111,6 +126,8 @@ typedef struct {
111126
_ze_closure_miss_intmap_t _miss_handlers;
112127
bool _has_token;
113128
z_owned_liveliness_token_t _token;
129+
z_owned_cancellation_token_t _cancellation_token;
130+
bool _is_undeclaring;
114131
} _ze_advanced_subscriber_state_t;
115132

116133
_ze_advanced_subscriber_state_t _ze_advanced_subscriber_state_null(void);

include/zenoh-pico/net/session.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,7 @@ z_result_t _zp_process_periodic_tasks(_z_session_t *z);
261261
* Returns:
262262
* ``0`` in case of success, ``negative`` in case of failure.
263263
*/
264-
z_result_t _zp_periodic_task_add(_z_session_t *z, const _zp_closure_periodic_task_t *closure, uint64_t period_ms,
264+
z_result_t _zp_periodic_task_add(_z_session_t *z, _zp_closure_periodic_task_t *closure, uint64_t period_ms,
265265
uint32_t *id);
266266

267267
/*

include/zenoh-pico/utils/result.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ typedef enum {
4444
_Z_NO_DATA_PROCESSED = 3,
4545
Z_NO_DATA_PROCESSED = 3,
4646
_Z_RESOURCE_POSITIVE_REF_COUNT = 4,
47-
Z_CANCELLATION_TOKEN_ALREADY_CANCELLED = 5,
4847
Z_SYNC_GROUP_CLOSED = 6,
4948

5049
_Z_ERR_MESSAGE_DESERIALIZATION_FAILED = -119,
@@ -94,6 +93,7 @@ typedef enum {
9493
Z_EDESERIALIZE = -72,
9594
Z_ETIMEDOUT = -71,
9695
_Z_ERR_KEYEXPR_DECLARED_ON_ANOTHER_SESSION = -70,
96+
Z_ERR_CANCELLED = -69,
9797

9898
_Z_ERR_NULL = -127,
9999
_Z_ERR_GENERIC = -128

include/zenoh-pico/utils/scheduler.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,18 @@ typedef struct {
3939
z_closure_drop_callback_t drop;
4040
} _zp_closure_periodic_task_t;
4141

42+
static inline _zp_closure_periodic_task_t _zp_closure_periodic_task_null() {
43+
_zp_closure_periodic_task_t c = {0};
44+
return c;
45+
}
46+
47+
static inline void _zp_closure_periodic_task_drop(_zp_closure_periodic_task_t *closure) {
48+
if (closure->drop != NULL) {
49+
closure->drop(closure->context);
50+
};
51+
*closure = _zp_closure_periodic_task_null();
52+
}
53+
4254
typedef struct {
4355
uint32_t _id;
4456
uint64_t _period_ms;
@@ -101,7 +113,7 @@ static inline bool _zp_periodic_scheduler_check(const _zp_periodic_scheduler_t *
101113
}
102114
z_result_t _zp_periodic_scheduler_init(_zp_periodic_scheduler_t *scheduler);
103115
void _zp_periodic_scheduler_clear(_zp_periodic_scheduler_t *scheduler);
104-
z_result_t _zp_periodic_scheduler_add(_zp_periodic_scheduler_t *scheduler, const _zp_closure_periodic_task_t *closure,
116+
z_result_t _zp_periodic_scheduler_add(_zp_periodic_scheduler_t *scheduler, _zp_closure_periodic_task_t *closure,
105117
uint64_t period_ms, uint32_t *id);
106118
z_result_t _zp_periodic_scheduler_remove(_zp_periodic_scheduler_t *scheduler, uint32_t id);
107119

src/api/advanced_publisher.c

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -357,9 +357,7 @@ z_result_t ze_declare_advanced_publisher(const z_loaned_session_t *zs, ze_owned_
357357
_Z_CLEAN_RETURN_IF_ERR(
358358
_zp_periodic_task_add(_Z_RC_IN_VAL(zs), &closure, opt.sample_miss_detection.heartbeat_period_ms,
359359
&state->_state_publisher_task_id),
360-
361360
z_keyexpr_drop(z_keyexpr_move(&ke));
362-
_ze_advanced_publisher_state_weak_drop(ctx); z_free(ctx);
363361
_ze_advanced_publisher_state_rc_drop(&pub->_val._state);
364362
z_publisher_drop(z_publisher_move(&pub->_val._publisher));
365363
z_liveliness_token_drop(z_liveliness_token_move(&pub->_val._liveliness));

0 commit comments

Comments
 (0)