From d70eeaa15c70786972b29c415984051565201280 Mon Sep 17 00:00:00 2001 From: JarbasAI <33701864+JarbasAl@users.noreply.github.com> Date: Mon, 20 May 2024 21:29:38 +0100 Subject: [PATCH] feat/better_kwargs (#24) - internal bus is now a kwarg to allow to easily connect an existing OVOS bus, instead of always using a FakeBus - allow port and host to come from identity - add identity-test script validate info - typing # $ hivemind-client set-identity --key 3d20b5188aa0157562b2bc2e7d966127 --password 6f81688ec095bd00dee63aca13dd091f --host 0.0.0.0 --port 5678 --siteid test # $ cat /home/miro/.config/hivemind/_identity.json # { # "password": "6f81688ec095bd00dee63aca13dd091f", # "access_key": "3d20b5188aa0157562b2bc2e7d966127", # "site_id": "test", # "default_port": 5678, # "default_master": "ws://0.0.0.0" # } # $ hivemind-client test-identity --- hivemind_bus_client/client.py | 89 +++++++++++++++++++++------------ hivemind_bus_client/identity.py | 8 +++ hivemind_bus_client/scripts.py | 16 +++++- 3 files changed, 79 insertions(+), 34 deletions(-) diff --git a/hivemind_bus_client/client.py b/hivemind_bus_client/client.py index 5bc69cb..9f9b4c8 100644 --- a/hivemind_bus_client/client.py +++ b/hivemind_bus_client/client.py @@ -2,7 +2,7 @@ import json import ssl from threading import Event -from typing import Union +from typing import Union, Optional from ovos_bus_client import Message as MycroftMessage, MessageBusClient as OVOSBusClient from ovos_bus_client.session import Session @@ -26,18 +26,18 @@ class HiveMessageWaiter: the actual waiting act so the waiting can be setuo, actions can be performed and _then_ the message can be waited for. - Argunments: + Arguments: bus: Bus to check for messages on message_type: message type to wait for """ - def __init__(self, bus, message_type): + def __init__(self, bus, message_type: Union[HiveMessageType, str]): self.bus = bus self.msg_type = message_type self.received_msg = None # Setup response handler self.response_event = Event() - self.bus.once(message_type, self._handler) + self.bus.on(message_type, self._handler) def _handler(self, message): """Receive response data.""" @@ -54,44 +54,42 @@ def wait(self, timeout=3.0): HiveMessage or None """ self.response_event.wait(timeout) - if not self.response_event.is_set(): - # Clean up the event handler - try: - self.bus.remove(self.msg_type, self._handler) - except (ValueError, KeyError): - # ValueError occurs on pyee 5.0.1 removing handlers - # registered with once. - # KeyError may theoretically occur if the event occurs as - # the handler is removed - pass + self.bus.remove(self.msg_type, self._handler) return self.received_msg class HivePayloadWaiter(HiveMessageWaiter): - def __init__(self, payload_type=HiveMessageType.THIRDPRTY, *args, **kwargs): + def __init__(self, payload_type: Union[HiveMessageType, str], + *args, **kwargs): super(HivePayloadWaiter, self).__init__(*args, **kwargs) self.payload_type = payload_type def _handler(self, message): """Receive response data.""" if message.payload.msg_type == self.payload_type: - self.received_msg = message - self.response_event.set() - else: - self.bus.once(self.msg_type, self._handler) + super()._handler(message) class HiveMessageBusClient(OVOSBusClient): - def __init__(self, key=None, password=None, crypto_key=None, host='127.0.0.1', port=5678, - useragent="", self_signed=True, share_bus=False, - compress=True, binarize=True, identity: NodeIdentity = None): - ssl = host.startswith("wss://") - host = host.replace("ws://", "").replace("wss://", "").strip() + def __init__(self, key: Optional[str] = None, + password: Optional[str] = None, + crypto_key: Optional[str] = None, + host: Optional[str] = None, + port: Optional[int] = None, + useragent: str = "", + self_signed: bool = True, + share_bus: bool = False, + compress: bool = True, + binarize: bool = True, + identity: NodeIdentity = None, + internal_bus: Optional[OVOSBusClient] = None): self.identity = identity or None self._password = password self._access_key = key self._name = useragent + self._port = port + self._host = host self.init_identity() self.crypto_key = crypto_key @@ -103,21 +101,38 @@ def __init__(self, key=None, password=None, crypto_key=None, host='127.0.0.1', p self.compress = compress # None -> auto self.binarize = binarize # only if hivemind reports also supporting it - sess = Session() # new session for this client + # connect to OVOS, if on a OVOS device + if not internal_bus: + # FakeBus needed to send emitted events to handlers registered within the client + sess = Session() # new session for this client + self.internal_bus = FakeBus(session=sess) + else: + sess = Session(session_id=internal_bus.session_id) + self.internal_bus = internal_bus LOG.info(f"Session ID: {sess.session_id}") - self.internal_bus = FakeBus(session=sess) # also send emitted events to handlers registered within the client - super().__init__(host=host, port=port, ssl=ssl, emitter=EventEmitter(), session=sess) + + # NOTE: self._host and self._port accessed only after self.init_identity() + # this allows them to come from set-identity cli command + host = self._host.replace("ws://", "").replace("wss://", "").strip() + use_ssl = host.startswith("wss://") + super().__init__(host=host, port=self._port, ssl=use_ssl, + emitter=EventEmitter(), session=sess) def init_identity(self, site_id=None): self.identity = self.identity or NodeIdentity() self.identity.password = self._password or self.identity.password self.identity.access_key = self._access_key or self.identity.access_key + self.identity.default_master = self._host = self._host or self.identity.default_master + self.identity.default_port = self._port = self._port or self.identity.default_port self.identity.name = self._name or "HiveMessageBusClientV0.0.1" self.identity.site_id = site_id or self.identity.site_id if not self.identity.access_key or not self.identity.password: raise RuntimeError("NodeIdentity not set, please pass key and password or " "call 'hivemind-client set-identity'") + if not self.identity.default_master: + raise RuntimeError("host not set, please pass host and port or " + "call 'hivemind-client set-identity'") @property def useragent(self): @@ -272,6 +287,10 @@ def emit(self, message: Union[MycroftMessage, HiveMessage]): ctxt["platform"] = self.useragent if "destination" not in message.payload.context: ctxt["destination"] = "HiveMind" + if "session" not in ctxt: + ctxt["session"] = {} + ctxt["session"]["session_id"] = self.session_id + ctxt["session"]["site_id"] = self.site_id message.payload.context = ctxt # also send event to client registered handlers self.internal_bus.emit(message.payload) @@ -325,7 +344,7 @@ def on(self, event_name, func): self.emitter.on(event_name, func) # utility - def wait_for_message(self, message_type, timeout=3.0): + def wait_for_message(self, message_type: Union[HiveMessageType, str], timeout=3.0): """Wait for a message of a specific type. Arguments: @@ -338,8 +357,8 @@ def wait_for_message(self, message_type, timeout=3.0): return HiveMessageWaiter(self, message_type).wait(timeout) - def wait_for_payload(self, payload_type: str, - message_type=HiveMessageType.THIRDPRTY, + def wait_for_payload(self, payload_type: Union[HiveMessageType, str], + message_type: Union[HiveMessageType, str] = HiveMessageType.THIRDPRTY, timeout=3.0): """Wait for a message of a specific type + payload of a specific type. @@ -359,7 +378,9 @@ def wait_for_mycroft(self, mycroft_msg_type: str, timeout: float = 3.0): return self.wait_for_payload(mycroft_msg_type, timeout=timeout, message_type=HiveMessageType.BUS) - def wait_for_response(self, message, reply_type=None, timeout=3.0): + def wait_for_response(self, message: Union[MycroftMessage, HiveMessage], + reply_type: Optional[Union[HiveMessageType, str]] = None, + timeout=3.0): """Send a message and wait for a response. Arguments: @@ -379,8 +400,10 @@ def wait_for_response(self, message, reply_type=None, timeout=3.0): self.emit(message) return waiter.wait(timeout) - def wait_for_payload_response(self, message, payload_type, - reply_type=None, timeout=3.0): + def wait_for_payload_response(self, message: Union[MycroftMessage, HiveMessage], + payload_type: Union[HiveMessageType, str], + reply_type: Optional[Union[HiveMessageType, str]] = None, + timeout=3.0): """Send a message and wait for a response. Arguments: diff --git a/hivemind_bus_client/identity.py b/hivemind_bus_client/identity.py index 22ff343..e0440f6 100644 --- a/hivemind_bus_client/identity.py +++ b/hivemind_bus_client/identity.py @@ -68,6 +68,14 @@ def default_master(self): def default_master(self, val): self.IDENTITY_FILE["default_master"] = val + @property + def default_port(self): + return self.IDENTITY_FILE.get("default_port") + + @default_port.setter + def default_port(self, val): + self.IDENTITY_FILE["default_port"] = val + def save(self): self.IDENTITY_FILE.store() diff --git a/hivemind_bus_client/scripts.py b/hivemind_bus_client/scripts.py index 4188f5b..96656ef 100644 --- a/hivemind_bus_client/scripts.py +++ b/hivemind_bus_client/scripts.py @@ -20,14 +20,16 @@ def hmclient_cmds(): @click.option("--key", help="HiveMind access key", type=str, default="") @click.option("--password", help="HiveMind password", type=str, default="") @click.option("--host", help="default host for hivemind-core", type=str, default="") +@click.option("--port", help="default port for hivemind-core", type=int, default=5678) @click.option("--siteid", help="location identifier for message.context", type=str, default="") -def identity_set(key: str, password: str, host: str, siteid: str): +def identity_set(key: str, password: str, host: str, port: int, siteid: str): if not key and not password and not siteid: raise ValueError("please set at least one of key/password/siteid/host") identity = NodeIdentity() identity.password = password or identity.password identity.access_key = key or identity.access_key identity.site_id = siteid or identity.site_id + identity.default_port = port or identity.default_port host = host or identity.default_master if not host.startswith("ws://") and not host.startswith("wss://"): host = "ws://" + host @@ -178,5 +180,17 @@ def propagate(key: str, password: str, host: str, port: int, siteid: str, msg: s node.close() +@hmclient_cmds.command(help="test if Identity file can connect to HiveMind", + name="test-identity") +def test_identity(): + node = HiveMessageBusClient() + node.connect(FakeBus()) + + node.connected_event.wait() + print("== Identity successfully connected to HiveMind!") + + node.close() + + if __name__ == "__main__": hmclient_cmds()