diff --git a/hivemind_bus_client/client.py b/hivemind_bus_client/client.py index 3429a74..fee5e74 100644 --- a/hivemind_bus_client/client.py +++ b/hivemind_bus_client/client.py @@ -12,7 +12,7 @@ from websocket import ABNF from websocket import WebSocketApp, WebSocketConnectionClosedException import pgpy - +from hivemind_bus_client.serialization import HiveMindBinaryPayloadType from hivemind_bus_client.identity import NodeIdentity from hivemind_bus_client.message import HiveMessage, HiveMessageType from hivemind_bus_client.serialization import get_bitstring, decode_bitstring @@ -269,7 +269,8 @@ def on_message(self, *args): elif isinstance(message, str): message = json.loads(message) if "ciphertext" in message: - raise RuntimeError("got encrypted message, but could not decrypt!") + LOG.error("got encrypted message, but could not decrypt!") + return self.emitter.emit('message', message) # raw message self._handle_hive_protocol(HiveMessage(**message)) @@ -279,7 +280,8 @@ def _handle_hive_protocol(self, message: HiveMessage): self.internal_bus.emit(message.payload) self.emitter.emit(message.msg_type, message) # hive message - def emit(self, message: Union[MycroftMessage, HiveMessage]): + def emit(self, message: Union[MycroftMessage, HiveMessage], + binary_type: HiveMindBinaryPayloadType=HiveMindBinaryPayloadType.UNDEFINED): if isinstance(message, MycroftMessage): message = HiveMessage(msg_type=HiveMessageType.BUS, payload=message) @@ -321,7 +323,8 @@ def emit(self, message: Union[MycroftMessage, HiveMessage]): if binarize: bitstr = get_bitstring(hive_type=message.msg_type, payload=message.payload, - compressed=self.compress) + compressed=self.compress, + binary_type=binary_type) if self.crypto_key: ws_payload = encrypt_bin(self.crypto_key, bitstr.bytes) else: diff --git a/hivemind_bus_client/message.py b/hivemind_bus_client/message.py index 66201cb..bf4fc06 100644 --- a/hivemind_bus_client/message.py +++ b/hivemind_bus_client/message.py @@ -1,5 +1,5 @@ import json -from enum import Enum +from enum import Enum, IntEnum from ovos_bus_client import Message from ovos_utils.json_helper import merge_dict @@ -29,31 +29,44 @@ class HiveMessageType(str, Enum): BINARY = "bin" # binary data container, payload for something else +class HiveMindBinaryPayloadType(IntEnum): + """ Pseudo extension type for binary payloads + it doesnt describe the payload but rather provides instruction to hivemind about how to handle it""" + UNDEFINED = 0 # no info provided about binary contents + RAW_AUDIO = 1 # binary content is raw audio (TODO spec exactly what "raw audio" means) + NUMPY_IMAGE = 2 # binary content is an image as a numpy array, eg. webcam picture + FILE = 3 # binary is a file to be saved, additional metadata provided elsewhere + + class HiveMessage: def __init__(self, msg_type: Union[HiveMessageType, str], - payload: Optional[Union[Message, 'HiveMessage', str, dict]] =None, + payload: Optional[Union[Message, 'HiveMessage', str, dict, bytes]] =None, node: Optional[str]=None, source_peer: Optional[str]=None, route: Optional[List[str]]=None, target_peers: Optional[List[str]]=None, target_site_id: Optional[str] =None, - target_pubkey: Optional[str] =None): + target_pubkey: Optional[str] =None, + bin_type: HiveMindBinaryPayloadType = HiveMindBinaryPayloadType.UNDEFINED): # except for the hivemind node classes receiving the message and # creating the object nothing should be able to change these values # node classes might change them a runtime by the private attribute # but end-users should consider them read_only - - if msg_type not in [m.value for m in HiveMessageType]: raise ValueError("Unknown HiveMessage.msg_type") self._msg_type = msg_type + self._bin_type = bin_type + if msg_type == HiveMessageType.BINARY: + self.__setattr__("bin_type", bin_type) # the payload is more or less a free for all # the msg_type determines what happens to the message, but the # payload can simply be ignored by the receiving module # we store things in dict/json format, json is always used at the # transport layer before converting into any of the other formats - if isinstance(payload, Message): + if not isinstance(payload, bytes) and msg_type == HiveMessageType.BINARY: + raise ValueError(f"expected 'bytes' payload for HiveMessageType.BINARY, got {type(payload)}") + elif isinstance(payload, Message): payload = {"type": payload.msg_type, "data": payload.data, "context": payload.context} @@ -99,7 +112,7 @@ def route(self) -> List[str]: return [r for r in self._route if r.get("targets") and r.get("source")] @property - def payload(self) -> Union['HiveMessage', Message, dict]: + def payload(self) -> Union['HiveMessage', Message, dict, bytes]: if self.msg_type in [HiveMessageType.BUS, HiveMessageType.SHARED_BUS]: return Message(self._payload["type"], data=self._payload.get("data"), @@ -114,6 +127,8 @@ def payload(self) -> Union['HiveMessage', Message, dict]: @property def as_dict(self) -> dict: pload = self._payload + if self.msg_type == HiveMessageType.BINARY: + raise ValueError("messages with type HiveMessageType.BINARY can not be cast to dict") if isinstance(pload, HiveMessage): pload = pload.as_dict elif isinstance(pload, Message): @@ -166,12 +181,17 @@ def deserialize(payload: Union[str, dict]) -> 'HiveMessage': target_pubkey=payload.get("target_pubkey")) def __getitem__(self, item): + if not isinstance(self._payload, dict): + return None return self._payload.get(item) def __setitem__(self, key, value): - self._payload[key] = value + if isinstance(self._payload, dict): + self._payload[key] = value def __str__(self): + if self.msg_type == HiveMessageType.BINARY: + return f"HiveMessage(BINARY:{len(self._payload)}])" return self.as_json def update_hop_data(self, data=None, **kwargs): diff --git a/hivemind_bus_client/protocol.py b/hivemind_bus_client/protocol.py index 505fe68..2e05b42 100644 --- a/hivemind_bus_client/protocol.py +++ b/hivemind_bus_client/protocol.py @@ -213,7 +213,8 @@ def handle_bus(self, message: HiveMessage): SessionManager.update(sess) # from this point on, it should be a native source and execute audio - pload.context["source"] = pload.context.pop("destination") + if "destination" in pload.context: + pload.context["source"] = pload.context.pop("destination") self.internal_protocol.bus.emit(pload) def handle_broadcast(self, message: HiveMessage): diff --git a/hivemind_bus_client/serialization.py b/hivemind_bus_client/serialization.py index bfd857c..729b753 100644 --- a/hivemind_bus_client/serialization.py +++ b/hivemind_bus_client/serialization.py @@ -1,26 +1,17 @@ +import json import sys -from enum import IntEnum from inspect import signature from bitstring import BitArray, BitStream from hivemind_bus_client.exceptions import UnsupportedProtocolVersion -from hivemind_bus_client.message import HiveMessageType, HiveMessage +from hivemind_bus_client.message import HiveMessageType, HiveMessage, HiveMindBinaryPayloadType from hivemind_bus_client.util import compress_payload, decompress_payload, cast2bytes, bytes2str PROTOCOL_VERSION = 1 # integer, a version increase signals new functionality added # version 0 is the original hivemind protocol, 1 supports handshake + binary -class HiveMindBinaryPayloadType(IntEnum): - """ Pseudo extension type for binary payloads - it doesnt describe the payload but rather provides instruction to hivemind about how to handle it""" - UNDEFINED = 0 # no info provided about binary contents - RAW_AUDIO = 1 # binary content is raw audio (TODO spec exactly what "raw audio" means) - NUMPY_IMAGE = 2 # binary content is an image as a numpy array, eg. webcam picture - FILE = 3 # binary is a file to be saved, additional metadata provided elsewhere - - _INT2TYPE = {0: HiveMessageType.HANDSHAKE, 1: HiveMessageType.BUS, 2: HiveMessageType.SHARED_BUS, @@ -115,8 +106,7 @@ def _decode_bitstring_v1(s): meta = s.read(metalen) # TODO standardize hivemind meta - meta = bytes2str(meta.bytes, compressed) - kwargs = {a: meta[a] for a in signature(HiveMessage).parameters if a in meta} + meta = json.loads(bytes2str(meta.bytes, compressed)) is_bin = hive_type == HiveMessageType.BINARY bin_type = HiveMindBinaryPayloadType.UNDEFINED @@ -132,6 +122,7 @@ def _decode_bitstring_v1(s): payload = payload.bytes meta["bin_type"] = bin_type + kwargs = {a: meta[a] for a in signature(HiveMessage).parameters if a in meta} return HiveMessage(hive_type, payload, **kwargs)