Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 22 additions & 11 deletions src/session/liveliness.c
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,11 @@ static z_result_t _z_liveliness_pending_query_reply(_z_session_t *zn, uint32_t i
_Z_RETURN_IF_ERR(_z_get_keyexpr_from_wireexpr(zn, &ke, wireexpr, peer, true));
z_result_t ret = _Z_RES_OK;

_Z_RETURN_IF_ERR(_z_session_mutex_lock_if_open(zn));
// Remember callback and its argument for dispatch after session lock
_z_closure_reply_callback_t reply_cb = NULL;
void *cb_arg = NULL;

_Z_CLEAN_RETURN_IF_ERR(_z_session_mutex_lock_if_open(zn), _z_keyexpr_clear(&ke));

const _z_liveliness_pending_query_t *pq =
_z_liveliness_pending_query_intmap_get(&zn->_liveliness_pending_queries, interest_id);
Expand All @@ -178,20 +182,27 @@ static z_result_t _z_liveliness_pending_query_reply(_z_session_t *zn, uint32_t i
}

if (ret == _Z_RES_OK) {
_z_encoding_t encoding = _z_encoding_null();
_z_bytes_t payload = _z_bytes_null();
_z_bytes_t attachment = _z_bytes_null();
_z_source_info_t source_info = _z_source_info_null();
_z_reply_t reply;
_z_reply_steal_data(&reply, &ke, _z_entity_global_id_null(), &payload, timestamp, &encoding,
Z_SAMPLE_KIND_PUT, &attachment, &source_info);

pq->_callback(&reply, pq->_arg);
_z_reply_clear(&reply);
// Capture the callback pointer and arg still under session lock
reply_cb = pq->_callback;
cb_arg = pq->_arg;
}
}

_z_session_mutex_unlock(zn);

// Invoke callback after session lock to prevent deadlock
if (reply_cb != NULL) {
_z_encoding_t encoding = _z_encoding_null();
_z_bytes_t payload = _z_bytes_null();
_z_bytes_t attachment = _z_bytes_null();
_z_source_info_t source_info = _z_source_info_null();
_z_reply_t reply;
_z_reply_steal_data(&reply, &ke, _z_entity_global_id_null(), &payload, timestamp, &encoding,
Z_SAMPLE_KIND_PUT, &attachment, &source_info);
reply_cb(&reply, cb_arg);
_z_reply_clear(&reply);
}

_z_keyexpr_clear(&ke);

return ret;
Expand Down
37 changes: 29 additions & 8 deletions src/session/query.c
Original file line number Diff line number Diff line change
Expand Up @@ -236,23 +236,44 @@ z_result_t _z_trigger_query_reply_final(_z_session_t *zn, _z_zint_t id) {

bool do_finalize = (pen_qry->_remaining_finals == 0);

if (pen_qry->_consolidation == Z_CONSOLIDATION_MODE_LATEST && do_finalize) {
while (pen_qry->_pending_replies != NULL) {
_z_pending_reply_t *pen_rep = _z_pending_reply_slist_value(pen_qry->_pending_replies);
// Extract pending callbacks under lock and dispatch afterwards
_z_pending_reply_slist_t *pending_replies = NULL;
_z_closure_reply_callback_t reply_cb = NULL;
_z_drop_handler_t drop_cb = NULL;
void *cb_arg = NULL;

// Trigger the query handler
_Z_DEBUG("deliver pending reply in final id=%jd", (intmax_t)id);
pen_qry->_callback(&pen_rep->_reply, pen_qry->_arg);
pen_qry->_pending_replies = _z_pending_reply_slist_pop(pen_qry->_pending_replies);
}
if (pen_qry->_consolidation == Z_CONSOLIDATION_MODE_LATEST && do_finalize) {
// Steal the buffered replies; we own this list now.
pending_replies = pen_qry->_pending_replies;
pen_qry->_pending_replies = NULL;
reply_cb = pen_qry->_callback;
}

// Finalize query if requested: drop pending query and trigger dropper callback,
// which is equivalent to a reply with FINAL.
if (do_finalize) {
// Steal dropper callback for invocation after session lock
drop_cb = pen_qry->_dropper;
pen_qry->_dropper = NULL;
cb_arg = pen_qry->_arg;

zn->_pending_queries =
_z_pending_query_slist_drop_first_filter(zn->_pending_queries, _z_pending_query_eq, pen_qry);
}
_z_session_mutex_unlock(zn);

// Dispatch user callbacks WITHOUT holding the mutex.
while (pending_replies != NULL) {
_z_pending_reply_t *pen_rep = _z_pending_reply_slist_value(pending_replies);

// Trigger the query handler
_Z_DEBUG("deliver pending reply in final id=%jd", (intmax_t)id);
reply_cb(&pen_rep->_reply, cb_arg);
pending_replies = _z_pending_reply_slist_pop(pending_replies);
}
if (drop_cb != NULL) {
drop_cb(cb_arg);
}
return _Z_RES_OK;
}

Expand Down
Loading