Skip to content

Commit

Permalink
[pre-commit.ci] auto fixes from pre-commit.com hooks
Browse files Browse the repository at this point in the history
for more information, see https://pre-commit.ci
  • Loading branch information
pre-commit-ci[bot] committed Jan 26, 2025
1 parent 451c2c4 commit d2ad008
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 53 deletions.
99 changes: 67 additions & 32 deletions guidance/guidance_los/guidance_los/los_guidance_algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __sub__(self, other: "State") -> "State":

def as_los_array(self):
return np.array([self.surge_vel, self.pitch, self.yaw])

def as_pos_array(self):
return np.array([self.x, self.y, self.z])

Expand Down Expand Up @@ -179,19 +179,27 @@ def compute_pitch_command(
self.los_params.max_pitch_angle,
)

def compute_desired_speed(self, yaw_error: float, distance_to_target: float, crosstrack_y: float) -> float:
"""
Compute speed command with yaw error, distance and crosstrack-based scaling.
def compute_desired_speed(
self, yaw_error: float, distance_to_target: float, crosstrack_y: float
) -> float:
"""Compute speed command with yaw error, distance and crosstrack-based scaling.
Args:
yaw_error: Error in yaw angle (radians)
distance_to_target: Distance to target (meters)
crosstrack_y: Cross-track error in y direction (meters)
"""
yaw_factor = np.cos(yaw_error)
distance_factor = min(1.0, distance_to_target / self.los_params.lookahead_distance_max)
distance_factor = min(
1.0, distance_to_target / self.los_params.lookahead_distance_max
)
crosstrack_factor = 1.0 / (1.0 + abs(crosstrack_y))
desired_speed = self.los_params.nominal_speed * yaw_factor * distance_factor * crosstrack_factor
desired_speed = (
self.los_params.nominal_speed
* yaw_factor
* distance_factor
* crosstrack_factor
)
return max(self.los_params.min_speed, desired_speed)

def apply_reference_filter(self, commands: State) -> State:
Expand All @@ -217,50 +225,77 @@ def compute_pi_h(current_waypoint: State, next_waypoint: State) -> float:
dx = next_waypoint.x - current_waypoint.x
dy = next_waypoint.y - current_waypoint.y
return np.arctan2(dy, dx)

@staticmethod
def compute_pi_v(current_waypoint: State, next_waypoint: State) -> float:
dz = next_waypoint.z - current_waypoint.z
horizontal_distance = np.sqrt((next_waypoint.x - current_waypoint.x)**2 + (next_waypoint.y - current_waypoint.y)**2)
horizontal_distance = np.sqrt(
(next_waypoint.x - current_waypoint.x) ** 2
+ (next_waypoint.y - current_waypoint.y) ** 2
)
return np.arctan2(-dz, horizontal_distance)

@staticmethod
def compute_rotation_y(pi_v: float) -> np.ndarray:
return np.array([
[np.cos(pi_v), 0, np.sin(pi_v)],
[0, 1, 0],
[-np.sin(pi_v), 0, np.cos(pi_v)]
])

return np.array(
[
[np.cos(pi_v), 0, np.sin(pi_v)],
[0, 1, 0],
[-np.sin(pi_v), 0, np.cos(pi_v)],
]
)

@staticmethod
def compute_rotation_z(pi_h: float) -> np.ndarray:
return np.array([
[np.cos(pi_h), -np.sin(pi_h), 0],
[np.sin(pi_h), np.cos(pi_h), 0],
[0, 0, 1]
])

def compute_psi_d(self, current_waypoint: State, next_waypoint: State, crosstrack_y: float, y_int: float) -> float:
return np.array(
[
[np.cos(pi_h), -np.sin(pi_h), 0],
[np.sin(pi_h), np.cos(pi_h), 0],
[0, 0, 1],
]
)

def compute_psi_d(
self,
current_waypoint: State,
next_waypoint: State,
crosstrack_y: float,
y_int: float,
) -> float:
pi_h = self.compute_pi_h(current_waypoint, next_waypoint)
psi_d = pi_h - np.arctan((crosstrack_y + self.kappa) / self.lookahead_h)
psi_d = self.ssa(psi_d)
return psi_d

def compute_theta_d(self, current_waypoint: State, next_waypoint: State, crosstrack_z: float, alpha_c: float) -> float:

def compute_theta_d(
self,
current_waypoint: State,
next_waypoint: State,
crosstrack_z: float,
alpha_c: float,
) -> float:
pi_v = self.compute_pi_v(current_waypoint, next_waypoint)
theta_d = pi_v + np.arctan(crosstrack_z / self.lookahead_v) + alpha_c
theta_d = self.ssa(theta_d)
return theta_d

@staticmethod
def calculate_alpha_c(u: float, v: float, w: float, phi: float) -> float:
return np.arctan((v * np.sin(phi) + w * np.cos(phi)) / u) # Slide 104 in Fossen 2024

return np.arctan(
(v * np.sin(phi) + w * np.cos(phi)) / u
) # Slide 104 in Fossen 2024

@staticmethod
def calculate_beta_c(u: float, v: float, w: float, phi: float, theta: float, alpha_c: float) -> float:
U_v = u * np.sqrt(1 + np.tan(alpha_c)**2)
return np.arctan((v * np.cos(phi) - w * np.sin(phi)) / (U_v * np.cos(theta - alpha_c))) # Slide 104 in Fossen 2024
def calculate_beta_c(
u: float, v: float, w: float, phi: float, theta: float, alpha_c: float
) -> float:
U_v = u * np.sqrt(1 + np.tan(alpha_c) ** 2)
return np.arctan(
(v * np.cos(phi) - w * np.sin(phi)) / (U_v * np.cos(theta - alpha_c))
) # Slide 104 in Fossen 2024

def calculate_y_int_dot(self, crosstrack_y, y_int):
y_int_dot = (self.lookahead_h * crosstrack_y) / (self.lookahead_h**2 + (crosstrack_y + self.kappa * y_int)**2)
return y_int_dot
y_int_dot = (self.lookahead_h * crosstrack_y) / (
self.lookahead_h**2 + (crosstrack_y + self.kappa * y_int) ** 2
)
return y_int_dot
72 changes: 51 additions & 21 deletions guidance/guidance_los/scripts/los_guidance_action_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ def __init__(self):

self.desired_vel = 0.3
self.y_int = 0.0


def declare_los_parameters_(self):
"""Declare all LOS guidance parameters."""
Expand Down Expand Up @@ -133,7 +132,6 @@ def _declare_topic_parameters(self):

def _initialize_publishers(self):
"""Initialize all publishers."""

los_commands_topic = self.get_parameter('topics.publishers.los_commands').value
self.get_logger().info(f"Publishing LOS commands to: {los_commands_topic}")
self.guidance_cmd_pub = self.create_publisher(
Expand Down Expand Up @@ -219,7 +217,10 @@ def odom_callback(self, msg: Odometry):
self.filter_initialized = True

def calculate_guidance(self) -> State:
error = self.state.as_pos_array() - self.waypoints[self.current_waypoint_index].as_pos_array()
error = (
self.state.as_pos_array()
- self.waypoints[self.current_waypoint_index].as_pos_array()
)
_, crosstrack_y, crosstrack_z = self.rotation_yz @ error
alpha_c = self.guidance_calculator.calculate_alpha_c(
self.u, self.v, self.w, self.phi
Expand All @@ -233,8 +234,12 @@ def calculate_guidance(self) -> State:
crosstrack_y,
self.y_int,
)
y_int_dot = self.guidance_calculator.calculate_y_int_dot(crosstrack_y, self.y_int)
self.y_int += y_int_dot * self.update_period # Forward euler, may need to use Runge-Kutta 4
y_int_dot = self.guidance_calculator.calculate_y_int_dot(
crosstrack_y, self.y_int
)
self.y_int += (
y_int_dot * self.update_period
) # Forward euler, may need to use Runge-Kutta 4
theta_d = self.guidance_calculator.compute_theta_d(
self.waypoints[self.current_waypoint_index],
self.waypoints[self.current_waypoint_index + 1],
Expand All @@ -254,7 +259,6 @@ def calculate_guidance(self) -> State:

def execute_callback(self, goal_handle: ServerGoalHandle):
"""Execute waypoint navigation action."""

# Initialize navigation goal with lock
with self._goal_lock:
self.goal_handle = goal_handle
Expand All @@ -269,15 +273,24 @@ def execute_callback(self, goal_handle: ServerGoalHandle):
)
self.waypoints.append(point_as_state)

self.pi_h = self.guidance_calculator.compute_pi_h(self.waypoints[self.current_waypoint_index], self.waypoints[self.current_waypoint_index + 1])
self.pi_v = self.guidance_calculator.compute_pi_v(self.waypoints[self.current_waypoint_index], self.waypoints[self.current_waypoint_index + 1])
self.pi_h = self.guidance_calculator.compute_pi_h(
self.waypoints[self.current_waypoint_index],
self.waypoints[self.current_waypoint_index + 1],
)
self.pi_v = self.guidance_calculator.compute_pi_v(
self.waypoints[self.current_waypoint_index],
self.waypoints[self.current_waypoint_index + 1],
)

rotation_y = self.guidance_calculator.compute_rotation_y(self.pi_v)
rotation_z = self.guidance_calculator.compute_rotation_z(self.pi_h)
self.rotation_yz = rotation_y.T @ rotation_z.T

# Initialize y_int based on initial crosstrack error
initial_error = self.state.as_pos_array() - self.waypoints[self.current_waypoint_index].as_pos_array()
initial_error = (
self.state.as_pos_array()
- self.waypoints[self.current_waypoint_index].as_pos_array()
)
_, initial_crosstrack_y, _ = self.rotation_yz @ initial_error
self.y_int = -initial_crosstrack_y / self.guidance_calculator.kappa

Expand All @@ -286,7 +299,9 @@ def execute_callback(self, goal_handle: ServerGoalHandle):

rate = self.create_rate(1.0 / self.update_period)
waiting_at_waypoint = False
wait_cycles = int(1.0 / self.update_period) # Number of cycles for 1 second wait
wait_cycles = int(
1.0 / self.update_period
) # Number of cycles for 1 second wait
wait_count = 0

self.get_logger().info('Executing goal')
Expand Down Expand Up @@ -314,39 +329,53 @@ def execute_callback(self, goal_handle: ServerGoalHandle):
if not waiting_at_waypoint:
self.get_logger().info('Waypoint reached')
waiting_at_waypoint = True
stop_state = State(surge_vel=0.0, pitch=self.state.pitch, yaw=self.state.yaw)
stop_state = State(
surge_vel=0.0, pitch=self.state.pitch, yaw=self.state.yaw
)
self.guidance_calculator.reset_filter_state(stop_state)

if wait_count < wait_cycles:
self.publish_guidance(State(surge_vel=0.0, pitch=self.state.pitch, yaw=self.state.yaw))
self.publish_guidance(
State(surge_vel=0.0, pitch=self.state.pitch, yaw=self.state.yaw)
)
wait_count += 1
else:
self.current_waypoint_index += 1
# Reset integral term for new waypoint
initial_error = self.state.as_pos_array() - self.waypoints[self.current_waypoint_index].as_pos_array()
initial_error = (
self.state.as_pos_array()
- self.waypoints[self.current_waypoint_index].as_pos_array()
)
_, initial_crosstrack_y, _ = self.rotation_yz @ initial_error
self.y_int = -initial_crosstrack_y / self.guidance_calculator.kappa
waiting_at_waypoint = False
wait_count = 0

# Reset filter with stop state
# self.guidance_calculator.reset_filter_state(stop_state)

# # Publish stop command and wait briefly
# self.publish_guidance(stop_state)
# time.sleep(2.0) # Wait for 2 seconds to stabilize


if self.current_waypoint_index >= len(self.waypoints) - 1:
self.get_logger().info('All waypoints reached!')
final_commands = State(surge_vel=0.0, pitch=self.state.pitch, yaw=self.state.yaw)
final_commands = State(
surge_vel=0.0, pitch=self.state.pitch, yaw=self.state.yaw
)
self.publish_guidance(final_commands)
result.success = True
self.goal_handle.succeed()
return result

self.pi_h = self.guidance_calculator.compute_pi_h(self.waypoints[self.current_waypoint_index], self.waypoints[self.current_waypoint_index + 1])
self.pi_v = self.guidance_calculator.compute_pi_v(self.waypoints[self.current_waypoint_index], self.waypoints[self.current_waypoint_index + 1])

self.pi_h = self.guidance_calculator.compute_pi_h(
self.waypoints[self.current_waypoint_index],
self.waypoints[self.current_waypoint_index + 1],
)
self.pi_v = self.guidance_calculator.compute_pi_v(
self.waypoints[self.current_waypoint_index],
self.waypoints[self.current_waypoint_index + 1],
)

rotation_y = self.guidance_calculator.compute_rotation_y(self.pi_v)
rotation_z = self.guidance_calculator.compute_rotation_z(self.pi_h)
Expand All @@ -366,6 +395,7 @@ def publish_guidance(self, commands: State):
msg.yaw = commands.yaw
self.guidance_cmd_pub.publish(msg)


def main(args=None):
"""Main function to initialize and run the guidance node."""
rclpy.init(args=args)
Expand All @@ -384,4 +414,4 @@ def main(args=None):


if __name__ == '__main__':
main()
main()

0 comments on commit d2ad008

Please sign in to comment.