Skip to content

Commit

Permalink
Merge branch 'main' into modular-graphnav
Browse files Browse the repository at this point in the history
  • Loading branch information
heuristicus committed Aug 3, 2023
2 parents 9b0eedf + dca6c69 commit ae55ef4
Show file tree
Hide file tree
Showing 5 changed files with 192 additions and 53 deletions.
25 changes: 24 additions & 1 deletion spot_wrapper/spot_arm.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from bosdyn.client.time_sync import TimeSyncEndpoint
from bosdyn.util import seconds_to_duration

from spot_wrapper.wrapper_helpers import RobotState
from spot_wrapper.wrapper_helpers import RobotState, ClaimAndPowerDecorator


class SpotArm:
Expand All @@ -34,13 +34,19 @@ def __init__(
manipulation_api_client: ManipulationApiClient,
robot_state_client: RobotStateClient,
max_command_duration: float,
claim_and_power_decorator: ClaimAndPowerDecorator,
) -> None:
"""
Constructor for SpotArm class.
Args:
robot: Robot object
logger: Logger object
robot_state: Object containing the robot's state as controlled by the wrapper
robot_command_client: Command client to use to send commands to the robot
manipulation_api_client: Command client to send manipulation commands to the robot
robot_state_client: Client to retrieve state of the robot
max_command_duration: Maximum duration for commands when using the manipulation command method
claim_and_power_decorator: Object to use to decorate the functions on this object
"""
self._robot = robot
self._logger = logger
Expand All @@ -49,6 +55,23 @@ def __init__(
self._robot_command_client = robot_command_client
self._manipulation_api_client = manipulation_api_client
self._robot_state_client = robot_state_client
self._claim_and_power_decorator = claim_and_power_decorator
self._claim_and_power_decorator.decorate_functions(
self,
decorated_funcs=[
self.ensure_arm_power_and_stand,
self.arm_stow,
self.arm_unstow,
self.arm_carry,
self.arm_joint_move,
self.force_trajectory,
self.gripper_open,
self.gripper_close,
self.gripper_angle_open,
self.hand_pose,
self.grasp_3d,
],
)

def _manipulation_request(
self,
Expand Down
13 changes: 12 additions & 1 deletion spot_wrapper/spot_docking.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from bosdyn.client.docking import DockingClient, blocking_dock_robot, blocking_undock
from bosdyn.client.robot import Robot

from spot_wrapper.wrapper_helpers import RobotState, RobotCommandData
from spot_wrapper.wrapper_helpers import (
RobotState,
RobotCommandData,
ClaimAndPowerDecorator,
)


class SpotDocking:
Expand All @@ -22,13 +26,20 @@ def __init__(
command_data: RobotCommandData,
docking_client: DockingClient,
robot_command_client: robot_command.RobotCommandClient,
claim_and_power_decorator: ClaimAndPowerDecorator,
) -> None:
self._robot = robot
self._logger = logger
self._command_data = command_data
self._docking_client: DockingClient = docking_client
self._robot_command_client = robot_command_client
self._robot_state = robot_state
self._claim_and_power_decorator = claim_and_power_decorator
# Decorate the functions so that they take the lease. Dock function needs to power on because it might have
# to move the robot, the undock
self._claim_and_power_decorator.decorate_functions(
self, decorated_funcs=[self.dock, self.undock]
)

def dock(self, dock_id: int) -> typing.Tuple[bool, str]:
"""Dock the robot to the docking station with fiducial ID [dock_id]."""
Expand Down
10 changes: 5 additions & 5 deletions spot_wrapper/spot_images.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,12 @@ class ImageQualityConfig:
Dataclass to store configuration of image quality. Default values are the default for the build_image_request
"""

DEFAULT_QUALITY = 75
DEFAULT_QUALITY = 75.0

robot_depth_quality: int = DEFAULT_QUALITY
robot_image_quality: int = DEFAULT_QUALITY
hand_image_quality: int = DEFAULT_QUALITY
hand_depth_quality: int = DEFAULT_QUALITY
robot_depth_quality: float = DEFAULT_QUALITY
robot_image_quality: float = DEFAULT_QUALITY
hand_image_quality: float = DEFAULT_QUALITY
hand_depth_quality: float = DEFAULT_QUALITY


class SpotImages:
Expand Down
87 changes: 41 additions & 46 deletions spot_wrapper/wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@
from .spot_images import SpotImages
from .spot_world_objects import SpotWorldObjects

from .wrapper_helpers import RobotCommandData, RobotState
from .wrapper_helpers import RobotCommandData, RobotState, ClaimAndPowerDecorator

"""Service name for getting pointcloud of VLP16 connected to Spot Core"""
point_cloud_sources = ["velodyne-point-cloud"]
Expand Down Expand Up @@ -376,38 +376,6 @@ def _start_query(self):
pass


def try_claim(func=None, *, power_on=False):
"""
Decorator which tries to acquire the lease before executing the wrapped function
the _func=None and * args are required to allow this decorator to be used with or without arguments
Args:
func: Function that is being wrapped
power_on: If true, power on after claiming the lease
Returns:
Decorator which will wrap the decorated function
"""
# If this decorator is being used without the power_on arg, return it as if it was called with that arg specified
if func is None:
return functools.partial(try_claim, power_on=power_on)

@functools.wraps(func)
def wrapper_try_claim(self, *args, **kwargs):
if self._get_lease_on_action:
if power_on:
# Power on is also wrapped by this decorator so if we request power on the lease will also be claimed
response = self.power_on()
else:
response = self.claim()
if not response[0]:
return response
return func(self, *args, **kwargs)

return wrapper_try_claim


class SpotWrapper:
"""Generic wrapper class to encompass release 1.1.4 API features as well as maintaining leases automatically"""

Expand Down Expand Up @@ -456,7 +424,10 @@ def __init__(
self._rates = rates or {}
self._callbacks = callbacks or {}
self._use_take_lease = use_take_lease
self._get_lease_on_action = get_lease_on_action
self._claim_decorator = ClaimAndPowerDecorator(
self.power_on, self.claim, get_lease_on_action
)
self.decorate_functions()
self._continually_try_stand = continually_try_stand
self._rgb_cameras = rgb_cameras
self._frame_prefix = ""
Expand Down Expand Up @@ -650,6 +621,7 @@ def __init__(
self._manipulation_api_client,
self._robot_state_client,
MAX_COMMAND_DURATION,
self._claim_decorator,
)
else:
self._spot_arm = None
Expand All @@ -663,6 +635,7 @@ def __init__(
self._command_data,
self._docking_client,
self._robot_command_client,
self._claim_decorator,
)

self._spot_graph_nav = SpotGraphNav(
Expand Down Expand Up @@ -707,6 +680,40 @@ def __init__(
self._robot_id = None
self._lease = None

def decorate_functions(self):
"""
Many of the functions in the wrapper need to have the lease claimed and the robot powered on before they will
function. The TryClaimDecorator object includes a decorator which is the mechanism we use to make sure that
is the case, assuming the get_lease_on_action variable is true. Otherwise, it is up to the user to ensure
that the lease is claimed and the power is on before running commands, otherwise the commands will fail.
"""
decorated_funcs = [
self.stop,
self.self_right,
self.sit,
self.simple_stand,
self.stand,
self.battery_change_pose,
self.velocity_cmd,
self.trajectory_cmd,
self.navigate_to,
self._navigate_to,
self._navigate_route,
self.execute_dance,
self._robot_command,
self._manipulation_request,
]
decorated_funcs_no_power = [
self.stop,
self.power_on,
self.safe_power_off,
self.toggle_power,
]

self._claim_decorator.decorate_functions(
self, decorated_funcs, decorated_funcs_no_power
)

@staticmethod
def authenticate(
robot: Robot, username: str, password: str, logger: logging.Logger
Expand Down Expand Up @@ -1147,7 +1154,6 @@ def _manipulation_request(
self._logger.error(f"Unable to execute manipulation command: {e}")
return False, str(e), None

@try_claim
def stop(self) -> typing.Tuple[bool, str]:
"""
Stop any action the robot is currently doing.
Expand All @@ -1159,7 +1165,6 @@ def stop(self) -> typing.Tuple[bool, str]:
response = self._robot_command(RobotCommandBuilder.stop_command())
return response[0], response[1]

@try_claim(power_on=True)
def self_right(self) -> typing.Tuple[bool, str]:
"""
Have the robot self-right.
Expand All @@ -1170,7 +1175,6 @@ def self_right(self) -> typing.Tuple[bool, str]:
response = self._robot_command(RobotCommandBuilder.selfright_command())
return response[0], response[1]

@try_claim(power_on=True)
def sit(self) -> typing.Tuple[bool, str]:
"""
Stop the robot's motion and sit down if able.
Expand All @@ -1183,7 +1187,6 @@ def sit(self) -> typing.Tuple[bool, str]:
self.last_sit_command = response[2]
return response[0], response[1]

@try_claim(power_on=True)
def simple_stand(self, monitor_command: bool = True) -> typing.Tuple[bool, str]:
"""
If the e-stop is enabled, and the motor power is enabled, stand the robot up.
Expand All @@ -1198,7 +1201,6 @@ def simple_stand(self, monitor_command: bool = True) -> typing.Tuple[bool, str]:
self.last_stand_command = response[2]
return response[0], response[1]

@try_claim(power_on=True)
def stand(
self,
monitor_command: bool = True,
Expand Down Expand Up @@ -1242,7 +1244,6 @@ def stand(
self.last_stand_command = response[2]
return response[0], response[1]

@try_claim(power_on=True)
def battery_change_pose(self, dir_hint: int = 1) -> typing.Tuple[bool, str]:
"""
Put the robot into the battery change pose
Expand All @@ -1260,7 +1261,6 @@ def battery_change_pose(self, dir_hint: int = 1) -> typing.Tuple[bool, str]:
return response[0], response[1]
return False, "Call sit before trying to roll over"

@try_claim
def safe_power_off(self) -> typing.Tuple[bool, str]:
"""
Stop the robot's motion and sit if possible. Once sitting, disable motor power.
Expand Down Expand Up @@ -1288,7 +1288,6 @@ def clear_behavior_fault(
except Exception as e:
return False, f"Exception while clearing behavior fault: {e}", None

@try_claim
def power_on(self) -> typing.Tuple[bool, str]:
"""
Enable the motor power if e-stop is enabled.
Expand Down Expand Up @@ -1325,7 +1324,6 @@ def get_mobility_params(self) -> spot_command_pb2.MobilityParams:
"""Get mobility params"""
return self._mobility_params

@try_claim
def velocity_cmd(
self, v_x: float, v_y: float, v_rot: float, cmd_duration: float = 0.125
) -> typing.Tuple[bool, str]:
Expand Down Expand Up @@ -1353,7 +1351,6 @@ def velocity_cmd(
self.last_velocity_command_time = end_time
return response[0], response[1]

@try_claim
def trajectory_cmd(
self,
goal_x: float,
Expand Down Expand Up @@ -1462,7 +1459,6 @@ def get_manipulation_command_feedback(self, cmd_id):
manipulation_api_feedback_request=feedback_request
)

@try_claim
def toggle_power(self, should_power_on):
"""Power the robot on/off dependent on the current power state."""
is_powered_on = self.check_is_powered_on()
Expand Down Expand Up @@ -1500,7 +1496,6 @@ def check_is_powered_on(self) -> bool:
self._powered_on = power_state.motor_power_state == power_state.STATE_ON
return self._powered_on

@try_claim
def execute_dance(self, data):
if self._is_licensed_for_choreography:
return self._spot_dance.execute_dance(data)
Expand Down
Loading

0 comments on commit ae55ef4

Please sign in to comment.