1
1
import enum
2
2
import json
3
3
from binascii import hexlify , unhexlify
4
- from typing import Union , Optional , Dict , Any
4
+ from typing import Union , Optional , Dict , Any , Tuple
5
5
6
6
import pybase64
7
7
from Cryptodome .Cipher import AES , ChaCha20_Poly1305
8
-
9
8
from cpuinfo import get_cpu_info
10
- from hivemind_bus_client .exceptions import EncryptionKeyError , DecryptionKeyError , InvalidCipher
9
+
10
+ from hivemind_bus_client .exceptions import EncryptionKeyError , DecryptionKeyError , InvalidCipher , InvalidKeySize
11
+
12
+ # Cipher-specific constants
13
+ AES_GCM_KEY_SIZE = 16
14
+ AES_GCM_NONCE_SIZE = 16
15
+ AES_GCM_TAG_SIZE = 16
16
+ CHACHA20_KEY_SIZE = 32
17
+ CHACHA20_NONCE_SIZE = 12
18
+ CHACHA20_TAG_SIZE = 16
11
19
12
20
13
21
def cpu_supports_AES () -> bool :
22
+ """
23
+ Check if the CPU supports AES encryption.
24
+
25
+ Returns:
26
+ bool: True if AES is supported by the CPU, False otherwise.
27
+ """
14
28
return "aes" in get_cpu_info ()["flags" ]
15
29
16
30
@@ -66,13 +80,22 @@ def encrypt_as_json(key: Union[str, bytes], data: Union[str, Dict[str, Any]],
66
80
67
81
bcipher = BinaryCiphers .BINARY_AES_GCM_128 if cipher in aes_ciphers else BinaryCiphers .BINARY_CHACHA20_POLY1305
68
82
69
- ciphertext = encrypt_bin (key , data , cipher = bcipher )
83
+ try :
84
+ ciphertext = encrypt_bin (key , data , cipher = bcipher )
85
+ except InvalidKeySize as e :
86
+ raise e
87
+ except Exception as e :
88
+ raise EncryptionKeyError from e
70
89
71
90
# extract nonce/tag depending on cipher, sizes are different
72
91
if cipher in aes_ciphers :
73
- nonce , ciphertext , tag = ciphertext [:16 ], ciphertext [16 :- 16 ], ciphertext [- 16 :]
92
+ nonce , ciphertext , tag = (ciphertext [:AES_GCM_NONCE_SIZE ],
93
+ ciphertext [AES_GCM_NONCE_SIZE :- AES_GCM_TAG_SIZE ],
94
+ ciphertext [- AES_GCM_TAG_SIZE :])
74
95
else :
75
- nonce , ciphertext , tag = ciphertext [:12 ], ciphertext [12 :- 16 ], ciphertext [- 16 :]
96
+ nonce , ciphertext , tag = (ciphertext [:CHACHA20_NONCE_SIZE ],
97
+ ciphertext [CHACHA20_NONCE_SIZE :- CHACHA20_TAG_SIZE ],
98
+ ciphertext [- CHACHA20_TAG_SIZE :])
76
99
77
100
encoder = pybase64 .b64encode if cipher in b64_ciphers else hexlify
78
101
@@ -90,7 +113,7 @@ def decrypt_from_json(key: Union[str, bytes], data: Union[str, bytes], cipher: J
90
113
Args:
91
114
key (Union[str, bytes]): The decryption key, up to 16 bytes. Longer keys will be truncated.
92
115
data (Union[str, bytes]): The encrypted data as a JSON string or bytes.
93
- cipher (Optional[ JsonCiphers] ): The cipher used for encryption. If None, it is auto-detected .
116
+ cipher (JsonCiphers): The cipher used for encryption.
94
117
95
118
Returns:
96
119
str: The decrypted plaintext data.
@@ -99,7 +122,6 @@ def decrypt_from_json(key: Union[str, bytes], data: Union[str, bytes], cipher: J
99
122
InvalidCipher: If an unsupported cipher is provided.
100
123
DecryptionKeyError: If decryption fails due to an invalid key or corrupted data.
101
124
"""
102
-
103
125
aes_ciphers = {JsonCiphers .JSON_B64_AES_GCM_128 , JsonCiphers .JSON_HEX_AES_GCM_128 }
104
126
b64_ciphers = {JsonCiphers .JSON_B64_AES_GCM_128 , JsonCiphers .JSON_B64_CHACHA20_POLY1305 }
105
127
@@ -111,7 +133,10 @@ def decrypt_from_json(key: Union[str, bytes], data: Union[str, bytes], cipher: J
111
133
112
134
ciphertext = decoder (data ["ciphertext" ])
113
135
if "tag" not in data : # web crypto compatibility
114
- ciphertext , tag = ciphertext [:- 16 ], ciphertext [- 16 :]
136
+ if bcipher == BinaryCiphers .BINARY_AES_GCM_128 :
137
+ ciphertext , tag = ciphertext [:- AES_GCM_TAG_SIZE ], ciphertext [- AES_GCM_TAG_SIZE :]
138
+ else :
139
+ ciphertext , tag = ciphertext [:- CHACHA20_TAG_SIZE ], ciphertext [- CHACHA20_TAG_SIZE :]
115
140
else :
116
141
tag = decoder (data ["tag" ])
117
142
nonce = decoder (data ["nonce" ])
@@ -120,7 +145,9 @@ def decrypt_from_json(key: Union[str, bytes], data: Union[str, bytes], cipher: J
120
145
try :
121
146
plaintext = decryptor (key , ciphertext , tag , nonce )
122
147
return plaintext .decode ("utf-8" )
123
- except ValueError as e :
148
+ except InvalidKeySize as e :
149
+ raise e
150
+ except Exception as e :
124
151
raise DecryptionKeyError from e
125
152
126
153
@@ -148,6 +175,8 @@ def encrypt_bin(key: Union[str, bytes], data: Union[str, bytes], cipher: BinaryC
148
175
encryptor = encrypt_AES_GCM_128 if cipher == BinaryCiphers .BINARY_AES_GCM_128 else encrypt_ChaCha20_Poly1305
149
176
try :
150
177
ciphertext , tag , nonce = encryptor (key , data )
178
+ except InvalidKeySize as e :
179
+ raise e
151
180
except Exception as e :
152
181
raise EncryptionKeyError from e
153
182
@@ -175,14 +204,20 @@ def decrypt_bin(key: Union[str, bytes], ciphertext: bytes, cipher: BinaryCiphers
175
204
176
205
# extract nonce/tag depending on cipher, sizes are different
177
206
if cipher == BinaryCiphers .BINARY_AES_GCM_128 :
178
- nonce , ciphertext , tag = ciphertext [:16 ], ciphertext [16 :- 16 ], ciphertext [- 16 :]
207
+ nonce , ciphertext , tag = (ciphertext [:AES_GCM_NONCE_SIZE ],
208
+ ciphertext [AES_GCM_NONCE_SIZE :- AES_GCM_TAG_SIZE ],
209
+ ciphertext [- AES_GCM_TAG_SIZE :])
179
210
else :
180
- nonce , ciphertext , tag = ciphertext [:12 ], ciphertext [12 :- 16 ], ciphertext [- 16 :]
211
+ nonce , ciphertext , tag = (ciphertext [:CHACHA20_NONCE_SIZE ],
212
+ ciphertext [CHACHA20_NONCE_SIZE :- CHACHA20_TAG_SIZE ],
213
+ ciphertext [- CHACHA20_TAG_SIZE :])
181
214
182
215
decryptor = decrypt_AES_GCM_128 if cipher == BinaryCiphers .BINARY_AES_GCM_128 else decrypt_ChaCha20_Poly1305
183
216
try :
184
217
return decryptor (key , ciphertext , tag , nonce )
185
- except ValueError as e :
218
+ except InvalidKeySize as e :
219
+ raise e
220
+ except Exception as e :
186
221
raise DecryptionKeyError from e
187
222
188
223
@@ -205,8 +240,8 @@ def encrypt_AES_GCM_128(key: Union[str, bytes], text: Union[str, bytes],
205
240
text = bytes (text , encoding = "utf-8" )
206
241
if not isinstance (key , bytes ):
207
242
key = bytes (key , encoding = "utf-8" )
208
- if len (key ) != 16 : # AES-128 uses 128 bit/16 byte keys
209
- raise ValueError ("AES-GCM-128 requires a 16-byte key" )
243
+ if len (key ) != AES_GCM_KEY_SIZE : # AES-128 uses 128 bit/16 byte keys
244
+ raise InvalidKeySize ("AES-GCM-128 requires a 16-byte key" )
210
245
cipher = AES .new (key , AES .MODE_GCM , nonce = nonce )
211
246
ciphertext , tag = cipher .encrypt_and_digest (text )
212
247
return ciphertext , tag , cipher .nonce
@@ -226,10 +261,13 @@ def decrypt_AES_GCM_128(key: Union[str, bytes], ciphertext: bytes, tag: bytes, n
226
261
str: The decrypted plaintext.
227
262
228
263
Raises:
264
+ InvalidKeySize: If key size is not valid
229
265
ValueError: If decryption or authentication fails.
230
266
"""
231
- if not isinstance (key , bytes ):
232
- key = bytes (key , encoding = "utf-8" )
267
+ if isinstance (key , str ):
268
+ key = key .encode ("utf-8" )
269
+ if len (key ) != AES_GCM_KEY_SIZE : # AES-128 uses 128 bit/16 byte keys
270
+ raise InvalidKeySize ("AES-GCM-128 requires a 16-byte key" )
233
271
cipher = AES .new (key , AES .MODE_GCM , nonce )
234
272
return cipher .decrypt_and_verify (ciphertext , tag )
235
273
@@ -248,13 +286,16 @@ def encrypt_ChaCha20_Poly1305(key: Union[str, bytes],
248
286
Returns:
249
287
tuple[bytes, bytes, bytes]: The ciphertext, authentication tag, and nonce.
250
288
"""
251
- if not isinstance (text , bytes ):
252
- text = bytes (text , encoding = "utf-8" )
253
- if not isinstance (key , bytes ):
254
- key = bytes (key , encoding = "utf-8" )
255
- assert len (key ) == 32 # ChaCha20 uses 256 bit/32 byte keys
289
+ if isinstance (text , str ):
290
+ text = text .encode ("utf-8" )
291
+ if isinstance (key , str ):
292
+ key = key .encode ("utf-8" )
293
+
294
+ if len (key ) != CHACHA20_KEY_SIZE : # ChaCha20 uses 256 bit/32 byte keys
295
+ raise InvalidKeySize ("CHACHA20-POLY1305 requires a 32-byte key" )
256
296
if nonce :
257
- assert len (nonce ) == 12 # 92bits/12bytes bytes per RFC7539
297
+ if len (nonce ) != CHACHA20_NONCE_SIZE : # 92bits/12bytes per RFC7539
298
+ raise InvalidKeySize ("CHACHA20-POLY1305 requires a 12-byte nonce per RFC7539" )
258
299
cipher = ChaCha20_Poly1305 .new (key = key , nonce = nonce )
259
300
ciphertext , tag = cipher .encrypt_and_digest (text )
260
301
return ciphertext , tag , cipher .nonce
@@ -277,14 +318,17 @@ def decrypt_ChaCha20_Poly1305(key: Union[str, bytes],
277
318
str: The decrypted plaintext.
278
319
279
320
Raises:
321
+ InvalidKeySize:
280
322
ValueError: If decryption or authentication fails.
281
323
"""
282
- if not isinstance (key , bytes ):
283
- key = bytes ( key , encoding = "utf-8" )
324
+ if isinstance (key , str ):
325
+ key = key . encode ( "utf-8" )
284
326
285
- assert len (key ) == 32 # ChaCha20 uses 256 bit/32 byte keys
327
+ if len (key ) != CHACHA20_KEY_SIZE : # ChaCha20 uses 256 bit/32 byte keys
328
+ raise InvalidKeySize ("CHACHA20-POLY1305 requires a 32-byte key" )
286
329
if nonce :
287
- assert len (nonce ) == 12 # 92bits/12bytes per RFC7539
330
+ if len (nonce ) != CHACHA20_NONCE_SIZE : # 92bits/12bytes per RFC7539
331
+ raise InvalidKeySize ("CHACHA20-POLY1305 requires a 12-byte nonce per RFC7539" )
288
332
cipher = ChaCha20_Poly1305 .new (key = key , nonce = nonce )
289
333
return cipher .decrypt_and_verify (ciphertext , tag )
290
334
@@ -294,7 +338,7 @@ def decrypt_ChaCha20_Poly1305(key: Union[str, bytes],
294
338
295
339
print ("JSON-B64-AES-GCM-128" == JsonCiphers .JSON_B64_AES_GCM_128 )
296
340
297
- key = get_random_bytes (32 )
341
+ key = get_random_bytes (CHACHA20_KEY_SIZE )
298
342
plaintext = b'Attack at dawn'
299
343
ciphertext , tag , nonce = encrypt_ChaCha20_Poly1305 (key , plaintext )
300
344
recovered = decrypt_ChaCha20_Poly1305 (key , ciphertext , tag , nonce )
0 commit comments