diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp index 23ca5e78..534b2341 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.cpp @@ -190,7 +190,8 @@ bool NodeData::create_sub_data( std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, - const rmw_qos_profile_t * qos_profile) + const rmw_qos_profile_t * qos_profile, + const rmw_subscription_options_t & sub_options) { std::lock_guard lock_guard(mutex_); if (is_shutdown_) { @@ -216,7 +217,8 @@ bool NodeData::create_sub_data( std::move(id), std::move(topic_name), type_support, - qos_profile); + qos_profile, + sub_options); if (sub_data == nullptr) { RMW_ZENOH_LOG_ERROR_NAMED( "rmw_zenoh_cpp", diff --git a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp index e6fbede5..a76ce8b7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_node_data.hpp @@ -75,7 +75,8 @@ class NodeData final std::size_t id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, - const rmw_qos_profile_t * qos_profile); + const rmw_qos_profile_t * qos_profile, + const rmw_subscription_options_t & sub_options); // Retrieve the SubscriptionData for a given rmw_subscription_t if present. SubscriptionDataPtr get_sub_data(const rmw_subscription_t * const subscription); diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp index 45d4ee3c..0ace5d23 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.cpp @@ -59,7 +59,8 @@ std::shared_ptr SubscriptionData::make( std::size_t subscription_id, const std::string & topic_name, const rosidl_message_type_support_t * type_support, - const rmw_qos_profile_t * qos_profile) + const rmw_qos_profile_t * qos_profile, + const rmw_subscription_options_t & sub_options) { rmw_qos_profile_t adapted_qos_profile = *qos_profile; rmw_ret_t ret = QoS::get().best_available_qos( @@ -120,7 +121,8 @@ std::shared_ptr SubscriptionData::make( std::move(entity), std::move(session), type_support->data, - std::move(message_type_support) + std::move(message_type_support), + sub_options }); if (!sub_data->init()) { @@ -138,13 +140,15 @@ SubscriptionData::SubscriptionData( std::shared_ptr entity, std::shared_ptr session, const void * type_support_impl, - std::unique_ptr type_support) + std::unique_ptr type_support, + rmw_subscription_options_t sub_options) : rmw_node_(rmw_node), graph_cache_(std::move(graph_cache)), entity_(std::move(entity)), sess_(std::move(session)), type_support_impl_(type_support_impl), type_support_(std::move(type_support)), + sub_options_(std::move(sub_options)), last_known_published_msg_({}), wait_set_data_(nullptr), is_shutdown_(false), @@ -153,6 +157,7 @@ SubscriptionData::SubscriptionData( events_mgr_ = std::make_shared(); } +///============================================================================= // We have to use an "init" function here, rather than do this in the constructor, because we use // enable_shared_from_this, which is not available in constructors. bool SubscriptionData::init() @@ -171,6 +176,14 @@ bool SubscriptionData::init() using AdvancedSubscriberOptions = zenoh::ext::SessionExt::AdvancedSubscriberOptions; auto adv_sub_opts = AdvancedSubscriberOptions::create_default(); + // By default, this subscription will receive publications from publishers within and outside of + // the same Zenoh session as this subscription. + // If ignore_local_publications is true, we restrict this subscription to only receive samples + // from publishers in remote sessions. + if (sub_options_.ignore_local_publications) { + adv_sub_opts.subscriber_options.allowed_origin = ZC_LOCALITY_REMOTE; + } + // Instantiate the subscription with suitable options depending on the // adapted_qos_profile. if (entity_->topic_info()->qos_.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) { diff --git a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp index 8f37dc25..88c8f2b7 100644 --- a/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp +++ b/rmw_zenoh_cpp/src/detail/rmw_subscription_data.hpp @@ -71,7 +71,8 @@ class SubscriptionData final : public std::enable_shared_from_this entity, std::shared_ptr session, const void * type_support_impl, - std::unique_ptr type_support); + std::unique_ptr type_support, + rmw_subscription_options_t sub_options); bool init(); @@ -145,6 +147,8 @@ class SubscriptionData final : public std::enable_shared_from_this type_support_; + // Subscription options. + rmw_subscription_options_t sub_options_; std::deque> message_queue_; // Map GID of a subscription to the sequence number of the message it published. std::unordered_map last_known_published_msg_; diff --git a/rmw_zenoh_cpp/src/rmw_zenoh.cpp b/rmw_zenoh_cpp/src/rmw_zenoh.cpp index d7310fab..0536dda1 100644 --- a/rmw_zenoh_cpp/src/rmw_zenoh.cpp +++ b/rmw_zenoh_cpp/src/rmw_zenoh.cpp @@ -985,7 +985,8 @@ rmw_create_subscription( context_impl->get_next_entity_id(), topic_name, type_support, - qos_profile)) + qos_profile, + *subscription_options)) { // Error already handled. return nullptr;