This repository was archived by the owner on Dec 30, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathfront.py
177 lines (146 loc) · 6.38 KB
/
front.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#!/usr/bin/env python
import logging
from urlparse import urlparse
from saml2.httputil import ServiceError
from saml2.httputil import Response
from saml2.httputil import Redirect
from saml2.httputil import Unauthorized
from saml2.s_utils import UnknownPrincipal
from saml2.s_utils import UnsupportedBinding
from saml2.server import Server
import service
logger = logging.getLogger(__name__)
class SamlIDP(service.Service):
def __init__(self, environ, start_response, conf, cache, incomming):
"""
Constructor for the class.
:param environ: WSGI environ
:param start_response: WSGI start response function
:param conf: The SAML configuration
:param cache: Cache with active sessions
"""
service.Service.__init__(self, environ, start_response)
self.response_bindings = None
self.idp = Server(config=conf, cache=cache)
self.incomming = incomming
def verify_request(self, query, binding):
""" Parses and verifies the SAML Authentication Request
:param query: The SAML authn request, transport encoded
:param binding: Which binding the query came in over
:returns: dictionary
"""
if not query:
logger.info("Missing QUERY")
resp = Unauthorized('Unknown user')
return {"response": resp(self.environ, self.start_response)}
req_info = self.idp.parse_authn_request(query, binding)
logger.info("parsed OK")
_authn_req = req_info.message
logger.debug("%s" % _authn_req)
# Check that I know where to send the reply to
try:
binding_out, destination = self.idp.pick_binding(
"assertion_consumer_service",
bindings=self.response_bindings,
entity_id=_authn_req.issuer.text, request=_authn_req)
except Exception as err:
logger.error("Couldn't find receiver endpoint: %s" % err)
raise
logger.debug("Binding: %s, destination: %s" % (binding_out,
destination))
resp_args = {}
try:
resp_args = self.idp.response_args(_authn_req)
_resp = None
except UnknownPrincipal as excp:
_resp = self.idp.create_error_response(_authn_req.id,
destination, excp)
except UnsupportedBinding as excp:
_resp = self.idp.create_error_response(_authn_req.id,
destination, excp)
req_args = {}
for key in ["subject", "name_id_policy", "conditions",
"requested_authn_context", "scoping", "force_authn",
"is_passive"]:
try:
val = getattr(_authn_req, key)
except AttributeError:
pass
else:
req_args[key] = val
return {"resp_args": resp_args, "response": _resp,
"authn_req": _authn_req, "req_args": req_args}
def handle_authn_request(self, binding_in):
"""
Deal with an authentication request
:param binding_in: Which binding was used when receiving the query
:return: A response if an error occurred or session information in a
dictionary
"""
_request = self.unpack(binding_in)
_binding_in = service.INV_BINDING_MAP[binding_in]
try:
_dict = self.verify_request(_request["SAMLRequest"], _binding_in)
except UnknownPrincipal as excp:
logger.error("UnknownPrincipal: %s" % (excp,))
resp = ServiceError("UnknownPrincipal: %s" % (excp,))
return resp(self.environ, self.start_response)
except UnsupportedBinding as excp:
logger.error("UnsupportedBinding: %s" % (excp,))
resp = ServiceError("UnsupportedBinding: %s" % (excp,))
return resp(self.environ, self.start_response)
_binding = _dict["resp_args"]["binding"]
if _dict["response"]: # An error response
http_args = self.idp.apply_binding(
_binding, "%s" % _dict["response"],
_dict["resp_args"]["destination"],
_request["RelayState"], response=True)
logger.debug("HTTPargs: %s" % http_args)
return self.response(_binding, http_args)
else:
return self.incomming(_dict, self, self.environ,
self.start_response, _request["RelayState"])
def construct_authn_response(self, identity, name_id, authn, resp_args,
relay_state, sign_response=True):
"""
:param identity:
:param name_id:
:param authn:
:param resp_args:
:param relay_state:
:param sign_response:
:return:
"""
_resp = self.idp.create_authn_response(identity, name_id=name_id,
authn=authn,
sign_response=sign_response,
**resp_args)
http_args = self.idp.apply_binding(
resp_args["binding"], "%s" % _resp, resp_args["destination"],
relay_state, response=True)
logger.debug("HTTPargs: %s" % http_args)
resp = None
if http_args["data"]:
resp = Response(http_args["data"], headers=http_args["headers"])
else:
for header in http_args["headers"]:
if header[0] == "Location":
resp = Redirect(header[1])
if not resp:
resp = ServiceError("Don't know how to return response")
return resp(self.environ, self.start_response)
def register_endpoints(self):
"""
Given the configuration, return a set of URL to function mappings.
"""
url_map = []
for endp, binding in self.idp.config.getattr("endpoints", "idp")[
"single_sign_on_service"]:
p = urlparse(endp)
url_map.append(("^%s/(.*)$" % p.path[1:],
("IDP", "handle_authn_request",
service.BINDING_MAP[binding])))
url_map.append(("^%s$" % p.path[1:],
("IDP", "handle_authn_request",
service.BINDING_MAP[binding])))
return url_map