diff --git a/rmw_fastrtps_cpp/src/rmw_client.cpp b/rmw_fastrtps_cpp/src/rmw_client.cpp
index d10a61f41..aefb71f56 100644
--- a/rmw_fastrtps_cpp/src/rmw_client.cpp
+++ b/rmw_fastrtps_cpp/src/rmw_client.cpp
@@ -493,4 +493,32 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
   return rmw_fastrtps_shared_cpp::__rmw_destroy_client(
     eprosima_fastrtps_identifier, node, client);
 }
+
+rmw_ret_t
+rmw_client_set_on_new_response_callback(
+  rmw_client_t * rmw_client,
+  rmw_event_callback_t callback,
+  const void * user_data)
+{
+  return rmw_fastrtps_shared_cpp::__rmw_client_set_on_new_response_callback(
+    rmw_client,
+    callback,
+    user_data);
+}
+
+rmw_ret_t
+rmw_client_get_actual_qos(
+  const rmw_client_t * client,
+  rmw_qos_profile_t * qos)
+{
+  RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT);
+  RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
+    client,
+    client->implementation_identifier,
+    eprosima_fastrtps_identifier,
+    return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
+  RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);
+
+  return rmw_fastrtps_shared_cpp::__rmw_client_get_actual_qos(client, qos);
+}
 }  // extern "C"
diff --git a/rmw_fastrtps_cpp/src/rmw_event.cpp b/rmw_fastrtps_cpp/src/rmw_event.cpp
index 5f4684777..9bdc819e7 100644
--- a/rmw_fastrtps_cpp/src/rmw_event.cpp
+++ b/rmw_fastrtps_cpp/src/rmw_event.cpp
@@ -47,4 +47,16 @@ rmw_subscription_event_init(
     subscription->data,
     event_type);
 }
+
+rmw_ret_t
+rmw_event_set_callback(
+  rmw_event_t * rmw_event,
+  rmw_event_callback_t callback,
+  const void * user_data)
+{
+  return rmw_fastrtps_shared_cpp::__rmw_event_set_callback(
+    rmw_event,
+    callback,
+    user_data);
+}
 }  // extern "C"
diff --git a/rmw_fastrtps_cpp/src/rmw_service.cpp b/rmw_fastrtps_cpp/src/rmw_service.cpp
index 7ac758fa6..dc80712fb 100644
--- a/rmw_fastrtps_cpp/src/rmw_service.cpp
+++ b/rmw_fastrtps_cpp/src/rmw_service.cpp
@@ -492,4 +492,32 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service)
   return rmw_fastrtps_shared_cpp::__rmw_destroy_service(
     eprosima_fastrtps_identifier, node, service);
 }
+
+rmw_ret_t
+rmw_service_set_on_new_request_callback(
+  rmw_service_t * rmw_service,
+  rmw_event_callback_t callback,
+  const void * user_data)
+{
+  return rmw_fastrtps_shared_cpp::__rmw_service_set_on_new_request_callback(
+    rmw_service,
+    callback,
+    user_data);
+}
+
+rmw_ret_t
+rmw_service_get_actual_qos(
+  const rmw_service_t * service,
+  rmw_qos_profile_t * qos)
+{
+  RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT);
+  RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
+    service,
+    service->implementation_identifier,
+    eprosima_fastrtps_identifier,
+    return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
+  RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);
+
+  return rmw_fastrtps_shared_cpp::__rmw_service_get_actual_qos(service, qos);
+}
 }  // extern "C"
diff --git a/rmw_fastrtps_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_cpp/src/rmw_subscription.cpp
index 6ce16e3ad..58367e4ba 100644
--- a/rmw_fastrtps_cpp/src/rmw_subscription.cpp
+++ b/rmw_fastrtps_cpp/src/rmw_subscription.cpp
@@ -167,4 +167,16 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
   return rmw_fastrtps_shared_cpp::__rmw_destroy_subscription(
     eprosima_fastrtps_identifier, node, subscription);
 }
+
+rmw_ret_t
+rmw_subscription_set_on_new_message_callback(
+  rmw_subscription_t * rmw_subscription,
+  rmw_event_callback_t callback,
+  const void * user_data)
+{
+  return rmw_fastrtps_shared_cpp::__rmw_subscription_set_on_new_message_callback(
+    rmw_subscription,
+    callback,
+    user_data);
+}
 }  // extern "C"
diff --git a/rmw_fastrtps_cpp/src/subscription.cpp b/rmw_fastrtps_cpp/src/subscription.cpp
index 557ad0549..817f5a3b5 100644
--- a/rmw_fastrtps_cpp/src/subscription.cpp
+++ b/rmw_fastrtps_cpp/src/subscription.cpp
@@ -200,7 +200,7 @@ create_subscription(
   /////
   // Create Listener
   if (create_subscription_listener) {
-    info->listener_ = new (std::nothrow) SubListener(info);
+    info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth);
 
     if (!info->listener_) {
       RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp
index 0f8a82391..cc95e08dc 100644
--- a/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp
+++ b/rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp
@@ -540,4 +540,20 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
   return rmw_fastrtps_shared_cpp::__rmw_destroy_client(
     eprosima_fastrtps_identifier, node, client);
 }
+
+rmw_ret_t
+rmw_client_get_actual_qos(
+  const rmw_client_t * client,
+  rmw_qos_profile_t * qos)
+{
+  RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT);
+  RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
+    client,
+    client->implementation_identifier,
+    eprosima_fastrtps_identifier,
+    return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
+  RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);
+
+  return rmw_fastrtps_shared_cpp::__rmw_client_get_actual_qos(client, qos);
+}
 }  // extern "C"
diff --git a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
index 214d4e1a5..8fc83e541 100644
--- a/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
+++ b/rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
@@ -539,4 +539,20 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service)
   return rmw_fastrtps_shared_cpp::__rmw_destroy_service(
     eprosima_fastrtps_identifier, node, service);
 }
+
+rmw_ret_t
+rmw_service_get_actual_qos(
+  const rmw_service_t * service,
+  rmw_qos_profile_t * qos)
+{
+  RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT);
+  RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
+    service,
+    service->implementation_identifier,
+    eprosima_fastrtps_identifier,
+    return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
+  RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);
+
+  return rmw_fastrtps_shared_cpp::__rmw_service_get_actual_qos(service, qos);
+}
 }  // extern "C"
diff --git a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp
index 4d2415220..4c20dbd09 100644
--- a/rmw_fastrtps_dynamic_cpp/src/subscription.cpp
+++ b/rmw_fastrtps_dynamic_cpp/src/subscription.cpp
@@ -214,7 +214,7 @@ create_subscription(
   /////
   // Create Listener
   if (create_subscription_listener) {
-    info->listener_ = new (std::nothrow) SubListener(info);
+    info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth);
 
     if (!info->listener_) {
       RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp
index dad3c01e3..5278ee58c 100644
--- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp
+++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_client_info.hpp
@@ -33,6 +33,7 @@
 #include "fastdds/dds/subscriber/DataReader.hpp"
 #include "fastdds/dds/subscriber/DataReaderListener.hpp"
 #include "fastdds/dds/subscriber/SampleInfo.hpp"
+#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp"
 #include "fastdds/dds/topic/TypeSupport.hpp"
 #include "fastdds/rtps/common/Guid.h"
 
@@ -41,6 +42,8 @@
 
 #include "rcpputils/thread_safety_annotations.hpp"
 
+#include "rmw/event_callback_type.h"
+
 #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
 
 class ClientListener;
@@ -104,6 +107,12 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
         response.sample_identity_.writer_guid() == info_->writer_guid_)
       {
         std::lock_guard<std::mutex> lock(internalMutex_);
+        const eprosima::fastrtps::HistoryQosPolicy & history = reader->get_qos().history();
+        if (eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == history.kind) {
+          while (list.size() >= static_cast<size_t>(history.depth)) {
+            list.pop_front();
+          }
+        }
 
         if (conditionMutex_ != nullptr) {
           std::unique_lock<std::mutex> clock(*conditionMutex_);
@@ -118,6 +127,14 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
           list.emplace_back(std::move(response));
           list_has_data_.store(true);
         }
+
+        std::unique_lock<std::mutex> lock_mutex(on_new_response_m_);
+
+        if (on_new_response_cb_) {
+          on_new_response_cb_(user_data_, 1);
+        } else {
+          unread_count_++;
+        }
       }
     }
   }
@@ -174,6 +191,29 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
     info_->response_subscriber_matched_count_.store(publishers_.size());
   }
 
+  // Provide handlers to perform an action when a
+  // new event from this listener has occurred
+  void
+  set_on_new_response_callback(
+    const void * user_data,
+    rmw_event_callback_t callback)
+  {
+    std::unique_lock<std::mutex> lock_mutex(on_new_response_m_);
+
+    if (callback) {
+      // Push events that arrived before setting the executor callback
+      if (unread_count_) {
+        callback(user_data, unread_count_);
+        unread_count_ = 0;
+      }
+      user_data_ = user_data;
+      on_new_response_cb_ = callback;
+    } else {
+      user_data_ = nullptr;
+      on_new_response_cb_ = nullptr;
+    }
+  }
+
 private:
   bool popResponse(CustomClientResponse & response) RCPPUTILS_TSA_REQUIRES(internalMutex_)
   {
@@ -193,6 +233,11 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
   std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
   std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
   std::set<eprosima::fastrtps::rtps::GUID_t> publishers_;
+
+  rmw_event_callback_t on_new_response_cb_{nullptr};
+  const void * user_data_{nullptr};
+  std::mutex on_new_response_m_;
+  uint64_t unread_count_ = 0;
 };
 
 class ClientPubListener : public eprosima::fastdds::dds::DataWriterListener
diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp
index edd2794ad..d7df46611 100644
--- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp
+++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_event_info.hpp
@@ -25,6 +25,7 @@
 #include "fastcdr/FastBuffer.h"
 
 #include "rmw/event.h"
+#include "rmw/event_callback_type.h"
 
 #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
 
@@ -58,6 +59,17 @@
    * \return `false` if data was not available, in this case nothing was written to event_info.
   */
  virtual bool takeNextEvent(rmw_event_type_t event_type, void * event_info) = 0;
+
+  // Provide handlers to perform an action when a
+  // new event from this listener has occurred
+  virtual void set_on_new_event_callback(
+    const void * user_data,
+    rmw_event_callback_t callback) = 0;
+
+  rmw_event_callback_t on_new_event_cb_{nullptr};
+  const void * user_data_{nullptr};
+  uint64_t unread_events_count_ = 0;
+  std::mutex on_new_event_m_;
 };
 
 class EventListenerInterface::ConditionalScopedLock
diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp
index ebc37ca4a..a9e4512f2 100644
--- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp
+++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_publisher_info.hpp
@@ -100,6 +100,11 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds::DataWriterListener
   bool
   hasEvent(rmw_event_type_t event_type) const final;
 
+  RMW_FASTRTPS_SHARED_CPP_PUBLIC
+  void set_on_new_event_callback(
+    const void * user_data,
+    rmw_event_callback_t callback) final;
+
   RMW_FASTRTPS_SHARED_CPP_PUBLIC
   bool
   takeNextEvent(rmw_event_type_t event_type, void * event_info) final;
diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp
index f66cbdf60..958cd9c75 100644
--- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp
+++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_service_info.hpp
@@ -31,6 +31,7 @@
 #include "fastdds/dds/subscriber/DataReader.hpp"
 #include "fastdds/dds/subscriber/DataReaderListener.hpp"
 #include "fastdds/dds/subscriber/SampleInfo.hpp"
+#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp"
 #include "fastdds/dds/topic/TypeSupport.hpp"
 #include "fastdds/rtps/common/Guid.h"
 
@@ -39,6 +40,8 @@
 
 #include "rcpputils/thread_safety_annotations.hpp"
 
+#include "rmw/event_callback_type.h"
+
 #include "rmw_fastrtps_shared_cpp/guid_utils.hpp"
 #include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"
 
@@ -225,6 +228,12 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
         info_->pub_listener_->endpoint_add_reader_and_writer(reader_guid, writer_guid);
 
         std::lock_guard<std::mutex> lock(internalMutex_);
+        const eprosima::fastrtps::HistoryQosPolicy & history = reader->get_qos().history();
+        if (eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == history.kind) {
+          while (list.size() >= static_cast<size_t>(history.depth)) {
+            list.pop_front();
+          }
+        }
 
         if (conditionMutex_ != nullptr) {
           std::unique_lock<std::mutex> clock(*conditionMutex_);
@@ -239,6 +248,14 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
           list.push_back(request);
           list_has_data_.store(true);
         }
+
+        std::unique_lock<std::mutex> lock_mutex(on_new_request_m_);
+
+        if (on_new_request_cb_) {
+          on_new_request_cb_(user_data_, 1);
+        } else {
+          unread_count_++;
+        }
       }
     }
   }
@@ -289,6 +306,29 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
     return list_has_data_.load();
   }
 
+  // Provide handlers to perform an action when a
+  // new event from this listener has occurred
+  void
+  set_on_new_request_callback(
+    const void * user_data,
+    rmw_event_callback_t callback)
+  {
+    std::unique_lock<std::mutex> lock_mutex(on_new_request_m_);
+
+    if (callback) {
+      // Push events that arrived before setting the executor callback
+      if (unread_count_) {
+        callback(user_data, unread_count_);
+        unread_count_ = 0;
+      }
+      user_data_ = user_data;
+      on_new_request_cb_ = callback;
+    } else {
+      user_data_ = nullptr;
+      on_new_request_cb_ = nullptr;
+    }
+  }
+
 private:
   CustomServiceInfo * info_;
   std::mutex internalMutex_;
@@ -296,6 +336,11 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
   std::atomic_bool list_has_data_;
   std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
   std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
+
+  rmw_event_callback_t on_new_request_cb_{nullptr};
+  const void * user_data_{nullptr};
+  std::mutex on_new_request_m_;
+  uint64_t unread_count_ = 0;
 };
 
 #endif  // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp
index f9c261581..bc79ea934 100644
--- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp
+++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/custom_subscriber_info.hpp
@@ -35,6 +35,7 @@
 #include "rcpputils/thread_safety_annotations.hpp"
 
 #include "rmw/impl/cpp/macros.hpp"
+#include "rmw/event_callback_type.h"
 
 #include "rmw_fastrtps_shared_cpp/custom_event_info.hpp"
 
@@ -66,13 +67,14 @@ struct CustomSubscriberInfo : public CustomEventInfo
 class SubListener : public EventListenerInterface, public eprosima::fastdds::dds::DataReaderListener
 {
 public:
-  explicit SubListener(CustomSubscriberInfo * info)
+  explicit SubListener(CustomSubscriberInfo * info, size_t qos_depth)
   : data_(false),
     deadline_changes_(false),
     liveliness_changes_(false),
     conditionMutex_(nullptr),
     conditionVariable_(nullptr)
   {
+    qos_depth_ = (qos_depth > 0) ? qos_depth : std::numeric_limits<size_t>::max();
     // Field is not used right now
     (void)info;
   }
@@ -98,6 +100,14 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds::DataReaderListener
   on_data_available(eprosima::fastdds::dds::DataReader * reader) final
   {
     update_has_data(reader);
+
+    std::unique_lock<std::mutex> lock_mutex(on_new_message_m_);
+
+    if (on_new_message_cb_) {
+      on_new_message_cb_(user_data_, 1);
+    } else {
+      new_data_unread_count_++;
+    }
   }
 
   RMW_FASTRTPS_SHARED_CPP_PUBLIC
@@ -117,6 +127,11 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds::DataReaderListener
   bool
   hasEvent(rmw_event_type_t event_type) const final;
 
+  RMW_FASTRTPS_SHARED_CPP_PUBLIC
+  void set_on_new_event_callback(
+    const void * user_data,
+    rmw_event_callback_t callback) final;
+
   RMW_FASTRTPS_SHARED_CPP_PUBLIC
   bool
   takeNextEvent(rmw_event_type_t event_type, void * event_info) final;
@@ -163,6 +178,30 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds::DataReaderListener
     return publishers_.size();
   }
 
+  // Provide handlers to perform an action when a
+  // new event from this listener has occurred
+  void
+  set_on_new_message_callback(
+    const void * user_data,
+    rmw_event_callback_t callback)
+  {
+    std::unique_lock<std::mutex> lock_mutex(on_new_message_m_);
+
+    if (callback) {
+      // Push events that arrived before setting the executor's callback
+      if (new_data_unread_count_) {
+        auto unread_count = std::min(new_data_unread_count_, qos_depth_);
+        callback(user_data, unread_count);
+        new_data_unread_count_ = 0;
+      }
+      user_data_ = user_data;
+      on_new_message_cb_ = callback;
+    } else {
+      user_data_ = nullptr;
+      on_new_message_cb_ = nullptr;
+    }
+  }
+
 private:
   mutable std::mutex internalMutex_;
 
@@ -180,6 +219,11 @@ class SubListener : public EventListenerInterface, public eprosima::fastdds::dds::DataReaderListener
   std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
 
   std::set<eprosima::fastrtps::rtps::GUID_t> publishers_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
+
+  rmw_event_callback_t on_new_message_cb_{nullptr};
+  std::mutex on_new_message_m_;
+  size_t qos_depth_;
+  size_t new_data_unread_count_ = 0;
 };
 
 #endif  // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SUBSCRIBER_INFO_HPP_
diff --git a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp
index 7c187b31d..860ccd8da 100644
--- a/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp
+++ b/rmw_fastrtps_shared_cpp/include/rmw_fastrtps_shared_cpp/rmw_common.hpp
@@ -455,6 +455,46 @@ __rmw_subscription_get_network_flow_endpoints(
   rcutils_allocator_t * allocator,
   rmw_network_flow_endpoint_array_t * network_flow_endpoint_array);
 
+RMW_FASTRTPS_SHARED_CPP_PUBLIC
+rmw_ret_t
+__rmw_subscription_set_on_new_message_callback(
+  rmw_subscription_t * rmw_subscription,
+  rmw_event_callback_t callback,
+  const void * user_data);
+
+RMW_FASTRTPS_SHARED_CPP_PUBLIC
+rmw_ret_t
+__rmw_service_set_on_new_request_callback(
+  rmw_service_t * rmw_service,
+  rmw_event_callback_t callback,
+  const void * user_data);
+
+RMW_FASTRTPS_SHARED_CPP_PUBLIC
+rmw_ret_t
+__rmw_service_get_actual_qos(
+  const rmw_service_t * service,
+  rmw_qos_profile_t * qos);
+
+RMW_FASTRTPS_SHARED_CPP_PUBLIC
+rmw_ret_t
+__rmw_client_set_on_new_response_callback(
+  rmw_client_t * rmw_client,
+  rmw_event_callback_t callback,
+  const void * user_data);
+
+RMW_FASTRTPS_SHARED_CPP_PUBLIC
+rmw_ret_t
+__rmw_client_get_actual_qos(
+  const rmw_client_t * client,
+  rmw_qos_profile_t * qos);
+
+RMW_FASTRTPS_SHARED_CPP_PUBLIC
+rmw_ret_t
+__rmw_event_set_callback(
+  rmw_event_t * rmw_event,
+  rmw_event_callback_t callback,
+  const void * user_data);
+
 }  // namespace rmw_fastrtps_shared_cpp
 
 #endif  // RMW_FASTRTPS_SHARED_CPP__RMW_COMMON_HPP_
diff --git a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp
index 840d2e830..4d8151be3 100644
--- a/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp
+++ b/rmw_fastrtps_shared_cpp/src/custom_publisher_info.cpp
@@ -40,6 +40,14 @@ PubListener::on_offered_deadline_missed(
   offered_deadline_missed_status_.total_count_change += status.total_count_change;
 
   deadline_changes_.store(true, std::memory_order_relaxed);
+
+  std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);
+
+  if (on_new_event_cb_) {
+    on_new_event_cb_(user_data_, 1);
+  } else {
+    unread_events_count_++;
+  }
 }
 
 void PubListener::on_liveliness_lost(
@@ -58,6 +66,14 @@ void PubListener::on_liveliness_lost(
   liveliness_lost_status_.total_count_change += status.total_count_change;
 
   liveliness_changes_.store(true, std::memory_order_relaxed);
+
+  std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);
+
+  if (on_new_event_cb_) {
+    on_new_event_cb_(user_data_, 1);
+  } else {
+    unread_events_count_++;
+  }
 }
 
 bool PubListener::hasEvent(rmw_event_type_t event_type) const
@@ -74,6 +90,26 @@ bool PubListener::hasEvent(rmw_event_type_t event_type) const
   return false;
 }
 
+void PubListener::set_on_new_event_callback(
+  const void * user_data,
+  rmw_event_callback_t callback)
+{
+  std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);
+
+  if (callback) {
+    // Push events that arrived before setting the executor's callback
+    if (unread_events_count_) {
+      callback(user_data, unread_events_count_);
+      unread_events_count_ = 0;
+    }
+    user_data_ = user_data;
+    on_new_event_cb_ = callback;
+  } else {
+    user_data_ = nullptr;
+    on_new_event_cb_ = nullptr;
+  }
+}
+
 bool PubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info)
 {
   assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type));
diff --git a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp
index 17da1fa79..a62e0e871 100644
--- a/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp
+++ b/rmw_fastrtps_shared_cpp/src/custom_subscriber_info.cpp
@@ -42,6 +42,14 @@ SubListener::on_requested_deadline_missed(
   requested_deadline_missed_status_.total_count_change += status.total_count_change;
 
   deadline_changes_.store(true, std::memory_order_relaxed);
+
+  std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);
+
+  if (on_new_event_cb_) {
+    on_new_event_cb_(user_data_, 1);
+  } else {
+    unread_events_count_++;
+  }
 }
 
 void SubListener::on_liveliness_changed(
@@ -62,6 +70,14 @@ void SubListener::on_liveliness_changed(
   liveliness_changed_status_.not_alive_count_change += status.not_alive_count_change;
 
   liveliness_changes_.store(true, std::memory_order_relaxed);
+
+  std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);
+
+  if (on_new_event_cb_) {
+    on_new_event_cb_(user_data_, 1);
+  } else {
+    unread_events_count_++;
+  }
 }
 
 bool SubListener::hasEvent(rmw_event_type_t event_type) const
@@ -78,6 +94,27 @@ bool SubListener::hasEvent(rmw_event_type_t event_type) const
   return false;
 }
 
+void SubListener::set_on_new_event_callback(
+  const void * user_data,
+  rmw_event_callback_t callback)
+{
+  std::unique_lock<std::mutex> lock_mutex(on_new_event_m_);
+
+  if (callback) {
+    // Push events that arrived before setting the executor's callback
+    if (unread_events_count_) {
+      callback(user_data, unread_events_count_);
+      unread_events_count_ = 0;
+    }
+    user_data_ = user_data;
+    on_new_event_cb_ = callback;
+  } else {
+    user_data_ = nullptr;
+    on_new_event_cb_ = nullptr;
+    return;
+  }
+}
+
 bool SubListener::takeNextEvent(rmw_event_type_t event_type, void * event_info)
 {
   assert(rmw_fastrtps_shared_cpp::internal::is_event_supported(event_type));
diff --git a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp
index d3fde8d9c..0d9a1dcac 100644
--- a/rmw_fastrtps_shared_cpp/src/rmw_client.cpp
+++ b/rmw_fastrtps_shared_cpp/src/rmw_client.cpp
@@ -126,4 +126,59 @@ __rmw_destroy_client(
   RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_ERROR);  // on completion
   return final_ret;
 }
+
+rmw_ret_t
+__rmw_client_set_on_new_response_callback(
+  rmw_client_t * rmw_client,
+  rmw_event_callback_t callback,
+  const void * user_data)
+{
+  auto custom_client_info = static_cast<CustomClientInfo *>(rmw_client->data);
+  custom_client_info->listener_->set_on_new_response_callback(
+    user_data,
+    callback);
+  return RMW_RET_OK;
+}
+
+rmw_ret_t
+__rmw_client_get_actual_qos(
+  const rmw_client_t * client,
+  rmw_qos_profile_t * qos)
+{
+  auto info = static_cast<CustomClientInfo *>(client->data);
+
+  eprosima::fastdds::dds::DataReader * fastdds_dr = info->response_reader_;
+  eprosima::fastdds::dds::DataWriter * fastdds_dw = info->request_writer_;
+
+  rmw_qos_profile_t dr_qos = rmw_qos_profile_default;
+  rmw_qos_profile_t dw_qos = rmw_qos_profile_default;
+
+  dds_qos_to_rmw_qos(fastdds_dr->get_qos(), &dr_qos);
+  dds_qos_to_rmw_qos(fastdds_dw->get_qos(), &dw_qos);
+
+  // Check if the QoS of the data reader and the data writer match
+  if (dr_qos.history != dw_qos.history ||
+    dr_qos.depth != dw_qos.depth ||
+    dr_qos.reliability != dw_qos.reliability ||
+    dr_qos.durability != dw_qos.durability ||
+    dr_qos.liveliness != dw_qos.liveliness ||
+    dr_qos.deadline.sec != dw_qos.deadline.sec ||
+    dr_qos.deadline.nsec != dw_qos.deadline.nsec ||
+    dr_qos.lifespan.sec != dw_qos.lifespan.sec ||
+    dr_qos.lifespan.nsec != dw_qos.lifespan.nsec ||
+    dr_qos.liveliness_lease_duration.sec != dw_qos.liveliness_lease_duration.sec ||
+    dr_qos.liveliness_lease_duration.nsec != dw_qos.liveliness_lease_duration.nsec ||
+    dr_qos.avoid_ros_namespace_conventions != dw_qos.avoid_ros_namespace_conventions)
+  {
+    // This situation can happen if we set system default settings for qos.
+    // As no qos is defined by the user, the dds implementation may assign one qos policy
+    // to the reader and a different one to the writer. Currently this seems to only happen
+    // for durability: volatile for data readers and transient local for data writers.
+ RMW_SET_ERROR_MSG("client's datawriter QoS does not match client's datareader QoS"); + return RMW_RET_ERROR; + } + + *qos = dw_qos; + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp index 4544d3136..cbff3ceab 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_event.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_event.cpp @@ -17,6 +17,7 @@ #include "rmw/impl/cpp/macros.hpp" #include "rmw_fastrtps_shared_cpp/rmw_common.hpp" +#include "rmw_fastrtps_shared_cpp/custom_event_info.hpp" #include "types/event_types.hpp" static const std::unordered_set g_rmw_event_type_set{ @@ -67,4 +68,17 @@ __rmw_init_event( return RMW_RET_OK; } +rmw_ret_t +__rmw_event_set_callback( + rmw_event_t * rmw_event, + rmw_event_callback_t callback, + const void * user_data) +{ + auto custom_event_info = static_cast(rmw_event->data); + custom_event_info->getListener()->set_on_new_event_callback( + user_data, + callback); + return RMW_RET_OK; +} + } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp index 1eba8692c..05aba98f8 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_service.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_service.cpp @@ -138,4 +138,62 @@ __rmw_destroy_service( RCUTILS_CAN_RETURN_WITH_ERROR_OF(RMW_RET_ERROR); // on completion return final_ret; } + +rmw_ret_t +__rmw_service_set_on_new_request_callback( + rmw_service_t * rmw_service, + rmw_event_callback_t callback, + const void * user_data) +{ + auto custom_service_info = static_cast(rmw_service->data); + custom_service_info->listener_->set_on_new_request_callback( + user_data, + callback); + return RMW_RET_OK; +} + +rmw_ret_t +__rmw_service_get_actual_qos( + const rmw_service_t * service, + rmw_qos_profile_t * qos) +{ + auto info = static_cast(service->data); + + eprosima::fastdds::dds::DataReader * fastdds_dr = info->request_reader_; + eprosima::fastdds::dds::DataWriter * fastdds_dw = info->response_writer_; + + rmw_qos_profile_t dr_qos = rmw_qos_profile_default; + rmw_qos_profile_t dw_qos = rmw_qos_profile_default; + + dds_qos_to_rmw_qos(fastdds_dr->get_qos(), &dr_qos); + dds_qos_to_rmw_qos(fastdds_dw->get_qos(), &dw_qos); + + // Check if the QoS of the data reader and the data writer match + if (dr_qos.history != dw_qos.history || + dr_qos.depth != dw_qos.depth || + dr_qos.reliability != dw_qos.reliability || + dr_qos.durability != dw_qos.durability || + dr_qos.liveliness != dw_qos.liveliness || + dr_qos.deadline.sec != dw_qos.deadline.sec || + dr_qos.deadline.nsec != dw_qos.deadline.nsec || + dr_qos.lifespan.sec != dw_qos.lifespan.sec || + dr_qos.lifespan.nsec != dw_qos.lifespan.nsec || + dr_qos.liveliness_lease_duration.sec != dw_qos.liveliness_lease_duration.sec || + dr_qos.liveliness_lease_duration.nsec != dw_qos.liveliness_lease_duration.nsec || + dr_qos.avoid_ros_namespace_conventions != dw_qos.avoid_ros_namespace_conventions) + { + // This situation can happen if we set system default settings for qos. + // As no qos is defined by the user, the dds han chose to assign one qos policy for the + // reader and a different for the writer. Currently seems to only happen to durability, + // which is set to volatile for data readers, and transient local for data writers. 
+ RMW_SET_ERROR_MSG("server's datawriter QoS does not match client's datareader QoS"); + return RMW_RET_ERROR; + } + + // The service has a single QoS, we can set it either as the qos + // of the data reader or data writer, as they match. + *qos = dw_qos; + + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp diff --git a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp index 98f42159f..b89f7531e 100644 --- a/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp +++ b/rmw_fastrtps_shared_cpp/src/rmw_subscription.cpp @@ -106,4 +106,17 @@ __rmw_subscription_get_actual_qos( return RMW_RET_OK; } + +rmw_ret_t +__rmw_subscription_set_on_new_message_callback( + rmw_subscription_t * rmw_subscription, + rmw_event_callback_t callback, + const void * user_data) +{ + auto custom_subscriber_info = static_cast(rmw_subscription->data); + custom_subscriber_info->listener_->set_on_new_message_callback( + user_data, + callback); + return RMW_RET_OK; +} } // namespace rmw_fastrtps_shared_cpp