8
8
from itertools import cycle
9
9
from multiprocessing import Queue , Process
10
10
11
- from kafka .common import ProduceRequest
11
+ from kafka .common import ProduceRequest , TopicAndPartition
12
12
from kafka .partitioner import HashedPartitioner
13
13
from kafka .protocol import create_message
14
14
@@ -44,25 +44,27 @@ def _send_upstream(queue, client, batch_time, batch_size,
44
44
# timeout is reached
45
45
while count > 0 and timeout >= 0 :
46
46
try :
47
- topic , partition , msg = queue .get (timeout = timeout )
47
+ topic_partition , msg = queue .get (timeout = timeout )
48
48
49
49
except Empty :
50
50
break
51
51
52
52
# Check if the controller has requested us to stop
53
- if topic == STOP_ASYNC_PRODUCER :
53
+ if topic_partition == STOP_ASYNC_PRODUCER :
54
54
stop = True
55
55
break
56
56
57
57
# Adjust the timeout to match the remaining period
58
58
count -= 1
59
59
timeout = send_at - time .time ()
60
- msgset [( topic , partition ) ].append (msg )
60
+ msgset [topic_partition ].append (msg )
61
61
62
62
# Send collected requests upstream
63
63
reqs = []
64
- for (topic , partition ), messages in msgset .items ():
65
- req = ProduceRequest (topic , partition , messages )
64
+ for topic_partition , messages in msgset .items ():
65
+ req = ProduceRequest (topic_partition .topic ,
66
+ topic_partition .partition ,
67
+ messages )
66
68
reqs .append (req )
67
69
68
70
try :
@@ -136,7 +138,8 @@ def send_messages(self, topic, partition, *msg):
136
138
"""
137
139
if self .async :
138
140
for m in msg :
139
- self .queue .put ((topic , partition , create_message (m )))
141
+ self .queue .put ((TopicAndPartition (topic , partition ),
142
+ create_message (m )))
140
143
resp = []
141
144
else :
142
145
messages = [create_message (m ) for m in msg ]
@@ -155,7 +158,7 @@ def stop(self, timeout=1):
155
158
forcefully cleaning up.
156
159
"""
157
160
if self .async :
158
- self .queue .put ((STOP_ASYNC_PRODUCER , None , None ))
161
+ self .queue .put ((STOP_ASYNC_PRODUCER , None ))
159
162
self .proc .join (timeout )
160
163
161
164
if self .proc .is_alive ():
0 commit comments