Skip to content

Commit 2d40c6c

Browse files
feat: publish top level msg when error is received
1 parent 24fb79a commit 2d40c6c

File tree

4 files changed

+147
-1
lines changed

4 files changed

+147
-1
lines changed

diagnostic_aggregator/CMakeLists.txt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,11 @@ if(BUILD_TESTING)
123123
ENV
124124
)
125125
endforeach()
126+
127+
add_launch_test(
128+
test/test_critical_pub.py
129+
TIMEOUT 30
130+
)
126131
endif()
127132

128133
install(

diagnostic_aggregator/include/diagnostic_aggregator/aggregator.hpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,16 @@ class Aggregator
152152

153153
std::string base_path_; /**< \brief Prepended to all status names of aggregator. */
154154

155+
/*!
156+
*\brief If true, aggregator will publish an error immediately after receiving.
157+
*/
158+
bool critical_;
159+
160+
/*!
161+
*\brief Store the last top level value to publish the critical error only once.
162+
*/
163+
std::uint8_t last_top_level_state_;
164+
155165
/// Records all ROS warnings. No warnings are repeated.
156166
std::set<std::string> ros_warnings_;
157167

diagnostic_aggregator/src/aggregator.cpp

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ Aggregator::Aggregator()
6464
pub_rate_(1.0),
6565
history_depth_(1000),
6666
clock_(n_->get_clock()),
67-
base_path_("")
67+
base_path_(""),
68+
critical_(false),
69+
last_top_level_state_(DiagnosticStatus::STALE)
6870
{
6971
RCLCPP_DEBUG(logger_, "constructor");
7072
bool other_as_errors = false;
@@ -88,12 +90,16 @@ Aggregator::Aggregator()
8890
other_as_errors = param.second.as_bool();
8991
} else if (param.first.compare("history_depth") == 0) {
9092
history_depth_ = param.second.as_int();
93+
} else if (param.first.compare("critical") == 0) {
94+
critical_ = param.second.as_bool();
9195
}
9296
}
9397
RCLCPP_DEBUG(logger_, "Aggregator publication rate configured to: %f", pub_rate_);
9498
RCLCPP_DEBUG(logger_, "Aggregator base path configured to: %s", base_path_.c_str());
9599
RCLCPP_DEBUG(
96100
logger_, "Aggregator other_as_errors configured to: %s", (other_as_errors ? "true" : "false"));
101+
RCLCPP_DEBUG(
102+
logger_, "Aggregator critical publisher configured to: %s", (critical_ ? "true" : "false"));
97103

98104
analyzer_group_ = std::make_unique<AnalyzerGroup>();
99105
if (!analyzer_group_->init(base_path_, "", n_)) {
@@ -149,6 +155,24 @@ void Aggregator::diagCallback(const DiagnosticArray::SharedPtr diag_msg)
149155
std::lock_guard<std::mutex> lock(mutex_);
150156
for (auto j = 0u; j < diag_msg->status.size(); ++j) {
151157
analyzed = false;
158+
159+
const bool top_level_state_transition_to_error =
160+
(last_top_level_state_ != DiagnosticStatus::ERROR) &&
161+
(diag_msg->status[j].level == DiagnosticStatus::ERROR);
162+
163+
if (critical_ && top_level_state_transition_to_error) {
164+
RCLCPP_DEBUG(
165+
logger_, "Received error message: %s, publishing error immediately",
166+
diag_msg->status[j].name.c_str());
167+
DiagnosticStatus diag_toplevel_state;
168+
diag_toplevel_state.name = "toplevel_state_critical";
169+
diag_toplevel_state.level = diag_msg->status[j].level;
170+
toplevel_state_pub_->publish(diag_toplevel_state);
171+
172+
// store the last published state
173+
last_top_level_state_ = diag_toplevel_state.level;
174+
}
175+
152176
auto item = std::make_shared<StatusItem>(&diag_msg->status[j]);
153177

154178
if (analyzer_group_->match(item->getName())) {
@@ -217,6 +241,8 @@ void Aggregator::publishData()
217241
// have stale items but not all are stale
218242
diag_toplevel_state.level = DiagnosticStatus::ERROR;
219243
}
244+
last_top_level_state_ = diag_toplevel_state.level;
245+
220246
toplevel_state_pub_->publish(diag_toplevel_state);
221247
}
222248

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import pytest
2+
import rclpy
3+
import unittest
4+
5+
from launch import LaunchDescription
6+
from launch_ros.actions import Node
7+
from launch_testing.actions import ReadyToTest
8+
9+
from diagnostic_msgs.msg import DiagnosticArray, DiagnosticStatus
10+
11+
12+
@pytest.mark.launch_test
13+
def generate_test_description():
14+
# Launch the aggregator
15+
parameters = [{"analyzers.test.type": "diagnostic_aggregator/GenericAnalyzer"},
16+
{"analyzers.test.path": "Test"},
17+
{"analyzers.test.contains": ["test"]},
18+
{"path": "Base"},
19+
{"critical": True}]
20+
21+
aggregator_cmd = Node(
22+
package='diagnostic_aggregator',
23+
executable='aggregator_node',
24+
name='diagnostic_aggregator',
25+
parameters=parameters)
26+
27+
ld = LaunchDescription()
28+
29+
# Launch the node
30+
ld.add_action(aggregator_cmd)
31+
ld.add_action(ReadyToTest())
32+
return ld
33+
34+
35+
class TestProcessOutput(unittest.TestCase):
36+
37+
@ classmethod
38+
def setUpClass(cls):
39+
# Initialize the ROS context for the test node
40+
rclpy.init()
41+
42+
@ classmethod
43+
def tearDownClass(cls):
44+
# Shutdown the ROS context
45+
rclpy.shutdown()
46+
47+
def setUp(self):
48+
# Create a ROS node for tests
49+
self.node = rclpy.create_node('test_node')
50+
self.publisher = self.node.create_publisher(DiagnosticArray, '/diagnostics', 1)
51+
self.subscriber = self.node.create_subscription(
52+
DiagnosticStatus,
53+
'/diagnostics_toplevel_state',
54+
lambda msg: self.received_state.append(msg.level),
55+
1)
56+
self.received_state = []
57+
58+
def tearDown(self):
59+
self.node.destroy_node()
60+
61+
def publish_message(self, level):
62+
msg = DiagnosticArray()
63+
msg.status.append(DiagnosticStatus())
64+
msg.status[0].level = level
65+
msg.status[0].name = "test status"
66+
self.publisher.publish(msg)
67+
while len(self.received_state) == 0:
68+
rclpy.spin_once(self.node)
69+
70+
return self.node.get_clock().now()
71+
72+
def test_ok_state(self):
73+
# Publish the ok message and wait till the toplevel state is received
74+
state = DiagnosticStatus.OK
75+
time_0 = self.publish_message(state)
76+
77+
assert (self.received_state[0] == state), \
78+
"Received state is not the same as the sent state"
79+
self.received_state.clear()
80+
81+
# Publish the ok message and expect the toplevel state after 1 second period
82+
time_1 = self.publish_message(state)
83+
assert (time_1 - time_0 > rclpy.duration.Duration(seconds=0.99)), \
84+
"OK message received too early"
85+
assert (self.received_state[0] == state), \
86+
"Received state is not the same as the sent state"
87+
self.received_state.clear()
88+
89+
# Publish the message and expect the critical error message immediately
90+
state = DiagnosticStatus.ERROR
91+
time_2 = self.publish_message(state)
92+
93+
assert (time_2 - time_1 < rclpy.duration.Duration(seconds=0.1)), \
94+
"Critical error message not received within 0.1 second"
95+
assert (self.received_state[0] == state), \
96+
"Received state is not the same as the sent state"
97+
self.received_state.clear()
98+
99+
# Next error message should be sent at standard 1 second rate
100+
time_3 = self.publish_message(state)
101+
102+
assert (time_3 - time_1 > rclpy.duration.Duration(seconds=0.99)), \
103+
"Periodic error message received too early"
104+
assert (self.received_state[0] == state), \
105+
"Received state is not the same as the sent state"

0 commit comments

Comments
 (0)