Skip to content

Commit

Permalink
to support a feature of content filtered topic
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Lihui <[email protected]>
  • Loading branch information
Chen Lihui authored and Chen Lihui committed Mar 24, 2021
1 parent 3fa511a commit ba4ece3
Show file tree
Hide file tree
Showing 7 changed files with 357 additions and 2 deletions.
35 changes: 35 additions & 0 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,41 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
bool
exchange_in_use_by_wait_set_state(void * pointer_to_subscription_part, bool in_use_state);

/// Check if subscription instance can support content filter topic feature.
/**
* Depending on the middleware, this will return true if the middleware
* can support content filter topic feature for the subscription.
*
* \return boolean flag indicating if the subscription can support content filter topic feature.
*/
RCLCPP_PUBLIC
bool
is_cft_supported() const;

/// Set the filter expression and expression parameters for the subscription.
/**
* \param[in] filter_expression An filter expression to set.
* \param[in] expression_parameters Array of expression parameters to set.
*/
RCLCPP_PUBLIC
virtual
void
set_cft_expression_parameters(
const std::string & filter_expression,
const std::vector<std::string> & expression_parameters = {});

/// Get the filter expression and expression parameters for the subscription.
/**
* \param[out] filter_expression An filter expression to get.
* \param[out] expression_parameters Array of expression parameters to get.
*/
RCLCPP_PUBLIC
virtual
void
get_cft_expression_parameters(
std::string & filter_expression,
std::vector<std::string> & expression_parameters) const;

protected:
template<typename EventCallbackT>
void
Expand Down
70 changes: 69 additions & 1 deletion rclcpp/include/rclcpp/subscription_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
#include <string>
#include <vector>

#include "rcutils/strdup.h"
#include "rcutils/types/string_array.h"

#include "rclcpp/callback_group.hpp"
#include "rclcpp/detail/rmw_implementation_specific_subscription_payload.hpp"
#include "rclcpp/intra_process_buffer_type.hpp"
Expand Down Expand Up @@ -75,6 +78,17 @@ struct SubscriptionOptionsBase
TopicStatisticsOptions topic_stats_options;

QosOverridingOptions qos_overriding_options;

// Options to configure content filtered topic in the subscription.
struct ContentFilterOptions
{
// Filter expression like SQL statement
std::string filter_expression;
// Expression parameters for filter expression
std::vector<std::string> expression_parameters;
};

ContentFilterOptions content_filter_options;
};

/// Structure containing optional configuration for Subscriptions.
Expand Down Expand Up @@ -105,7 +119,8 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase
using AllocatorTraits = std::allocator_traits<Allocator>;
using MessageAllocatorT = typename AllocatorTraits::template rebind_alloc<MessageT>;
auto message_alloc = std::make_shared<MessageAllocatorT>(*this->get_allocator().get());
result.allocator = allocator::get_rcl_allocator<MessageT>(*message_alloc);
rcl_allocator_t allocator = allocator::get_rcl_allocator<MessageT>(*message_alloc);
result.allocator = allocator;
result.qos = qos.get_rmw_qos_profile();
result.rmw_subscription_options.ignore_local_publications = this->ignore_local_publications;

Expand All @@ -114,6 +129,59 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase
rmw_implementation_payload->modify_rmw_subscription_options(result.rmw_subscription_options);
}

auto fail_clean = [&result]()
{
rcl_ret_t ret = rcl_subscription_options_fini(&result);
if (RCL_RET_OK != ret) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"Failed to fini subscription option: %s",
rcl_get_error_string().str);
rcl_reset_error();
}
};

// Copy content_filter_options into rcl_subscription_options if necessary.
if (!content_filter_options.filter_expression.empty()) {
char * expression =
rcutils_strdup(content_filter_options.filter_expression.c_str(), allocator);
if (!expression) {
throw std::runtime_error("failed to allocate memory for filter expression");
}
result.rmw_subscription_options.filter_expression = expression;

if (!content_filter_options.expression_parameters.empty()) {
rcutils_string_array_t * parameters =
static_cast<rcutils_string_array_t *>(allocator.zero_allocate(
1,
sizeof(rcutils_string_array_t),
allocator.state));
if (!parameters) {
fail_clean();
throw std::runtime_error("failed to allocate memory for expression parameters");
}
result.rmw_subscription_options.expression_parameters = parameters;
rcutils_ret_t ret = rcutils_string_array_init(
parameters, content_filter_options.expression_parameters.size(), &allocator);
if (RCUTILS_RET_OK != ret) {
fail_clean();
rclcpp::exceptions::throw_from_rcl_error(
RCL_RET_ERROR,
"failed to initialize string array for expression parameters");
}

for (size_t i = 0; i < content_filter_options.expression_parameters.size(); ++i) {
char * parameter =
rcutils_strdup(content_filter_options.expression_parameters[i].c_str(), allocator);
if (!parameter) {
fail_clean();
throw std::runtime_error("failed to allocate memory for expression parameters");
}
parameters->data[i] = parameter;
}
}
}

return result;
}

Expand Down
87 changes: 87 additions & 0 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@
#include <string>
#include <vector>

#include "rcutils/strdup.h"
#include "rcutils/types/string_array.h"

#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/experimental/intra_process_manager.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/node_interfaces/node_base_interface.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/scope_exit.hpp"

#include "rmw/error_handling.h"
#include "rmw/rmw.h"
Expand Down Expand Up @@ -288,3 +292,86 @@ SubscriptionBase::exchange_in_use_by_wait_set_state(
}
throw std::runtime_error("given pointer_to_subscription_part does not match any part");
}

bool
SubscriptionBase::is_cft_supported() const
{
return rcl_subscription_is_cft_supported(subscription_handle_.get());
}

void
SubscriptionBase::set_cft_expression_parameters(
const std::string & filter_expression,
const std::vector<std::string> & expression_parameters)
{
rcl_allocator_t allocator = rcl_get_default_allocator();
rcutils_string_array_t parameters;
parameters = rcutils_get_zero_initialized_string_array();
rcutils_ret_t rcutils_ret = rcutils_string_array_init(
&parameters, expression_parameters.size(), &allocator);
if (RCUTILS_RET_OK != rcutils_ret) {
rclcpp::exceptions::throw_from_rcl_error(
RCL_RET_ERROR,
"failed to initialize string array for expression parameters");
}
RCLCPP_SCOPE_EXIT(
{
rcutils_ret_t rcutils_ret = rcutils_string_array_fini(&parameters);
if (RCUTILS_RET_OK != rcutils_ret) {
RCLCPP_ERROR(
rclcpp::get_node_logger(node_handle_.get()).get_child("rclcpp"),
"failed to finalize parameters: %s",
rcl_get_error_string().str);
rcl_reset_error();
}
});

for (size_t i = 0; i < expression_parameters.size(); ++i) {
parameters.data[i] = rcutils_strdup(expression_parameters[i].c_str(), allocator);
if (!parameters.data[i]) {
throw std::runtime_error("failed to allocate memory for expression parameters");
}
}
rcl_ret_t ret = rcl_subscription_set_cft_expression_parameters(
subscription_handle_.get(),
filter_expression.c_str(),
&parameters);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to set cft expression parameters");
}
}

void
SubscriptionBase::get_cft_expression_parameters(
std::string & filter_expression,
std::vector<std::string> & expression_parameters) const
{
char * expression = NULL;
rcutils_string_array_t parameters;
parameters = rcutils_get_zero_initialized_string_array();
rcl_ret_t ret = rcl_subscription_get_cft_expression_parameters(
subscription_handle_.get(),
&expression,
&parameters);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to get cft expression parameters");
}

filter_expression = expression;
for (size_t i = 0; i < parameters.size; ++i) {
expression_parameters.push_back(parameters.data[i]);
}

rcutils_allocator_t allocator = rcutils_get_default_allocator();
allocator.deallocate(expression, allocator.state);
rcutils_ret_t rcutils_ret = rcutils_string_array_fini(&parameters);
if (RCUTILS_RET_OK != rcutils_ret) {
RCLCPP_ERROR(
rclcpp::get_node_logger(node_handle_.get()).get_child("rclcpp"),
"failed to finalize parameters: %s",
rcl_get_error_string().str);
rcl_reset_error();
}
}
25 changes: 24 additions & 1 deletion rclcpp/src/rclcpp/time_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,28 @@ void TimeSource::attachNode(
return result;
});

const rclcpp::QoS qos = (
rclcpp::QoS(rclcpp::QoSInitialization::from_rmw(rmw_qos_profile_parameter_events))
);

rclcpp::SubscriptionOptions options;
// TODO(iuhilnehc-ynos) Currently, SQL grammar can support to filter array with
// "new_parameters[0].name.data MATCH 'use_sim_time' or
// new_parameters[1].name.data MATCH 'use_sim_time' or ..." on rmw_connextdds
// but it's impossible to know how many parameters this subscription will receive,
// so ignore setting parameter name for 'use_sim_time'.
// If SQL grammar can filter array with
// "new_parameters[].name.data MATCH 'use_sim_time'" or other non-number related statement,
// I'll update it in the future.
options.content_filter_options.filter_expression =
std::string("node MATCH '") + node_base_->get_fully_qualified_name() + "'";

// TODO(tfoote) use parameters interface not subscribe to events via topic ticketed #609
parameter_subscription_ = rclcpp::AsyncParametersClient::on_parameter_event(
node_topics_,
std::bind(&TimeSource::on_parameter_event, this, std::placeholders::_1));
std::bind(&TimeSource::on_parameter_event, this, std::placeholders::_1),
qos,
options);
}

void TimeSource::detachNode()
Expand Down Expand Up @@ -262,6 +280,11 @@ void TimeSource::on_parameter_event(const rcl_interfaces::msg::ParameterEvent::S
{
// Filter out events on 'use_sim_time' parameter instances in other nodes.
if (event->node != node_base_->get_fully_qualified_name()) {
if (parameter_subscription_->is_cft_supported()) {
RCLCPP_ERROR(
logger_, "this node '%s' should not get parameter event of another node '%s'",
node_base_->get_fully_qualified_name(), event->node.c_str());
}
return;
}
// Filter for only 'use_sim_time' being added or changed.
Expand Down
75 changes: 75 additions & 0 deletions rclcpp/test/rclcpp/test_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "../mocking_utils/patch.hpp"
#include "../utils/rclcpp_gtest_macros.hpp"

#include "test_msgs/msg/basic_types.hpp"
#include "test_msgs/msg/empty.hpp"

using namespace std::chrono_literals;
Expand Down Expand Up @@ -144,6 +145,44 @@ class SubscriptionClass
}
};

class TestCftSubscription : public ::testing::Test
{
public:
static void SetUpTestCase()
{
rclcpp::init(0, nullptr);
}

static void TearDownTestCase()
{
rclcpp::shutdown();
}

void SetUp()
{
node = std::make_shared<rclcpp::Node>("test_cft_subscription", "/ns");
rclcpp::SubscriptionOptionsBase options_base;
options_base.content_filter_options.filter_expression =
"int32_value = %0";
options_base.content_filter_options.expression_parameters = {"10"};
rclcpp::SubscriptionOptionsWithAllocator<std::allocator<void>> subscription_options(
options_base);
auto callback = [](std::shared_ptr<const test_msgs::msg::BasicTypes>) {};
sub = node->create_subscription<test_msgs::msg::BasicTypes>(
"topic", 10, callback, subscription_options);
}

void TearDown()
{
sub.reset();
node.reset();
}

protected:
rclcpp::Node::SharedPtr node;
rclcpp::Subscription<test_msgs::msg::BasicTypes>::SharedPtr sub;
};

/*
Testing subscription construction and destruction.
*/
Expand Down Expand Up @@ -440,6 +479,42 @@ TEST_F(TestSubscription, handle_loaned_message) {
EXPECT_NO_THROW(sub->handle_loaned_message(&msg, message_info));
}

TEST_F(TestCftSubscription, is_cft_supported) {
{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_subscription_is_cft_supported, false);
EXPECT_FALSE(sub->is_cft_supported());
}

{
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_subscription_is_cft_supported, true);
EXPECT_TRUE(sub->is_cft_supported());
}
}

TEST_F(TestCftSubscription, get_cft_expression_parameters_error) {
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_subscription_get_cft_expression_parameters, RCL_RET_ERROR);

std::string filter_expression;
std::vector<std::string> expression_parameters;
EXPECT_THROW(
sub->get_cft_expression_parameters(filter_expression, expression_parameters),
rclcpp::exceptions::RCLError);
}

TEST_F(TestCftSubscription, set_cft_expression_parameters_error) {
auto mock = mocking_utils::patch_and_return(
"lib:rclcpp", rcl_subscription_set_cft_expression_parameters, RCL_RET_ERROR);

std::string filter_expression = "int32_value = %0";
std::string expression_parameter = "100";
EXPECT_THROW(
sub->set_cft_expression_parameters(filter_expression, {expression_parameter}),
rclcpp::exceptions::RCLError);
}

/*
Testing subscription with intraprocess enabled and invalid QoS
*/
Expand Down
Loading

0 comments on commit ba4ece3

Please sign in to comment.