1
- import os
2
1
import argparse
3
- import logging
4
- from typing import Any , Dict , Optional
5
2
from contextlib import closing
3
+ from typing import Optional
4
+
6
5
import synchros2 .process as ros_process
7
6
from geometry_msgs .msg import Pose
8
7
from synchros2 .subscription import Subscription
8
+
9
9
from .robot_commander import RobotCommander
10
10
11
+
11
12
class SpotRobotInterface :
12
13
def __init__ (self , robot_name : Optional [str ] = None ):
13
14
# self.robot_name = os.getenv("ROBOT_NAME")
14
15
self .robot_commander = RobotCommander (robot_name )
15
16
self .latest_message = None # Buffer to store the latest message
16
17
self .is_busy = False # Flag to indicate if the robot is busy
17
18
self .robot_commander ._logger .info (f"Robot Name: { self .robot_commander ._robot_name } " )
18
-
19
- def initialize (self ):
19
+
20
+ def initialize (self ) -> bool :
20
21
if not self .robot_commander .initialize_robot ():
21
22
self .robot_commander ._logger .info ("Failed to initialize robot" )
22
23
return False
23
24
self .robot_commander ._logger .info ("Initialized robot" )
24
25
return True
25
26
26
- def process_message (self , message ) :
27
+ def process_message (self , message : Pose ) -> None :
27
28
if self .is_busy :
28
29
self .latest_message = message
29
30
self .robot_commander ._logger .info ("Robot is busy, buffering the latest message" )
30
31
else :
31
32
self .execute_command (message )
32
-
33
- if self .latest_message :
34
- self .execute_command (self .latest_message )
35
- self .latest_message = None
36
33
37
- def execute_command (self , message ):
34
+ # if self.latest_message:
35
+ # # self.execute_command(self.latest_message)
36
+ # self.latest_message = None
37
+
38
+ def execute_command (self , message : Pose ) -> None :
38
39
self .is_busy = True
39
40
self .robot_commander .walk_forward_with_world_frame_goal (message )
40
41
self .is_busy = False
41
-
42
- def listen_to_pose_commands (self ):
42
+
43
+ def listen_to_pose_commands (self ) -> None :
43
44
topic_data = Subscription (Pose , "/pose_commands" )
44
45
with closing (topic_data .stream ()) as stream :
45
46
for message in stream :
46
47
self .process_message (message )
47
48
49
+
48
50
def cli () -> argparse .ArgumentParser :
49
51
parser = argparse .ArgumentParser ()
50
52
parser .add_argument ("--robot" , help = "Name of the robot if the ROS driver is inside that namespace" )
51
53
return parser
52
54
55
+
53
56
@ros_process .main (cli ())
54
57
def main (args : argparse .Namespace ) -> None :
55
58
robot_interface = SpotRobotInterface (args .robot )
@@ -58,5 +61,6 @@ def main(args: argparse.Namespace) -> None:
58
61
if robot_interface .initialize ():
59
62
robot_interface .listen_to_pose_commands ()
60
63
64
+
61
65
if __name__ == "__main__" :
62
66
main ()
0 commit comments