Skip to content

Commit

Permalink
Rely on the wrapper in the connector package
Browse files Browse the repository at this point in the history
  • Loading branch information
b-Tomas committed Nov 26, 2024
1 parent 04ed595 commit da253d1
Showing 1 changed file with 90 additions and 101 deletions.
191 changes: 90 additions & 101 deletions mir_connector/inorbit_mir_connector/src/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,111 +113,100 @@ def _inorbit_command_handler(self, command_name, args, options):
- `metadata` is reserved for the future and will contains additional
information about the received command request.
"""
try:
if command_name == COMMAND_CUSTOM_COMMAND:
self._logger.info(f"Received '{command_name}'!. {args}")
if len(args) < 2:
self._logger.error("Invalid number of arguments: ", args)
options["result_function"](
"1", execution_status_details="Invalid number of arguments"
)
return
script_name = args[0]
script_args = args[1]
# TODO (Elvio): Needs to be re designed.
# 1. script_name is not standarized at all
# 2. Consider implementing a callback for handling mission specific commands
# 3. Needs an interface for supporting mission related actions
if script_name == "queue_mission" and script_args[0] == "--mission_id":
self.mission_tracking.mir_mission_tracking_enabled = (
self._robot_session.missions_module.executor.wait_until_idle(0)
)
self.mir_api.queue_mission(script_args[1])
elif script_name == "run_mission_now" and script_args[0] == "--mission_id":
self.mission_tracking.mir_mission_tracking_enabled = (
self._robot_session.missions_module.executor.wait_until_idle(0)
)
self.mir_api.abort_all_missions()
self.mir_api.queue_mission(script_args[1])
elif script_name == "abort_missions":
self._robot_session.missions_module.executor.cancel_mission("*")
self.mir_api.abort_all_missions()
elif script_name == "set_state":
if script_args[0] == "--state_id":
state_id = script_args[1]
if not state_id.isdigit() or int(state_id) not in MIR_STATE.keys():
error = f"Invalid `state_id` '{state_id}'"
self._logger.error(error)
options["result_function"]("1", execution_status_details=error)
return
state_id = int(state_id)
self._logger.info(
f"Setting robot state to state {state_id}: {MIR_STATE[state_id]}"
)
self.mir_api.set_state(state_id)
if script_args[0] == "--clear_error":
self._logger.info("Clearing error state")
self.mir_api.clear_error()
elif script_name == "set_waiting_for" and script_args[0] == "--text":
self._logger.info(f"Setting 'waiting for' value to {script_args[1]}")
self.mission_tracking.waiting_for_text = script_args[1]
elif script_name == "localize":
# The localize command sets the robot's position and current map
# The expected arguments are "x" and "y" in meters and "orientation" in degrees,
# as in MiR Fleet, and "map_id" as the target map in MiR Fleet, which matches
# the uploaded "frame_id" in InOrbit
if (
len(script_args) == 8
and script_args[0] == "--x"
and script_args[2] == "--y"
and script_args[4] == "--orientation"
and script_args[6] == "--map_id"
):
status = {
"position": {
"x": float(script_args[1]),
"y": float(script_args[3]),
"orientation": float(script_args[5]),
},
"map_id": script_args[7],
}
self._logger.info(f"Changing map to {script_args[7]}")
self.mir_api.set_status(status)
else:
self._logger.error("Invalid arguments for 'localize' command")
options["result_function"](
"1", execution_status_details="Invalid arguments"
)
if command_name == COMMAND_CUSTOM_COMMAND:
self._logger.info(f"Received '{command_name}'!. {args}")
if len(args) < 2:
self._logger.error("Invalid number of arguments: ", args)
options["result_function"](
"1", execution_status_details="Invalid number of arguments"
)
return
script_name = args[0]
script_args = args[1]
# TODO (Elvio): Needs to be re designed.
# 1. script_name is not standarized at all
# 2. Consider implementing a callback for handling mission specific commands
# 3. Needs an interface for supporting mission related actions
if script_name == "queue_mission" and script_args[0] == "--mission_id":
self.mission_tracking.mir_mission_tracking_enabled = (
self._robot_session.missions_module.executor.wait_until_idle(0)
)
self.mir_api.queue_mission(script_args[1])
elif script_name == "run_mission_now" and script_args[0] == "--mission_id":
self.mission_tracking.mir_mission_tracking_enabled = (
self._robot_session.missions_module.executor.wait_until_idle(0)
)
self.mir_api.abort_all_missions()
self.mir_api.queue_mission(script_args[1])
elif script_name == "abort_missions":
self._robot_session.missions_module.executor.cancel_mission("*")
self.mir_api.abort_all_missions()
elif script_name == "set_state":
if script_args[0] == "--state_id":
state_id = script_args[1]
if not state_id.isdigit() or int(state_id) not in MIR_STATE.keys():
error = f"Invalid `state_id` '{state_id}'"
self._logger.error(error)
options["result_function"]("1", execution_status_details=error)
return
state_id = int(state_id)
self._logger.info(
f"Setting robot state to state {state_id}: {MIR_STATE[state_id]}"
)
self.mir_api.set_state(state_id)
if script_args[0] == "--clear_error":
self._logger.info("Clearing error state")
self.mir_api.clear_error()
elif script_name == "set_waiting_for" and script_args[0] == "--text":
self._logger.info(f"Setting 'waiting for' value to {script_args[1]}")
self.mission_tracking.waiting_for_text = script_args[1]
elif script_name == "localize":
# The localize command sets the robot's position and current map
# The expected arguments are "x" and "y" in meters and "orientation" in degrees,
# as in MiR Fleet, and "map_id" as the target map in MiR Fleet, which matches
# the uploaded "frame_id" in InOrbit
if (
len(script_args) == 8
and script_args[0] == "--x"
and script_args[2] == "--y"
and script_args[4] == "--orientation"
and script_args[6] == "--map_id"
):
status = {
"position": {
"x": float(script_args[1]),
"y": float(script_args[3]),
"orientation": float(script_args[5]),
},
"map_id": script_args[7],
}
self._logger.info(f"Changing map to {script_args[7]}")
self.mir_api.set_status(status)
else:
# Other kind if custom commands may be handled by the edge-sdk
# (e.g.user_scripts) and not by the connector code itself
# Do not return any result and leave it to the edge-sdk to handle it
self._logger.error("Invalid arguments for 'localize' command")
options["result_function"](
"1", execution_status_details="Invalid arguments"
)
return
# Return '0' for success
options["result_function"]("0")
elif command_name == COMMAND_NAV_GOAL:
self._logger.info(f"Received '{command_name}'!. {args}")
pose = args[0]
self.send_waypoint_over_missions(pose)
elif command_name == COMMAND_MESSAGE:
msg = args[0]
if msg == "inorbit_pause":
self.mir_api.set_state(4)
elif msg == "inorbit_resume":
self.mir_api.set_state(3)
else:
self._logger.info(f"Received unknown command '{command_name}'!. {args}")
except Exception as ex:
self._logger.error(
f"Failed to execute command '{command_name}' with args {args}. Exception:\n{ex}"
)
options["result_function"](
"1",
execution_status_details="An error occured executing custom command",
stderr=str(ex),
)
return
# Other kind if custom commands may be handled by the edge-sdk
# (e.g.user_scripts) and not by the connector code itself
# Do not return any result and leave it to the edge-sdk to handle it
return
# Return '0' for success
options["result_function"]("0")
elif command_name == COMMAND_NAV_GOAL:
self._logger.info(f"Received '{command_name}'!. {args}")
pose = args[0]
self.send_waypoint_over_missions(pose)
elif command_name == COMMAND_MESSAGE:
msg = args[0]
if msg == "inorbit_pause":
self.mir_api.set_state(4)
elif msg == "inorbit_resume":
self.mir_api.set_state(3)
else:
self._logger.info(f"Received unknown command '{command_name}'!. {args}")

def _connect(self) -> None:
"""Connect to the robot services and to InOrbit"""
Expand Down

0 comments on commit da253d1

Please sign in to comment.