20
20
STOP_ASYNC_PRODUCER = - 1
21
21
22
22
23
- def _send_upstream (topic , queue , client , batch_time , batch_size ,
23
+ def _send_upstream (queue , client , batch_time , batch_size ,
24
24
req_acks , ack_timeout ):
25
25
"""
26
26
Listen on the queue for a specified number of messages or till
@@ -44,23 +44,24 @@ def _send_upstream(topic, queue, client, batch_time, batch_size,
44
44
# timeout is reached
45
45
while count > 0 and timeout >= 0 :
46
46
try :
47
- partition , msg = queue .get (timeout = timeout )
47
+ topic , partition , msg = queue .get (timeout = timeout )
48
+
48
49
except Empty :
49
50
break
50
51
51
52
# Check if the controller has requested us to stop
52
- if partition == STOP_ASYNC_PRODUCER :
53
+ if topic == STOP_ASYNC_PRODUCER :
53
54
stop = True
54
55
break
55
56
56
57
# Adjust the timeout to match the remaining period
57
58
count -= 1
58
59
timeout = send_at - time .time ()
59
- msgset [partition ].append (msg )
60
+ msgset [( topic , partition ) ].append (msg )
60
61
61
62
# Send collected requests upstream
62
63
reqs = []
63
- for partition , messages in msgset .items ():
64
+ for ( topic , partition ) , messages in msgset .items ():
64
65
req = ProduceRequest (topic , partition , messages )
65
66
reqs .append (req )
66
67
@@ -78,7 +79,6 @@ class Producer(object):
78
79
79
80
Params:
80
81
client - The Kafka client instance to use
81
- topic - The topic for sending messages to
82
82
async - If set to true, the messages are sent asynchronously via another
83
83
thread (process). We will not wait for a response to these
84
84
req_acks - A value indicating the acknowledgements that the server must
@@ -119,8 +119,7 @@ def __init__(self, client, async=False,
119
119
if self .async :
120
120
self .queue = Queue () # Messages are sent through this queue
121
121
self .proc = Process (target = _send_upstream ,
122
- args = (self .topic ,
123
- self .queue ,
122
+ args = (self .queue ,
124
123
self .client .copy (),
125
124
batch_send_every_t ,
126
125
batch_send_every_n ,
@@ -131,17 +130,17 @@ def __init__(self, client, async=False,
131
130
self .proc .daemon = True
132
131
self .proc .start ()
133
132
134
- def send_messages (self , partition , * msg ):
133
+ def send_messages (self , topic , partition , * msg ):
135
134
"""
136
135
Helper method to send produce requests
137
136
"""
138
137
if self .async :
139
138
for m in msg :
140
- self .queue .put ((partition , create_message (m )))
139
+ self .queue .put ((topic , partition , create_message (m )))
141
140
resp = []
142
141
else :
143
142
messages = [create_message (m ) for m in msg ]
144
- req = ProduceRequest (self . topic , partition , messages )
143
+ req = ProduceRequest (topic , partition , messages )
145
144
try :
146
145
resp = self .client .send_produce_request ([req ], acks = self .req_acks ,
147
146
timeout = self .ack_timeout )
@@ -156,7 +155,7 @@ def stop(self, timeout=1):
156
155
forcefully cleaning up.
157
156
"""
158
157
if self .async :
159
- self .queue .put ((STOP_ASYNC_PRODUCER , None ))
158
+ self .queue .put ((STOP_ASYNC_PRODUCER , None , None ))
160
159
self .proc .join (timeout )
161
160
162
161
if self .proc .is_alive ():
@@ -169,7 +168,6 @@ class SimpleProducer(Producer):
169
168
170
169
Params:
171
170
client - The Kafka client instance to use
172
- topic - The topic for sending messages to
173
171
async - If True, the messages are sent asynchronously via another
174
172
thread (process). We will not wait for a response to these
175
173
req_acks - A value indicating the acknowledgements that the server must
@@ -180,27 +178,31 @@ class SimpleProducer(Producer):
180
178
batch_send_every_n - If set, messages are send in batches of this size
181
179
batch_send_every_t - If set, messages are send after this timeout
182
180
"""
183
- def __init__ (self , client , topic , async = False ,
181
+ def __init__ (self , client , async = False ,
184
182
req_acks = Producer .ACK_AFTER_LOCAL_WRITE ,
185
183
ack_timeout = Producer .DEFAULT_ACK_TIMEOUT ,
186
184
batch_send = False ,
187
185
batch_send_every_n = BATCH_SEND_MSG_COUNT ,
188
186
batch_send_every_t = BATCH_SEND_DEFAULT_INTERVAL ):
189
- self .topic = topic
190
- client .load_metadata_for_topics (topic )
191
- self .next_partition = cycle (client .topic_partitions [topic ])
192
-
187
+ self .partition_cycles = {}
193
188
super (SimpleProducer , self ).__init__ (client , async , req_acks ,
194
189
ack_timeout , batch_send ,
195
190
batch_send_every_n ,
196
191
batch_send_every_t )
197
192
198
- def send_messages (self , * msg ):
199
- partition = self .next_partition .next ()
200
- return super (SimpleProducer , self ).send_messages (partition , * msg )
193
+ def _next_partition (self , topic ):
194
+ if topic not in self .partition_cycles :
195
+ if topic not in self .client .topic_partitions :
196
+ self .client .load_metadata_for_topics (topic )
197
+ self .partition_cycles [topic ] = cycle (self .client .topic_partitions [topic ])
198
+ return self .partition_cycles [topic ].next ()
199
+
200
+ def send_messages (self , topic , * msg ):
201
+ partition = self ._next_partition (topic )
202
+ return super (SimpleProducer , self ).send_messages (topic , partition , * msg )
201
203
202
204
def __repr__ (self ):
203
- return '<SimpleProducer topic=%s, batch=%s>' % ( self .topic , self . async )
205
+ return '<SimpleProducer batch=%s>' % self .async
204
206
205
207
206
208
class KeyedProducer (Producer ):
@@ -209,7 +211,6 @@ class KeyedProducer(Producer):
209
211
210
212
Args:
211
213
client - The kafka client instance
212
- topic - The kafka topic to send messages to
213
214
partitioner - A partitioner class that will be used to get the partition
214
215
to send the message to. Must be derived from Partitioner
215
216
async - If True, the messages are sent asynchronously via another
@@ -220,29 +221,34 @@ class KeyedProducer(Producer):
220
221
batch_send_every_n - If set, messages are send in batches of this size
221
222
batch_send_every_t - If set, messages are send after this timeout
222
223
"""
223
- def __init__ (self , client , topic , partitioner = None , async = False ,
224
+ def __init__ (self , client , partitioner = None , async = False ,
224
225
req_acks = Producer .ACK_AFTER_LOCAL_WRITE ,
225
226
ack_timeout = Producer .DEFAULT_ACK_TIMEOUT ,
226
227
batch_send = False ,
227
228
batch_send_every_n = BATCH_SEND_MSG_COUNT ,
228
229
batch_send_every_t = BATCH_SEND_DEFAULT_INTERVAL ):
229
- self .topic = topic
230
- client .load_metadata_for_topics (topic )
231
-
232
230
if not partitioner :
233
231
partitioner = HashedPartitioner
234
-
235
- self .partitioner = partitioner ( client . topic_partitions [ topic ])
232
+ self . partitioner_class = partitioner
233
+ self .partitioners = {}
236
234
237
235
super (KeyedProducer , self ).__init__ (client , async , req_acks ,
238
236
ack_timeout , batch_send ,
239
237
batch_send_every_n ,
240
238
batch_send_every_t )
241
239
242
- def send (self , key , msg ):
243
- partitions = self .client .topic_partitions [self .topic ]
244
- partition = self .partitioner .partition (key , partitions )
245
- return self .send_messages (partition , msg )
240
+ def _next_partition (self , topic , key ):
241
+ if topic not in self .partitioners :
242
+ if topic not in self .client .topic_partitions :
243
+ self .client .load_metadata_for_topics (topic )
244
+ self .partitioners [topic ] = \
245
+ self .partitioner_class (self .client .topic_partitions [topic ])
246
+ partitioner = self .partitioners [topic ]
247
+ return partitioner .partition (key , self .client .topic_partitions [topic ])
248
+
249
+ def send (self , topic , key , msg ):
250
+ partition = self ._next_partition (topic , key )
251
+ return self .send_messages (topic , partition , msg )
246
252
247
253
def __repr__ (self ):
248
- return '<KeyedProducer topic=%s, batch=%s>' % ( self .topic , self . async )
254
+ return '<KeyedProducer batch=%s>' % self .async
0 commit comments