Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic/mnaveau/buffer #162

Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
*__pycache__*
*.npy
deprecated
agimus_controller_ros/agimus_controller_ros/agimus_controller_parameters.py
MaximilienNaveau marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 2 additions & 2 deletions agimus_controller/agimus_controller/mpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def run(self, initial_state: TrajectoryPoint, current_time_ns: int) -> OCPResult
assert self._ocp is not None
assert self._warm_start is not None
timer1 = time.perf_counter_ns()
self._buffer.clear_past(current_time_ns)
self._buffer.clear_past()
reference_trajectory = self._extract_horizon_from_buffer()
self._ocp.set_reference_trajectory(reference_trajectory)
timer2 = time.perf_counter_ns()
Expand Down Expand Up @@ -65,4 +65,4 @@ def append_trajectory_points(
self._buffer.extend(trajectory_points)

def _extract_horizon_from_buffer(self):
return self._buffer.horizon(self._ocp.horizon_size, self._ocp.dt)
return self._buffer.horizon
6 changes: 3 additions & 3 deletions agimus_controller/agimus_controller/ocp_base_croco.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@
from agimus_controller.factory.robot_model import RobotModels
from agimus_controller.mpc_data import OCPResults, OCPDebugData
from agimus_controller.ocp_base import OCPBase
from agimus_controller.ocp_param_base import OCPParamsBaseCroco
from agimus_controller.ocp_param_base import OCPParamsBase


class OCPBaseCroco(OCPBase):
def __init__(
self,
robot_models: RobotModels,
ocp_params: OCPParamsBaseCroco,
ocp_params: OCPParamsBase,
) -> None:
"""Defines common behavior for all OCP using croccodyl. This is an abstract class with some helpers to design OCPs in a more friendly way.

Args:
robot_models (RobotModels): All models of the robot.
ocp_params (OCPParamsBaseCroco): Input data structure of the OCP.
ocp_params (OCPParamsBase): Input data structure of the OCP.
"""
# Setting the robot model
self._robot_models = robot_models
Expand Down
34 changes: 24 additions & 10 deletions agimus_controller/agimus_controller/ocp_param_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,32 @@


@dataclass
class OCPParamsBaseCroco:
class OCPParamsBase:
"""Input data structure of the OCP."""

# Data relevant to solve the OCP
dt: float # Integration step of the OCP
horizon_size: int # Number of time steps in the horizon
solver_iters: int # Number of solver iterations
# Data relevant to solve the OCP.
dt: float # Integration step of the OCP.
solver_iters: int # Number of solver iterations.
MaximilienNaveau marked this conversation as resolved.
Show resolved Hide resolved
dt_factor_n_seq: list[
tuple[
int, # Number of dts between two time steps, the "factor".
int, # Number of time steps, the "n".
]
]
# Number of time steps in the horizon must be equal to:
# sum(sn for _, sn in dt_factor_n_seq).
horizon_size: int
qp_iters: int = 200 # Number of QP iterations (must be a multiple of 25).
termination_tolerance: float = (
1e-3 # Termination tolerance (norm of the KKT conditions)
1e-3 # Termination tolerance (norm of the KKT conditions).
)
eps_abs: float = 1e-6 # Absolute tolerance of the solver
eps_rel: float = 0.0 # Relative tolerance of the solver
callbacks: bool = False # Flag to enable/disable callbacks
use_filter_line_search = False # Flag to enable/disable the filter line searchs
eps_abs: float = 1e-6 # Absolute tolerance of the solver.
eps_rel: float = 0.0 # Relative tolerance of the solver.
callbacks: bool = False # Flag to enable/disable callbacks.
use_filter_line_search = False # Flag to enable/disable the filter line searchs.

def __post_init__(self):
assert (
self.horizon_size == sum(sn for _, sn in self.dt_factor_n_seq)
and "The horizon size must be equal to the sum of the time steps."
)
169 changes: 161 additions & 8 deletions agimus_controller/agimus_controller/trajectory.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from collections import deque
from copy import deepcopy
from dataclasses import dataclass
import numpy as np
import numpy.typing as npt
Expand All @@ -17,6 +17,56 @@ class TrajectoryPoint:
forces: dict[Force] | None = None # Dictionary of pinocchio.Force
end_effector_poses: dict[SE3] | None = None # Dictionary of pinocchio.SE3

def __eq__(self, other):
if not isinstance(other, TrajectoryPoint):
return False

# Compare scalar values directly
if self.time_ns != other.time_ns:
return False

# Compare numpy arrays (ignoring None values)
if (
self.robot_configuration is not None
and other.robot_configuration is not None
):
if not np.array_equal(self.robot_configuration, other.robot_configuration):
return False
elif (
self.robot_configuration is not None
or other.robot_configuration is not None
):
return False

if self.robot_velocity is not None and other.robot_velocity is not None:
if not np.array_equal(self.robot_velocity, other.robot_velocity):
return False
elif self.robot_velocity is not None or other.robot_velocity is not None:
return False

if self.robot_acceleration is not None and other.robot_acceleration is not None:
if not np.array_equal(self.robot_acceleration, other.robot_acceleration):
return False
elif (
self.robot_acceleration is not None or other.robot_acceleration is not None
):
return False

if self.robot_effort is not None and other.robot_effort is not None:
if not np.array_equal(self.robot_effort, other.robot_effort):
return False
elif self.robot_effort is not None or other.robot_effort is not None:
return False

# Compare dictionaries (forces and end_effector_poses)
if self.forces != other.forces:
return False

if self.end_effector_poses != other.end_effector_poses:
return False

return True


@dataclass
class TrajectoryPointWeights:
Expand All @@ -29,6 +79,59 @@ class TrajectoryPointWeights:
w_forces: dict[npt.NDArray[np.float64]] | None = None
w_end_effector_poses: dict[npt.NDArray[np.float64]] | None = None

def __eq__(self, other):
if not isinstance(other, TrajectoryPointWeights):
return False

# Compare numpy arrays (weights)
if (
self.w_robot_configuration is not None
and other.w_robot_configuration is not None
):
if not np.array_equal(
self.w_robot_configuration, other.w_robot_configuration
):
return False
elif (
self.w_robot_configuration is not None
or other.w_robot_configuration is not None
):
return False

if self.w_robot_velocity is not None and other.w_robot_velocity is not None:
if not np.array_equal(self.w_robot_velocity, other.w_robot_velocity):
return False
elif self.w_robot_velocity is not None or other.w_robot_velocity is not None:
return False

if (
self.w_robot_acceleration is not None
and other.w_robot_acceleration is not None
):
if not np.array_equal(
self.w_robot_acceleration, other.w_robot_acceleration
):
return False
elif (
self.w_robot_acceleration is not None
or other.w_robot_acceleration is not None
):
return False

if self.w_robot_effort is not None and other.w_robot_effort is not None:
if not np.array_equal(self.w_robot_effort, other.w_robot_effort):
return False
elif self.w_robot_effort is not None or other.w_robot_effort is not None:
return False

if self.w_forces != other.w_forces:
return False

if self.w_end_effector_poses != other.w_end_effector_poses:
return False

return True


@dataclass
class WeightedTrajectoryPoint:
Expand All @@ -37,14 +140,64 @@ class WeightedTrajectoryPoint:
point: TrajectoryPoint
weight: TrajectoryPointWeights

def __eq__(self, other):
if not isinstance(other, WeightedTrajectoryPoint):
return False

# Compare the 'point' and 'weight' attributes
if self.point != other.point:
return False

if self.weight != other.weight:
return False

class TrajectoryBuffer(deque):
return True


class TrajectoryBuffer(object):
"""List of variable size in which the HPP trajectory nodes will be."""

def clear_past(self, current_time_ns):
while self and self[0].point.time_ns < current_time_ns:
self.popleft()
def __init__(self, dt_factor_n_seq: list[tuple[int, int]]):
self._buffer = []
self.dt_factor_n_seq = deepcopy(dt_factor_n_seq)
self.horizon_indexes = self.compute_horizon_indexes(self.dt_factor_n_seq)

def append(self, item):
self._buffer.append(item)

def pop(self, index=-1):
return self._buffer.pop(index)

def clear_past(self):
if self._buffer:
TheoMF marked this conversation as resolved.
Show resolved Hide resolved
self._buffer.pop(0)

def compute_horizon_indexes(self, dt_factor_n_seq: list[tuple[int, int]]):
indexes = [0] * sum(sn for _, sn in dt_factor_n_seq)
i = 0
for factor, sn in dt_factor_n_seq:
for _ in range(sn):
indexes[i] = 0 if i == 0 else factor + indexes[i - 1]
i += 1

assert indexes[0] == 0, "First time step must be 0"
assert all(t0 <= t1 for t0, t1 in zip(indexes[:-1], indexes[1:])), (
"Time steps must be increasing"
)
return indexes

@property
def horizon(self):
assert self.horizon_indexes[-1] < len(self._buffer), (
"Size of buffer must be at least horizon_indexes[-1]."
)
return [self._buffer[i] for i in self.horizon_indexes]

def __len__(self):
return len(self._buffer)

def __getitem__(self, index):
return self._buffer[index]

def horizon(self, horizon_size, dt_ocp):
# TBD improve this implementation in case the dt_mpc != dt_ocp
return self._buffer[: self._ocp.horizon_size]
def __setitem__(self, index, value):
self._buffer[index] = value
Loading