Skip to content

Commit

Permalink
feat/INTERCOM msg type (#27)
Browse files Browse the repository at this point in the history
* feat/node_privacy

allow messages to be encrypted with a node public PGP key, so messages can be transported without the hive being able to read them

public keys are part of the NodeIdentity

* fix initial keygen

* reset-pgp command

* k

* typo

* msg_type/intercom

* .

* feat/intercom

* hotfix/message_kwarg

* fix serialze
  • Loading branch information
JarbasAl authored Jun 5, 2024
1 parent 05bc798 commit f48c515
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 55 deletions.
23 changes: 23 additions & 0 deletions hivemind_bus_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from pyee import EventEmitter
from websocket import ABNF
from websocket import WebSocketApp, WebSocketConnectionClosedException
import pgpy

from hivemind_bus_client.identity import NodeIdentity
from hivemind_bus_client.message import HiveMessage, HiveMessageType
Expand Down Expand Up @@ -436,3 +437,25 @@ def wait_for_payload_response(self, message: Union[MycroftMessage, HiveMessage],
# Send message and wait for it's response
self.emit(message)
return waiter.wait(timeout)

# targeted messages for nodes, assymetric encryption
def emit_intercom(self, message: Union[MycroftMessage, HiveMessage],
pubkey: Union[str, pgpy.PGPKey]):

if isinstance(pubkey, str):
pubkey, _ = pgpy.PGPKey.from_blob(pubkey)
assert isinstance(pubkey, pgpy.PGPKey)

txt = message.serialize()

text_message = pgpy.PGPMessage.new(txt)
encrypted_message = pubkey.encrypt(text_message)

# sign message
with open(self.identity.private_key, "r") as f:
private_key = pgpy.PGPKey.from_blob(f.read())
# the bitwise OR operator '|' is used to add a signature to a PGPMessage.
encrypted_message |= private_key.sign(encrypted_message,
intended_recipients=[pubkey])

self.emit(HiveMessage(HiveMessageType.INTERCOM, payload={"ciphertext": str(encrypted_message)}))
22 changes: 20 additions & 2 deletions hivemind_bus_client/identity.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from os.path import basename, dirname
from poorman_handshake.asymmetric.utils import export_private_key, create_private_key

from json_database import JsonConfigXDG

Expand All @@ -21,16 +22,25 @@ def name(self):
def name(self, val):
self.IDENTITY_FILE["name"] = val

@property
def public_key(self):
"""ASCI public PGP key"""
return self.IDENTITY_FILE.get("public_key")

@public_key.setter
def public_key(self, val):
self.IDENTITY_FILE["public_key"] = val

@property
def private_key(self):
"""path to PRIVATE .asc PGP key, this cryptographic key
uniquely identifies this device across the hive and proves it's identity"""
return self.IDENTITY_FILE.get("key") or \
return self.IDENTITY_FILE.get("secret_key") or \
f"{dirname(self.IDENTITY_FILE.path)}/{self.name}.asc"

@private_key.setter
def private_key(self, val):
self.IDENTITY_FILE["key"] = val
self.IDENTITY_FILE["secret_key"] = val

@property
def password(self):
Expand Down Expand Up @@ -81,3 +91,11 @@ def save(self):

def reload(self):
self.IDENTITY_FILE.reload()

def create_keys(self):
key = create_private_key("HiveMindComs")
priv = f"{dirname(self.IDENTITY_FILE.path)}/HiveMindComs.asc"
export_private_key(priv, key)
pub = str(key.pubkey)
self.private_key = priv
self.public_key = pub
80 changes: 51 additions & 29 deletions hivemind_bus_client/message.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,58 @@
from enum import Enum
import json
from ovos_utils.json_helper import merge_dict
from enum import Enum

from ovos_bus_client import Message
from ovos_utils.json_helper import merge_dict
from typing import Union, List, Optional


class HiveMessageType(str, Enum):
HANDSHAKE = "shake" # negotiate initial connection
BUS = "bus" # request meant for internal mycroft-bus in master
SHARED_BUS = "shared_bus" # passive sharing of message
# from mycroft-bus in slave
# from mycroft-bus in slave

INTERCOM = "intercom" # from satellite to satellite

BROADCAST = "broadcast" # forward message to all slaves
PROPAGATE = "propagate" # forward message to all slaves and masters
ESCALATE = "escalate" # forward message up the authority chain to all
# masters
# masters
HELLO = "hello" # like escalate, used to announce the device
QUERY = "query" # like escalate, but stops once one of the nodes can
# send a response
# send a response
CASCADE = "cascade" # like propagate, but expects a response back from
# all nodes in the hive (responses optional)
# all nodes in the hive (responses optional)
PING = "ping" # like cascade, but used to map the network
RENDEZVOUS = "rendezvous" # reserved for rendezvous-nodes
THIRDPRTY = "3rdparty" # user land message, do whatever you want
BINARY = "bin" # binary data container, payload for something else


class HiveMessage:
def __init__(self, msg_type, payload=None, node=None, source_peer=None,
route=None, target_peers=None, meta=None, target_site_id=None):
def __init__(self, msg_type: Union[HiveMessageType, str],
payload: Optional[Union[Message, 'HiveMessage', str, dict]] =None,
node: Optional[str]=None,
source_peer: Optional[str]=None,
route: Optional[List[str]]=None,
target_peers: Optional[List[str]]=None,
target_site_id: Optional[str] =None,
target_pubkey: Optional[str] =None):
# except for the hivemind node classes receiving the message and
# creating the object nothing should be able to change these values
# node classes might change them a runtime by the private attribute
# but end-users should consider them read_only


if msg_type not in [m.value for m in HiveMessageType]:
raise ValueError("Unknown HiveMessage.msg_type")

self._msg_type = msg_type

# the payload is more or less a free for all
# the msg_type determines what happens to the message, but the
# payload can simply be ignored by the receiving module

# some msg_types might return HiveMessage, others (mycroft) Message
# we should support the dict/json format, json is used at the
# transport layer before converting into any of these formats
# we store things in dict/json format, json is always used at the
# transport layer before converting into any of the other formats
if isinstance(payload, Message):
payload = {"type": payload.msg_type,
"data": payload.data,
Expand All @@ -51,40 +62,44 @@ def __init__(self, msg_type, payload=None, node=None, source_peer=None,
self._payload = payload or {}

self._site_id = target_site_id
self._target_pubkey = target_pubkey
self._node = node # node semi-unique identifier
self._source_peer = source_peer # peer_id
self._route = route or [] # where did this message come from
self._targets = target_peers or [] # where will it be sent
self._meta = meta or {}

@property
def target_site_id(self):
def target_site_id(self) -> str:
return self._site_id

@property
def msg_type(self):
def target_public_key(self) -> str:
return self._target_pubkey

@property
def msg_type(self) -> str:
return self._msg_type

@property
def node_id(self):
def node_id(self) -> str:
return self._node

@property
def source_peer(self):
def source_peer(self) -> str:
return self._source_peer

@property
def target_peers(self):
def target_peers(self) -> List[str]:
if self.source_peer:
return self._targets or [self._source_peer]
return self._targets

@property
def route(self):
def route(self) -> List[str]:
return [r for r in self._route if r.get("targets") and r.get("source")]

@property
def payload(self):
def payload(self) -> Union['HiveMessage', Message, dict]:
if self.msg_type in [HiveMessageType.BUS, HiveMessageType.SHARED_BUS]:
return Message(self._payload["type"],
data=self._payload.get("data"),
Expand All @@ -97,37 +112,42 @@ def payload(self):
return self._payload

@property
def as_dict(self):
def as_dict(self) -> dict:
pload = self._payload
if isinstance(pload, HiveMessage):
pload = pload.as_json
pload = pload.as_dict
elif isinstance(pload, Message):
pload = pload.serialize()
if isinstance(pload, str):
pload = json.loads(pload)

assert isinstance(pload, dict)

return {"msg_type": self.msg_type,
"payload": pload,
"route": self.route,
"node": self.node_id,
"target_site_id": self.target_site_id,
"target_pubkey": self.target_public_key,
"source_peer": self.source_peer}

@property
def as_json(self):
def as_json(self) -> str:
return json.dumps(self.as_dict)

def serialize(self):
def serialize(self) -> str:
return self.as_json

@staticmethod
def deserialize(payload):
def deserialize(payload: Union[str, dict]) -> 'HiveMessage':
if isinstance(payload, str):
payload = json.loads(payload)

if "msg_type" in payload:
try:
return HiveMessage(payload["msg_type"], payload["payload"],
target_site_id=payload.get("target_site_id"))
target_site_id=payload.get("target_site_id"),
target_pubkey=payload.get("target_pubkey"))
except:
pass # not a hivemind message

Expand All @@ -136,12 +156,14 @@ def deserialize(payload):
# NOTE: technically could also be SHARED_BUS or THIRDPRTY
return HiveMessage(HiveMessageType.BUS,
Message.deserialize(payload),
target_site_id=payload.get("target_site_id"))
target_site_id=payload.get("target_site_id"),
target_pubkey=payload.get("target_pubkey"))
except:
pass # not a mycroft message

return HiveMessage(HiveMessageType.THIRDPRTY, payload,
target_site_id=payload.get("target_site_id"))
target_site_id=payload.get("target_site_id"),
target_pubkey=payload.get("target_pubkey"))

def __getitem__(self, item):
return self._payload.get(item)
Expand Down
Loading

0 comments on commit f48c515

Please sign in to comment.