This repository was archived by the owner on Jun 12, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathaccess_token.py
62 lines (48 loc) · 2.27 KB
/
access_token.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
"""Implements the service that talks to the Access Token endpoint."""
import logging
from oidcmsg import oauth2
from oidcmsg.oauth2 import ResponseMessage
from oidcmsg.time_util import time_sans_frac
from oidcservice.oauth2.utils import get_state_parameter
from oidcservice.service import Service
# Module-level logger, named after this module via ``__name__``.
LOGGER = logging.getLogger(__name__)
class AccessToken(Service):
    """Service implementation for requests against the token endpoint."""

    # Message classes used for the request, the response and error responses.
    msg_type = oauth2.AccessTokenRequest
    response_cls = oauth2.AccessTokenResponse
    error_msg = ResponseMessage

    # Identification of the endpoint and of this service.
    endpoint_name = 'token_endpoint'
    synchronous = True
    service_name = 'accesstoken'

    # Defaults for client authentication and HTTP transport.
    default_authn_method = 'client_secret_basic'
    http_method = 'POST'
    request_body_type = 'urlencoded'
    response_body_type = 'json'

    def __init__(self, service_context, client_authn_factory=None, conf=None):
        """Initialise the base ``Service`` and register the pre-construct hook.

        :param service_context: Passed on to ``Service.__init__``
        :param client_authn_factory: Passed on to ``Service.__init__``
        :param conf: Passed on to ``Service.__init__``
        """
        Service.__init__(
            self,
            service_context,
            client_authn_factory=client_authn_factory,
            conf=conf,
        )
        # Make sure oauth_pre_construct is among the pre-construct hooks.
        self.pre_construct.append(self.oauth_pre_construct)

    def update_service_context(self, resp, key='', **kwargs):
        """Store a token response through the service context's state.

        If the response contains ``expires_in``, a ``__expires_at`` entry is
        added first, computed as ``time_sans_frac()`` plus the integer value
        of ``expires_in``.

        :param resp: The token response; may be modified in place
        :param key: Key handed to ``state.store_item``
        :param kwargs: Extra keyword arguments (not used here)
        """
        if 'expires_in' in resp:
            now = time_sans_frac()
            resp['__expires_at'] = now + int(resp['expires_in'])

        # Stored regardless of whether an expiry could be computed.
        self.service_context.state.store_item(resp, 'token_response', key)

    def oauth_pre_construct(self, request_args=None, post_args=None, **kwargs):
        """Build the request arguments for a token request.

        Arguments are gathered with ``state.extend_request_args`` from the
        stored 'auth_request' and 'auth_response' items, ``grant_type``
        is defaulted to "authorization_code" when absent, and finally
        any explicitly given *request_args* are applied on top.

        :param request_args: Initial set of request arguments; these override
            values gathered from the stored items
        :param post_args: Returned unchanged
        :param kwargs: Extra keyword arguments, handed to
            ``get_state_parameter`` together with *request_args*
        :return: Tuple of (request arguments, post_args)
        """
        state_key = get_state_parameter(request_args, kwargs)
        wanted = list(self.msg_type.c_param.keys())

        # Stored items to draw from, in the order they are applied.
        sources = (
            (oauth2.AuthorizationRequest, 'auth_request'),
            (oauth2.AuthorizationResponse, 'auth_response'),
        )
        merged = {}
        for message_cls, item_type in sources:
            merged = self.service_context.state.extend_request_args(
                merged, message_cls, item_type, state_key, wanted)

        if "grant_type" not in merged:
            merged["grant_type"] = "authorization_code"

        # Caller-supplied arguments win over anything collected above.
        if request_args is not None:
            merged.update(request_args)

        return merged, post_args