Skip to content

Commit 9ea8c8f

Browse files
feat: publish top level msg when error is received
1 parent 24fb79a commit 9ea8c8f

File tree

4 files changed

+148
-1
lines changed

4 files changed

+148
-1
lines changed

diagnostic_aggregator/CMakeLists.txt

+5
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ if(BUILD_TESTING)
123123
ENV
124124
)
125125
endforeach()
126+
127+
add_launch_test(
128+
test/test_critical_pub.py
129+
TIMEOUT 30
130+
)
126131
endif()
127132

128133
install(

diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp

+10
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,16 @@ class Aggregator
152152

153153
std::string base_path_; /**< \brief Prepended to all status names of aggregator. */
154154

155+
/*!
156+
*\brief If true, aggregator will publish an error immediately after receiving.
157+
*/
158+
bool critical_;
159+
160+
/*!
161+
*\brief Store the last top level value to publish the critical error only once.
162+
*/
163+
std::uint8_t last_top_level_state_;
164+
155165
/// Records all ROS warnings. No warnings are repeated.
156166
std::set<std::string> ros_warnings_;
157167

diagnostic_aggregator/src/aggregator.cpp

+27-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ Aggregator::Aggregator()
6464
pub_rate_(1.0),
6565
history_depth_(1000),
6666
clock_(n_->get_clock()),
67-
base_path_("")
67+
base_path_(""),
68+
critical_(false),
69+
last_top_level_state_(DiagnosticStatus::STALE)
6870
{
6971
RCLCPP_DEBUG(logger_, "constructor");
7072
bool other_as_errors = false;
@@ -88,12 +90,16 @@ Aggregator::Aggregator()
8890
other_as_errors = param.second.as_bool();
8991
} else if (param.first.compare("history_depth") == 0) {
9092
history_depth_ = param.second.as_int();
93+
} else if (param.first.compare("critical") == 0) {
94+
critical_ = param.second.as_bool();
9195
}
9296
}
9397
RCLCPP_DEBUG(logger_, "Aggregator publication rate configured to: %f", pub_rate_);
9498
RCLCPP_DEBUG(logger_, "Aggregator base path configured to: %s", base_path_.c_str());
9599
RCLCPP_DEBUG(
96100
logger_, "Aggregator other_as_errors configured to: %s", (other_as_errors ? "true" : "false"));
101+
RCLCPP_DEBUG(
102+
logger_, "Aggregator critical publisher configured to: %s", (critical_ ? "true" : "false"));
97103

98104
analyzer_group_ = std::make_unique<AnalyzerGroup>();
99105
if (!analyzer_group_->init(base_path_, "", n_)) {
@@ -149,6 +155,24 @@ void Aggregator::diagCallback(const DiagnosticArray::SharedPtr diag_msg)
149155
std::lock_guard<std::mutex> lock(mutex_);
150156
for (auto j = 0u; j < diag_msg->status.size(); ++j) {
151157
analyzed = false;
158+
159+
const bool top_level_state_transition_to_error =
160+
(last_top_level_state_ != DiagnosticStatus::ERROR) &&
161+
(diag_msg->status[j].level == DiagnosticStatus::ERROR);
162+
163+
if (critical_ && top_level_state_transition_to_error) {
164+
RCLCPP_DEBUG(
165+
logger_, "Received error message: %s, publishing error immediately",
166+
diag_msg->status[j].name.c_str());
167+
DiagnosticStatus diag_toplevel_state;
168+
diag_toplevel_state.name = "toplevel_state_critical";
169+
diag_toplevel_state.level = diag_msg->status[j].level;
170+
toplevel_state_pub_->publish(diag_toplevel_state);
171+
172+
// store the last published state
173+
last_top_level_state_ = diag_toplevel_state.level;
174+
}
175+
152176
auto item = std::make_shared<StatusItem>(&diag_msg->status[j]);
153177

154178
if (analyzer_group_->match(item->getName())) {
@@ -217,6 +241,8 @@ void Aggregator::publishData()
217241
// have stale items but not all are stale
218242
diag_toplevel_state.level = DiagnosticStatus::ERROR;
219243
}
244+
last_top_level_state_ = diag_toplevel_state.level;
245+
220246
toplevel_state_pub_->publish(diag_toplevel_state);
221247
}
222248

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import unittest
2+
3+
import pytest
4+
import rclpy
5+
6+
from launch import LaunchDescription
7+
from launch_ros.actions import Node
8+
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
9+
from launch_testing.actions import ReadyToTest
10+
11+
12+
13+
@pytest.mark.launch_test
14+
def generate_test_description():
15+
# Launch the aggregator
16+
parameters = [{'analyzers.test.type': 'diagnostic_aggregator/GenericAnalyzer'},
17+
{'analyzers.test.path': 'Test'},
18+
{'analyzers.test.contains': ['test']},
19+
{'path': 'Base'},
20+
{'critical': True}]
21+
22+
aggregator_cmd = Node(
23+
package='diagnostic_aggregator',
24+
executable='aggregator_node',
25+
name='diagnostic_aggregator',
26+
parameters=parameters)
27+
28+
ld = LaunchDescription()
29+
30+
# Launch the node
31+
ld.add_action(aggregator_cmd)
32+
ld.add_action(ReadyToTest())
33+
return ld
34+
35+
36+
class TestProcessOutput(unittest.TestCase):
37+
38+
@ classmethod
39+
def setUpClass(cls):
40+
# Initialize the ROS context for the test node
41+
rclpy.init()
42+
43+
@ classmethod
44+
def tearDownClass(cls):
45+
# Shutdown the ROS context
46+
rclpy.shutdown()
47+
48+
def setUp(self):
49+
# Create a ROS node for tests
50+
self.node = rclpy.create_node('test_node')
51+
self.publisher = self.node.create_publisher(DiagnosticArray, '/diagnostics', 1)
52+
self.subscriber = self.node.create_subscription(
53+
DiagnosticStatus,
54+
'/diagnostics_toplevel_state',
55+
lambda msg: self.received_state.append(msg.level),
56+
1)
57+
self.received_state = []
58+
59+
def tearDown(self):
60+
self.node.destroy_node()
61+
62+
def publish_message(self, level):
63+
msg = DiagnosticArray()
64+
msg.status.append(DiagnosticStatus())
65+
msg.status[0].level = level
66+
msg.status[0].name = 'test status'
67+
self.publisher.publish(msg)
68+
while len(self.received_state) == 0:
69+
rclpy.spin_once(self.node)
70+
71+
return self.node.get_clock().now()
72+
73+
def test_ok_state(self):
74+
# Publish the ok message and wait till the toplevel state is received
75+
state = DiagnosticStatus.OK
76+
time_0 = self.publish_message(state)
77+
78+
assert (self.received_state[0] == state), \
79+
'Received state is not the same as the sent state'
80+
self.received_state.clear()
81+
82+
# Publish the ok message and expect the toplevel state after 1 second period
83+
time_1 = self.publish_message(state)
84+
assert (time_1 - time_0 > rclpy.duration.Duration(seconds=0.99)), \
85+
'OK message received too early'
86+
assert (self.received_state[0] == state), \
87+
'Received state is not the same as the sent state'
88+
self.received_state.clear()
89+
90+
# Publish the message and expect the critical error message immediately
91+
state = DiagnosticStatus.ERROR
92+
time_2 = self.publish_message(state)
93+
94+
assert (time_2 - time_1 < rclpy.duration.Duration(seconds=0.1)), \
95+
'Critical error message not received within 0.1 second'
96+
assert (self.received_state[0] == state), \
97+
'Received state is not the same as the sent state'
98+
self.received_state.clear()
99+
100+
# Next error message should be sent at standard 1 second rate
101+
time_3 = self.publish_message(state)
102+
103+
assert (time_3 - time_1 > rclpy.duration.Duration(seconds=0.99)), \
104+
'Periodic error message received too early'
105+
assert (self.received_state[0] == state), \
106+
'Received state is not the same as the sent state'

0 commit comments

Comments
 (0)