@@ -57,23 +57,27 @@ ReaderProxy::ReaderProxy(
57
57
, nack_supression_event_(nullptr )
58
58
, initial_heartbeat_event_(nullptr )
59
59
, timers_enabled_(false )
60
- , last_acknack_count_ (0 )
60
+ , next_expected_acknack_count_ (0 )
61
61
, last_nackfrag_count_(0 )
62
62
{
63
- nack_supression_event_ = new TimedEvent (writer_->getRTPSParticipant ()->getEventResource (),
64
- [&]() -> bool
65
- {
66
- writer_->perform_nack_supression (guid ());
67
- return false ;
68
- },
69
- TimeConv::Time_t2MilliSecondsDouble (times.nackSupressionDuration ));
63
+ auto participant = writer_->getRTPSParticipant ();
64
+ if (nullptr != participant)
65
+ {
66
+ nack_supression_event_ = new TimedEvent (participant->getEventResource (),
67
+ [&]() -> bool
68
+ {
69
+ writer_->perform_nack_supression (guid ());
70
+ return false ;
71
+ },
72
+ TimeConv::Time_t2MilliSecondsDouble (times.nackSupressionDuration ));
70
73
71
- initial_heartbeat_event_ = new TimedEvent (writer_->getRTPSParticipant ()->getEventResource (),
72
- [&]() -> bool
73
- {
74
- writer_->intraprocess_heartbeat (this );
75
- return false ;
76
- }, 0 );
74
+ initial_heartbeat_event_ = new TimedEvent (participant->getEventResource (),
75
+ [&]() -> bool
76
+ {
77
+ writer_->intraprocess_heartbeat (this );
78
+ return false ;
79
+ }, 0 );
80
+ }
77
81
78
82
stop ();
79
83
}
@@ -135,7 +139,7 @@ void ReaderProxy::start(
135
139
}
136
140
137
141
timers_enabled_.store (is_remote_and_reliable ());
138
- if (is_local_reader ())
142
+ if (is_local_reader () && initial_heartbeat_event_ )
139
143
{
140
144
initial_heartbeat_event_->restart_timer ();
141
145
}
@@ -166,32 +170,38 @@ void ReaderProxy::stop()
166
170
disable_timers ();
167
171
168
172
changes_for_reader_.clear ();
169
- last_acknack_count_ = 0 ;
173
+ next_expected_acknack_count_ = 0 ;
170
174
last_nackfrag_count_ = 0 ;
171
175
changes_low_mark_ = SequenceNumber_t ();
172
176
}
173
177
174
178
void ReaderProxy::disable_timers ()
175
179
{
176
- if (timers_enabled_.exchange (false ))
180
+ if (timers_enabled_.exchange (false ) && nack_supression_event_ )
177
181
{
178
182
nack_supression_event_->cancel_timer ();
179
183
}
180
- initial_heartbeat_event_->cancel_timer ();
184
+ if (initial_heartbeat_event_)
185
+ {
186
+ initial_heartbeat_event_->cancel_timer ();
187
+ }
181
188
}
182
189
183
190
void ReaderProxy::update_nack_supression_interval (
184
191
const Duration_t& interval)
185
192
{
186
- nack_supression_event_->update_interval (interval);
193
+ if (nack_supression_event_)
194
+ {
195
+ nack_supression_event_->update_interval (interval);
196
+ }
187
197
}
188
198
189
199
void ReaderProxy::add_change (
190
200
const ChangeForReader_t& change,
191
201
bool is_relevant,
192
202
bool restart_nack_supression)
193
203
{
194
- if (restart_nack_supression && timers_enabled_.load ())
204
+ if (restart_nack_supression && timers_enabled_.load () && nack_supression_event_ )
195
205
{
196
206
nack_supression_event_->restart_timer ();
197
207
}
@@ -205,7 +215,7 @@ void ReaderProxy::add_change(
205
215
bool restart_nack_supression,
206
216
const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
207
217
{
208
- if (restart_nack_supression && timers_enabled_)
218
+ if (restart_nack_supression && timers_enabled_ && nack_supression_event_ )
209
219
{
210
220
nack_supression_event_->restart_timer (max_blocking_time);
211
221
}
@@ -459,7 +469,7 @@ void ReaderProxy::from_unsent_to_status(
459
469
// It will use acked_changes_set().
460
470
assert (is_reliable_);
461
471
462
- if (restart_nack_supression && is_remote_and_reliable ())
472
+ if (restart_nack_supression && is_remote_and_reliable () && nack_supression_event_ )
463
473
{
464
474
assert (timers_enabled_.load ());
465
475
nack_supression_event_->restart_timer ();
0 commit comments