-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsolver.py
96 lines (86 loc) · 3.71 KB
/
solver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import ikpy.chain
import numpy as np
import ikpy.utils.plot as plot_utils
import matplotlib.pyplot as plt
import lcm
import threading
import target_position # Generated by lcm-gen from target_position.lcm
class iksolver:
def __init__(self):
# Load the kinematic chain from the URDF file.
self.my_chain = ikpy.chain.Chain.from_urdf_file("dummy_urdf/dummy_urdf/urdf/dummy_urdf.urdf")
self.joint_angle = None
self.goal = None
# Create a persistent figure and axis for plotting.
self.fig, self.ax = plot_utils.init_3d_figure()
plt.ion() # Turn on interactive mode.
self.new_update = False # Flag to indicate new data is available.
def compute(self, target_position):
"""
Compute the inverse kinematics for the given target position.
"""
self.joint_angle = self.my_chain.inverse_kinematics(target_position)
self.goal = target_position
print("The angles of each joint are: {}".format(self.joint_angle))
self.real_frame = self.my_chain.forward_kinematics(self.joint_angle)
print("Computed position vector: {}, original position vector: {}".format(
self.real_frame[:3, 3], target_position))
self.new_update = True # Signal that a new update is available.
def update_plot(self):
"""
Clear and update the plot with the current chain configuration.
Also overlay text showing the target position and joint angles.
"""
self.ax.cla() # Clear the axis.
self.my_chain.plot(self.joint_angle, self.ax, target=self.goal)
# Set axis limits (you might need to adjust these based on your model)
self.ax.set_xlim(-0.5, 0.5)
self.ax.set_ylim(-0.5, 0.5)
self.ax.set_zlim(0, 1.5)
# Overlay text information for clarity.
textstr = f"Target: {np.round(self.goal, 3)}\nJoint Angles: {np.round(self.joint_angle, 3)}"
self.ax.text2D(0.05, 0.95, textstr, transform=self.ax.transAxes, fontsize=10,
verticalalignment='top', bbox=dict(boxstyle="round", facecolor="wheat", alpha=0.5))
self.fig.canvas.draw_idle()
self.fig.canvas.flush_events()
def start_subscriber(self):
"""
Start an LCM subscriber in a separate thread.
"""
thread = threading.Thread(target=self._subscriber_loop)
thread.daemon = True
thread.start()
def _subscriber_loop(self):
"""
LCM subscriber loop: listens for messages on channel "TARGET_POSITION".
"""
lc_inst = lcm.LCM()
def handler(channel, data):
msg = target_position.TargetPosition.decode(data)
target = np.array([msg.x, msg.y, msg.z])
print("Received target on channel {}: {}".format(channel, target))
self.compute(target)
# Do NOT update the plot here. The main thread will handle that.
subscription = lc_inst.subscribe("TARGET_POSITION", handler)
try:
while True:
lc_inst.handle()
except KeyboardInterrupt:
pass
if __name__ == '__main__':
# Create the IK solver instance and start the LCM subscriber.
solve = iksolver()
solve.start_subscriber()
# Perform an initial computation and update the plot.
solve.compute(np.array([0.05, 0.05, 0.1]))
solve.update_plot()
print("Starting main loop. Press Ctrl+C to exit.")
try:
# Main loop: if new data is available, update the plot.
while True:
if solve.new_update:
solve.update_plot()
solve.new_update = False
plt.pause(0.1) # Allow the GUI to update.
except KeyboardInterrupt:
print("Shutting down.")