Skip to content

Commit 7b7f3af

Browse files
authored
Fix sasl gssapi plugin: do not rely on client_ctx.complete in auth_bytes() (#2631)
1 parent 48dd596 commit 7b7f3af

File tree

2 files changed

+54
-5
lines changed

2 files changed

+54
-5
lines changed

kafka/sasl/gssapi.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,15 @@ def __init__(self, **config):
2626
raise ValueError('sasl_kerberos_service_name or sasl_kerberos_name required for GSSAPI sasl configuration')
2727
self._is_done = False
2828
self._is_authenticated = False
29+
self.gssapi_name = None
2930
if config.get('sasl_kerberos_name', None) is not None:
3031
self.auth_id = str(config['sasl_kerberos_name'])
32+
if isinstance(config['sasl_kerberos_name'], gssapi.Name):
33+
self.gssapi_name = config['sasl_kerberos_name']
3134
else:
3235
kerberos_domain_name = config.get('sasl_kerberos_domain_name', '') or config.get('host', '')
3336
self.auth_id = config['sasl_kerberos_service_name'] + '@' + kerberos_domain_name
34-
if isinstance(config.get('sasl_kerberos_name', None), gssapi.Name):
35-
self.gssapi_name = config['sasl_kerberos_name']
36-
else:
37+
if self.gssapi_name is None:
3738
self.gssapi_name = gssapi.Name(self.auth_id, name_type=gssapi.NameType.hostbased_service).canonicalize(gssapi.MechType.kerberos)
3839
self._client_ctx = gssapi.SecurityContext(name=self.gssapi_name, usage='initiate')
3940
self._next_token = self._client_ctx.step(None)
@@ -43,9 +44,8 @@ def auth_bytes(self):
4344
# so mark is_done after the final auth_bytes are provided
4445
# in practice we'll still receive a response when using SaslAuthenticate
4546
# but not when using the prior unframed approach.
46-
if self._client_ctx.complete:
47+
if self._is_authenticated:
4748
self._is_done = True
48-
self._is_authenticated = True
4949
return self._next_token or b''
5050

5151
def receive(self, auth_bytes):
@@ -74,6 +74,13 @@ def receive(self, auth_bytes):
7474
]
7575
# add authorization identity to the response, and GSS-wrap
7676
self._next_token = self._client_ctx.wrap(b''.join(message_parts), False).message
77+
# We need to identify the last token in auth_bytes();
78+
# we can't rely on client_ctx.complete because it becomes True after generating
79+
# the second-to-last token (after calling .step(auth_bytes) for the final time)
80+
# We could introduce an additional state variable (i.e., self._final_token),
81+
# but instead we just set _is_authenticated. Since the plugin interface does
82+
# not read is_authenticated() until after is_done() is True, this should be fine.
83+
self._is_authenticated = True
7784

7885
def is_done(self):
7986
return self._is_done

test/sasl/test_gssapi.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
from __future__ import absolute_import
2+
3+
try:
4+
from unittest import mock
5+
except ImportError:
6+
import mock
7+
8+
from kafka.sasl import get_sasl_mechanism
9+
import kafka.sasl.gssapi
10+
11+
12+
def test_gssapi():
13+
config = {
14+
'sasl_kerberos_domain_name': 'foo',
15+
'sasl_kerberos_service_name': 'bar',
16+
}
17+
client_ctx = mock.Mock()
18+
client_ctx.step.side_effect = [b'init', b'exchange', b'complete', b'xxxx']
19+
client_ctx.complete = False
20+
def mocked_message_wrapper(msg, *args):
21+
wrapped = mock.Mock()
22+
type(wrapped).message = mock.PropertyMock(return_value=msg)
23+
return wrapped
24+
client_ctx.unwrap.side_effect = mocked_message_wrapper
25+
client_ctx.wrap.side_effect = mocked_message_wrapper
26+
kafka.sasl.gssapi.gssapi = mock.Mock()
27+
kafka.sasl.gssapi.gssapi.SecurityContext.return_value = client_ctx
28+
gssapi = get_sasl_mechanism('GSSAPI')(**config)
29+
assert isinstance(gssapi, kafka.sasl.gssapi.SaslMechanismGSSAPI)
30+
client_ctx.step.assert_called_with(None)
31+
32+
while not gssapi.is_done():
33+
send_token = gssapi.auth_bytes()
34+
receive_token = send_token # not realistic, but enough for testing
35+
if send_token == b'\x00cbar@foo': # final wrapped message
36+
receive_token = b'' # final message gets an empty response
37+
gssapi.receive(receive_token)
38+
if client_ctx.step.call_count == 3:
39+
client_ctx.complete = True
40+
41+
assert gssapi.is_done()
42+
assert gssapi.is_authenticated()

0 commit comments

Comments
 (0)