Skip to content

Commit

Permalink
Honor ignore_local_publications in subscription options (backport #508)…
Browse files Browse the repository at this point in the history
… (#514)

Signed-off-by: Alejandro Hernandez Cordero <[email protected]>
Co-authored-by: Alejandro Hernández Cordero <[email protected]>
  • Loading branch information
mergify[bot] and ahcorde authored Mar 7, 2025
1 parent 399eac9 commit 10dce88
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 9 deletions.
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ bool NodeData::create_sub_data(
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
const rmw_qos_profile_t * qos_profile,
const rmw_subscription_options_t & sub_options)
{
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
if (is_shutdown_) {
Expand All @@ -215,7 +216,8 @@ bool NodeData::create_sub_data(
std::move(id),
std::move(topic_name),
type_support,
qos_profile);
qos_profile,
sub_options);
if (sub_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand Down
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class NodeData final
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile);
const rmw_qos_profile_t * qos_profile,
const rmw_subscription_options_t & sub_options);

// Retrieve the SubscriptionData for a given rmw_subscription_t if present.
SubscriptionDataPtr get_sub_data(const rmw_subscription_t * const subscription);
Expand Down
20 changes: 17 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ std::shared_ptr<SubscriptionData> SubscriptionData::make(
std::size_t subscription_id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
const rmw_qos_profile_t * qos_profile,
const rmw_subscription_options_t & sub_options)
{
rmw_qos_profile_t adapted_qos_profile = *qos_profile;
rmw_ret_t ret = QoS::get().best_available_qos(
Expand Down Expand Up @@ -107,7 +108,8 @@ std::shared_ptr<SubscriptionData> SubscriptionData::make(
std::move(entity),
std::move(session),
type_support->data,
std::move(message_type_support)
std::move(message_type_support),
sub_options
});

if (!sub_data->init()) {
Expand All @@ -125,13 +127,15 @@ SubscriptionData::SubscriptionData(
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
std::unique_ptr<MessageTypeSupport> type_support,
rmw_subscription_options_t sub_options)
: rmw_node_(rmw_node),
graph_cache_(std::move(graph_cache)),
entity_(std::move(entity)),
sess_(std::move(session)),
type_support_impl_(type_support_impl),
type_support_(std::move(type_support)),
sub_options_(std::move(sub_options)),
last_known_published_msg_({}),
wait_set_data_(nullptr),
is_shutdown_(false),
Expand All @@ -140,6 +144,7 @@ SubscriptionData::SubscriptionData(
events_mgr_ = std::make_shared<EventsManager>();
}

///=============================================================================
// We have to use an "init" function here, rather than do this in the constructor, because we use
// enable_shared_from_this, which is not available in constructors.
bool SubscriptionData::init()
Expand All @@ -164,6 +169,15 @@ bool SubscriptionData::init()
zenoh::ext::SessionExt::QueryingSubscriberOptions::create_default();
const std::string selector = "*/" + entity_->topic_info()->topic_keyexpr_;
zenoh::KeyExpr selector_ke(selector);

// By default, this subscription will receive publications from publishers within and outside of
// the same Zenoh session as this subscription.
// If ignore_local_publications is true, we restrict this subscription to only receive samples
// from publishers in remote sessions.
if (sub_options_.ignore_local_publications) {
sub_options.allowed_origin = ZC_LOCALITY_REMOTE;
}

sub_options.query_keyexpr = std::move(selector_ke);
// Tell the PublicationCache's Queryable that the query accepts any key expression as a reply.
// By default a query accepts only replies that matches its query selector.
Expand Down
8 changes: 6 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
std::size_t Subscription_id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile);
const rmw_qos_profile_t * qos_profile,
const rmw_subscription_options_t & sub_options);

// Get a copy of the keyexpr_hash of this SubscriptionData's liveliness::Entity.
std::size_t keyexpr_hash() const;
Expand Down Expand Up @@ -124,7 +125,8 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support);
std::unique_ptr<MessageTypeSupport> type_support,
rmw_subscription_options_t sub_options);

bool init();

Expand All @@ -145,6 +147,8 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
// Type support fields
const void * type_support_impl_;
std::unique_ptr<MessageTypeSupport> type_support_;
// Subscription options.
rmw_subscription_options_t sub_options_;
std::deque<std::unique_ptr<Message>> message_queue_;
// Map GID of a subscription to the sequence number of the message it published.
std::unordered_map<size_t, int64_t> last_known_published_msg_;
Expand Down
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,8 @@ rmw_create_subscription(
context_impl->get_next_entity_id(),
topic_name,
type_support,
qos_profile))
qos_profile,
*subscription_options))
{
// Error already handled.
return nullptr;
Expand Down

0 comments on commit 10dce88

Please sign in to comment.