Skip to content

Commit fc32e4f

Browse files
committed
rclcpp fix
1 parent 3862a43 commit fc32e4f

8 files changed

Lines changed: 204 additions & 39 deletions

File tree

.github/workflows/rmw-zenoh-rs.yml

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ on:
88
workflow_dispatch:
99
inputs:
1010
test_filter:
11-
description: 'CTest filter regex (e.g., test_publisher|test_node)'
11+
description: 'CTest filter regex (e.g., test_publisher|test_node) - empty for all rcl+rclcpp tests'
1212
required: false
1313
default: ''
1414

@@ -50,10 +50,18 @@ jobs:
5050
ref: patch-rmwz/jazzy
5151
path: rcl
5252

53+
- name: Checkout rclcpp fork
54+
uses: actions/checkout@v4
55+
with:
56+
repository: YuanYuYuan/rclcpp
57+
ref: patch-rmwz/jazzy
58+
path: rclcpp
59+
5360
- name: Setup workspace
5461
run: |
5562
mkdir -p ws/src
5663
cp -r $GITHUB_WORKSPACE/rcl ws/src/rcl
64+
cp -r $GITHUB_WORKSPACE/rclcpp ws/src/rclcpp
5765
5866
- name: Install Rust
5967
uses: actions-rust-lang/setup-rust-toolchain@v1
@@ -67,20 +75,31 @@ jobs:
6775
mv ${NU_PKG}/nu /usr/local/bin/
6876
chmod +x ros-z/scripts/test-ros-packages.nu
6977
70-
- name: Test ROS packages
78+
- name: Test RCL packages
79+
shell: bash
80+
run: |
81+
cd ros-z
82+
source /opt/ros/jazzy/setup.bash
83+
# Test rcl: run all tests
84+
nu scripts/test-ros-packages.nu \
85+
--ws-dir $GITHUB_WORKSPACE/ws \
86+
--rmw-path $GITHUB_WORKSPACE/ros-z/rmw-zenoh-rs \
87+
--packages rcl \
88+
--verbose \
89+
--exclude-filter "test_count_matched|test_events"
90+
91+
- name: Test RCLCPP packages (RMW-relevant tests only)
7192
shell: bash
7293
run: |
7394
cd ros-z
7495
source /opt/ros/jazzy/setup.bash
75-
TEST_FILTER="${{ github.event.inputs.test_filter }}"
76-
if [ -z "$TEST_FILTER" ]; then
77-
TEST_FILTER="rmw_zenoh_rs"
78-
fi
96+
# Test rclcpp: run only RMW-relevant tests (node, publisher, subscriber, service, client, graph)
7997
nu scripts/test-ros-packages.nu \
8098
--ws-dir $GITHUB_WORKSPACE/ws \
8199
--rmw-path $GITHUB_WORKSPACE/ros-z/rmw-zenoh-rs \
100+
--packages rclcpp \
82101
--verbose \
83-
--include-filter "$TEST_FILTER" \
102+
--include-filter "test_node|test_publisher|test_subscription|test_service|test_client|test_node_graph" \
84103
--exclude-filter "test_count_matched|test_events"
85104
86105
- name: Upload test results

rmw-zenoh-rs/CMakeLists.txt

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,25 @@ install(
7171
DESTINATION share/${PROJECT_NAME}
7272
)
7373

74-
ament_package()
74+
# Create a cmake extras file to export the library target
75+
# This is needed because the library is built by Cargo and doesn't exist at CMake configure time
76+
file(WRITE "${CMAKE_CURRENT_BINARY_DIR}/rmw_zenoh_rs-extras.cmake" "
77+
# Export rmw_zenoh_rs library
78+
if(NOT TARGET rmw_zenoh_rs::rmw_zenoh_rs)
79+
add_library(rmw_zenoh_rs::rmw_zenoh_rs SHARED IMPORTED)
80+
set_target_properties(rmw_zenoh_rs::rmw_zenoh_rs PROPERTIES
81+
IMPORTED_LOCATION \"\${rmw_zenoh_rs_DIR}/../../../lib/librmw_zenoh_rs.so\"
82+
)
83+
endif()
84+
# Also provide the library without namespace for ament_target_dependencies
85+
set(rmw_zenoh_rs_LIBRARIES \"\${rmw_zenoh_rs_DIR}/../../../lib/librmw_zenoh_rs.so\")
86+
")
87+
88+
install(
89+
FILES "${CMAKE_CURRENT_BINARY_DIR}/rmw_zenoh_rs-extras.cmake"
90+
DESTINATION share/${PROJECT_NAME}/cmake
91+
)
92+
93+
ament_package(
94+
CONFIG_EXTRAS "${CMAKE_CURRENT_BINARY_DIR}/rmw_zenoh_rs-extras.cmake"
95+
)
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
# Export rmw_zenoh_rs library
2+
# This file is included when find_package(rmw_zenoh_rs) is called
3+
4+
if(NOT TARGET rmw_zenoh_rs::rmw_zenoh_rs)
5+
add_library(rmw_zenoh_rs::rmw_zenoh_rs SHARED IMPORTED)
6+
set_target_properties(rmw_zenoh_rs::rmw_zenoh_rs PROPERTIES
7+
IMPORTED_LOCATION "${rmw_zenoh_rs_DIR}/../../../lib/librmw_zenoh_rs.so"
8+
)
9+
endif()
10+
11+
# Also provide the library without namespace for ament_target_dependencies
12+
set(rmw_zenoh_rs_LIBRARIES "${rmw_zenoh_rs_DIR}/../../../lib/librmw_zenoh_rs.so")

rmw-zenoh-rs/src/guard_condition.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@ impl GuardConditionImpl {
1818
.as_ref()
1919
.ok_or(())?;
2020
self.triggered = true;
21+
tracing::debug!("[GC] Guard condition triggered, calling notify_all()");
2122
notifier.notify_all();
23+
tracing::debug!("[GC] notify_all() called");
2224
Ok(())
2325
}
2426

2527
pub fn reset(&mut self) {
28+
tracing::debug!("[GC] Guard condition reset");
2629
self.triggered = false;
2730
}
2831
}

rmw-zenoh-rs/src/rmw.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -404,19 +404,24 @@ pub extern "C" fn rmw_create_subscription(
404404
let callback_holder_clone = callback_holder.clone();
405405
let user_data_holder_clone = user_data_holder.clone();
406406
let unread_count_clone = unread_count_holder.clone();
407+
let topic_name_for_log = topic_str.to_string();
407408
let notify_callback = move || {
409+
tracing::debug!("[notify_callback] Message received on topic '{}', notifying wait sets and checking for executor callback", topic_name_for_log);
408410
notifier_clone.notify_all();
409411
// Invoke the user callback if set, otherwise increment unread count
410412
if let Ok(cb) = callback_holder_clone.lock() {
411413
if let Some(callback_fn) = *cb {
414+
tracing::debug!("[notify_callback] Executor callback is set for topic '{}', invoking it", topic_name_for_log);
412415
if let Ok(user_data_usize) = user_data_holder_clone.lock() {
413416
unsafe {
414417
let user_data_ptr = *user_data_usize as *const std::ffi::c_void;
415418
callback_fn(user_data_ptr, 1); // 1 new message
416419
}
417420
}
421+
tracing::debug!("[notify_callback] Executor callback invoked successfully for topic '{}'", topic_name_for_log);
418422
} else {
419423
// No callback set, increment unread count
424+
tracing::debug!("[notify_callback] No executor callback set for topic '{}', incrementing unread count", topic_name_for_log);
420425
if let Ok(mut unread) = unread_count_clone.lock() {
421426
*unread += 1;
422427
}
@@ -1830,7 +1835,12 @@ pub extern "C" fn rmw_publisher_event_init(
18301835
// Convert RMW event type to Zenoh event type
18311836
let zenoh_event_type = match rmw_event_type_to_zenoh_event(event_type) {
18321837
Some(t) => t,
1833-
None => return RMW_RET_UNSUPPORTED as _,
1838+
None => {
1839+
let msg = std::ffi::CString::new(format!("Event type {} is not supported by rmw_zenoh_rs", event_type))
1840+
.unwrap_or_else(|_| std::ffi::CString::new("Event type is not supported").unwrap());
1841+
unsafe { crate::ros::rcutils_set_error_state(msg.as_ptr(), cfile!(), line!() as usize) };
1842+
return RMW_RET_UNSUPPORTED as _;
1843+
}
18341844
};
18351845

18361846
// Verify this is a publisher event type
@@ -1878,7 +1888,12 @@ pub extern "C" fn rmw_subscription_event_init(
18781888
// Convert RMW event type to Zenoh event type
18791889
let zenoh_event_type = match rmw_event_type_to_zenoh_event(event_type) {
18801890
Some(t) => t,
1881-
None => return RMW_RET_UNSUPPORTED as _,
1891+
None => {
1892+
let msg = std::ffi::CString::new(format!("Event type {} is not supported by rmw_zenoh_rs", event_type))
1893+
.unwrap_or_else(|_| std::ffi::CString::new("Event type is not supported").unwrap());
1894+
unsafe { crate::ros::rcutils_set_error_state(msg.as_ptr(), cfile!(), line!() as usize) };
1895+
return RMW_RET_UNSUPPORTED as _;
1896+
}
18821897
};
18831898

18841899
// Verify this is a subscription event type

rmw-zenoh-rs/src/type_support.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -133,17 +133,18 @@ impl MessageTypeSupport {
133133
}
134134

135135
pub unsafe fn serialize_message(&self, ros_message: *const c_void) -> Vec<u8> {
136-
// Get the serialized size and add extra buffer for safety
136+
// Get the serialized size (includes 4-byte CDR encapsulation header)
137137
let size = unsafe { self.get_serialized_size(ros_message) };
138-
// Add significant extra buffer to account for potential alignment, padding, and string overhead
139-
// For string messages, we need more space than get_serialized_size might report
138+
// Add extra buffer to account for potential alignment and padding
140139
let buffer_size = (size + 256).max(512);
141140
let mut out = vec![0u8; buffer_size];
142141
let res = unsafe { serialize_message(self.as_ref(), ros_message, &mut out) };
143142
if !res {
144143
tracing::warn!("Failed to run serialize_message");
145144
vec![]
146145
} else {
146+
// Truncate to actual serialized size
147+
out.truncate(size);
147148
out
148149
}
149150
}

rmw-zenoh-rs/src/wait_set.rs

Lines changed: 61 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,122 +27,168 @@ impl WaitSetImpl {
2727
pub fn wait(&self, timeout: &rmw_time_t) -> bool {
2828
use std::time::Duration;
2929

30+
tracing::debug!("[WAIT] wait() called with timeout: {}s {}ns", timeout.sec, timeout.nsec);
31+
3032
// If timeout is zero, check ready immediately and return
3133
if timeout.sec == 0 && timeout.nsec == 0 {
32-
return self.check_ready();
34+
let ready = self.check_ready();
35+
tracing::debug!("[WAIT] Zero timeout, check_ready returned: {}", ready);
36+
return ready;
3337
}
3438

3539
// Calculate timeout duration
3640
let timeout_duration = if timeout.sec == u64::MAX {
41+
tracing::debug!("[WAIT] Infinite timeout");
3742
None // Infinite wait
3843
} else {
39-
Some(Duration::from_secs(timeout.sec) + Duration::from_nanos(timeout.nsec))
44+
let dur = Duration::from_secs(timeout.sec) + Duration::from_nanos(timeout.nsec);
45+
tracing::debug!("[WAIT] Finite timeout: {:?}", dur);
46+
Some(dur)
4047
};
4148

49+
// CRITICAL: Check if anything is already ready BEFORE waiting
50+
// This handles the case where trigger() happened before wait()
51+
let already_ready = self.check_ready();
52+
tracing::debug!("[WAIT] Pre-wait check_ready: {}", already_ready);
53+
if already_ready {
54+
tracing::debug!("[WAIT] Already ready, returning immediately");
55+
return true;
56+
}
57+
4258
// Use the notifier's condition variable for efficient waiting
59+
tracing::debug!("[WAIT] Acquiring mutex lock...");
4360
let mut mutex_guard = self.notifier.mutex.lock();
61+
tracing::debug!("[WAIT] Mutex locked, entering wait loop");
4462

45-
// Always wait (at least try to) - this prevents busy loops when data is already present
46-
// We'll check ready status after waiting or timing out
63+
// Now wait for notification
4764
loop {
4865
if let Some(dur) = timeout_duration {
66+
tracing::debug!("[WAIT] Calling wait_for({:?})", dur);
4967
let wait_result = self.notifier.cv.wait_for(&mut mutex_guard, dur);
68+
tracing::debug!("[WAIT] wait_for returned, timed_out: {}", wait_result.timed_out());
5069

5170
// After wait (notification or timeout), check if anything is ready
5271
let is_ready = self.check_ready();
72+
tracing::debug!("[WAIT] Post-wait check_ready: {}", is_ready);
5373

5474
if is_ready {
75+
tracing::debug!("[WAIT] Ready after wait, returning true");
5576
return true;
5677
}
5778

5879
// Nothing ready
5980
if wait_result.timed_out() {
81+
tracing::debug!("[WAIT] Timed out, returning false");
6082
return false;
6183
}
6284

85+
tracing::debug!("[WAIT] Spurious wakeup, looping...");
6386
// Spurious wakeup - nothing ready yet, loop and wait again
6487
} else {
6588
// Infinite wait
89+
tracing::debug!("[WAIT] Calling wait (infinite)");
6690
self.notifier.cv.wait(&mut mutex_guard);
91+
tracing::debug!("[WAIT] Woke up from infinite wait");
6792

6893
// Check if anything is ready after waking
69-
if self.check_ready() {
94+
let is_ready = self.check_ready();
95+
tracing::debug!("[WAIT] Post-wake check_ready: {}", is_ready);
96+
if is_ready {
97+
tracing::debug!("[WAIT] Ready after wake, returning true");
7098
return true;
7199
}
100+
tracing::debug!("[WAIT] Not ready after wake, looping...");
72101
// If notified but nothing ready yet, loop and wait again
73102
}
74103
}
75104
}
76105

77106
fn check_ready(&self) -> bool {
107+
tracing::debug!("[WAIT] check_ready: {} subs, {} gcs, {} srvs, {} clients, {} events",
108+
self.subscriptions.len(), self.guard_conditions.len(), self.services.len(),
109+
self.clients.len(), self.events.len());
110+
78111
// Check subscriptions
79-
for sub_impl_ptr in &self.subscriptions {
112+
for (i, sub_impl_ptr) in self.subscriptions.iter().enumerate() {
80113
if sub_impl_ptr.is_null() {
81114
continue;
82115
}
83116
unsafe {
84117
let sub_impl = &*((*sub_impl_ptr) as *const _ as *const crate::pubsub::SubscriptionImpl);
85-
if sub_impl.is_ready() {
118+
let ready = sub_impl.is_ready();
119+
tracing::debug!("[WAIT] Subscription {}: ready={}", i, ready);
120+
if ready {
86121
return true;
87122
}
88123
}
89124
}
90125

91126
// Check guard conditions
92-
for gc_impl_ptr in &self.guard_conditions {
127+
for (i, gc_impl_ptr) in self.guard_conditions.iter().enumerate() {
93128
if gc_impl_ptr.is_null() {
129+
tracing::debug!("[WAIT] Guard condition {}: NULL", i);
94130
continue;
95131
}
96132
unsafe {
97133
let gc_impl = &*(*gc_impl_ptr as *const _ as *const crate::guard_condition::GuardConditionImpl);
98-
if gc_impl.is_ready() {
134+
let ready = gc_impl.is_ready();
135+
tracing::debug!("[WAIT] Guard condition {}: ready={}", i, ready);
136+
if ready {
137+
tracing::debug!("[WAIT] Guard condition {} is ready! Returning true", i);
99138
return true;
100139
}
101140
}
102141
}
103142

104143
// Check services
105-
for srv_impl_ptr in &self.services {
144+
for (i, srv_impl_ptr) in self.services.iter().enumerate() {
106145
if srv_impl_ptr.is_null() {
107146
continue;
108147
}
109148
unsafe {
110149
let srv_impl = &*(*srv_impl_ptr as *const _ as *const crate::service::ServiceImpl);
111-
if srv_impl.is_ready() {
150+
let ready = srv_impl.is_ready();
151+
tracing::debug!("[WAIT] Service {}: ready={}", i, ready);
152+
if ready {
112153
return true;
113154
}
114155
}
115156
}
116157

117158
// Check clients
118-
for cli_impl_ptr in &self.clients {
159+
for (i, cli_impl_ptr) in self.clients.iter().enumerate() {
119160
if cli_impl_ptr.is_null() {
120161
continue;
121162
}
122163
unsafe {
123164
let cli_impl = &*(*cli_impl_ptr as *const _ as *const crate::service::ClientImpl);
124-
if cli_impl.is_ready() {
165+
let ready = cli_impl.is_ready();
166+
tracing::debug!("[WAIT] Client {}: ready={}", i, ready);
167+
if ready {
125168
return true;
126169
}
127170
}
128171
}
129172

130173
// Check events
131-
for event_ptr in &self.events {
174+
for (i, event_ptr) in self.events.iter().enumerate() {
132175
if event_ptr.is_null() {
133176
continue;
134177
}
135178
unsafe {
136179
let event = &*(*event_ptr);
137180
if !event.data.is_null() {
138181
let event_handle = &*(event.data as *const ros_z::event::RmEventHandle);
139-
if event_handle.is_ready() {
182+
let ready = event_handle.is_ready();
183+
tracing::debug!("[WAIT] Event {}: ready={}", i, ready);
184+
if ready {
140185
return true;
141186
}
142187
}
143188
}
144189
}
145190

191+
tracing::debug!("[WAIT] check_ready: nothing ready");
146192
false
147193
}
148194
}

0 commit comments

Comments
 (0)