diff --git a/spot_wrapper/spot_mission_wrapper.py b/spot_wrapper/spot_mission_wrapper.py deleted file mode 100644 index cd4b967..0000000 --- a/spot_wrapper/spot_mission_wrapper.py +++ /dev/null @@ -1,192 +0,0 @@ -import logging -from typing import Optional - -from bosdyn.api.mission import nodes_pb2 -from bosdyn.client import RpcError, robot_command -from bosdyn.client.lease import LeaseClient, LeaseWallet -from bosdyn.client.robot import Robot -from bosdyn.mission.client import ( - CompilationError, - MissionClient, - NoMissionError, - NoMissionPlayingError, - ValidationError, -) - -from spot_wrapper.wrapper_helpers import RobotState - - -class SpotMission: - """ - Allow access to mission functionality through the SDK - """ - - def __init__( - self, - robot: Robot, - logger: logging.Logger, - robot_state: RobotState, - mission_client: MissionClient, - robot_command_client: robot_command.RobotCommandClient, - lease_client: LeaseClient, - ) -> None: - self._robot = robot - self._logger = logger - self._mission_client: MissionClient = mission_client - self._robot_command_client = robot_command_client - self._lease_client = lease_client - self._robot_state = robot_state - self._spot_check_resp = None - self._lease = None - self._lease_wallet: LeaseWallet = self._lease_client.lease_wallet - - def load_mission(self, root: nodes_pb2.Node, leases=None, data_chunk_byte_size: Optional[int] = None): - """Load a mission - Args: - root: Root node in a mission. - leases: All leases necessary to initialize a mission. - data_chunk_byte_size: Optional max size of each streamed message - Raises: - RpcError: Problem communicating with the robot. - bosdyn.mission.client.CompilationError: The mission failed to compile. - bosdyn.mission.client.ValidationError: The mission failed to validate. - """ - if leases is None: - leases = [] - if data_chunk_byte_size: - return self._load_mission_as_chunks(root, leases, data_chunk_byte_size) - try: - return self._mission_client.load_mission_async(root, leases) - except RpcError: - return False, "Could not communicate with the robot" - except CompilationError as e: - return False, f"The mission failed to compile: {e}" - except ValidationError as e: - return False, f"The mission could not be validated: {e}" - - def _load_mission_as_chunks(self, root: nodes_pb2.Node, leases=None, data_chunk_byte_size: int = 1000 * 1000): - """Load a mission onto the robot. - Args: - root: Root node in a mission. - leases: All leases necessary to initialize a mission. - data_chunk_byte_size: max size of each streamed message - Raises: - RpcError: Problem communicating with the robot. - bosdyn.mission.client.CompilationError: The mission failed to compile. - bosdyn.mission.client.ValidationError: The mission failed to validate. - """ - if leases is None: - leases = [] - try: - return self._mission_client.load_mission_as_chunks2(root, leases, data_chunk_byte_size) - except RpcError: - return False, "Could not communicate with the robot" - except CompilationError as e: - return False, f"The mission failed to compile: {e}" - except ValidationError as e: - return False, f"The mission could not be validated: {e}" - - def get_mission_info(self): - """Get static information about the loaded mission. - - Raises: - RpcError: Problem communicating with the robot. - """ - try: - return self._mission_client.get_info() - except RpcError: - return False, "Could not communicate with the robot" - - def play_mission( - self, - pause_time_secs: int, - leases=None, - settings=None, - ): - """Play loaded mission or continue a paused mission - Args: - pause_time_secs: Absolute time when the mission should pause execution. Subsequent RPCs - will override this value, so you can use this to say "if you don't hear from me again, - stop running the mission at this time." - leases: Leases the mission service will need to use. Unlike other clients, these MUST - be specified. - Raises: - RpcError: Problem communicating with the robot. - NoMissionError: No mission Loaded. - """ - if leases is None: - leases = [] - try: - return self._mission_client.play_mission_async(pause_time_secs, leases, settings) - except RpcError: - return False, "Could not communicate with the robot" - except NoMissionError: - return False, "No mission loaded" - - def get_mission_state( - self, - upper_tick_bound: Optional[int] = None, - lower_tick_bound: Optional[int] = None, - past_ticks: Optional[int] = None, - ): - """Get the state of the current playing mission - Raises: - RpcError: Problem communicating with the robot. - NoMissionPlayingError: No mission playing. - """ - try: - return self._mission_client.get_state_async(upper_tick_bound, lower_tick_bound, past_ticks) - except RpcError: - return False, "Could not communicate with the robot" - except NoMissionPlayingError: - return False, "No mission playing" - - def pause_mission(self): - """Pause the current mission - Raises: - RpcError: Problem communicating with the robot. - NoMissionPlayingError: No mission playing. - """ - try: - return self._mission_client.pause_mission_async() - except RpcError: - return False, "Could not communicate with the robot" - except NoMissionPlayingError: - return False, "No mission playing" - - def restart_mission(self, pause_time_secs, leases=None, settings=None): - """Restart mission from the beginning - Args: - pause_time_secs: Absolute time when the mission should pause execution. Subsequent RPCs - to RestartMission will override this value, so you can use this to say "if you don't hear - from me again, stop running the mission at this time." - leases: Leases the mission service will need to use. Unlike other clients, these MUST - be specified. - Raises: - RpcError: Problem communicating with the robot. - NoMissionError: No Mission Loaded. - bosdyn.mission.client.ValidationError: The mission failed to validate. - """ - if leases is None: - leases = [] - try: - return self._mission_client.restart_mission_async(pause_time_secs, leases, settings) - except RpcError: - return False, "Could not communicate with the robot" - except NoMissionError: - return False, "No mission loaded" - except ValidationError as e: - return False, f"The mission could not be validated: {e}" - - def stop_mission(self): - """Stop the current mission - Raises: - RpcError: Problem communicating with the robot. - NoMissionPlayingError: No mission playing. - """ - try: - return self._mission_client.stop_mission_async() - except RpcError: - return False, "Could not communicate with the robot" - except NoMissionPlayingError: - return False, "No mission playing" diff --git a/spot_wrapper/wrapper.py b/spot_wrapper/wrapper.py index 65e1bbd..a7a1030 100644 --- a/spot_wrapper/wrapper.py +++ b/spot_wrapper/wrapper.py @@ -55,7 +55,6 @@ from bosdyn.client.time_sync import TimeSyncEndpoint from bosdyn.client.world_object import WorldObjectClient from bosdyn.geometry import EulerZXY -from bosdyn.mission.client import MissionClient from google.protobuf.timestamp_pb2 import Timestamp from .spot_arm import SpotArm @@ -65,7 +64,6 @@ from .spot_eap import SpotEAP from .spot_graph_nav import SpotGraphNav from .spot_images import SpotImages -from .spot_mission_wrapper import SpotMission from .spot_world_objects import SpotWorldObjects from .wrapper_helpers import ClaimAndPowerDecorator, RobotCommandData, RobotState @@ -394,9 +392,7 @@ def __init__( self._command_data = RobotCommandData() try: - self._sdk = create_standard_sdk( - SPOT_CLIENT_NAME, service_clients=[MissionClient], cert_resource_glob=cert_resource_glob - ) + self._sdk = create_standard_sdk(SPOT_CLIENT_NAME, cert_resource_glob=cert_resource_glob) except Exception as e: self._logger.error("Error creating SDK object: %s", e) self._valid = False @@ -441,7 +437,6 @@ def __init__( self._estop_client = self._robot.ensure_client(EstopClient.default_service_name) self._docking_client = self._robot.ensure_client(DockingClient.default_service_name) self._spot_check_client = self._robot.ensure_client(SpotCheckClient.default_service_name) - self._mission_client = self._robot.ensure_client(MissionClient.default_service_name) self._license_client = self._robot.ensure_client(LicenseClient.default_service_name) if self._robot.has_arm(): self._gripper_cam_param_client = self._robot.ensure_client( @@ -526,15 +521,6 @@ def __init__( self._lease_client, ) - self._spot_mission = SpotMission( - self._robot, - self._logger, - self._state, - self._mission_client, - self._robot_command_client, - self._lease_client, - ) - if self._robot.has_arm(): self._spot_arm = SpotArm( self._robot, @@ -740,11 +726,6 @@ def spot_check(self) -> SpotCheck: """Return SpotCheck instance""" return self._spot_check - @property - def spot_mission(self) -> SpotMission: - """Return SpotMission instance""" - return self._spot_mission - @property def spot_eap_lidar(self) -> typing.Optional[SpotEAP]: """Return SpotEAP instance"""