1
+ import unittest
2
+
1
3
import pytest
2
4
import rclpy
3
- import unittest
4
5
5
6
from launch import LaunchDescription
6
7
from launch_ros .actions import Node
8
+ from diagnostic_msgs .msg import DiagnosticArray , DiagnosticStatus
7
9
from launch_testing .actions import ReadyToTest
8
10
9
- from diagnostic_msgs .msg import DiagnosticArray , DiagnosticStatus
10
11
11
12
12
13
@pytest .mark .launch_test
13
14
def generate_test_description ():
14
15
# Launch the aggregator
15
- parameters = [{" analyzers.test.type" : " diagnostic_aggregator/GenericAnalyzer" },
16
- {" analyzers.test.path" : " Test" },
17
- {" analyzers.test.contains" : [" test" ]},
18
- {" path" : " Base" },
19
- {" critical" : True }]
16
+ parameters = [{' analyzers.test.type' : ' diagnostic_aggregator/GenericAnalyzer' },
17
+ {' analyzers.test.path' : ' Test' },
18
+ {' analyzers.test.contains' : [' test' ]},
19
+ {' path' : ' Base' },
20
+ {' critical' : True }]
20
21
21
22
aggregator_cmd = Node (
22
23
package = 'diagnostic_aggregator' ,
@@ -62,7 +63,7 @@ def publish_message(self, level):
62
63
msg = DiagnosticArray ()
63
64
msg .status .append (DiagnosticStatus ())
64
65
msg .status [0 ].level = level
65
- msg .status [0 ].name = " test status"
66
+ msg .status [0 ].name = ' test status'
66
67
self .publisher .publish (msg )
67
68
while len (self .received_state ) == 0 :
68
69
rclpy .spin_once (self .node )
@@ -75,31 +76,31 @@ def test_ok_state(self):
75
76
time_0 = self .publish_message (state )
76
77
77
78
assert (self .received_state [0 ] == state ), \
78
- " Received state is not the same as the sent state"
79
+ ' Received state is not the same as the sent state'
79
80
self .received_state .clear ()
80
81
81
82
# Publish the ok message and expect the toplevel state after 1 second period
82
83
time_1 = self .publish_message (state )
83
84
assert (time_1 - time_0 > rclpy .duration .Duration (seconds = 0.99 )), \
84
- " OK message received too early"
85
+ ' OK message received too early'
85
86
assert (self .received_state [0 ] == state ), \
86
- " Received state is not the same as the sent state"
87
+ ' Received state is not the same as the sent state'
87
88
self .received_state .clear ()
88
89
89
90
# Publish the message and expect the critical error message immediately
90
91
state = DiagnosticStatus .ERROR
91
92
time_2 = self .publish_message (state )
92
93
93
94
assert (time_2 - time_1 < rclpy .duration .Duration (seconds = 0.1 )), \
94
- " Critical error message not received within 0.1 second"
95
+ ' Critical error message not received within 0.1 second'
95
96
assert (self .received_state [0 ] == state ), \
96
- " Received state is not the same as the sent state"
97
+ ' Received state is not the same as the sent state'
97
98
self .received_state .clear ()
98
99
99
100
# Next error message should be sent at standard 1 second rate
100
101
time_3 = self .publish_message (state )
101
102
102
103
assert (time_3 - time_1 > rclpy .duration .Duration (seconds = 0.99 )), \
103
- " Periodic error message received too early"
104
+ ' Periodic error message received too early'
104
105
assert (self .received_state [0 ] == state ), \
105
- " Received state is not the same as the sent state"
106
+ ' Received state is not the same as the sent state'
0 commit comments