From 30bf3ecb7dd9a29e1085cdbae1009aeb9240ad8f Mon Sep 17 00:00:00 2001 From: Miguel Company Date: Thu, 4 Apr 2024 12:42:42 +0200 Subject: [PATCH] Allow processing of AckNack submessages with count == 0 (#4639) * Refs #20729. Regression test. Signed-off-by: Miguel Company * Refs #20729. Fix issue. Signed-off-by: Miguel Company * Refs #20729. Update doxydoc. Signed-off-by: Miguel Company * Refs #20729. Dont create timers if participant is nullptr. Signed-off-by: Miguel Company --------- Signed-off-by: Miguel Company (cherry picked from commit 66fc7c533a3fa8d5633dfb6f6f23c96cd6315bc6) # Conflicts: # test/unittest/rtps/writer/ReaderProxyTests.cpp --- include/fastdds/rtps/writer/ReaderProxy.h | 15 ++-- src/cpp/rtps/writer/ReaderProxy.cpp | 54 +++++++----- .../unittest/rtps/writer/ReaderProxyTests.cpp | 84 +++++++++++++++++++ 3 files changed, 125 insertions(+), 28 deletions(-) diff --git a/include/fastdds/rtps/writer/ReaderProxy.h b/include/fastdds/rtps/writer/ReaderProxy.h index 20f487a1d67..5ca0b686db4 100644 --- a/include/fastdds/rtps/writer/ReaderProxy.h +++ b/include/fastdds/rtps/writer/ReaderProxy.h @@ -300,16 +300,19 @@ class ReaderProxy } /** - * Called when an ACKNACK is received to set a new value for the count of the last received ACKNACK. + * Called when an ACKNACK is received to set a new value for the minimum count accepted for following received + * ACKNACKs. + * * @param acknack_count The count of the received ACKNACK. - * @return true if internal count changed (i.e. new ACKNACK is accepted) + * @return true if internal count changed (i.e. received ACKNACK is accepted) */ bool check_and_set_acknack_count( uint32_t acknack_count) { - if (last_acknack_count_ < acknack_count) + if (acknack_count >= next_expected_acknack_count_) { - last_acknack_count_ = acknack_count; + next_expected_acknack_count_ = acknack_count; + ++next_expected_acknack_count_; return true; } @@ -423,8 +426,8 @@ class ReaderProxy TimedEvent* initial_heartbeat_event_; //! Are timed events enabled? std::atomic_bool timers_enabled_; - //! Last ack/nack count - uint32_t last_acknack_count_; + //! Next expected ack/nack count + uint32_t next_expected_acknack_count_; //! Last NACKFRAG count. uint32_t last_nackfrag_count_; diff --git a/src/cpp/rtps/writer/ReaderProxy.cpp b/src/cpp/rtps/writer/ReaderProxy.cpp index 4ef2829e98c..a86b5614b2d 100644 --- a/src/cpp/rtps/writer/ReaderProxy.cpp +++ b/src/cpp/rtps/writer/ReaderProxy.cpp @@ -57,23 +57,27 @@ ReaderProxy::ReaderProxy( , nack_supression_event_(nullptr) , initial_heartbeat_event_(nullptr) , timers_enabled_(false) - , last_acknack_count_(0) + , next_expected_acknack_count_(0) , last_nackfrag_count_(0) { - nack_supression_event_ = new TimedEvent(writer_->getRTPSParticipant()->getEventResource(), - [&]() -> bool - { - writer_->perform_nack_supression(guid()); - return false; - }, - TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration)); + auto participant = writer_->getRTPSParticipant(); + if (nullptr != participant) + { + nack_supression_event_ = new TimedEvent(participant->getEventResource(), + [&]() -> bool + { + writer_->perform_nack_supression(guid()); + return false; + }, + TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration)); - initial_heartbeat_event_ = new TimedEvent(writer_->getRTPSParticipant()->getEventResource(), - [&]() -> bool - { - writer_->intraprocess_heartbeat(this); - return false; - }, 0); + initial_heartbeat_event_ = new TimedEvent(participant->getEventResource(), + [&]() -> bool + { + writer_->intraprocess_heartbeat(this); + return false; + }, 0); + } stop(); } @@ -135,7 +139,7 @@ void ReaderProxy::start( } timers_enabled_.store(is_remote_and_reliable()); - if (is_local_reader()) + if (is_local_reader() && initial_heartbeat_event_) { initial_heartbeat_event_->restart_timer(); } @@ -166,24 +170,30 @@ void ReaderProxy::stop() disable_timers(); changes_for_reader_.clear(); - last_acknack_count_ = 0; + next_expected_acknack_count_ = 0; last_nackfrag_count_ = 0; changes_low_mark_ = SequenceNumber_t(); } void ReaderProxy::disable_timers() { - if (timers_enabled_.exchange(false)) + if (timers_enabled_.exchange(false) && nack_supression_event_) { nack_supression_event_->cancel_timer(); } - initial_heartbeat_event_->cancel_timer(); + if (initial_heartbeat_event_) + { + initial_heartbeat_event_->cancel_timer(); + } } void ReaderProxy::update_nack_supression_interval( const Duration_t& interval) { - nack_supression_event_->update_interval(interval); + if (nack_supression_event_) + { + nack_supression_event_->update_interval(interval); + } } void ReaderProxy::add_change( @@ -191,7 +201,7 @@ void ReaderProxy::add_change( bool is_relevant, bool restart_nack_supression) { - if (restart_nack_supression && timers_enabled_.load()) + if (restart_nack_supression && timers_enabled_.load() && nack_supression_event_) { nack_supression_event_->restart_timer(); } @@ -205,7 +215,7 @@ void ReaderProxy::add_change( bool restart_nack_supression, const std::chrono::time_point& max_blocking_time) { - if (restart_nack_supression && timers_enabled_) + if (restart_nack_supression && timers_enabled_ && nack_supression_event_) { nack_supression_event_->restart_timer(max_blocking_time); } @@ -459,7 +469,7 @@ void ReaderProxy::from_unsent_to_status( // It will use acked_changes_set(). assert(is_reliable_); - if (restart_nack_supression && is_remote_and_reliable()) + if (restart_nack_supression && is_remote_and_reliable() && nack_supression_event_) { assert(timers_enabled_.load()); nack_supression_event_->restart_timer(); diff --git a/test/unittest/rtps/writer/ReaderProxyTests.cpp b/test/unittest/rtps/writer/ReaderProxyTests.cpp index 56bc95b3a44..ed7ae886651 100644 --- a/test/unittest/rtps/writer/ReaderProxyTests.cpp +++ b/test/unittest/rtps/writer/ReaderProxyTests.cpp @@ -331,6 +331,90 @@ TEST(ReaderProxyTests, process_nack_frag_multiple_fragments_different_windows_te TOTAL_NUMBER_OF_FRAGMENTS + 1u), TOTAL_NUMBER_OF_FRAGMENTS + 1u); } +<<<<<<< HEAD +======= +TEST(ReaderProxyTests, has_been_delivered_test) +{ + StatefulWriter writer_mock; + WriterTimes w_times; + RemoteLocatorsAllocationAttributes alloc; + ReaderProxy rproxy(w_times, alloc, &writer_mock); + + CacheChange_t seq1; + CacheChange_t seq2; + seq1.sequenceNumber = {0, 1}; + seq2.sequenceNumber = {0, 2}; + + ReaderProxyData reader_attributes(0, 0); + reader_attributes.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; + rproxy.start(reader_attributes); + + auto expect_result = [&rproxy](SequenceNumber_t seq, bool delivered, bool should_be_found) + { + bool found = false; + EXPECT_EQ(delivered, rproxy.has_been_delivered(seq, found)); + EXPECT_EQ(should_be_found, found); + }; + + // Add changes 1 and 2 + rproxy.add_change(ChangeForReader_t(&seq1), true, false); + rproxy.add_change(ChangeForReader_t(&seq2), true, false); + + // None of them has been delivered + expect_result(seq1.sequenceNumber, false, true); + expect_result(seq2.sequenceNumber, false, true); + + // Change 1 is sent + rproxy.from_unsent_to_status(seq1.sequenceNumber, UNACKNOWLEDGED, false, true); + + // Only change 1 has been delivered. Both are found + expect_result(seq1.sequenceNumber, true, true); + expect_result(seq2.sequenceNumber, false, true); + + // Change 1 is acknowledged + rproxy.acked_changes_set(seq1.sequenceNumber + 1); + + // Only change 1 has been delivered. Only change 2 is found + expect_result(seq1.sequenceNumber, true, false); + expect_result(seq2.sequenceNumber, false, true); + + // Change in the future should return not delivered and not found + expect_result({0, 3}, false, false); +} + +// Test expectations regarding acknack count. +// Serves as a regression test for redmine issue #20729. +TEST(ReaderProxyTests, acknack_count) +{ + StatefulWriter writer_mock; + WriterTimes w_times; + RemoteLocatorsAllocationAttributes alloc; + ReaderProxy rproxy(w_times, alloc, &writer_mock); + + ReaderProxyData reader_attributes(0, 0); + reader_attributes.m_qos.m_reliability.kind = RELIABLE_RELIABILITY_QOS; + rproxy.start(reader_attributes); + + // Check that the initial acknack count is 0. + EXPECT_TRUE(rproxy.check_and_set_acknack_count(0u)); + // Check that it is not accepted twice. + EXPECT_FALSE(rproxy.check_and_set_acknack_count(0u)); + // Check that it is accepted if it is incremented. + EXPECT_TRUE(rproxy.check_and_set_acknack_count(1u)); + // Check that it is not accepted twice. + EXPECT_FALSE(rproxy.check_and_set_acknack_count(1u)); + // Check that it is not accepted if it is decremented. + EXPECT_FALSE(rproxy.check_and_set_acknack_count(0u)); + // Check that it is accepted if it has a big increment. + EXPECT_TRUE(rproxy.check_and_set_acknack_count(100u)); + // Check that previous values are rejected. + for (uint32_t i = 0; i <= 100u; ++i) + { + EXPECT_FALSE(rproxy.check_and_set_acknack_count(i)); + } +} + +>>>>>>> 66fc7c533 (Allow processing of AckNack submessages with count == 0 (#4639)) } // namespace rtps } // namespace fastrtps } // namespace eprosima