Skip to content

Commit eb4949c

Browse files
authored
fix:binary payloads (#30)
* fix:binary payloads * review
1 parent 2c325b3 commit eb4949c

File tree

4 files changed

+48
-26
lines changed

4 files changed

+48
-26
lines changed

hivemind_bus_client/client.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from websocket import ABNF
1313
from websocket import WebSocketApp, WebSocketConnectionClosedException
1414
import pgpy
15-
15+
from hivemind_bus_client.serialization import HiveMindBinaryPayloadType
1616
from hivemind_bus_client.identity import NodeIdentity
1717
from hivemind_bus_client.message import HiveMessage, HiveMessageType
1818
from hivemind_bus_client.serialization import get_bitstring, decode_bitstring
@@ -269,7 +269,8 @@ def on_message(self, *args):
269269
elif isinstance(message, str):
270270
message = json.loads(message)
271271
if "ciphertext" in message:
272-
raise RuntimeError("got encrypted message, but could not decrypt!")
272+
LOG.error("got encrypted message, but could not decrypt!")
273+
return
273274
self.emitter.emit('message', message) # raw message
274275
self._handle_hive_protocol(HiveMessage(**message))
275276

@@ -279,7 +280,8 @@ def _handle_hive_protocol(self, message: HiveMessage):
279280
self.internal_bus.emit(message.payload)
280281
self.emitter.emit(message.msg_type, message) # hive message
281282

282-
def emit(self, message: Union[MycroftMessage, HiveMessage]):
283+
def emit(self, message: Union[MycroftMessage, HiveMessage],
284+
binary_type: HiveMindBinaryPayloadType=HiveMindBinaryPayloadType.UNDEFINED):
283285
if isinstance(message, MycroftMessage):
284286
message = HiveMessage(msg_type=HiveMessageType.BUS,
285287
payload=message)
@@ -321,7 +323,8 @@ def emit(self, message: Union[MycroftMessage, HiveMessage]):
321323
if binarize:
322324
bitstr = get_bitstring(hive_type=message.msg_type,
323325
payload=message.payload,
324-
compressed=self.compress)
326+
compressed=self.compress,
327+
binary_type=binary_type)
325328
if self.crypto_key:
326329
ws_payload = encrypt_bin(self.crypto_key, bitstr.bytes)
327330
else:

hivemind_bus_client/message.py

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import json
2-
from enum import Enum
2+
from enum import Enum, IntEnum
33

44
from ovos_bus_client import Message
55
from ovos_utils.json_helper import merge_dict
@@ -29,31 +29,45 @@ class HiveMessageType(str, Enum):
2929
BINARY = "bin" # binary data container, payload for something else
3030

3131

32+
class HiveMindBinaryPayloadType(IntEnum):
33+
""" Pseudo extension type for binary payloads
34+
it doesnt describe the payload but rather provides instruction to hivemind about how to handle it"""
35+
UNDEFINED = 0 # no info provided about binary contents
36+
RAW_AUDIO = 1 # binary content is raw audio (TODO spec exactly what "raw audio" means)
37+
NUMPY_IMAGE = 2 # binary content is an image as a numpy array, eg. webcam picture
38+
FILE = 3 # binary is a file to be saved, additional metadata provided elsewhere
39+
40+
3241
class HiveMessage:
3342
def __init__(self, msg_type: Union[HiveMessageType, str],
34-
payload: Optional[Union[Message, 'HiveMessage', str, dict]] =None,
43+
payload: Optional[Union[Message, 'HiveMessage', str, dict, bytes]] =None,
3544
node: Optional[str]=None,
3645
source_peer: Optional[str]=None,
3746
route: Optional[List[str]]=None,
3847
target_peers: Optional[List[str]]=None,
3948
target_site_id: Optional[str] =None,
40-
target_pubkey: Optional[str] =None):
49+
target_pubkey: Optional[str] =None,
50+
bin_type: HiveMindBinaryPayloadType = HiveMindBinaryPayloadType.UNDEFINED):
4151
# except for the hivemind node classes receiving the message and
4252
# creating the object nothing should be able to change these values
4353
# node classes might change them a runtime by the private attribute
4454
# but end-users should consider them read_only
45-
46-
4755
if msg_type not in [m.value for m in HiveMessageType]:
4856
raise ValueError("Unknown HiveMessage.msg_type")
57+
if msg_type != HiveMessageType.BINARY and bin_type != HiveMindBinaryPayloadType.UNDEFINED:
58+
raise ValueError("bin_type can only be set for BINARY message type")
59+
4960
self._msg_type = msg_type
61+
self._bin_type = bin_type
5062

5163
# the payload is more or less a free for all
5264
# the msg_type determines what happens to the message, but the
5365
# payload can simply be ignored by the receiving module
5466
# we store things in dict/json format, json is always used at the
5567
# transport layer before converting into any of the other formats
56-
if isinstance(payload, Message):
68+
if not isinstance(payload, bytes) and msg_type == HiveMessageType.BINARY:
69+
raise ValueError(f"expected 'bytes' payload for HiveMessageType.BINARY, got {type(payload)}")
70+
elif isinstance(payload, Message):
5771
payload = {"type": payload.msg_type,
5872
"data": payload.data,
5973
"context": payload.context}
@@ -99,7 +113,7 @@ def route(self) -> List[str]:
99113
return [r for r in self._route if r.get("targets") and r.get("source")]
100114

101115
@property
102-
def payload(self) -> Union['HiveMessage', Message, dict]:
116+
def payload(self) -> Union['HiveMessage', Message, dict, bytes]:
103117
if self.msg_type in [HiveMessageType.BUS, HiveMessageType.SHARED_BUS]:
104118
return Message(self._payload["type"],
105119
data=self._payload.get("data"),
@@ -111,9 +125,15 @@ def payload(self) -> Union['HiveMessage', Message, dict]:
111125
return HiveMessage(**self._payload)
112126
return self._payload
113127

128+
@property
129+
def bin_type(self) -> HiveMindBinaryPayloadType:
130+
return self._bin_type
131+
114132
@property
115133
def as_dict(self) -> dict:
116134
pload = self._payload
135+
if self.msg_type == HiveMessageType.BINARY:
136+
raise ValueError("messages with type HiveMessageType.BINARY can not be cast to dict")
117137
if isinstance(pload, HiveMessage):
118138
pload = pload.as_dict
119139
elif isinstance(pload, Message):
@@ -166,12 +186,19 @@ def deserialize(payload: Union[str, dict]) -> 'HiveMessage':
166186
target_pubkey=payload.get("target_pubkey"))
167187

168188
def __getitem__(self, item):
189+
if not isinstance(self._payload, dict):
190+
raise TypeError(f"Item access not supported for payload type {type(self._payload)}")
169191
return self._payload.get(item)
170192

171193
def __setitem__(self, key, value):
172-
self._payload[key] = value
194+
if isinstance(self._payload, dict):
195+
self._payload[key] = value
196+
else:
197+
raise TypeError(f"Item assignment not supported for payload type {type(self._payload)}")
173198

174199
def __str__(self):
200+
if self.msg_type == HiveMessageType.BINARY:
201+
return f"HiveMessage(BINARY:{len(self._payload)}])"
175202
return self.as_json
176203

177204
def update_hop_data(self, data=None, **kwargs):

hivemind_bus_client/protocol.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,8 @@ def handle_bus(self, message: HiveMessage):
213213
SessionManager.update(sess)
214214

215215
# from this point on, it should be a native source and execute audio
216-
pload.context["source"] = pload.context.pop("destination")
216+
if "destination" in pload.context:
217+
pload.context["source"] = pload.context.pop("destination")
217218
self.internal_protocol.bus.emit(pload)
218219

219220
def handle_broadcast(self, message: HiveMessage):

hivemind_bus_client/serialization.py

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,17 @@
1+
import json
12
import sys
2-
from enum import IntEnum
33
from inspect import signature
44

55
from bitstring import BitArray, BitStream
66

77
from hivemind_bus_client.exceptions import UnsupportedProtocolVersion
8-
from hivemind_bus_client.message import HiveMessageType, HiveMessage
8+
from hivemind_bus_client.message import HiveMessageType, HiveMessage, HiveMindBinaryPayloadType
99
from hivemind_bus_client.util import compress_payload, decompress_payload, cast2bytes, bytes2str
1010

1111
PROTOCOL_VERSION = 1 # integer, a version increase signals new functionality added
1212
# version 0 is the original hivemind protocol, 1 supports handshake + binary
1313

1414

15-
class HiveMindBinaryPayloadType(IntEnum):
16-
""" Pseudo extension type for binary payloads
17-
it doesnt describe the payload but rather provides instruction to hivemind about how to handle it"""
18-
UNDEFINED = 0 # no info provided about binary contents
19-
RAW_AUDIO = 1 # binary content is raw audio (TODO spec exactly what "raw audio" means)
20-
NUMPY_IMAGE = 2 # binary content is an image as a numpy array, eg. webcam picture
21-
FILE = 3 # binary is a file to be saved, additional metadata provided elsewhere
22-
23-
2415
_INT2TYPE = {0: HiveMessageType.HANDSHAKE,
2516
1: HiveMessageType.BUS,
2617
2: HiveMessageType.SHARED_BUS,
@@ -115,8 +106,7 @@ def _decode_bitstring_v1(s):
115106
meta = s.read(metalen)
116107

117108
# TODO standardize hivemind meta
118-
meta = bytes2str(meta.bytes, compressed)
119-
kwargs = {a: meta[a] for a in signature(HiveMessage).parameters if a in meta}
109+
meta = json.loads(bytes2str(meta.bytes, compressed))
120110

121111
is_bin = hive_type == HiveMessageType.BINARY
122112
bin_type = HiveMindBinaryPayloadType.UNDEFINED
@@ -132,6 +122,7 @@ def _decode_bitstring_v1(s):
132122
payload = payload.bytes
133123
meta["bin_type"] = bin_type
134124

125+
kwargs = {a: meta[a] for a in signature(HiveMessage).parameters if a in meta}
135126
return HiveMessage(hive_type, payload, **kwargs)
136127

137128

0 commit comments

Comments
 (0)