|
| 1 | +import json |
| 2 | +import os |
| 3 | + |
| 4 | +import pytest |
| 5 | +import responses |
| 6 | +from cryptojwt.key_jar import init_key_jar |
| 7 | + |
| 8 | +from oidcservice.client_auth import factory as ca_factory |
| 9 | +from oidcservice.oauth2 import DEFAULT_SERVICES |
| 10 | +from oidcservice.oidc.add_on import do_add_ons |
| 11 | +from oidcservice.service import init_services |
| 12 | +from oidcservice.service_context import ServiceContext |
| 13 | +from oidcservice.state_interface import InMemoryStateDataBase |
| 14 | + |
| 15 | +_dirname = os.path.dirname(os.path.abspath(__file__)) |
| 16 | + |
| 17 | +ISS = 'https://example.com' |
| 18 | + |
| 19 | +KEYSPEC = [ |
| 20 | + {"type": "RSA", "use": ["sig"]}, |
| 21 | + {"type": "EC", "crv": "P-256", "use": ["sig"]}, |
| 22 | +] |
| 23 | + |
| 24 | +CLI_KEY = init_key_jar(public_path='{}/pub_client.jwks'.format(_dirname), |
| 25 | + private_path='{}/priv_client.jwks'.format(_dirname), |
| 26 | + key_defs=KEYSPEC, owner='') |
| 27 | + |
| 28 | + |
| 29 | +class TestPushedAuth: |
| 30 | + @pytest.fixture(autouse=True) |
| 31 | + def create_client(self): |
| 32 | + config = { |
| 33 | + 'client_id': 'client_id', 'client_secret': 'a longesh password', |
| 34 | + 'redirect_uris': ['https://example.com/cli/authz_cb'], |
| 35 | + 'behaviour': {'response_types': ['code']}, |
| 36 | + 'add_ons': { |
| 37 | + "pushed_authorization": { |
| 38 | + "function": |
| 39 | + "oidcservice.oidc.add_on.pushed_authorization" |
| 40 | + ".add_pushed_authorization_support", |
| 41 | + "kwargs": { |
| 42 | + "body_format": "jws", |
| 43 | + "signing_algorthm": "RS256", |
| 44 | + "http_client": None, |
| 45 | + "merge_rule": "lax" |
| 46 | + } |
| 47 | + } |
| 48 | + } |
| 49 | + } |
| 50 | + _cam = ca_factory |
| 51 | + _srvs = DEFAULT_SERVICES |
| 52 | + service_context = ServiceContext(CLI_KEY, client_id='client_id', |
| 53 | + issuer='https://www.example.org/as', |
| 54 | + config=config) |
| 55 | + |
| 56 | + self.service = init_services(_srvs, service_context, InMemoryStateDataBase(), _cam) |
| 57 | + |
| 58 | + if 'add_ons' in config: |
| 59 | + do_add_ons(config['add_ons'], self.service) |
| 60 | + |
| 61 | + service_context.service = self.service |
| 62 | + service_context.provider_info = { |
| 63 | + "pushed_authorization_request_endpoint": "https://as.example.com/push" |
| 64 | + } |
| 65 | + |
| 66 | + def test_authorization(self): |
| 67 | + auth_service = self.service["authorization"] |
| 68 | + req_args = {'foo': 'bar', "response_type": "code"} |
| 69 | + with responses.RequestsMock() as rsps: |
| 70 | + _resp = { |
| 71 | + "request_uri": "urn:example:bwc4JK-ESC0w8acc191e-Y1LTC2", |
| 72 | + "expires_in": 3600 |
| 73 | + } |
| 74 | + rsps.add("GET", |
| 75 | + auth_service.service_context.provider_info[ |
| 76 | + "pushed_authorization_request_endpoint"], |
| 77 | + body=json.dumps(_resp), status=200) |
| 78 | + |
| 79 | + _req = auth_service.construct(request_args=req_args, state='state') |
| 80 | + |
| 81 | + assert set(_req.keys()) == {"request_uri", "response_type", "client_id"} |
0 commit comments