-
-
Notifications
You must be signed in to change notification settings - Fork 6
Feat: rendez #103
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Feat: rendez #103
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||
|---|---|---|---|---|---|---|---|---|
|
|
@@ -10,7 +10,7 @@ | |||||||
| - **Trusted Peers**: `NodeIdentity.trusted_keys` gates BUS injection for PROPAGATE and INTERCOM from untrusted sources. | ||||||||
| - **CASCADE Aggregation**: Collects responses from all nodes with timeout and early resolution via `HiveMapper`. | ||||||||
| - **PING Discovery**: Flood-based topology mapping with public key and locale announcement. | ||||||||
| - **Binary Support**: Optimized handling for binary payloads such as TTS audio and file transfers. | ||||||||
| - **Binary Support**: Optimized handling for binary payloads such as TTS audio and file transfers.\n- **Rendezvous Polling**: Optional background thread that periodically retrieves INTERCOM messages from one or more `hivemind-rendezvous` servers, enabling async communication with nodes from non-simultaneously-connected hives. | ||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Fix inline The literal 📝 Proposed fix-- **Binary Support**: Optimized handling for binary payloads such as TTS audio and file transfers.\n- **Rendezvous Polling**: Optional background thread that periodically retrieves INTERCOM messages from one or more `hivemind-rendezvous` servers, enabling async communication with nodes from non-simultaneously-connected hives.
+- **Binary Support**: Optimized handling for binary payloads such as TTS audio and file transfers.
+- **Rendezvous Polling**: Optional background thread that periodically retrieves INTERCOM messages from one or more `hivemind-rendezvous` servers, enabling async communication with nodes from non-simultaneously-connected hives.📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||
| - **Drop-in Replacement**: Designed to be mostly compatible with `ovos-bus-client`, allowing easy migration of existing OVOS skills or services to HiveMind. | ||||||||
|
|
||||||||
| ## Primary Components | ||||||||
|
|
||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,10 @@ | ||
| import json | ||
| import ssl | ||
| from threading import Event | ||
| from typing import Union, Optional, Callable | ||
| import time | ||
| import urllib.error | ||
| import urllib.request | ||
| from threading import Event, Thread | ||
| from typing import Union, Optional, Callable, List | ||
|
|
||
| import pybase64 | ||
| from Cryptodome.PublicKey import RSA | ||
|
|
@@ -103,7 +106,9 @@ def __init__(self, key: Optional[str] = None, | |
| binarize: bool = True, | ||
| identity: NodeIdentity = None, | ||
| internal_bus: Optional[OVOSBusClient] = None, | ||
| bin_callbacks: BinaryDataCallbacks = BinaryDataCallbacks()): | ||
| bin_callbacks: BinaryDataCallbacks = BinaryDataCallbacks(), | ||
| rendezvous_urls: Optional[List[str]] = None, | ||
| rendezvous_poll_interval: float = 60.0): | ||
| self.bin_callbacks = bin_callbacks | ||
| self.json_encoding = SupportedEncodings.JSON_HEX # server defaults before it was made configurable | ||
| self.cipher = SupportedCiphers.AES_GCM # server defaults before it was made configurable | ||
|
|
@@ -120,6 +125,11 @@ def __init__(self, key: Optional[str] = None, | |
| self.allow_self_signed = self_signed | ||
| self.share_bus = share_bus | ||
| self.handshake_event = Event() | ||
| self._rendezvous_urls: List[str] = list(rendezvous_urls) if rendezvous_urls else [] | ||
| self._rendezvous_poll_interval: float = rendezvous_poll_interval | ||
| self._rendezvous_stop_event: Event = Event() | ||
| self._rendezvous_thread: Optional[Thread] = None | ||
| self._rendezvous_server_pubkeys: dict = {} # base_url -> cached server pubkey | ||
|
|
||
| # if you want to reduce CPU usage in exchange for more bandwidth set below to False | ||
| self.compress = compress # None -> auto | ||
|
|
@@ -142,6 +152,9 @@ def __init__(self, key: Optional[str] = None, | |
| super().__init__(host=host, port=self._port, ssl=use_ssl, | ||
| emitter=EventEmitter(), session=sess) | ||
|
|
||
| if self._rendezvous_urls: | ||
| self._start_rendezvous_polling() | ||
|
|
||
| def init_identity(self, site_id=None): | ||
| self.identity = self.identity or NodeIdentity() | ||
| self.identity.password = self._password or self.identity.password | ||
|
|
@@ -231,8 +244,123 @@ def on_error(self, *args): | |
| def on_close(self, *args): | ||
| self.handshake_event.clear() | ||
| self.crypto_key = None | ||
| self._stop_rendezvous_polling() | ||
| super().on_close(*args) | ||
|
|
||
| # ------------------------------------------------------------------ | ||
| # Rendezvous polling | ||
| # ------------------------------------------------------------------ | ||
|
|
||
| def _start_rendezvous_polling(self) -> None: | ||
| """Start the background thread that periodically polls rendezvous URLs. | ||
|
|
||
| Does nothing if no URLs are configured or a thread is already running. | ||
| """ | ||
| if not self._rendezvous_urls or self._rendezvous_thread is not None: | ||
| return | ||
| self._rendezvous_stop_event.clear() | ||
| self._rendezvous_thread = Thread( | ||
| target=self._rendezvous_poll_loop, | ||
| daemon=True, | ||
| name="rendezvous-poller", | ||
| ) | ||
| self._rendezvous_thread.start() | ||
| LOG.info("Rendezvous polling started (%d URLs, interval=%.0fs)", | ||
| len(self._rendezvous_urls), self._rendezvous_poll_interval) | ||
|
|
||
| def _stop_rendezvous_polling(self) -> None: | ||
| """Signal the rendezvous polling thread to stop and wait for it to exit.""" | ||
| stop_event = getattr(self, "_rendezvous_stop_event", None) | ||
| if stop_event is None: | ||
| return | ||
| stop_event.set() | ||
| thread = getattr(self, "_rendezvous_thread", None) | ||
| if thread is not None: | ||
| thread.join(timeout=5) | ||
| self._rendezvous_thread = None | ||
|
|
||
| def _rendezvous_poll_loop(self) -> None: | ||
| """Background loop: poll every configured rendezvous URL at the configured interval.""" | ||
| while not self._rendezvous_stop_event.wait(timeout=self._rendezvous_poll_interval): | ||
| for url in self._rendezvous_urls: | ||
| try: | ||
| self._poll_rendezvous(url.rstrip("/")) | ||
| except Exception: | ||
| LOG.exception("Rendezvous poll error for %s", url) | ||
|
|
||
| def _fetch_rendezvous_server_pubkey(self, base_url: str) -> str: | ||
| """Fetch and cache the rendezvous server's RSA public key from ``GET /pubkey``. | ||
|
|
||
| Args: | ||
| base_url: Rendezvous server base URL (trailing slash already stripped). | ||
|
|
||
| Returns: | ||
| PEM-encoded RSA public key string, or empty string on failure. | ||
| """ | ||
| if base_url in self._rendezvous_server_pubkeys: | ||
| return self._rendezvous_server_pubkeys[base_url] | ||
| try: | ||
| req = urllib.request.Request(f"{base_url}/pubkey", method="GET") | ||
| with urllib.request.urlopen(req, timeout=10) as resp: | ||
| data = json.loads(resp.read()) | ||
| server_pubkey: str = data.get("pubkey", "") | ||
| self._rendezvous_server_pubkeys[base_url] = server_pubkey | ||
| return server_pubkey | ||
| except Exception as exc: | ||
| LOG.warning("Rendezvous: could not fetch server pubkey from %s: %s", base_url, exc) | ||
| return "" | ||
|
|
||
| def _poll_rendezvous(self, base_url: str) -> None: | ||
| """Retrieve pending messages from one rendezvous server and inject them. | ||
|
|
||
| Fetches the server's public key (cached after first call), builds a | ||
| server-bound domain-separated proof-of-ownership, POSTs to | ||
| ``{base_url}/retrieve``, and feeds each returned serialised | ||
| :class:`HiveMessage` into :meth:`_handle_hive_protocol`. | ||
|
|
||
| Args: | ||
| base_url: Rendezvous server base URL (trailing slash already stripped). | ||
| """ | ||
| from hivemind_rendezvous.auth import sign_ownership | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Check if hivemind_rendezvous is declared as a dependency
echo "=== Checking setup.py ==="
cat setup.py 2>/dev/null | grep -i rendezvous || echo "Not found in setup.py"
echo "=== Checking pyproject.toml ==="
cat pyproject.toml 2>/dev/null | grep -i rendezvous || echo "Not found in pyproject.toml"
echo "=== Checking requirements*.txt ==="
cat requirements*.txt 2>/dev/null | grep -i rendezvous || echo "Not found in requirements"Repository: JarbasHiveMind/hivemind-websocket-client Length of output: 252 🏁 Script executed: # Check what dependencies are actually declared
echo "=== Checking setup.py dependencies ==="
head -100 setup.py 2>/dev/null || echo "setup.py not found or too small"
echo -e "\n=== Checking pyproject.toml ==="
cat pyproject.toml 2>/dev/null || echo "pyproject.toml not found"
echo -e "\n=== Checking for any requirements files ==="
ls -la requirements*.txt 2>/dev/null || echo "No requirements files found"Repository: JarbasHiveMind/hivemind-websocket-client Length of output: 4183 🏁 Script executed: # View the code context around line 324 in hivemind_bus_client/client.py
echo "=== Code context around line 324 ==="
sed -n '310,340p' hivemind_bus_client/client.py 2>/dev/null || echo "File or lines not found"Repository: JarbasHiveMind/hivemind-websocket-client Length of output: 1447 🏁 Script executed: # Check requirements.txt content
cat requirements.txt
# Get more context around line 324 to check for error handling
echo -e "\n=== Broader context (lines 300-360) ==="
sed -n '300,360p' hivemind_bus_client/client.pyRepository: JarbasHiveMind/hivemind-websocket-client Length of output: 2977 Add The import 🤖 Prompt for AI Agents |
||
|
|
||
| server_pubkey: str = self._fetch_rendezvous_server_pubkey(base_url) | ||
| pubkey: str = self.identity.public_key | ||
| private_key = load_RSA_key(self.identity.private_key) | ||
| timestamp: int = int(time.time()) | ||
| signature: str = sign_ownership(private_key, pubkey, timestamp, | ||
| server_pubkey=server_pubkey) | ||
|
|
||
| body = json.dumps({ | ||
| "pubkey": pubkey, | ||
| "timestamp": timestamp, | ||
| "signature": signature, | ||
| }).encode("utf-8") | ||
| req = urllib.request.Request( | ||
| f"{base_url}/retrieve", | ||
| data=body, | ||
| headers={"Content-Type": "application/json"}, | ||
| method="POST", | ||
| ) | ||
| try: | ||
| with urllib.request.urlopen(req, timeout=10) as resp: | ||
| result = json.loads(resp.read()) | ||
| except urllib.error.HTTPError as exc: | ||
| LOG.warning("Rendezvous retrieve failed (%s): HTTP %d", base_url, exc.code) | ||
| return | ||
| except OSError as exc: | ||
| LOG.warning("Rendezvous retrieve connection error (%s): %s", base_url, exc) | ||
| return | ||
|
|
||
| messages = result.get("messages", []) | ||
| if messages: | ||
| LOG.info("Rendezvous: received %d message(s) from %s", len(messages), base_url) | ||
| for serialised in messages: | ||
| try: | ||
| msg = HiveMessage.deserialize(serialised) | ||
| self._handle_hive_protocol(msg) | ||
| except Exception: | ||
| LOG.exception("Rendezvous: failed to process retrieved message") | ||
|
|
||
| def wait_for_handshake(self, timeout=5, max_retries=15): | ||
| """ | ||
| Waits for the HiveMind handshake to complete; if the handshake is not set and the websocket connection is open, starts the handshake, otherwise waits for the websocket to open and retries. | ||
|
|
@@ -548,5 +676,7 @@ def emit_intercom(self, message: Union[MycroftMessage, HiveMessage], | |
| pubkey: RSA public key of the target peer. | ||
| """ | ||
| private_key = load_RSA_key(self.identity.private_key) | ||
| envelope = hybrid_encrypt(pubkey, message.serialize(), sign_key=private_key) | ||
| envelope = hybrid_encrypt(pubkey, message.serialize(), | ||
| sign_key=private_key, | ||
| recipient_pubkey=pubkey) | ||
| self.emit(HiveMessage(HiveMessageType.INTERCOM, payload=envelope)) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add
recipient_pubkeyto enable recipient binding in the example.The example encrypts for
recipient_pubkeybut doesn't pass it as therecipient_pubkeyparameter, so the envelope won't includerecipient_fingerprintfor binding. This should match the new API.📝 Proposed fix
📝 Committable suggestion
🤖 Prompt for AI Agents