From a9256384ad1e0a20f00f22d03869fdb77dd0bcee Mon Sep 17 00:00:00 2001 From: Mauro Date: Fri, 2 Oct 2020 16:35:37 +0100 Subject: [PATCH 01/11] add RMW listener APIs add constness Move set callback APIs Put unread events in queue when setting callback Put unread events in queue when setting callback Use or discard previous events: Guard conditions Unset callbacks support Rename EventHandle to ExecutorEventHandle Update name Add support for events void return on set_events_executor_callback Revert "void return on set_events_executor_callback" Add mutexes set to nullptr Remove use_executor_callback_, not needed Rename ExecutorEventCallback -> EventsExecutorCallback Rename set_events_executor_callback->set_listener_callback Use data types when setting callbacks Move rcutils/executor_event_types.h to rmw/ rename event types Rename executor_context->callback_context Rename callback_context->user_data Reorder APIs arguments rename rmw_listener_cb_t->rmw_listener_callback_t Only check callback pointer validity This is the only item that is used in the RMW layer, while the others are simply forwarded. use void * to pass executor ptr Rework executor callback data Use RMW renamed file Signed-off-by: Alberto Soragna --- rmw_fastrtps_cpp/src/rmw_client.cpp | 12 ++++ rmw_fastrtps_cpp/src/rmw_event.cpp | 14 ++++ rmw_fastrtps_cpp/src/rmw_guard_condition.cpp | 14 ++++ rmw_fastrtps_cpp/src/rmw_service.cpp | 12 ++++ rmw_fastrtps_cpp/src/rmw_subscription.cpp | 12 ++++ .../custom_client_info.hpp | 42 ++++++++++++ .../custom_event_info.hpp | 13 ++++ .../custom_publisher_info.hpp | 6 ++ .../custom_service_info.hpp | 43 ++++++++++++ .../custom_subscriber_info.hpp | 46 ++++++++++++- .../rmw_fastrtps_shared_cpp/rmw_common.hpp | 37 +++++++++++ .../src/custom_publisher_info.cpp | 44 +++++++++++++ .../src/custom_subscriber_info.cpp | 44 +++++++++++++ rmw_fastrtps_shared_cpp/src/rmw_client.cpp | 13 ++++ rmw_fastrtps_shared_cpp/src/rmw_event.cpp | 16 +++++ .../src/rmw_guard_condition.cpp | 15 +++++ rmw_fastrtps_shared_cpp/src/rmw_service.cpp | 13 ++++ .../src/rmw_subscription.cpp | 13 ++++ .../src/types/guard_condition.hpp | 66 ++++++++++++++++--- 19 files changed, 464 insertions(+), 11 deletions(-) diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index 18f364c84..9dc1e39ba 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -375,4 +375,16 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) return rmw_fastrtps_shared_cpp::__rmw_destroy_client( eprosima_fastrtps_identifier, node, client); } + +rmw_ret_t +rmw_client_set_listener_callback( + rmw_client_t * rmw_client, + rmw_listener_callback_t callback, + const void * user_data) +{ + return rmw_fastrtps_shared_cpp::__rmw_client_set_listener_callback( + rmw_client, + callback, + user_data); +} } // extern "C" diff --git a/rmw_fastrtps_cpp/src/rmw_event.cpp b/rmw_fastrtps_cpp/src/rmw_event.cpp index 5f4684777..922a65ded 100644 --- a/rmw_fastrtps_cpp/src/rmw_event.cpp +++ b/rmw_fastrtps_cpp/src/rmw_event.cpp @@ -47,4 +47,18 @@ rmw_subscription_event_init( subscription->data, event_type); } + +rmw_ret_t +rmw_event_set_listener_callback( + rmw_event_t * rmw_event, + rmw_listener_callback_t callback, + const void * user_data, + bool use_previous_events) +{ + return rmw_fastrtps_shared_cpp::__rmw_event_set_listener_callback( + rmw_event, + callback, + user_data, + use_previous_events); +} } // extern "C" diff --git a/rmw_fastrtps_cpp/src/rmw_guard_condition.cpp b/rmw_fastrtps_cpp/src/rmw_guard_condition.cpp index d4bb1a248..676712bb8 100644 --- a/rmw_fastrtps_cpp/src/rmw_guard_condition.cpp +++ b/rmw_fastrtps_cpp/src/rmw_guard_condition.cpp @@ -42,4 +42,18 @@ rmw_destroy_guard_condition(rmw_guard_condition_t * guard_condition) return rmw_fastrtps_shared_cpp::__rmw_destroy_guard_condition( guard_condition); } + +rmw_ret_t +rmw_guard_condition_set_listener_callback( + rmw_guard_condition_t * rmw_guard_condition, + rmw_listener_callback_t callback, + const void * user_data, + bool use_previous_events) +{ + return rmw_fastrtps_shared_cpp::__rmw_guard_condition_set_listener_callback( + rmw_guard_condition, + callback, + user_data, + use_previous_events); +} } // extern "C" diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index e36e791ee..af23dfcbe 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -381,4 +381,16 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) return rmw_fastrtps_shared_cpp::__rmw_destroy_service( eprosima_fastrtps_identifier, node, service); } + +rmw_ret_t +rmw_service_set_listener_callback( + rmw_service_t * rmw_service, + rmw_listener_callback_t callback, + const void * user_data) +{ + return rmw_fastrtps_shared_cpp::__rmw_service_set_listener_callback( + rmw_service, + callback, + user_data); +} } // extern "C" diff --git a/rmw_fastrtps_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_cpp/src/rmw_subscription.cpp index 6ce16e3ad..9abc7c954 100644 --- a/rmw_fastrtps_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_cpp/src/rmw_subscription.cpp @@ -167,4 +167,16 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) return rmw_fastrtps_shared_cpp::__rmw_destroy_subscription( eprosima_fastrtps_identifier, node, subscription); } + +rmw_ret_t +rmw_subscription_set_listener_callback( + rmw_subscription_t * rmw_subscription, + rmw_listener_callback_t callback, + const void * user_data) +{ + return rmw_fastrtps_shared_cpp::__rmw_subscription_set_listener_callback( + rmw_subscription, + callback, + user_data); +} } // extern "C" diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp index 0ba4f9342..87ae94687 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp @@ -34,6 +34,8 @@ #include "rcpputils/thread_safety_annotations.hpp" +#include "rmw/listener_callback_type.h" + #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" class ClientListener; @@ -107,6 +109,15 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener list.emplace_back(std::move(response)); list_has_data_.store(true); } + + // Add the client event to the event queue + std::unique_lock lock_mutex(listener_callback_mutex_); + + if(listener_callback_) { + listener_callback_(user_data_); + } else { + unread_count_++; + } } } } @@ -164,6 +175,32 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener info_->response_subscriber_matched_count_.store(publishers_.size()); } + // Provide handlers to perform an action when a + // new event from this listener has ocurred + void + clientSetExecutorCallback( + const void * user_data, + rmw_listener_callback_t callback) + { + std::unique_lock lock_mutex(listener_callback_mutex_); + + if (callback) { + // Push events arrived before setting the the executor callback + for(uint64_t i = 0; i < unread_count_; i++) { + callback(user_data); + } + user_data_ = user_data; + listener_callback_ = callback; + } else { + user_data_ = nullptr; + listener_callback_ = nullptr; + return; + } + + // Reset unread count + unread_count_ = 0; + } + private: bool popResponse(CustomClientResponse & response) RCPPUTILS_TSA_REQUIRES(internalMutex_) { @@ -183,6 +220,11 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::set publishers_; + + rmw_listener_callback_t listener_callback_{nullptr}; + const void * user_data_{nullptr}; + std::mutex listener_callback_mutex_; + uint64_t unread_count_ = 0; }; class ClientPubListener : public eprosima::fastrtps::PublisherListener diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp index d3139afe2..cc679233d 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp @@ -32,6 +32,7 @@ #include "fastrtps/publisher/PublisherListener.h" #include "rmw/event.h" +#include "rmw/listener_callback_type.h" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" @@ -65,6 +66,18 @@ class EventListenerInterface * \return `false` if data was not available, in this case nothing was written to event_info. */ virtual bool takeNextEvent(rmw_event_type_t event_type, void * event_info) = 0; + + // Provide handlers to perform an action when a + // new event from this listener has ocurred + virtual void eventSetExecutorCallback( + const void * user_data, + rmw_listener_callback_t callback, + bool use_previous_events) = 0; + + rmw_listener_callback_t listener_callback_{nullptr}; + const void * user_data_{nullptr}; + uint64_t unread_events_count_ = 0; + std::mutex listener_callback_mutex_; }; class EventListenerInterface::ConditionalScopedLock diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index 5d626a2ee..0e3176a7f 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -93,6 +93,12 @@ class PubListener : public EventListenerInterface, public eprosima::fastrtps::Pu bool hasEvent(rmw_event_type_t event_type) const final; + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void eventSetExecutorCallback( + const void * user_data, + rmw_listener_callback_t callback, + bool use_previous_events) final; + RMW_FASTRTPS_SHARED_CPP_PUBLIC bool takeNextEvent(rmw_event_type_t event_type, void * event_info) final; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index 2b30ab929..505f13756 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -33,6 +33,8 @@ #include "rcpputils/thread_safety_annotations.hpp" +#include "rmw/listener_callback_type.h" + #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" #include "rmw_fastrtps_shared_cpp/guid_utils.hpp" @@ -229,6 +231,15 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener list.push_back(request); list_has_data_.store(true); } + + // Add the service to the event queue + std::unique_lock lock_mutex(listener_callback_mutex_); + + if(listener_callback_) { + listener_callback_(user_data_); + } else { + unread_count_++; + } } } } @@ -279,6 +290,33 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener return list_has_data_.load(); } + // Provide handlers to perform an action when a + // new event from this listener has ocurred + void + serviceSetExecutorCallback( + const void * user_data, + rmw_listener_callback_t callback) + { + std::unique_lock lock_mutex(listener_callback_mutex_); + + if (callback) { + // Push events arrived before setting the the executor callback + for(uint64_t i = 0; i < unread_count_; i++) { + callback(user_data); + } + + user_data_ = user_data; + listener_callback_ = callback; + } else { + user_data_ = nullptr; + listener_callback_ = nullptr; + return; + } + + // Reset unread count + unread_count_ = 0; + } + private: CustomServiceInfo * info_; std::mutex internalMutex_; @@ -286,6 +324,11 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener std::atomic_bool list_has_data_; std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + + rmw_listener_callback_t listener_callback_{nullptr}; + const void * user_data_{nullptr}; + std::mutex listener_callback_mutex_; + uint64_t unread_count_ = 0; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index bb52cbe4d..0f1b49caa 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -27,6 +27,7 @@ #include "rcpputils/thread_safety_annotations.hpp" #include "rmw/impl/cpp/macros.hpp" +#include "rmw/listener_callback_type.h" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" #include "rmw_fastrtps_shared_cpp/custom_event_info.hpp" @@ -83,7 +84,15 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su void onNewDataMessage(eprosima::fastrtps::Subscriber * sub) final { - update_unread_count(sub); + // Callback: add the subscription event to the event queue + std::unique_lock lock_mutex(listener_callback_mutex_); + + if(listener_callback_) { + listener_callback_(user_data_); + } else { + update_unread_count(sub); + new_data_unread_count_++; + } } RMW_FASTRTPS_SHARED_CPP_PUBLIC @@ -103,6 +112,12 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su bool hasEvent(rmw_event_type_t event_type) const final; + RMW_FASTRTPS_SHARED_CPP_PUBLIC + void eventSetExecutorCallback( + const void * user_data, + rmw_listener_callback_t callback, + bool use_previous_events) final; + RMW_FASTRTPS_SHARED_CPP_PUBLIC bool takeNextEvent(rmw_event_type_t event_type, void * event_info) final; @@ -152,6 +167,33 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su return publishers_.size(); } + // Provide handlers to perform an action when a + // new event from this listener has ocurred + void + subcriptionSetExecutorCallback( + const void * user_data, + rmw_listener_callback_t callback) + { + std::unique_lock lock_mutex(listener_callback_mutex_); + + if (callback) { + // Push events arrived before setting the executor's callback + for(uint64_t i = 0; i < new_data_unread_count_; i++) { + callback(user_data); + } + + user_data_ = user_data; + listener_callback_ = callback; + } else { + user_data_ = nullptr; + listener_callback_ = nullptr; + return; + } + + // Reset unread count + new_data_unread_count_ = 0; + } + private: mutable std::mutex internalMutex_; @@ -169,6 +211,8 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::set publishers_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + + uint64_t new_data_unread_count_ = 0; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_ diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index 082e5629a..07cbe6707 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -416,6 +416,43 @@ __rmw_subscription_get_network_flow_endpoints( rcutils_allocator_t * allocator, rmw_network_flow_endpoint_array_t * network_flow_endpoint_array); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_subscription_set_listener_callback( + rmw_subscription_t * rmw_subscription, + rmw_listener_callback_t callback, + const void * user_data); + +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_service_set_listener_callback( + rmw_service_t * rmw_service, + rmw_listener_callback_t callback, + const void * user_data); + +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_client_set_listener_callback( + rmw_client_t * rmw_client, + rmw_listener_callback_t callback, + const void * user_data); + +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_guard_condition_set_listener_callback( + rmw_guard_condition_t * rmw_guard_condition, + rmw_listener_callback_t callback, + const void * user_data, + bool use_previous_events); + +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_event_set_listener_callback( + rmw_event_t * rmw_event, + rmw_listener_callback_t callback, + const void * user_data, + bool use_previous_events); + } // namespace rmw_fastrtps_shared_cpp #endif // RMW_FASTRTPS_SHARED_CPP__RMW_COMMON_HPP_ diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp index f38f653dd..44c27a4b4 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -38,6 +38,15 @@ PubListener::on_offered_deadline_missed( offered_deadline_missed_status_.total_count_change += status.total_count_change; deadline_changes_.store(true, std::memory_order_relaxed); + + // Callback: add the change in qos event to the event queue + std::unique_lock lock_mutex(listener_callback_mutex_); + + if(listener_callback_){ + listener_callback_(user_data_); + } else { + unread_events_count_++; + } } void PubListener::on_liveliness_lost( @@ -56,6 +65,15 @@ void PubListener::on_liveliness_lost( liveliness_lost_status_.total_count_change += status.total_count_change; liveliness_changes_.store(true, std::memory_order_relaxed); + + // Callback: add the change in qos event to the event queue + std::unique_lock lock_mutex(listener_callback_mutex_); + + if(listener_callback_) { + listener_callback_(user_data_); + } else { + unread_events_count_++; + } } bool PubListener::hasEvent(rmw_event_type_t event_type) const @@ -72,6 +90,32 @@ bool PubListener::hasEvent(rmw_event_type_t event_type) const return false; } +void PubListener::eventSetExecutorCallback( + const void * user_data, + rmw_listener_callback_t callback, + bool use_previous_events) +{ + std::unique_lock lock_mutex(listener_callback_mutex_); + + if (callback) { + if (use_previous_events) { + // Push events arrived before setting the executor's callback + for(uint64_t i = 0; i < unread_events_count_; i++) { + callback(user_data); + } + } + user_data_ = user_data; + listener_callback_ = callback; + } else { + user_data_ = nullptr; + listener_callback_ = nullptr; + return; + } + + // Reset unread count + unread_events_count_ = 0; +} + bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info) { assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type)); diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp index 172c20ab1..aa4d26cb2 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp @@ -38,6 +38,15 @@ SubListener::on_requested_deadline_missed( requested_deadline_missed_status_.total_count_change += status.total_count_change; deadline_changes_.store(true, std::memory_order_relaxed); + + // Callback: add the subscription event to the event queue + std::unique_lock lock_mutex(listener_callback_mutex_); + + if(listener_callback_) { + listener_callback_(user_data_); + } else { + unread_events_count_++; + } } void SubListener::on_liveliness_changed( @@ -58,6 +67,15 @@ void SubListener::on_liveliness_changed( liveliness_changed_status_.not_alive_count_change += status.not_alive_count_change; liveliness_changes_.store(true, std::memory_order_relaxed); + + // Callback: add the subscription event to the event queue + std::unique_lock lock_mutex(listener_callback_mutex_); + + if(listener_callback_) { + listener_callback_(user_data_); + } else { + unread_events_count_++; + } } bool SubListener::hasEvent(rmw_event_type_t event_type) const @@ -74,6 +92,32 @@ bool SubListener::hasEvent(rmw_event_type_t event_type) const return false; } +void SubListener::eventSetExecutorCallback( + const void * user_data, + rmw_listener_callback_t callback, + bool use_previous_events) +{ + std::unique_lock lock_mutex(listener_callback_mutex_); + + if (callback) { + if (use_previous_events) { + // Push events arrived before setting the executor's callback + for(uint64_t i = 0; i < unread_events_count_; i++) { + callback(user_data); + } + } + user_data_ = user_data; + listener_callback_ = callback; + } else { + user_data_ = nullptr; + listener_callback_ = nullptr; + return; + } + + // Reset unread count + unread_events_count_ = 0; +} + bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info) { assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type)); diff --git a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp index 67f226bdc..67ba23dae 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp @@ -80,4 +80,17 @@ __rmw_destroy_client( RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_ERROR); // on completion return ret; } + +rmw_ret_t +__rmw_client_set_listener_callback( + rmw_client_t * rmw_client, + rmw_listener_callback_t callback, + const void * user_data) +{ + auto custom_client_info = static_cast(rmw_client->data); + custom_client_info->listener_->clientSetExecutorCallback( + user_data, + callback); + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp index 4544d3136..3811a88cc 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp @@ -17,6 +17,7 @@ #include "rmw/impl/cpp/macros.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" +#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp" #include "types/event_types.hpp" static const std::unordered_set g_rmw_event_type_set{ @@ -67,4 +68,19 @@ __rmw_init_event( return RMW_RET_OK; } +rmw_ret_t +__rmw_event_set_listener_callback( + rmw_event_t * rmw_event, + rmw_listener_callback_t callback, + const void * user_data, + bool use_previous_events) +{ + auto custom_event_info = static_cast(rmw_event->data); + custom_event_info->getListener()->eventSetExecutorCallback( + user_data, + callback, + use_previous_events); + return RMW_RET_OK; +} + } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp b/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp index cdcac523c..862737c5c 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp @@ -46,4 +46,19 @@ __rmw_destroy_guard_condition(rmw_guard_condition_t * guard_condition) RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_ERROR); // on completion return ret; } + +rmw_ret_t +__rmw_guard_condition_set_listener_callback( + rmw_guard_condition_t * rmw_guard_condition, + rmw_listener_callback_t callback, + const void * user_data, + bool use_previous_events) +{ + auto guard_condition = static_cast(rmw_guard_condition->data); + guard_condition->guardConditionSetExecutorCallback( + user_data, + callback, + use_previous_events); + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp index df5b47133..ef2ec0682 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp @@ -93,4 +93,17 @@ __rmw_destroy_service( RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_ERROR); // on completion return ret; } + +rmw_ret_t +__rmw_service_set_listener_callback( + rmw_service_t * rmw_service, + rmw_listener_callback_t callback, + const void * user_data) +{ + auto custom_service_info = static_cast(rmw_service->data); + custom_service_info->listener_->serviceSetExecutorCallback( + user_data, + callback); + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp index 4ce18ac5e..14968ae25 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp @@ -106,4 +106,17 @@ __rmw_subscription_get_actual_qos( return RMW_RET_OK; } + +rmw_ret_t +__rmw_subscription_set_listener_callback( + rmw_subscription_t * rmw_subscription, + rmw_listener_callback_t callback, + const void * user_data) +{ + auto custom_subscriber_info = static_cast(rmw_subscription->data); + custom_subscriber_info->listener_->subcriptionSetExecutorCallback( + user_data, + callback); + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp b/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp index ab5076522..69a6081ef 100644 --- a/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp +++ b/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp @@ -24,6 +24,8 @@ #include "rcpputils/thread_safety_annotations.hpp" +#include "rmw/listener_callback_type.h" + class GuardCondition { public: @@ -34,18 +36,27 @@ class GuardCondition void trigger() { - std::lock_guard lock(internalMutex_); + std::unique_lock lock_mutex(listener_callback_mutex_); - if (conditionMutex_ != nullptr) { - std::unique_lock clock(*conditionMutex_); - // the change to hasTriggered_ needs to be mutually exclusive with - // rmw_wait() which checks hasTriggered() and decides if wait() needs to - // be called - hasTriggered_ = true; - clock.unlock(); - conditionVariable_->notify_one(); + if(listener_callback_) + { + listener_callback_(user_data_); } else { - hasTriggered_ = true; + std::lock_guard lock(internalMutex_); + + if (conditionMutex_ != nullptr) { + std::unique_lock clock(*conditionMutex_); + // the change to hasTriggered_ needs to be mutually exclusive with + // rmw_wait() which checks hasTriggered() and decides if wait() needs to + // be called + hasTriggered_ = true; + clock.unlock(); + conditionVariable_->notify_one(); + } else { + hasTriggered_ = true; + } + + unread_count_++; } } @@ -77,11 +88,46 @@ class GuardCondition return hasTriggered_.exchange(false); } + // Provide handlers to perform an action when a + // new event from this listener has ocurred + void + guardConditionSetExecutorCallback( + const void * user_data, + rmw_listener_callback_t callback, + bool use_previous_events) + { + std::unique_lock lock_mutex(listener_callback_mutex_); + + if (callback) { + if (use_previous_events) { + // Push events arrived before setting the executor's callback + for(uint64_t i = 0; i < unread_count_; i++) { + callback(user_data); + } + } + user_data_ = user_data; + listener_callback_ = callback; + } else { + user_data_ = nullptr; + listener_callback_ = nullptr; + return; + } + + + // Reset unread count + unread_count_ = 0; + } + private: std::mutex internalMutex_; std::atomic_bool hasTriggered_; std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + + rmw_listener_callback_t listener_callback_{nullptr}; + const void * user_data_{nullptr}; + std::mutex listener_callback_mutex_; + uint64_t unread_count_ = 0; }; #endif // TYPES__GUARD_CONDITION_HPP_ From 6de720004f700f0560a4a91b19c68e3239f1766a Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Wed, 24 Mar 2021 16:54:57 -0300 Subject: [PATCH 02/11] Remove use_previous_event Signed-off-by: Mauro Passerino --- rmw_fastrtps_cpp/src/rmw_event.cpp | 6 ++---- rmw_fastrtps_cpp/src/rmw_guard_condition.cpp | 6 ++---- .../rmw_fastrtps_shared_cpp/custom_event_info.hpp | 3 +-- .../custom_publisher_info.hpp | 3 +-- .../custom_subscriber_info.hpp | 3 +-- .../include/rmw_fastrtps_shared_cpp/rmw_common.hpp | 6 ++---- .../src/custom_publisher_info.cpp | 13 +++++-------- .../src/custom_subscriber_info.cpp | 13 +++++-------- rmw_fastrtps_shared_cpp/src/rmw_event.cpp | 6 ++---- rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp | 6 ++---- .../src/types/guard_condition.hpp | 12 ++++-------- 11 files changed, 27 insertions(+), 50 deletions(-) diff --git a/rmw_fastrtps_cpp/src/rmw_event.cpp b/rmw_fastrtps_cpp/src/rmw_event.cpp index 922a65ded..a125ca00d 100644 --- a/rmw_fastrtps_cpp/src/rmw_event.cpp +++ b/rmw_fastrtps_cpp/src/rmw_event.cpp @@ -52,13 +52,11 @@ rmw_ret_t rmw_event_set_listener_callback( rmw_event_t * rmw_event, rmw_listener_callback_t callback, - const void * user_data, - bool use_previous_events) + const void * user_data) { return rmw_fastrtps_shared_cpp::__rmw_event_set_listener_callback( rmw_event, callback, - user_data, - use_previous_events); + user_data); } } // extern "C" diff --git a/rmw_fastrtps_cpp/src/rmw_guard_condition.cpp b/rmw_fastrtps_cpp/src/rmw_guard_condition.cpp index 676712bb8..4b49adf7c 100644 --- a/rmw_fastrtps_cpp/src/rmw_guard_condition.cpp +++ b/rmw_fastrtps_cpp/src/rmw_guard_condition.cpp @@ -47,13 +47,11 @@ rmw_ret_t rmw_guard_condition_set_listener_callback( rmw_guard_condition_t * rmw_guard_condition, rmw_listener_callback_t callback, - const void * user_data, - bool use_previous_events) + const void * user_data) { return rmw_fastrtps_shared_cpp::__rmw_guard_condition_set_listener_callback( rmw_guard_condition, callback, - user_data, - use_previous_events); + user_data); } } // extern "C" diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp index cc679233d..a1c35c7bb 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp @@ -71,8 +71,7 @@ class EventListenerInterface // new event from this listener has ocurred virtual void eventSetExecutorCallback( const void * user_data, - rmw_listener_callback_t callback, - bool use_previous_events) = 0; + rmw_listener_callback_t callback) = 0; rmw_listener_callback_t listener_callback_{nullptr}; const void * user_data_{nullptr}; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index 0e3176a7f..fe8818079 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -96,8 +96,7 @@ class PubListener : public EventListenerInterface, public eprosima::fastrtps::Pu RMW_FASTRTPS_SHARED_CPP_PUBLIC void eventSetExecutorCallback( const void * user_data, - rmw_listener_callback_t callback, - bool use_previous_events) final; + rmw_listener_callback_t callback) final; RMW_FASTRTPS_SHARED_CPP_PUBLIC bool diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index 0f1b49caa..a3476b2ad 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -115,8 +115,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su RMW_FASTRTPS_SHARED_CPP_PUBLIC void eventSetExecutorCallback( const void * user_data, - rmw_listener_callback_t callback, - bool use_previous_events) final; + rmw_listener_callback_t callback) final; RMW_FASTRTPS_SHARED_CPP_PUBLIC bool diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index 07cbe6707..e70ea610a 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -442,16 +442,14 @@ rmw_ret_t __rmw_guard_condition_set_listener_callback( rmw_guard_condition_t * rmw_guard_condition, rmw_listener_callback_t callback, - const void * user_data, - bool use_previous_events); + const void * user_data); RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_event_set_listener_callback( rmw_event_t * rmw_event, rmw_listener_callback_t callback, - const void * user_data, - bool use_previous_events); + const void * user_data); } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp index 44c27a4b4..26fc01188 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -91,18 +91,15 @@ bool PubListener::hasEvent(rmw_event_type_t event_type) const } void PubListener::eventSetExecutorCallback( - const void * user_data, - rmw_listener_callback_t callback, - bool use_previous_events) + const void * user_data, + rmw_listener_callback_t callback) { std::unique_lock lock_mutex(listener_callback_mutex_); if (callback) { - if (use_previous_events) { - // Push events arrived before setting the executor's callback - for(uint64_t i = 0; i < unread_events_count_; i++) { - callback(user_data); - } + // Push events arrived before setting the executor's callback + for(uint64_t i = 0; i < unread_events_count_; i++) { + callback(user_data); } user_data_ = user_data; listener_callback_ = callback; diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp index aa4d26cb2..1cd3a970b 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp @@ -93,18 +93,15 @@ bool SubListener::hasEvent(rmw_event_type_t event_type) const } void SubListener::eventSetExecutorCallback( - const void * user_data, - rmw_listener_callback_t callback, - bool use_previous_events) + const void * user_data, + rmw_listener_callback_t callback) { std::unique_lock lock_mutex(listener_callback_mutex_); if (callback) { - if (use_previous_events) { - // Push events arrived before setting the executor's callback - for(uint64_t i = 0; i < unread_events_count_; i++) { - callback(user_data); - } + // Push events arrived before setting the executor's callback + for(uint64_t i = 0; i < unread_events_count_; i++) { + callback(user_data); } user_data_ = user_data; listener_callback_ = callback; diff --git a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp index 3811a88cc..2973db19a 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp @@ -72,14 +72,12 @@ rmw_ret_t __rmw_event_set_listener_callback( rmw_event_t * rmw_event, rmw_listener_callback_t callback, - const void * user_data, - bool use_previous_events) + const void * user_data) { auto custom_event_info = static_cast(rmw_event->data); custom_event_info->getListener()->eventSetExecutorCallback( user_data, - callback, - use_previous_events); + callback); return RMW_RET_OK; } diff --git a/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp b/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp index 862737c5c..c32a3b63f 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp @@ -51,14 +51,12 @@ rmw_ret_t __rmw_guard_condition_set_listener_callback( rmw_guard_condition_t * rmw_guard_condition, rmw_listener_callback_t callback, - const void * user_data, - bool use_previous_events) + const void * user_data) { auto guard_condition = static_cast(rmw_guard_condition->data); guard_condition->guardConditionSetExecutorCallback( user_data, - callback, - use_previous_events); + callback); return RMW_RET_OK; } } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp b/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp index 69a6081ef..3d34dc507 100644 --- a/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp +++ b/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp @@ -93,17 +93,14 @@ class GuardCondition void guardConditionSetExecutorCallback( const void * user_data, - rmw_listener_callback_t callback, - bool use_previous_events) + rmw_listener_callback_t callback) { std::unique_lock lock_mutex(listener_callback_mutex_); if (callback) { - if (use_previous_events) { - // Push events arrived before setting the executor's callback - for(uint64_t i = 0; i < unread_count_; i++) { - callback(user_data); - } + // Push events arrived before setting the executor's callback + for(uint64_t i = 0; i < unread_count_; i++) { + callback(user_data); } user_data_ = user_data; listener_callback_ = callback; @@ -113,7 +110,6 @@ class GuardCondition return; } - // Reset unread count unread_count_ = 0; } From 37925825f44b20f2bf933bfb2f54d72a2134a5ce Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Thu, 25 Mar 2021 11:40:57 -0300 Subject: [PATCH 03/11] Use unread_count_ as arg Signed-off-by: Mauro Passerino --- .../rmw_fastrtps_shared_cpp/custom_client_info.hpp | 11 ++++------- .../rmw_fastrtps_shared_cpp/custom_service_info.hpp | 12 ++++-------- .../custom_subscriber_info.hpp | 12 ++++-------- .../src/custom_publisher_info.cpp | 13 +++++-------- .../src/custom_subscriber_info.cpp | 12 +++++------- .../src/types/guard_condition.hpp | 10 ++++------ 6 files changed, 26 insertions(+), 44 deletions(-) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp index 87ae94687..4ab421c58 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp @@ -114,7 +114,7 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener std::unique_lock lock_mutex(listener_callback_mutex_); if(listener_callback_) { - listener_callback_(user_data_); + listener_callback_(user_data_, 1); } else { unread_count_++; } @@ -186,19 +186,16 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener if (callback) { // Push events arrived before setting the the executor callback - for(uint64_t i = 0; i < unread_count_; i++) { - callback(user_data); + if (unread_count_) { + callback(user_data, unread_count_); + unread_count_ = 0; } user_data_ = user_data; listener_callback_ = callback; } else { user_data_ = nullptr; listener_callback_ = nullptr; - return; } - - // Reset unread count - unread_count_ = 0; } private: diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index 505f13756..88ccba326 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -236,7 +236,7 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener std::unique_lock lock_mutex(listener_callback_mutex_); if(listener_callback_) { - listener_callback_(user_data_); + listener_callback_(user_data_, 1); } else { unread_count_++; } @@ -301,20 +301,16 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener if (callback) { // Push events arrived before setting the the executor callback - for(uint64_t i = 0; i < unread_count_; i++) { - callback(user_data); + if (unread_count_) { + callback(user_data, unread_count_); + unread_count_ = 0; } - user_data_ = user_data; listener_callback_ = callback; } else { user_data_ = nullptr; listener_callback_ = nullptr; - return; } - - // Reset unread count - unread_count_ = 0; } private: diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index a3476b2ad..e5782a5b6 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -88,7 +88,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su std::unique_lock lock_mutex(listener_callback_mutex_); if(listener_callback_) { - listener_callback_(user_data_); + listener_callback_(user_data_, 1); } else { update_unread_count(sub); new_data_unread_count_++; @@ -177,20 +177,16 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su if (callback) { // Push events arrived before setting the executor's callback - for(uint64_t i = 0; i < new_data_unread_count_; i++) { - callback(user_data); + if (new_data_unread_count_) { + callback(user_data, new_data_unread_count_); + new_data_unread_count_ = 0; } - user_data_ = user_data; listener_callback_ = callback; } else { user_data_ = nullptr; listener_callback_ = nullptr; - return; } - - // Reset unread count - new_data_unread_count_ = 0; } private: diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp index 26fc01188..ff786b684 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -43,7 +43,7 @@ PubListener::on_offered_deadline_missed( std::unique_lock lock_mutex(listener_callback_mutex_); if(listener_callback_){ - listener_callback_(user_data_); + listener_callback_(user_data_, 1); } else { unread_events_count_++; } @@ -70,7 +70,7 @@ void PubListener::on_liveliness_lost( std::unique_lock lock_mutex(listener_callback_mutex_); if(listener_callback_) { - listener_callback_(user_data_); + listener_callback_(user_data_, 1); } else { unread_events_count_++; } @@ -98,19 +98,16 @@ void PubListener::eventSetExecutorCallback( if (callback) { // Push events arrived before setting the executor's callback - for(uint64_t i = 0; i < unread_events_count_; i++) { - callback(user_data); + if (unread_events_count_) { + callback(user_data, unread_events_count_); + unread_events_count_ = 0; } user_data_ = user_data; listener_callback_ = callback; } else { user_data_ = nullptr; listener_callback_ = nullptr; - return; } - - // Reset unread count - unread_events_count_ = 0; } bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info) diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp index 1cd3a970b..761960216 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp @@ -43,7 +43,7 @@ SubListener::on_requested_deadline_missed( std::unique_lock lock_mutex(listener_callback_mutex_); if(listener_callback_) { - listener_callback_(user_data_); + listener_callback_(user_data_, 1); } else { unread_events_count_++; } @@ -72,7 +72,7 @@ void SubListener::on_liveliness_changed( std::unique_lock lock_mutex(listener_callback_mutex_); if(listener_callback_) { - listener_callback_(user_data_); + listener_callback_(user_data_, 1); } else { unread_events_count_++; } @@ -100,8 +100,9 @@ void SubListener::eventSetExecutorCallback( if (callback) { // Push events arrived before setting the executor's callback - for(uint64_t i = 0; i < unread_events_count_; i++) { - callback(user_data); + if (unread_events_count_) { + callback(user_data, unread_events_count_); + unread_events_count_ = 0; } user_data_ = user_data; listener_callback_ = callback; @@ -110,9 +111,6 @@ void SubListener::eventSetExecutorCallback( listener_callback_ = nullptr; return; } - - // Reset unread count - unread_events_count_ = 0; } bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info) diff --git a/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp b/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp index 3d34dc507..69bf74125 100644 --- a/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp +++ b/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp @@ -40,7 +40,7 @@ class GuardCondition if(listener_callback_) { - listener_callback_(user_data_); + listener_callback_(user_data_, 1); } else { std::lock_guard lock(internalMutex_); @@ -99,8 +99,9 @@ class GuardCondition if (callback) { // Push events arrived before setting the executor's callback - for(uint64_t i = 0; i < unread_count_; i++) { - callback(user_data); + if (unread_count_) { + callback(user_data, unread_count_); + unread_count_ = 0; } user_data_ = user_data; listener_callback_ = callback; @@ -109,9 +110,6 @@ class GuardCondition listener_callback_ = nullptr; return; } - - // Reset unread count - unread_count_ = 0; } private: From 8b169a1be93a3094d187053aebb1eb45006388cb Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Wed, 31 Mar 2021 17:55:09 -0300 Subject: [PATCH 04/11] Remove guard condition listener Signed-off-by: Mauro Passerino --- rmw_fastrtps_cpp/src/rmw_guard_condition.cpp | 12 ---- .../rmw_fastrtps_shared_cpp/rmw_common.hpp | 7 --- .../src/rmw_guard_condition.cpp | 13 ---- .../src/types/guard_condition.hpp | 60 ++++--------------- 4 files changed, 10 insertions(+), 82 deletions(-) diff --git a/rmw_fastrtps_cpp/src/rmw_guard_condition.cpp b/rmw_fastrtps_cpp/src/rmw_guard_condition.cpp index 4b49adf7c..d4bb1a248 100644 --- a/rmw_fastrtps_cpp/src/rmw_guard_condition.cpp +++ b/rmw_fastrtps_cpp/src/rmw_guard_condition.cpp @@ -42,16 +42,4 @@ rmw_destroy_guard_condition(rmw_guard_condition_t * guard_condition) return rmw_fastrtps_shared_cpp::__rmw_destroy_guard_condition( guard_condition); } - -rmw_ret_t -rmw_guard_condition_set_listener_callback( - rmw_guard_condition_t * rmw_guard_condition, - rmw_listener_callback_t callback, - const void * user_data) -{ - return rmw_fastrtps_shared_cpp::__rmw_guard_condition_set_listener_callback( - rmw_guard_condition, - callback, - user_data); -} } // extern "C" diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index e70ea610a..7a2dadec4 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -437,13 +437,6 @@ __rmw_client_set_listener_callback( rmw_listener_callback_t callback, const void * user_data); -RMW_FASTRTPS_SHARED_CPP_PUBLIC -rmw_ret_t -__rmw_guard_condition_set_listener_callback( - rmw_guard_condition_t * rmw_guard_condition, - rmw_listener_callback_t callback, - const void * user_data); - RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_event_set_listener_callback( diff --git a/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp b/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp index c32a3b63f..cdcac523c 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_guard_condition.cpp @@ -46,17 +46,4 @@ __rmw_destroy_guard_condition(rmw_guard_condition_t * guard_condition) RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_ERROR); // on completion return ret; } - -rmw_ret_t -__rmw_guard_condition_set_listener_callback( - rmw_guard_condition_t * rmw_guard_condition, - rmw_listener_callback_t callback, - const void * user_data) -{ - auto guard_condition = static_cast(rmw_guard_condition->data); - guard_condition->guardConditionSetExecutorCallback( - user_data, - callback); - return RMW_RET_OK; -} } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp b/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp index 69bf74125..ab5076522 100644 --- a/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp +++ b/rmw_fastrtps_shared_cpp/src/types/guard_condition.hpp @@ -24,8 +24,6 @@ #include "rcpputils/thread_safety_annotations.hpp" -#include "rmw/listener_callback_type.h" - class GuardCondition { public: @@ -36,27 +34,18 @@ class GuardCondition void trigger() { - std::unique_lock lock_mutex(listener_callback_mutex_); + std::lock_guard lock(internalMutex_); - if(listener_callback_) - { - listener_callback_(user_data_, 1); + if (conditionMutex_ != nullptr) { + std::unique_lock clock(*conditionMutex_); + // the change to hasTriggered_ needs to be mutually exclusive with + // rmw_wait() which checks hasTriggered() and decides if wait() needs to + // be called + hasTriggered_ = true; + clock.unlock(); + conditionVariable_->notify_one(); } else { - std::lock_guard lock(internalMutex_); - - if (conditionMutex_ != nullptr) { - std::unique_lock clock(*conditionMutex_); - // the change to hasTriggered_ needs to be mutually exclusive with - // rmw_wait() which checks hasTriggered() and decides if wait() needs to - // be called - hasTriggered_ = true; - clock.unlock(); - conditionVariable_->notify_one(); - } else { - hasTriggered_ = true; - } - - unread_count_++; + hasTriggered_ = true; } } @@ -88,40 +77,11 @@ class GuardCondition return hasTriggered_.exchange(false); } - // Provide handlers to perform an action when a - // new event from this listener has ocurred - void - guardConditionSetExecutorCallback( - const void * user_data, - rmw_listener_callback_t callback) - { - std::unique_lock lock_mutex(listener_callback_mutex_); - - if (callback) { - // Push events arrived before setting the executor's callback - if (unread_count_) { - callback(user_data, unread_count_); - unread_count_ = 0; - } - user_data_ = user_data; - listener_callback_ = callback; - } else { - user_data_ = nullptr; - listener_callback_ = nullptr; - return; - } - } - private: std::mutex internalMutex_; std::atomic_bool hasTriggered_; std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); - - rmw_listener_callback_t listener_callback_{nullptr}; - const void * user_data_{nullptr}; - std::mutex listener_callback_mutex_; - uint64_t unread_count_ = 0; }; #endif // TYPES__GUARD_CONDITION_HPP_ From adc68d0fd4af4d64e8946843537bbabc067eaf8a Mon Sep 17 00:00:00 2001 From: William Woodall Date: Thu, 1 Apr 2021 21:00:36 -0700 Subject: [PATCH 05/11] refactor to remove listener term Signed-off-by: William Woodall --- rmw_fastrtps_cpp/src/rmw_client.cpp | 6 +++--- rmw_fastrtps_cpp/src/rmw_event.cpp | 6 +++--- rmw_fastrtps_cpp/src/rmw_service.cpp | 6 +++--- rmw_fastrtps_cpp/src/rmw_subscription.cpp | 6 +++--- .../custom_client_info.hpp | 6 +++--- .../custom_event_info.hpp | 6 +++--- .../custom_publisher_info.hpp | 2 +- .../custom_service_info.hpp | 6 +++--- .../custom_subscriber_info.hpp | 6 +++--- .../rmw_fastrtps_shared_cpp/rmw_common.hpp | 16 ++++++++-------- .../src/custom_publisher_info.cpp | 2 +- .../src/custom_subscriber_info.cpp | 2 +- rmw_fastrtps_shared_cpp/src/rmw_client.cpp | 4 ++-- rmw_fastrtps_shared_cpp/src/rmw_event.cpp | 4 ++-- rmw_fastrtps_shared_cpp/src/rmw_service.cpp | 4 ++-- rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp | 4 ++-- 16 files changed, 43 insertions(+), 43 deletions(-) diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index 9dc1e39ba..010c6fb29 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -377,12 +377,12 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) } rmw_ret_t -rmw_client_set_listener_callback( +rmw_client_set_on_new_response_callback( rmw_client_t * rmw_client, - rmw_listener_callback_t callback, + rmw_event_callback_t callback, const void * user_data) { - return rmw_fastrtps_shared_cpp::__rmw_client_set_listener_callback( + return rmw_fastrtps_shared_cpp::__rmw_client_set_on_new_response_callback( rmw_client, callback, user_data); diff --git a/rmw_fastrtps_cpp/src/rmw_event.cpp b/rmw_fastrtps_cpp/src/rmw_event.cpp index a125ca00d..9bdc819e7 100644 --- a/rmw_fastrtps_cpp/src/rmw_event.cpp +++ b/rmw_fastrtps_cpp/src/rmw_event.cpp @@ -49,12 +49,12 @@ rmw_subscription_event_init( } rmw_ret_t -rmw_event_set_listener_callback( +rmw_event_set_callback( rmw_event_t * rmw_event, - rmw_listener_callback_t callback, + rmw_event_callback_t callback, const void * user_data) { - return rmw_fastrtps_shared_cpp::__rmw_event_set_listener_callback( + return rmw_fastrtps_shared_cpp::__rmw_event_set_callback( rmw_event, callback, user_data); diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index af23dfcbe..2ba9864f5 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -383,12 +383,12 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) } rmw_ret_t -rmw_service_set_listener_callback( +rmw_service_set_on_new_request_callback( rmw_service_t * rmw_service, - rmw_listener_callback_t callback, + rmw_event_callback_t callback, const void * user_data) { - return rmw_fastrtps_shared_cpp::__rmw_service_set_listener_callback( + return rmw_fastrtps_shared_cpp::__rmw_service_set_on_new_request_callback( rmw_service, callback, user_data); diff --git a/rmw_fastrtps_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_cpp/src/rmw_subscription.cpp index 9abc7c954..58367e4ba 100644 --- a/rmw_fastrtps_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_cpp/src/rmw_subscription.cpp @@ -169,12 +169,12 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription) } rmw_ret_t -rmw_subscription_set_listener_callback( +rmw_subscription_set_on_new_message_callback( rmw_subscription_t * rmw_subscription, - rmw_listener_callback_t callback, + rmw_event_callback_t callback, const void * user_data) { - return rmw_fastrtps_shared_cpp::__rmw_subscription_set_listener_callback( + return rmw_fastrtps_shared_cpp::__rmw_subscription_set_on_new_message_callback( rmw_subscription, callback, user_data); diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp index 4ab421c58..8fe281f81 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp @@ -34,7 +34,7 @@ #include "rcpputils/thread_safety_annotations.hpp" -#include "rmw/listener_callback_type.h" +#include "rmw/event_callback_type.h" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" @@ -180,7 +180,7 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener void clientSetExecutorCallback( const void * user_data, - rmw_listener_callback_t callback) + rmw_event_callback_t callback) { std::unique_lock lock_mutex(listener_callback_mutex_); @@ -218,7 +218,7 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::set publishers_; - rmw_listener_callback_t listener_callback_{nullptr}; + rmw_event_callback_t listener_callback_{nullptr}; const void * user_data_{nullptr}; std::mutex listener_callback_mutex_; uint64_t unread_count_ = 0; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp index a1c35c7bb..12bcc3c6d 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp @@ -32,7 +32,7 @@ #include "fastrtps/publisher/PublisherListener.h" #include "rmw/event.h" -#include "rmw/listener_callback_type.h" +#include "rmw/event_callback_type.h" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" @@ -71,9 +71,9 @@ class EventListenerInterface // new event from this listener has ocurred virtual void eventSetExecutorCallback( const void * user_data, - rmw_listener_callback_t callback) = 0; + rmw_event_callback_t callback) = 0; - rmw_listener_callback_t listener_callback_{nullptr}; + rmw_event_callback_t listener_callback_{nullptr}; const void * user_data_{nullptr}; uint64_t unread_events_count_ = 0; std::mutex listener_callback_mutex_; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index fe8818079..ff3cad444 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -96,7 +96,7 @@ class PubListener : public EventListenerInterface, public eprosima::fastrtps::Pu RMW_FASTRTPS_SHARED_CPP_PUBLIC void eventSetExecutorCallback( const void * user_data, - rmw_listener_callback_t callback) final; + rmw_event_callback_t callback) final; RMW_FASTRTPS_SHARED_CPP_PUBLIC bool diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index 88ccba326..ce33da162 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -33,7 +33,7 @@ #include "rcpputils/thread_safety_annotations.hpp" -#include "rmw/listener_callback_type.h" +#include "rmw/event_callback_type.h" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" #include "rmw_fastrtps_shared_cpp/guid_utils.hpp" @@ -295,7 +295,7 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener void serviceSetExecutorCallback( const void * user_data, - rmw_listener_callback_t callback) + rmw_event_callback_t callback) { std::unique_lock lock_mutex(listener_callback_mutex_); @@ -321,7 +321,7 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); - rmw_listener_callback_t listener_callback_{nullptr}; + rmw_event_callback_t listener_callback_{nullptr}; const void * user_data_{nullptr}; std::mutex listener_callback_mutex_; uint64_t unread_count_ = 0; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index e5782a5b6..e4872ab9e 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -27,7 +27,7 @@ #include "rcpputils/thread_safety_annotations.hpp" #include "rmw/impl/cpp/macros.hpp" -#include "rmw/listener_callback_type.h" +#include "rmw/event_callback_type.h" #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp" #include "rmw_fastrtps_shared_cpp/custom_event_info.hpp" @@ -115,7 +115,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su RMW_FASTRTPS_SHARED_CPP_PUBLIC void eventSetExecutorCallback( const void * user_data, - rmw_listener_callback_t callback) final; + rmw_event_callback_t callback) final; RMW_FASTRTPS_SHARED_CPP_PUBLIC bool @@ -171,7 +171,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su void subcriptionSetExecutorCallback( const void * user_data, - rmw_listener_callback_t callback) + rmw_event_callback_t callback) { std::unique_lock lock_mutex(listener_callback_mutex_); diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index 7a2dadec4..a66112acb 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -418,30 +418,30 @@ __rmw_subscription_get_network_flow_endpoints( RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t -__rmw_subscription_set_listener_callback( +__rmw_subscription_set_on_new_message_callback( rmw_subscription_t * rmw_subscription, - rmw_listener_callback_t callback, + rmw_event_callback_t callback, const void * user_data); RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t -__rmw_service_set_listener_callback( +__rmw_service_set_on_new_request_callback( rmw_service_t * rmw_service, - rmw_listener_callback_t callback, + rmw_event_callback_t callback, const void * user_data); RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t -__rmw_client_set_listener_callback( +__rmw_client_set_on_new_response_callback( rmw_client_t * rmw_client, - rmw_listener_callback_t callback, + rmw_event_callback_t callback, const void * user_data); RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t -__rmw_event_set_listener_callback( +__rmw_event_set_callback( rmw_event_t * rmw_event, - rmw_listener_callback_t callback, + rmw_event_callback_t callback, const void * user_data); } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp index ff786b684..79de0a367 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -92,7 +92,7 @@ bool PubListener::hasEvent(rmw_event_type_t event_type) const void PubListener::eventSetExecutorCallback( const void * user_data, - rmw_listener_callback_t callback) + rmw_event_callback_t callback) { std::unique_lock lock_mutex(listener_callback_mutex_); diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp index 761960216..6b3385cc5 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp @@ -94,7 +94,7 @@ bool SubListener::hasEvent(rmw_event_type_t event_type) const void SubListener::eventSetExecutorCallback( const void * user_data, - rmw_listener_callback_t callback) + rmw_event_callback_t callback) { std::unique_lock lock_mutex(listener_callback_mutex_); diff --git a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp index 67ba23dae..04a690b19 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp @@ -82,9 +82,9 @@ __rmw_destroy_client( } rmw_ret_t -__rmw_client_set_listener_callback( +__rmw_client_set_on_new_response_callback( rmw_client_t * rmw_client, - rmw_listener_callback_t callback, + rmw_event_callback_t callback, const void * user_data) { auto custom_client_info = static_cast(rmw_client->data); diff --git a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp index 2973db19a..7c13fd3d2 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp @@ -69,9 +69,9 @@ __rmw_init_event( } rmw_ret_t -__rmw_event_set_listener_callback( +__rmw_event_set_callback( rmw_event_t * rmw_event, - rmw_listener_callback_t callback, + rmw_event_callback_t callback, const void * user_data) { auto custom_event_info = static_cast(rmw_event->data); diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp index ef2ec0682..e401e50d6 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp @@ -95,9 +95,9 @@ __rmw_destroy_service( } rmw_ret_t -__rmw_service_set_listener_callback( +__rmw_service_set_on_new_request_callback( rmw_service_t * rmw_service, - rmw_listener_callback_t callback, + rmw_event_callback_t callback, const void * user_data) { auto custom_service_info = static_cast(rmw_service->data); diff --git a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp index 14968ae25..188266e92 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp @@ -108,9 +108,9 @@ __rmw_subscription_get_actual_qos( } rmw_ret_t -__rmw_subscription_set_listener_callback( +__rmw_subscription_set_on_new_message_callback( rmw_subscription_t * rmw_subscription, - rmw_listener_callback_t callback, + rmw_event_callback_t callback, const void * user_data) { auto custom_subscriber_info = static_cast(rmw_subscription->data); From b6c50240d9e64b30cffcd22c375b5bd41aeac593 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 23 Apr 2021 17:14:27 -0300 Subject: [PATCH 06/11] Rename listener callbacks Signed-off-by: Mauro Passerino --- .../custom_client_info.hpp | 19 ++++++++-------- .../custom_event_info.hpp | 6 ++--- .../custom_publisher_info.hpp | 2 +- .../custom_service_info.hpp | 19 ++++++++-------- .../custom_subscriber_info.hpp | 22 ++++++++++--------- .../src/custom_publisher_info.cpp | 22 +++++++++---------- .../src/custom_subscriber_info.cpp | 22 +++++++++---------- rmw_fastrtps_shared_cpp/src/rmw_client.cpp | 2 +- rmw_fastrtps_shared_cpp/src/rmw_event.cpp | 2 +- rmw_fastrtps_shared_cpp/src/rmw_service.cpp | 2 +- .../src/rmw_subscription.cpp | 2 +- 11 files changed, 58 insertions(+), 62 deletions(-) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp index 8fe281f81..be2f8389e 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp @@ -110,11 +110,10 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener list_has_data_.store(true); } - // Add the client event to the event queue - std::unique_lock lock_mutex(listener_callback_mutex_); + std::unique_lock lock_mutex(on_new_response_m_); - if(listener_callback_) { - listener_callback_(user_data_, 1); + if(on_new_response_cb_) { + on_new_response_cb_(user_data_, 1); } else { unread_count_++; } @@ -178,11 +177,11 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener // Provide handlers to perform an action when a // new event from this listener has ocurred void - clientSetExecutorCallback( + set_on_new_response_callback( const void * user_data, rmw_event_callback_t callback) { - std::unique_lock lock_mutex(listener_callback_mutex_); + std::unique_lock lock_mutex(on_new_response_m_); if (callback) { // Push events arrived before setting the the executor callback @@ -191,10 +190,10 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener unread_count_ = 0; } user_data_ = user_data; - listener_callback_ = callback; + on_new_response_cb_ = callback; } else { user_data_ = nullptr; - listener_callback_ = nullptr; + on_new_response_cb_ = nullptr; } } @@ -218,9 +217,9 @@ class ClientListener : public eprosima::fastrtps::SubscriberListener std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::set publishers_; - rmw_event_callback_t listener_callback_{nullptr}; + rmw_event_callback_t on_new_response_cb_{nullptr}; const void * user_data_{nullptr}; - std::mutex listener_callback_mutex_; + std::mutex on_new_response_m_; uint64_t unread_count_ = 0; }; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp index 12bcc3c6d..843dc1345 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp @@ -69,14 +69,14 @@ class EventListenerInterface // Provide handlers to perform an action when a // new event from this listener has ocurred - virtual void eventSetExecutorCallback( + virtual void set_on_new_event_callback( const void * user_data, rmw_event_callback_t callback) = 0; - rmw_event_callback_t listener_callback_{nullptr}; + rmw_event_callback_t on_new_event_cb_{nullptr}; const void * user_data_{nullptr}; uint64_t unread_events_count_ = 0; - std::mutex listener_callback_mutex_; + std::mutex on_new_event_m_; }; class EventListenerInterface::ConditionalScopedLock diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp index ff3cad444..b35bd7a2a 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp @@ -94,7 +94,7 @@ class PubListener : public EventListenerInterface, public eprosima::fastrtps::Pu hasEvent(rmw_event_type_t event_type) const final; RMW_FASTRTPS_SHARED_CPP_PUBLIC - void eventSetExecutorCallback( + void set_on_new_event_callback( const void * user_data, rmw_event_callback_t callback) final; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index ce33da162..e5eee6bf1 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -232,11 +232,10 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener list_has_data_.store(true); } - // Add the service to the event queue - std::unique_lock lock_mutex(listener_callback_mutex_); + std::unique_lock lock_mutex(on_new_request_m_); - if(listener_callback_) { - listener_callback_(user_data_, 1); + if(on_new_request_cb_) { + on_new_request_cb_(user_data_, 1); } else { unread_count_++; } @@ -293,11 +292,11 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener // Provide handlers to perform an action when a // new event from this listener has ocurred void - serviceSetExecutorCallback( + set_on_new_request_callback( const void * user_data, rmw_event_callback_t callback) { - std::unique_lock lock_mutex(listener_callback_mutex_); + std::unique_lock lock_mutex(on_new_request_m_); if (callback) { // Push events arrived before setting the the executor callback @@ -306,10 +305,10 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener unread_count_ = 0; } user_data_ = user_data; - listener_callback_ = callback; + on_new_request_cb_ = callback; } else { user_data_ = nullptr; - listener_callback_ = nullptr; + on_new_request_cb_ = nullptr; } } @@ -321,9 +320,9 @@ class ServiceListener : public eprosima::fastrtps::SubscriberListener std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); - rmw_event_callback_t listener_callback_{nullptr}; + rmw_event_callback_t on_new_request_cb_{nullptr}; const void * user_data_{nullptr}; - std::mutex listener_callback_mutex_; + std::mutex on_new_request_m_; uint64_t unread_count_ = 0; }; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index e4872ab9e..d66b913dc 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -84,13 +84,13 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su void onNewDataMessage(eprosima::fastrtps::Subscriber * sub) final { - // Callback: add the subscription event to the event queue - std::unique_lock lock_mutex(listener_callback_mutex_); + update_unread_count(sub); + + std::unique_lock lock_mutex(on_new_message_m_); - if(listener_callback_) { - listener_callback_(user_data_, 1); + if(on_new_message_cb_) { + on_new_message_cb_(user_data_, 1); } else { - update_unread_count(sub); new_data_unread_count_++; } } @@ -113,7 +113,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su hasEvent(rmw_event_type_t event_type) const final; RMW_FASTRTPS_SHARED_CPP_PUBLIC - void eventSetExecutorCallback( + void set_on_new_event_callback( const void * user_data, rmw_event_callback_t callback) final; @@ -169,11 +169,11 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su // Provide handlers to perform an action when a // new event from this listener has ocurred void - subcriptionSetExecutorCallback( + set_on_new_message_callback( const void * user_data, rmw_event_callback_t callback) { - std::unique_lock lock_mutex(listener_callback_mutex_); + std::unique_lock lock_mutex(on_new_message_m_); if (callback) { // Push events arrived before setting the executor's callback @@ -182,10 +182,10 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su new_data_unread_count_ = 0; } user_data_ = user_data; - listener_callback_ = callback; + on_new_message_cb_ = callback; } else { user_data_ = nullptr; - listener_callback_ = nullptr; + on_new_message_cb_ = nullptr; } } @@ -207,6 +207,8 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su std::set publishers_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_); + rmw_event_callback_t on_new_message_cb_{nullptr}; + std::mutex on_new_message_m_; uint64_t new_data_unread_count_ = 0; }; diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp index 79de0a367..05fef56ba 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp @@ -39,11 +39,10 @@ PubListener::on_offered_deadline_missed( deadline_changes_.store(true, std::memory_order_relaxed); - // Callback: add the change in qos event to the event queue - std::unique_lock lock_mutex(listener_callback_mutex_); + std::unique_lock lock_mutex(on_new_event_m_); - if(listener_callback_){ - listener_callback_(user_data_, 1); + if(on_new_event_cb_){ + on_new_event_cb_(user_data_, 1); } else { unread_events_count_++; } @@ -66,11 +65,10 @@ void PubListener::on_liveliness_lost( liveliness_changes_.store(true, std::memory_order_relaxed); - // Callback: add the change in qos event to the event queue - std::unique_lock lock_mutex(listener_callback_mutex_); + std::unique_lock lock_mutex(on_new_event_m_); - if(listener_callback_) { - listener_callback_(user_data_, 1); + if(on_new_event_cb_) { + on_new_event_cb_(user_data_, 1); } else { unread_events_count_++; } @@ -90,11 +88,11 @@ bool PubListener::hasEvent(rmw_event_type_t event_type) const return false; } -void PubListener::eventSetExecutorCallback( +void PubListener::set_on_new_event_callback( const void * user_data, rmw_event_callback_t callback) { - std::unique_lock lock_mutex(listener_callback_mutex_); + std::unique_lock lock_mutex(on_new_event_m_); if (callback) { // Push events arrived before setting the executor's callback @@ -103,10 +101,10 @@ void PubListener::eventSetExecutorCallback( unread_events_count_ = 0; } user_data_ = user_data; - listener_callback_ = callback; + on_new_event_cb_ = callback; } else { user_data_ = nullptr; - listener_callback_ = nullptr; + on_new_event_cb_ = nullptr; } } diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp index 6b3385cc5..863289788 100644 --- a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp +++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp @@ -39,11 +39,10 @@ SubListener::on_requested_deadline_missed( deadline_changes_.store(true, std::memory_order_relaxed); - // Callback: add the subscription event to the event queue - std::unique_lock lock_mutex(listener_callback_mutex_); + std::unique_lock lock_mutex(on_new_event_m_); - if(listener_callback_) { - listener_callback_(user_data_, 1); + if(on_new_event_cb_) { + on_new_event_cb_(user_data_, 1); } else { unread_events_count_++; } @@ -68,11 +67,10 @@ void SubListener::on_liveliness_changed( liveliness_changes_.store(true, std::memory_order_relaxed); - // Callback: add the subscription event to the event queue - std::unique_lock lock_mutex(listener_callback_mutex_); + std::unique_lock lock_mutex(on_new_event_m_); - if(listener_callback_) { - listener_callback_(user_data_, 1); + if(on_new_event_cb_) { + on_new_event_cb_(user_data_, 1); } else { unread_events_count_++; } @@ -92,11 +90,11 @@ bool SubListener::hasEvent(rmw_event_type_t event_type) const return false; } -void SubListener::eventSetExecutorCallback( +void SubListener::set_on_new_event_callback( const void * user_data, rmw_event_callback_t callback) { - std::unique_lock lock_mutex(listener_callback_mutex_); + std::unique_lock lock_mutex(on_new_event_m_); if (callback) { // Push events arrived before setting the executor's callback @@ -105,10 +103,10 @@ void SubListener::eventSetExecutorCallback( unread_events_count_ = 0; } user_data_ = user_data; - listener_callback_ = callback; + on_new_event_cb_ = callback; } else { user_data_ = nullptr; - listener_callback_ = nullptr; + on_new_event_cb_ = nullptr; return; } } diff --git a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp index 04a690b19..043f1201b 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp @@ -88,7 +88,7 @@ __rmw_client_set_on_new_response_callback( const void * user_data) { auto custom_client_info = static_cast(rmw_client->data); - custom_client_info->listener_->clientSetExecutorCallback( + custom_client_info->listener_->set_on_new_response_callback( user_data, callback); return RMW_RET_OK; diff --git a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp index 7c13fd3d2..cbff3ceab 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp @@ -75,7 +75,7 @@ __rmw_event_set_callback( const void * user_data) { auto custom_event_info = static_cast(rmw_event->data); - custom_event_info->getListener()->eventSetExecutorCallback( + custom_event_info->getListener()->set_on_new_event_callback( user_data, callback); return RMW_RET_OK; diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp index e401e50d6..56718c2b2 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp @@ -101,7 +101,7 @@ __rmw_service_set_on_new_request_callback( const void * user_data) { auto custom_service_info = static_cast(rmw_service->data); - custom_service_info->listener_->serviceSetExecutorCallback( + custom_service_info->listener_->set_on_new_request_callback( user_data, callback); return RMW_RET_OK; diff --git a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp index 188266e92..b218b14c2 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp @@ -114,7 +114,7 @@ __rmw_subscription_set_on_new_message_callback( const void * user_data) { auto custom_subscriber_info = static_cast(rmw_subscription->data); - custom_subscriber_info->listener_->subcriptionSetExecutorCallback( + custom_subscriber_info->listener_->set_on_new_message_callback( user_data, callback); return RMW_RET_OK; From c268972fe04722805329918f2588f7b685397a2d Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 23 Apr 2021 18:04:45 -0300 Subject: [PATCH 07/11] Use qos depth for initial callback call Signed-off-by: Mauro Passerino --- rmw_fastrtps_cpp/src/subscription.cpp | 2 +- rmw_fastrtps_dynamic_cpp/src/subscription.cpp | 2 +- .../rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp | 8 ++++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/rmw_fastrtps_cpp/src/subscription.cpp b/rmw_fastrtps_cpp/src/subscription.cpp index 30cbf6fb0..3e5d64e57 100644 --- a/rmw_fastrtps_cpp/src/subscription.cpp +++ b/rmw_fastrtps_cpp/src/subscription.cpp @@ -163,7 +163,7 @@ create_subscription( } if (create_subscription_listener) { - info->listener_ = new (std::nothrow) SubListener(info); + info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth); if (!info->listener_) { RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber listener"); return nullptr; diff --git a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp index 41091471e..2bd465ba2 100644 --- a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp @@ -175,7 +175,7 @@ create_subscription( return nullptr; } - info->listener_ = new (std::nothrow) SubListener(info); + info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth); if (!info->listener_) { RMW_SET_ERROR_MSG("create_subscriber() could not create subscriber listener"); return nullptr; diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index d66b913dc..8002cc495 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -17,6 +17,7 @@ #include #include +#include #include #include #include @@ -54,13 +55,14 @@ struct CustomSubscriberInfo : public CustomEventInfo class SubListener : public EventListenerInterface, public eprosima::fastrtps::SubscriberListener { public: - explicit SubListener(CustomSubscriberInfo * info) + explicit SubListener(CustomSubscriberInfo * info, size_t qos_depth) : data_(0), deadline_changes_(false), liveliness_changes_(false), conditionMutex_(nullptr), conditionVariable_(nullptr) { + qos_depth_ = (qos_depth > 0) ? qos_depth : std::numeric_limits::max(); // Field is not used right now (void)info; } @@ -178,7 +180,8 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su if (callback) { // Push events arrived before setting the executor's callback if (new_data_unread_count_) { - callback(user_data, new_data_unread_count_); + auto unread_count = std::min(new_data_unread_count_, qos_depth_); + callback(user_data, unread_count); new_data_unread_count_ = 0; } user_data_ = user_data; @@ -209,6 +212,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastrtps::Su rmw_event_callback_t on_new_message_cb_{nullptr}; std::mutex on_new_message_m_; + size_t qos_depth_; uint64_t new_data_unread_count_ = 0; }; From ea052775b566cab9d375be8f2f6f89f3231ec7cf Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Tue, 25 May 2021 11:38:36 +0100 Subject: [PATCH 08/11] Use QoS depth on fastrtps_dynamic_cpp Signed-off-by: Mauro Passerino --- rmw_fastrtps_dynamic_cpp/src/subscription.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp index 4d2415220..4c20dbd09 100644 --- a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp @@ -214,7 +214,7 @@ create_subscription( ///// // Create Listener if (create_subscription_listener) { - info->listener_ = new (std::nothrow) SubListener(info); + info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth); if (!info->listener_) { RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener"); From 00e7e69b4937b9623ee1581169fe954272b9e3bf Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Thu, 10 Jun 2021 13:03:27 +0100 Subject: [PATCH 09/11] Use size_t for new_data_unread_count_ Signed-off-by: Mauro Passerino --- .../include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp index df1974a4f..bc79ea934 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp @@ -223,7 +223,7 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds rmw_event_callback_t on_new_message_cb_{nullptr}; std::mutex on_new_message_m_; size_t qos_depth_; - uint64_t new_data_unread_count_ = 0; + size_t new_data_unread_count_ = 0; }; #endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_ From a8e6ef8fb52d5a2ce1c26f1d74986b58a4c49f03 Mon Sep 17 00:00:00 2001 From: Mauro Passerino Date: Fri, 17 Sep 2021 19:02:08 +0100 Subject: [PATCH 10/11] Add apis to retrieve service/client QoS Signed-off-by: Mauro Passerino --- rmw_fastrtps_cpp/src/rmw_client.cpp | 16 +++++++ rmw_fastrtps_cpp/src/rmw_service.cpp | 16 +++++++ rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp | 16 +++++++ rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp | 16 +++++++ .../rmw_fastrtps_shared_cpp/rmw_common.hpp | 12 +++++ rmw_fastrtps_shared_cpp/src/rmw_client.cpp | 42 +++++++++++++++++ rmw_fastrtps_shared_cpp/src/rmw_service.cpp | 45 +++++++++++++++++++ 7 files changed, 163 insertions(+) diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp index e2f27a44d..aefb71f56 100644 --- a/rmw_fastrtps_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_cpp/src/rmw_client.cpp @@ -505,4 +505,20 @@ rmw_client_set_on_new_response_callback( callback, user_data); } + +rmw_ret_t +rmw_client_get_actual_qos( + const rmw_client_t * client, + rmw_qos_profile_t * qos) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + eprosima_fastrtps_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); + + return rmw_fastrtps_shared_cpp::__rmw_client_get_actual_qos(client, qos); +} } // extern "C" diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp index c7302161d..dc80712fb 100644 --- a/rmw_fastrtps_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_cpp/src/rmw_service.cpp @@ -504,4 +504,20 @@ rmw_service_set_on_new_request_callback( callback, user_data); } + +rmw_ret_t +rmw_service_get_actual_qos( + const rmw_service_t * service, + rmw_qos_profile_t * qos) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + eprosima_fastrtps_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); + + return rmw_fastrtps_shared_cpp::__rmw_service_get_actual_qos(service, qos); +} } // extern "C" diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp index 0f8a82391..cc95e08dc 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp @@ -540,4 +540,20 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client) return rmw_fastrtps_shared_cpp::__rmw_destroy_client( eprosima_fastrtps_identifier, node, client); } + +rmw_ret_t +rmw_client_get_actual_qos( + const rmw_client_t * client, + rmw_qos_profile_t * qos) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + client, + client->implementation_identifier, + eprosima_fastrtps_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); + + return rmw_fastrtps_shared_cpp::__rmw_client_get_actual_qos(client, qos); +} } // extern "C" diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp index 214d4e1a5..8fc83e541 100644 --- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp @@ -539,4 +539,20 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service) return rmw_fastrtps_shared_cpp::__rmw_destroy_service( eprosima_fastrtps_identifier, node, service); } + +rmw_ret_t +rmw_service_get_actual_qos( + const rmw_service_t * service, + rmw_qos_profile_t * qos) +{ + RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT); + RMW_CHECK_TYPE_IDENTIFIERS_MATCH( + service, + service->implementation_identifier, + eprosima_fastrtps_identifier, + return RMW_RET_INCORRECT_RMW_IMPLEMENTATION); + RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT); + + return rmw_fastrtps_shared_cpp::__rmw_service_get_actual_qos(service, qos); +} } // extern "C" diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp index d965f4f03..860ccd8da 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp @@ -469,6 +469,12 @@ __rmw_service_set_on_new_request_callback( rmw_event_callback_t callback, const void * user_data); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_service_get_actual_qos( + const rmw_service_t * service, + rmw_qos_profile_t * qos); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_client_set_on_new_response_callback( @@ -476,6 +482,12 @@ __rmw_client_set_on_new_response_callback( rmw_event_callback_t callback, const void * user_data); +RMW_FASTRTPS_SHARED_CPP_PUBLIC +rmw_ret_t +__rmw_client_get_actual_qos( + const rmw_client_t * client, + rmw_qos_profile_t * qos); + RMW_FASTRTPS_SHARED_CPP_PUBLIC rmw_ret_t __rmw_event_set_callback( diff --git a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp index a0568f84c..0d9a1dcac 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp @@ -139,4 +139,46 @@ __rmw_client_set_on_new_response_callback( callback); return RMW_RET_OK; } + +rmw_ret_t +__rmw_client_get_actual_qos( + const rmw_client_t * client, + rmw_qos_profile_t * qos) +{ + auto info = static_cast(client->data); + + eprosima::fastdds::dds::DataReader * fastdds_dr = info->response_reader_; + eprosima::fastdds::dds::DataWriter * fastdds_dw = info->request_writer_; + + rmw_qos_profile_t dr_qos = rmw_qos_profile_default; + rmw_qos_profile_t dw_qos = rmw_qos_profile_default; + + dds_qos_to_rmw_qos(fastdds_dr->get_qos(), &dr_qos); + dds_qos_to_rmw_qos(fastdds_dw->get_qos(), &dw_qos); + + // Check if the QoS of the data reader and the data writer match + if (dr_qos.history != dw_qos.history || + dr_qos.depth != dw_qos.depth || + dr_qos.reliability != dw_qos.reliability || + dr_qos.durability != dw_qos.durability || + dr_qos.liveliness != dw_qos.liveliness || + dr_qos.deadline.sec != dw_qos.deadline.sec || + dr_qos.deadline.nsec != dw_qos.deadline.nsec || + dr_qos.lifespan.sec != dw_qos.lifespan.sec || + dr_qos.lifespan.nsec != dw_qos.lifespan.nsec || + dr_qos.liveliness_lease_duration.sec != dw_qos.liveliness_lease_duration.sec || + dr_qos.liveliness_lease_duration.nsec != dw_qos.liveliness_lease_duration.nsec || + dr_qos.avoid_ros_namespace_conventions != dw_qos.avoid_ros_namespace_conventions) + { + // This situation can happen if we set system default settings for qos. + // As no qos is defined by the user, the dds han chose to assign one qos policy for the + // reader and a different for the writer. Currently seems to only happen to durability, + // which is set to volatile for data readers, and transient local for data writers. + RMW_SET_ERROR_MSG("client's datawriter QoS does not match client's datareader QoS"); + return RMW_RET_ERROR; + } + + *qos = dw_qos; + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp index cfdb4dc76..05aba98f8 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp @@ -151,4 +151,49 @@ __rmw_service_set_on_new_request_callback( callback); return RMW_RET_OK; } + +rmw_ret_t +__rmw_service_get_actual_qos( + const rmw_service_t * service, + rmw_qos_profile_t * qos) +{ + auto info = static_cast(service->data); + + eprosima::fastdds::dds::DataReader * fastdds_dr = info->request_reader_; + eprosima::fastdds::dds::DataWriter * fastdds_dw = info->response_writer_; + + rmw_qos_profile_t dr_qos = rmw_qos_profile_default; + rmw_qos_profile_t dw_qos = rmw_qos_profile_default; + + dds_qos_to_rmw_qos(fastdds_dr->get_qos(), &dr_qos); + dds_qos_to_rmw_qos(fastdds_dw->get_qos(), &dw_qos); + + // Check if the QoS of the data reader and the data writer match + if (dr_qos.history != dw_qos.history || + dr_qos.depth != dw_qos.depth || + dr_qos.reliability != dw_qos.reliability || + dr_qos.durability != dw_qos.durability || + dr_qos.liveliness != dw_qos.liveliness || + dr_qos.deadline.sec != dw_qos.deadline.sec || + dr_qos.deadline.nsec != dw_qos.deadline.nsec || + dr_qos.lifespan.sec != dw_qos.lifespan.sec || + dr_qos.lifespan.nsec != dw_qos.lifespan.nsec || + dr_qos.liveliness_lease_duration.sec != dw_qos.liveliness_lease_duration.sec || + dr_qos.liveliness_lease_duration.nsec != dw_qos.liveliness_lease_duration.nsec || + dr_qos.avoid_ros_namespace_conventions != dw_qos.avoid_ros_namespace_conventions) + { + // This situation can happen if we set system default settings for qos. + // As no qos is defined by the user, the dds han chose to assign one qos policy for the + // reader and a different for the writer. Currently seems to only happen to durability, + // which is set to volatile for data readers, and transient local for data writers. + RMW_SET_ERROR_MSG("server's datawriter QoS does not match client's datareader QoS"); + return RMW_RET_ERROR; + } + + // The service has a single QoS, we can set it either as the qos + // of the data reader or data writer, as they match. + *qos = dw_qos; + + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp From bea69b8448e667b2cffe1e9b0d9460087a1084b0 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Wed, 6 Oct 2021 23:58:25 +0800 Subject: [PATCH 11/11] fix QoS depth settings for clients/service ignored (#564) Signed-off-by: Chen Lihui Co-authored-by: Miguel Company --- .../include/rmw_fastrtps_shared_cpp/custom_client_info.hpp | 7 +++++++ .../rmw_fastrtps_shared_cpp/custom_service_info.hpp | 7 +++++++ 2 files changed, 14 insertions(+) diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp index 910385b1f..5278ee58c 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp @@ -33,6 +33,7 @@ #include "fastdds/dds/subscriber/DataReader.hpp" #include "fastdds/dds/subscriber/DataReaderListener.hpp" #include "fastdds/dds/subscriber/SampleInfo.hpp" +#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp" #include "fastdds/dds/topic/TypeSupport.hpp" #include "fastdds/rtps/common/Guid.h" @@ -106,6 +107,12 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener response.sample_identity_.writer_guid() == info_->writer_guid_) { std::lock_guard lock(internalMutex_); + const eprosima::fastrtps::HistoryQosPolicy & history = reader->get_qos().history(); + if (eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == history.kind) { + while (list.size() >= static_cast(history.depth)) { + list.pop_front(); + } + } if (conditionMutex_ != nullptr) { std::unique_lock clock(*conditionMutex_); diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp index 931cf62ec..958cd9c75 100644 --- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp +++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp @@ -31,6 +31,7 @@ #include "fastdds/dds/subscriber/DataReader.hpp" #include "fastdds/dds/subscriber/DataReaderListener.hpp" #include "fastdds/dds/subscriber/SampleInfo.hpp" +#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp" #include "fastdds/dds/topic/TypeSupport.hpp" #include "fastdds/rtps/common/Guid.h" @@ -227,6 +228,12 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener info_->pub_listener_->endpoint_add_reader_and_writer(reader_guid, writer_guid); std::lock_guard lock(internalMutex_); + const eprosima::fastrtps::HistoryQosPolicy & history = reader->get_qos().history(); + if (eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == history.kind) { + while (list.size() >= static_cast(history.depth)) { + list.pop_front(); + } + } if (conditionMutex_ != nullptr) { std::unique_lock clock(*conditionMutex_);