
Commit 0bdff4e

Allow KafkaClient to take in a list of brokers for bootstrapping
1 parent af3a57e commit 0bdff4e

File tree

8 files changed (+256 -39 lines)


README.md (+4 -4)

@@ -30,7 +30,7 @@ from kafka.client import KafkaClient
 from kafka.consumer import SimpleConsumer
 from kafka.producer import SimpleProducer, KeyedProducer

-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")

 # To send messages synchronously
 producer = SimpleProducer(kafka, "my-topic")
@@ -81,7 +81,7 @@ from kafka.client import KafkaClient
 from kafka.producer import KeyedProducer
 from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner

-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")

 # HashedPartitioner is default
 producer = KeyedProducer(kafka, "my-topic")
@@ -96,7 +96,7 @@ producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
 from kafka.client import KafkaClient
 from kafka.consumer import MultiProcessConsumer

-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")

 # This will split the number of partitions among two processes
 consumer = MultiProcessConsumer(kafka, "my-group", "my-topic", num_procs=2)
@@ -116,7 +116,7 @@ for message in consumer.get_messages(count=5, block=True, timeout=4):

 ```python
 from kafka.client import KafkaClient
-kafka = KafkaClient("localhost", 9092)
+kafka = KafkaClient("localhost:9092")
 req = ProduceRequest(topic="my-topic", partition=1,
     messages=[KafkaProdocol.encode_message("some message")])
 resps = kafka.send_produce_request(payloads=[req], fail_on_error=True)
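
Taken together, the README hunks above replace the old two-argument constructor with a single "host:port" string. A minimal usage sketch of the new bootstrap style; the multi-broker addresses are placeholders, not taken from the diff:

```python
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

# One bootstrap broker; the port defaults to 9092 if omitted
# (see collect_hosts in kafka/conn.py below).
kafka = KafkaClient("localhost:9092")

# Several bootstrap brokers can be passed as one comma-separated string;
# the client tries them in (shuffled) order until one answers.
# kafka = KafkaClient("kafka1:9092,kafka2:9092,kafka3:9092")

producer = SimpleProducer(kafka, "my-topic")
producer.send_messages("some message")
```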

example.py (+1 -1)

@@ -14,7 +14,7 @@ def consume_example(client):
         print(message)

 def main():
-    client = KafkaClient("localhost", 9092)
+    client = KafkaClient("localhost:9092")
     produce_example(client)
     consume_example(client)


kafka/NOTES.md (+1 -1)

@@ -18,7 +18,7 @@ There are a few levels of abstraction:

 # Possible API

-client = KafkaClient("localhost", 9092)
+client = KafkaClient("localhost:9092")

 producer = KafkaProducer(client, "topic")
 producer.send_string("hello")

kafka/client.py (+21 -14)

@@ -8,7 +8,7 @@

 from kafka.common import ErrorMapping, TopicAndPartition
 from kafka.common import ConnectionError, FailedPayloadsException
-from kafka.conn import KafkaConnection
+from kafka.conn import collect_hosts, KafkaConnection
 from kafka.protocol import KafkaProtocol

 log = logging.getLogger("kafka")
@@ -19,13 +19,15 @@ class KafkaClient(object):
     CLIENT_ID = "kafka-python"
     ID_GEN = count()

-    def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
+    def __init__(self, hosts, bufsize=4096, client_id=CLIENT_ID):
         # We need one connection to bootstrap
         self.bufsize = bufsize
         self.client_id = client_id
-        self.conns = { # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, bufsize)
-        }
+
+        self.hosts = collect_hosts(hosts)
+
+        # create connections only when we need them
+        self.conns = {}
         self.brokers = {} # broker_id -> BrokerMetadata
         self.topics_to_brokers = {} # topic_id -> broker_id
         self.topic_partitions = defaultdict(list) # topic_id -> [0, 1, 2, ...]
@@ -35,15 +37,19 @@ def __init__(self, host, port, bufsize=4096, client_id=CLIENT_ID):
     # Private API #
     ##################

+    def _get_conn(self, host, port):
+        "Get or create a connection to a broker using host and port"
+
+        host_key = (host, port)
+        if host_key not in self.conns:
+            self.conns[host_key] = KafkaConnection(host, port, self.bufsize)
+
+        return self.conns[host_key]
+
     def _get_conn_for_broker(self, broker):
-        """
-        Get or create a connection to a broker
-        """
-        if (broker.host, broker.port) not in self.conns:
-            self.conns[(broker.host, broker.port)] = \
-                KafkaConnection(broker.host, broker.port, self.bufsize)
+        "Get or create a connection to a broker"

-        return self.conns[(broker.host, broker.port)]
+        return self._get_conn(broker.host, broker.port)

     def _get_leader_for_partition(self, topic, partition):
         key = TopicAndPartition(topic, partition)
@@ -108,7 +114,8 @@ def _send_broker_unaware_request(self, requestId, request):
         Attempt to send a broker-agnostic request to one of the available
         brokers. Keep trying until you succeed.
         """
-        for conn in self.conns.values():
+        for (host, port) in self.hosts:
+            conn = self._get_conn(host, port)
             try:
                 conn.send(requestId, request)
                 response = conn.recv(requestId)
@@ -174,7 +181,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
             except ConnectionError, e: # ignore BufferUnderflow for now
                 log.warning("Could not send request [%s] to server %s: %s" % (request, conn, e))
                 failed_payloads += payloads
-                self.topics_to_brokers = {} # reset metadata
+                self.topics_to_brokers = {}  # reset metadata
                 continue

             for response in decoder_fn(response):
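
The client.py hunks swap the single eagerly created bootstrap connection for a parsed host list plus a lazily populated connection cache: `_get_conn` builds a `KafkaConnection` the first time a `(host, port)` pair is needed, and `_send_broker_unaware_request` walks `self.hosts` until one broker responds. A self-contained sketch of that try-each-host control flow; the helper name and the bare TCP probe are illustrative, not part of the commit:

```python
import socket

def first_reachable(hosts, timeout=10):
    """Return the first (host, port) pair that accepts a TCP connection.

    Illustrative stand-in for the loop in _send_broker_unaware_request,
    which sends a metadata request instead of a bare TCP probe.
    """
    for host, port in hosts:
        try:
            sock = socket.create_connection((host, port), timeout=timeout)
            sock.close()
            return (host, port)
        except socket.error:
            continue  # fall through to the next candidate, as the client keeps trying
    return None

# e.g. first_reachable([("kafka1", 9092), ("kafka2", 9092)])
```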

kafka/conn.py (+26 -7)

@@ -2,6 +2,7 @@
 import logging
 import socket
 import struct
+from random import shuffle
 from threading import local

 from kafka.common import BufferUnderflowError
@@ -10,6 +11,26 @@
 log = logging.getLogger("kafka")


+def collect_hosts(hosts, randomize=True):
+    """
+    Collects a comma-separated set of hosts (host:port) and optionally
+    randomize the returned list.
+    """
+
+    result = []
+    for host_port in hosts.split(","):
+
+        res = host_port.split(':')
+        host = res[0]
+        port = int(res[1]) if len(res) > 1 else 9092
+        result.append((host.strip(), port))
+
+    if randomize:
+        shuffle(result)
+
+    return result
+
+
 class KafkaConnection(local):
     """
     A socket connection to a single Kafka broker
@@ -19,14 +40,14 @@ class KafkaConnection(local):
     we can do something in here to facilitate multiplexed requests/responses
     since the Kafka API includes a correlation id.
     """
-    def __init__(self, host, port, bufsize=4096):
+    def __init__(self, host, port, bufsize=4096, timeout=10):
         super(KafkaConnection, self).__init__()
         self.host = host
         self.port = port
         self.bufsize = bufsize
-        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self._sock.connect((host, port))
-        self._sock.settimeout(10)
+        self.timeout = timeout
+
+        self._sock = socket.create_connection((host, port), timeout=timeout)
         self._dirty = False

     def __str__(self):
@@ -125,7 +146,5 @@ def reinit(self):
         Re-initialize the socket connection
         """
         self.close()
-        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        self._sock.connect((self.host, self.port))
-        self._sock.settimeout(10)
+        self._sock = socket.create_connection((self.host, self.port), timeout=self.timeout)
         self._dirty = False
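
For reference, here is what `collect_hosts` produces for a typical bootstrap string, following the parsing logic added above: split on commas, default a missing port to 9092, strip whitespace from the host, and shuffle unless `randomize=False`. The broker names are placeholders:

```python
from kafka.conn import collect_hosts

pairs = collect_hosts("kafka1:9093, kafka2,kafka3:9094", randomize=False)
assert pairs == [('kafka1', 9093), ('kafka2', 9092), ('kafka3', 9094)]

# With the default randomize=True the same pairs come back in shuffled order,
# which spreads bootstrap attempts across the listed brokers.
```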

setup.py (+3 -2)

@@ -1,11 +1,12 @@
-import os.path
 import sys

 from setuptools import setup, Command


 class Tox(Command):
+
     user_options = []
+
     def initialize_options(self):
         pass

@@ -21,7 +22,7 @@ def run(self):
     name="kafka-python",
     version="0.8.1-1",

-    install_requires=["distribute", "tox"],
+    install_requires=["distribute", "tox", "mock"],
     tests_require=["tox"],
     cmdclass={"test": Tox},


test/test_integration.py (+14 -9)

@@ -16,7 +16,7 @@ class TestKafkaClient(unittest.TestCase):
     def setUpClass(cls):  # noqa
         cls.zk = ZookeeperFixture.instance()
         cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server.host, cls.server.port)
+        cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))

     @classmethod
     def tearDownClass(cls):  # noqa
@@ -554,7 +554,7 @@ def setUpClass(cls):
         cls.zk = ZookeeperFixture.instance()
         cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
         cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192)
+        cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192)

     @classmethod
     def tearDownClass(cls):  # noqa
@@ -770,20 +770,23 @@ def test_large_messages(self):
             self.assertEquals(all_messages[i], message.message)
         self.assertEquals(i, 19)

+
 class TestFailover(unittest.TestCase):

     @classmethod
     def setUpClass(cls):

         zk_chroot = random_string(10)
-        replicas   = 2
+        replicas = 2
         partitions = 2

         # mini zookeeper, 2 kafka brokers
-        cls.zk      = ZookeeperFixture.instance()
-        kk_args     = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
         cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
-        cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
+
+        hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers])
+        cls.client = KafkaClient(hosts)

     @classmethod
     def tearDownClass(cls):
@@ -858,17 +861,19 @@ def _send_random_messages(self, producer, n):
             resp = producer.send_messages(random_string(10))
             if len(resp) > 0:
                 self.assertEquals(resp[0].error, 0)
-        time.sleep(1) # give it some time
+        time.sleep(1)  # give it some time

     def _kill_leader(self, topic, partition):
         leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
         broker = self.brokers[leader.nodeId]
         broker.close()
-        time.sleep(1) # give it some time
+        time.sleep(1)  # give it some time
         return broker

     def _count_messages(self, group, topic):
-        client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+
+        hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+        client = KafkaClient(hosts)
         consumer = SimpleConsumer(client, group, topic, auto_commit=False)
         all_messages = []
         for message in consumer:
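
The failover test now bootstraps from every broker in the fixture set rather than only the first one, so the client can still fetch metadata when one of the listed brokers is unreachable. The same host-string construction works outside the tests; the broker endpoints below are hypothetical stand-ins for the fixtures' host/port attributes:

```python
# Hypothetical broker endpoints; the tests derive these from KafkaFixture.
brokers = [("kafka1", 9092), ("kafka2", 9093)]

hosts = ','.join('%s:%d' % (host, port) for host, port in brokers)
# hosts == 'kafka1:9092,kafka2:9093'

# client = KafkaClient(hosts)  # left commented: requires a running cluster
```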
