Skip to content

Commit

Permalink
msg_type/intercom
Browse files Browse the repository at this point in the history
  • Loading branch information
JarbasAl committed May 30, 2024
1 parent 6285742 commit 3271e54
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 67 deletions.
20 changes: 3 additions & 17 deletions hivemind_bus_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,9 @@ def wait_for_payload_response(self, message: Union[MycroftMessage, HiveMessage],
return waiter.wait(timeout)

# targeted messages for nodes, assymetric encryption
def emit_encrypted(self, message: MycroftMessage, pubkey: Union[str, pgpy.PGPKey]):
message = self.encrypt(message, pubkey)
self.emit(message)
def emit_intercom(self, message: Union[MycroftMessage, HiveMessage],
pubkey: Union[str, pgpy.PGPKey]):

def encrypt(self, message: MycroftMessage, pubkey: Union[str, pgpy.PGPKey]):
if isinstance(pubkey, str):
pubkey, _ = pgpy.PGPKey.from_blob(pubkey)
assert isinstance(pubkey, pgpy.PGPKey)
Expand All @@ -467,16 +465,4 @@ def encrypt(self, message: MycroftMessage, pubkey: Union[str, pgpy.PGPKey]):
encrypted_message |= private_key.sign(encrypted_message,
intended_recipients=[pubkey])

return MycroftMessage("hive.identity_encrypted",
{"ciphertext": str(encrypted_message)})

def decrypt(self, message: MycroftMessage):
assert message.msg_type == "hive.identity_encrypted"
ciphertext = message.data["ciphertext"]
message_from_blob = pgpy.PGPMessage.from_blob(ciphertext)

with open(self.identity.private_key, "r") as f:
private_key = pgpy.PGPKey.from_blob(f.read())

decrypted: str = private_key.decrypt(message_from_blob)
return MycroftMessage.deserialize(json.loads(decrypted))
self.emit(HiveMessage(HiveMessageType.INTERCOM, payload={"ciphertext": str(encrypted_message)}))
80 changes: 51 additions & 29 deletions hivemind_bus_client/message.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,58 @@
from enum import Enum
import json
from ovos_utils.json_helper import merge_dict
from enum import Enum

from ovos_bus_client import Message
from ovos_utils.json_helper import merge_dict
from typing import Union, List, Optional


class HiveMessageType(str, Enum):
HANDSHAKE = "shake" # negotiate initial connection
BUS = "bus" # request meant for internal mycroft-bus in master
SHARED_BUS = "shared_bus" # passive sharing of message
# from mycroft-bus in slave
# from mycroft-bus in slave

INTERCOM = "intercom" # from satellite to satellite

BROADCAST = "broadcast" # forward message to all slaves
PROPAGATE = "propagate" # forward message to all slaves and masters
ESCALATE = "escalate" # forward message up the authority chain to all
# masters
# masters
HELLO = "hello" # like escalate, used to announce the device
QUERY = "query" # like escalate, but stops once one of the nodes can
# send a response
# send a response
CASCADE = "cascade" # like propagate, but expects a response back from
# all nodes in the hive (responses optional)
# all nodes in the hive (responses optional)
PING = "ping" # like cascade, but used to map the network
RENDEZVOUS = "rendezvous" # reserved for rendezvous-nodes
THIRDPRTY = "3rdparty" # user land message, do whatever you want
BINARY = "bin" # binary data container, payload for something else


class HiveMessage:
def __init__(self, msg_type, payload=None, node=None, source_peer=None,
route=None, target_peers=None, meta=None, target_site_id=None):
def __init__(self, msg_type: Union[HiveMessageType, str],
payload: Optional[Union[Message, 'HiveMessage', str, dict]] =None,
node: Optional[str]=None,
source_peer: Optional[str]=None,
route: Optional[List[str]]=None,
target_peers: Optional[List[str]]=None,
target_site_id: Optional[str] =None,
target_pubkey: Optional[str] =None):
# except for the hivemind node classes receiving the message and
# creating the object nothing should be able to change these values
# node classes might change them a runtime by the private attribute
# but end-users should consider them read_only


if msg_type not in [m.value for m in HiveMessageType]:
raise ValueError("Unknown HiveMessage.msg_type")

self._msg_type = msg_type

# the payload is more or less a free for all
# the msg_type determines what happens to the message, but the
# payload can simply be ignored by the receiving module

# some msg_types might return HiveMessage, others (mycroft) Message
# we should support the dict/json format, json is used at the
# transport layer before converting into any of these formats
# we store things in dict/json format, json is always used at the
# transport layer before converting into any of the other formats
if isinstance(payload, Message):
payload = {"type": payload.msg_type,
"data": payload.data,
Expand All @@ -51,40 +62,44 @@ def __init__(self, msg_type, payload=None, node=None, source_peer=None,
self._payload = payload or {}

self._site_id = target_site_id
self._target_pubkey = target_pubkey
self._node = node # node semi-unique identifier
self._source_peer = source_peer # peer_id
self._route = route or [] # where did this message come from
self._targets = target_peers or [] # where will it be sent
self._meta = meta or {}

@property
def target_site_id(self):
def target_site_id(self) -> str:
return self._site_id

@property
def msg_type(self):
def target_public_key(self) -> str:
return self._target_pubkey

@property
def msg_type(self) -> str:
return self._msg_type

@property
def node_id(self):
def node_id(self) -> str:
return self._node

@property
def source_peer(self):
def source_peer(self) -> str:
return self._source_peer

@property
def target_peers(self):
def target_peers(self) -> List[str]:
if self.source_peer:
return self._targets or [self._source_peer]
return self._targets

@property
def route(self):
def route(self) -> List[str]:
return [r for r in self._route if r.get("targets") and r.get("source")]

@property
def payload(self):
def payload(self) -> Union['HiveMessage', Message, dict]:
if self.msg_type in [HiveMessageType.BUS, HiveMessageType.SHARED_BUS]:
return Message(self._payload["type"],
data=self._payload.get("data"),
Expand All @@ -97,37 +112,42 @@ def payload(self):
return self._payload

@property
def as_dict(self):
def as_dict(self) -> dict:
pload = self._payload
if isinstance(pload, HiveMessage):
pload = pload.as_json
pload = pload.as_dict
elif isinstance(pload, Message):
pload = pload.serialize()
if isinstance(pload, str):
pload = json.loads(pload)

assert isinstance(pload, dict)

return {"msg_type": self.msg_type,
"payload": pload,
"route": self.route,
"node": self.node_id,
"target_site_id": self.target_site_id,
"target_pubkey": self.target_public_key,
"source_peer": self.source_peer}

@property
def as_json(self):
def as_json(self) -> str:
return json.dumps(self.as_dict)

def serialize(self):
def serialize(self) -> str:
return self.as_json

@staticmethod
def deserialize(payload):
def deserialize(payload: Union[str, dict]) -> 'HiveMessage':
if isinstance(payload, str):
payload = json.loads(payload)

if "msg_type" in payload:
try:
return HiveMessage(payload["msg_type"], payload["payload"],
target_site_id=payload.get("target_site_id"))
target_site_id=payload.get("target_site_id"),
target_pubkey=payload.get("target_pubkey"))
except:
pass # not a hivemind message

Expand All @@ -136,12 +156,14 @@ def deserialize(payload):
# NOTE: technically could also be SHARED_BUS or THIRDPRTY
return HiveMessage(HiveMessageType.BUS,
Message.deserialize(payload),
target_site_id=payload.get("target_site_id"))
target_site_id=payload.get("target_site_id"),
target_pubkey=payload.get("target_pubkey"))
except:
pass # not a mycroft message

return HiveMessage(HiveMessageType.THIRDPRTY, payload,
target_site_id=payload.get("target_site_id"))
target_site_id=payload.get("target_site_id"),
target_pubkey=payload.get("target_pubkey"))

def __getitem__(self, item):
return self._payload.get(item)
Expand Down
85 changes: 64 additions & 21 deletions hivemind_bus_client/protocol.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from dataclasses import dataclass
from typing import Optional

from ovos_bus_client import Message as MycroftMessage
from ovos_bus_client import MessageBusClient
from ovos_bus_client.message import Message
from ovos_bus_client.session import Session, SessionManager
from ovos_utils.log import LOG
from hivemind_bus_client.identity import NodeIdentity
from poorman_handshake import HandShake, PasswordHandShake
from typing import Optional
import pgpy
from hivemind_bus_client.client import HiveMessageBusClient
from hivemind_bus_client.identity import NodeIdentity
from hivemind_bus_client.message import HiveMessage, HiveMessageType
from poorman_handshake import HandShake, PasswordHandShake


@dataclass()
Expand Down Expand Up @@ -104,6 +105,7 @@ def bind(self, bus: Optional[MessageBusClient] = None):
self.hm.on(HiveMessageType.HELLO, self.handle_hello)
self.hm.on(HiveMessageType.BROADCAST, self.handle_broadcast)
self.hm.on(HiveMessageType.PROPAGATE, self.handle_propagate)
self.hm.on(HiveMessageType.INTERCOM, self.handle_intercom)
self.hm.on(HiveMessageType.ESCALATE, self.handle_illegal_msg)
self.hm.on(HiveMessageType.SHARED_BUS, self.handle_illegal_msg)
self.hm.on(HiveMessageType.BUS, self.handle_bus)
Expand Down Expand Up @@ -136,7 +138,7 @@ def handle_hello(self, message: HiveMessage):
self.internal_protocol.bus.session_id = message.payload["session_id"]
LOG.debug("session_id updated to: " + message.payload["session_id"])

def start_handshake(self,):
def start_handshake(self):
if self.binarize:
LOG.info("hivemind supports binarization protocol")
else:
Expand Down Expand Up @@ -216,14 +218,15 @@ def handle_bus(self, message: HiveMessage):
def handle_broadcast(self, message: HiveMessage):
LOG.info(f"BROADCAST: {message.payload}")

# if the message targets our site_id, send it to internal bus
site = message.target_site_id
if site and site == self.site_id:
pload = message.payload
# broadcast messages always come from a trusted source
# only masters can emit them
if isinstance(pload, MycroftMessage):
self.handle_bus(message)
if message.payload.msg_type == HiveMessageType.INTERCOM:
self.handle_intercom(message)
return

if message.payload.msg_type == HiveMessageType.BUS:
# if the message targets our site_id, send it to internal bus
site = message.target_site_id
if site and site == self.site_id:
self.handle_bus(message.payload)

# if this device is also a hivemind server
# forward to HiveMindListenerInternalProtocol
Expand All @@ -234,18 +237,58 @@ def handle_broadcast(self, message: HiveMessage):
def handle_propagate(self, message: HiveMessage):
LOG.info(f"PROPAGATE: {message.payload}")

# if the message targets our site_id, send it to internal bus
site = message.target_site_id
if site and site == self.site_id:
# might originate from untrusted
# satellite anywhere in the hive
# do not inject by default
pload = message.payload
#if isinstance(pload, MycroftMessage):
# self.handle_bus(message)
if message.payload.msg_type == HiveMessageType.INTERCOM:
self.handle_intercom(message)
return

if message.payload.msg_type == HiveMessageType.BUS:
# if the message targets our site_id, send it to internal bus
site = message.target_site_id
if site and site == self.site_id:
# might originate from untrusted
# satellite anywhere in the hive
# do not inject by default
pass # TODO - when to inject ? add list of trusted peers?
# self.handle_bus(message.payload)


# if this device is also a hivemind server
# forward to HiveMindListenerInternalProtocol
data = message.serialize()
ctxt = {"source": self.node_id}
self.internal_protocol.bus.emit(MycroftMessage('hive.send.downstream', data, ctxt))


def handle_intercom(self, message: HiveMessage):
LOG.info(f"INTERCOM: {message.payload}")

# if the message targets our site_id, send it to internal bus
k = message.target_public_key
if k and k != self.hm.identity.public_key:
# not for us
return

pload = message.payload
if isinstance(pload, dict) and "ciphertext" in pload:
try:
message_from_blob = pgpy.PGPMessage.from_blob(pload["ciphertext"])

with open(self.identity.private_key, "r") as f:
private_key = pgpy.PGPKey.from_blob(f.read())

decrypted: str = private_key.decrypt(message_from_blob)
message._payload = HiveMessage.deserialize(decrypted)
except:
if k:
LOG.error("failed to decrypt message!")
raise
LOG.debug("failed to decrypt message, not for us")
return

if message.msg_type == HiveMessageType.BUS:
self.handle_bus(message)
elif message.msg_type == HiveMessageType.PROPAGATE:
self.handle_propagate(message)
elif message.msg_type == HiveMessageType.BROADCAST:
self.handle_broadcast(message)

0 comments on commit 3271e54

Please sign in to comment.