Skip to content

Commit d62583d

Browse files
authored
Async Workers (#446)
* Squash async worker changes Signed-off-by: Michael X. Grey <[email protected]> Signed-off-by: Michael X. Grey <[email protected]> * Fix bug in logging macros Signed-off-by: Michael X. Grey <[email protected]> * Clean up examples using new API Signed-off-by: Michael X. Grey <[email protected]> * Ensure workers wake up when a task is run Signed-off-by: Michael X. Grey <[email protected]> * Iterating on documentation Signed-off-by: Michael X. Grey <[email protected]> * Update docs for subscriptions Signed-off-by: Michael X. Grey <[email protected]> * Fix create_worker bug Signed-off-by: Michael X. Grey <[email protected]> * Add more documentation for Worker Signed-off-by: Michael X. Grey <[email protected]> * Fix all todos that can be addressed for now Signed-off-by: Michael X. Grey <[email protected]> * Add introductory documentation and a parameter demo Signed-off-by: Michael X. Grey <[email protected]> * Add documentation for parameters Signed-off-by: Michael X. Grey <[email protected]> * Fix formatting Signed-off-by: Michael X. Grey <[email protected]> * Remove residual directory Signed-off-by: Michael X. Grey <[email protected]> * Port over packaging fix Signed-off-by: Michael X. Grey <[email protected]> * Fix formatting for examples Signed-off-by: Michael X. Grey <[email protected]> * Fix distro divergence in guid type Signed-off-by: Michael X. Grey <[email protected]> * Fix compilation bug in 1.75 Signed-off-by: Michael X. Grey <[email protected]> * Force backtrace version that is compatible with 1.75 Signed-off-by: Michael X. Grey <[email protected]> --------- Signed-off-by: Michael X. Grey <[email protected]> Signed-off-by: Michael X. Grey <[email protected]>
1 parent 205b84b commit d62583d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

70 files changed

+6680
-2405
lines changed

examples/logging_demo/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
[package]
2+
name = "logging_demo"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
rclrs = "0.4"
8+
example_interfaces = "*"

examples/logging_demo/package.xml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
<?xml version="1.0"?>
2+
<?xml-model
3+
href="http://download.ros.org/schema/package_format3.xsd"
4+
schematypens="http://www.w3.org/2001/XMLSchema"?>
5+
<package format="3">
6+
<name>examples_logging_demo</name>
7+
<maintainer email="[email protected]">Esteve Fernandez</maintainer>
8+
<!-- This project is not military-sponsored, Jacob's employment contract just requires him to use this email address -->
9+
<maintainer email="[email protected]">Jacob Hassold</maintainer>
10+
<version>0.4.1</version>
11+
<description>Package containing an example of how to use a worker in rclrs.</description>
12+
<license>Apache License 2.0</license>
13+
14+
<depend>rclrs</depend>
15+
<depend>rosidl_runtime_rs</depend>
16+
<depend>example_interfaces</depend>
17+
18+
<export>
19+
<build_type>ament_cargo</build_type>
20+
</export>
21+
</package>

examples/logging_demo/src/main.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
use rclrs::*;
2+
use std::time::Duration;
3+
4+
fn main() -> Result<(), RclrsError> {
5+
let mut executor = Context::default_from_env()?.create_basic_executor();
6+
let node = executor.create_node("logging_demo")?;
7+
8+
let _subscription = node.clone().create_subscription(
9+
"logging_demo",
10+
move |msg: example_interfaces::msg::String| {
11+
let data = msg.data;
12+
13+
// You can apply modifiers such as .once() to node.logger()
14+
// to dictate how the logging behaves.
15+
log!(node.logger().once(), "First message: {data}",);
16+
17+
log!(node.logger().skip_first(), "Subsequent message: {data}",);
18+
19+
// You can chain multiple modifiers together.
20+
log_warn!(
21+
node.logger().skip_first().throttle(Duration::from_secs(5)),
22+
"Throttled message: {data}",
23+
);
24+
},
25+
)?;
26+
27+
// Any &str can be used as the logger name and have
28+
// logging modifiers applied to it.
29+
log_info!(
30+
"notice".once(),
31+
"Ready to begin logging example_interfaces/msg/String messages published to 'logging_demo'.",
32+
);
33+
log_warn!(
34+
"help",
35+
"Try running\n \
36+
$ ros2 topic pub logging_demo example_interfaces/msg/String \"data: message\"",
37+
);
38+
executor.spin(SpinOptions::default()).first_error()
39+
}
Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,31 @@
11
use anyhow::{Error, Result};
2+
use example_interfaces::srv::*;
23
use rclrs::*;
34

45
fn main() -> Result<(), Error> {
56
let mut executor = Context::default_from_env()?.create_basic_executor();
67

78
let node = executor.create_node("minimal_client")?;
89

9-
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
10+
let client = node.create_client::<AddTwoInts>("add_two_ints")?;
1011

11-
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };
12+
let promise = executor.commands().run(async move {
13+
println!("Waiting for service...");
14+
client.notify_on_service_ready().await.unwrap();
1215

13-
println!("Starting client");
16+
let request = AddTwoInts_Request { a: 41, b: 1 };
1417

15-
while !client.service_is_ready()? {
16-
std::thread::sleep(std::time::Duration::from_millis(10));
17-
}
18+
println!("Waiting for response");
19+
let response: AddTwoInts_Response = client.call(&request).unwrap().await.unwrap();
1820

19-
client.async_send_request_with_callback(
20-
&request,
21-
move |response: example_interfaces::srv::AddTwoInts_Response| {
22-
println!(
23-
"Result of {} + {} is: {}",
24-
request.a, request.b, response.sum
25-
);
26-
},
27-
)?;
21+
println!(
22+
"Result of {} + {} is: {}",
23+
request.a, request.b, response.sum,
24+
);
25+
});
2826

29-
std::thread::sleep(std::time::Duration::from_millis(500));
30-
31-
println!("Waiting for response");
3227
executor
33-
.spin(SpinOptions::default())
34-
.first_error()
35-
.map_err(|err| err.into())
28+
.spin(SpinOptions::new().until_promise_resolved(promise))
29+
.first_error()?;
30+
Ok(())
3631
}
Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use anyhow::{Error, Result};
2+
use example_interfaces::srv::*;
23
use rclrs::*;
34

45
#[tokio::main]
@@ -7,28 +8,28 @@ async fn main() -> Result<(), Error> {
78

89
let node = executor.create_node("minimal_client")?;
910

10-
let client = node.create_client::<example_interfaces::srv::AddTwoInts>("add_two_ints")?;
11+
let client = node.create_client::<AddTwoInts>("add_two_ints")?;
1112

1213
println!("Starting client");
1314

1415
while !client.service_is_ready()? {
1516
std::thread::sleep(std::time::Duration::from_millis(10));
1617
}
1718

18-
let request = example_interfaces::srv::AddTwoInts_Request { a: 41, b: 1 };
19+
let request = AddTwoInts_Request { a: 41, b: 1 };
1920

20-
let future = client.call_async(&request);
21+
let promise = client
22+
.call_then(&request, move |response: AddTwoInts_Response| {
23+
println!(
24+
"Result of {} + {} is: {}",
25+
request.a, request.b, response.sum,
26+
);
27+
})
28+
.unwrap();
2129

2230
println!("Waiting for response");
23-
24-
let rclrs_spin = tokio::task::spawn_blocking(move || executor.spin(SpinOptions::default()));
25-
26-
let response = future.await?;
27-
println!(
28-
"Result of {} + {} is: {}",
29-
request.a, request.b, response.sum
30-
);
31-
32-
rclrs_spin.await.ok();
31+
executor
32+
.spin(SpinOptions::new().until_promise_resolved(promise))
33+
.first_error()?;
3334
Ok(())
3435
}
Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
use anyhow::{Error, Result};
2+
use example_interfaces::srv::*;
23
use rclrs::*;
34

4-
fn handle_service(
5-
_request_header: &rclrs::rmw_request_id_t,
6-
request: example_interfaces::srv::AddTwoInts_Request,
7-
) -> example_interfaces::srv::AddTwoInts_Response {
8-
println!("request: {} + {}", request.a, request.b);
9-
example_interfaces::srv::AddTwoInts_Response {
5+
fn handle_service(request: AddTwoInts_Request, info: ServiceInfo) -> AddTwoInts_Response {
6+
let timestamp = info
7+
.received_timestamp
8+
.map(|t| format!(" at [{t:?}]"))
9+
.unwrap_or(String::new());
10+
11+
println!("request{timestamp}: {} + {}", request.a, request.b);
12+
AddTwoInts_Response {
1013
sum: request.a + request.b,
1114
}
1215
}
@@ -16,12 +19,9 @@ fn main() -> Result<(), Error> {
1619

1720
let node = executor.create_node("minimal_service")?;
1821

19-
let _server = node
20-
.create_service::<example_interfaces::srv::AddTwoInts, _>("add_two_ints", handle_service)?;
22+
let _server = node.create_service::<AddTwoInts, _>("add_two_ints", handle_service)?;
2123

2224
println!("Starting server");
23-
executor
24-
.spin(SpinOptions::default())
25-
.first_error()
26-
.map_err(|err| err.into())
25+
executor.spin(SpinOptions::default()).first_error()?;
26+
Ok(())
2727
}

examples/minimal_pub_sub/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ path = "src/zero_copy_publisher.rs"
2929
anyhow = {version = "1", features = ["backtrace"]}
3030
rclrs = "0.4"
3131
rosidl_runtime_rs = "0.4"
32-
std_msgs = "*"
32+
example_interfaces = "*"
3333

3434
# This specific version is compatible with Rust 1.75
3535
backtrace = "=0.3.74"

examples/minimal_pub_sub/package.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,11 @@
1414

1515
<build_depend>rclrs</build_depend>
1616
<build_depend>rosidl_runtime_rs</build_depend>
17-
<build_depend>std_msgs</build_depend>
17+
<build_depend>example_interfaces</build_depend>
1818

1919
<exec_depend>rclrs</exec_depend>
2020
<exec_depend>rosidl_runtime_rs</exec_depend>
21-
<exec_depend>std_msgs</exec_depend>
21+
<exec_depend>example_interfaces</exec_depend>
2222

2323
<export>
2424
<build_type>ament_cargo</build_type>

examples/minimal_pub_sub/src/minimal_publisher.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@ fn main() -> Result<(), Error> {
77

88
let node = executor.create_node("minimal_publisher")?;
99

10-
let publisher = node.create_publisher::<std_msgs::msg::String>("topic")?;
10+
let publisher = node.create_publisher::<example_interfaces::msg::String>("topic")?;
1111

12-
let mut message = std_msgs::msg::String::default();
12+
let mut message = example_interfaces::msg::String::default();
1313

1414
let mut publish_count: u32 = 1;
1515

examples/minimal_pub_sub/src/minimal_subscriber.rs

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,19 +7,16 @@ fn main() -> Result<(), Error> {
77

88
let node = executor.create_node("minimal_subscriber")?;
99

10-
let mut num_messages: usize = 0;
11-
12-
let _subscription = node.create_subscription::<std_msgs::msg::String, _>(
10+
let worker = node.create_worker::<usize>(0);
11+
let _subscription = worker.create_subscription::<example_interfaces::msg::String, _>(
1312
"topic",
14-
move |msg: std_msgs::msg::String| {
15-
num_messages += 1;
16-
println!("I heard: '{}'", msg.data);
17-
println!("(Got {} messages so far)", num_messages);
13+
move |num_messages: &mut usize, msg: example_interfaces::msg::String| {
14+
*num_messages += 1;
15+
println!("#{} | I heard: '{}'", *num_messages, msg.data);
1816
},
1917
)?;
2018

21-
executor
22-
.spin(SpinOptions::default())
23-
.first_error()
24-
.map_err(|err| err.into())
19+
println!("Waiting for messages...");
20+
executor.spin(SpinOptions::default()).first_error()?;
21+
Ok(())
2522
}

0 commit comments

Comments
 (0)