Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add apis to retrieve service/client QoS #27

Open
wants to merge 16 commits into
base: irobot/add-events-executor
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -493,4 +493,32 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
return rmw_fastrtps_shared_cpp::__rmw_destroy_client(
eprosima_fastrtps_identifier, node, client);
}

rmw_ret_t
rmw_client_set_on_new_response_callback(
rmw_client_t * rmw_client,
rmw_event_callback_t callback,
const void * user_data)
{
return rmw_fastrtps_shared_cpp::__rmw_client_set_on_new_response_callback(
rmw_client,
callback,
user_data);
}

rmw_ret_t
rmw_client_get_actual_qos(
const rmw_client_t * client,
rmw_qos_profile_t * qos)
{
RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
client,
client->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);

return rmw_fastrtps_shared_cpp::__rmw_client_get_actual_qos(client, qos);
}
} // extern "C"
12 changes: 12 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,16 @@ rmw_subscription_event_init(
subscription->data,
event_type);
}

rmw_ret_t
rmw_event_set_callback(
rmw_event_t * rmw_event,
rmw_event_callback_t callback,
const void * user_data)
{
return rmw_fastrtps_shared_cpp::__rmw_event_set_callback(
rmw_event,
callback,
user_data);
}
} // extern "C"
28 changes: 28 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,4 +492,32 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service)
return rmw_fastrtps_shared_cpp::__rmw_destroy_service(
eprosima_fastrtps_identifier, node, service);
}

rmw_ret_t
rmw_service_set_on_new_request_callback(
rmw_service_t * rmw_service,
rmw_event_callback_t callback,
const void * user_data)
{
return rmw_fastrtps_shared_cpp::__rmw_service_set_on_new_request_callback(
rmw_service,
callback,
user_data);
}

rmw_ret_t
rmw_service_get_actual_qos(
const rmw_service_t * service,
rmw_qos_profile_t * qos)
{
RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
service,
service->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);

return rmw_fastrtps_shared_cpp::__rmw_service_get_actual_qos(service, qos);
}
} // extern "C"
12 changes: 12 additions & 0 deletions rmw_fastrtps_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,16 @@ rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
return rmw_fastrtps_shared_cpp::__rmw_destroy_subscription(
eprosima_fastrtps_identifier, node, subscription);
}

rmw_ret_t
rmw_subscription_set_on_new_message_callback(
rmw_subscription_t * rmw_subscription,
rmw_event_callback_t callback,
const void * user_data)
{
return rmw_fastrtps_shared_cpp::__rmw_subscription_set_on_new_message_callback(
rmw_subscription,
callback,
user_data);
}
} // extern "C"
2 changes: 1 addition & 1 deletion rmw_fastrtps_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ create_subscription(
/////
// Create Listener
if (create_subscription_listener) {
info->listener_ = new (std::nothrow) SubListener(info);
info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth);

if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
Expand Down
16 changes: 16 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,4 +540,20 @@ rmw_destroy_client(rmw_node_t * node, rmw_client_t * client)
return rmw_fastrtps_shared_cpp::__rmw_destroy_client(
eprosima_fastrtps_identifier, node, client);
}

rmw_ret_t
rmw_client_get_actual_qos(
const rmw_client_t * client,
rmw_qos_profile_t * qos)
{
RMW_CHECK_ARGUMENT_FOR_NULL(client, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
client,
client->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);

return rmw_fastrtps_shared_cpp::__rmw_client_get_actual_qos(client, qos);
}
} // extern "C"
16 changes: 16 additions & 0 deletions rmw_fastrtps_dynamic_cpp/src/rmw_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,4 +539,20 @@ rmw_destroy_service(rmw_node_t * node, rmw_service_t * service)
return rmw_fastrtps_shared_cpp::__rmw_destroy_service(
eprosima_fastrtps_identifier, node, service);
}

rmw_ret_t
rmw_service_get_actual_qos(
const rmw_service_t * service,
rmw_qos_profile_t * qos)
{
RMW_CHECK_ARGUMENT_FOR_NULL(service, RMW_RET_INVALID_ARGUMENT);
RMW_CHECK_TYPE_IDENTIFIERS_MATCH(
service,
service->implementation_identifier,
eprosima_fastrtps_identifier,
return RMW_RET_INCORRECT_RMW_IMPLEMENTATION);
RMW_CHECK_ARGUMENT_FOR_NULL(qos, RMW_RET_INVALID_ARGUMENT);

return rmw_fastrtps_shared_cpp::__rmw_service_get_actual_qos(service, qos);
}
} // extern "C"
2 changes: 1 addition & 1 deletion rmw_fastrtps_dynamic_cpp/src/subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ create_subscription(
/////
// Create Listener
if (create_subscription_listener) {
info->listener_ = new (std::nothrow) SubListener(info);
info->listener_ = new (std::nothrow) SubListener(info, qos_policies->depth);

if (!info->listener_) {
RMW_SET_ERROR_MSG("create_subscription() could not create subscription listener");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "fastdds/dds/subscriber/DataReader.hpp"
#include "fastdds/dds/subscriber/DataReaderListener.hpp"
#include "fastdds/dds/subscriber/SampleInfo.hpp"
#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp"
#include "fastdds/dds/topic/TypeSupport.hpp"

#include "fastdds/rtps/common/Guid.h"
Expand All @@ -41,6 +42,8 @@

#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw/event_callback_type.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

class ClientListener;
Expand Down Expand Up @@ -104,6 +107,12 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
response.sample_identity_.writer_guid() == info_->writer_guid_)
{
std::lock_guard<std::mutex> lock(internalMutex_);
const eprosima::fastrtps::HistoryQosPolicy & history = reader->get_qos().history();
if (eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == history.kind) {
while (list.size() >= static_cast<size_t>(history.depth)) {
list.pop_front();
}
}

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
Expand All @@ -118,6 +127,14 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
list.emplace_back(std::move(response));
list_has_data_.store(true);
}

std::unique_lock<std::mutex> lock_mutex(on_new_response_m_);

if(on_new_response_cb_) {
on_new_response_cb_(user_data_, 1);
} else {
unread_count_++;
}
}
}
}
Expand Down Expand Up @@ -174,6 +191,29 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
info_->response_subscriber_matched_count_.store(publishers_.size());
}

// Provide handlers to perform an action when a
// new event from this listener has ocurred
void
set_on_new_response_callback(
const void * user_data,
rmw_event_callback_t callback)
{
std::unique_lock<std::mutex> lock_mutex(on_new_response_m_);

if (callback) {
// Push events arrived before setting the the executor callback
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
on_new_response_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_response_cb_ = nullptr;
}
}

private:
bool popResponse(CustomClientResponse & response) RCPPUTILS_TSA_REQUIRES(internalMutex_)
{
Expand All @@ -193,6 +233,11 @@ class ClientListener : public eprosima::fastdds::dds::DataReaderListener
std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::set<eprosima::fastrtps::rtps::GUID_t> publishers_;

rmw_event_callback_t on_new_response_cb_{nullptr};
const void * user_data_{nullptr};
std::mutex on_new_response_m_;
uint64_t unread_count_ = 0;
};

class ClientPubListener : public eprosima::fastdds::dds::DataWriterListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "fastcdr/FastBuffer.h"

#include "rmw/event.h"
#include "rmw/event_callback_type.h"

#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

Expand Down Expand Up @@ -58,6 +59,17 @@ class EventListenerInterface
* \return `false` if data was not available, in this case nothing was written to event_info.
*/
virtual bool takeNextEvent(rmw_event_type_t event_type, void * event_info) = 0;

// Provide handlers to perform an action when a
// new event from this listener has ocurred
virtual void set_on_new_event_callback(
const void * user_data,
rmw_event_callback_t callback) = 0;

rmw_event_callback_t on_new_event_cb_{nullptr};
const void * user_data_{nullptr};
uint64_t unread_events_count_ = 0;
std::mutex on_new_event_m_;
};

class EventListenerInterface::ConditionalScopedLock
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,11 @@ class PubListener : public EventListenerInterface, public eprosima::fastdds::dds
bool
hasEvent(rmw_event_type_t event_type) const final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
void set_on_new_event_callback(
const void * user_data,
rmw_event_callback_t callback) final;

RMW_FASTRTPS_SHARED_CPP_PUBLIC
bool
takeNextEvent(rmw_event_type_t event_type, void * event_info) final;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "fastdds/dds/subscriber/DataReader.hpp"
#include "fastdds/dds/subscriber/DataReaderListener.hpp"
#include "fastdds/dds/subscriber/SampleInfo.hpp"
#include "fastdds/dds/subscriber/qos/DataReaderQos.hpp"
#include "fastdds/dds/topic/TypeSupport.hpp"

#include "fastdds/rtps/common/Guid.h"
Expand All @@ -39,6 +40,8 @@

#include "rcpputils/thread_safety_annotations.hpp"

#include "rmw/event_callback_type.h"

#include "rmw_fastrtps_shared_cpp/guid_utils.hpp"
#include "rmw_fastrtps_shared_cpp/TypeSupport.hpp"

Expand Down Expand Up @@ -225,6 +228,12 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
info_->pub_listener_->endpoint_add_reader_and_writer(reader_guid, writer_guid);

std::lock_guard<std::mutex> lock(internalMutex_);
const eprosima::fastrtps::HistoryQosPolicy & history = reader->get_qos().history();
if (eprosima::fastrtps::KEEP_LAST_HISTORY_QOS == history.kind) {
while (list.size() >= static_cast<size_t>(history.depth)) {
list.pop_front();
}
}

if (conditionMutex_ != nullptr) {
std::unique_lock<std::mutex> clock(*conditionMutex_);
Expand All @@ -239,6 +248,14 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
list.push_back(request);
list_has_data_.store(true);
}

std::unique_lock<std::mutex> lock_mutex(on_new_request_m_);

if(on_new_request_cb_) {
on_new_request_cb_(user_data_, 1);
} else {
unread_count_++;
}
}
}
}
Expand Down Expand Up @@ -289,13 +306,41 @@ class ServiceListener : public eprosima::fastdds::dds::DataReaderListener
return list_has_data_.load();
}

// Provide handlers to perform an action when a
// new event from this listener has ocurred
void
set_on_new_request_callback(
const void * user_data,
rmw_event_callback_t callback)
{
std::unique_lock<std::mutex> lock_mutex(on_new_request_m_);

if (callback) {
// Push events arrived before setting the the executor callback
if (unread_count_) {
callback(user_data, unread_count_);
unread_count_ = 0;
}
user_data_ = user_data;
on_new_request_cb_ = callback;
} else {
user_data_ = nullptr;
on_new_request_cb_ = nullptr;
}
}

private:
CustomServiceInfo * info_;
std::mutex internalMutex_;
std::list<CustomServiceRequest> list RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::atomic_bool list_has_data_;
std::mutex * conditionMutex_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);
std::condition_variable * conditionVariable_ RCPPUTILS_TSA_GUARDED_BY(internalMutex_);

rmw_event_callback_t on_new_request_cb_{nullptr};
const void * user_data_{nullptr};
std::mutex on_new_request_m_;
uint64_t unread_count_ = 0;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_SERVICE_INFO_HPP_
Loading