From 83112cff5a8f11a83e87b8131c5ace650064f385 Mon Sep 17 00:00:00 2001 From: ClaireHzl <126695207+ClaireHzl@users.noreply.github.com> Date: Thu, 16 Jan 2025 18:11:30 +0100 Subject: [PATCH 1/7] Add print function at the instanciation and if no interaction. --- src/reachy2_sdk/reachy_sdk.py | 48 ++++++++++++++++++++++++++++++++--- 1 file changed, 44 insertions(+), 4 deletions(-) diff --git a/src/reachy2_sdk/reachy_sdk.py b/src/reachy2_sdk/reachy_sdk.py index 86a88cfe..e294df26 100644 --- a/src/reachy2_sdk/reachy_sdk.py +++ b/src/reachy2_sdk/reachy_sdk.py @@ -24,7 +24,7 @@ from reachy2_sdk_api import reachy_pb2, reachy_pb2_grpc from reachy2_sdk_api.goto_pb2 import GoalStatus, GoToAck, GoToGoalStatus, GoToId from reachy2_sdk_api.goto_pb2_grpc import GoToServiceStub -from reachy2_sdk_api.reachy_pb2 import ReachyState +from reachy2_sdk_api.reachy_pb2 import ReachyCoreMode, ReachyState from .config.reachy_info import ReachyInfo from .media.camera_manager import CameraManager @@ -64,10 +64,11 @@ class ReachySDK: def __new__(cls: Type[ReachySDK], host: str) -> ReachySDK: """Ensure only one connected instance per IP is created.""" if host in cls._instances_by_host: - if cls._instances_by_host[host]._grpc_connected: - return cls._instances_by_host[host] + instance = cls._instances_by_host[host] + if instance._grpc_connected: + return instance else: - del cls._instances_by_host[host] + del instance instance = super().__new__(cls) cls._instances_by_host[host] = instance @@ -92,6 +93,7 @@ def __init__( if hasattr(self, "_initialized"): self._logger.warning("An instance already exists.") + self._print_mode_type() return self._host = host @@ -111,12 +113,16 @@ def __init__( self._update_timestamp: Timestamp = Timestamp(seconds=0) + self._inactivity_timer: Optional[threading.Timer] = None + self._check_inactivity_from_user() + self.connect() def connect(self) -> None: """Connects the SDK to the robot.""" if self._grpc_connected: self._logger.warning("Already connected to Reachy.") + self._print_mode_type() return self._grpc_channel = grpc.insecure_channel(f"{self._host}:{self._sdk_port}") @@ -147,6 +153,7 @@ def connect(self) -> None: self._grpc_connected = True self._logger.info("Connected to Reachy.") + self._print_mode_type() def disconnect(self, lost_connection: bool = False) -> None: """Disconnect the SDK from the robot's server. @@ -428,6 +435,39 @@ def _setup_parts(self) -> None: self._setup_part_head(initial_state) self._setup_part_mobile_base(initial_state) + def _print_mode_type(self) -> None: + """Print a warning for users, on the mode of Reachy.""" + if not self.info: + self._logger.warning("Reachy is not connected!") + return + + mode: ReachyCoreMode = self.info.mode + if mode == "REAL": + warning_str = "Be careful, the PHYSICAL Reachy" + elif mode == "FAKE": + warning_str = "Only the virtual Reachy on Rviz" + elif mode == "GAZEBO": + warning_str = "Only the virtual Reachy on Gazebo" + + self._logger.warning(f"\nThis Reachy is in {mode} mode : {warning_str} is going to move.\n") + + def _check_inactivity_from_user(self, timeout: float = 30.0) -> None: + """Check inactivity from the user, by catching the functions called by them. + If that exceeds the timeout, print the mode type for the user to have a reminder. + """ + if self._inactivity_timer: + self._inactivity_timer.cancel() + self._inactivity_timer = threading.Timer(timeout, self._print_mode_type) + self._inactivity_timer.start() + + def __getattribute__(self, name: str) -> Any: + """Intercepts method calls to track user interactions, ignoring private/internal methods.""" + if name.startswith("_"): + return super().__getattribute__(name) + + self._check_inactivity_from_user() + return super().__getattribute__(name) + def get_update_timestamp(self) -> int: """Returns the timestamp (ns) of the last update. From f878cbcead035dbaeb8718c11cc6b4651c6bc6e4 Mon Sep 17 00:00:00 2001 From: ClaireHzl <126695207+ClaireHzl@users.noreply.github.com> Date: Mon, 20 Jan 2025 15:40:34 +0100 Subject: [PATCH 2/7] Log only for real mode. --- src/reachy2_sdk/reachy_sdk.py | 50 +++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/src/reachy2_sdk/reachy_sdk.py b/src/reachy2_sdk/reachy_sdk.py index e294df26..8bcc1c3a 100644 --- a/src/reachy2_sdk/reachy_sdk.py +++ b/src/reachy2_sdk/reachy_sdk.py @@ -11,6 +11,8 @@ from __future__ import annotations +import logging +import sys import threading import time from collections import namedtuple @@ -62,7 +64,8 @@ class ReachySDK: _instances_by_host: Dict[str, "ReachySDK"] = {} def __new__(cls: Type[ReachySDK], host: str) -> ReachySDK: - """Ensure only one connected instance per IP is created.""" + """Ensure that only one instance of ReachySDK is created for each host, and that the variable name is unique.""" + # check that the host is not already connected to another instance if host in cls._instances_by_host: instance = cls._instances_by_host[host] if instance._grpc_connected: @@ -70,8 +73,12 @@ def __new__(cls: Type[ReachySDK], host: str) -> ReachySDK: else: del instance + # Create a new instance instance = super().__new__(cls) + + # Add the instance to the instances dict cls._instances_by_host[host] = instance + return instance def __init__( @@ -89,6 +96,8 @@ def __init__( audio_port: The gRPC port for audio services. Default is 50063. video_port: The gRPC port for video services. Default is 50065. """ + + logging.basicConfig(level=logging.DEBUG, format="%(message)s", stream=sys.stdout) self._logger = getLogger(__name__) if hasattr(self, "_initialized"): @@ -96,6 +105,7 @@ def __init__( self._print_mode_type() return + self._variable_name: Optional[str] = None self._host = host self._sdk_port = sdk_port self._audio_port = audio_port @@ -113,8 +123,8 @@ def __init__( self._update_timestamp: Timestamp = Timestamp(seconds=0) + self._mode: Optional[str] = None self._inactivity_timer: Optional[threading.Timer] = None - self._check_inactivity_from_user() self.connect() @@ -140,6 +150,8 @@ def connect(self) -> None: return self._setup_parts() + self._mode = str(ReachyCoreMode.keys()[self._info._mode]) if self._info else None + # self._setup_audio() self._cameras = self._setup_video() @@ -155,6 +167,9 @@ def connect(self) -> None: self._logger.info("Connected to Reachy.") self._print_mode_type() + if self._mode == "REAL": + self._check_inactivity_from_user() + def disconnect(self, lost_connection: bool = False) -> None: """Disconnect the SDK from the robot's server. @@ -176,6 +191,7 @@ def disconnect(self, lost_connection: bool = False) -> None: self._r_arm = None self._l_arm = None self._mobile_base = None + self._mode = None self._logger.info("Disconnected from Reachy.") @@ -437,23 +453,21 @@ def _setup_parts(self) -> None: def _print_mode_type(self) -> None: """Print a warning for users, on the mode of Reachy.""" - if not self.info: - self._logger.warning("Reachy is not connected!") - return - - mode: ReachyCoreMode = self.info.mode - if mode == "REAL": - warning_str = "Be careful, the PHYSICAL Reachy" - elif mode == "FAKE": - warning_str = "Only the virtual Reachy on Rviz" - elif mode == "GAZEBO": - warning_str = "Only the virtual Reachy on Gazebo" + if self._grpc_connected: + mode = self._mode + if mode == "REAL": + warning_str = "Be careful, the PHYSICAL Reachy" + elif mode == "FAKE": + warning_str = "Only the virtual Reachy on Rviz" + elif mode == "GAZEBO": + warning_str = "Only the virtual Reachy on Gazebo" - self._logger.warning(f"\nThis Reachy is in {mode} mode : {warning_str} is going to move.\n") + self._logger.warning(f"\nThis Reachy is in {mode} mode : {warning_str} is going to move.\n") - def _check_inactivity_from_user(self, timeout: float = 30.0) -> None: + def _check_inactivity_from_user(self, timeout: float = 60.0) -> None: """Check inactivity from the user, by catching the functions called by them. If that exceeds the timeout, print the mode type for the user to have a reminder. + Default timeout is 60 seconds. """ if self._inactivity_timer: self._inactivity_timer.cancel() @@ -462,10 +476,12 @@ def _check_inactivity_from_user(self, timeout: float = 30.0) -> None: def __getattribute__(self, name: str) -> Any: """Intercepts method calls to track user interactions, ignoring private/internal methods.""" - if name.startswith("_"): + if name.startswith("_") or not self._grpc_connected: return super().__getattribute__(name) - self._check_inactivity_from_user() + elif self._mode == "REAL": + self._check_inactivity_from_user() + return super().__getattribute__(name) def get_update_timestamp(self) -> int: From 9eb74439e6e12e77dc6d20bb796489b925c6b78c Mon Sep 17 00:00:00 2001 From: ClaireHzl <126695207+ClaireHzl@users.noreply.github.com> Date: Mon, 20 Jan 2025 16:42:33 +0100 Subject: [PATCH 3/7] Only one log under one command. --- src/reachy2_sdk/reachy_sdk.py | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/src/reachy2_sdk/reachy_sdk.py b/src/reachy2_sdk/reachy_sdk.py index 8bcc1c3a..7e356e80 100644 --- a/src/reachy2_sdk/reachy_sdk.py +++ b/src/reachy2_sdk/reachy_sdk.py @@ -11,8 +11,8 @@ from __future__ import annotations -import logging -import sys +import platform +import readline import threading import time from collections import namedtuple @@ -96,8 +96,6 @@ def __init__( audio_port: The gRPC port for audio services. Default is 50063. video_port: The gRPC port for video services. Default is 50065. """ - - logging.basicConfig(level=logging.DEBUG, format="%(message)s", stream=sys.stdout) self._logger = getLogger(__name__) if hasattr(self, "_initialized"): @@ -125,6 +123,7 @@ def __init__( self._mode: Optional[str] = None self._inactivity_timer: Optional[threading.Timer] = None + self._last_command_id: Optional[str] = None self.connect() @@ -451,18 +450,32 @@ def _setup_parts(self) -> None: self._setup_part_head(initial_state) self._setup_part_mobile_base(initial_state) + def _get_current_command(self) -> str: + """Get the current command being typed by the user.""" + if platform.system() == "Windows": + return "" + try: + return readline.get_line_buffer() + print(readline.get_line_buffer()) + except Exception: + return "" + def _print_mode_type(self) -> None: """Print a warning for users, on the mode of Reachy.""" if self._grpc_connected: mode = self._mode if mode == "REAL": - warning_str = "Be careful, the PHYSICAL Reachy" + warning_str = "\n⚠️ Be careful, the PHYSICAL Reachy" elif mode == "FAKE": - warning_str = "Only the virtual Reachy on Rviz" + warning_str = " Only the virtual Reachy on Rviz" elif mode == "GAZEBO": warning_str = "Only the virtual Reachy on Gazebo" - self._logger.warning(f"\nThis Reachy is in {mode} mode : {warning_str} is going to move.\n") + current_command = self._get_current_command() + if current_command and current_command == self._last_command_id: + return + self._logger.warning(f"This Reachy is in {mode} mode :{warning_str} is going to move.\n") + self._last_log_command_id = current_command def _check_inactivity_from_user(self, timeout: float = 60.0) -> None: """Check inactivity from the user, by catching the functions called by them. From 9af9f2b7b294ee9ccc1c23219532ba8da6e7357e Mon Sep 17 00:00:00 2001 From: ClaireHzl <126695207+ClaireHzl@users.noreply.github.com> Date: Tue, 21 Jan 2025 10:18:30 +0100 Subject: [PATCH 4/7] Add user confirmation if instanciation of a physical robot. --- src/reachy2_sdk/reachy_sdk.py | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/reachy2_sdk/reachy_sdk.py b/src/reachy2_sdk/reachy_sdk.py index 7e356e80..f24d25de 100644 --- a/src/reachy2_sdk/reachy_sdk.py +++ b/src/reachy2_sdk/reachy_sdk.py @@ -140,6 +140,15 @@ def connect(self) -> None: try: self._get_info() + self._mode = str(ReachyCoreMode.keys()[self._info._mode]) if self._info else None + + # ask for user confirmation if the robot is physical + if self._mode == "REAL": + if not self._confirm_connection(): + self._logger.warning("Connection to Reachy aborted.") + self.disconnect() + return + except ConnectionError: self._logger.error( f"Could not connect to Reachy with on IP address {self._host}, " @@ -149,7 +158,6 @@ def connect(self) -> None: return self._setup_parts() - self._mode = str(ReachyCoreMode.keys()[self._info._mode]) if self._info else None # self._setup_audio() self._cameras = self._setup_video() @@ -450,6 +458,14 @@ def _setup_parts(self) -> None: self._setup_part_head(initial_state) self._setup_part_mobile_base(initial_state) + def _confirm_connection(self) -> bool: + """Ask the user to confirm the connection to a physical Reachy.""" + response = input("⚠️ You are about to connect to a PHYSICAL Reachy.\n Do you want to continue (y/n)?").strip().lower() + if response in ["", "y", "yes"]: + return True + else: + return False + def _get_current_command(self) -> str: """Get the current command being typed by the user.""" if platform.system() == "Windows": From 7382fc919b6ff575cf6dc4afb3450372416e70ce Mon Sep 17 00:00:00 2001 From: ClaireHzl <126695207+ClaireHzl@users.noreply.github.com> Date: Wed, 22 Jan 2025 12:19:57 +0100 Subject: [PATCH 5/7] No warning message if the last command was from another instance. --- src/reachy2_sdk/reachy_sdk.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/src/reachy2_sdk/reachy_sdk.py b/src/reachy2_sdk/reachy_sdk.py index f24d25de..80db9c24 100644 --- a/src/reachy2_sdk/reachy_sdk.py +++ b/src/reachy2_sdk/reachy_sdk.py @@ -62,6 +62,7 @@ class ReachySDK: """ _instances_by_host: Dict[str, "ReachySDK"] = {} + _last_executing_instance = None def __new__(cls: Type[ReachySDK], host: str) -> ReachySDK: """Ensure that only one instance of ReachySDK is created for each host, and that the variable name is unique.""" @@ -481,20 +482,21 @@ def _print_mode_type(self) -> None: if self._grpc_connected: mode = self._mode if mode == "REAL": - warning_str = "\n⚠️ Be careful, the PHYSICAL Reachy" - elif mode == "FAKE": - warning_str = " Only the virtual Reachy on Rviz" - elif mode == "GAZEBO": - warning_str = "Only the virtual Reachy on Gazebo" + warning_str = "\n ⚠️ Be careful, you're controlling the PHYSICAL Reachy" + else: + warning_str = " you're controlling the virtual Reachy" current_command = self._get_current_command() - if current_command and current_command == self._last_command_id: + + # if the last command was from a different instance, or if there is already a warning message under the last command + # we don't print another warning + if ReachySDK._last_executing_instance != self or (current_command and current_command == self._last_command_id): return - self._logger.warning(f"This Reachy is in {mode} mode :{warning_str} is going to move.\n") - self._last_log_command_id = current_command + self._logger.warning(f"This Reachy is in {mode} mode :{warning_str}.\n") def _check_inactivity_from_user(self, timeout: float = 60.0) -> None: """Check inactivity from the user, by catching the functions called by them. + If that exceeds the timeout, print the mode type for the user to have a reminder. Default timeout is 60 seconds. """ @@ -508,7 +510,10 @@ def __getattribute__(self, name: str) -> Any: if name.startswith("_") or not self._grpc_connected: return super().__getattribute__(name) - elif self._mode == "REAL": + else: + ReachySDK._last_executing_instance = self + + if self._mode == "REAL": self._check_inactivity_from_user() return super().__getattribute__(name) From b03ab44ce2a41c28b363dee9c670b006d0e12be4 Mon Sep 17 00:00:00 2001 From: ClaireHzl <126695207+ClaireHzl@users.noreply.github.com> Date: Wed, 22 Jan 2025 14:27:45 +0100 Subject: [PATCH 6/7] Delete the check of the last command sent. --- src/reachy2_sdk/reachy_sdk.py | 28 +++------------------------- 1 file changed, 3 insertions(+), 25 deletions(-) diff --git a/src/reachy2_sdk/reachy_sdk.py b/src/reachy2_sdk/reachy_sdk.py index 80db9c24..d5dd08c6 100644 --- a/src/reachy2_sdk/reachy_sdk.py +++ b/src/reachy2_sdk/reachy_sdk.py @@ -11,8 +11,6 @@ from __future__ import annotations -import platform -import readline import threading import time from collections import namedtuple @@ -65,7 +63,7 @@ class ReachySDK: _last_executing_instance = None def __new__(cls: Type[ReachySDK], host: str) -> ReachySDK: - """Ensure that only one instance of ReachySDK is created for each host, and that the variable name is unique.""" + """Ensure that only one instance of ReachySDK is created for each host.""" # check that the host is not already connected to another instance if host in cls._instances_by_host: instance = cls._instances_by_host[host] @@ -74,10 +72,8 @@ def __new__(cls: Type[ReachySDK], host: str) -> ReachySDK: else: del instance - # Create a new instance + # Create a new instance and add it to the dict instance = super().__new__(cls) - - # Add the instance to the instances dict cls._instances_by_host[host] = instance return instance @@ -104,7 +100,6 @@ def __init__( self._print_mode_type() return - self._variable_name: Optional[str] = None self._host = host self._sdk_port = sdk_port self._audio_port = audio_port @@ -124,7 +119,6 @@ def __init__( self._mode: Optional[str] = None self._inactivity_timer: Optional[threading.Timer] = None - self._last_command_id: Optional[str] = None self.connect() @@ -136,7 +130,6 @@ def connect(self) -> None: return self._grpc_channel = grpc.insecure_channel(f"{self._host}:{self._sdk_port}") - self._stop_flag = threading.Event() try: @@ -467,16 +460,6 @@ def _confirm_connection(self) -> bool: else: return False - def _get_current_command(self) -> str: - """Get the current command being typed by the user.""" - if platform.system() == "Windows": - return "" - try: - return readline.get_line_buffer() - print(readline.get_line_buffer()) - except Exception: - return "" - def _print_mode_type(self) -> None: """Print a warning for users, on the mode of Reachy.""" if self._grpc_connected: @@ -486,11 +469,7 @@ def _print_mode_type(self) -> None: else: warning_str = " you're controlling the virtual Reachy" - current_command = self._get_current_command() - - # if the last command was from a different instance, or if there is already a warning message under the last command - # we don't print another warning - if ReachySDK._last_executing_instance != self or (current_command and current_command == self._last_command_id): + if ReachySDK._last_executing_instance != self: return self._logger.warning(f"This Reachy is in {mode} mode :{warning_str}.\n") @@ -509,7 +488,6 @@ def __getattribute__(self, name: str) -> Any: """Intercepts method calls to track user interactions, ignoring private/internal methods.""" if name.startswith("_") or not self._grpc_connected: return super().__getattribute__(name) - else: ReachySDK._last_executing_instance = self From 207514c2e8ed05bfc4d84bf867a0e063b41dd629 Mon Sep 17 00:00:00 2001 From: ClaireHzl <126695207+ClaireHzl@users.noreply.github.com> Date: Wed, 22 Jan 2025 14:35:47 +0100 Subject: [PATCH 7/7] Modify print function. --- src/reachy2_sdk/reachy_sdk.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/reachy2_sdk/reachy_sdk.py b/src/reachy2_sdk/reachy_sdk.py index d5dd08c6..6e3670b2 100644 --- a/src/reachy2_sdk/reachy_sdk.py +++ b/src/reachy2_sdk/reachy_sdk.py @@ -462,15 +462,16 @@ def _confirm_connection(self) -> bool: def _print_mode_type(self) -> None: """Print a warning for users, on the mode of Reachy.""" + # check if the last executing instance is the current one to avoid printing warning on a different instance + if ReachySDK._last_executing_instance != self: + return + if self._grpc_connected: mode = self._mode if mode == "REAL": warning_str = "\n ⚠️ Be careful, you're controlling the PHYSICAL Reachy" else: warning_str = " you're controlling the virtual Reachy" - - if ReachySDK._last_executing_instance != self: - return self._logger.warning(f"This Reachy is in {mode} mode :{warning_str}.\n") def _check_inactivity_from_user(self, timeout: float = 60.0) -> None: