diff --git a/hivemind_bus_client/client.py b/hivemind_bus_client/client.py index dbbf6b1..e8d1a6c 100644 --- a/hivemind_bus_client/client.py +++ b/hivemind_bus_client/client.py @@ -18,7 +18,8 @@ from hivemind_bus_client.serialization import HiveMindBinaryPayloadType from hivemind_bus_client.serialization import get_bitstring, decode_bitstring from hivemind_bus_client.util import serialize_message -from hivemind_bus_client.encryption import encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin, JsonCiphers, BinaryCiphers +from hivemind_bus_client.encryption import (encrypt_as_json, decrypt_from_json, encrypt_bin, decrypt_bin, + SupportedEncodings, SupportedCiphers) from poorman_handshake.asymmetric.utils import encrypt_RSA, load_RSA_key, sign_RSA @@ -104,8 +105,8 @@ def __init__(self, key: Optional[str] = None, internal_bus: Optional[OVOSBusClient] = None, bin_callbacks: BinaryDataCallbacks = BinaryDataCallbacks()): self.bin_callbacks = bin_callbacks - self.json_cipher = JsonCiphers.JSON_HEX_AES_GCM_128 # server defaults before it was made configurable - self.bin_cipher = BinaryCiphers.BINARY_AES_GCM_128 # server defaults before it was made configurable + self.json_encoding = SupportedEncodings.JSON_HEX # server defaults before it was made configurable + self.cipher = SupportedCiphers.AES_GCM # server defaults before it was made configurable self.identity = identity or None self._password = password @@ -273,11 +274,12 @@ def on_message(self, *args): if self.crypto_key: # handle binary encryption if isinstance(message, bytes): - message = decrypt_bin(self.crypto_key, message, cipher=self.bin_cipher) + message = decrypt_bin(self.crypto_key, message, cipher=self.cipher) # handle json encryption elif "ciphertext" in message: # LOG.debug(f"got encrypted message: {len(message)}") - message = decrypt_from_json(self.crypto_key, message, cipher=self.json_cipher) + message = decrypt_from_json(self.crypto_key, message, + cipher=self.cipher, encoding=self.json_encoding) else: LOG.debug("Message was unencrypted") @@ -369,14 +371,15 @@ def emit(self, message: Union[MycroftMessage, HiveMessage], binary_type=binary_type, hivemeta=message.metadata) if self.crypto_key: - ws_payload = encrypt_bin(self.crypto_key, bitstr.bytes, cipher=self.bin_cipher) + ws_payload = encrypt_bin(self.crypto_key, bitstr.bytes, cipher=self.cipher) else: ws_payload = bitstr.bytes self.client.send(ws_payload, ABNF.OPCODE_BINARY) else: ws_payload = serialize_message(message) if self.crypto_key: - ws_payload = encrypt_as_json(self.crypto_key, ws_payload, cipher=self.json_cipher) + ws_payload = encrypt_as_json(self.crypto_key, ws_payload, + cipher=self.cipher, encoding=self.json_encoding) self.client.send(ws_payload) except WebSocketConnectionClosedException: diff --git a/hivemind_bus_client/encryption.py b/hivemind_bus_client/encryption.py index 3c803b4..49fdad0 100644 --- a/hivemind_bus_client/encryption.py +++ b/hivemind_bus_client/encryption.py @@ -1,7 +1,7 @@ import enum import json from binascii import hexlify, unhexlify -from typing import Union, Optional, Dict, Any, Tuple +from typing import Union, Optional, Dict, Any, Literal, List import pybase64 from Cryptodome.Cipher import AES, ChaCha20_Poly1305 @@ -10,9 +10,9 @@ from hivemind_bus_client.exceptions import EncryptionKeyError, DecryptionKeyError, InvalidCipher, InvalidKeySize # Cipher-specific constants -AES_GCM_KEY_SIZE = 16 -AES_GCM_NONCE_SIZE = 16 -AES_GCM_TAG_SIZE = 16 +AES_KEY_SIZES = [16, 24, 32] # poorman_handshake generates 32 bit secrets +AES_NONCE_SIZE = 16 +AES_TAG_SIZE = 16 CHACHA20_KEY_SIZE = 32 CHACHA20_NONCE_SIZE = 12 CHACHA20_TAG_SIZE = 16 @@ -28,17 +28,17 @@ def cpu_supports_AES() -> bool: return "aes" in get_cpu_info()["flags"] -class JsonCiphers(str, enum.Enum): +class SupportedEncodings(str, enum.Enum): """ - Enum representing JSON-based encryption ciphers. + Enum representing JSON-based encryption encodings. + + Ciphers output binary data, json needs to transmit that data as plaintext """ - JSON_B64_AES_GCM_128 = "JSON-B64-AES-GCM-128" # JSON text output with Base64 encoding - JSON_HEX_AES_GCM_128 = "JSON-HEX-AES-GCM-128" # JSON text output with Hex encoding - JSON_B64_CHACHA20_POLY1305 = "JSON-B64-CHACHA20-POLY1305" # JSON text output with Base64 encoding - JSON_HEX_CHACHA20_POLY1305 = "JSON-HEX-CHACHA20-POLY1305" # JSON text output with Hex encoding + JSON_B64 = "JSON-B64" # JSON text output with Base64 encoding + JSON_HEX = "JSON-HEX" # JSON text output with Hex encoding (LEGACY SUPPORT) -class BinaryCiphers(str, enum.Enum): +class SupportedCiphers(str, enum.Enum): """ Enum representing binary encryption ciphers. @@ -47,19 +47,31 @@ class BinaryCiphers(str, enum.Enum): - GCM - http://csrc.nist.gov/publications/nistpubs/800-38D/SP-800-38D.pdf - CHACHA20-POLY1305 - https://datatracker.ietf.org/doc/html/rfc7539 """ - BINARY_AES_GCM_128 = "BINARY-AES-GCM-128" # Binary output - BINARY_CHACHA20_POLY1305 = "BINARY-CHACHA20-POLY1305" # specified in RFC7539. + AES_GCM = "AES-GCM" + AES_CCM = "AES-CCM" + CHACHA20_POLY1305 = "CHACHA20-POLY1305" # specified in RFC7539. + + +AES_CIPHERS = {c for c in SupportedCiphers if "AES" in c} +BLOCK_CIPHERS = AES_CIPHERS # Blowfish etc can be added in the future + + +def optimal_ciphers() -> List[SupportedCiphers]: + if not cpu_supports_AES(): + return [SupportedCiphers.CHACHA20_POLY1305, SupportedCiphers.AES_CCM, SupportedCiphers.AES_GCM] + return [SupportedCiphers.AES_GCM, SupportedCiphers.AES_CCM, SupportedCiphers.CHACHA20_POLY1305] def encrypt_as_json(key: Union[str, bytes], data: Union[str, Dict[str, Any]], - cipher: JsonCiphers = JsonCiphers.JSON_B64_AES_GCM_128) -> str: + cipher: SupportedCiphers = SupportedCiphers.AES_GCM, + encoding: SupportedEncodings = SupportedEncodings.JSON_B64) -> str: """ Encrypts the given data and outputs it as a JSON string. Args: key (Union[str, bytes]): The encryption key, up to 16 bytes. Longer keys will be truncated. data (Union[str, Dict[str, Any]]): The data to encrypt. If a dictionary, it will be serialized to JSON. - cipher (JsonCiphers): The encryption cipher. Supported options: + cipher (SupportedEncodings): The encryption cipher. Supported options: - JSON-B64-AES-GCM-128: Outputs Base64-encoded JSON. - JSON-HEX-AES-GCM-128: Outputs Hex-encoded JSON. @@ -69,35 +81,32 @@ def encrypt_as_json(key: Union[str, bytes], data: Union[str, Dict[str, Any]], Raises: InvalidCipher: If an unsupported cipher is provided. """ - if cipher not in JsonCiphers: - raise InvalidCipher(f"Invalid JSON cipher: {str(cipher)}") + if cipher not in SupportedCiphers: + raise InvalidCipher(f"Invalid cipher: {str(cipher)}") + if encoding not in SupportedEncodings: + raise InvalidCipher(f"Invalid JSON encoding: {str(encoding)}") if isinstance(data, dict): data = json.dumps(data) - aes_ciphers = {JsonCiphers.JSON_B64_AES_GCM_128, JsonCiphers.JSON_HEX_AES_GCM_128} - b64_ciphers = {JsonCiphers.JSON_B64_AES_GCM_128, JsonCiphers.JSON_B64_CHACHA20_POLY1305} - - bcipher = BinaryCiphers.BINARY_AES_GCM_128 if cipher in aes_ciphers else BinaryCiphers.BINARY_CHACHA20_POLY1305 - try: - ciphertext = encrypt_bin(key, data, cipher=bcipher) + ciphertext = encrypt_bin(key=key, plaintext=data, cipher=cipher) except InvalidKeySize as e: raise e except Exception as e: raise EncryptionKeyError from e # extract nonce/tag depending on cipher, sizes are different - if cipher in aes_ciphers: - nonce, ciphertext, tag = (ciphertext[:AES_GCM_NONCE_SIZE], - ciphertext[AES_GCM_NONCE_SIZE:-AES_GCM_TAG_SIZE], - ciphertext[-AES_GCM_TAG_SIZE:]) + if cipher in AES_CIPHERS: + nonce, ciphertext, tag = (ciphertext[:AES_NONCE_SIZE], + ciphertext[AES_NONCE_SIZE:-AES_TAG_SIZE], + ciphertext[-AES_TAG_SIZE:]) else: nonce, ciphertext, tag = (ciphertext[:CHACHA20_NONCE_SIZE], ciphertext[CHACHA20_NONCE_SIZE:-CHACHA20_TAG_SIZE], ciphertext[-CHACHA20_TAG_SIZE:]) - encoder = pybase64.b64encode if cipher in b64_ciphers else hexlify + encoder = pybase64.b64encode if encoding == SupportedEncodings.JSON_B64 else hexlify return json.dumps({ "ciphertext": encoder(ciphertext).decode('utf-8'), @@ -106,14 +115,16 @@ def encrypt_as_json(key: Union[str, bytes], data: Union[str, Dict[str, Any]], }) -def decrypt_from_json(key: Union[str, bytes], data: Union[str, bytes], cipher: JsonCiphers) -> str: +def decrypt_from_json(key: Union[str, bytes], data: Union[str, bytes], + cipher: SupportedCiphers = SupportedCiphers.AES_GCM, + encoding: SupportedEncodings = SupportedEncodings.JSON_B64) -> str: """ Decrypts data from a JSON string. Args: key (Union[str, bytes]): The decryption key, up to 16 bytes. Longer keys will be truncated. data (Union[str, bytes]): The encrypted data as a JSON string or bytes. - cipher (JsonCiphers): The cipher used for encryption. + cipher (SupportedEncodings): The cipher used for encryption. Returns: str: The decrypted plaintext data. @@ -122,28 +133,30 @@ def decrypt_from_json(key: Union[str, bytes], data: Union[str, bytes], cipher: J InvalidCipher: If an unsupported cipher is provided. DecryptionKeyError: If decryption fails due to an invalid key or corrupted data. """ - aes_ciphers = {JsonCiphers.JSON_B64_AES_GCM_128, JsonCiphers.JSON_HEX_AES_GCM_128} - b64_ciphers = {JsonCiphers.JSON_B64_AES_GCM_128, JsonCiphers.JSON_B64_CHACHA20_POLY1305} + if cipher not in SupportedCiphers: + raise InvalidCipher(f"Invalid cipher: {str(cipher)}") + if encoding not in SupportedEncodings: + raise InvalidCipher(f"Invalid JSON encoding: {str(encoding)}") if isinstance(data, str): data = json.loads(data) - decoder = pybase64.b64decode if cipher in b64_ciphers else unhexlify - bcipher = BinaryCiphers.BINARY_AES_GCM_128 if cipher in aes_ciphers else BinaryCiphers.BINARY_CHACHA20_POLY1305 + decoder = pybase64.b64decode if encoding == SupportedEncodings.JSON_B64 else unhexlify ciphertext = decoder(data["ciphertext"]) if "tag" not in data: # web crypto compatibility - if bcipher == BinaryCiphers.BINARY_AES_GCM_128: - ciphertext, tag = ciphertext[:-AES_GCM_TAG_SIZE], ciphertext[-AES_GCM_TAG_SIZE:] + if cipher in AES_CIPHERS: + ciphertext, tag = ciphertext[:-AES_TAG_SIZE], ciphertext[-AES_TAG_SIZE:] else: ciphertext, tag = ciphertext[:-CHACHA20_TAG_SIZE], ciphertext[-CHACHA20_TAG_SIZE:] else: tag = decoder(data["tag"]) nonce = decoder(data["nonce"]) - decryptor = decrypt_AES_GCM_128 if bcipher == BinaryCiphers.BINARY_AES_GCM_128 else decrypt_ChaCha20_Poly1305 try: - plaintext = decryptor(key, ciphertext, tag, nonce) + plaintext = decrypt_bin(key=key, + ciphertext=nonce + ciphertext + tag, + cipher=cipher) return plaintext.decode("utf-8") except InvalidKeySize as e: raise e @@ -151,16 +164,17 @@ def decrypt_from_json(key: Union[str, bytes], data: Union[str, bytes], cipher: J raise DecryptionKeyError from e -def encrypt_bin(key: Union[str, bytes], data: Union[str, bytes], cipher: BinaryCiphers) -> bytes: +def encrypt_bin(key: Union[str, bytes], plaintext: Union[str, bytes], cipher: SupportedCiphers) -> bytes: """ Encrypts the given data and returns it as binary. Args: key (Union[str, bytes]): The encryption key, up to 16 bytes. Longer keys will be truncated. - data (Union[str, bytes]): The data to encrypt. Strings will be encoded as UTF-8. - cipher (BinaryCiphers): The encryption cipher. Supported options: - - BINARY_AES_GCM_128: AES-GCM with 128-bit key - - BINARY_CHACHA20_POLY1305: ChaCha20-Poly1305 with 256-bit key + plaintext (Union[str, bytes]): The data to encrypt. Strings will be encoded as UTF-8. + cipher (SupportedCiphers): The encryption cipher. Supported options: + - AES_GCM: AES-GCM with 128-bit/256-bit key + - AES_CCM: AES-GCM with 128-bit/256-bit key + - CHACHA20_POLY1305: ChaCha20-Poly1305 with 256-bit key Returns: bytes: The encrypted data, including the nonce and tag. @@ -169,12 +183,22 @@ def encrypt_bin(key: Union[str, bytes], data: Union[str, bytes], cipher: BinaryC InvalidCipher: If an unsupported cipher is provided. EncryptionKeyError: If encryption fails. """ - if cipher not in BinaryCiphers: - raise InvalidCipher(f"Invalid binary cipher: {str(cipher)}") + if cipher not in SupportedCiphers: + raise InvalidCipher(f"Invalid cipher: {str(cipher)}") + + encryptor = encrypt_AES if cipher in AES_CIPHERS else encrypt_ChaCha20_Poly1305 - encryptor = encrypt_AES_GCM_128 if cipher == BinaryCiphers.BINARY_AES_GCM_128 else encrypt_ChaCha20_Poly1305 try: - ciphertext, tag, nonce = encryptor(key, data) + if cipher in BLOCK_CIPHERS: + if cipher == SupportedCiphers.AES_GCM: + mode = AES.MODE_GCM + elif cipher == SupportedCiphers.AES_CCM: + mode = AES.MODE_CCM + else: + raise ValueError("invalid block cipher mode") + ciphertext, tag, nonce = encryptor(key, plaintext, mode=mode) + else: + ciphertext, tag, nonce = encryptor(key, plaintext) except InvalidKeySize as e: raise e except Exception as e: @@ -183,14 +207,14 @@ def encrypt_bin(key: Union[str, bytes], data: Union[str, bytes], cipher: BinaryC return nonce + ciphertext + tag -def decrypt_bin(key: Union[str, bytes], ciphertext: bytes, cipher: BinaryCiphers) -> bytes: +def decrypt_bin(key: Union[str, bytes], ciphertext: bytes, cipher: SupportedCiphers) -> bytes: """ Decrypts binary data. Args: key (Union[str, bytes]): The decryption key, up to 16 bytes. Longer keys will be truncated. ciphertext (bytes): The binary encrypted data. Must include nonce and tag. - cipher (BinaryCiphers): The cipher used for encryption. Only BINARY_AES_GCM_128 is supported. + cipher (SupportedCiphers): The cipher used for encryption. Returns: bytes: The decrypted plaintext data. @@ -199,21 +223,29 @@ def decrypt_bin(key: Union[str, bytes], ciphertext: bytes, cipher: BinaryCiphers InvalidCipher: If an unsupported cipher is provided. DecryptionKeyError: If decryption fails due to an invalid key or corrupted data. """ - if cipher not in BinaryCiphers: - raise InvalidCipher(f"Invalid binary cipher: {str(cipher)}") + if cipher not in SupportedCiphers: + raise InvalidCipher(f"Invalid cipher: {str(cipher)}") # extract nonce/tag depending on cipher, sizes are different - if cipher == BinaryCiphers.BINARY_AES_GCM_128: - nonce, ciphertext, tag = (ciphertext[:AES_GCM_NONCE_SIZE], - ciphertext[AES_GCM_NONCE_SIZE:-AES_GCM_TAG_SIZE], - ciphertext[-AES_GCM_TAG_SIZE:]) + if cipher in AES_CIPHERS: + nonce, ciphertext, tag = (ciphertext[:AES_NONCE_SIZE], + ciphertext[AES_NONCE_SIZE:-AES_TAG_SIZE], + ciphertext[-AES_TAG_SIZE:]) else: nonce, ciphertext, tag = (ciphertext[:CHACHA20_NONCE_SIZE], ciphertext[CHACHA20_NONCE_SIZE:-CHACHA20_TAG_SIZE], ciphertext[-CHACHA20_TAG_SIZE:]) - decryptor = decrypt_AES_GCM_128 if cipher == BinaryCiphers.BINARY_AES_GCM_128 else decrypt_ChaCha20_Poly1305 + decryptor = decrypt_AES_128 if cipher in AES_CIPHERS else decrypt_ChaCha20_Poly1305 try: + if cipher in BLOCK_CIPHERS: + if cipher == SupportedCiphers.AES_GCM: + mode = AES.MODE_GCM + elif cipher == SupportedCiphers.AES_CCM: + mode = AES.MODE_CCM + else: + raise ValueError("invalid block cipher mode") + return decryptor(key, ciphertext, tag, nonce, mode=mode) return decryptor(key, ciphertext, tag, nonce) except InvalidKeySize as e: raise e @@ -223,8 +255,9 @@ def decrypt_bin(key: Union[str, bytes], ciphertext: bytes, cipher: BinaryCiphers ############################# # Cipher Implementations -def encrypt_AES_GCM_128(key: Union[str, bytes], text: Union[str, bytes], - nonce: Optional[bytes] = None) -> tuple[bytes, bytes, bytes]: +def encrypt_AES(key: Union[str, bytes], text: Union[str, bytes], + nonce: Optional[bytes] = None, + mode: Literal[AES.MODE_GCM, AES.MODE_CCM] = AES.MODE_GCM) -> tuple[bytes, bytes, bytes]: """ Encrypts plaintext using AES-GCM-128. @@ -240,14 +273,20 @@ def encrypt_AES_GCM_128(key: Union[str, bytes], text: Union[str, bytes], text = bytes(text, encoding="utf-8") if not isinstance(key, bytes): key = bytes(key, encoding="utf-8") - if len(key) != AES_GCM_KEY_SIZE: # AES-128 uses 128 bit/16 byte keys - raise InvalidKeySize("AES-GCM-128 requires a 16-byte key") - cipher = AES.new(key, AES.MODE_GCM, nonce=nonce) + # AES-128 uses 128 bit/16 byte keys + # AES-256 uses 256 bit/32 byte keys + if len(key) not in AES_KEY_SIZES: + raise InvalidKeySize("AES-GCM requires a 16/24/32 bytes key") + cipher = AES.new(key, mode, nonce=nonce) ciphertext, tag = cipher.encrypt_and_digest(text) return ciphertext, tag, cipher.nonce -def decrypt_AES_GCM_128(key: Union[str, bytes], ciphertext: bytes, tag: bytes, nonce: bytes) -> bytes: +def decrypt_AES_128(key: Union[str, bytes], + ciphertext: bytes, + tag: bytes, + nonce: bytes, + mode: Literal[AES.MODE_GCM, AES.MODE_CCM] = AES.MODE_GCM) -> bytes: """ Decrypts ciphertext encrypted using AES-GCM-128. @@ -266,9 +305,11 @@ def decrypt_AES_GCM_128(key: Union[str, bytes], ciphertext: bytes, tag: bytes, n """ if isinstance(key, str): key = key.encode("utf-8") - if len(key) != AES_GCM_KEY_SIZE: # AES-128 uses 128 bit/16 byte keys - raise InvalidKeySize("AES-GCM-128 requires a 16-byte key") - cipher = AES.new(key, AES.MODE_GCM, nonce) + # AES-128 uses 128 bit/16 byte keys + # AES-256 uses 256 bit/32 byte keys + if len(key) not in AES_KEY_SIZES: + raise InvalidKeySize("AES-GCM requires a 16/24/32 bytes key") + cipher = AES.new(key, mode, nonce) return cipher.decrypt_and_verify(ciphertext, tag) @@ -336,7 +377,7 @@ def decrypt_ChaCha20_Poly1305(key: Union[str, bytes], if __name__ == "__main__": from Cryptodome.Random import get_random_bytes - print("JSON-B64-AES-GCM-128" == JsonCiphers.JSON_B64_AES_GCM_128) + print("JSON-B64" == SupportedEncodings.JSON_B64) key = get_random_bytes(CHACHA20_KEY_SIZE) plaintext = b'Attack at dawn' diff --git a/hivemind_bus_client/protocol.py b/hivemind_bus_client/protocol.py index d60bfdc..0d28314 100644 --- a/hivemind_bus_client/protocol.py +++ b/hivemind_bus_client/protocol.py @@ -9,7 +9,7 @@ from ovos_utils.log import LOG from hivemind_bus_client.client import HiveMessageBusClient -from hivemind_bus_client.encryption import JsonCiphers, BinaryCiphers, cpu_supports_AES +from hivemind_bus_client.encryption import SupportedEncodings, SupportedCiphers from hivemind_bus_client.identity import NodeIdentity from hivemind_bus_client.message import HiveMessage, HiveMessageType from poorman_handshake import HandShake, PasswordHandShake @@ -119,17 +119,6 @@ def node_id(self): # this is how ovos-core bus refers to this slave's master return self.internal_protocol.node_id - @property - def optimal_ciphers(self) -> Tuple[JsonCiphers, BinaryCiphers]: - if not cpu_supports_AES(): - j = JsonCiphers.JSON_B64_CHACHA20_POLY1305 - b = BinaryCiphers.BINARY_CHACHA20_POLY1305 - else: - j = JsonCiphers.JSON_B64_AES_GCM_128 - b = BinaryCiphers.BINARY_AES_GCM_128 - return j, b - - # TODO - handshake handlers # hivemind events def handle_illegal_msg(self, message: HiveMessage): # this should not happen, @@ -155,8 +144,8 @@ def start_handshake(self): LOG.info("hivemind does not support binarization protocol") payload = {"binarize": self.binarize, - "json_ciphers": list(c for c in JsonCiphers), - "binary_ciphers": list(c for c in BinaryCiphers)} + "encodings": list(c for c in SupportedEncodings), + "ciphers": list(c for c in SupportedCiphers)} if self.pswd_handshake is not None: payload["envelope"] = self.pswd_handshake.generate_handshake() else: @@ -194,10 +183,12 @@ def handle_handshake(self, message: HiveMessage): # master is performing the handshake if "envelope" in message.payload: envelope = message.payload["envelope"] - self.hm.json_cipher = message.payload.get("json_cipher") or JsonCiphers.JSON_HEX_AES_GCM_128 - self.hm.bin_cipher = message.payload.get("binary_cipher") or BinaryCiphers.BINARY_AES_GCM_128 - LOG.debug(f"Cipher to use: {self.hm.json_cipher} + {self.hm.bin_cipher}") + self.hm.json_encoding = message.payload.get("json_cipher") or SupportedEncodings.JSON_HEX + self.hm.cipher = message.payload.get("binary_cipher") or SupportedCiphers.AES_GCM self.receive_handshake(envelope) + LOG.debug(f"Encoding: {self.hm.json_encoding}") + LOG.debug(f"Cipher: {self.hm.cipher}") + LOG.debug(f"Key size: {len(self.pswd_handshake.secret) * 8}bit") # master is requesting handshake start else: @@ -212,7 +203,7 @@ def handle_handshake(self, message: HiveMessage): # TODO - flag to give preference to pre-shared key over handshake self.binarize = message.payload.get("binarize", False) - # TODO - flag to give preference to / require password or not + # TODO - flag to give preference to / require password or use RSA handshake # currently if password is set then it is always used if message.payload.get("password") and self.identity.password: self.pswd_handshake = PasswordHandShake(self.identity.password) diff --git a/hivemind_bus_client/util.py b/hivemind_bus_client/util.py index ff0cd26..c41540d 100644 --- a/hivemind_bus_client/util.py +++ b/hivemind_bus_client/util.py @@ -2,6 +2,7 @@ import zlib from typing import Union, Dict +from hivemind_bus_client.encryption import SupportedEncodings, SupportedCiphers from hivemind_bus_client.message import HiveMessage, HiveMessageType, Message @@ -140,9 +141,9 @@ def encrypt_as_json(key, data, b64=False) -> str: DeprecationWarning, stacklevel=2 ) - from hivemind_bus_client.encryption import encrypt_as_json as _ej, JsonCiphers - c = JsonCiphers.JSON_B64_AES_GCM_128 if b64 else JsonCiphers.JSON_HEX_AES_GCM_128 - return _ej(key, data, c) + from hivemind_bus_client.encryption import encrypt_as_json as _ej + c = SupportedEncodings.JSON_B64 if b64 else SupportedEncodings.JSON_HEX + return _ej(key, data, encoding=c, cipher=SupportedCiphers.AES_GCM) def decrypt_from_json(key, data: Union[str, bytes]): @@ -152,12 +153,12 @@ def decrypt_from_json(key, data: Union[str, bytes]): DeprecationWarning, stacklevel=2 ) - from hivemind_bus_client.encryption import decrypt_from_json as _dj, JsonCiphers + from hivemind_bus_client.encryption import decrypt_from_json as _dj try: - return _dj(key, data, JsonCiphers.JSON_HEX_AES_GCM_128) + return _dj(key, data, encoding=SupportedEncodings.JSON_HEX, cipher=SupportedCiphers.AES_GCM) except Exception as e: try: - return _dj(key, data, JsonCiphers.JSON_B64_AES_GCM_128) + return _dj(key, data, encoding=SupportedEncodings.JSON_B64, cipher=SupportedCiphers.AES_GCM) except: raise e @@ -169,8 +170,8 @@ def encrypt_bin(key, data: Union[str, bytes]): DeprecationWarning, stacklevel=2 ) - from hivemind_bus_client.encryption import encrypt_bin as _eb, BinaryCiphers - return _eb(key, data, BinaryCiphers.BINARY_AES_GCM_128) + from hivemind_bus_client.encryption import encrypt_bin as _eb + return _eb(key, data, cipher=SupportedCiphers.AES_GCM) def decrypt_bin(key, ciphertext: bytes): @@ -180,8 +181,8 @@ def decrypt_bin(key, ciphertext: bytes): DeprecationWarning, stacklevel=2 ) - from hivemind_bus_client.encryption import decrypt_bin as _db, BinaryCiphers - return _db(key, ciphertext, BinaryCiphers.BINARY_AES_GCM_128) + from hivemind_bus_client.encryption import decrypt_bin as _db + return _db(key, ciphertext, SupportedCiphers.AES_GCM) if __name__ == "__main__":