|
3 | 3 |
|
4 | 4 | from cryptography.exceptions import InvalidSignature, InvalidTag |
5 | 5 | from cryptography.hazmat.backends import default_backend |
6 | | -from cryptography.hazmat.bindings.openssl.binding import Binding |
7 | 6 | from cryptography.hazmat.primitives import hashes, hmac, serialization |
8 | 7 | from cryptography.hazmat.primitives.asymmetric import ec, padding, rsa |
9 | 8 | from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature, encode_dss_signature |
|
16 | 15 |
|
17 | 16 | from ..constants import ALGORITHMS |
18 | 17 | from ..exceptions import JWEError, JWKError |
19 | | -from ..utils import base64_to_long, base64url_decode, base64url_encode, ensure_binary, long_to_base64 |
| 18 | +from ..utils import ( |
| 19 | + base64_to_long, |
| 20 | + base64url_decode, |
| 21 | + base64url_encode, |
| 22 | + ensure_binary, |
| 23 | + is_pem_format, |
| 24 | + is_ssh_key, |
| 25 | + long_to_base64, |
| 26 | +) |
| 27 | +from . import get_random_bytes |
20 | 28 | from .base import Key |
21 | 29 |
|
22 | 30 | _binding = None |
23 | 31 |
|
24 | 32 |
|
25 | | -def get_random_bytes(num_bytes): |
26 | | - """ |
27 | | - Get random bytes |
28 | | -
|
29 | | - Currently, Cryptography returns OS random bytes. If you want OpenSSL |
30 | | - generated random bytes, you'll have to switch the RAND engine after |
31 | | - initializing the OpenSSL backend |
32 | | - Args: |
33 | | - num_bytes (int): Number of random bytes to generate and return |
34 | | - Returns: |
35 | | - bytes: Random bytes |
36 | | - """ |
37 | | - global _binding |
38 | | - |
39 | | - if _binding is None: |
40 | | - _binding = Binding() |
41 | | - |
42 | | - buf = _binding.ffi.new("char[]", num_bytes) |
43 | | - _binding.lib.RAND_bytes(buf, num_bytes) |
44 | | - rand_bytes = _binding.ffi.buffer(buf, num_bytes)[:] |
45 | | - return rand_bytes |
46 | | - |
47 | | - |
48 | 33 | class CryptographyECKey(Key): |
49 | 34 | SHA256 = hashes.SHA256 |
50 | 35 | SHA384 = hashes.SHA384 |
@@ -439,6 +424,8 @@ class CryptographyAESKey(Key): |
439 | 424 | ALGORITHMS.A256KW: None, |
440 | 425 | } |
441 | 426 |
|
| 427 | + IV_BYTE_LENGTH_MODE_MAP = {"CBC": algorithms.AES.block_size // 8, "GCM": 96 // 8} |
| 428 | + |
442 | 429 | def __init__(self, key, algorithm): |
443 | 430 | if algorithm not in ALGORITHMS.AES: |
444 | 431 | raise JWKError("%s is not a valid AES algorithm" % algorithm) |
@@ -468,7 +455,8 @@ def to_dict(self): |
468 | 455 | def encrypt(self, plain_text, aad=None): |
469 | 456 | plain_text = ensure_binary(plain_text) |
470 | 457 | try: |
471 | | - iv = get_random_bytes(algorithms.AES.block_size // 8) |
| 458 | + iv_byte_length = self.IV_BYTE_LENGTH_MODE_MAP.get(self._mode.name, algorithms.AES.block_size) |
| 459 | + iv = get_random_bytes(iv_byte_length) |
472 | 460 | mode = self._mode(iv) |
473 | 461 | if mode.name == "GCM": |
474 | 462 | cipher = aead.AESGCM(self._key) |
@@ -552,14 +540,7 @@ def __init__(self, key, algorithm): |
552 | 540 | if isinstance(key, str): |
553 | 541 | key = key.encode("utf-8") |
554 | 542 |
|
555 | | - invalid_strings = [ |
556 | | - b"-----BEGIN PUBLIC KEY-----", |
557 | | - b"-----BEGIN RSA PUBLIC KEY-----", |
558 | | - b"-----BEGIN CERTIFICATE-----", |
559 | | - b"ssh-rsa", |
560 | | - ] |
561 | | - |
562 | | - if any(string_value in key for string_value in invalid_strings): |
| 543 | + if is_pem_format(key) or is_ssh_key(key): |
563 | 544 | raise JWKError( |
564 | 545 | "The specified key is an asymmetric key or x509 certificate and" |
565 | 546 | " should not be used as an HMAC secret." |
|
0 commit comments