@@ -51,7 +51,7 @@ def _get_lease(self) -> Lease:
5151 self ._lease = self ._lease_wallet .get_lease ()
5252 return self ._lease
5353
54- def _init_current_graph_nav_state (self ):
54+ def _init_current_graph_nav_state (self ) -> None :
5555 # Store the most recent knowledge of the state of the robot based on rpc calls.
5656 self ._current_graph = None
5757 self ._current_edges = {} # maps to_waypoint to list(from_waypoint)
@@ -75,7 +75,7 @@ def navigate_initial_localization(
7575 upload_path : str ,
7676 initial_localization_fiducial : bool = True ,
7777 initial_localization_waypoint : typing .Optional [str ] = None ,
78- ):
78+ ) -> typing . Tuple [ bool , str ] :
7979 """Navigate with graphnav.
8080
8181 Args:
@@ -112,7 +112,7 @@ def navigate_initial_localization(
112112
113113 return True , "Localization done!"
114114
115- def navigate_to_existing_waypoint (self , waypoint_id : str ):
115+ def navigate_to_existing_waypoint (self , waypoint_id : str ) -> typing . Tuple [ bool , str ] :
116116 """Navigate to an existing waypoint.
117117 Args:
118118 waypoint_id : Waypoint id string for where to go
@@ -121,7 +121,7 @@ def navigate_to_existing_waypoint(self, waypoint_id: str):
121121 resp = self ._navigate_to (waypoint_id )
122122 return resp
123123
124- def navigate_through_route (self , waypoint_ids : typing .List [str ]):
124+ def navigate_through_route (self , waypoint_ids : typing .List [str ]) -> typing . Tuple [ bool , str ] :
125125 """
126126 Args:
127127 waypoint_ids: List[str] of waypoints to follow
@@ -146,7 +146,7 @@ def navigation_close_loops(self, close_fiducial_loops: bool, close_odometry_loop
146146 def optmize_anchoring (self ) -> typing .Tuple [bool , str ]:
147147 return self ._optimize_anchoring ()
148148
149- def _write_bytes_while_download (self , filepath : str , data : bytes ):
149+ def _write_bytes_while_download (self , filepath : str , data : bytes ) -> None :
150150 """Write data to a file.
151151 Args:
152152 filepath (str) : Path of file where data will be written.
@@ -257,14 +257,14 @@ def download_graph(self, download_path: str) -> typing.Tuple[bool, str]:
257257 # Downloading, reproducing, distributing or otherwise using the SDK Software
258258 # is subject to the terms and conditions of the Boston Dynamics Software
259259 # Development Kit License (20191101-BDSDK-SL).
260- def _get_localization_state (self , * args ):
260+ def _get_localization_state (self , * args ) -> None :
261261 """Get the current localization and state of the robot."""
262262 state = self ._graph_nav_client .get_localization_state ()
263263 self ._logger .info (f"Got localization: \n { str (state .localization )} " )
264264 odom_tform_body = get_odom_tform_body (state .robot_kinematics .transforms_snapshot )
265265 self ._logger .info (f"Got robot state in kinematic odometry frame: \n { str (odom_tform_body )} " )
266266
267- def set_initial_localization_fiducial (self , * args ):
267+ def set_initial_localization_fiducial (self , * args ) -> None :
268268 """Trigger localization when near a fiducial."""
269269 robot_state = self ._robot_state_client .get_robot_state ()
270270 current_odom_tform_body = get_odom_tform_body (robot_state .kinematic_state .transforms_snapshot ).to_proto ()
@@ -276,7 +276,7 @@ def set_initial_localization_fiducial(self, *args):
276276 ko_tform_body = current_odom_tform_body ,
277277 )
278278
279- def set_initial_localization_waypoint (self , * args ):
279+ def set_initial_localization_waypoint (self , * args ) -> None :
280280 """Trigger localization to a waypoint."""
281281 # Take the first argument as the localization waypoint.
282282 if len (args ) < 1 :
@@ -316,7 +316,7 @@ def _download_current_graph(self):
316316 self ._current_graph = graph
317317 return graph
318318
319- def _download_full_graph (self , * args ):
319+ def _download_full_graph (self , * args ) -> None :
320320 """Download the graph and snapshots from the robot."""
321321 graph = self ._graph_nav_client .download_graph ()
322322 if graph is None :
@@ -330,12 +330,12 @@ def _download_full_graph(self, *args):
330330 self ._download_and_write_waypoint_snapshots (graph .waypoints )
331331 self ._download_and_write_edge_snapshots (graph .edges )
332332
333- def _write_full_graph (self , graph ):
333+ def _write_full_graph (self , graph ) -> None :
334334 """Download the graph from robot to the specified, local filepath location."""
335335 graph_bytes = graph .SerializeToString ()
336336 self ._write_bytes (self ._download_filepath , "/graph" , graph_bytes )
337337
338- def _download_and_write_waypoint_snapshots (self , waypoints ):
338+ def _download_and_write_waypoint_snapshots (self , waypoints ) -> None :
339339 """Download the waypoint snapshots from robot to the specified, local filepath location."""
340340 num_waypoint_snapshots_downloaded = 0
341341 for waypoint in waypoints :
@@ -359,7 +359,7 @@ def _download_and_write_waypoint_snapshots(self, waypoints):
359359 )
360360 )
361361
362- def _download_and_write_edge_snapshots (self , edges ):
362+ def _download_and_write_edge_snapshots (self , edges ) -> None :
363363 """Download the edge snapshots from robot to the specified, local filepath location."""
364364 num_edge_snapshots_downloaded = 0
365365 num_to_download = 0
@@ -383,7 +383,7 @@ def _download_and_write_edge_snapshots(self, edges):
383383 "Downloaded {} of the total {} edge snapshots." .format (num_edge_snapshots_downloaded , num_to_download )
384384 )
385385
386- def _write_bytes (self , filepath : str , filename : str , data ):
386+ def _write_bytes (self , filepath : str , filename : str , data ) -> None :
387387 """Write data to a file."""
388388 os .makedirs (filepath , exist_ok = True )
389389 with open (filepath + filename , "wb+" ) as f :
@@ -655,7 +655,9 @@ def _match_edge(
655655 return map_pb2 .Edge .Id (from_waypoint = waypoint1 , to_waypoint = waypoint2 )
656656 return None
657657
658- def _auto_close_loops (self , close_fiducial_loops : bool , close_odometry_loops : bool , * args ):
658+ def _auto_close_loops (
659+ self , close_fiducial_loops : bool , close_odometry_loops : bool , * args
660+ ) -> typing .Tuple [bool , str ]:
659661 """Automatically find and close all loops in the graph."""
660662 response : map_processing_pb2 .ProcessTopologyResponse = self ._map_processing_client .process_topology (
661663 params = map_processing_pb2 .ProcessTopologyRequest .Params (
@@ -676,7 +678,7 @@ def _auto_close_loops(self, close_fiducial_loops: bool, close_odometry_loops: bo
676678 else :
677679 return False , "Unknown error during map processing."
678680
679- def _optimize_anchoring (self , * args ):
681+ def _optimize_anchoring (self , * args ) -> typing . Tuple [ bool , str ] :
680682 """Call anchoring optimization on the server, producing a globally optimal reference frame for waypoints to be
681683 expressed in.
682684 """
@@ -712,7 +714,7 @@ def _optimize_anchoring(self, *args):
712714 return False , status_mapping [response .status ]
713715 return False , "Unknown error during anchoring optimization."
714716
715- def _id_to_short_code (self , id : str ):
717+ def _id_to_short_code (self , id : str ) -> typing . Optional [ str ] :
716718 """Convert a unique id to a 2 letter short code."""
717719 tokens = id .split ("-" )
718720 if len (tokens ) > 2 :
@@ -726,7 +728,7 @@ def _pretty_print_waypoints(
726728 short_code_to_count : typing .Dict [str , int ],
727729 localization_id : str ,
728730 logger : logging .Logger ,
729- ):
731+ ) -> None :
730732 short_code = self ._id_to_short_code (waypoint_id )
731733 if short_code is None or short_code_to_count [short_code ] != 1 :
732734 # If the short code is not valid/unique, don't show it.
0 commit comments