Skip to content

Commit ef3c345

Browse files
Refs #22506. Filter interested readers on PDP writer
Signed-off-by: Eugenio Collado <[email protected]>
1 parent 88a860d commit ef3c345

File tree

11 files changed

+375
-34
lines changed

11 files changed

+375
-34
lines changed

include/fastdds/rtps/writer/StatelessWriter.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,12 @@ class StatelessWriter : public RTPSWriter
7676
WriterHistory* hist,
7777
WriterListener* listen = nullptr);
7878

79+
mutable LocatorList_t fixed_locators_;
80+
81+
virtual bool send_to_fixed_locators(
82+
CDRMessage_t* message,
83+
std::chrono::steady_clock::time_point& max_blocking_time_point) const;
84+
7985
public:
8086

8187
virtual ~StatelessWriter();
@@ -159,10 +165,11 @@ class StatelessWriter : public RTPSWriter
159165
//FOR NOW THERE IS NOTHING TO UPDATE.
160166
}
161167

168+
//! Deprecated in favor of PDP simple writer
162169
bool set_fixed_locators(
163170
const LocatorList_t& locator_list);
164171

165-
//!Reset the unsent changes.
172+
//! Deprecated in favor of PDP simple writer
166173
void unsent_changes_reset();
167174

168175
/**
@@ -272,7 +279,6 @@ class StatelessWriter : public RTPSWriter
272279

273280

274281
bool is_inline_qos_expected_ = false;
275-
LocatorList_t fixed_locators_;
276282
ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_remote_readers_;
277283

278284
std::condition_variable_any unsent_changes_cond_;

src/cpp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ set(${PROJECT_NAME}_source_files
195195
rtps/builtin/discovery/participant/PDP.cpp
196196
rtps/builtin/discovery/participant/ServerAttributes.cpp
197197
rtps/builtin/discovery/participant/PDPSimple.cpp
198+
rtps/builtin/discovery/participant/simple/PDPStatelessWriter.cpp
198199
rtps/builtin/discovery/participant/PDPListener.cpp
199200
rtps/builtin/discovery/endpoint/EDP.cpp
200201
rtps/builtin/discovery/endpoint/EDPSimple.cpp

src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ void PDPSimple::announceParticipantState(
285285

286286
if (!(dispose || new_change))
287287
{
288-
endpoints->writer.writer_->unsent_changes_reset();
288+
endpoints->writer.writer_->send_periodic_announcement();
289289
}
290290
}
291291
}
@@ -399,7 +399,7 @@ bool PDPSimple::create_dcps_participant_endpoints()
399399
if (mp_RTPSParticipant->createWriter(&rtps_writer, watt, writer.payload_pool_, writer.history_.get(),
400400
nullptr, writer_entity_id, true))
401401
{
402-
writer.writer_ = dynamic_cast<StatelessWriter*>(rtps_writer);
402+
writer.writer_ = dynamic_cast<PDPStatelessWriter*>(rtps_writer);
403403
assert(nullptr != writer.writer_);
404404

405405
#if HAVE_SECURITY
@@ -451,7 +451,7 @@ bool PDPSimple::create_dcps_participant_endpoints()
451451
EPROSIMA_LOG_WARNING(RTPS_PDP, "Ignoring initial peers locator " << loc << " : not allowed.");
452452
}
453453
}
454-
writer.writer_->set_fixed_locators(fixed_locators);
454+
writer.writer_->set_initial_peers(fixed_locators);
455455
}
456456
else
457457
{
@@ -721,11 +721,6 @@ void PDPSimple::match_pdp_remote_endpoints(
721721
{
722722
writer->matched_reader_add(*temp_reader_data);
723723
}
724-
725-
if (!writer_only && (BEST_EFFORT_RELIABILITY_QOS == reliability_kind))
726-
{
727-
endpoints->writer.writer_->unsent_changes_reset();
728-
}
729724
}
730725
}
731726

Lines changed: 179 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,179 @@
1+
// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/**
16+
* @file PDPStatelessWriter.cpp
17+
*/
18+
19+
#include <rtps/builtin/discovery/participant/simple/PDPStatelessWriter.hpp>
20+
21+
#include <algorithm>
22+
#include <cassert>
23+
#include <chrono>
24+
#include <cstdint>
25+
#include <mutex>
26+
#include <set>
27+
#include <vector>
28+
29+
#include <fastrtps/rtps/history/WriterHistory.h>
30+
#include <rtps/participant/RTPSParticipantImpl.h>
31+
32+
namespace eprosima {
33+
namespace fastrtps {
34+
namespace rtps {
35+
36+
PDPStatelessWriter::PDPStatelessWriter(
37+
RTPSParticipantImpl* participant,
38+
const GUID_t& guid,
39+
const WriterAttributes& attributes,
40+
fastdds::rtps::FlowController* flow_controller,
41+
WriterHistory* history,
42+
WriterListener* listener)
43+
: StatelessWriter(participant, guid, attributes, flow_controller, history, listener)
44+
, interested_readers_(participant->getRTPSParticipantAttributes().allocation.participants)
45+
{
46+
}
47+
48+
bool PDPStatelessWriter::matched_reader_add(
49+
const ReaderProxyData& data)
50+
{
51+
bool ret = StatelessWriter::matched_reader_add(data);
52+
if (ret)
53+
{
54+
// Mark new reader as interested
55+
add_interested_reader(data.guid());
56+
// Send announcement to new reader
57+
reschedule_all_samples();
58+
}
59+
return ret;
60+
}
61+
62+
bool PDPStatelessWriter::matched_reader_remove(
63+
const GUID_t& reader_guid)
64+
{
65+
bool ret = StatelessWriter::matched_reader_remove(reader_guid);
66+
if (ret)
67+
{
68+
// Mark reader as not interested
69+
remove_interested_reader(reader_guid);
70+
}
71+
return ret;
72+
}
73+
74+
void PDPStatelessWriter::unsent_change_added_to_history(
75+
CacheChange_t* change,
76+
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
77+
{
78+
mark_all_readers_interested();
79+
StatelessWriter::unsent_change_added_to_history(change, max_blocking_time);
80+
}
81+
82+
void PDPStatelessWriter::set_initial_peers(
83+
const fastdds::rtps::LocatorList& locator_list)
84+
{
85+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
86+
87+
initial_peers_.push_back(locator_list);
88+
mp_RTPSParticipant->createSenderResources(initial_peers_);
89+
}
90+
91+
void PDPStatelessWriter::send_periodic_announcement()
92+
{
93+
mark_all_readers_interested();
94+
reschedule_all_samples();
95+
}
96+
97+
bool PDPStatelessWriter::send_to_fixed_locators(
98+
CDRMessage_t* message,
99+
std::chrono::steady_clock::time_point& max_blocking_time_point) const
100+
{
101+
bool ret = true;
102+
103+
if (should_reach_all_destinations_)
104+
{
105+
ret = initial_peers_.empty() ||
106+
mp_RTPSParticipant->sendSync(message, m_guid,
107+
Locators(initial_peers_.begin()), Locators(initial_peers_.end()),
108+
max_blocking_time_point);
109+
110+
if (ret)
111+
{
112+
fixed_locators_.clear();
113+
should_reach_all_destinations_ = false;
114+
}
115+
}
116+
else
117+
{
118+
interested_readers_.clear();
119+
}
120+
121+
return ret;
122+
}
123+
124+
bool PDPStatelessWriter::is_relevant(
125+
const CacheChange_t& /* change */,
126+
const GUID_t& reader_guid) const
127+
{
128+
return interested_readers_.end() !=
129+
std::find(interested_readers_.begin(), interested_readers_.end(), reader_guid);
130+
}
131+
132+
void PDPStatelessWriter::mark_all_readers_interested()
133+
{
134+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
135+
should_reach_all_destinations_ = true;
136+
interested_readers_.clear();
137+
fixed_locators_.clear();
138+
fixed_locators_.push_back(initial_peers_);
139+
reader_data_filter(nullptr);
140+
}
141+
142+
void PDPStatelessWriter::add_interested_reader(
143+
const GUID_t& reader_guid)
144+
{
145+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
146+
if (!should_reach_all_destinations_)
147+
{
148+
auto it = std::find(interested_readers_.begin(), interested_readers_.end(), reader_guid);
149+
if (it == interested_readers_.end())
150+
{
151+
interested_readers_.emplace_back(reader_guid);
152+
reader_data_filter(this);
153+
}
154+
}
155+
}
156+
157+
void PDPStatelessWriter::remove_interested_reader(
158+
const GUID_t& reader_guid)
159+
{
160+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
161+
interested_readers_.remove(reader_guid);
162+
}
163+
164+
void PDPStatelessWriter::reschedule_all_samples()
165+
{
166+
std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
167+
size_t n_samples = mp_history->getHistorySize();
168+
if (0 < n_samples)
169+
{
170+
assert(1 == n_samples);
171+
auto it = mp_history->changesBegin();
172+
CacheChange_t* change = *it;
173+
flow_controller_->add_new_sample(this, change, std::chrono::steady_clock::now() + std::chrono::hours(24));
174+
}
175+
}
176+
177+
} // namespace rtps
178+
} // namespace fastdds
179+
} // namespace eprosima

0 commit comments

Comments
 (0)