diff --git a/src/cryptojwt/jwe/__init__.py b/src/cryptojwt/jwe/__init__.py index 17776e0..f0e511b 100644 --- a/src/cryptojwt/jwe/__init__.py +++ b/src/cryptojwt/jwe/__init__.py @@ -22,7 +22,14 @@ "ECDH-ES+A192KW", "ECDH-ES+A256KW", ], - "enc": ["A128CBC-HS256", "A192CBC-HS384", "A256CBC-HS512", "A128GCM", "A192GCM", "A256GCM",], + "enc": [ + "A128CBC-HS256", + "A192CBC-HS384", + "A256CBC-HS512", + "A128GCM", + "A192GCM", + "A256GCM", + ], } diff --git a/src/cryptojwt/jwe/aes.py b/src/cryptojwt/jwe/aes.py index 625d0a6..20ea7d7 100644 --- a/src/cryptojwt/jwe/aes.py +++ b/src/cryptojwt/jwe/aes.py @@ -18,8 +18,7 @@ class AES_CBCEncrypter(Encrypter): - """ - """ + """""" def __init__(self, key_len=32, key=None, msg_padding="PKCS7"): Encrypter.__init__(self) diff --git a/src/cryptojwt/jwe/jwe_ec.py b/src/cryptojwt/jwe/jwe_ec.py index 911a6ea..3321a8e 100644 --- a/src/cryptojwt/jwe/jwe_ec.py +++ b/src/cryptojwt/jwe/jwe_ec.py @@ -157,7 +157,12 @@ def dec_setup(self, token, key=None, **kwargs): raise Exception("Unknown key length for algorithm") self.cek = ecdh_derive_key( - key, epubkey.pub_key, apu, apv, str(self.headers["enc"]).encode(), dk_len, + key, + epubkey.pub_key, + apu, + apv, + str(self.headers["enc"]).encode(), + dk_len, ) elif self.headers["alg"] in [ "ECDH-ES+A128KW", diff --git a/src/cryptojwt/jwe/jwe_rsa.py b/src/cryptojwt/jwe/jwe_rsa.py index dd4324d..f34b133 100644 --- a/src/cryptojwt/jwe/jwe_rsa.py +++ b/src/cryptojwt/jwe/jwe_rsa.py @@ -85,7 +85,7 @@ def encrypt(self, key, iv="", cek="", **kwargs): return jwe.pack(parts=[jwe_enc_key, iv, ctxt, tag]) def decrypt(self, token, key, cek=None): - """ Decrypts a JWT + """Decrypts a JWT :param token: The JWT :param key: A key to use for decrypting diff --git a/src/cryptojwt/jwe/jwekey.py b/src/cryptojwt/jwe/jwekey.py index faa6093..31a1c8a 100644 --- a/src/cryptojwt/jwe/jwekey.py +++ b/src/cryptojwt/jwe/jwekey.py 
@@ -38,7 +38,7 @@ def alg2keytype(self, alg): return alg2keytype(alg) def enc_setup(self, enc_alg, msg, auth_data=b"", key=None, iv=""): - """ Encrypt JWE content. + """Encrypt JWE content. :param enc_alg: The JWE "enc" value specifying the encryption algorithm :param msg: The plain text message @@ -62,7 +62,7 @@ def enc_setup(self, enc_alg, msg, auth_data=b"", key=None, iv=""): @staticmethod def _decrypt(enc, key, ctxt, iv, tag, auth_data=b""): - """ Decrypt JWE content. + """Decrypt JWE content. :param enc: The JWE "enc" value specifying the encryption algorithm :param key: Key (CEK) diff --git a/src/cryptojwt/jwe/rsa.py b/src/cryptojwt/jwe/rsa.py index 691c3e5..8e7cc4f 100644 --- a/src/cryptojwt/jwe/rsa.py +++ b/src/cryptojwt/jwe/rsa.py @@ -20,7 +20,9 @@ def encrypt(self, msg, key, sign_padding="pkcs1_padding"): return key.encrypt( msg, _padding( - mgf=padding.MGF1(algorithm=_chosen_hash()), algorithm=_chosen_hash(), label=None, + mgf=padding.MGF1(algorithm=_chosen_hash()), + algorithm=_chosen_hash(), + label=None, ), ) diff --git a/src/cryptojwt/jwk/jwk.py b/src/cryptojwt/jwk/jwk.py index 86f41c4..c9e3290 100644 --- a/src/cryptojwt/jwk/jwk.py +++ b/src/cryptojwt/jwk/jwk.py @@ -93,7 +93,9 @@ def key_from_jwk_dict(jwk_dict, private=None): else: # Ecdsa public key. ec_pub_numbers = ec.EllipticCurvePublicNumbers( - base64url_to_long(_jwk_dict["x"]), base64url_to_long(_jwk_dict["y"]), curve, + base64url_to_long(_jwk_dict["x"]), + base64url_to_long(_jwk_dict["y"]), + curve, ) _jwk_dict["pub_key"] = ec_pub_numbers.public_key(backends.default_backend()) return ECKey(**_jwk_dict) diff --git a/src/cryptojwt/jwk/rsa.py b/src/cryptojwt/jwk/rsa.py index 07de5a6..e98cb23 100644 --- a/src/cryptojwt/jwk/rsa.py +++ b/src/cryptojwt/jwk/rsa.py @@ -128,7 +128,7 @@ def rsa_eq(key1, key2): def x509_rsa_load(txt): - """ So I get the same output format as loads produces + """So I get the same output format as loads produces :param txt: :return: """ 
@@ -172,10 +172,10 @@ def rsa_construct_private(numbers): try: cnum["iqmp"] = numbers["di"] except KeyError: - cnum["iqmp"] = rsa.rsa_crt_iqmp(cnum["p"], cnum["p"]) + cnum["iqmp"] = rsa.rsa_crt_iqmp(cnum["p"], cnum["q"]) else: if not numbers["di"]: - cnum["iqmp"] = rsa.rsa_crt_iqmp(cnum["p"], cnum["p"]) + cnum["iqmp"] = rsa.rsa_crt_iqmp(cnum["p"], cnum["q"]) rpubn = rsa.RSAPublicNumbers(e=numbers["e"], n=numbers["n"]) rprivn = rsa.RSAPrivateNumbers(public_numbers=rpubn, **cnum) diff --git a/src/cryptojwt/jws/jws.py b/src/cryptojwt/jws/jws.py index b24b091..c9b334e 100644 --- a/src/cryptojwt/jws/jws.py +++ b/src/cryptojwt/jws/jws.py @@ -321,7 +321,11 @@ def verify_json(self, jws, keys=None, allow_none=False, at_least_one=False): for _sign in _signs: protected_headers = _sign.get("protected", "") token = b".".join( - [protected_headers.encode(), _payload.encode(), _sign["signature"].encode(),] + [ + protected_headers.encode(), + _payload.encode(), + _sign["signature"].encode(), + ] ) unprotected_headers = _sign.get("header", {}) diff --git a/src/cryptojwt/jws/pss.py b/src/cryptojwt/jws/pss.py index 71cbb3d..a7443dd 100644 --- a/src/cryptojwt/jws/pss.py +++ b/src/cryptojwt/jws/pss.py @@ -38,7 +38,8 @@ def sign(self, msg, key): sig = key.sign( digest, padding.PSS( - mgf=padding.MGF1(self.hash_algorithm()), salt_length=padding.PSS.MAX_LENGTH, + mgf=padding.MGF1(self.hash_algorithm()), + salt_length=padding.PSS.MAX_LENGTH, ), utils.Prehashed(self.hash_algorithm()), ) @@ -59,7 +60,8 @@ def verify(self, msg, signature, key): signature, msg, padding.PSS( - mgf=padding.MGF1(self.hash_algorithm()), salt_length=padding.PSS.MAX_LENGTH, + mgf=padding.MGF1(self.hash_algorithm()), + salt_length=padding.PSS.MAX_LENGTH, ), self.hash_algorithm(), ) diff --git a/src/cryptojwt/jws/utils.py b/src/cryptojwt/jws/utils.py index 171e7d0..8ee3095 100644 --- a/src/cryptojwt/jws/utils.py +++ b/src/cryptojwt/jws/utils.py 
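Review bot: The CRT-coefficient fallback in `rsa_construct_private` is still written out twice, once under `except KeyError` and once under `else` for a falsy `numbers["di"]`, which is how the same `(p, p)` slip ended up in both places. Assuming `numbers` is a plain dict, a single line such as `cnum["iqmp"] = numbers.get("di") or rsa.rsa_crt_iqmp(cnum["p"], cnum["q"])` keeps the behaviour and leaves only one call to get right. The lookup key is `di`, whereas RFC 7518 (section 6.3.2.8) names the first CRT coefficient `qi`, so a JWK-supplied `qi` is presumably never used here and the value is always recomputed; that is worth confirming against the callers. A coefficient that is supplied is accepted as given, so comparing it with `rsa.rsa_crt_iqmp(cnum["p"], cnum["q"])` before building `RSAPrivateNumbers` would reject an inconsistent private key at load time. The function starts above the hunk, so how `cnum["p"]` and `cnum["q"]` are obtained is not visible here.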
@@ -11,7 +11,7 @@ def left_hash(msg, func="HS256"): - """ Calculate left hash as described in + """Calculate left hash as described in https://openid.net/specs/openid-connect-core-1_0.html#CodeIDToken for at_hash and in for c_hash diff --git a/src/cryptojwt/key_bundle.py b/src/cryptojwt/key_bundle.py index 9e7e752..9b2f200 100755 --- a/src/cryptojwt/key_bundle.py +++ b/src/cryptojwt/key_bundle.py @@ -402,7 +402,9 @@ def do_remote(self): else: LOGGER.warning( - "HTTP status %d reading remote JWKS from %s", _http_resp.status_code, self.source, + "HTTP status %d reading remote JWKS from %s", + _http_resp.status_code, + self.source, ) raise UpdateFailed(REMOTE_FAILED.format(self.source, _http_resp.status_code)) self.last_updated = time.time() diff --git a/src/cryptojwt/key_jar.py b/src/cryptojwt/key_jar.py index 9c040b7..4b58bfe 100755 --- a/src/cryptojwt/key_jar.py +++ b/src/cryptojwt/key_jar.py @@ -762,7 +762,12 @@ def build_keyjar(key_conf, kid_template="", keyjar=None, issuer_id="", storage=N @deprecated_alias(issuer="issuer_id", owner="issuer_id") def init_key_jar( - public_path="", private_path="", key_defs="", issuer_id="", read_only=True, storage=None, + public_path="", + private_path="", + key_defs="", + issuer_id="", + read_only=True, + storage=None, ): """ A number of cases here: @@ -805,7 +810,10 @@ def init_key_jar( """ _issuer = init_key_issuer( - public_path=public_path, private_path=private_path, key_defs=key_defs, read_only=read_only, + public_path=public_path, + private_path=private_path, + key_defs=key_defs, + read_only=read_only, ) if _issuer is None: diff --git a/src/cryptojwt/tools/keyconv.py b/src/cryptojwt/tools/keyconv.py index c12c8d5..83f0b2d 100644 --- a/src/cryptojwt/tools/keyconv.py +++ b/src/cryptojwt/tools/keyconv.py 
@@ -115,7 +115,10 @@ def pem2jwk( def export_jwk( - jwk: JWK, private: bool = False, encrypt: bool = False, passphrase: Optional[str] = None, + jwk: JWK, + private: bool = False, + encrypt: bool = False, + passphrase: Optional[str] = None, ) -> bytes: """Export JWK as PEM/bin""" diff --git a/src/cryptojwt/utils.py b/src/cryptojwt/utils.py index d0c6d97..b0619f7 100644 --- a/src/cryptojwt/utils.py +++ b/src/cryptojwt/utils.py @@ -158,8 +158,7 @@ def as_unicode(b): def bytes2str_conv(item): - """ - """ + """""" if isinstance(item, bytes): return item.decode("utf-8") elif item is None or isinstance(item, (str, int, bool)): diff --git a/tests/test_01_simplejwt.py b/tests/test_01_simplejwt.py index 02a944b..9d94a17 100644 --- a/tests/test_01_simplejwt.py +++ b/tests/test_01_simplejwt.py @@ -10,7 +10,10 @@ def _eq(l1, l2): def test_pack_jwt(): _jwt = SimpleJWT(**{"alg": "none", "cty": "jwt"}) jwt = _jwt.pack( - parts=[{"iss": "joe", "exp": 1300819380, "http://example.com/is_root": True}, "",] + parts=[ + {"iss": "joe", "exp": 1300819380, "http://example.com/is_root": True}, + "", + ] ) p = jwt.split(".") diff --git a/tests/test_02_jwk.py b/tests/test_02_jwk.py index 0290060..a9307b8 100644 --- a/tests/test_02_jwk.py +++ b/tests/test_02_jwk.py @@ -515,6 +515,9 @@ def test_key_from_jwk_dict_rsa(): _key = key_from_jwk_dict(jwk) assert isinstance(_key, RSAKey) assert _key.has_private_key() + _key2 = RSAKey(**jwk) + assert isinstance(_key2, RSAKey) + assert _key2.has_private_key() def test_key_from_jwk_dict_ec(): @@ -707,7 +710,10 @@ def test_x5t_calculation(): @pytest.mark.parametrize( "filename,key_type", - [("ec-public.pem", ec.EllipticCurvePublicKey), ("rsa-public.pem", rsa.RSAPublicKey),], + [ + ("ec-public.pem", ec.EllipticCurvePublicKey), + ("rsa-public.pem", rsa.RSAPublicKey), + ], ) def test_import_public_key_from_pem_file(filename, key_type): _file = full_path(filename) 
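Review bot: The lines added to `test_key_from_jwk_dict_rsa` in `tests/test_02_jwk.py` (`_key2 = RSAKey(**jwk)` and the two asserts) check only that the key loads and has a private part. They do not pin the CRT coefficient that the `rsa_construct_private` change in `src/cryptojwt/jwk/rsa.py` corrects. Whether they reach the recomputation path at all depends on the `jwk` fixture, which is defined outside the hunk. Assuming the key object exposes the loaded key as `priv_key`, an explicit regression check would be `nums = _key2.priv_key.private_numbers()` followed by `assert nums.iqmp == rsa.rsa_crt_iqmp(nums.p, nums.q)`. `rsa` appears to be imported in this file already, since a later hunk uses `rsa.RSAPublicKey`.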
diff --git a/tests/test_04_key_issuer.py b/tests/test_04_key_issuer.py index 5e5fba7..fedad97 100755 --- a/tests/test_04_key_issuer.py +++ b/tests/test_04_key_issuer.py @@ -221,7 +221,11 @@ def test_build_keyissuer_usage(): def test_build_keyissuer_missing(tmpdir): keys = [ - {"type": "RSA", "key": os.path.join(tmpdir.dirname, "missing_file"), "use": ["enc", "sig"],} + { + "type": "RSA", + "key": os.path.join(tmpdir.dirname, "missing_file"), + "use": ["enc", "sig"], + } ] key_issuer = build_keyissuer(keys) @@ -239,7 +243,11 @@ def test_build_RSA_keyissuer_from_file(tmpdir): def test_build_EC_keyissuer_missing(tmpdir): keys = [ - {"type": "EC", "key": os.path.join(tmpdir.dirname, "missing_file"), "use": ["enc", "sig"],} + { + "type": "EC", + "key": os.path.join(tmpdir.dirname, "missing_file"), + "use": ["enc", "sig"], + } ] key_issuer = build_keyissuer(keys) @@ -616,7 +624,10 @@ def test_init_key_issuer_update(): # New set of keys, JWKSs with keys and public written to file _keyissuer_1 = init_key_issuer( - private_path=PRIVATE_FILE, key_defs=KEYSPEC, public_path=PUBLIC_FILE, read_only=False, + private_path=PRIVATE_FILE, + key_defs=KEYSPEC, + public_path=PUBLIC_FILE, + read_only=False, ) assert len(_keyissuer_1) == 2 @@ -646,7 +657,10 @@ def test_init_key_issuer_update(): assert len(_keyissuer_3.get("sig", "EC")) == 1 _keyissuer_4 = init_key_issuer( - private_path=PRIVATE_FILE, key_defs=KEYSPEC_2, public_path=PUBLIC_FILE, read_only=False, + private_path=PRIVATE_FILE, + key_defs=KEYSPEC_2, + public_path=PUBLIC_FILE, + read_only=False, ) # Now it should diff --git a/tests/test_04_key_jar.py b/tests/test_04_key_jar.py index 5e63e9c..b31e5ba 100755 --- a/tests/test_04_key_jar.py +++ b/tests/test_04_key_jar.py 
@@ -229,7 +229,11 @@ def test_build_keyjar_usage(): def test_build_keyjar_missing(tmpdir): keys = [ - {"type": "RSA", "key": os.path.join(tmpdir.dirname, "missing_file"), "use": ["enc", "sig"],} + { + "type": "RSA", + "key": os.path.join(tmpdir.dirname, "missing_file"), + "use": ["enc", "sig"], + } ] key_jar = build_keyjar(keys) @@ -247,7 +251,11 @@ def test_build_RSA_keyjar_from_file(tmpdir): def test_build_EC_keyjar_missing(tmpdir): keys = [ - {"type": "EC", "key": os.path.join(tmpdir.dirname, "missing_file"), "use": ["enc", "sig"],} + { + "type": "EC", + "key": os.path.join(tmpdir.dirname, "missing_file"), + "use": ["enc", "sig"], + } ] key_jar = build_keyjar(keys) @@ -303,7 +311,8 @@ def test_items(self): ), ) ks.add_kb( - "http://www.example.org", keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"]), + "http://www.example.org", + keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"]), ) assert len(ks.items()) == 2 @@ -329,7 +338,8 @@ def test_issuer_extra_slash(self): ), ) ks.add_kb( - "http://www.example.org", keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"]), + "http://www.example.org", + keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"]), ) assert ks.get("sig", "RSA", "http://www.example.org/") @@ -355,7 +365,8 @@ def test_issuer_missing_slash(self): ), ) ks.add_kb( - "http://www.example.org/", keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"]), + "http://www.example.org/", + keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"]), ) assert ks.get("sig", "RSA", "http://www.example.org") @@ -381,7 +392,8 @@ def test_get_enc(self): ), ) ks.add_kb( - "http://www.example.org/", keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"]), + "http://www.example.org/", + keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"]), ) assert ks.get("enc", "oct") 
@@ -407,7 +419,8 @@ def test_get_enc_not_mine(self): ), ) ks.add_kb( - "http://www.example.org/", keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"]), + "http://www.example.org/", + keybundle_from_local_file(RSAKEY, "der", ["ver", "sig"]), ) assert ks.get("enc", "oct", "http://www.example.org/") @@ -449,7 +462,8 @@ def test_provider(self): kj = KeyJar() _url = "https://connect-op.herokuapp.com/jwks.json" kj.load_keys( - "https://connect-op.heroku.com", jwks_uri=_url, + "https://connect-op.heroku.com", + jwks_uri=_url, ) iss_keys = kj.get_issuer_keys("https://connect-op.heroku.com") if not iss_keys: @@ -968,7 +982,10 @@ def test_init_key_jar_update(): assert len(_keyjar_3.get_signing_key("EC")) == 1 _keyjar_4 = init_key_jar( - private_path=PRIVATE_FILE, key_defs=KEYSPEC_2, public_path=PUBLIC_FILE, read_only=False, + private_path=PRIVATE_FILE, + key_defs=KEYSPEC_2, + public_path=PUBLIC_FILE, + read_only=False, ) # Now it should diff --git a/tests/test_06_jws.py b/tests/test_06_jws.py index 96b15ac..c452e0b 100644 --- a/tests/test_06_jws.py +++ b/tests/test_06_jws.py @@ -431,7 +431,8 @@ def test_jws_mm(): @pytest.mark.parametrize( - "ec_func,alg", [(ec.SECP256R1, "ES256"), (ec.SECP384R1, "ES384"), (ec.SECP521R1, "ES512")], + "ec_func,alg", + [(ec.SECP256R1, "ES256"), (ec.SECP384R1, "ES384"), (ec.SECP521R1, "ES512")], ) def test_signer_es(ec_func, alg): payload = "Please take a moment to register today" @@ -706,7 +707,9 @@ def test_sign_json_dont_flatten_if_multiple_signatures(): key = ECKey().load_key(P256()) unprotected_headers = {"foo": "bar"} _jwt = JWS(msg="hello world", alg="ES256").sign_json( - headers=[(None, unprotected_headers), (None, {"abc": "xyz"})], keys=[key], flatten=True, + headers=[(None, unprotected_headers), (None, {"abc": "xyz"})], + keys=[key], + flatten=True, ) assert "signatures" in json.loads(_jwt) diff --git a/tests/test_09_jwt.py b/tests/test_09_jwt.py index 2639857..71b019d 100755 --- a/tests/test_09_jwt.py +++ b/tests/test_09_jwt.py 
@@ -26,10 +26,16 @@ def full_path(local_file): # k2 = import_private_rsa_key_from_file(full_path('size2048.key')) kb1 = KeyBundle( - source="file://{}".format(full_path("rsa.key")), fileformat="der", keyusage="sig", kid="1", + source="file://{}".format(full_path("rsa.key")), + fileformat="der", + keyusage="sig", + kid="1", ) kb2 = KeyBundle( - source="file://{}".format(full_path("size2048.key")), fileformat="der", keyusage="enc", kid="2", + source="file://{}".format(full_path("size2048.key")), + fileformat="der", + keyusage="enc", + kid="2", ) ALICE_KEY_JAR = KeyJar() @@ -37,7 +43,10 @@ def full_path(local_file): ALICE_KEY_JAR.add_kb(ALICE, kb2) kb3 = KeyBundle( - source="file://{}".format(full_path("server.key")), fileformat="der", keyusage="enc", kid="3", + source="file://{}".format(full_path("server.key")), + fileformat="der", + keyusage="enc", + kid="3", ) BOB_KEY_JAR = KeyJar() diff --git a/tests/test_50_argument_alias.py b/tests/test_50_argument_alias.py index 98b0947..746ab60 100644 --- a/tests/test_50_argument_alias.py +++ b/tests/test_50_argument_alias.py @@ -168,7 +168,10 @@ def test_init_key_jar_update(): assert len(_keyjar_3.get_signing_key("EC")) == 1 _keyjar_4 = init_key_jar( - private_path=PRIVATE_FILE, key_defs=KEYSPEC_2, public_path=PUBLIC_FILE, read_only=False, + private_path=PRIVATE_FILE, + key_defs=KEYSPEC_2, + public_path=PUBLIC_FILE, + read_only=False, ) # Now it should