-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy patharm_control_backend.py
76 lines (66 loc) · 2.62 KB
/
arm_control_backend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import lcm
import arm_control # Generated by lcm-gen from arm_control.lcm
#import odrive
import time
class ODriveController:
def __init__(self):
# Empty initializer for now; we set self.odrv when starting motors.
self.odrv = None
def start_motors(self):
"""
Search for an ODrive board and set its axes to closed-loop control.
"""
try:
print("Searching for ODrive...")
# This call blocks until an ODrive is found.
# self.odrv = odrive.find_any()
self.odrv = None
self.odrv.axis0.requested_state = 0
self.odrv.axis1.requested_state = 0
print("ODrive found:", self.odrv)
# Set axes 0 and 1 (if available) to CLOSED_LOOP_CONTROL (state 8).
if hasattr(self.odrv, 'axis0'):
self.odrv.axis0.requested_state = 8
print("Axis 0 set to CLOSED_LOOP_CONTROL")
if hasattr(self.odrv, 'axis1'):
self.odrv.axis1.requested_state = 8
print("Axis 1 set to CLOSED_LOOP_CONTROL")
print("Motors started successfully.")
except Exception as e:
print("Error starting motors:", e)
def update_speed(self, a1, a2, a3, a4, a5, a6, effector):
"""
For now, simply print the speed values received.
Later this method could actually command the ODrive board.
"""
print("ODrive Speed Update:")
print(" a1: {}".format(a1))
print(" a2: {}".format(a2))
print(" a3: {}".format(a3))
print(" a4: {}".format(a4))
print(" a5: {}".format(a5))
print(" a6: {}".format(a6))
print(" effector: {}".format(effector))
# Create a global instance of the ODrive controller and start motors.
odrive_controller = ODriveController()
odrive_controller.start_motors()
def arm_control_handler(channel, data):
"""
LCM handler for ARM_CONTROL messages.
Decodes the message, prints its values, and updates ODrive speeds.
"""
msg = arm_control.ArmControl.decode(data)
print("Received message on channel '{}':".format(channel))
# Update the ODrive controller with the speed values.
odrive_controller.update_speed(msg.a1, msg.a2, msg.a3, msg.a4, msg.a5, msg.a6, msg.effector)
def main():
lc = lcm.LCM()
subscription = lc.subscribe("ARM_CONTROL", arm_control_handler)
print("Listening for ARM_CONTROL messages. Press Ctrl+C to exit.")
try:
while True:
lc.handle()
except KeyboardInterrupt:
print("Exiting...")
if __name__ == "__main__":
main()