diff --git a/src/reachy2_sdk/reachy_sdk.py b/src/reachy2_sdk/reachy_sdk.py index 86a88cfe..6e3670b2 100644 --- a/src/reachy2_sdk/reachy_sdk.py +++ b/src/reachy2_sdk/reachy_sdk.py @@ -24,7 +24,7 @@ from reachy2_sdk_api import reachy_pb2, reachy_pb2_grpc from reachy2_sdk_api.goto_pb2 import GoalStatus, GoToAck, GoToGoalStatus, GoToId from reachy2_sdk_api.goto_pb2_grpc import GoToServiceStub -from reachy2_sdk_api.reachy_pb2 import ReachyState +from reachy2_sdk_api.reachy_pb2 import ReachyCoreMode, ReachyState from .config.reachy_info import ReachyInfo from .media.camera_manager import CameraManager @@ -60,17 +60,22 @@ class ReachySDK: """ _instances_by_host: Dict[str, "ReachySDK"] = {} + _last_executing_instance = None def __new__(cls: Type[ReachySDK], host: str) -> ReachySDK: - """Ensure only one connected instance per IP is created.""" + """Ensure that only one instance of ReachySDK is created for each host.""" + # check that the host is not already connected to another instance if host in cls._instances_by_host: - if cls._instances_by_host[host]._grpc_connected: - return cls._instances_by_host[host] + instance = cls._instances_by_host[host] + if instance._grpc_connected: + return instance else: - del cls._instances_by_host[host] + del instance + # Create a new instance and add it to the dict instance = super().__new__(cls) cls._instances_by_host[host] = instance + return instance def __init__( @@ -92,6 +97,7 @@ def __init__( if hasattr(self, "_initialized"): self._logger.warning("An instance already exists.") + self._print_mode_type() return self._host = host @@ -111,20 +117,32 @@ def __init__( self._update_timestamp: Timestamp = Timestamp(seconds=0) + self._mode: Optional[str] = None + self._inactivity_timer: Optional[threading.Timer] = None + self.connect() def connect(self) -> None: """Connects the SDK to the robot.""" if self._grpc_connected: self._logger.warning("Already connected to Reachy.") + self._print_mode_type() return self._grpc_channel = grpc.insecure_channel(f"{self._host}:{self._sdk_port}") - self._stop_flag = threading.Event() try: self._get_info() + self._mode = str(ReachyCoreMode.keys()[self._info._mode]) if self._info else None + + # ask for user confirmation if the robot is physical + if self._mode == "REAL": + if not self._confirm_connection(): + self._logger.warning("Connection to Reachy aborted.") + self.disconnect() + return + except ConnectionError: self._logger.error( f"Could not connect to Reachy with on IP address {self._host}, " @@ -134,6 +152,7 @@ def connect(self) -> None: return self._setup_parts() + # self._setup_audio() self._cameras = self._setup_video() @@ -147,6 +166,10 @@ def connect(self) -> None: self._grpc_connected = True self._logger.info("Connected to Reachy.") + self._print_mode_type() + + if self._mode == "REAL": + self._check_inactivity_from_user() def disconnect(self, lost_connection: bool = False) -> None: """Disconnect the SDK from the robot's server. @@ -169,6 +192,7 @@ def disconnect(self, lost_connection: bool = False) -> None: self._r_arm = None self._l_arm = None self._mobile_base = None + self._mode = None self._logger.info("Disconnected from Reachy.") @@ -428,6 +452,51 @@ def _setup_parts(self) -> None: self._setup_part_head(initial_state) self._setup_part_mobile_base(initial_state) + def _confirm_connection(self) -> bool: + """Ask the user to confirm the connection to a physical Reachy.""" + response = input("⚠️ You are about to connect to a PHYSICAL Reachy.\n Do you want to continue (y/n)?").strip().lower() + if response in ["", "y", "yes"]: + return True + else: + return False + + def _print_mode_type(self) -> None: + """Print a warning for users, on the mode of Reachy.""" + # check if the last executing instance is the current one to avoid printing warning on a different instance + if ReachySDK._last_executing_instance != self: + return + + if self._grpc_connected: + mode = self._mode + if mode == "REAL": + warning_str = "\n ⚠️ Be careful, you're controlling the PHYSICAL Reachy" + else: + warning_str = " you're controlling the virtual Reachy" + self._logger.warning(f"This Reachy is in {mode} mode :{warning_str}.\n") + + def _check_inactivity_from_user(self, timeout: float = 60.0) -> None: + """Check inactivity from the user, by catching the functions called by them. + + If that exceeds the timeout, print the mode type for the user to have a reminder. + Default timeout is 60 seconds. + """ + if self._inactivity_timer: + self._inactivity_timer.cancel() + self._inactivity_timer = threading.Timer(timeout, self._print_mode_type) + self._inactivity_timer.start() + + def __getattribute__(self, name: str) -> Any: + """Intercepts method calls to track user interactions, ignoring private/internal methods.""" + if name.startswith("_") or not self._grpc_connected: + return super().__getattribute__(name) + else: + ReachySDK._last_executing_instance = self + + if self._mode == "REAL": + self._check_inactivity_from_user() + + return super().__getattribute__(name) + def get_update_timestamp(self) -> int: """Returns the timestamp (ns) of the last update.