Skip to content

Commit 6112d63

Browse files
committed
Add end-to-end tests with a custom http server
1 parent 6d1b083 commit 6112d63

File tree

4 files changed

+152
-1
lines changed

4 files changed

+152
-1
lines changed

requirements.txt

+1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
httpx~=0.16.1
22
gssapi
33
pytest
4+
k5test

setup.cfg

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ keywords =
2323
include_package_data = True
2424
python_requires = >=3.6
2525
packages = httpx_gssapi
26-
tests_require = pytest
2726
install_requires =
2827
httpx~=0.16.1
2928
gssapi
29+
tests_require =
30+
pytest
31+
k5test
3032
3133
[options.package_data]
3234
* =

tests/conftest.py

+129
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
# TODO: Provide this through k5test?
2+
import os
3+
import re
4+
import copy
5+
import socket
6+
import contextlib
7+
import multiprocessing as mp
8+
from time import sleep
9+
from base64 import b64decode
10+
from http.server import HTTPServer, BaseHTTPRequestHandler
11+
12+
import pytest
13+
import k5test
14+
15+
import gssapi.exceptions
16+
17+
WWW_AUTHENTICATE = 'WWW-Authenticate'
18+
AUTHORIZATION = 'Authorization'
19+
NEGOTIATE = 'Negotiate'
20+
21+
_find_auth = re.compile(r'Negotiate\s*([^,]*)', re.I).search
22+
23+
24+
class KrbRequestHandler(BaseHTTPRequestHandler):
25+
"""
26+
Simple HTTP Request Handler which implements kerberos authentication
27+
and responds with "Authenticated!" on success and "Unauthorized!" on
28+
failure.
29+
"""
30+
31+
def do_GET(self):
32+
in_token = self._get_auth_header()
33+
if not in_token:
34+
return self._unauthorized()
35+
ctx = self._get_context()
36+
out_token = ctx.step(in_token)
37+
if ctx.complete:
38+
return self._authorized(out_token)
39+
else:
40+
return self._unauthorized()
41+
42+
def _get_auth_header(self):
43+
auth = self.headers.get(AUTHORIZATION)
44+
if not auth:
45+
return
46+
match_obj = _find_auth(auth)
47+
if match_obj:
48+
return b64decode(match_obj.group(1))
49+
50+
def _authorized(self, neg_token=None):
51+
self._respond(200, 'Authorized!', neg_token)
52+
53+
def _unauthorized(self, neg_token=None):
54+
self._respond(401, 'Unauthorized!', neg_token)
55+
56+
def _respond(self, code, msg, neg_token=None):
57+
self.send_response(code)
58+
self.send_header('Content-Type', 'text/plain')
59+
self._set_www_auth(neg_token)
60+
self.end_headers()
61+
self.wfile.write(msg.encode())
62+
63+
def _set_www_auth(self, token=None):
64+
www_auth = f'{NEGOTIATE} {token}' if token else NEGOTIATE
65+
self.send_header(WWW_AUTHENTICATE, www_auth)
66+
67+
def _get_context(self):
68+
service_name = gssapi.Name(
69+
f'HTTP/{self.server.server_name}@{self.server.krb5_realm.realm}'
70+
)
71+
server_cred = gssapi.Credentials(name=service_name, usage='accept')
72+
return gssapi.SecurityContext(creds=server_cred)
73+
74+
75+
def start_http_server(realm: k5test.K5Realm,
76+
host: str = 'localhost',
77+
port: int = 8080):
78+
princ = f'HTTP/{host}@{realm.realm}'
79+
realm.addprinc(princ)
80+
realm.extract_keytab(princ, realm.keytab)
81+
realm.ccache = realm.env['KRB5CCNAME'] \
82+
= os.path.join(realm.tmpdir, 'service_ccache')
83+
realm.kinit(princ, flags=['-k', '-t', realm.keytab])
84+
85+
os.environ.update(realm.env)
86+
87+
with HTTPServer(server_address=(host, port),
88+
RequestHandlerClass=KrbRequestHandler) as httpd:
89+
httpd.krb5_realm = realm
90+
httpd.serve_forever()
91+
92+
93+
@pytest.fixture(scope='session')
94+
def krb_realm() -> k5test.K5Realm:
95+
realm = k5test.K5Realm()
96+
env = copy.deepcopy(os.environ)
97+
os.environ.update(realm.env)
98+
yield realm
99+
realm.stop()
100+
os.environ = env
101+
102+
103+
@pytest.fixture(scope='session')
104+
def http_server_port() -> int:
105+
with contextlib.closing(socket.socket()) as sock:
106+
sock.bind(('127.0.0.1', 0))
107+
return sock.getsockname()[1]
108+
109+
110+
@pytest.fixture(scope='session')
111+
def http_server(request, krb_realm: k5test.K5Realm, http_server_port: int):
112+
ps = mp.Process(
113+
target=start_http_server,
114+
args=(krb_realm,),
115+
kwargs={'port': http_server_port},
116+
)
117+
ps.start()
118+
119+
sleep(1)
120+
121+
@request.addfinalizer
122+
def cleanup():
123+
if ps.is_alive():
124+
ps.terminate()
125+
126+
127+
@pytest.fixture
128+
def http_creds(krb_realm: k5test.K5Realm):
129+
yield gssapi.Credentials(usage='initiate', name=gssapi.Name('user'))

tests/test_end_to_end.py

+19
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#!/usr/bin/env python
2+
"""Tests for httpx_gssapi."""
3+
4+
import httpx
5+
import pytest
6+
7+
import httpx_gssapi
8+
9+
10+
def test_end_to_end(http_server, http_creds, krb_realm, http_server_port):
11+
auth = httpx_gssapi.HTTPSPNEGOAuth(creds=http_creds)
12+
with httpx.Client(auth=auth, timeout=500) as client:
13+
for i in range(2):
14+
resp = client.get(f'http://localhost:{http_server_port}/')
15+
assert resp.status_code == 200
16+
17+
18+
if __name__ == '__main__':
19+
pytest.main()

0 commit comments

Comments
 (0)