Skip to content

Commit

Permalink
Honor ignore_local_publications in subscription options (#508) (#513)
Browse files Browse the repository at this point in the history
make linters happy

Add comment on ZC_LOCALITY_REMOTE

Signed-off-by: Yadunund <[email protected]>
(cherry picked from commit f6a4fde)

Co-authored-by: Alejandro Hernández Cordero <[email protected]>
  • Loading branch information
mergify[bot] and ahcorde authored Mar 7, 2025
1 parent 0323702 commit e0860d4
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 9 deletions.
6 changes: 4 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_node_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,8 @@ bool NodeData::create_sub_data(
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
const rmw_qos_profile_t * qos_profile,
const rmw_subscription_options_t & sub_options)
{
std::lock_guard<std::recursive_mutex> lock_guard(mutex_);
if (is_shutdown_) {
Expand All @@ -216,7 +217,8 @@ bool NodeData::create_sub_data(
std::move(id),
std::move(topic_name),
type_support,
qos_profile);
qos_profile,
sub_options);
if (sub_data == nullptr) {
RMW_ZENOH_LOG_ERROR_NAMED(
"rmw_zenoh_cpp",
Expand Down
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/detail/rmw_node_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class NodeData final
std::size_t id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile);
const rmw_qos_profile_t * qos_profile,
const rmw_subscription_options_t & sub_options);

// Retrieve the SubscriptionData for a given rmw_subscription_t if present.
SubscriptionDataPtr get_sub_data(const rmw_subscription_t * const subscription);
Expand Down
19 changes: 16 additions & 3 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ std::shared_ptr<SubscriptionData> SubscriptionData::make(
std::size_t subscription_id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile)
const rmw_qos_profile_t * qos_profile,
const rmw_subscription_options_t & sub_options)
{
rmw_qos_profile_t adapted_qos_profile = *qos_profile;
rmw_ret_t ret = QoS::get().best_available_qos(
Expand Down Expand Up @@ -120,7 +121,8 @@ std::shared_ptr<SubscriptionData> SubscriptionData::make(
std::move(entity),
std::move(session),
type_support->data,
std::move(message_type_support)
std::move(message_type_support),
sub_options
});

if (!sub_data->init()) {
Expand All @@ -138,13 +140,15 @@ SubscriptionData::SubscriptionData(
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support)
std::unique_ptr<MessageTypeSupport> type_support,
rmw_subscription_options_t sub_options)
: rmw_node_(rmw_node),
graph_cache_(std::move(graph_cache)),
entity_(std::move(entity)),
sess_(std::move(session)),
type_support_impl_(type_support_impl),
type_support_(std::move(type_support)),
sub_options_(std::move(sub_options)),
last_known_published_msg_({}),
wait_set_data_(nullptr),
is_shutdown_(false),
Expand All @@ -153,6 +157,7 @@ SubscriptionData::SubscriptionData(
events_mgr_ = std::make_shared<EventsManager>();
}

///=============================================================================
// We have to use an "init" function here, rather than do this in the constructor, because we use
// enable_shared_from_this, which is not available in constructors.
bool SubscriptionData::init()
Expand All @@ -171,6 +176,14 @@ bool SubscriptionData::init()
using AdvancedSubscriberOptions = zenoh::ext::SessionExt::AdvancedSubscriberOptions;
auto adv_sub_opts = AdvancedSubscriberOptions::create_default();

// By default, this subscription will receive publications from publishers within and outside of
// the same Zenoh session as this subscription.
// If ignore_local_publications is true, we restrict this subscription to only receive samples
// from publishers in remote sessions.
if (sub_options_.ignore_local_publications) {
adv_sub_opts.subscriber_options.allowed_origin = ZC_LOCALITY_REMOTE;
}

// Instantiate the subscription with suitable options depending on the
// adapted_qos_profile.
if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
Expand Down
8 changes: 6 additions & 2 deletions rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
std::size_t Subscription_id,
const std::string & topic_name,
const rosidl_message_type_support_t * type_support,
const rmw_qos_profile_t * qos_profile);
const rmw_qos_profile_t * qos_profile,
const rmw_subscription_options_t & sub_options);

// Get a copy of the keyexpr_hash of this SubscriptionData's liveliness::Entity.
std::size_t keyexpr_hash() const;
Expand Down Expand Up @@ -124,7 +125,8 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
std::shared_ptr<liveliness::Entity> entity,
std::shared_ptr<zenoh::Session> session,
const void * type_support_impl,
std::unique_ptr<MessageTypeSupport> type_support);
std::unique_ptr<MessageTypeSupport> type_support,
rmw_subscription_options_t sub_options);

bool init();

Expand All @@ -145,6 +147,8 @@ class SubscriptionData final : public std::enable_shared_from_this<SubscriptionD
// Type support fields
const void * type_support_impl_;
std::unique_ptr<MessageTypeSupport> type_support_;
// Subscription options.
rmw_subscription_options_t sub_options_;
std::deque<std::unique_ptr<Message>> message_queue_;
// Map GID of a subscription to the sequence number of the message it published.
std::unordered_map<size_t, int64_t> last_known_published_msg_;
Expand Down
3 changes: 2 additions & 1 deletion rmw_zenoh_cpp/src/rmw_zenoh.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,8 @@ rmw_create_subscription(
context_impl->get_next_entity_id(),
topic_name,
type_support,
qos_profile))
qos_profile,
*subscription_options))
{
// Error already handled.
return nullptr;
Expand Down

0 comments on commit e0860d4

Please sign in to comment.