-
Notifications
You must be signed in to change notification settings - Fork 23
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[SW-949] Add Mission Client in spot_wrapper (#105)
* started building client for mission * filled in spot_mission wrapper * mission client wrapper done; testing/debugging * PR comments edits
- Loading branch information
1 parent
a630665
commit 83d70bb
Showing
2 changed files
with
212 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,192 @@ | ||
import logging | ||
from typing import Optional | ||
|
||
from bosdyn.api.mission import nodes_pb2 | ||
from bosdyn.client import RpcError, robot_command | ||
from bosdyn.client.lease import LeaseClient, LeaseWallet | ||
from bosdyn.client.robot import Robot | ||
from bosdyn.mission.client import ( | ||
CompilationError, | ||
MissionClient, | ||
NoMissionError, | ||
NoMissionPlayingError, | ||
ValidationError, | ||
) | ||
|
||
from spot_wrapper.wrapper_helpers import RobotState | ||
|
||
|
||
class SpotMission: | ||
""" | ||
Allow access to mission functionality through the SDK | ||
""" | ||
|
||
def __init__( | ||
self, | ||
robot: Robot, | ||
logger: logging.Logger, | ||
robot_state: RobotState, | ||
mission_client: MissionClient, | ||
robot_command_client: robot_command.RobotCommandClient, | ||
lease_client: LeaseClient, | ||
) -> None: | ||
self._robot = robot | ||
self._logger = logger | ||
self._mission_client: MissionClient = mission_client | ||
self._robot_command_client = robot_command_client | ||
self._lease_client = lease_client | ||
self._robot_state = robot_state | ||
self._spot_check_resp = None | ||
self._lease = None | ||
self._lease_wallet: LeaseWallet = self._lease_client.lease_wallet | ||
|
||
def load_mission(self, root: nodes_pb2.Node, leases=None, data_chunk_byte_size: Optional[int] = None): | ||
"""Load a mission | ||
Args: | ||
root: Root node in a mission. | ||
leases: All leases necessary to initialize a mission. | ||
data_chunk_byte_size: Optional max size of each streamed message | ||
Raises: | ||
RpcError: Problem communicating with the robot. | ||
bosdyn.mission.client.CompilationError: The mission failed to compile. | ||
bosdyn.mission.client.ValidationError: The mission failed to validate. | ||
""" | ||
if leases is None: | ||
leases = [] | ||
if data_chunk_byte_size: | ||
return self._load_mission_as_chunks(root, leases, data_chunk_byte_size) | ||
try: | ||
return self._mission_client.load_mission_async(root, leases) | ||
except RpcError: | ||
return False, "Could not communicate with the robot" | ||
except CompilationError as e: | ||
return False, f"The mission failed to compile: {e}" | ||
except ValidationError as e: | ||
return False, f"The mission could not be validated: {e}" | ||
|
||
def _load_mission_as_chunks(self, root: nodes_pb2.Node, leases=None, data_chunk_byte_size: int = 1000 * 1000): | ||
"""Load a mission onto the robot. | ||
Args: | ||
root: Root node in a mission. | ||
leases: All leases necessary to initialize a mission. | ||
data_chunk_byte_size: max size of each streamed message | ||
Raises: | ||
RpcError: Problem communicating with the robot. | ||
bosdyn.mission.client.CompilationError: The mission failed to compile. | ||
bosdyn.mission.client.ValidationError: The mission failed to validate. | ||
""" | ||
if leases is None: | ||
leases = [] | ||
try: | ||
return self._mission_client.load_mission_as_chunks2(root, leases, data_chunk_byte_size) | ||
except RpcError: | ||
return False, "Could not communicate with the robot" | ||
except CompilationError as e: | ||
return False, f"The mission failed to compile: {e}" | ||
except ValidationError as e: | ||
return False, f"The mission could not be validated: {e}" | ||
|
||
def get_mission_info(self): | ||
"""Get static information about the loaded mission. | ||
Raises: | ||
RpcError: Problem communicating with the robot. | ||
""" | ||
try: | ||
return self._mission_client.get_info() | ||
except RpcError: | ||
return False, "Could not communicate with the robot" | ||
|
||
def play_mission( | ||
self, | ||
pause_time_secs: int, | ||
leases=None, | ||
settings=None, | ||
): | ||
"""Play loaded mission or continue a paused mission | ||
Args: | ||
pause_time_secs: Absolute time when the mission should pause execution. Subsequent RPCs | ||
will override this value, so you can use this to say "if you don't hear from me again, | ||
stop running the mission at this time." | ||
leases: Leases the mission service will need to use. Unlike other clients, these MUST | ||
be specified. | ||
Raises: | ||
RpcError: Problem communicating with the robot. | ||
NoMissionError: No mission Loaded. | ||
""" | ||
if leases is None: | ||
leases = [] | ||
try: | ||
return self._mission_client.play_mission_async(pause_time_secs, leases, settings) | ||
except RpcError: | ||
return False, "Could not communicate with the robot" | ||
except NoMissionError: | ||
return False, "No mission loaded" | ||
|
||
def get_mission_state( | ||
self, | ||
upper_tick_bound: Optional[int] = None, | ||
lower_tick_bound: Optional[int] = None, | ||
past_ticks: Optional[int] = None, | ||
): | ||
"""Get the state of the current playing mission | ||
Raises: | ||
RpcError: Problem communicating with the robot. | ||
NoMissionPlayingError: No mission playing. | ||
""" | ||
try: | ||
return self._mission_client.get_state_async(upper_tick_bound, lower_tick_bound, past_ticks) | ||
except RpcError: | ||
return False, "Could not communicate with the robot" | ||
except NoMissionPlayingError: | ||
return False, "No mission playing" | ||
|
||
def pause_mission(self): | ||
"""Pause the current mission | ||
Raises: | ||
RpcError: Problem communicating with the robot. | ||
NoMissionPlayingError: No mission playing. | ||
""" | ||
try: | ||
return self._mission_client.pause_mission_async() | ||
except RpcError: | ||
return False, "Could not communicate with the robot" | ||
except NoMissionPlayingError: | ||
return False, "No mission playing" | ||
|
||
def restart_mission(self, pause_time_secs, leases=None, settings=None): | ||
"""Restart mission from the beginning | ||
Args: | ||
pause_time_secs: Absolute time when the mission should pause execution. Subsequent RPCs | ||
to RestartMission will override this value, so you can use this to say "if you don't hear | ||
from me again, stop running the mission at this time." | ||
leases: Leases the mission service will need to use. Unlike other clients, these MUST | ||
be specified. | ||
Raises: | ||
RpcError: Problem communicating with the robot. | ||
NoMissionError: No Mission Loaded. | ||
bosdyn.mission.client.ValidationError: The mission failed to validate. | ||
""" | ||
if leases is None: | ||
leases = [] | ||
try: | ||
return self._mission_client.restart_mission_async(pause_time_secs, leases, settings) | ||
except RpcError: | ||
return False, "Could not communicate with the robot" | ||
except NoMissionError: | ||
return False, "No mission loaded" | ||
except ValidationError as e: | ||
return False, f"The mission could not be validated: {e}" | ||
|
||
def stop_mission(self): | ||
"""Stop the current mission | ||
Raises: | ||
RpcError: Problem communicating with the robot. | ||
NoMissionPlayingError: No mission playing. | ||
""" | ||
try: | ||
return self._mission_client.stop_mission_async() | ||
except RpcError: | ||
return False, "Could not communicate with the robot" | ||
except NoMissionPlayingError: | ||
return False, "No mission playing" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters