Skip to content

Commit

Permalink
add content-filtered-topic interfaces (#1561)
Browse files Browse the repository at this point in the history
* to support a feature of content filtered topic

Signed-off-by: Chen Lihui <[email protected]>

* update for checking memory allocated

Signed-off-by: Chen Lihui <[email protected]>

* set expression parameter only if filter is valid

Signed-off-by: Chen Lihui <[email protected]>

* add test

Signed-off-by: Chen Lihui <[email protected]>

* use a default empty value for expresion parameters

Signed-off-by: Chen Lihui <[email protected]>

* remove std_msgs dependency

Signed-off-by: Chen Lihui <[email protected]>

* use new rcl interface

Signed-off-by: Chen Lihui <[email protected]>

* remove unused include header files and fix unscrutify

Signed-off-by: Chen Lihui <[email protected]>

* update comment

Signed-off-by: Chen Lihui <[email protected]>

* update comment

Signed-off-by: Chen Lihui <[email protected]>

* rename

Signed-off-by: Chen Lihui <[email protected]>

* refactor test

Signed-off-by: Chen Lihui <[email protected]>

* relate to `rcutils_string_array_t expression_parameters` changed in rmw

Signed-off-by: Chen Lihui <[email protected]>

* remove the implementation temporary, add them with fallback in the feature

1. add DEFINE_CONTENT_FILTER with default(ON) to build content filter interfaces
2. user need a compile option(CONTENT_FILTER_ENABLE) to turn on the feature
to set the filter or call the interfaces

Signed-off-by: Chen Lihui <[email protected]>

* update comments and check the option to build content filter test

Signed-off-by: Chen Lihui <[email protected]>

* add DDS content filter implementation

Signed-off-by: Chen Lihui <[email protected]>

* address review

Signed-off-by: Chen Lihui <[email protected]>

* rcl api changed

Signed-off-by: Chen Lihui <[email protected]>

* increase the filter propagation time

Signed-off-by: Chen Lihui <[email protected]>

* add rclcpp::ContentFilterOptions and use it as the return type of get_content_filter

Signed-off-by: Chen Lihui <[email protected]>

* increase the maximun time to wait message event and content filter propagation

Signed-off-by: Chen Lihui <[email protected]>

* cft member should be defined in the structure.

to check the macro for interfaces is enough.

Signed-off-by: Chen Lihui <[email protected]>

* set test timeout to 120

Signed-off-by: Chen Lihui <[email protected]>

* update doc

Signed-off-by: Chen Lihui <[email protected]>
  • Loading branch information
Chen Lihui authored Apr 4, 2022
1 parent 0699aeb commit 03fa731
Show file tree
Hide file tree
Showing 9 changed files with 545 additions and 0 deletions.
5 changes: 5 additions & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ cmake_minimum_required(VERSION 3.12)

project(rclcpp)

option(DEFINE_CONTENT_FILTER "Content filter feature support" ON)
if(DEFINE_CONTENT_FILTER)
add_definitions(-DCONTENT_FILTER_ENABLE)
endif()

find_package(Threads REQUIRED)

find_package(ament_cmake_ros REQUIRED)
Expand Down
37 changes: 37 additions & 0 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "rclcpp/qos.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/serialized_message.hpp"
#include "rclcpp/subscription_content_filter_options.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/visibility_control.hpp"

Expand Down Expand Up @@ -491,6 +492,42 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
event_handlers_[event_type]->clear_on_ready_callback();
}

#ifdef CONTENT_FILTER_ENABLE
/// Check if content filtered topic feature of the subscription instance is enabled.
/**
* \return boolean flag indicating if the content filtered topic of this subscription is enabled.
*/
RCLCPP_PUBLIC
bool
is_cft_enabled() const;

/// Set the filter expression and expression parameters for the subscription.
/**
* \param[in] filter_expression A filter expression to set.
* \sa ContentFilterOptions::filter_expression
* An empty string ("") will clear the content filter setting of the subscription.
* \param[in] expression_parameters Array of expression parameters to set.
* \sa ContentFilterOptions::expression_parameters
* \throws RCLBadAlloc if memory cannot be allocated
* \throws RCLError if an unexpect error occurs
*/
RCLCPP_PUBLIC
void
set_content_filter(
const std::string & filter_expression,
const std::vector<std::string> & expression_parameters = {});

/// Get the filter expression and expression parameters for the subscription.
/**
* \return rclcpp::ContentFilterOptions The content filter options to get.
* \throws RCLBadAlloc if memory cannot be allocated
* \throws RCLError if an unexpect error occurs
*/
RCLCPP_PUBLIC
rclcpp::ContentFilterOptions
get_content_filter() const;
#endif // CONTENT_FILTER_ENABLE

protected:
template<typename EventCallbackT>
void
Expand Down
38 changes: 38 additions & 0 deletions rclcpp/include/rclcpp/subscription_content_filter_options.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Copyright 2022 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__SUBSCRIPTION_CONTENT_FILTER_OPTIONS_HPP_
#define RCLCPP__SUBSCRIPTION_CONTENT_FILTER_OPTIONS_HPP_

#include <string>
#include <vector>

namespace rclcpp
{

/// Options to configure content filtered topic in the subscription.
struct ContentFilterOptions
{
/// Filter expression is similar to the WHERE part of an SQL clause.
std::string filter_expression;
/**
* Expression parameters is the tokens placeholder ‘parameters’ (i.e., "%n" tokens begin from 0)
* in the filter_expression. The maximum expression_parameters size is 100.
*/
std::vector<std::string> expression_parameters;
};

} // namespace rclcpp

#endif // RCLCPP__SUBSCRIPTION_CONTENT_FILTER_OPTIONS_HPP_
19 changes: 19 additions & 0 deletions rclcpp/include/rclcpp/subscription_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "rclcpp/qos.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/qos_overriding_options.hpp"
#include "rclcpp/subscription_content_filter_options.hpp"
#include "rclcpp/topic_statistics_state.hpp"
#include "rclcpp/visibility_control.hpp"

Expand Down Expand Up @@ -81,6 +82,8 @@ struct SubscriptionOptionsBase
TopicStatisticsOptions topic_stats_options;

QosOverridingOptions qos_overriding_options;

ContentFilterOptions content_filter_options;
};

/// Structure containing optional configuration for Subscriptions.
Expand Down Expand Up @@ -123,6 +126,22 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase
rmw_implementation_payload->modify_rmw_subscription_options(result.rmw_subscription_options);
}

#ifdef CONTENT_FILTER_ENABLE
// Copy content_filter_options into rcl_subscription_options.
if (!content_filter_options.filter_expression.empty()) {
std::vector<const char *> cstrings =
get_c_vector_string(content_filter_options.expression_parameters);
rcl_ret_t ret = rcl_subscription_options_set_content_filter_options(
get_c_string(content_filter_options.filter_expression),
cstrings.size(),
cstrings.data(),
&result);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "failed to set content_filter_options");
}
}
#endif // CONTENT_FILTER_ENABLE
return result;
}

Expand Down
9 changes: 9 additions & 0 deletions rclcpp/include/rclcpp/utilities.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,15 @@ RCLCPP_PUBLIC
const char *
get_c_string(const std::string & string_in);

/// Return the std::vector of C string from the given std::vector<std::string>.
/**
* \param[in] strings_in is a std::vector of std::string
* \return the std::vector of C string from the std::vector<std::string>
*/
RCLCPP_PUBLIC
std::vector<const char *>
get_c_vector_string(const std::vector<std::string> & strings_in);

} // namespace rclcpp

#endif // RCLCPP__UTILITIES_HPP_
92 changes: 92 additions & 0 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
#include <unordered_map>
#include <vector>

#include "rcpputils/scope_exit.hpp"

#include "rclcpp/exceptions.hpp"
#include "rclcpp/expand_topic_or_service_name.hpp"
#include "rclcpp/experimental/intra_process_manager.hpp"
Expand Down Expand Up @@ -354,3 +356,93 @@ SubscriptionBase::set_on_new_message_callback(
throw_from_rcl_error(ret, "failed to set the on new message callback for subscription");
}
}

#ifdef CONTENT_FILTER_ENABLE
bool
SubscriptionBase::is_cft_enabled() const
{
return rcl_subscription_is_cft_enabled(subscription_handle_.get());
}

void
SubscriptionBase::set_content_filter(
const std::string & filter_expression,
const std::vector<std::string> & expression_parameters)
{
rcl_subscription_content_filter_options_t options =
rcl_get_zero_initialized_subscription_content_filter_options();

std::vector<const char *> cstrings =
get_c_vector_string(expression_parameters);
rcl_ret_t ret = rcl_subscription_content_filter_options_init(
subscription_handle_.get(),
get_c_string(filter_expression),
cstrings.size(),
cstrings.data(),
&options);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(
ret, "failed to init subscription content_filtered_topic option");
}
RCPPUTILS_SCOPE_EXIT(
{
rcl_ret_t ret = rcl_subscription_content_filter_options_fini(
subscription_handle_.get(), &options);
if (RCL_RET_OK != ret) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"Failed to fini subscription content_filtered_topic option: %s",
rcl_get_error_string().str);
rcl_reset_error();
}
});

ret = rcl_subscription_set_content_filter(
subscription_handle_.get(),
&options);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to set cft expression parameters");
}
}

rclcpp::ContentFilterOptions
SubscriptionBase::get_content_filter() const
{
rclcpp::ContentFilterOptions ret_options;
rcl_subscription_content_filter_options_t options =
rcl_get_zero_initialized_subscription_content_filter_options();

rcl_ret_t ret = rcl_subscription_get_content_filter(
subscription_handle_.get(),
&options);

if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "failed to get cft expression parameters");
}

RCPPUTILS_SCOPE_EXIT(
{
rcl_ret_t ret = rcl_subscription_content_filter_options_fini(
subscription_handle_.get(), &options);
if (RCL_RET_OK != ret) {
RCLCPP_ERROR(
rclcpp::get_logger("rclcpp"),
"Failed to fini subscription content_filtered_topic option: %s",
rcl_get_error_string().str);
rcl_reset_error();
}
});

rmw_subscription_content_filter_options_t & content_filter_options =
options.rmw_subscription_content_filter_options;
ret_options.filter_expression = content_filter_options.filter_expression;

for (size_t i = 0; i < content_filter_options.expression_parameters.size; ++i) {
ret_options.expression_parameters.push_back(
content_filter_options.expression_parameters.data[i]);
}

return ret_options;
}
#endif // CONTENT_FILTER_ENABLE
13 changes: 13 additions & 0 deletions rclcpp/src/rclcpp/utilities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,17 @@ get_c_string(const std::string & string_in)
return string_in.c_str();
}

std::vector<const char *>
get_c_vector_string(const std::vector<std::string> & strings_in)
{
std::vector<const char *> cstrings;
cstrings.reserve(strings_in.size());

for (size_t i = 0; i < strings_in.size(); ++i) {
cstrings.push_back(strings_in[i].c_str());
}

return cstrings;
}

} // namespace rclcpp
20 changes: 20 additions & 0 deletions rclcpp/test/rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -733,3 +733,23 @@ ament_add_gtest(test_graph_listener test_graph_listener.cpp)
if(TARGET test_graph_listener)
target_link_libraries(test_graph_listener ${PROJECT_NAME} mimick)
endif()

if(DEFINE_CONTENT_FILTER)
function(test_subscription_content_filter_for_rmw_implementation)
set(rmw_implementation_env_var RMW_IMPLEMENTATION=${rmw_implementation})
ament_add_gmock(test_subscription_content_filter${target_suffix}
test_subscription_content_filter.cpp
ENV ${rmw_implementation_env_var}
TIMEOUT 120
)
if(TARGET test_subscription_content_filter${target_suffix})
target_link_libraries(test_subscription_content_filter${target_suffix} ${PROJECT_NAME} mimick)
ament_target_dependencies(test_subscription_content_filter${target_suffix}
"rcpputils"
"rosidl_typesupport_cpp"
"test_msgs"
)
endif()
endfunction()
call_for_each_rmw_implementation(test_subscription_content_filter_for_rmw_implementation)
endif()
Loading

0 comments on commit 03fa731

Please sign in to comment.