Skip to content

Commit 4abf7ee

Browse files
committed
Merge pull request dpkp#111 from rdiomar/multitopic_producers
Make producers take a topic argument at send rather than init time -- fixes Issue dpkp#110, but breaks backwards compatibility with previous Producer interface.
2 parents c9d9d0a + f6df696 commit 4abf7ee

File tree

3 files changed

+98
-89
lines changed

3 files changed

+98
-89
lines changed

README.md

+13-13
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ Copyright 2013, David Arthur under Apache License, v2.0. See `LICENSE`
1717

1818
# Status
1919

20-
The current version of this package is **0.9.0** and is compatible with
20+
The current version of this package is **0.9.0** and is compatible with
2121
Kafka brokers running version **0.8.1**.
2222

2323
# Usage
@@ -32,24 +32,24 @@ from kafka.producer import SimpleProducer, KeyedProducer
3232
kafka = KafkaClient("localhost", 9092)
3333

3434
# To send messages synchronously
35-
producer = SimpleProducer(kafka, "my-topic")
36-
producer.send_messages("some message")
37-
producer.send_messages("this method", "is variadic")
35+
producer = SimpleProducer(kafka)
36+
producer.send_messages("my-topic", "some message")
37+
producer.send_messages("my-topic", "this method", "is variadic")
3838

3939
# To send messages asynchronously
40-
producer = SimpleProducer(kafka, "my-topic", async=True)
41-
producer.send_messages("async message")
40+
producer = SimpleProducer(kafka, async=True)
41+
producer.send_messages("my-topic", "async message")
4242

4343
# To wait for acknowledgements
4444
# ACK_AFTER_LOCAL_WRITE : server will wait till the data is written to
4545
# a local log before sending response
4646
# ACK_AFTER_CLUSTER_COMMIT : server will block until the message is committed
4747
# by all in sync replicas before sending a response
48-
producer = SimpleProducer(kafka, "my-topic", async=False,
48+
producer = SimpleProducer(kafka, async=False,
4949
req_acks=SimpleProducer.ACK_AFTER_LOCAL_WRITE,
5050
ack_timeout=2000)
5151

52-
response = producer.send_messages("async message")
52+
response = producer.send_messages("my-topic", "async message")
5353

5454
if response:
5555
print(response[0].error)
@@ -62,7 +62,7 @@ if response:
6262
# Notes:
6363
# * If the producer dies before the messages are sent, there will be losses
6464
# * Call producer.stop() to send the messages and cleanup
65-
producer = SimpleProducer(kafka, "my-topic", batch_send=True,
65+
producer = SimpleProducer(kafka, batch_send=True,
6666
batch_send_every_n=20,
6767
batch_send_every_t=60)
6868

@@ -83,11 +83,11 @@ from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
8383
kafka = KafkaClient("localhost", 9092)
8484

8585
# HashedPartitioner is default
86-
producer = KeyedProducer(kafka, "my-topic")
87-
producer.send("key1", "some message")
88-
producer.send("key2", "this methode")
86+
producer = KeyedProducer(kafka)
87+
producer.send("my-topic", "key1", "some message")
88+
producer.send("my-topic", "key2", "this methode")
8989

90-
producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
90+
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)
9191
```
9292

9393
## Multiprocess consumer

kafka/producer.py

+44-35
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from itertools import cycle
99
from multiprocessing import Queue, Process
1010

11-
from kafka.common import ProduceRequest
11+
from kafka.common import ProduceRequest, TopicAndPartition
1212
from kafka.partitioner import HashedPartitioner
1313
from kafka.protocol import create_message
1414

@@ -20,7 +20,7 @@
2020
STOP_ASYNC_PRODUCER = -1
2121

2222

23-
def _send_upstream(topic, queue, client, batch_time, batch_size,
23+
def _send_upstream(queue, client, batch_time, batch_size,
2424
req_acks, ack_timeout):
2525
"""
2626
Listen on the queue for a specified number of messages or till
@@ -44,24 +44,27 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
4444
# timeout is reached
4545
while count > 0 and timeout >= 0:
4646
try:
47-
partition, msg = queue.get(timeout=timeout)
47+
topic_partition, msg = queue.get(timeout=timeout)
48+
4849
except Empty:
4950
break
5051

5152
# Check if the controller has requested us to stop
52-
if partition == STOP_ASYNC_PRODUCER:
53+
if topic_partition == STOP_ASYNC_PRODUCER:
5354
stop = True
5455
break
5556

5657
# Adjust the timeout to match the remaining period
5758
count -= 1
5859
timeout = send_at - time.time()
59-
msgset[partition].append(msg)
60+
msgset[topic_partition].append(msg)
6061

6162
# Send collected requests upstream
6263
reqs = []
63-
for partition, messages in msgset.items():
64-
req = ProduceRequest(topic, partition, messages)
64+
for topic_partition, messages in msgset.items():
65+
req = ProduceRequest(topic_partition.topic,
66+
topic_partition.partition,
67+
messages)
6568
reqs.append(req)
6669

6770
try:
@@ -78,7 +81,6 @@ class Producer(object):
7881
7982
Params:
8083
client - The Kafka client instance to use
81-
topic - The topic for sending messages to
8284
async - If set to true, the messages are sent asynchronously via another
8385
thread (process). We will not wait for a response to these
8486
req_acks - A value indicating the acknowledgements that the server must
@@ -119,8 +121,7 @@ def __init__(self, client, async=False,
119121
if self.async:
120122
self.queue = Queue() # Messages are sent through this queue
121123
self.proc = Process(target=_send_upstream,
122-
args=(self.topic,
123-
self.queue,
124+
args=(self.queue,
124125
self.client.copy(),
125126
batch_send_every_t,
126127
batch_send_every_n,
@@ -131,17 +132,18 @@ def __init__(self, client, async=False,
131132
self.proc.daemon = True
132133
self.proc.start()
133134

134-
def send_messages(self, partition, *msg):
135+
def send_messages(self, topic, partition, *msg):
135136
"""
136137
Helper method to send produce requests
137138
"""
138139
if self.async:
139140
for m in msg:
140-
self.queue.put((partition, create_message(m)))
141+
self.queue.put((TopicAndPartition(topic, partition),
142+
create_message(m)))
141143
resp = []
142144
else:
143145
messages = [create_message(m) for m in msg]
144-
req = ProduceRequest(self.topic, partition, messages)
146+
req = ProduceRequest(topic, partition, messages)
145147
try:
146148
resp = self.client.send_produce_request([req], acks=self.req_acks,
147149
timeout=self.ack_timeout)
@@ -169,7 +171,6 @@ class SimpleProducer(Producer):
169171
170172
Params:
171173
client - The Kafka client instance to use
172-
topic - The topic for sending messages to
173174
async - If True, the messages are sent asynchronously via another
174175
thread (process). We will not wait for a response to these
175176
req_acks - A value indicating the acknowledgements that the server must
@@ -180,27 +181,31 @@ class SimpleProducer(Producer):
180181
batch_send_every_n - If set, messages are send in batches of this size
181182
batch_send_every_t - If set, messages are send after this timeout
182183
"""
183-
def __init__(self, client, topic, async=False,
184+
def __init__(self, client, async=False,
184185
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
185186
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
186187
batch_send=False,
187188
batch_send_every_n=BATCH_SEND_MSG_COUNT,
188189
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
189-
self.topic = topic
190-
client.load_metadata_for_topics(topic)
191-
self.next_partition = cycle(client.topic_partitions[topic])
192-
190+
self.partition_cycles = {}
193191
super(SimpleProducer, self).__init__(client, async, req_acks,
194192
ack_timeout, batch_send,
195193
batch_send_every_n,
196194
batch_send_every_t)
197195

198-
def send_messages(self, *msg):
199-
partition = self.next_partition.next()
200-
return super(SimpleProducer, self).send_messages(partition, *msg)
196+
def _next_partition(self, topic):
197+
if topic not in self.partition_cycles:
198+
if topic not in self.client.topic_partitions:
199+
self.client.load_metadata_for_topics(topic)
200+
self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
201+
return self.partition_cycles[topic].next()
202+
203+
def send_messages(self, topic, *msg):
204+
partition = self._next_partition(topic)
205+
return super(SimpleProducer, self).send_messages(topic, partition, *msg)
201206

202207
def __repr__(self):
203-
return '<SimpleProducer topic=%s, batch=%s>' % (self.topic, self.async)
208+
return '<SimpleProducer batch=%s>' % self.async
204209

205210

206211
class KeyedProducer(Producer):
@@ -209,7 +214,6 @@ class KeyedProducer(Producer):
209214
210215
Args:
211216
client - The kafka client instance
212-
topic - The kafka topic to send messages to
213217
partitioner - A partitioner class that will be used to get the partition
214218
to send the message to. Must be derived from Partitioner
215219
async - If True, the messages are sent asynchronously via another
@@ -220,29 +224,34 @@ class KeyedProducer(Producer):
220224
batch_send_every_n - If set, messages are send in batches of this size
221225
batch_send_every_t - If set, messages are send after this timeout
222226
"""
223-
def __init__(self, client, topic, partitioner=None, async=False,
227+
def __init__(self, client, partitioner=None, async=False,
224228
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
225229
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
226230
batch_send=False,
227231
batch_send_every_n=BATCH_SEND_MSG_COUNT,
228232
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
229-
self.topic = topic
230-
client.load_metadata_for_topics(topic)
231-
232233
if not partitioner:
233234
partitioner = HashedPartitioner
234-
235-
self.partitioner = partitioner(client.topic_partitions[topic])
235+
self.partitioner_class = partitioner
236+
self.partitioners = {}
236237

237238
super(KeyedProducer, self).__init__(client, async, req_acks,
238239
ack_timeout, batch_send,
239240
batch_send_every_n,
240241
batch_send_every_t)
241242

242-
def send(self, key, msg):
243-
partitions = self.client.topic_partitions[self.topic]
244-
partition = self.partitioner.partition(key, partitions)
245-
return self.send_messages(partition, msg)
243+
def _next_partition(self, topic, key):
244+
if topic not in self.partitioners:
245+
if topic not in self.client.topic_partitions:
246+
self.client.load_metadata_for_topics(topic)
247+
self.partitioners[topic] = \
248+
self.partitioner_class(self.client.topic_partitions[topic])
249+
partitioner = self.partitioners[topic]
250+
return partitioner.partition(key, self.client.topic_partitions[topic])
251+
252+
def send(self, topic, key, msg):
253+
partition = self._next_partition(topic, key)
254+
return self.send_messages(topic, partition, msg)
246255

247256
def __repr__(self):
248-
return '<KeyedProducer topic=%s, batch=%s>' % (self.topic, self.async)
257+
return '<KeyedProducer batch=%s>' % self.async

0 commit comments

Comments
 (0)