Skip to content

Commit

Permalink
fix:binary payloads
Browse files Browse the repository at this point in the history
  • Loading branch information
JarbasAl committed Oct 24, 2024
1 parent 2c325b3 commit 947c0ff
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 26 deletions.
11 changes: 7 additions & 4 deletions hivemind_bus_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from websocket import ABNF
from websocket import WebSocketApp, WebSocketConnectionClosedException
import pgpy

from hivemind_bus_client.serialization import HiveMindBinaryPayloadType
from hivemind_bus_client.identity import NodeIdentity
from hivemind_bus_client.message import HiveMessage, HiveMessageType
from hivemind_bus_client.serialization import get_bitstring, decode_bitstring
Expand Down Expand Up @@ -269,7 +269,8 @@ def on_message(self, *args):
elif isinstance(message, str):
message = json.loads(message)
if "ciphertext" in message:
raise RuntimeError("got encrypted message, but could not decrypt!")
LOG.error("got encrypted message, but could not decrypt!")
return
self.emitter.emit('message', message) # raw message
self._handle_hive_protocol(HiveMessage(**message))

Expand All @@ -279,7 +280,8 @@ def _handle_hive_protocol(self, message: HiveMessage):
self.internal_bus.emit(message.payload)
self.emitter.emit(message.msg_type, message) # hive message

def emit(self, message: Union[MycroftMessage, HiveMessage]):
def emit(self, message: Union[MycroftMessage, HiveMessage],
binary_type: HiveMindBinaryPayloadType=HiveMindBinaryPayloadType.UNDEFINED):
if isinstance(message, MycroftMessage):
message = HiveMessage(msg_type=HiveMessageType.BUS,
payload=message)
Expand Down Expand Up @@ -321,7 +323,8 @@ def emit(self, message: Union[MycroftMessage, HiveMessage]):
if binarize:
bitstr = get_bitstring(hive_type=message.msg_type,
payload=message.payload,
compressed=self.compress)
compressed=self.compress,
binary_type=binary_type)
if self.crypto_key:
ws_payload = encrypt_bin(self.crypto_key, bitstr.bytes)
else:
Expand Down
36 changes: 28 additions & 8 deletions hivemind_bus_client/message.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import json
from enum import Enum
from enum import Enum, IntEnum

from ovos_bus_client import Message
from ovos_utils.json_helper import merge_dict
Expand Down Expand Up @@ -29,31 +29,44 @@ class HiveMessageType(str, Enum):
BINARY = "bin" # binary data container, payload for something else


class HiveMindBinaryPayloadType(IntEnum):
""" Pseudo extension type for binary payloads
it doesnt describe the payload but rather provides instruction to hivemind about how to handle it"""
UNDEFINED = 0 # no info provided about binary contents
RAW_AUDIO = 1 # binary content is raw audio (TODO spec exactly what "raw audio" means)
NUMPY_IMAGE = 2 # binary content is an image as a numpy array, eg. webcam picture
FILE = 3 # binary is a file to be saved, additional metadata provided elsewhere


class HiveMessage:
def __init__(self, msg_type: Union[HiveMessageType, str],
payload: Optional[Union[Message, 'HiveMessage', str, dict]] =None,
payload: Optional[Union[Message, 'HiveMessage', str, dict, bytes]] =None,
node: Optional[str]=None,
source_peer: Optional[str]=None,
route: Optional[List[str]]=None,
target_peers: Optional[List[str]]=None,
target_site_id: Optional[str] =None,
target_pubkey: Optional[str] =None):
target_pubkey: Optional[str] =None,
bin_type: HiveMindBinaryPayloadType = HiveMindBinaryPayloadType.UNDEFINED):
# except for the hivemind node classes receiving the message and
# creating the object nothing should be able to change these values
# node classes might change them a runtime by the private attribute
# but end-users should consider them read_only


if msg_type not in [m.value for m in HiveMessageType]:
raise ValueError("Unknown HiveMessage.msg_type")
self._msg_type = msg_type
self._bin_type = bin_type
if msg_type == HiveMessageType.BINARY:
self.__setattr__("bin_type", bin_type)

# the payload is more or less a free for all
# the msg_type determines what happens to the message, but the
# payload can simply be ignored by the receiving module
# we store things in dict/json format, json is always used at the
# transport layer before converting into any of the other formats
if isinstance(payload, Message):
if not isinstance(payload, bytes) and msg_type == HiveMessageType.BINARY:
raise ValueError(f"expected 'bytes' payload for HiveMessageType.BINARY, got {type(payload)}")
elif isinstance(payload, Message):
payload = {"type": payload.msg_type,
"data": payload.data,
"context": payload.context}
Expand Down Expand Up @@ -99,7 +112,7 @@ def route(self) -> List[str]:
return [r for r in self._route if r.get("targets") and r.get("source")]

@property
def payload(self) -> Union['HiveMessage', Message, dict]:
def payload(self) -> Union['HiveMessage', Message, dict, bytes]:
if self.msg_type in [HiveMessageType.BUS, HiveMessageType.SHARED_BUS]:
return Message(self._payload["type"],
data=self._payload.get("data"),
Expand All @@ -114,6 +127,8 @@ def payload(self) -> Union['HiveMessage', Message, dict]:
@property
def as_dict(self) -> dict:
pload = self._payload
if self.msg_type == HiveMessageType.BINARY:
raise ValueError("messages with type HiveMessageType.BINARY can not be cast to dict")
if isinstance(pload, HiveMessage):
pload = pload.as_dict
elif isinstance(pload, Message):
Expand Down Expand Up @@ -166,12 +181,17 @@ def deserialize(payload: Union[str, dict]) -> 'HiveMessage':
target_pubkey=payload.get("target_pubkey"))

def __getitem__(self, item):
if not isinstance(self._payload, dict):
return None
return self._payload.get(item)

def __setitem__(self, key, value):
self._payload[key] = value
if isinstance(self._payload, dict):
self._payload[key] = value

def __str__(self):
if self.msg_type == HiveMessageType.BINARY:
return f"HiveMessage(BINARY:{len(self._payload)}])"
return self.as_json

def update_hop_data(self, data=None, **kwargs):
Expand Down
3 changes: 2 additions & 1 deletion hivemind_bus_client/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ def handle_bus(self, message: HiveMessage):
SessionManager.update(sess)

# from this point on, it should be a native source and execute audio
pload.context["source"] = pload.context.pop("destination")
if "destination" in pload.context:
pload.context["source"] = pload.context.pop("destination")
self.internal_protocol.bus.emit(pload)

def handle_broadcast(self, message: HiveMessage):
Expand Down
17 changes: 4 additions & 13 deletions hivemind_bus_client/serialization.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,17 @@
import json
import sys
from enum import IntEnum
from inspect import signature

from bitstring import BitArray, BitStream

from hivemind_bus_client.exceptions import UnsupportedProtocolVersion
from hivemind_bus_client.message import HiveMessageType, HiveMessage
from hivemind_bus_client.message import HiveMessageType, HiveMessage, HiveMindBinaryPayloadType
from hivemind_bus_client.util import compress_payload, decompress_payload, cast2bytes, bytes2str

PROTOCOL_VERSION = 1 # integer, a version increase signals new functionality added
# version 0 is the original hivemind protocol, 1 supports handshake + binary


class HiveMindBinaryPayloadType(IntEnum):
""" Pseudo extension type for binary payloads
it doesnt describe the payload but rather provides instruction to hivemind about how to handle it"""
UNDEFINED = 0 # no info provided about binary contents
RAW_AUDIO = 1 # binary content is raw audio (TODO spec exactly what "raw audio" means)
NUMPY_IMAGE = 2 # binary content is an image as a numpy array, eg. webcam picture
FILE = 3 # binary is a file to be saved, additional metadata provided elsewhere


_INT2TYPE = {0: HiveMessageType.HANDSHAKE,
1: HiveMessageType.BUS,
2: HiveMessageType.SHARED_BUS,
Expand Down Expand Up @@ -115,8 +106,7 @@ def _decode_bitstring_v1(s):
meta = s.read(metalen)

# TODO standardize hivemind meta
meta = bytes2str(meta.bytes, compressed)
kwargs = {a: meta[a] for a in signature(HiveMessage).parameters if a in meta}
meta = json.loads(bytes2str(meta.bytes, compressed))

is_bin = hive_type == HiveMessageType.BINARY
bin_type = HiveMindBinaryPayloadType.UNDEFINED
Expand All @@ -132,6 +122,7 @@ def _decode_bitstring_v1(s):
payload = payload.bytes
meta["bin_type"] = bin_type

kwargs = {a: meta[a] for a in signature(HiveMessage).parameters if a in meta}
return HiveMessage(hive_type, payload, **kwargs)


Expand Down

0 comments on commit 947c0ff

Please sign in to comment.