
Commit 19646b1

Merge branch 'master' into develop

Conflicts:
	test/test_unit.py

2 parents: 828133c + 32edabd

10 files changed (+303, -52)

README.md (+4, -4)

````diff
@@ -29,7 +29,7 @@ from kafka.client import KafkaClient
 from kafka.consumer import SimpleConsumer
 from kafka.producer import SimpleProducer, KeyedProducer
 
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 
 # To send messages synchronously
 producer = SimpleProducer(kafka)
@@ -80,7 +80,7 @@ from kafka.client import KafkaClient
 from kafka.producer import KeyedProducer
 from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
 
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 
 # HashedPartitioner is default
 producer = KeyedProducer(kafka)
@@ -95,7 +95,7 @@ producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
 from kafka.client import KafkaClient
 from kafka.consumer import MultiProcessConsumer
 
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 
 # This will split the number of partitions among two processes
 consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
@@ -115,7 +115,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):
 
 ```python
 from kafka.client import KafkaClient
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 req = ProduceRequest(topic="my-topic", partition=1,
     messages=[KafkaProdocol.encode_message("some message")])
 resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
````
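The pattern across these README hunks is a single API migration: `KafkaClient(host, port)` becomes `KafkaClient("host:port")`. As a hedged sketch of the forms the new constructor accepts (they follow from the `collect_hosts` helper added in kafka/conn.py below):

```python
from kafka.client import KafkaClient

# Equivalent ways to bootstrap against a single local broker on the new API;
# the old two-argument form KafkaClient("localhost", 9092) no longer works.
client = KafkaClient("localhost:9092")    # explicit host:port string
client = KafkaClient("localhost")         # port defaults to 9092
client = KafkaClient(["localhost:9092"])  # list form, useful for several brokers
```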

example.py (+1, -1)

```diff
@@ -14,7 +14,7 @@ def consume_example(client):
         print(message)
 
 def main():
-    client = KafkaClient("localhost", 9092)
+    client = KafkaClient("localhost:9092")
     produce_example(client)
    consume_example(client)
```

kafka/NOTES.md (+1, -1)

```diff
@@ -18,7 +18,7 @@ There are a few levels of abstraction:
 
 # Possible API
 
-client = KafkaClient("localhost", 9092)
+client = KafkaClient("localhost:9092")
 
 producer = KafkaProducer(client, "topic")
 producer.send_string("hello")
```

kafka/__init__.py (+1, -1)

```diff
@@ -1,5 +1,5 @@
 __title__ = 'kafka'
-__version__ = '0.2-alpha'
+__version__ = '0.9.0'
 __author__ = 'David Arthur'
 __license__ = 'Apache License 2.0'
 __copyright__ = 'Copyright 2012, David Arthur under Apache License, v2.0'
```

kafka/client.py (+20, -9)

```diff
@@ -11,7 +11,7 @@
                           LeaderUnavailableError,
                           KafkaUnavailableError)
 
-from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol
 
 log = logging.getLogger("kafka")
@@ -25,14 +25,15 @@ class KafkaClient(object):
     # NOTE: The timeout given to the client should always be greater than the
     # one passed to SimpleConsumer.get_message(), otherwise you can get a
     # socket timeout.
-    def __init__(self, host, port, client_id=CLIENT_ID,
+    def __init__(self, hosts, client_id=CLIENT_ID,
                  timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
         # We need one connection to bootstrap
         self.client_id = client_id
         self.timeout = timeout
-        self.conns = {  # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, timeout=timeout)
-        }
+        self.hosts = collect_hosts(hosts)
+
+        # create connections only when we need them
+        self.conns = {}
         self.brokers = {}            # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.topic_partitions = {}   # topic_id -> [0, 1, 2, ...]
@@ -42,6 +43,15 @@ def __init__(self, host, port, client_id=CLIENT_ID,
     #   Private API   #
     ##################
 
+    def _get_conn(self, host, port):
+        "Get or create a connection to a broker using host and port"
+
+        host_key = (host, port)
+        if host_key not in self.conns:
+            self.conns[host_key] = KafkaConnection(host, port)
+
+        return self.conns[host_key]
+
     def _get_conn_for_broker(self, broker):
         """
         Get or create a connection to a broker
@@ -50,7 +60,7 @@ def _get_conn_for_broker(self, broker):
             self.conns[(broker.host, broker.port)] = \
                 KafkaConnection(broker.host, broker.port, timeout=self.timeout)
 
-        return self.conns[(broker.host, broker.port)]
+        return self._get_conn(broker.host, broker.port)
 
     def _get_leader_for_partition(self, topic, partition):
         """
@@ -83,14 +93,15 @@ def _send_broker_unaware_request(self, requestId, request):
         Attempt to send a broker-agnostic request to one of the available
         brokers. Keep trying until you succeed.
         """
-        for conn in self.conns.values():
+        for (host, port) in self.hosts:
             try:
+                conn = self._get_conn(host, port)
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
                 return response
             except Exception, e:
-                log.warning("Could not send request [%r] to server %s, "
-                            "trying next server: %s" % (request, conn, e))
+                log.warning("Could not send request [%r] to server %s:%i, "
+                            "trying next server: %s" % (request, host, port, e))
                 continue
 
         raise KafkaUnavailableError("All servers failed to process request")
```
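Two behavioral changes fall out of these hunks: the constructor no longer unconditionally opens a `KafkaConnection` to a single bootstrap host, and broker-unaware requests now walk every configured host rather than only already-open connections. Note that `_get_conn` as added here does not forward `self.timeout` to `KafkaConnection`, unlike `_get_conn_for_broker`; the diff does not say whether that is intentional. A hedged usage sketch (the broker hostnames are placeholders):

```python
from kafka.client import KafkaClient

# The constructor parses this into [(host, port), ...] via collect_hosts()
# (shuffled by default); a KafkaConnection is created per broker only when
# _get_conn() first needs it, per the diff's own comment:
# "create connections only when we need them".
client = KafkaClient("kafka1:9092,kafka2:9092,kafka3")

# During bootstrap, _send_broker_unaware_request() tries each host in turn:
# if kafka1 is unreachable it logs a warning and moves on to kafka2, raising
# KafkaUnavailableError only after every configured host has failed.
```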

kafka/codec.py (+95, -3)

```diff
@@ -1,5 +1,9 @@
 from cStringIO import StringIO
 import gzip
+import struct
+
+_XERIAL_V1_HEADER = (-126, 'S', 'N', 'A', 'P', 'P', 'Y', 0, 1, 1)
+_XERIAL_V1_FORMAT = 'bccccccBii'
 
 try:
     import snappy
@@ -36,13 +40,101 @@ def gzip_decode(payload):
     return result
 
 
-def snappy_encode(payload):
+def snappy_encode(payload, xerial_compatible=False, xerial_blocksize=32 * 1024):
+    """Encodes the given data with snappy. If xerial_compatible is set, the
+    stream is encoded in a fashion compatible with the xerial snappy library.
+
+    The block size (xerial_blocksize) controls how frequently the blocking
+    occurs; 32k is the default in the xerial library.
+
+    The format winds up being:
+
+        +-------------+------------+--------------+------------+--------------+
+        |   Header    | Block1 len | Block1 data  | Blockn len | Blockn data  |
+        |-------------+------------+--------------+------------+--------------|
+        |  16 bytes   |  BE int32  | snappy bytes |  BE int32  | snappy bytes |
+        +-------------+------------+--------------+------------+--------------+
+
+    It is important to note that the blocksize is the amount of uncompressed
+    data presented to snappy at each block, whereas the blocklen is the
+    number of bytes that will be present in the stream; that is, the
+    length will always be <= blocksize.
+    """
+
     if not _has_snappy:
         raise NotImplementedError("Snappy codec is not available")
-    return snappy.compress(payload)
+
+    if xerial_compatible:
+        def _chunker():
+            for i in xrange(0, len(payload), xerial_blocksize):
+                yield payload[i:i + xerial_blocksize]
+
+        out = StringIO()
+
+        header = ''.join([struct.pack('!' + fmt, dat) for fmt, dat
+                          in zip(_XERIAL_V1_FORMAT, _XERIAL_V1_HEADER)])
+
+        out.write(header)
+        for chunk in _chunker():
+            block = snappy.compress(chunk)
+            block_size = len(block)
+            out.write(struct.pack('!i', block_size))
+            out.write(block)
+
+        out.seek(0)
+        return out.read()
+
+    else:
+        return snappy.compress(payload)
+
+
+def _detect_xerial_stream(payload):
+    """Detects if the given data might have been encoded with the blocking
+    mode of the xerial snappy library.
+
+    This mode writes a magic header of the format:
+
+        +--------+--------------+------------+---------+--------+
+        | Marker | Magic String | Null / Pad | Version | Compat |
+        |--------+--------------+------------+---------+--------|
+        |  byte  |   c-string   |    byte    |  int32  | int32  |
+        |--------+--------------+------------+---------+--------|
+        |  -126  |   'SNAPPY'   |     \0     |         |        |
+        +--------+--------------+------------+---------+--------+
+
+    The pad appears to be there to ensure that SNAPPY is a valid cstring.
+    The version is the version of this format as written by xerial;
+    in the wild this is currently 1, so we only support v1.
+
+    Compat is there to claim the minimum supported version that can
+    read a xerial block stream; presently in the wild this is 1.
+    """
+
+    if len(payload) > 16:
+        header = struct.unpack('!' + _XERIAL_V1_FORMAT, bytes(payload)[:16])
+        return header == _XERIAL_V1_HEADER
+    return False
 
 
 def snappy_decode(payload):
     if not _has_snappy:
         raise NotImplementedError("Snappy codec is not available")
-    return snappy.decompress(payload)
+
+    if _detect_xerial_stream(payload):
+        # TODO: should this become a fileobj?
+        out = StringIO()
+        byt = buffer(payload[16:])
+        length = len(byt)
+        cursor = 0
+
+        while cursor < length:
+            block_size = struct.unpack_from('!i', byt[cursor:])[0]
+            # Skip the block size
+            cursor += 4
+            end = cursor + block_size
+            out.write(snappy.decompress(byt[cursor:end]))
+            cursor = end
+
+        out.seek(0)
+        return out.read()
+    else:
+        return snappy.decompress(payload)
```
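A hedged round-trip sketch of the new blocked encoding (assumes the optional python-snappy package is importable; the payload contents are arbitrary filler):

```python
from kafka.codec import snappy_encode, snappy_decode

# Use a payload larger than one 32 KiB block so the chunked path is exercised
payload = 'some bytes worth of filler ' * 8192
encoded = snappy_encode(payload, xerial_compatible=True)

# Bytes 1-6 of the 16-byte header carry the 'SNAPPY' magic string,
# which is what _detect_xerial_stream() keys on during decode
assert encoded[1:7] == 'SNAPPY'
assert snappy_decode(encoded) == payload
```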

kafka/conn.py (+27, -1)

```diff
@@ -2,13 +2,39 @@
 import logging
 import socket
 import struct
+from random import shuffle
 from threading import local
 
 from kafka.common import ConnectionError
 
 log = logging.getLogger("kafka")
 
 DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
+DEFAULT_KAFKA_PORT = 9092
+
+
+def collect_hosts(hosts, randomize=True):
+    """
+    Collects a comma-separated set of hosts (host:port) and optionally
+    randomizes the returned list.
+    """
+
+    if isinstance(hosts, str):
+        hosts = hosts.strip().split(',')
+
+    result = []
+    for host_port in hosts:
+
+        res = host_port.split(':')
+        host = res[0]
+        port = int(res[1]) if len(res) > 1 else DEFAULT_KAFKA_PORT
+        result.append((host.strip(), port))
+
+    if randomize:
+        shuffle(result)
+
+    return result
+
 
 class KafkaConnection(local):
     """
@@ -81,7 +107,7 @@ def send(self, request_id, payload):
             sent = self._sock.sendall(payload)
             if sent is not None:
                 self._raise_connection_error()
-        except socket.error, e:
+        except socket.error:
             log.exception('Unable to send payload to Kafka')
             self._raise_connection_error()
```
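A small sketch of what `collect_hosts` accepts under the definition above; `randomize=False` is passed so the output order is deterministic:

```python
from kafka.conn import collect_hosts

# Comma-separated string: whitespace is stripped, port defaults to 9092
print collect_hosts('kafka1:9092, kafka2', randomize=False)
# -> [('kafka1', 9092), ('kafka2', 9092)]

# A list of host[:port] strings is normalized the same way
print collect_hosts(['kafka1:9092', 'kafka2'], randomize=False)
# -> [('kafka1', 9092), ('kafka2', 9092)]
```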

setup.py (+1)

```diff
@@ -4,6 +4,7 @@
 
 
 class Tox(Command):
+
     user_options = []
 
     def initialize_options(self):
```

test/test_integration.py (+19, -15)

```diff
@@ -33,7 +33,7 @@ def ensure_topic_creation(client, topic_name):
 
 class KafkaTestCase(unittest.TestCase):
     def setUp(self):
-        self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))
+        self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
         ensure_topic_creation(self.client, self.topic)
 
 
@@ -42,7 +42,7 @@ class TestKafkaClient(KafkaTestCase):
     def setUpClass(cls):  # noqa
         cls.zk = ZookeeperFixture.instance()
         cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server.host, cls.server.port)
+        cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -578,7 +578,7 @@ def setUpClass(cls):
         cls.zk = ZookeeperFixture.instance()
         cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
         cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server2.host, cls.server2.port)
+        cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port))
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -826,23 +826,26 @@ def test_large_messages(self):
 
 class TestFailover(KafkaTestCase):
 
-    def setUp(self):
+    @classmethod
+    def setUpClass(cls):  # noqa
         zk_chroot = random_string(10)
         replicas = 2
         partitions = 2
 
         # mini zookeeper, 2 kafka brokers
-        self.zk = ZookeeperFixture.instance()
-        kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
-        self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
-        self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
-        super(TestFailover, self).setUp()
-
-    def tearDown(self):
-        self.client.close()
-        for broker in self.brokers:
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+
+        hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
+        cls.client = KafkaClient(hosts)
+
+    @classmethod
+    def tearDownClass(cls):
+        cls.client.close()
+        for broker in cls.brokers:
             broker.close()
-        self.zk.close()
+        cls.zk.close()
 
     def test_switch_leader(self):
         key, topic, partition = random_string(5), self.topic, 0
@@ -918,7 +921,8 @@ def _kill_leader(self, topic, partition):
         return broker
 
     def _count_messages(self, group, topic):
-        client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+        hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+        client = KafkaClient(hosts)
         consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0)
         all_messages = []
         for message in consumer:
```
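The failover fixture is the first in-tree use of the list form: handing every fixture broker to the client means bootstrap can survive the loss of any single broker, which is what `test_switch_leader` exercises. Reduced to a hedged, self-contained sketch (the `Broker` namedtuple stands in for the KafkaFixture instances above):

```python
from collections import namedtuple
from kafka.client import KafkaClient

# Stand-in for the KafkaFixture instances used in the tests above
Broker = namedtuple('Broker', ['host', 'port'])
brokers = [Broker('localhost', 9092), Broker('localhost', 9093)]

# Hand every broker to the client so bootstrap survives any single failure
hosts = ['%s:%d' % (b.host, b.port) for b in brokers]
client = KafkaClient(hosts)
```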
