Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

477 add information about real or fake mode #482

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 75 additions & 6 deletions src/reachy2_sdk/reachy_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from reachy2_sdk_api import reachy_pb2, reachy_pb2_grpc
from reachy2_sdk_api.goto_pb2 import GoalStatus, GoToAck, GoToGoalStatus, GoToId
from reachy2_sdk_api.goto_pb2_grpc import GoToServiceStub
from reachy2_sdk_api.reachy_pb2 import ReachyState
from reachy2_sdk_api.reachy_pb2 import ReachyCoreMode, ReachyState

from .config.reachy_info import ReachyInfo
from .media.camera_manager import CameraManager
Expand Down Expand Up @@ -60,17 +60,22 @@ class ReachySDK:
"""

_instances_by_host: Dict[str, "ReachySDK"] = {}
_last_executing_instance = None

def __new__(cls: Type[ReachySDK], host: str) -> ReachySDK:
"""Ensure only one connected instance per IP is created."""
"""Ensure that only one instance of ReachySDK is created for each host."""
# check that the host is not already connected to another instance
if host in cls._instances_by_host:
if cls._instances_by_host[host]._grpc_connected:
return cls._instances_by_host[host]
instance = cls._instances_by_host[host]
if instance._grpc_connected:
return instance
else:
del cls._instances_by_host[host]
del instance

# Create a new instance and add it to the dict
instance = super().__new__(cls)
cls._instances_by_host[host] = instance

return instance

def __init__(
Expand All @@ -92,6 +97,7 @@ def __init__(

if hasattr(self, "_initialized"):
self._logger.warning("An instance already exists.")
self._print_mode_type()
return

self._host = host
Expand All @@ -111,20 +117,32 @@ def __init__(

self._update_timestamp: Timestamp = Timestamp(seconds=0)

self._mode: Optional[str] = None
self._inactivity_timer: Optional[threading.Timer] = None

self.connect()

def connect(self) -> None:
"""Connects the SDK to the robot."""
if self._grpc_connected:
self._logger.warning("Already connected to Reachy.")
self._print_mode_type()
return

self._grpc_channel = grpc.insecure_channel(f"{self._host}:{self._sdk_port}")

self._stop_flag = threading.Event()

try:
self._get_info()
self._mode = str(ReachyCoreMode.keys()[self._info._mode]) if self._info else None

# ask for user confirmation if the robot is physical
if self._mode == "REAL":
if not self._confirm_connection():
self._logger.warning("Connection to Reachy aborted.")
self.disconnect()
return

except ConnectionError:
self._logger.error(
f"Could not connect to Reachy with on IP address {self._host}, "
Expand All @@ -134,6 +152,7 @@ def connect(self) -> None:
return

self._setup_parts()

# self._setup_audio()
self._cameras = self._setup_video()

Expand All @@ -147,6 +166,10 @@ def connect(self) -> None:

self._grpc_connected = True
self._logger.info("Connected to Reachy.")
self._print_mode_type()

if self._mode == "REAL":
self._check_inactivity_from_user()

def disconnect(self, lost_connection: bool = False) -> None:
"""Disconnect the SDK from the robot's server.
Expand All @@ -169,6 +192,7 @@ def disconnect(self, lost_connection: bool = False) -> None:
self._r_arm = None
self._l_arm = None
self._mobile_base = None
self._mode = None

self._logger.info("Disconnected from Reachy.")

Expand Down Expand Up @@ -428,6 +452,51 @@ def _setup_parts(self) -> None:
self._setup_part_head(initial_state)
self._setup_part_mobile_base(initial_state)

def _confirm_connection(self) -> bool:
"""Ask the user to confirm the connection to a physical Reachy."""
response = input("⚠️ You are about to connect to a PHYSICAL Reachy.\n Do you want to continue (y/n)?").strip().lower()
if response in ["", "y", "yes"]:
return True
else:
return False

def _print_mode_type(self) -> None:
"""Print a warning for users, on the mode of Reachy."""
# check if the last executing instance is the current one to avoid printing warning on a different instance
if ReachySDK._last_executing_instance != self:
return

if self._grpc_connected:
mode = self._mode
if mode == "REAL":
warning_str = "\n ⚠️ Be careful, you're controlling the PHYSICAL Reachy"
else:
warning_str = " you're controlling the virtual Reachy"
self._logger.warning(f"This Reachy is in {mode} mode :{warning_str}.\n")

def _check_inactivity_from_user(self, timeout: float = 60.0) -> None:
"""Check inactivity from the user, by catching the functions called by them.

If that exceeds the timeout, print the mode type for the user to have a reminder.
Default timeout is 60 seconds.
"""
if self._inactivity_timer:
self._inactivity_timer.cancel()
self._inactivity_timer = threading.Timer(timeout, self._print_mode_type)
self._inactivity_timer.start()

def __getattribute__(self, name: str) -> Any:
"""Intercepts method calls to track user interactions, ignoring private/internal methods."""
if name.startswith("_") or not self._grpc_connected:
return super().__getattribute__(name)
else:
ReachySDK._last_executing_instance = self

if self._mode == "REAL":
self._check_inactivity_from_user()

return super().__getattribute__(name)

def get_update_timestamp(self) -> int:
"""Returns the timestamp (ns) of the last update.

Expand Down