diff --git a/.gitignore b/.gitignore index 97a9b1a4..ca1d9b95 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ *__pycache__* *.npy deprecated +agimus_controller_ros/agimus_controller_ros/agimus_controller_parameters.py diff --git a/agimus_controller/agimus_controller/mpc.py b/agimus_controller/agimus_controller/mpc.py index 420a32f1..5e453405 100644 --- a/agimus_controller/agimus_controller/mpc.py +++ b/agimus_controller/agimus_controller/mpc.py @@ -31,7 +31,7 @@ def run(self, initial_state: TrajectoryPoint, current_time_ns: int) -> OCPResult assert self._ocp is not None assert self._warm_start is not None timer1 = time.perf_counter_ns() - self._buffer.clear_past(current_time_ns) + self._buffer.clear_past() reference_trajectory = self._extract_horizon_from_buffer() self._ocp.set_reference_trajectory(reference_trajectory) timer2 = time.perf_counter_ns() @@ -65,4 +65,4 @@ def append_trajectory_points( self._buffer.extend(trajectory_points) def _extract_horizon_from_buffer(self): - return self._buffer.horizon(self._ocp.horizon_size, self._ocp.dt) + return self._buffer.horizon diff --git a/agimus_controller/agimus_controller/ocp_param_base.py b/agimus_controller/agimus_controller/ocp_param_base.py index 210dca21..b489552b 100644 --- a/agimus_controller/agimus_controller/ocp_param_base.py +++ b/agimus_controller/agimus_controller/ocp_param_base.py @@ -5,15 +5,29 @@ class OCPParamsBaseCroco: """Input data structure of the OCP.""" - # Data relevant to solve the OCP - dt: float # Integration step of the OCP - horizon_size: int # Number of time steps in the horizon - solver_iters: int # Number of solver iterations + # Data relevant to solve the OCP. + dt: float # Integration step of the OCP. + solver_iters: int # Number of solver iterations. + dt_factor_n_seq: list[ + tuple[ + int, # Number of dts between two time steps, the "factor". + int, # Number of time steps, the "n". + ] + ] + # Number of time steps in the horizon must be equal to: + # sum(sn for _, sn in dt_factor_n_seq). + horizon_size: int qp_iters: int = 200 # Number of QP iterations (must be a multiple of 25). termination_tolerance: float = ( - 1e-3 # Termination tolerance (norm of the KKT conditions) + 1e-3 # Termination tolerance (norm of the KKT conditions). ) - eps_abs: float = 1e-6 # Absolute tolerance of the solver - eps_rel: float = 0.0 # Relative tolerance of the solver - callbacks: bool = False # Flag to enable/disable callbacks - use_filter_line_search = False # Flag to enable/disable the filter line searchs + eps_abs: float = 1e-6 # Absolute tolerance of the solver. + eps_rel: float = 0.0 # Relative tolerance of the solver. + callbacks: bool = False # Flag to enable/disable callbacks. + use_filter_line_search = False # Flag to enable/disable the filter line searchs. + + def __post_init__(self): + assert ( + self.horizon_size == sum(sn for _, sn in self.dt_factor_n_seq) + and "The horizon size must be equal to the sum of the time steps." + ) diff --git a/agimus_controller/agimus_controller/trajectory.py b/agimus_controller/agimus_controller/trajectory.py index a7de02a9..04df4c07 100644 --- a/agimus_controller/agimus_controller/trajectory.py +++ b/agimus_controller/agimus_controller/trajectory.py @@ -1,4 +1,4 @@ -from collections import deque +from copy import deepcopy from dataclasses import dataclass import numpy as np import numpy.typing as npt @@ -17,6 +17,56 @@ class TrajectoryPoint: forces: dict[Force] | None = None # Dictionary of pinocchio.Force end_effector_poses: dict[SE3] | None = None # Dictionary of pinocchio.SE3 + def __eq__(self, other): + if not isinstance(other, TrajectoryPoint): + return False + + # Compare scalar values directly + if self.time_ns != other.time_ns: + return False + + # Compare numpy arrays (ignoring None values) + if ( + self.robot_configuration is not None + and other.robot_configuration is not None + ): + if not np.array_equal(self.robot_configuration, other.robot_configuration): + return False + elif ( + self.robot_configuration is not None + or other.robot_configuration is not None + ): + return False + + if self.robot_velocity is not None and other.robot_velocity is not None: + if not np.array_equal(self.robot_velocity, other.robot_velocity): + return False + elif self.robot_velocity is not None or other.robot_velocity is not None: + return False + + if self.robot_acceleration is not None and other.robot_acceleration is not None: + if not np.array_equal(self.robot_acceleration, other.robot_acceleration): + return False + elif ( + self.robot_acceleration is not None or other.robot_acceleration is not None + ): + return False + + if self.robot_effort is not None and other.robot_effort is not None: + if not np.array_equal(self.robot_effort, other.robot_effort): + return False + elif self.robot_effort is not None or other.robot_effort is not None: + return False + + # Compare dictionaries (forces and end_effector_poses) + if self.forces != other.forces: + return False + + if self.end_effector_poses != other.end_effector_poses: + return False + + return True + @dataclass class TrajectoryPointWeights: @@ -29,6 +79,59 @@ class TrajectoryPointWeights: w_forces: dict[npt.NDArray[np.float64]] | None = None w_end_effector_poses: dict[npt.NDArray[np.float64]] | None = None + def __eq__(self, other): + if not isinstance(other, TrajectoryPointWeights): + return False + + # Compare numpy arrays (weights) + if ( + self.w_robot_configuration is not None + and other.w_robot_configuration is not None + ): + if not np.array_equal( + self.w_robot_configuration, other.w_robot_configuration + ): + return False + elif ( + self.w_robot_configuration is not None + or other.w_robot_configuration is not None + ): + return False + + if self.w_robot_velocity is not None and other.w_robot_velocity is not None: + if not np.array_equal(self.w_robot_velocity, other.w_robot_velocity): + return False + elif self.w_robot_velocity is not None or other.w_robot_velocity is not None: + return False + + if ( + self.w_robot_acceleration is not None + and other.w_robot_acceleration is not None + ): + if not np.array_equal( + self.w_robot_acceleration, other.w_robot_acceleration + ): + return False + elif ( + self.w_robot_acceleration is not None + or other.w_robot_acceleration is not None + ): + return False + + if self.w_robot_effort is not None and other.w_robot_effort is not None: + if not np.array_equal(self.w_robot_effort, other.w_robot_effort): + return False + elif self.w_robot_effort is not None or other.w_robot_effort is not None: + return False + + if self.w_forces != other.w_forces: + return False + + if self.w_end_effector_poses != other.w_end_effector_poses: + return False + + return True + @dataclass class WeightedTrajectoryPoint: @@ -37,14 +140,64 @@ class WeightedTrajectoryPoint: point: TrajectoryPoint weight: TrajectoryPointWeights + def __eq__(self, other): + if not isinstance(other, WeightedTrajectoryPoint): + return False + + # Compare the 'point' and 'weight' attributes + if self.point != other.point: + return False + + if self.weight != other.weight: + return False -class TrajectoryBuffer(deque): + return True + + +class TrajectoryBuffer(object): """List of variable size in which the HPP trajectory nodes will be.""" - def clear_past(self, current_time_ns): - while self and self[0].point.time_ns < current_time_ns: - self.popleft() + def __init__(self, dt_factor_n_seq: list[tuple[int, int]]): + self._buffer = [] + self.dt_factor_n_seq = deepcopy(dt_factor_n_seq) + self.horizon_indexes = self.compute_horizon_indexes(self.dt_factor_n_seq) + + def append(self, item): + self._buffer.append(item) + + def pop(self, index=-1): + return self._buffer.pop(index) + + def clear_past(self): + if self._buffer: + self._buffer.pop(0) + + def compute_horizon_indexes(self, dt_factor_n_seq: list[tuple[int, int]]): + indexes = [0] * sum(sn for _, sn in dt_factor_n_seq) + i = 0 + for factor, sn in dt_factor_n_seq: + for _ in range(sn): + indexes[i] = 0 if i == 0 else factor + indexes[i - 1] + i += 1 + + assert indexes[0] == 0, "First time step must be 0" + assert all(t0 <= t1 for t0, t1 in zip(indexes[:-1], indexes[1:])), ( + "Time steps must be increasing" + ) + return indexes + + @property + def horizon(self): + assert self.horizon_indexes[-1] < len(self._buffer), ( + "Size of buffer must be at least horizon_indexes[-1]." + ) + return [self._buffer[i] for i in self.horizon_indexes] + + def __len__(self): + return len(self._buffer) + + def __getitem__(self, index): + return self._buffer[index] - def horizon(self, horizon_size, dt_ocp): - # TBD improve this implementation in case the dt_mpc != dt_ocp - return self._buffer[: self._ocp.horizon_size] + def __setitem__(self, index, value): + self._buffer[index] = value diff --git a/agimus_controller/tests/test_buffer.py b/agimus_controller/tests/test_buffer.py new file mode 100644 index 00000000..6d1c762d --- /dev/null +++ b/agimus_controller/tests/test_buffer.py @@ -0,0 +1,134 @@ +from copy import deepcopy +import numpy as np +from random import randint +import unittest + + +from agimus_controller.trajectory import ( + TrajectoryBuffer, + TrajectoryPoint, + TrajectoryPointWeights, + WeightedTrajectoryPoint, +) + + +class TestTrajectoryBuffer(unittest.TestCase): + """ + TestOCPParamsCrocoBase unittests parameters settters and getters of OCPParamsBaseCrocoCroco class. + """ + + def __init__(self, methodName="runTest"): + super().__init__(methodName) + self.nv = randint(10, 100) # Number of dof in the robot velocity + self.nq = self.nv + 1 # Number of dof in the robot configuration + + self.trajectory_size = 100 + self.horizon_size = 10 + self.dt_factor_n_seq = [(1, self.horizon_size)] + self.dt = 0.01 + self.dt_ns = int(1e9 * self.dt) + + def generate_random_weighted_states(self, time_ns): + """ + Generate random data for the TrajectoryPointWeights. + """ + return WeightedTrajectoryPoint( + point=TrajectoryPoint( + time_ns=time_ns, + robot_configuration=np.random.random(self.nq), + robot_velocity=np.random.random(self.nv), + robot_acceleration=np.random.random(self.nv), + robot_effort=np.random.random(self.nv), + ), + weight=TrajectoryPointWeights( + w_robot_configuration=np.random.random(self.nv), + w_robot_velocity=np.random.random(self.nv), + w_robot_acceleration=np.random.random(self.nv), + w_robot_effort=np.random.random(self.nv), + ), + ) + + def test_append_data(self): + """ + Test adding points to the buffer. + """ + obj = TrajectoryBuffer(self.dt_factor_n_seq) + times_ns = np.arange( + 0, 30 * self.trajectory_size * self.dt_ns, self.dt_ns, dtype=int + ) + for time_ns in times_ns: + obj.append(self.generate_random_weighted_states(time_ns)) + + self.assertEqual(len(obj), times_ns.size) + + def test_clear_past(self): + """ + Test clearing the past of the buffer. + """ + obj = TrajectoryBuffer(self.dt_factor_n_seq) + times_ns = np.arange( + 0, 30 * self.trajectory_size * self.dt_ns, self.dt_ns, dtype=int + ) + for time_ns in times_ns: + obj.append(self.generate_random_weighted_states(time_ns)) + + obj.clear_past() + self.assertEqual(len(obj), times_ns.size - 1) + obj.clear_past() + self.assertEqual(len(obj), times_ns.size - 2) + obj.clear_past() + self.assertEqual(len(obj), times_ns.size - 3) + + def test_compute_horizon_index(self): + """ + Test computing the time indexes from dt_factor_n_seq. + """ + obj = TrajectoryBuffer(self.dt_factor_n_seq) + dt_factor_n_seq = [(1, 2), (2, 2), (3, 2), (4, 2), (5, 2)] + indexes_out = obj.compute_horizon_indexes(dt_factor_n_seq) + indexes_test = [0, 1, 3, 5, 8, 11, 15, 19, 24, 29] + np.testing.assert_equal(indexes_out, indexes_test) + + def test_horizon(self): + """ + Test computing the horizon from the dt_factor_n_seq format. + """ + obj = TrajectoryBuffer(self.dt_factor_n_seq) + times_ns = np.arange( + 0, 30 * self.trajectory_size * self.dt_ns, self.dt_ns, dtype=int + ) + for time_ns in times_ns: + obj.append(self.generate_random_weighted_states(time_ns)) + + horizon = obj.horizon + np.testing.assert_array_equal( + deepcopy(horizon), + obj[: len(horizon)], + ) + + def test_horizon_with_more_complex_dt_factor_n_seq(self): + """ + Test computing the horizon from complex dt_factor_n_seq. + """ + dt_factor_n_seq = [(1, 2), (2, 2), (3, 2), (4, 2), (5, 2)] + horizon_indexes = [0, 1, 3, 5, 8, 11, 15, 19, 24, 29] + + obj = TrajectoryBuffer(dt_factor_n_seq) + self.assertEqual(horizon_indexes, obj.horizon_indexes) + + # Fill the data in + times_ns = np.arange( + 0, 30 * self.trajectory_size * self.dt_ns, self.dt_ns, dtype=int + ) + for time_ns in times_ns: + obj.append(self.generate_random_weighted_states(time_ns)) + + # Get the horizon + np.testing.assert_array_equal( + deepcopy(obj.horizon), + [obj[index] for index in horizon_indexes], + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/agimus_controller/tests/test_ocp_croco_base.py b/agimus_controller/tests/test_ocp_croco_base.py index f4888f5d..05b2f507 100644 --- a/agimus_controller/tests/test_ocp_croco_base.py +++ b/agimus_controller/tests/test_ocp_croco_base.py @@ -136,7 +136,11 @@ def setUpClass(self): # Set mock parameters self.ocp_params = OCPParamsBaseCroco( - dt=0.1, horizon_size=10, solver_iters=100, callbacks=False + dt=0.1, + horizon_size=10, + dt_factor_n_seq=[(1, 10)], + solver_iters=100, + callbacks=False, ) self.state_reg = np.concatenate( (pin.neutral(self.robot_model), np.zeros(self.robot_model.nv)) diff --git a/agimus_controller/tests/test_ocp_croco_goal_reaching.py b/agimus_controller/tests/test_ocp_croco_goal_reaching.py index 19939176..705a836e 100644 --- a/agimus_controller/tests/test_ocp_croco_goal_reaching.py +++ b/agimus_controller/tests/test_ocp_croco_goal_reaching.py @@ -18,9 +18,9 @@ class TestOCPGoalReaching(unittest.TestCase): def setUp(self): ### LOAD ROBOT robot = robex.load("panda") - urdf = Path(robot.urdf) - srdf = Path(robot.urdf.replace("urdf", "srdf")) - urdf_meshes_dir = urdf.parent.parent.parent.parent.parent + urdf_path = Path(robot.urdf) + srdf_path = Path(robot.urdf.replace("urdf", "srdf")) + urdf_meshes_dir = urdf_path.parent.parent.parent.parent.parent free_flyer = False locked_joint_names = ["panda_finger_joint1", "panda_finger_joint2"] reduced_nq = robot.model.nq - len(locked_joint_names) @@ -34,8 +34,8 @@ def setUp(self): full_q0=full_q0, free_flyer=free_flyer, locked_joint_names=locked_joint_names, - urdf=urdf, - srdf=srdf, + urdf=urdf_path, + srdf=srdf_path, urdf_meshes_dir=urdf_meshes_dir, collision_as_capsule=True, self_collision=False, @@ -53,6 +53,7 @@ def setUp(self): self._ocp_params = OCPParamsBaseCroco( dt=dt, horizon_size=horizon_size, + dt_factor_n_seq=[(1, horizon_size)], solver_iters=solver_iters, callbacks=callbacks, ) diff --git a/agimus_controller/tests/test_ocp_param_base.py b/agimus_controller/tests/test_ocp_param_base.py index 0855f7b4..0555f64b 100644 --- a/agimus_controller/tests/test_ocp_param_base.py +++ b/agimus_controller/tests/test_ocp_param_base.py @@ -18,6 +18,7 @@ def test_initialization(self): params = { "dt": 0.01, "horizon_size": 100, + "dt_factor_n_seq": [(1, 100)], "solver_iters": 50, "qp_iters": 200, "termination_tolerance": 1e-3,