Skip to content

Commit 38a6a92

Browse files
authored
Wake up wait set when adding a new waitable (#505)
* Wake up wait set when adding waitable Signed-off-by: Michael X. Grey <[email protected]> * Add test for subscribing while spinning Signed-off-by: Michael X. Grey <[email protected]> * prevent subscription from dropping Signed-off-by: Michael X. Grey <[email protected]> * Turn off parameter services to make the test more reliable Signed-off-by: Michael X. Grey <[email protected]> * Fix formatting Signed-off-by: Michael X. Grey <[email protected]> * Trigger CI Signed-off-by: Michael X. Grey <[email protected]> --------- Signed-off-by: Michael X. Grey <[email protected]>
1 parent d16172a commit 38a6a92

File tree

5 files changed

+96
-13
lines changed

5 files changed

+96
-13
lines changed

rclrs/src/executor.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ impl ExecutorCommands {
241241
}
242242

243243
pub(crate) fn add_to_wait_set(&self, waitable: Waitable) {
244-
self.async_worker_commands.channel.add_to_waitset(waitable);
244+
self.async_worker_commands.add_to_wait_set(waitable);
245245
}
246246

247247
#[cfg(test)]
@@ -275,7 +275,7 @@ impl ExecutorCommands {
275275
guard_condition: Arc::clone(&guard_condition),
276276
});
277277

278-
worker_channel.add_to_waitset(waitable);
278+
worker_channel.add_to_wait_set(waitable);
279279

280280
Arc::new(WorkerCommands {
281281
channel: worker_channel,
@@ -296,7 +296,8 @@ pub(crate) struct WorkerCommands {
296296

297297
impl WorkerCommands {
298298
pub(crate) fn add_to_wait_set(&self, waitable: Waitable) {
299-
self.channel.add_to_waitset(waitable);
299+
self.channel.add_to_wait_set(waitable);
300+
let _ = self.wakeup_wait_set.trigger();
300301
}
301302

302303
pub(crate) fn run_async<F>(&self, f: F)
@@ -327,7 +328,7 @@ pub trait WorkerChannel: Send + Sync {
327328
fn add_async_task(&self, f: BoxFuture<'static, ()>);
328329

329330
/// Add new entities to the waitset of the executor.
330-
fn add_to_waitset(&self, new_entity: Waitable);
331+
fn add_to_wait_set(&self, new_entity: Waitable);
331332

332333
/// Send a one-time task for the worker to run with its payload.
333334
fn send_payload_task(&self, f: PayloadTask);

rclrs/src/executor/basic_executor.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ struct BasicWorkerChannel {
308308
}
309309

310310
impl WorkerChannel for BasicWorkerChannel {
311-
fn add_to_waitset(&self, new_entity: Waitable) {
311+
fn add_to_wait_set(&self, new_entity: Waitable) {
312312
if let Err(err) = self.waitable_sender.unbounded_send(new_entity) {
313313
// This is a debug log because it is normal for this to happen while
314314
// an executor is winding down.

rclrs/src/node.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -385,13 +385,13 @@ impl NodeState {
385385
/// ```
386386
///
387387
pub fn create_publisher<'a, T>(
388-
&self,
388+
self: &Arc<Self>,
389389
options: impl Into<PublisherOptions<'a>>,
390390
) -> Result<Publisher<T>, RclrsError>
391391
where
392392
T: Message,
393393
{
394-
PublisherState::<T>::create(options, Arc::clone(&self.handle))
394+
PublisherState::<T>::create(options, Arc::clone(self))
395395
}
396396

397397
/// Creates a [`Service`] with an ordinary callback.

rclrs/src/publisher.rs

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use crate::{
1111
error::{RclrsError, ToResult},
1212
qos::QoSProfile,
1313
rcl_bindings::*,
14-
IntoPrimitiveOptions, NodeHandle, ENTITY_LIFECYCLE_MUTEX,
14+
IntoPrimitiveOptions, Node, Promise, ENTITY_LIFECYCLE_MUTEX,
1515
};
1616

1717
mod loaned_message;
@@ -28,12 +28,14 @@ unsafe impl Send for rcl_publisher_t {}
2828
/// [1]: <https://doc.rust-lang.org/reference/destructors.html>
2929
struct PublisherHandle {
3030
rcl_publisher: Mutex<rcl_publisher_t>,
31-
node_handle: Arc<NodeHandle>,
31+
/// We store the whole node here because we use some of its user-facing API
32+
/// in some of the Publisher methods.
33+
node: Node,
3234
}
3335

3436
impl Drop for PublisherHandle {
3537
fn drop(&mut self) {
36-
let mut rcl_node = self.node_handle.rcl_node.lock().unwrap();
38+
let mut rcl_node = self.node.handle().rcl_node.lock().unwrap();
3739
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
3840
// SAFETY: The entity lifecycle mutex is locked to protect against the risk of
3941
// global variables in the rmw implementation being unsafely modified during cleanup.
@@ -97,7 +99,7 @@ where
9799
/// Node and namespace changes are always applied _before_ topic remapping.
98100
pub(crate) fn create<'a>(
99101
options: impl Into<PublisherOptions<'a>>,
100-
node_handle: Arc<NodeHandle>,
102+
node: Node,
101103
) -> Result<Arc<Self>, RclrsError>
102104
where
103105
T: Message,
@@ -117,7 +119,7 @@ where
117119
publisher_options.qos = qos.into();
118120

119121
{
120-
let rcl_node = node_handle.rcl_node.lock().unwrap();
122+
let rcl_node = node.handle().rcl_node.lock().unwrap();
121123
let _lifecycle_lock = ENTITY_LIFECYCLE_MUTEX.lock().unwrap();
122124
unsafe {
123125
// SAFETY:
@@ -142,7 +144,7 @@ where
142144
message: PhantomData,
143145
handle: PublisherHandle {
144146
rcl_publisher: Mutex::new(rcl_publisher),
145-
node_handle,
147+
node,
146148
},
147149
}))
148150
}
@@ -177,6 +179,17 @@ where
177179
Ok(subscription_count)
178180
}
179181

182+
/// Get a promise that will be fulfilled when at least one subscriber is
183+
/// listening to this publisher.
184+
pub fn notify_on_subscriber_ready(self: &Arc<PublisherState<T>>) -> Promise<()> {
185+
let publisher = Arc::clone(self);
186+
self.handle.node.notify_on_graph_change(move || {
187+
publisher
188+
.get_subscription_count()
189+
.is_ok_and(|count| count > 0)
190+
})
191+
}
192+
180193
/// Publishes a message.
181194
///
182195
/// The [`MessageCow`] trait is implemented by any

rclrs/src/subscription.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -520,4 +520,73 @@ mod tests {
520520
assert!(start_time.elapsed() < std::time::Duration::from_secs(10));
521521
}
522522
}
523+
524+
#[test]
525+
fn test_delayed_subscription() {
526+
use crate::*;
527+
use example_interfaces::msg::Empty;
528+
use futures::{
529+
channel::{mpsc, oneshot},
530+
StreamExt,
531+
};
532+
use std::sync::atomic::{AtomicBool, Ordering};
533+
534+
let mut executor = Context::default().create_basic_executor();
535+
let node = executor
536+
.create_node(
537+
format!("test_delayed_subscription_{}", line!())
538+
// We need to turn off parameter services because their activity will
539+
// wake up the wait set, which defeats the purpose of this test.
540+
.start_parameter_services(false),
541+
)
542+
.unwrap();
543+
544+
let (promise, receiver) = oneshot::channel();
545+
let promise = Arc::new(Mutex::new(Some(promise)));
546+
547+
let success = Arc::new(AtomicBool::new(false));
548+
let send_success = Arc::clone(&success);
549+
550+
let publisher = node.create_publisher("test_delayed_subscription").unwrap();
551+
552+
let commands = Arc::clone(executor.commands());
553+
std::thread::spawn(move || {
554+
// Wait a little while so the executor can start spinning and guard
555+
// conditions can settle down.
556+
std::thread::sleep(std::time::Duration::from_millis(10));
557+
558+
let _ = commands.run(async move {
559+
let (sender, mut receiver) = mpsc::unbounded();
560+
let _subscription = node
561+
.create_subscription("test_delayed_subscription", move |_: Empty| {
562+
let _ = sender.unbounded_send(());
563+
})
564+
.unwrap();
565+
566+
// Make sure the message doesn't get dropped due to the subscriber
567+
// not being connected yet.
568+
let _ = publisher.notify_on_subscriber_ready().await;
569+
570+
// Publish the message, which should trigger the executor to stop spinning
571+
publisher.publish(Empty::default()).unwrap();
572+
573+
if let Some(_) = receiver.next().await {
574+
send_success.store(true, Ordering::Release);
575+
if let Some(promise) = promise.lock().unwrap().take() {
576+
promise.send(()).unwrap();
577+
}
578+
}
579+
});
580+
});
581+
582+
let r = executor.spin(
583+
SpinOptions::default()
584+
.until_promise_resolved(receiver)
585+
.timeout(std::time::Duration::from_secs(10)),
586+
);
587+
588+
assert!(r.is_empty(), "{r:?}");
589+
let message_was_received = success.load(Ordering::Acquire);
590+
assert!(message_was_received);
591+
}
523592
}

0 commit comments

Comments
 (0)