Skip to content

Commit 9732ed1

Browse files
author
Mark Roberts
committed
Minor refactor in conn.py, update version in __init__.py, add ErrorString
1 parent ee7e86e commit 9732ed1

File tree

4 files changed

+36
-27
lines changed

4 files changed

+36
-27
lines changed

kafka/client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from functools import partial
66
from itertools import count
77

8-
from kafka.common import (ErrorMapping, TopicAndPartition,
8+
from kafka.common import (ErrorMapping, ErrorStrings, TopicAndPartition,
99
ConnectionError, FailedPayloadsError,
1010
BrokerResponseError, PartitionUnavailableError,
1111
KafkaUnavailableError, KafkaRequestError)
@@ -174,8 +174,8 @@ def _raise_on_response_error(self, resp):
174174
self.reset_topic_metadata(resp.topic)
175175

176176
raise BrokerResponseError(
177-
"Request for %s failed with errorcode=%d" %
178-
(TopicAndPartition(resp.topic, resp.partition), resp.error))
177+
"Request for %s failed with errorcode=%d (%s)" %
178+
(TopicAndPartition(resp.topic, resp.partition), resp.error, ErrorStrings[resp.error]))
179179

180180
#################
181181
# Public API #

kafka/common.py

Lines changed: 21 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -48,22 +48,28 @@
4848
TopicAndPartition = namedtuple("TopicAndPartition", ["topic", "partition"])
4949

5050

51+
ErrorStrings = {
52+
-1 : 'UNKNOWN',
53+
0 : 'NO_ERROR',
54+
1 : 'OFFSET_OUT_OF_RANGE',
55+
2 : 'INVALID_MESSAGE',
56+
3 : 'UNKNOWN_TOPIC_OR_PARTITON',
57+
4 : 'INVALID_FETCH_SIZE',
58+
5 : 'LEADER_NOT_AVAILABLE',
59+
6 : 'NOT_LEADER_FOR_PARTITION',
60+
7 : 'REQUEST_TIMED_OUT',
61+
8 : 'BROKER_NOT_AVAILABLE',
62+
9 : 'REPLICA_NOT_AVAILABLE',
63+
10 : 'MESSAGE_SIZE_TO_LARGE',
64+
11 : 'STALE_CONTROLLER_EPOCH',
65+
12 : 'OFFSET_METADATA_TOO_LARGE',
66+
}
67+
5168
class ErrorMapping(object):
52-
# Many of these are not actually used by the client
53-
UNKNOWN = -1
54-
NO_ERROR = 0
55-
OFFSET_OUT_OF_RANGE = 1
56-
INVALID_MESSAGE = 2
57-
UNKNOWN_TOPIC_OR_PARTITON = 3
58-
INVALID_FETCH_SIZE = 4
59-
LEADER_NOT_AVAILABLE = 5
60-
NOT_LEADER_FOR_PARTITION = 6
61-
REQUEST_TIMED_OUT = 7
62-
BROKER_NOT_AVAILABLE = 8
63-
REPLICA_NOT_AVAILABLE = 9
64-
MESSAGE_SIZE_TO_LARGE = 10
65-
STALE_CONTROLLER_EPOCH = 11
66-
OFFSET_METADATA_TOO_LARGE = 12
69+
pass
70+
71+
for k, v in ErrorStrings.items():
72+
setattr(ErrorMapping, v, k)
6773

6874
#################
6975
# Exceptions #

kafka/conn.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,10 @@ def __init__(self, host, port, timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
2828
super(KafkaConnection, self).__init__()
2929
self.host = host
3030
self.port = port
31-
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
32-
self._sock.connect((host, port))
3331
self.timeout = timeout
34-
self._sock.settimeout(self.timeout)
35-
self._dirty = False
32+
self._sock = None
33+
34+
self.reinit()
3635

3736
def __repr__(self):
3837
return "<KafkaConnection host=%s port=%d>" % (self.host, self.port)
@@ -47,24 +46,28 @@ def _raise_connection_error(self):
4746

4847
def _read_bytes(self, num_bytes):
4948
bytes_left = num_bytes
50-
resp = ''
49+
responses = []
50+
5151
log.debug("About to read %d bytes from Kafka", num_bytes)
5252
if self._dirty:
5353
self.reinit()
54+
5455
while bytes_left:
5556
try:
56-
data = self._sock.recv(bytes_left)
57+
data = self._sock.recv(min(bytes_left, 4096))
5758
except socket.error:
5859
log.exception('Unable to receive data from Kafka')
5960
self._raise_connection_error()
61+
6062
if data == '':
6163
log.error("Not enough data to read this response")
6264
self._raise_connection_error()
65+
6366
bytes_left -= len(data)
6467
log.debug("Read %d/%d bytes from Kafka", num_bytes - bytes_left, num_bytes)
65-
resp += data
68+
responses.append(data)
6669

67-
return resp
70+
return ''.join(responses)
6871

6972
##################
7073
# Public API #

load_example.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,5 +53,5 @@ def main():
5353
print 'Messages invalid: %d' % threads[1].invalid
5454

5555
if __name__ == "__main__":
56-
logging.basicConfig(level=logging.DEBUG)
56+
logging.basicConfig(level=logging.WARN)
5757
main()

0 commit comments

Comments
 (0)