Skip to content

Commit

Permalink
Filter interested readers on PDP writer (#5604)
Browse files Browse the repository at this point in the history
* Refs #22506. Regression test.

Signed-off-by: Miguel Company <[email protected]>

* Refs #22506. Add `PDPStatelessWriter` extending `StatelessWriter`

Signed-off-by: Miguel Company <[email protected]>

* Refs #22506. Use `PDPStatelessWriter` in `SimplePDPEndpoints`

Signed-off-by: Miguel Company <[email protected]>

* Refs #22506. Move specific API to `PDPStatelessWriter`.

Signed-off-by: Miguel Company <[email protected]>

* Refs #22506. Initial work on management of interested writers.

Signed-off-by: Miguel Company <[email protected]>

* Refs #22506. Take advantage of single sample history.

Signed-off-by: Miguel Company <[email protected]>

* Refs #22506. Reset state after sending datagram.

Signed-off-by: Miguel Company <[email protected]>

* Refs #22506. Filter interested readers.

Signed-off-by: Miguel Company <[email protected]>

* Refs #22506. Do not reset unsent changes upon participant discovery.

Signed-off-by: Miguel Company <[email protected]>

* Refs #22506. Fix AsymmeticIgnoreParticipantFlags test.

Signed-off-by: Miguel Company <[email protected]>

* Refs #22506. Create sender resources for matched readers.

Signed-off-by: Miguel Company <[email protected]>

* Refs #22708. Rename methods.

Signed-off-by: Miguel Company <[email protected]>

* Refs #22708. Avoid allocations using a `ResourceLimitedVector` instead of an `std::set`

Signed-off-by: Miguel Company <[email protected]>

* Refs #22604. Improve monitor service blackbox tests.

Reducing announcement period to improve discovery timing.

Signed-off-by: Miguel Company <[email protected]>

---------

Signed-off-by: Miguel Company <[email protected]>
(cherry picked from commit 8eb1072)
  • Loading branch information
MiguelCompany authored and mergify[bot] committed Feb 21, 2025
1 parent e03357e commit 6e7d002
Show file tree
Hide file tree
Showing 13 changed files with 506 additions and 66 deletions.
1 change: 1 addition & 0 deletions src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ set(${PROJECT_NAME}_source_files
rtps/builtin/discovery/participant/PDPServer.cpp
rtps/builtin/discovery/participant/PDPServerListener.cpp
rtps/builtin/discovery/participant/PDPSimple.cpp
rtps/builtin/discovery/participant/simple/PDPStatelessWriter.cpp
rtps/builtin/discovery/participant/timedevent/DSClientEvent.cpp
rtps/builtin/discovery/participant/timedevent/DServerEvent.cpp
rtps/builtin/liveliness/WLP.cpp
Expand Down
11 changes: 3 additions & 8 deletions src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ void PDPSimple::announceParticipantState(

if (!(dispose || new_change))
{
endpoints->writer.writer_->unsent_changes_reset();
endpoints->writer.writer_->send_periodic_announcement();
}
}
}
Expand Down Expand Up @@ -403,7 +403,7 @@ bool PDPSimple::create_dcps_participant_endpoints()
if (mp_RTPSParticipant->createWriter(&rtps_writer, watt, writer.history_.get(),
nullptr, writer_entity_id, true))
{
writer.writer_ = dynamic_cast<StatelessWriter*>(rtps_writer);
writer.writer_ = dynamic_cast<PDPStatelessWriter*>(rtps_writer);
assert(nullptr != writer.writer_);

#if HAVE_SECURITY
Expand Down Expand Up @@ -455,7 +455,7 @@ bool PDPSimple::create_dcps_participant_endpoints()
EPROSIMA_LOG_WARNING(RTPS_PDP, "Ignoring initial peers locator " << loc << " : not allowed.");
}
}
writer.writer_->set_fixed_locators(fixed_locators);
writer.writer_->set_initial_peers(fixed_locators);
}
else
{
Expand Down Expand Up @@ -725,11 +725,6 @@ void PDPSimple::match_pdp_remote_endpoints(
{
writer->matched_reader_add_edp(*temp_reader_data);
}

if (!writer_only && (dds::BEST_EFFORT_RELIABILITY_QOS == reliability_kind))
{
endpoints->writer.writer_->unsent_changes_reset();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file PDPStatelessWriter.cpp
*/

#include <rtps/builtin/discovery/participant/simple/PDPStatelessWriter.hpp>

#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <mutex>
#include <set>
#include <vector>

#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/history/WriterHistory.hpp>
#include <fastdds/rtps/transport/NetworkBuffer.hpp>
#include <fastdds/utils/collections/ResourceLimitedVector.hpp>
#include <fastdds/utils/TimedMutex.hpp>

#include <rtps/builtin/data/ReaderProxyData.hpp>
#include <rtps/participant/RTPSParticipantImpl.hpp>
#include <rtps/writer/StatelessWriter.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {

PDPStatelessWriter::PDPStatelessWriter(
RTPSParticipantImpl* participant,
const GUID_t& guid,
const WriterAttributes& attributes,
FlowController* flow_controller,
WriterHistory* history,
WriterListener* listener)
: StatelessWriter(participant, guid, attributes, flow_controller, history, listener)
, interested_readers_(participant->get_attributes().allocation.participants)
{
}

bool PDPStatelessWriter::matched_reader_add_edp(
const ReaderProxyData& data)
{
bool ret = StatelessWriter::matched_reader_add_edp(data);
if (ret)
{
// Mark new reader as interested
add_interested_reader(data.guid());
// Send announcement to new reader
reschedule_all_samples();
}
return ret;
}

bool PDPStatelessWriter::matched_reader_remove(
const GUID_t& reader_guid)
{
bool ret = StatelessWriter::matched_reader_remove(reader_guid);
if (ret)
{
// Mark reader as not interested
remove_interested_reader(reader_guid);
}
return ret;
}

void PDPStatelessWriter::unsent_change_added_to_history(
CacheChange_t* change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
mark_all_readers_interested();
StatelessWriter::unsent_change_added_to_history(change, max_blocking_time);
}

void PDPStatelessWriter::set_initial_peers(
const LocatorList& locator_list)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);

initial_peers_.push_back(locator_list);
mp_RTPSParticipant->createSenderResources(initial_peers_);
}

void PDPStatelessWriter::send_periodic_announcement()
{
mark_all_readers_interested();
reschedule_all_samples();
}

bool PDPStatelessWriter::send_to_fixed_locators(
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
std::chrono::steady_clock::time_point& max_blocking_time_point) const
{
bool ret = true;

if (should_reach_all_destinations_)
{
ret = initial_peers_.empty() ||
mp_RTPSParticipant->sendSync(buffers, total_bytes, m_guid,
Locators(initial_peers_.begin()), Locators(initial_peers_.end()),
max_blocking_time_point);

if (ret)
{
fixed_locators_.clear();
should_reach_all_destinations_ = false;
}
}
else
{
interested_readers_.clear();
}

return ret;
}

bool PDPStatelessWriter::is_relevant(
const fastdds::rtps::CacheChange_t& /* change */,
const fastdds::rtps::GUID_t& reader_guid) const
{
return interested_readers_.end() !=
std::find(interested_readers_.begin(), interested_readers_.end(), reader_guid);
}

void PDPStatelessWriter::mark_all_readers_interested()
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
should_reach_all_destinations_ = true;
interested_readers_.clear();
fixed_locators_.clear();
fixed_locators_.push_back(initial_peers_);
reader_data_filter(nullptr);
}

void PDPStatelessWriter::add_interested_reader(
const GUID_t& reader_guid)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
if (!should_reach_all_destinations_)
{
auto it = std::find(interested_readers_.begin(), interested_readers_.end(), reader_guid);
if (it == interested_readers_.end())
{
interested_readers_.emplace_back(reader_guid);
reader_data_filter(this);
}
}
}

void PDPStatelessWriter::remove_interested_reader(
const GUID_t& reader_guid)
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
interested_readers_.remove(reader_guid);
}

void PDPStatelessWriter::reschedule_all_samples()
{
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
size_t n_samples = history_->getHistorySize();
if (0 < n_samples)
{
assert(1 == n_samples);
auto it = history_->changesBegin();
CacheChange_t* change = *it;
flow_controller_->add_new_sample(this, change, std::chrono::steady_clock::now() + std::chrono::hours(24));
}
}

} // namespace rtps
} // namespace fastdds
} // namespace eprosima
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/**
* @file PDPStatelessWriter.hpp
*/

#ifndef FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__PDPSTATELESSWRITER_HPP
#define FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__PDPSTATELESSWRITER_HPP

#include <chrono>

#include <fastdds/rtps/common/LocatorList.hpp>
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
#include <fastdds/utils/collections/ResourceLimitedVector.hpp>

#include <rtps/writer/StatelessWriter.hpp>

namespace eprosima {
namespace fastdds {
namespace rtps {

/**
* Class PDPStatelessWriter, specialization of StatelessWriter with specific behavior for PDP.
*/
class PDPStatelessWriter : public StatelessWriter, private IReaderDataFilter
{

public:

PDPStatelessWriter(
RTPSParticipantImpl* participant,
const GUID_t& guid,
const WriterAttributes& attributes,
FlowController* flow_controller,
WriterHistory* history,
WriterListener* listener);

virtual ~PDPStatelessWriter() = default;

//vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv

bool matched_reader_add_edp(
const ReaderProxyData& data) final;

bool matched_reader_remove(
const GUID_t& reader_guid) final;

//^^^^^^^^^^^^^^^^^^^^^^ [Exported API] ^^^^^^^^^^^^^^^^^^^^^^^

//vvvvvvvvvvvvvvvvvvvvv [BaseWriter API] vvvvvvvvvvvvvvvvvvvvvv

void unsent_change_added_to_history(
CacheChange_t* change,
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) final;

//^^^^^^^^^^^^^^^^^^^^^^ [BaseWriter API] ^^^^^^^^^^^^^^^^^^^^^^^

/**
* @brief Set the locators to which the writer should send periodic announcements.
*
* This method is used to configure the initial peers list on the PDP writer.
*
* @param locator_list List of locators to which the writer should send periodic announcements.
*
* @return true if the locators were set successfully.
*/
void set_initial_peers(
const LocatorList& locator_list);

/**
* Reset the unsent changes.
*/
void send_periodic_announcement();

protected:

bool send_to_fixed_locators(
const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
const uint32_t& total_bytes,
std::chrono::steady_clock::time_point& max_blocking_time_point) const override;

private:

/**
* This method checks whether a CacheChange_t is relevant for the specified reader
* This callback should return always the same result given the same arguments
* @param change The CacheChange_t to be evaluated
* @param reader_guid remote reader GUID_t
* @return true if relevant, false otherwise.
*/
bool is_relevant(
const fastdds::rtps::CacheChange_t& change,
const fastdds::rtps::GUID_t& reader_guid) const final;

/**
* @brief Mark all readers as interested.
*
* This method sets the flag indicating that all readers are interested in the data sent by this writer.
* It is used to ensure that all readers are considered when sending data.
* The flag will be reset when all the samples from this writer have been sent.
*/
void mark_all_readers_interested();

/**
* @brief Mark an interested reader.
*
* Add the guid of a reader to the list of interested readers.
*
* @param reader_guid The GUID of the reader to mark as interested.
*/
void add_interested_reader(
const GUID_t& reader_guid);

/**
* @brief Unmark an interested reader.
*
* Remove the guid of a reader from the list of interested readers.
*
* @param reader_guid The GUID of the reader to mark as interested.
*/
void remove_interested_reader(
const GUID_t& reader_guid);

/**
* @brief Add all samples from this writer to the flow controller.
*/
void reschedule_all_samples();

//! Configured initial peers
LocatorList initial_peers_{};
//! The set of readers interested
mutable ResourceLimitedVector<GUID_t> interested_readers_;
//! Whether we have set that all destinations are interested
mutable bool should_reach_all_destinations_ = false;

};

} // namespace rtps
} // namespace fastdds
} // namespace eprosima

#endif // FASTDDS_RTPS_BUILTIN_DISCOVERY_PARTICIPANT_SIMPLE__PDPSTATELESSWRITER_HPP

Loading

0 comments on commit 6e7d002

Please sign in to comment.