-
Notifications
You must be signed in to change notification settings - Fork 17
/
Copy pathjwe_rsa.py
136 lines (110 loc) · 3.55 KB
/
jwe_rsa.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
import logging
import zlib
from ..utils import as_bytes
from . import SUPPORTED
from .exception import NotSupportedAlgorithm
from .exception import ParameterError
from .jwekey import JWEKey
from .jwenc import JWEnc
from .rsa import RSAEncrypter
# Module-level logger named after this module; used for debug output below.
logger = logging.getLogger(__name__)
# Author attribution for this module.
__author__ = "Roland Hedberg"
class JWE_RSA(JWEKey):
    """JWE encryption/decryption where the content encryption key is RSA protected.

    The content encryption key (CEK) is encrypted with an RSA key using one of
    the algorithms ``RSA-OAEP``, ``RSA-OAEP-256`` or ``RSA1_5``.  The message
    itself is encrypted/decrypted by helpers that are not defined in this class
    (``enc_setup`` and ``_decrypt``, reached through ``self``).
    """

    # Parameter names declared by this class.  How they are consumed is decided
    # outside this class (presumably by the base class -- verify there).
    args = [
        "msg",
        "alg",
        "enc",
        "epk",
        "zip",
        "jku",
        "jwk",
        "x5u",
        "x5t",
        "x5c",
        "kid",
        "typ",
        "cty",
        "apu",
        "crit",
    ]

    # Maps each supported "alg" value to the extra positional arguments handed
    # to RSAEncrypter.encrypt()/decrypt().  "RSA1_5" passes none, i.e. it relies
    # on whatever padding RSAEncrypter uses by default.
    _RSA_PADDING = {
        "RSA-OAEP": ("pkcs1_oaep_padding",),
        "RSA-OAEP-256": ("pkcs1_oaep_256_padding",),
        "RSA1_5": (),
    }

    def encrypt(self, key, iv="", cek="", **kwargs):
        """
        Produces a JWE as defined in RFC7516 using RSA algorithms

        :param key: RSA key used to encrypt the content encryption key
        :param iv: Initialization vector, handed to ``_generate_iv``
        :param cek: Content encryption key. When a non-empty value is given
            it is not RSA encrypted and the encrypted-key part of the
            result is left empty.
        :param kwargs: Extra keyword arguments. ``auth_data``, when present,
            is used as authentication data instead of the encoded header.
        :return: The result of packing the header together with the
            encrypted key, the IV, the ciphertext and the authentication tag
        :raises ParameterError: If ``zip`` is set to something else than "DEF"
        :raises NotSupportedAlgorithm: If no CEK was given and ``alg`` is not
            one of the supported RSA algorithms
        """
        _msg = as_bytes(self.msg)
        if "zip" in self:
            if self["zip"] != "DEF":
                raise ParameterError("Zip has unknown value: %s" % self["zip"])
            # NOTE(review): zlib.compress() emits the zlib container format,
            # not a raw DEFLATE stream; decrypt() below mirrors this.
            _msg = zlib.compress(_msg)

        # Must be captured before ``cek`` is rebound to the generated key.
        cek_supplied = bool(cek)

        _enc = self["enc"]
        iv = self._generate_iv(_enc, iv)
        cek = self._generate_key(_enc, cek)
        self["cek"] = cek

        _encrypt = RSAEncrypter(self.with_digest).encrypt

        _alg = self["alg"]
        # Deliberately log only the algorithm names: the CEK is secret key
        # material and must never end up in log output.
        logger.debug("Encrypting with alg=%s, enc=%s", _alg, _enc)

        if cek_supplied:
            # The caller already holds the CEK, so nothing is wrapped.
            jwe_enc_key = ""
        elif _alg in self._RSA_PADDING:
            jwe_enc_key = _encrypt(cek, key, *self._RSA_PADDING[_alg])
        else:
            raise NotSupportedAlgorithm(_alg)

        jwe = JWEnc(**self.headers())

        if "auth_data" in kwargs:
            _auth_data = kwargs["auth_data"]
        else:
            _auth_data = jwe.b64_encode_header()

        # The third value returned by enc_setup() is not needed here.
        ctxt, tag, _ = self.enc_setup(_enc, _msg, key=cek, iv=iv, auth_data=_auth_data)
        return jwe.pack(parts=[jwe_enc_key, iv, ctxt, tag])

    def decrypt(self, token, key, cek=None):
        """Decrypts a JWE

        :param token: The JWE, either a ``JWEnc`` instance or a value that
            ``JWEnc().unpack`` can handle
        :param key: RSA key used to decrypt the encrypted key
        :param cek: Content encryption key. When given, the encrypted key in
            the token is not decrypted and this value is used instead.
        :return: The decrypted message, decompressed when the ``zip`` header
            is "DEF"
        :raises NotSupportedAlgorithm: If no CEK was given and ``alg`` is not
            a supported RSA algorithm, or if ``enc`` is not listed in
            ``SUPPORTED["enc"]``
        """
        jwe = token if isinstance(token, JWEnc) else JWEnc().unpack(token)

        jek = jwe.encrypted_key()
        # NOTE(review): ``jwt`` is set to the encrypted key, as the original
        # code did -- confirm that this is what readers of ``jwt`` expect.
        self.jwt = jek

        _decrypt = RSAEncrypter(self.with_digest).decrypt

        _alg = jwe.headers["alg"]
        if not cek:
            if _alg not in self._RSA_PADDING:
                raise NotSupportedAlgorithm(_alg)
            cek = _decrypt(jek, key, *self._RSA_PADDING[_alg])
        self["cek"] = cek

        enc = jwe.headers["enc"]
        if enc not in SUPPORTED["enc"]:
            raise NotSupportedAlgorithm(enc)

        auth_data = jwe.b64_protected_header()
        msg = self._decrypt(
            enc,
            cek,
            jwe.ciphertext(),
            auth_data=auth_data,
            iv=jwe.initialization_vector(),
            tag=jwe.authentication_tag(),
        )

        # Other "zip" values are left untouched, exactly as before.
        # NOTE(review): no upper bound is placed on the decompressed size.
        if "zip" in jwe.headers and jwe.headers["zip"] == "DEF":
            msg = zlib.decompress(msg)
        return msg