Skip to content

Commit

Permalink
Adding some manual mypy fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
khughes-bdai committed Jan 25, 2024
1 parent fe65ca5 commit 30c938d
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 65 deletions.
18 changes: 9 additions & 9 deletions spot_wrapper/cam_webrtc_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@


class SpotCAMMediaStreamTrack(MediaStreamTrack):
def __init__(self, track, queue):
def __init__(self, track, queue) -> None:
super().__init__()
self.track = track
self.queue = queue
Expand All @@ -36,7 +36,7 @@ def __init__(
rtc_config,
media_recorder=None,
recorder_type=None,
):
) -> None:
self.pc = RTCPeerConnection(configuration=rtc_config)

self.video_frame_queue = asyncio.Queue()
Expand Down Expand Up @@ -64,7 +64,7 @@ def get_sdp_offer_from_spot_cam(self, token):
result = response.json()
return result["id"], base64.b64decode(result["sdp"]).decode()

def send_sdp_answer_to_spot_cam(self, token, offer_id, sdp_answer):
def send_sdp_answer_to_spot_cam(self, token, offer_id, sdp_answer)-> None:
headers = {"Authorization": f"Bearer {token}"}
server_url = f"https://{self.hostname}:{self.sdp_port}/{self.sdp_filename}"

Expand All @@ -73,7 +73,7 @@ def send_sdp_answer_to_spot_cam(self, token, offer_id, sdp_answer):
if r.status_code != 200:
raise ValueError(r)

async def start(self):
async def start(self) -> None:
# first get a token
try:
token = self.get_bearer_token()
Expand All @@ -83,26 +83,26 @@ async def start(self):
offer_id, sdp_offer = self.get_sdp_offer_from_spot_cam(token)

@self.pc.on("icegatheringstatechange")
def _on_ice_gathering_state_change():
def _on_ice_gathering_state_change()-> None:
print(f"ICE gathering state changed to {self.pc.iceGatheringState}")

@self.pc.on("signalingstatechange")
def _on_signaling_state_change():
def _on_signaling_state_change()-> None:
print(f"Signaling state changed to: {self.pc.signalingState}")

@self.pc.on("icecandidate")
def _on_ice_candidate(event):
def _on_ice_candidate(event)-> None:
print(f"Received candidate: {event.candidate}")

@self.pc.on("iceconnectionstatechange")
async def _on_ice_connection_state_change():
async def _on_ice_connection_state_change()-> None:
print(f"ICE connection state changed to: {self.pc.iceConnectionState}")

if self.pc.iceConnectionState == "checking":
self.send_sdp_answer_to_spot_cam(token, offer_id, self.pc.localDescription.sdp.encode())

@self.pc.on("track")
def _on_track(track):
def _on_track(track)-> None:
print(f"Received track: {track.kind}")

if self.media_recorder:
Expand Down
60 changes: 30 additions & 30 deletions spot_wrapper/cam_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ class LEDPosition(enum.Enum):
FRONT_RIGHT = 2
REAR_RIGHT = 3

def __init__(self, robot: Robot, logger):
def __init__(self, robot: Robot, logger) -> None:
self.logger = logger
self.client: LightingClient = robot.ensure_client(LightingClient.default_service_name)

def set_led_brightness(self, brightness):
def set_led_brightness(self, brightness: float) -> None:
"""
Set the brightness of the LEDs to the specified brightness
Expand Down Expand Up @@ -82,7 +82,7 @@ class PowerWrapper:
Wrapper for power interaction
"""

def __init__(self, robot: Robot, logger):
def __init__(self, robot: Robot, logger) -> None:
self.logger = logger
self.client: PowerClient = robot.ensure_client(PowerClient.default_service_name)

Expand All @@ -98,7 +98,7 @@ def set_power_status(
aux1: typing.Optional[bool] = None,
aux2: typing.Optional[bool] = None,
external_mic: typing.Optional[bool] = None,
):
) -> None:
"""
Set power status for each of the devices
Expand All @@ -116,7 +116,7 @@ def cycle_power(
aux1: typing.Optional[bool] = None,
aux2: typing.Optional[bool] = None,
external_mic: typing.Optional[bool] = None,
):
) -> None:
"""
Cycle power of the specified devices
Expand All @@ -134,7 +134,7 @@ class CompositorWrapper:
Wrapper for compositor interaction
"""

def __init__(self, robot: Robot, logger):
def __init__(self, robot: Robot, logger) -> None:
self.logger = logger
self.client: CompositorClient = robot.ensure_client(CompositorClient.default_service_name)

Expand All @@ -157,7 +157,7 @@ def get_visible_cameras(self):
"""
return self.client.get_visible_cameras()

def set_screen(self, screen: str):
def set_screen(self, screen: str) -> None:
"""
Set the screen to be streamed over the network
Expand All @@ -175,7 +175,7 @@ def get_screen(self) -> str:
"""
return self.client.get_screen()

def set_ir_colormap(self, colormap, min_temp, max_temp, auto_scale=True):
def set_ir_colormap(self, colormap, min_temp: float, max_temp: float, auto_scale: bool=True) -> None:
"""
Set the colormap used for the IR camera
Expand All @@ -188,7 +188,7 @@ def set_ir_colormap(self, colormap, min_temp, max_temp, auto_scale=True):
"""
self.client.set_ir_colormap(colormap, min_temp, max_temp, auto_scale)

def set_ir_meter_overlay(self, x, y, enable=True):
def set_ir_meter_overlay(self, x: float, y: float, enable: bool=True) -> None:
"""
Set the reticle position on the Spot CAM IR.
Expand All @@ -205,7 +205,7 @@ class HealthWrapper:
Wrapper for health details
"""

def __init__(self, robot, logger):
def __init__(self, robot: Robot, logger) -> None:
self.client: HealthClient = robot.ensure_client(HealthClient.default_service_name)
self.logger = logger

Expand Down Expand Up @@ -250,7 +250,7 @@ class AudioWrapper:
Wrapper for audio commands on the camera
"""

def __init__(self, robot, logger):
def __init__(self, robot: Robot, logger) -> None:
self.client: AudioClient = robot.ensure_client(AudioClient.default_service_name)
self.logger = logger

Expand All @@ -263,7 +263,7 @@ def list_sounds(self) -> typing.List[str]:
"""
return self.client.list_sounds()

def set_volume(self, percentage):
def set_volume(self, percentage: float) -> None:
"""
Set the volume at which sounds should be played
Expand All @@ -272,7 +272,7 @@ def set_volume(self, percentage):
"""
self.client.set_volume(percentage)

def get_volume(self):
def get_volume(self) -> float:
"""
Get the current volume at which sounds are played
Expand All @@ -281,7 +281,7 @@ def get_volume(self):
"""
return self.client.get_volume()

def play_sound(self, sound_name, gain=1.0):
def play_sound(self, sound_name: str, gain: float=1.0) -> None:
"""
Play a sound which is on the device
Expand All @@ -292,7 +292,7 @@ def play_sound(self, sound_name, gain=1.0):
sound = audio_pb2.Sound(name=sound_name)
self.client.play_sound(sound, gain)

def load_sound(self, sound_file, name):
def load_sound(self, sound_file: str, name: str) -> None:
"""
Load a sound from a wav file and save it with the given name onto the device
Args:
Expand All @@ -319,7 +319,7 @@ def load_sound(self, sound_file, name):

self.client.load_sound(sound, data)

def delete_sound(self, name):
def delete_sound(self, name: str) -> None:
"""
Delete a sound from the device
Expand All @@ -334,11 +334,11 @@ class StreamQualityWrapper:
Wrapper for stream quality commands
"""

def __init__(self, robot, logger):
def __init__(self, robot: Robot, logger) -> None:
self.client: StreamQualityClient = robot.ensure_client(StreamQualityClient.default_service_name)
self.logger = logger

def set_stream_params(self, target_bitrate, refresh_interval, idr_interval, awb):
def set_stream_params(self, target_bitrate: int, refresh_interval: int, idr_interval: int, awb) -> None:
"""
Set image compression and postprocessing parameters
Expand Down Expand Up @@ -369,7 +369,7 @@ def get_stream_params(self) -> typing.Dict[str, int]:

return param_dict

def enable_congestion_control(self, enable):
def enable_congestion_control(self, enable: bool) -> None:
"""
Enable congestion control on the receiver... not sure what this does
Expand Down Expand Up @@ -402,7 +402,7 @@ class MediaLogWrapper:
Some functionality adapted from https://github.com/boston-dynamics/spot-sdk/blob/master/python/examples/spot_cam/media_log.py
"""

def __init__(self, robot, logger) -> None:
def __init__(self, robot: Robot, logger) -> None:
self.client: MediaLogClient = robot.ensure_client(MediaLogClient.default_service_name)
self.logger = logger

Expand Down Expand Up @@ -656,7 +656,7 @@ class PTZWrapper:
Wrapper for controlling the PTZ unit
"""

def __init__(self, robot, logger):
def __init__(self, robot: Robot, logger) -> None:
self.client: PtzClient = robot.ensure_client(PtzClient.default_service_name)
self.logger = logger
self.ptzs = {}
Expand Down Expand Up @@ -783,7 +783,7 @@ def get_ptz_velocity(self, ptz_name) -> PtzVelocity:
"""
return self.client.get_ptz_velocity(PtzDescription(name=ptz_name))

def set_ptz_velocity(self, ptz_name, pan, tilt, zoom):
def set_ptz_velocity(self, ptz_name, pan, tilt, zoom) -> None:
"""
Set the velocity of the various axes of the specified ptz
Expand All @@ -796,7 +796,7 @@ def set_ptz_velocity(self, ptz_name, pan, tilt, zoom):
# We do not clamp the velocity to the limits, as it is a rate
self.client.set_ptz_velocity(self._get_ptz_description(ptz_name), pan, tilt, zoom)

def initialise_lens(self):
def initialise_lens(self) -> None:
"""
Initialises or resets ptz autofocus
"""
Expand All @@ -819,12 +819,12 @@ class ImageStreamWrapper:
def __init__(
self,
hostname: str,
robot,
robot: Robot,
logger,
sdp_port=31102,
sdp_filename="h264.sdp",
cam_ssl_cert_path=None,
):
) -> None:
"""
Initialise the wrapper
Expand All @@ -838,7 +838,7 @@ def __init__(
"""
self.shutdown_flag = threading.Event()
self.logger = logger
self.last_image_time = None
self.last_image_time: typing.Optional[datetime.datetime] = None
self.image_lock = threading.Lock()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
Expand All @@ -862,15 +862,15 @@ def __init__(
self.async_thread = threading.Thread(target=loop.run_forever)
self.async_thread.start()

async def _monitor_shutdown(self):
async def _monitor_shutdown(self) -> None:
while not self.shutdown_flag.is_set():
await asyncio.sleep(1.0)

self.logger.info("Image stream wrapper received shutdown flag")
await self.client.pc.close()
asyncio.get_event_loop().stop()

async def _process_func(self):
async def _process_func(self) -> None:
while asyncio.get_event_loop().is_running():
try:
frame = await self.client.video_frame_queue.get()
Expand All @@ -895,7 +895,7 @@ async def _process_func(self):


class SpotCamWrapper:
def __init__(self, hostname, username, password, logger, port: typing.Optional[int] = None):
def __init__(self, hostname, username, password, logger, port: typing.Optional[int] = None) -> None:
self._hostname = hostname
self._username = username
self._password = password
Expand Down Expand Up @@ -935,6 +935,6 @@ def __init__(self, hostname, username, password, logger, port: typing.Optional[i

self._logger.info("Finished setting up spot cam wrapper components")

def shutdown(self):
def shutdown(self)-> None:
self._logger.info("Shutting down Spot CAM wrapper")
self.image.shutdown_flag.set()
14 changes: 7 additions & 7 deletions spot_wrapper/spot_arm.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def _manipulation_request(
request_proto: manipulation_api_pb2,
end_time_secs: typing.Optional[float] = None,
timesync_endpoint: typing.Optional[TimeSyncEndpoint] = None,
):
) -> typing.Tuple[bool, str, typing.Optional[str]]:
"""Generic function for sending requests to the manipulation api of a robot.
Args:
request_proto: manipulation_api_pb2 object to send to the robot.
Expand All @@ -97,7 +97,7 @@ def _manipulation_request(
self._logger.error(f"Unable to execute manipulation command: {e}")
return False, str(e), None

def manipulation_command(self, request: manipulation_api_pb2):
def manipulation_command(self, request: manipulation_api_pb2) -> typing.Tuple[bool, str, typing.Optional[str]]:
end_time = time.time() + self._max_command_duration
return self._manipulation_request(
request,
Expand Down Expand Up @@ -136,7 +136,7 @@ def ensure_arm_power_and_stand(self) -> typing.Tuple[bool, str]:

return True, "Spot has an arm, is powered on, and standing"

def wait_for_arm_command_to_complete(self, cmd_id, timeout_sec=None):
def wait_for_arm_command_to_complete(self, cmd_id, timeout_sec: typing.Optional[float]=None) -> None:
"""
Wait until a command issued to the arm complets. Wrapper around the SDK function for convenience
Expand Down Expand Up @@ -231,7 +231,7 @@ def make_arm_trajectory_command(
arm_sync_robot_cmd = robot_command_pb2.RobotCommand(synchronized_command=sync_arm)
return RobotCommandBuilder.build_synchro_command(arm_sync_robot_cmd)

def arm_joint_move(self, joint_targets) -> typing.Tuple[bool, str]:
def arm_joint_move(self, joint_targets: typing.List[float]) -> typing.Tuple[bool, str]:
# All perspectives are given when looking at the robot from behind after the unstow service is called
# Joint1: 0.0 arm points to the front. positive: turn left, negative: turn right)
# RANGE: -3.14 -> 3.14
Expand Down Expand Up @@ -295,7 +295,7 @@ def arm_joint_move(self, joint_targets) -> typing.Tuple[bool, str]:
except Exception as e:
return False, f"Exception occured during arm movement: {e}"

def create_wrench_from_forces_and_torques(self, forces, torques):
def create_wrench_from_forces_and_torques(self, forces: typing.List[float], torques: typing.List[float]) -> geometry_pb2.Wrench:
force = geometry_pb2.Vec3(x=forces[0], y=forces[1], z=forces[2])
torque = geometry_pb2.Vec3(x=torques[0], y=torques[1], z=torques[2])
return geometry_pb2.Wrench(force=force, torque=torque)
Expand Down Expand Up @@ -504,7 +504,7 @@ def hand_pose(self, data) -> typing.Tuple[bool, str]:
def block_until_gripper_command_completes(
robot_command_client: RobotCommandClient,
cmd_id: int,
timeout_sec: float = None,
timeout_sec: typing.Optional[float] = None,
) -> bool:
"""
Helper that blocks until a gripper command achieves a finishing state
Expand Down Expand Up @@ -544,7 +544,7 @@ def block_until_gripper_command_completes(
def block_until_manipulation_completes(
manipulation_client: ManipulationApiClient,
cmd_id: int,
timeout_sec: float = None,
timeout_sec: typing.Optional[float] = None,
) -> bool:
"""
Helper that blocks until the arm achieves a finishing state for the specific manipulation command.
Expand Down
Loading

0 comments on commit 30c938d

Please sign in to comment.