From 03fa731d2345c41e5a4e436bf0730012ece96ce5 Mon Sep 17 00:00:00 2001 From: Chen Lihui Date: Mon, 4 Apr 2022 11:50:50 +0800 Subject: [PATCH] add content-filtered-topic interfaces (#1561) * to support a feature of content filtered topic Signed-off-by: Chen Lihui * update for checking memory allocated Signed-off-by: Chen Lihui * set expression parameter only if filter is valid Signed-off-by: Chen Lihui * add test Signed-off-by: Chen Lihui * use a default empty value for expresion parameters Signed-off-by: Chen Lihui * remove std_msgs dependency Signed-off-by: Chen Lihui * use new rcl interface Signed-off-by: Chen Lihui * remove unused include header files and fix unscrutify Signed-off-by: Chen Lihui * update comment Signed-off-by: Chen Lihui * update comment Signed-off-by: Chen Lihui * rename Signed-off-by: Chen Lihui * refactor test Signed-off-by: Chen Lihui * relate to `rcutils_string_array_t expression_parameters` changed in rmw Signed-off-by: Chen Lihui * remove the implementation temporary, add them with fallback in the feature 1. add DEFINE_CONTENT_FILTER with default(ON) to build content filter interfaces 2. user need a compile option(CONTENT_FILTER_ENABLE) to turn on the feature to set the filter or call the interfaces Signed-off-by: Chen Lihui * update comments and check the option to build content filter test Signed-off-by: Chen Lihui * add DDS content filter implementation Signed-off-by: Chen Lihui * address review Signed-off-by: Chen Lihui * rcl api changed Signed-off-by: Chen Lihui * increase the filter propagation time Signed-off-by: Chen Lihui * add rclcpp::ContentFilterOptions and use it as the return type of get_content_filter Signed-off-by: Chen Lihui * increase the maximun time to wait message event and content filter propagation Signed-off-by: Chen Lihui * cft member should be defined in the structure. to check the macro for interfaces is enough. Signed-off-by: Chen Lihui * set test timeout to 120 Signed-off-by: Chen Lihui * update doc Signed-off-by: Chen Lihui --- rclcpp/CMakeLists.txt | 5 + rclcpp/include/rclcpp/subscription_base.hpp | 37 +++ .../subscription_content_filter_options.hpp | 38 +++ .../include/rclcpp/subscription_options.hpp | 19 ++ rclcpp/include/rclcpp/utilities.hpp | 9 + rclcpp/src/rclcpp/subscription_base.cpp | 92 ++++++ rclcpp/src/rclcpp/utilities.cpp | 13 + rclcpp/test/rclcpp/CMakeLists.txt | 20 ++ .../test_subscription_content_filter.cpp | 312 ++++++++++++++++++ 9 files changed, 545 insertions(+) create mode 100644 rclcpp/include/rclcpp/subscription_content_filter_options.hpp create mode 100644 rclcpp/test/rclcpp/test_subscription_content_filter.cpp diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index 9572f422a8..de424342ed 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -2,6 +2,11 @@ cmake_minimum_required(VERSION 3.12) project(rclcpp) +option(DEFINE_CONTENT_FILTER "Content filter feature support" ON) +if(DEFINE_CONTENT_FILTER) + add_definitions(-DCONTENT_FILTER_ENABLE) +endif() + find_package(Threads REQUIRED) find_package(ament_cmake_ros REQUIRED) diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index 26650cd121..0ad098b027 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -39,6 +39,7 @@ #include "rclcpp/qos.hpp" #include "rclcpp/qos_event.hpp" #include "rclcpp/serialized_message.hpp" +#include "rclcpp/subscription_content_filter_options.hpp" #include "rclcpp/type_support_decl.hpp" #include "rclcpp/visibility_control.hpp" @@ -491,6 +492,42 @@ class SubscriptionBase : public std::enable_shared_from_this event_handlers_[event_type]->clear_on_ready_callback(); } +#ifdef CONTENT_FILTER_ENABLE + /// Check if content filtered topic feature of the subscription instance is enabled. + /** + * \return boolean flag indicating if the content filtered topic of this subscription is enabled. + */ + RCLCPP_PUBLIC + bool + is_cft_enabled() const; + + /// Set the filter expression and expression parameters for the subscription. + /** + * \param[in] filter_expression A filter expression to set. + * \sa ContentFilterOptions::filter_expression + * An empty string ("") will clear the content filter setting of the subscription. + * \param[in] expression_parameters Array of expression parameters to set. + * \sa ContentFilterOptions::expression_parameters + * \throws RCLBadAlloc if memory cannot be allocated + * \throws RCLError if an unexpect error occurs + */ + RCLCPP_PUBLIC + void + set_content_filter( + const std::string & filter_expression, + const std::vector & expression_parameters = {}); + + /// Get the filter expression and expression parameters for the subscription. + /** + * \return rclcpp::ContentFilterOptions The content filter options to get. + * \throws RCLBadAlloc if memory cannot be allocated + * \throws RCLError if an unexpect error occurs + */ + RCLCPP_PUBLIC + rclcpp::ContentFilterOptions + get_content_filter() const; +#endif // CONTENT_FILTER_ENABLE + protected: template void diff --git a/rclcpp/include/rclcpp/subscription_content_filter_options.hpp b/rclcpp/include/rclcpp/subscription_content_filter_options.hpp new file mode 100644 index 0000000000..8de034ddf5 --- /dev/null +++ b/rclcpp/include/rclcpp/subscription_content_filter_options.hpp @@ -0,0 +1,38 @@ +// Copyright 2022 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef RCLCPP__SUBSCRIPTION_CONTENT_FILTER_OPTIONS_HPP_ +#define RCLCPP__SUBSCRIPTION_CONTENT_FILTER_OPTIONS_HPP_ + +#include +#include + +namespace rclcpp +{ + +/// Options to configure content filtered topic in the subscription. +struct ContentFilterOptions +{ + /// Filter expression is similar to the WHERE part of an SQL clause. + std::string filter_expression; + /** + * Expression parameters is the tokens placeholder ‘parameters’ (i.e., "%n" tokens begin from 0) + * in the filter_expression. The maximum expression_parameters size is 100. + */ + std::vector expression_parameters; +}; + +} // namespace rclcpp + +#endif // RCLCPP__SUBSCRIPTION_CONTENT_FILTER_OPTIONS_HPP_ diff --git a/rclcpp/include/rclcpp/subscription_options.hpp b/rclcpp/include/rclcpp/subscription_options.hpp index d6d2d4c60d..da81b8ffb0 100644 --- a/rclcpp/include/rclcpp/subscription_options.hpp +++ b/rclcpp/include/rclcpp/subscription_options.hpp @@ -28,6 +28,7 @@ #include "rclcpp/qos.hpp" #include "rclcpp/qos_event.hpp" #include "rclcpp/qos_overriding_options.hpp" +#include "rclcpp/subscription_content_filter_options.hpp" #include "rclcpp/topic_statistics_state.hpp" #include "rclcpp/visibility_control.hpp" @@ -81,6 +82,8 @@ struct SubscriptionOptionsBase TopicStatisticsOptions topic_stats_options; QosOverridingOptions qos_overriding_options; + + ContentFilterOptions content_filter_options; }; /// Structure containing optional configuration for Subscriptions. @@ -123,6 +126,22 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase rmw_implementation_payload->modify_rmw_subscription_options(result.rmw_subscription_options); } +#ifdef CONTENT_FILTER_ENABLE + // Copy content_filter_options into rcl_subscription_options. + if (!content_filter_options.filter_expression.empty()) { + std::vector cstrings = + get_c_vector_string(content_filter_options.expression_parameters); + rcl_ret_t ret = rcl_subscription_options_set_content_filter_options( + get_c_string(content_filter_options.filter_expression), + cstrings.size(), + cstrings.data(), + &result); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to set content_filter_options"); + } + } +#endif // CONTENT_FILTER_ENABLE return result; } diff --git a/rclcpp/include/rclcpp/utilities.hpp b/rclcpp/include/rclcpp/utilities.hpp index fa48a010c4..5274cba33a 100644 --- a/rclcpp/include/rclcpp/utilities.hpp +++ b/rclcpp/include/rclcpp/utilities.hpp @@ -321,6 +321,15 @@ RCLCPP_PUBLIC const char * get_c_string(const std::string & string_in); +/// Return the std::vector of C string from the given std::vector. +/** + * \param[in] strings_in is a std::vector of std::string + * \return the std::vector of C string from the std::vector + */ +RCLCPP_PUBLIC +std::vector +get_c_vector_string(const std::vector & strings_in); + } // namespace rclcpp #endif // RCLCPP__UTILITIES_HPP_ diff --git a/rclcpp/src/rclcpp/subscription_base.cpp b/rclcpp/src/rclcpp/subscription_base.cpp index 12841fe6b6..dfe3c30ead 100644 --- a/rclcpp/src/rclcpp/subscription_base.cpp +++ b/rclcpp/src/rclcpp/subscription_base.cpp @@ -20,6 +20,8 @@ #include #include +#include "rcpputils/scope_exit.hpp" + #include "rclcpp/exceptions.hpp" #include "rclcpp/expand_topic_or_service_name.hpp" #include "rclcpp/experimental/intra_process_manager.hpp" @@ -354,3 +356,93 @@ SubscriptionBase::set_on_new_message_callback( throw_from_rcl_error(ret, "failed to set the on new message callback for subscription"); } } + +#ifdef CONTENT_FILTER_ENABLE +bool +SubscriptionBase::is_cft_enabled() const +{ + return rcl_subscription_is_cft_enabled(subscription_handle_.get()); +} + +void +SubscriptionBase::set_content_filter( + const std::string & filter_expression, + const std::vector & expression_parameters) +{ + rcl_subscription_content_filter_options_t options = + rcl_get_zero_initialized_subscription_content_filter_options(); + + std::vector cstrings = + get_c_vector_string(expression_parameters); + rcl_ret_t ret = rcl_subscription_content_filter_options_init( + subscription_handle_.get(), + get_c_string(filter_expression), + cstrings.size(), + cstrings.data(), + &options); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error( + ret, "failed to init subscription content_filtered_topic option"); + } + RCPPUTILS_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_subscription_content_filter_options_fini( + subscription_handle_.get(), &options); + if (RCL_RET_OK != ret) { + RCLCPP_ERROR( + rclcpp::get_logger("rclcpp"), + "Failed to fini subscription content_filtered_topic option: %s", + rcl_get_error_string().str); + rcl_reset_error(); + } + }); + + ret = rcl_subscription_set_content_filter( + subscription_handle_.get(), + &options); + + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "failed to set cft expression parameters"); + } +} + +rclcpp::ContentFilterOptions +SubscriptionBase::get_content_filter() const +{ + rclcpp::ContentFilterOptions ret_options; + rcl_subscription_content_filter_options_t options = + rcl_get_zero_initialized_subscription_content_filter_options(); + + rcl_ret_t ret = rcl_subscription_get_content_filter( + subscription_handle_.get(), + &options); + + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "failed to get cft expression parameters"); + } + + RCPPUTILS_SCOPE_EXIT( + { + rcl_ret_t ret = rcl_subscription_content_filter_options_fini( + subscription_handle_.get(), &options); + if (RCL_RET_OK != ret) { + RCLCPP_ERROR( + rclcpp::get_logger("rclcpp"), + "Failed to fini subscription content_filtered_topic option: %s", + rcl_get_error_string().str); + rcl_reset_error(); + } + }); + + rmw_subscription_content_filter_options_t & content_filter_options = + options.rmw_subscription_content_filter_options; + ret_options.filter_expression = content_filter_options.filter_expression; + + for (size_t i = 0; i < content_filter_options.expression_parameters.size; ++i) { + ret_options.expression_parameters.push_back( + content_filter_options.expression_parameters.data[i]); + } + + return ret_options; +} +#endif // CONTENT_FILTER_ENABLE diff --git a/rclcpp/src/rclcpp/utilities.cpp b/rclcpp/src/rclcpp/utilities.cpp index a6ead85c03..f300060010 100644 --- a/rclcpp/src/rclcpp/utilities.cpp +++ b/rclcpp/src/rclcpp/utilities.cpp @@ -214,4 +214,17 @@ get_c_string(const std::string & string_in) return string_in.c_str(); } +std::vector +get_c_vector_string(const std::vector & strings_in) +{ + std::vector cstrings; + cstrings.reserve(strings_in.size()); + + for (size_t i = 0; i < strings_in.size(); ++i) { + cstrings.push_back(strings_in[i].c_str()); + } + + return cstrings; +} + } // namespace rclcpp diff --git a/rclcpp/test/rclcpp/CMakeLists.txt b/rclcpp/test/rclcpp/CMakeLists.txt index 331c39fb27..2d7e9a3001 100644 --- a/rclcpp/test/rclcpp/CMakeLists.txt +++ b/rclcpp/test/rclcpp/CMakeLists.txt @@ -733,3 +733,23 @@ ament_add_gtest(test_graph_listener test_graph_listener.cpp) if(TARGET test_graph_listener) target_link_libraries(test_graph_listener ${PROJECT_NAME} mimick) endif() + +if(DEFINE_CONTENT_FILTER) + function(test_subscription_content_filter_for_rmw_implementation) + set(rmw_implementation_env_var RMW_IMPLEMENTATION=${rmw_implementation}) + ament_add_gmock(test_subscription_content_filter${target_suffix} + test_subscription_content_filter.cpp + ENV ${rmw_implementation_env_var} + TIMEOUT 120 + ) + if(TARGET test_subscription_content_filter${target_suffix}) + target_link_libraries(test_subscription_content_filter${target_suffix} ${PROJECT_NAME} mimick) + ament_target_dependencies(test_subscription_content_filter${target_suffix} + "rcpputils" + "rosidl_typesupport_cpp" + "test_msgs" + ) + endif() + endfunction() + call_for_each_rmw_implementation(test_subscription_content_filter_for_rmw_implementation) +endif() diff --git a/rclcpp/test/rclcpp/test_subscription_content_filter.cpp b/rclcpp/test/rclcpp/test_subscription_content_filter.cpp new file mode 100644 index 0000000000..942aa1274e --- /dev/null +++ b/rclcpp/test/rclcpp/test_subscription_content_filter.cpp @@ -0,0 +1,312 @@ +// Copyright 2022 Open Source Robotics Foundation, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include +#include +#include +#include + +#include "rclcpp/exceptions.hpp" +#include "rclcpp/rclcpp.hpp" +#include "rclcpp/wait_for_message.hpp" + +#include "../mocking_utils/patch.hpp" +#include "../utils/rclcpp_gtest_macros.hpp" + +#include "test_msgs/msg/basic_types.hpp" + +#ifdef RMW_IMPLEMENTATION +# define CLASSNAME_(NAME, SUFFIX) NAME ## __ ## SUFFIX +# define CLASSNAME(NAME, SUFFIX) CLASSNAME_(NAME, SUFFIX) +#else +# define CLASSNAME(NAME, SUFFIX) NAME +#endif + +class CLASSNAME (TestContentFilterSubscription, RMW_IMPLEMENTATION) : public ::testing::Test +{ +public: + static void SetUpTestCase() + { + rclcpp::init(0, nullptr); + } + + static void TearDownTestCase() + { + rclcpp::shutdown(); + } + + void SetUp() + { + node = std::make_shared("test_content_filter_node", "/ns"); + context = node->get_node_options().context(); + qos.reliable().transient_local(); + + auto options = rclcpp::SubscriptionOptions(); + options.content_filter_options.filter_expression = filter_expression_init; + options.content_filter_options.expression_parameters = expression_parameters_1; + + auto callback = [](std::shared_ptr) {}; + sub = node->create_subscription( + "content_filter_topic", qos, callback, options); + } + + void TearDown() + { + sub.reset(); + node.reset(); + } + + template + bool wait_for(const Condition & condition, const Duration & timeout) + { + using clock = std::chrono::system_clock; + auto start = clock::now(); + while (!condition()) { + if ((clock::now() - start) > timeout) { + return false; + } + rclcpp::spin_some(node); + } + return true; + } + +protected: + rclcpp::Node::SharedPtr node; + rclcpp::Context::SharedPtr context; + rclcpp::QoS qos{rclcpp::KeepLast{10}}; + rclcpp::Subscription::SharedPtr sub; + + std::string filter_expression_init = "int32_value = %0"; + std::vector expression_parameters_1 = {"3"}; + std::vector expression_parameters_2 = {"4"}; +}; + +bool operator==(const test_msgs::msg::BasicTypes & m1, const test_msgs::msg::BasicTypes & m2) +{ + return m1.bool_value == m2.bool_value && + m1.byte_value == m2.byte_value && + m1.char_value == m2.char_value && + m1.float32_value == m2.float32_value && + m1.float64_value == m2.float64_value && + m1.int8_value == m2.int8_value && + m1.uint8_value == m2.uint8_value && + m1.int16_value == m2.int16_value && + m1.uint16_value == m2.uint16_value && + m1.int32_value == m2.int32_value && + m1.uint32_value == m2.uint32_value && + m1.int64_value == m2.int64_value && + m1.uint64_value == m2.uint64_value; +} + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), is_cft_enabled) { + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_is_cft_enabled, false); + EXPECT_FALSE(sub->is_cft_enabled()); + } + + { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_is_cft_enabled, true); + EXPECT_TRUE(sub->is_cft_enabled()); + } +} + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), get_content_filter_error) { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_get_content_filter, RCL_RET_ERROR); + + rclcpp::ContentFilterOptions options; + EXPECT_THROW( + options = sub->get_content_filter(), + rclcpp::exceptions::RCLError); +} + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), set_content_filter_error) { + auto mock = mocking_utils::patch_and_return( + "lib:rclcpp", rcl_subscription_set_content_filter, RCL_RET_ERROR); + + std::string filter_expression = "int32_value = %0"; + std::string expression_parameter = "100"; + EXPECT_THROW( + sub->set_content_filter(filter_expression, {expression_parameter}), + rclcpp::exceptions::RCLError); +} + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), get_content_filter) { + rclcpp::ContentFilterOptions options; + + if (sub->is_cft_enabled()) { + EXPECT_NO_THROW( + options = sub->get_content_filter()); + + EXPECT_EQ(options.filter_expression, filter_expression_init); + EXPECT_EQ(options.expression_parameters, expression_parameters_1); + } else { + EXPECT_THROW( + options = sub->get_content_filter(), + rclcpp::exceptions::RCLError); + } +} + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), set_content_filter) { + if (sub->is_cft_enabled()) { + EXPECT_NO_THROW( + sub->set_content_filter(filter_expression_init, expression_parameters_2)); + } else { + EXPECT_THROW( + sub->set_content_filter(filter_expression_init, expression_parameters_2), + rclcpp::exceptions::RCLError); + } +} + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), content_filter_get_begin) { + using namespace std::chrono_literals; + { + test_msgs::msg::BasicTypes msg; + rclcpp::MessageInfo msg_info; + EXPECT_FALSE(sub->take(msg, msg_info)); + } + { + rclcpp::PublisherOptions po; + auto pub = node->create_publisher("content_filter_topic", qos, po); + + auto connected = [pub, sub = this->sub]() -> bool { + return pub->get_subscription_count() && sub->get_publisher_count(); + }; + ASSERT_TRUE(wait_for(connected, 10s)); + + test_msgs::msg::BasicTypes original_message; + original_message.int32_value = 3; + pub->publish(original_message); + + test_msgs::msg::BasicTypes output_message; + bool receive = wait_for_message(output_message, sub, context, 10s); + EXPECT_TRUE(receive); + EXPECT_EQ(original_message, output_message); + + if (sub->is_cft_enabled()) { + EXPECT_NO_THROW( + sub->set_content_filter(filter_expression_init, expression_parameters_2)); + // waiting to allow for filter propagation + std::this_thread::sleep_for(std::chrono::seconds(10)); + + test_msgs::msg::BasicTypes original_message; + original_message.int32_value = 3; + pub->publish(original_message); + + test_msgs::msg::BasicTypes output_message; + bool receive = wait_for_message(output_message, sub, context, 10s); + EXPECT_FALSE(receive); + } + } +} + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), content_filter_get_later) { + using namespace std::chrono_literals; + { + test_msgs::msg::BasicTypes msg; + rclcpp::MessageInfo msg_info; + EXPECT_FALSE(sub->take(msg, msg_info)); + } + { + rclcpp::PublisherOptions po; + auto pub = node->create_publisher("content_filter_topic", qos, po); + + auto connected = [pub, sub = this->sub]() -> bool { + return pub->get_subscription_count() && sub->get_publisher_count(); + }; + ASSERT_TRUE(wait_for(connected, 10s)); + + test_msgs::msg::BasicTypes original_message; + original_message.int32_value = 4; + pub->publish(original_message); + + test_msgs::msg::BasicTypes output_message; + bool receive = wait_for_message(output_message, sub, context, 10s); + if (sub->is_cft_enabled()) { + EXPECT_FALSE(receive); + } else { + EXPECT_TRUE(receive); + EXPECT_EQ(original_message, output_message); + } + + if (sub->is_cft_enabled()) { + EXPECT_NO_THROW( + sub->set_content_filter(filter_expression_init, expression_parameters_2)); + // waiting to allow for filter propagation + std::this_thread::sleep_for(std::chrono::seconds(10)); + + test_msgs::msg::BasicTypes original_message; + original_message.int32_value = 4; + pub->publish(original_message); + + test_msgs::msg::BasicTypes output_message; + bool receive = wait_for_message(output_message, sub, context, 10s); + EXPECT_TRUE(receive); + EXPECT_EQ(original_message, output_message); + } + } +} + +TEST_F(CLASSNAME(TestContentFilterSubscription, RMW_IMPLEMENTATION), content_filter_reset) { + using namespace std::chrono_literals; + { + test_msgs::msg::BasicTypes msg; + rclcpp::MessageInfo msg_info; + EXPECT_FALSE(sub->take(msg, msg_info)); + } + { + rclcpp::PublisherOptions po; + auto pub = node->create_publisher("content_filter_topic", qos, po); + + auto connected = [pub, sub = this->sub]() -> bool { + return pub->get_subscription_count() && sub->get_publisher_count(); + }; + ASSERT_TRUE(wait_for(connected, 10s)); + + test_msgs::msg::BasicTypes original_message; + original_message.int32_value = 4; + pub->publish(original_message); + + test_msgs::msg::BasicTypes output_message; + bool receive = wait_for_message(output_message, sub, context, 10s); + if (sub->is_cft_enabled()) { + EXPECT_FALSE(receive); + } else { + EXPECT_TRUE(receive); + EXPECT_EQ(original_message, output_message); + } + + if (sub->is_cft_enabled()) { + EXPECT_NO_THROW( + sub->set_content_filter("")); + // waiting to allow for filter propagation + std::this_thread::sleep_for(std::chrono::seconds(10)); + + test_msgs::msg::BasicTypes original_message; + original_message.int32_value = 4; + pub->publish(original_message); + + test_msgs::msg::BasicTypes output_message; + bool receive = wait_for_message(output_message, sub, context, 10s); + EXPECT_TRUE(receive); + EXPECT_EQ(original_message, output_message); + } + } +}