diff --git a/hivemind_bus_client/client.py b/hivemind_bus_client/client.py index 40a5ee1..18b7393 100644 --- a/hivemind_bus_client/client.py +++ b/hivemind_bus_client/client.py @@ -446,11 +446,9 @@ def wait_for_payload_response(self, message: Union[MycroftMessage, HiveMessage], return waiter.wait(timeout) # targeted messages for nodes, assymetric encryption - def emit_encrypted(self, message: MycroftMessage, pubkey: Union[str, pgpy.PGPKey]): - message = self.encrypt(message, pubkey) - self.emit(message) + def emit_intercom(self, message: Union[MycroftMessage, HiveMessage], + pubkey: Union[str, pgpy.PGPKey]): - def encrypt(self, message: MycroftMessage, pubkey: Union[str, pgpy.PGPKey]): if isinstance(pubkey, str): pubkey, _ = pgpy.PGPKey.from_blob(pubkey) assert isinstance(pubkey, pgpy.PGPKey) @@ -467,16 +465,4 @@ def encrypt(self, message: MycroftMessage, pubkey: Union[str, pgpy.PGPKey]): encrypted_message |= private_key.sign(encrypted_message, intended_recipients=[pubkey]) - return MycroftMessage("hive.identity_encrypted", - {"ciphertext": str(encrypted_message)}) - - def decrypt(self, message: MycroftMessage): - assert message.msg_type == "hive.identity_encrypted" - ciphertext = message.data["ciphertext"] - message_from_blob = pgpy.PGPMessage.from_blob(ciphertext) - - with open(self.identity.private_key, "r") as f: - private_key = pgpy.PGPKey.from_blob(f.read()) - - decrypted: str = private_key.decrypt(message_from_blob) - return MycroftMessage.deserialize(json.loads(decrypted)) + self.emit(HiveMessage(HiveMessageType.INTERCOM, payload={"ciphertext": str(encrypted_message)})) diff --git a/hivemind_bus_client/message.py b/hivemind_bus_client/message.py index 9d0e298..66201cb 100644 --- a/hivemind_bus_client/message.py +++ b/hivemind_bus_client/message.py @@ -1,23 +1,28 @@ -from enum import Enum import json -from ovos_utils.json_helper import merge_dict +from enum import Enum + from ovos_bus_client import Message +from ovos_utils.json_helper import merge_dict +from typing import Union, List, Optional class HiveMessageType(str, Enum): HANDSHAKE = "shake" # negotiate initial connection BUS = "bus" # request meant for internal mycroft-bus in master SHARED_BUS = "shared_bus" # passive sharing of message - # from mycroft-bus in slave + # from mycroft-bus in slave + + INTERCOM = "intercom" # from satellite to satellite + BROADCAST = "broadcast" # forward message to all slaves PROPAGATE = "propagate" # forward message to all slaves and masters ESCALATE = "escalate" # forward message up the authority chain to all - # masters + # masters HELLO = "hello" # like escalate, used to announce the device QUERY = "query" # like escalate, but stops once one of the nodes can - # send a response + # send a response CASCADE = "cascade" # like propagate, but expects a response back from - # all nodes in the hive (responses optional) + # all nodes in the hive (responses optional) PING = "ping" # like cascade, but used to map the network RENDEZVOUS = "rendezvous" # reserved for rendezvous-nodes THIRDPRTY = "3rdparty" # user land message, do whatever you want @@ -25,23 +30,29 @@ class HiveMessageType(str, Enum): class HiveMessage: - def __init__(self, msg_type, payload=None, node=None, source_peer=None, - route=None, target_peers=None, meta=None, target_site_id=None): + def __init__(self, msg_type: Union[HiveMessageType, str], + payload: Optional[Union[Message, 'HiveMessage', str, dict]] =None, + node: Optional[str]=None, + source_peer: Optional[str]=None, + route: Optional[List[str]]=None, + target_peers: Optional[List[str]]=None, + target_site_id: Optional[str] =None, + target_pubkey: Optional[str] =None): # except for the hivemind node classes receiving the message and # creating the object nothing should be able to change these values # node classes might change them a runtime by the private attribute # but end-users should consider them read_only + + if msg_type not in [m.value for m in HiveMessageType]: raise ValueError("Unknown HiveMessage.msg_type") - self._msg_type = msg_type + # the payload is more or less a free for all # the msg_type determines what happens to the message, but the # payload can simply be ignored by the receiving module - - # some msg_types might return HiveMessage, others (mycroft) Message - # we should support the dict/json format, json is used at the - # transport layer before converting into any of these formats + # we store things in dict/json format, json is always used at the + # transport layer before converting into any of the other formats if isinstance(payload, Message): payload = {"type": payload.msg_type, "data": payload.data, @@ -51,40 +62,44 @@ def __init__(self, msg_type, payload=None, node=None, source_peer=None, self._payload = payload or {} self._site_id = target_site_id + self._target_pubkey = target_pubkey self._node = node # node semi-unique identifier self._source_peer = source_peer # peer_id self._route = route or [] # where did this message come from self._targets = target_peers or [] # where will it be sent - self._meta = meta or {} @property - def target_site_id(self): + def target_site_id(self) -> str: return self._site_id @property - def msg_type(self): + def target_public_key(self) -> str: + return self._target_pubkey + + @property + def msg_type(self) -> str: return self._msg_type @property - def node_id(self): + def node_id(self) -> str: return self._node @property - def source_peer(self): + def source_peer(self) -> str: return self._source_peer @property - def target_peers(self): + def target_peers(self) -> List[str]: if self.source_peer: return self._targets or [self._source_peer] return self._targets @property - def route(self): + def route(self) -> List[str]: return [r for r in self._route if r.get("targets") and r.get("source")] @property - def payload(self): + def payload(self) -> Union['HiveMessage', Message, dict]: if self.msg_type in [HiveMessageType.BUS, HiveMessageType.SHARED_BUS]: return Message(self._payload["type"], data=self._payload.get("data"), @@ -97,37 +112,42 @@ def payload(self): return self._payload @property - def as_dict(self): + def as_dict(self) -> dict: pload = self._payload if isinstance(pload, HiveMessage): - pload = pload.as_json + pload = pload.as_dict elif isinstance(pload, Message): pload = pload.serialize() if isinstance(pload, str): pload = json.loads(pload) + + assert isinstance(pload, dict) + return {"msg_type": self.msg_type, "payload": pload, "route": self.route, "node": self.node_id, "target_site_id": self.target_site_id, + "target_pubkey": self.target_public_key, "source_peer": self.source_peer} @property - def as_json(self): + def as_json(self) -> str: return json.dumps(self.as_dict) - def serialize(self): + def serialize(self) -> str: return self.as_json @staticmethod - def deserialize(payload): + def deserialize(payload: Union[str, dict]) -> 'HiveMessage': if isinstance(payload, str): payload = json.loads(payload) if "msg_type" in payload: try: return HiveMessage(payload["msg_type"], payload["payload"], - target_site_id=payload.get("target_site_id")) + target_site_id=payload.get("target_site_id"), + target_pubkey=payload.get("target_pubkey")) except: pass # not a hivemind message @@ -136,12 +156,14 @@ def deserialize(payload): # NOTE: technically could also be SHARED_BUS or THIRDPRTY return HiveMessage(HiveMessageType.BUS, Message.deserialize(payload), - target_site_id=payload.get("target_site_id")) + target_site_id=payload.get("target_site_id"), + target_pubkey=payload.get("target_pubkey")) except: pass # not a mycroft message return HiveMessage(HiveMessageType.THIRDPRTY, payload, - target_site_id=payload.get("target_site_id")) + target_site_id=payload.get("target_site_id"), + target_pubkey=payload.get("target_pubkey")) def __getitem__(self, item): return self._payload.get(item) diff --git a/hivemind_bus_client/protocol.py b/hivemind_bus_client/protocol.py index e04203b..9d20ff8 100644 --- a/hivemind_bus_client/protocol.py +++ b/hivemind_bus_client/protocol.py @@ -1,15 +1,16 @@ from dataclasses import dataclass -from typing import Optional from ovos_bus_client import Message as MycroftMessage from ovos_bus_client import MessageBusClient from ovos_bus_client.message import Message from ovos_bus_client.session import Session, SessionManager from ovos_utils.log import LOG -from hivemind_bus_client.identity import NodeIdentity +from poorman_handshake import HandShake, PasswordHandShake +from typing import Optional +import pgpy from hivemind_bus_client.client import HiveMessageBusClient +from hivemind_bus_client.identity import NodeIdentity from hivemind_bus_client.message import HiveMessage, HiveMessageType -from poorman_handshake import HandShake, PasswordHandShake @dataclass() @@ -104,6 +105,7 @@ def bind(self, bus: Optional[MessageBusClient] = None): self.hm.on(HiveMessageType.HELLO, self.handle_hello) self.hm.on(HiveMessageType.BROADCAST, self.handle_broadcast) self.hm.on(HiveMessageType.PROPAGATE, self.handle_propagate) + self.hm.on(HiveMessageType.INTERCOM, self.handle_intercom) self.hm.on(HiveMessageType.ESCALATE, self.handle_illegal_msg) self.hm.on(HiveMessageType.SHARED_BUS, self.handle_illegal_msg) self.hm.on(HiveMessageType.BUS, self.handle_bus) @@ -136,7 +138,7 @@ def handle_hello(self, message: HiveMessage): self.internal_protocol.bus.session_id = message.payload["session_id"] LOG.debug("session_id updated to: " + message.payload["session_id"]) - def start_handshake(self,): + def start_handshake(self): if self.binarize: LOG.info("hivemind supports binarization protocol") else: @@ -216,14 +218,15 @@ def handle_bus(self, message: HiveMessage): def handle_broadcast(self, message: HiveMessage): LOG.info(f"BROADCAST: {message.payload}") - # if the message targets our site_id, send it to internal bus - site = message.target_site_id - if site and site == self.site_id: - pload = message.payload - # broadcast messages always come from a trusted source - # only masters can emit them - if isinstance(pload, MycroftMessage): - self.handle_bus(message) + if message.payload.msg_type == HiveMessageType.INTERCOM: + self.handle_intercom(message) + return + + if message.payload.msg_type == HiveMessageType.BUS: + # if the message targets our site_id, send it to internal bus + site = message.target_site_id + if site and site == self.site_id: + self.handle_bus(message.payload) # if this device is also a hivemind server # forward to HiveMindListenerInternalProtocol @@ -234,18 +237,58 @@ def handle_broadcast(self, message: HiveMessage): def handle_propagate(self, message: HiveMessage): LOG.info(f"PROPAGATE: {message.payload}") - # if the message targets our site_id, send it to internal bus - site = message.target_site_id - if site and site == self.site_id: - # might originate from untrusted - # satellite anywhere in the hive - # do not inject by default - pload = message.payload - #if isinstance(pload, MycroftMessage): - # self.handle_bus(message) + if message.payload.msg_type == HiveMessageType.INTERCOM: + self.handle_intercom(message) + return + + if message.payload.msg_type == HiveMessageType.BUS: + # if the message targets our site_id, send it to internal bus + site = message.target_site_id + if site and site == self.site_id: + # might originate from untrusted + # satellite anywhere in the hive + # do not inject by default + pass # TODO - when to inject ? add list of trusted peers? + # self.handle_bus(message.payload) + # if this device is also a hivemind server # forward to HiveMindListenerInternalProtocol data = message.serialize() ctxt = {"source": self.node_id} self.internal_protocol.bus.emit(MycroftMessage('hive.send.downstream', data, ctxt)) + + + def handle_intercom(self, message: HiveMessage): + LOG.info(f"INTERCOM: {message.payload}") + + # if the message targets our site_id, send it to internal bus + k = message.target_public_key + if k and k != self.hm.identity.public_key: + # not for us + return + + pload = message.payload + if isinstance(pload, dict) and "ciphertext" in pload: + try: + message_from_blob = pgpy.PGPMessage.from_blob(pload["ciphertext"]) + + with open(self.identity.private_key, "r") as f: + private_key = pgpy.PGPKey.from_blob(f.read()) + + decrypted: str = private_key.decrypt(message_from_blob) + message._payload = HiveMessage.deserialize(decrypted) + except: + if k: + LOG.error("failed to decrypt message!") + raise + LOG.debug("failed to decrypt message, not for us") + return + + if message.msg_type == HiveMessageType.BUS: + self.handle_bus(message) + elif message.msg_type == HiveMessageType.PROPAGATE: + self.handle_propagate(message) + elif message.msg_type == HiveMessageType.BROADCAST: + self.handle_broadcast(message) +