diff --git a/spot_wrapper/spot_arm.py b/spot_wrapper/spot_arm.py index c671b88..b1ba226 100644 --- a/spot_wrapper/spot_arm.py +++ b/spot_wrapper/spot_arm.py @@ -21,7 +21,7 @@ from bosdyn.client.time_sync import TimeSyncEndpoint from bosdyn.util import seconds_to_duration -from spot_wrapper.wrapper_helpers import RobotState +from spot_wrapper.wrapper_helpers import RobotState, ClaimAndPowerDecorator class SpotArm: @@ -34,13 +34,19 @@ def __init__( manipulation_api_client: ManipulationApiClient, robot_state_client: RobotStateClient, max_command_duration: float, + claim_and_power_decorator: ClaimAndPowerDecorator, ) -> None: """ Constructor for SpotArm class. Args: robot: Robot object logger: Logger object + robot_state: Object containing the robot's state as controlled by the wrapper + robot_command_client: Command client to use to send commands to the robot + manipulation_api_client: Command client to send manipulation commands to the robot + robot_state_client: Client to retrieve state of the robot max_command_duration: Maximum duration for commands when using the manipulation command method + claim_and_power_decorator: Object to use to decorate the functions on this object """ self._robot = robot self._logger = logger @@ -49,6 +55,23 @@ def __init__( self._robot_command_client = robot_command_client self._manipulation_api_client = manipulation_api_client self._robot_state_client = robot_state_client + self._claim_and_power_decorator = claim_and_power_decorator + self._claim_and_power_decorator.decorate_functions( + self, + decorated_funcs=[ + self.ensure_arm_power_and_stand, + self.arm_stow, + self.arm_unstow, + self.arm_carry, + self.arm_joint_move, + self.force_trajectory, + self.gripper_open, + self.gripper_close, + self.gripper_angle_open, + self.hand_pose, + self.grasp_3d, + ], + ) def _manipulation_request( self, diff --git a/spot_wrapper/spot_docking.py b/spot_wrapper/spot_docking.py index 63a88f8..085152c 100644 --- a/spot_wrapper/spot_docking.py +++ b/spot_wrapper/spot_docking.py @@ -6,7 +6,11 @@ from bosdyn.client.docking import DockingClient, blocking_dock_robot, blocking_undock from bosdyn.client.robot import Robot -from spot_wrapper.wrapper_helpers import RobotState, RobotCommandData +from spot_wrapper.wrapper_helpers import ( + RobotState, + RobotCommandData, + ClaimAndPowerDecorator, +) class SpotDocking: @@ -22,6 +26,7 @@ def __init__( command_data: RobotCommandData, docking_client: DockingClient, robot_command_client: robot_command.RobotCommandClient, + claim_and_power_decorator: ClaimAndPowerDecorator, ) -> None: self._robot = robot self._logger = logger @@ -29,6 +34,12 @@ def __init__( self._docking_client: DockingClient = docking_client self._robot_command_client = robot_command_client self._robot_state = robot_state + self._claim_and_power_decorator = claim_and_power_decorator + # Decorate the functions so that they take the lease. Dock function needs to power on because it might have + # to move the robot, the undock + self._claim_and_power_decorator.decorate_functions( + self, decorated_funcs=[self.dock, self.undock] + ) def dock(self, dock_id: int) -> typing.Tuple[bool, str]: """Dock the robot to the docking station with fiducial ID [dock_id].""" diff --git a/spot_wrapper/spot_images.py b/spot_wrapper/spot_images.py index 38c4d4b..9cc2325 100644 --- a/spot_wrapper/spot_images.py +++ b/spot_wrapper/spot_images.py @@ -114,12 +114,12 @@ class ImageQualityConfig: Dataclass to store configuration of image quality. Default values are the default for the build_image_request """ - DEFAULT_QUALITY = 75 + DEFAULT_QUALITY = 75.0 - robot_depth_quality: int = DEFAULT_QUALITY - robot_image_quality: int = DEFAULT_QUALITY - hand_image_quality: int = DEFAULT_QUALITY - hand_depth_quality: int = DEFAULT_QUALITY + robot_depth_quality: float = DEFAULT_QUALITY + robot_image_quality: float = DEFAULT_QUALITY + hand_image_quality: float = DEFAULT_QUALITY + hand_depth_quality: float = DEFAULT_QUALITY class SpotImages: diff --git a/spot_wrapper/wrapper.py b/spot_wrapper/wrapper.py index 92fae65..cb12707 100644 --- a/spot_wrapper/wrapper.py +++ b/spot_wrapper/wrapper.py @@ -81,7 +81,7 @@ from .spot_images import SpotImages from .spot_world_objects import SpotWorldObjects -from .wrapper_helpers import RobotCommandData, RobotState +from .wrapper_helpers import RobotCommandData, RobotState, ClaimAndPowerDecorator """Service name for getting pointcloud of VLP16 connected to Spot Core""" point_cloud_sources = ["velodyne-point-cloud"] @@ -376,38 +376,6 @@ def _start_query(self): pass -def try_claim(func=None, *, power_on=False): - """ - Decorator which tries to acquire the lease before executing the wrapped function - - the _func=None and * args are required to allow this decorator to be used with or without arguments - - Args: - func: Function that is being wrapped - power_on: If true, power on after claiming the lease - - Returns: - Decorator which will wrap the decorated function - """ - # If this decorator is being used without the power_on arg, return it as if it was called with that arg specified - if func is None: - return functools.partial(try_claim, power_on=power_on) - - @functools.wraps(func) - def wrapper_try_claim(self, *args, **kwargs): - if self._get_lease_on_action: - if power_on: - # Power on is also wrapped by this decorator so if we request power on the lease will also be claimed - response = self.power_on() - else: - response = self.claim() - if not response[0]: - return response - return func(self, *args, **kwargs) - - return wrapper_try_claim - - class SpotWrapper: """Generic wrapper class to encompass release 1.1.4 API features as well as maintaining leases automatically""" @@ -456,7 +424,10 @@ def __init__( self._rates = rates or {} self._callbacks = callbacks or {} self._use_take_lease = use_take_lease - self._get_lease_on_action = get_lease_on_action + self._claim_decorator = ClaimAndPowerDecorator( + self.power_on, self.claim, get_lease_on_action + ) + self.decorate_functions() self._continually_try_stand = continually_try_stand self._rgb_cameras = rgb_cameras self._frame_prefix = "" @@ -650,6 +621,7 @@ def __init__( self._manipulation_api_client, self._robot_state_client, MAX_COMMAND_DURATION, + self._claim_decorator, ) else: self._spot_arm = None @@ -663,6 +635,7 @@ def __init__( self._command_data, self._docking_client, self._robot_command_client, + self._claim_decorator, ) self._spot_graph_nav = SpotGraphNav( @@ -707,6 +680,40 @@ def __init__( self._robot_id = None self._lease = None + def decorate_functions(self): + """ + Many of the functions in the wrapper need to have the lease claimed and the robot powered on before they will + function. The TryClaimDecorator object includes a decorator which is the mechanism we use to make sure that + is the case, assuming the get_lease_on_action variable is true. Otherwise, it is up to the user to ensure + that the lease is claimed and the power is on before running commands, otherwise the commands will fail. + """ + decorated_funcs = [ + self.stop, + self.self_right, + self.sit, + self.simple_stand, + self.stand, + self.battery_change_pose, + self.velocity_cmd, + self.trajectory_cmd, + self.navigate_to, + self._navigate_to, + self._navigate_route, + self.execute_dance, + self._robot_command, + self._manipulation_request, + ] + decorated_funcs_no_power = [ + self.stop, + self.power_on, + self.safe_power_off, + self.toggle_power, + ] + + self._claim_decorator.decorate_functions( + self, decorated_funcs, decorated_funcs_no_power + ) + @staticmethod def authenticate( robot: Robot, username: str, password: str, logger: logging.Logger @@ -1147,7 +1154,6 @@ def _manipulation_request( self._logger.error(f"Unable to execute manipulation command: {e}") return False, str(e), None - @try_claim def stop(self) -> typing.Tuple[bool, str]: """ Stop any action the robot is currently doing. @@ -1159,7 +1165,6 @@ def stop(self) -> typing.Tuple[bool, str]: response = self._robot_command(RobotCommandBuilder.stop_command()) return response[0], response[1] - @try_claim(power_on=True) def self_right(self) -> typing.Tuple[bool, str]: """ Have the robot self-right. @@ -1170,7 +1175,6 @@ def self_right(self) -> typing.Tuple[bool, str]: response = self._robot_command(RobotCommandBuilder.selfright_command()) return response[0], response[1] - @try_claim(power_on=True) def sit(self) -> typing.Tuple[bool, str]: """ Stop the robot's motion and sit down if able. @@ -1183,7 +1187,6 @@ def sit(self) -> typing.Tuple[bool, str]: self.last_sit_command = response[2] return response[0], response[1] - @try_claim(power_on=True) def simple_stand(self, monitor_command: bool = True) -> typing.Tuple[bool, str]: """ If the e-stop is enabled, and the motor power is enabled, stand the robot up. @@ -1198,7 +1201,6 @@ def simple_stand(self, monitor_command: bool = True) -> typing.Tuple[bool, str]: self.last_stand_command = response[2] return response[0], response[1] - @try_claim(power_on=True) def stand( self, monitor_command: bool = True, @@ -1242,7 +1244,6 @@ def stand( self.last_stand_command = response[2] return response[0], response[1] - @try_claim(power_on=True) def battery_change_pose(self, dir_hint: int = 1) -> typing.Tuple[bool, str]: """ Put the robot into the battery change pose @@ -1260,7 +1261,6 @@ def battery_change_pose(self, dir_hint: int = 1) -> typing.Tuple[bool, str]: return response[0], response[1] return False, "Call sit before trying to roll over" - @try_claim def safe_power_off(self) -> typing.Tuple[bool, str]: """ Stop the robot's motion and sit if possible. Once sitting, disable motor power. @@ -1288,7 +1288,6 @@ def clear_behavior_fault( except Exception as e: return False, f"Exception while clearing behavior fault: {e}", None - @try_claim def power_on(self) -> typing.Tuple[bool, str]: """ Enable the motor power if e-stop is enabled. @@ -1325,7 +1324,6 @@ def get_mobility_params(self) -> spot_command_pb2.MobilityParams: """Get mobility params""" return self._mobility_params - @try_claim def velocity_cmd( self, v_x: float, v_y: float, v_rot: float, cmd_duration: float = 0.125 ) -> typing.Tuple[bool, str]: @@ -1353,7 +1351,6 @@ def velocity_cmd( self.last_velocity_command_time = end_time return response[0], response[1] - @try_claim def trajectory_cmd( self, goal_x: float, @@ -1462,7 +1459,6 @@ def get_manipulation_command_feedback(self, cmd_id): manipulation_api_feedback_request=feedback_request ) - @try_claim def toggle_power(self, should_power_on): """Power the robot on/off dependent on the current power state.""" is_powered_on = self.check_is_powered_on() @@ -1500,7 +1496,6 @@ def check_is_powered_on(self) -> bool: self._powered_on = power_state.motor_power_state == power_state.STATE_ON return self._powered_on - @try_claim def execute_dance(self, data): if self._is_licensed_for_choreography: return self._spot_dance.execute_dance(data) diff --git a/spot_wrapper/wrapper_helpers.py b/spot_wrapper/wrapper_helpers.py index c824cba..bf2f816 100644 --- a/spot_wrapper/wrapper_helpers.py +++ b/spot_wrapper/wrapper_helpers.py @@ -1,9 +1,119 @@ """Helper classes for the wrapper. This file is necessary to prevent circular imports caused by the modules also using these classes""" import typing +import functools from dataclasses import dataclass +class ClaimAndPowerDecorator: + """ + Some functions in the wrapper require the lease to be claimed and the robot to be powered on before they can + function. + + This class provides a portable way of wrapping the functions of the wrapper or modules to enable them to do + that. It can be passed around to modules which can then decorate their functions with it, allowing them to claim + and power on as needed. + + Note that this decorator is not intended to be applied using the @ syntax. It should be applied during or after + object instantiation. + """ + + def __init__( + self, + power_on_function: typing.Callable[[], typing.Tuple[bool, str]], + claim_function: typing.Callable[[], typing.Tuple[bool, str]], + get_lease_on_action: bool = False, + ) -> None: + self.power_on = power_on_function + self.claim = claim_function + self._get_lease_on_action = get_lease_on_action + + def _make_function_take_lease_and_power_on( + self, func: typing.Callable, power_on: bool = True + ) -> typing.Callable: + """ + Decorator which tries to acquire the lease before executing the wrapped function + + Args: + func: Function that is being wrapped + power_on: If true, power on after claiming the lease. For the vast majority of cases this is needed + + Returns: + Decorator which will wrap the decorated function + """ + + @functools.wraps(func) + def wrapper_try_claim(*args, **kwargs) -> typing.Callable: + # Note that because we are assuming that this decorator is used only on instantiated classes, + # this function does not take a self arg. The self arg is necessary when using the @ syntax because at + # that point the class has not yet been instantiated. In this case, the func we receive is already a bound + # method, as opposed to an unbound one. A bound function has the "self" instance built in. + if self._get_lease_on_action: + # Ignore success or failure of these functions. If they fail, then the function that is being wrapped + # will fail and the caller will be able to handle from there. + self.claim() + if power_on: + self.power_on() + return func(*args, **kwargs) + + return wrapper_try_claim + + def make_function_take_lease_and_power_on( + self, decorated_object, function: typing.Callable, power_on: bool = True + ) -> None: + """ + Decorate a function of an object with this class's decorator. After being decorated, when the function is + called, it will forcefully take the lease, then power on the robot if that option is specified. + + Args: + decorated_object: The object whose function is to be decorated + function: The function to be decorated + power_on: If true, power on the robot after claiming + + Raises: + AttributeError: if the object passed does not have a function with the same name as the given function + """ + function_name = function.__name__ + if not hasattr(decorated_object, function_name): + raise AttributeError( + f"Requested decoration of function {function_name} of object {decorated_object}, but the object does " + f"not have a function by that name." + ) + + setattr( + decorated_object, + function_name, + self._make_function_take_lease_and_power_on(function, power_on=power_on), + ) + + def decorate_functions( + self, + decorated_object, + decorated_funcs: typing.List[typing.Callable], + decorated_funcs_no_power: typing.Optional[typing.List[typing.Callable]] = None, + ) -> None: + """ + Decorate the specified functions of the given object with this class's decorator. + + Args: + decorated_object: Object which contains the functions to be decorated + decorated_funcs: List of the functions of the object which should be decorated. When called, + these functions will forcefully take the lease and power on the robot + decorated_funcs_no_power: Same as decorated_funcs, but will calling these will not power on the robot. + """ + + for func in decorated_funcs: + self.make_function_take_lease_and_power_on(decorated_object, func) + + if decorated_funcs_no_power is None: + decorated_funcs_no_power = [] + + for func in decorated_funcs_no_power: + self.make_function_take_lease_and_power_on( + decorated_object, func, power_on=False + ) + + @dataclass() class RobotState: """