From aade9eaea785d24a59d6c4eb59ef90c1fea57378 Mon Sep 17 00:00:00 2001 From: Juanjo Garcia Date: Fri, 24 Jan 2025 12:55:42 +0100 Subject: [PATCH 1/7] Refs #22648: Regression test Signed-off-by: Juanjo Garcia --- .../common/DDSBlackboxTestsListeners.cpp | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 1ddab3ed6be..b36e0715ada 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -3436,6 +3436,56 @@ TEST(DDSStatus, keyed_reliable_positive_acks_disabled_on_unack_sample_removed) delete dummy_data; } +/*¡ +* Regression Test for 22648: on_unacknowledged_sample_removed callback is called when best effort writer with keep all +* history is used, when the history was full but before max_blocking_time a sample was acknowledged, as is_acked was +* checked before the waiting time, and is not re-checked. This should not happen. +*/ + +TEST(DDSStatus, reliable_keep_all_unack_sample_removed_call) +{ + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .durability_kind(eprosima::fastdds::dds::TRANSIENT_LOCAL_DURABILITY_QOS) + .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) + .history_depth(1) + .init(); + ASSERT_TRUE(writer.isInitialized()); + + reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) + .durability_kind(eprosima::fastdds::dds::TRANSIENT_LOCAL_DURABILITY_QOS) + .init(); + ASSERT_TRUE(reader.isInitialized()); + + // Wait for discovery + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_helloworld_data_generator(); + + bool firstOneSent = false; + + for (auto sample : data) + { + reader.stopReception(); + writer.send_sample(sample); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + if (firstOneSent) + { + reader.startReception(data); + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + else + { + firstOneSent = true; + } + } + + EXPECT_EQ(writer.times_unack_sample_removed(), 0u); +} + /*! * Test that checks with a writer of each type that having the same listener attached, the notified writer in the * callback is the corresponding writer that has removed a sample unacknowledged. From 2ae35ed566e689ba6ed98acfb5d87ccbf63b9913 Mon Sep 17 00:00:00 2001 From: Juanjo Garcia Date: Fri, 24 Jan 2025 13:06:36 +0100 Subject: [PATCH 2/7] Refs #22648: Corrected small uncrustify typo Signed-off-by: Juanjo Garcia --- test/blackbox/common/DDSBlackboxTestsListeners.cpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index b36e0715ada..88d76dccb05 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -3437,11 +3437,10 @@ TEST(DDSStatus, keyed_reliable_positive_acks_disabled_on_unack_sample_removed) } /*¡ -* Regression Test for 22648: on_unacknowledged_sample_removed callback is called when best effort writer with keep all -* history is used, when the history was full but before max_blocking_time a sample was acknowledged, as is_acked was -* checked before the waiting time, and is not re-checked. This should not happen. -*/ - + * Regression Test for 22648: on_unacknowledged_sample_removed callback is called when best effort writer with keep all + * history is used, when the history was full but before max_blocking_time a sample was acknowledged, as is_acked was + * checked before the waiting time, and is not re-checked. This should not happen. + */ TEST(DDSStatus, reliable_keep_all_unack_sample_removed_call) { PubSubWriter writer(TEST_TOPIC_NAME); From bdae6aa915a76fa2ba3e2c408ce98351f95caeb9 Mon Sep 17 00:00:00 2001 From: Juanjo Garcia Date: Wed, 29 Jan 2025 11:19:24 +0100 Subject: [PATCH 3/7] Refs #22648: Final form of regression test Signed-off-by: Juanjo Garcia --- src/cpp/rtps/writer/StatefulWriter.cpp | 2 +- test/blackbox/api/dds-pim/PubSubWriter.hpp | 10 +++ .../common/DDSBlackboxTestsListeners.cpp | 79 ++++++++++++++----- 3 files changed, 72 insertions(+), 19 deletions(-) diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index f0dd8f2d883..eb1a7080e9d 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -1521,7 +1521,7 @@ void StatefulWriter::check_acked_status() if (min_low_mark >= get_seq_num_min()) { - may_remove_change_ = 1; + may_remove_change_ = (get_seq_num_min() == SequenceNumber_t::unknown()) ? 2 : 1; } min_readers_low_mark_ = min_low_mark; diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 469c494691d..5b42ce4d00b 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -892,6 +892,16 @@ class PubSubWriter return *this; } + PubSubWriter& reliability( + const eprosima::fastdds::dds::ReliabilityQosPolicyKind kind, + const int max_blocking_time) + { + datawriter_qos_.reliability().kind = kind; + datawriter_qos_.reliability().max_blocking_time.seconds = max_blocking_time; + + return *this; + } + PubSubWriter& mem_policy( const eprosima::fastdds::rtps::MemoryManagementPolicy mem_policy) { diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 88d76dccb05..7cd82d1e329 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -15,6 +15,10 @@ #include #include #include +#include +#include +#include +#include #include #include @@ -3443,18 +3447,70 @@ TEST(DDSStatus, keyed_reliable_positive_acks_disabled_on_unack_sample_removed) */ TEST(DDSStatus, reliable_keep_all_unack_sample_removed_call) { + auto test_transport = std::make_shared(); + test_transport->drop_data_messages_filter_ = [](eprosima::fastdds::rtps::CDRMessage_t& msg) -> bool + { + static std::vector> delayed_messages; + + uint32_t old_pos = msg.pos; + + // Parse writer ID and sequence number + msg.pos += 2; // flags + msg.pos += 2; // inline QoS + msg.pos += 4; // reader ID + auto writerID = eprosima::fastdds::helpers::cdr_parse_entity_id((char*)&msg.buffer[msg.pos]); + msg.pos += 4; + eprosima::fastdds::rtps::SequenceNumber_t sn; + sn.high = (int32_t)eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]); + msg.pos += 4; + sn.low = eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]); + + // Restore buffer position + msg.pos = old_pos; + + // Delay logic for user endpoints only + if ((writerID.value[3] & 0xC0) == 0) // only user endpoints + { + auto now = std::chrono::steady_clock::now(); + auto it = std::find_if(delayed_messages.begin(), delayed_messages.end(), + [&sn](const auto& pair) { + return pair.first == sn; + }); + + if (it == delayed_messages.end()) + { + // If the sequence number is encountered for the first time, start the delay + delayed_messages.emplace_back(sn, now + std::chrono::milliseconds(750)); // Add delay + return true; // Start dropping this message + } + else if (now < it->second) + { + // If the delay period has not elapsed, keep dropping the message + return true; + } + else + { + // Once the delay has elapsed, allow the message to proceed + delayed_messages.erase(it); + } + } + return false; // Allow message to proceed + }; + PubSubWriter writer(TEST_TOPIC_NAME); PubSubReader reader(TEST_TOPIC_NAME); - writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) - .durability_kind(eprosima::fastdds::dds::TRANSIENT_LOCAL_DURABILITY_QOS) + writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS, 200) .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) - .history_depth(1) + .resource_limits_max_instances(1) + .resource_limits_max_samples(1) + .resource_limits_max_samples_per_instance(1) + .disable_builtin_transport() + .add_user_transport_to_pparams(test_transport) .init(); ASSERT_TRUE(writer.isInitialized()); reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS) - .durability_kind(eprosima::fastdds::dds::TRANSIENT_LOCAL_DURABILITY_QOS) .init(); ASSERT_TRUE(reader.isInitialized()); @@ -3462,24 +3518,11 @@ TEST(DDSStatus, reliable_keep_all_unack_sample_removed_call) writer.wait_discovery(); reader.wait_discovery(); - auto data = default_helloworld_data_generator(); - - bool firstOneSent = false; + auto data = default_helloworld_data_generator(2); for (auto sample : data) { - reader.stopReception(); writer.send_sample(sample); - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - if (firstOneSent) - { - reader.startReception(data); - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - } - else - { - firstOneSent = true; - } } EXPECT_EQ(writer.times_unack_sample_removed(), 0u); From 3d28b37c6e14fa07233c4240699831229782adf9 Mon Sep 17 00:00:00 2001 From: Juanjo Garcia Date: Wed, 29 Jan 2025 11:21:32 +0100 Subject: [PATCH 4/7] Refs #22648: Fixed bug Signed-off-by: Juanjo Garcia --- src/cpp/fastdds/publisher/DataWriterHistory.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/cpp/fastdds/publisher/DataWriterHistory.cpp b/src/cpp/fastdds/publisher/DataWriterHistory.cpp index 963ae6c3c39..844f63c8cfb 100644 --- a/src/cpp/fastdds/publisher/DataWriterHistory.cpp +++ b/src/cpp/fastdds/publisher/DataWriterHistory.cpp @@ -160,6 +160,8 @@ bool DataWriterHistory::prepare_change( if (history_qos_.kind == KEEP_ALL_HISTORY_QOS) { ret = this->mp_writer->try_remove_change(max_blocking_time, lock); + // If change was removed (ret == 1) in KeepAllHistory, it must have been acked + is_acked = ret; } else if (history_qos_.kind == KEEP_LAST_HISTORY_QOS) { From ae0d6a5493fbcc4fbb3f1bd4d547758156b48e3b Mon Sep 17 00:00:00 2001 From: Juanjo Garcia Date: Wed, 29 Jan 2025 11:33:05 +0100 Subject: [PATCH 5/7] Refs #22648: corrected failing CI Signed-off-by: Juanjo Garcia --- .../common/DDSBlackboxTestsListeners.cpp | 96 ++++++++++--------- 1 file changed, 49 insertions(+), 47 deletions(-) diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 7cd82d1e329..89bdfb7e755 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -3449,54 +3449,56 @@ TEST(DDSStatus, reliable_keep_all_unack_sample_removed_call) { auto test_transport = std::make_shared(); test_transport->drop_data_messages_filter_ = [](eprosima::fastdds::rtps::CDRMessage_t& msg) -> bool - { - static std::vector> delayed_messages; - - uint32_t old_pos = msg.pos; - - // Parse writer ID and sequence number - msg.pos += 2; // flags - msg.pos += 2; // inline QoS - msg.pos += 4; // reader ID - auto writerID = eprosima::fastdds::helpers::cdr_parse_entity_id((char*)&msg.buffer[msg.pos]); - msg.pos += 4; - eprosima::fastdds::rtps::SequenceNumber_t sn; - sn.high = (int32_t)eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]); - msg.pos += 4; - sn.low = eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]); - - // Restore buffer position - msg.pos = old_pos; - - // Delay logic for user endpoints only - if ((writerID.value[3] & 0xC0) == 0) // only user endpoints - { - auto now = std::chrono::steady_clock::now(); - auto it = std::find_if(delayed_messages.begin(), delayed_messages.end(), - [&sn](const auto& pair) { - return pair.first == sn; - }); - - if (it == delayed_messages.end()) { - // If the sequence number is encountered for the first time, start the delay - delayed_messages.emplace_back(sn, now + std::chrono::milliseconds(750)); // Add delay - return true; // Start dropping this message - } - else if (now < it->second) - { - // If the delay period has not elapsed, keep dropping the message - return true; - } - else - { - // Once the delay has elapsed, allow the message to proceed - delayed_messages.erase(it); - } - } - return false; // Allow message to proceed - }; - + static std::vector> delayed_messages; + + uint32_t old_pos = msg.pos; + + // Parse writer ID and sequence number + msg.pos += 2; // flags + msg.pos += 2; // inline QoS + msg.pos += 4; // reader ID + auto writerID = eprosima::fastdds::helpers::cdr_parse_entity_id((char*)&msg.buffer[msg.pos]); + msg.pos += 4; + eprosima::fastdds::rtps::SequenceNumber_t sn; + sn.high = (int32_t)eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]); + msg.pos += 4; + sn.low = eprosima::fastdds::helpers::cdr_parse_u32((char*)&msg.buffer[msg.pos]); + + // Restore buffer position + msg.pos = old_pos; + + // Delay logic for user endpoints only + if ((writerID.value[3] & 0xC0) == 0) // only user endpoints + { + auto now = std::chrono::steady_clock::now(); + auto it = std::find_if(delayed_messages.begin(), delayed_messages.end(), + [&sn](const auto& pair) + { + return pair.first == sn; + }); + + if (it == delayed_messages.end()) + { + // If the sequence number is encountered for the first time, start the delay + delayed_messages.emplace_back(sn, now + std::chrono::milliseconds(750)); // Add delay + return true; // Start dropping this message + } + else if (now < it->second) + { + // If the delay period has not elapsed, keep dropping the message + return true; + } + else + { + // Once the delay has elapsed, allow the message to proceed + delayed_messages.erase(it); + } + } + return false; // Allow message to proceed + }; + PubSubWriter writer(TEST_TOPIC_NAME); PubSubReader reader(TEST_TOPIC_NAME); From 990a2013b48782fc60eddf93bba92805e69f0e58 Mon Sep 17 00:00:00 2001 From: Juanjo Garcia Date: Wed, 29 Jan 2025 16:32:47 +0100 Subject: [PATCH 6/7] Refs #22648: Modified helper function Signed-off-by: Juanjo Garcia --- test/blackbox/api/dds-pim/PubSubWriter.hpp | 4 ++-- test/blackbox/common/DDSBlackboxTestsListeners.cpp | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 5b42ce4d00b..5a6d081818d 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -894,10 +894,10 @@ class PubSubWriter PubSubWriter& reliability( const eprosima::fastdds::dds::ReliabilityQosPolicyKind kind, - const int max_blocking_time) + eprosima::fastdds::dds::Duration_t max_blocking_time) { datawriter_qos_.reliability().kind = kind; - datawriter_qos_.reliability().max_blocking_time.seconds = max_blocking_time; + datawriter_qos_.reliability().max_blocking_time = max_blocking_time; return *this; } diff --git a/test/blackbox/common/DDSBlackboxTestsListeners.cpp b/test/blackbox/common/DDSBlackboxTestsListeners.cpp index 89bdfb7e755..6810d76c2bd 100644 --- a/test/blackbox/common/DDSBlackboxTestsListeners.cpp +++ b/test/blackbox/common/DDSBlackboxTestsListeners.cpp @@ -3502,7 +3502,7 @@ TEST(DDSStatus, reliable_keep_all_unack_sample_removed_call) PubSubWriter writer(TEST_TOPIC_NAME); PubSubReader reader(TEST_TOPIC_NAME); - writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS, 200) + writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS, eprosima::fastdds::dds::Duration_t (200, 0)) .history_kind(eprosima::fastdds::dds::KEEP_ALL_HISTORY_QOS) .resource_limits_max_instances(1) .resource_limits_max_samples(1) From 9bc500176e129545ba59dccc208720c3461a33d8 Mon Sep 17 00:00:00 2001 From: Juanjo Garcia Date: Thu, 30 Jan 2025 14:26:29 +0100 Subject: [PATCH 7/7] Refs #22648: solved conflicts Signed-off-by: Juanjo Garcia --- src/cpp/rtps/writer/StatefulWriter.cpp | 2 ++ test/blackbox/api/dds-pim/PubSubWriter.hpp | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index eb1a7080e9d..3e177a3bd6b 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -1521,6 +1521,8 @@ void StatefulWriter::check_acked_status() if (min_low_mark >= get_seq_num_min()) { + // get_seq_num_min() returns SequenceNumber_t::unknown() when the history is empty. + // Thus, it is set to 2 to indicate that all samples have been removed. may_remove_change_ = (get_seq_num_min() == SequenceNumber_t::unknown()) ? 2 : 1; } diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 5a6d081818d..450bc0e9a5b 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -898,7 +898,6 @@ class PubSubWriter { datawriter_qos_.reliability().kind = kind; datawriter_qos_.reliability().max_blocking_time = max_blocking_time; - return *this; }