Skip to content

Commit

Permalink
feat/better_kwargs (#24)
Browse files Browse the repository at this point in the history
- internal bus is now a kwarg to allow to easily connect an existing OVOS bus, instead of always using a FakeBus

- allow port and host to come from identity

- add identity-test script validate info

- typing

    # $ hivemind-client set-identity --key 3d20b5188aa0157562b2bc2e7d966127 --password 6f81688ec095bd00dee63aca13dd091f --host 0.0.0.0 --port 5678 --siteid test

    # $ cat /home/miro/.config/hivemind/_identity.json
    # {
    #     "password": "6f81688ec095bd00dee63aca13dd091f",
    #     "access_key": "3d20b5188aa0157562b2bc2e7d966127",
    #     "site_id": "test",
    #     "default_port": 5678,
    #     "default_master": "ws://0.0.0.0"
    # }

    # $ hivemind-client test-identity
  • Loading branch information
JarbasAl authored May 20, 2024
1 parent b426fe7 commit d70eeaa
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 34 deletions.
89 changes: 56 additions & 33 deletions hivemind_bus_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
import ssl
from threading import Event
from typing import Union
from typing import Union, Optional

from ovos_bus_client import Message as MycroftMessage, MessageBusClient as OVOSBusClient
from ovos_bus_client.session import Session
Expand All @@ -26,18 +26,18 @@ class HiveMessageWaiter:
the actual waiting act so the waiting can be setuo, actions can be
performed and _then_ the message can be waited for.
Argunments:
Arguments:
bus: Bus to check for messages on
message_type: message type to wait for
"""

def __init__(self, bus, message_type):
def __init__(self, bus, message_type: Union[HiveMessageType, str]):
self.bus = bus
self.msg_type = message_type
self.received_msg = None
# Setup response handler
self.response_event = Event()
self.bus.once(message_type, self._handler)
self.bus.on(message_type, self._handler)

def _handler(self, message):
"""Receive response data."""
Expand All @@ -54,44 +54,42 @@ def wait(self, timeout=3.0):
HiveMessage or None
"""
self.response_event.wait(timeout)
if not self.response_event.is_set():
# Clean up the event handler
try:
self.bus.remove(self.msg_type, self._handler)
except (ValueError, KeyError):
# ValueError occurs on pyee 5.0.1 removing handlers
# registered with once.
# KeyError may theoretically occur if the event occurs as
# the handler is removed
pass
self.bus.remove(self.msg_type, self._handler)
return self.received_msg


class HivePayloadWaiter(HiveMessageWaiter):
def __init__(self, payload_type=HiveMessageType.THIRDPRTY, *args, **kwargs):
def __init__(self, payload_type: Union[HiveMessageType, str],
*args, **kwargs):
super(HivePayloadWaiter, self).__init__(*args, **kwargs)
self.payload_type = payload_type

def _handler(self, message):
"""Receive response data."""
if message.payload.msg_type == self.payload_type:
self.received_msg = message
self.response_event.set()
else:
self.bus.once(self.msg_type, self._handler)
super()._handler(message)


class HiveMessageBusClient(OVOSBusClient):
def __init__(self, key=None, password=None, crypto_key=None, host='127.0.0.1', port=5678,
useragent="", self_signed=True, share_bus=False,
compress=True, binarize=True, identity: NodeIdentity = None):
ssl = host.startswith("wss://")
host = host.replace("ws://", "").replace("wss://", "").strip()
def __init__(self, key: Optional[str] = None,
password: Optional[str] = None,
crypto_key: Optional[str] = None,
host: Optional[str] = None,
port: Optional[int] = None,
useragent: str = "",
self_signed: bool = True,
share_bus: bool = False,
compress: bool = True,
binarize: bool = True,
identity: NodeIdentity = None,
internal_bus: Optional[OVOSBusClient] = None):

self.identity = identity or None
self._password = password
self._access_key = key
self._name = useragent
self._port = port
self._host = host
self.init_identity()

self.crypto_key = crypto_key
Expand All @@ -103,21 +101,38 @@ def __init__(self, key=None, password=None, crypto_key=None, host='127.0.0.1', p
self.compress = compress # None -> auto
self.binarize = binarize # only if hivemind reports also supporting it

sess = Session() # new session for this client
# connect to OVOS, if on a OVOS device
if not internal_bus:
# FakeBus needed to send emitted events to handlers registered within the client
sess = Session() # new session for this client
self.internal_bus = FakeBus(session=sess)
else:
sess = Session(session_id=internal_bus.session_id)
self.internal_bus = internal_bus
LOG.info(f"Session ID: {sess.session_id}")
self.internal_bus = FakeBus(session=sess) # also send emitted events to handlers registered within the client
super().__init__(host=host, port=port, ssl=ssl, emitter=EventEmitter(), session=sess)

# NOTE: self._host and self._port accessed only after self.init_identity()
# this allows them to come from set-identity cli command
host = self._host.replace("ws://", "").replace("wss://", "").strip()
use_ssl = host.startswith("wss://")
super().__init__(host=host, port=self._port, ssl=use_ssl,
emitter=EventEmitter(), session=sess)

def init_identity(self, site_id=None):
self.identity = self.identity or NodeIdentity()
self.identity.password = self._password or self.identity.password
self.identity.access_key = self._access_key or self.identity.access_key
self.identity.default_master = self._host = self._host or self.identity.default_master
self.identity.default_port = self._port = self._port or self.identity.default_port
self.identity.name = self._name or "HiveMessageBusClientV0.0.1"
self.identity.site_id = site_id or self.identity.site_id

if not self.identity.access_key or not self.identity.password:
raise RuntimeError("NodeIdentity not set, please pass key and password or "
"call 'hivemind-client set-identity'")
if not self.identity.default_master:
raise RuntimeError("host not set, please pass host and port or "
"call 'hivemind-client set-identity'")

@property
def useragent(self):
Expand Down Expand Up @@ -272,6 +287,10 @@ def emit(self, message: Union[MycroftMessage, HiveMessage]):
ctxt["platform"] = self.useragent
if "destination" not in message.payload.context:
ctxt["destination"] = "HiveMind"
if "session" not in ctxt:
ctxt["session"] = {}
ctxt["session"]["session_id"] = self.session_id
ctxt["session"]["site_id"] = self.site_id
message.payload.context = ctxt
# also send event to client registered handlers
self.internal_bus.emit(message.payload)
Expand Down Expand Up @@ -325,7 +344,7 @@ def on(self, event_name, func):
self.emitter.on(event_name, func)

# utility
def wait_for_message(self, message_type, timeout=3.0):
def wait_for_message(self, message_type: Union[HiveMessageType, str], timeout=3.0):
"""Wait for a message of a specific type.
Arguments:
Expand All @@ -338,8 +357,8 @@ def wait_for_message(self, message_type, timeout=3.0):

return HiveMessageWaiter(self, message_type).wait(timeout)

def wait_for_payload(self, payload_type: str,
message_type=HiveMessageType.THIRDPRTY,
def wait_for_payload(self, payload_type: Union[HiveMessageType, str],
message_type: Union[HiveMessageType, str] = HiveMessageType.THIRDPRTY,
timeout=3.0):
"""Wait for a message of a specific type + payload of a specific type.
Expand All @@ -359,7 +378,9 @@ def wait_for_mycroft(self, mycroft_msg_type: str, timeout: float = 3.0):
return self.wait_for_payload(mycroft_msg_type, timeout=timeout,
message_type=HiveMessageType.BUS)

def wait_for_response(self, message, reply_type=None, timeout=3.0):
def wait_for_response(self, message: Union[MycroftMessage, HiveMessage],
reply_type: Optional[Union[HiveMessageType, str]] = None,
timeout=3.0):
"""Send a message and wait for a response.
Arguments:
Expand All @@ -379,8 +400,10 @@ def wait_for_response(self, message, reply_type=None, timeout=3.0):
self.emit(message)
return waiter.wait(timeout)

def wait_for_payload_response(self, message, payload_type,
reply_type=None, timeout=3.0):
def wait_for_payload_response(self, message: Union[MycroftMessage, HiveMessage],
payload_type: Union[HiveMessageType, str],
reply_type: Optional[Union[HiveMessageType, str]] = None,
timeout=3.0):
"""Send a message and wait for a response.
Arguments:
Expand Down
8 changes: 8 additions & 0 deletions hivemind_bus_client/identity.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,14 @@ def default_master(self):
def default_master(self, val):
self.IDENTITY_FILE["default_master"] = val

@property
def default_port(self):
return self.IDENTITY_FILE.get("default_port")

@default_port.setter
def default_port(self, val):
self.IDENTITY_FILE["default_port"] = val

def save(self):
self.IDENTITY_FILE.store()

Expand Down
16 changes: 15 additions & 1 deletion hivemind_bus_client/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@ def hmclient_cmds():
@click.option("--key", help="HiveMind access key", type=str, default="")
@click.option("--password", help="HiveMind password", type=str, default="")
@click.option("--host", help="default host for hivemind-core", type=str, default="")
@click.option("--port", help="default port for hivemind-core", type=int, default=5678)
@click.option("--siteid", help="location identifier for message.context", type=str, default="")
def identity_set(key: str, password: str, host: str, siteid: str):
def identity_set(key: str, password: str, host: str, port: int, siteid: str):
if not key and not password and not siteid:
raise ValueError("please set at least one of key/password/siteid/host")
identity = NodeIdentity()
identity.password = password or identity.password
identity.access_key = key or identity.access_key
identity.site_id = siteid or identity.site_id
identity.default_port = port or identity.default_port
host = host or identity.default_master
if not host.startswith("ws://") and not host.startswith("wss://"):
host = "ws://" + host
Expand Down Expand Up @@ -178,5 +180,17 @@ def propagate(key: str, password: str, host: str, port: int, siteid: str, msg: s
node.close()


@hmclient_cmds.command(help="test if Identity file can connect to HiveMind",
name="test-identity")
def test_identity():
node = HiveMessageBusClient()
node.connect(FakeBus())

node.connected_event.wait()
print("== Identity successfully connected to HiveMind!")

node.close()


if __name__ == "__main__":
hmclient_cmds()

0 comments on commit d70eeaa

Please sign in to comment.