Skip to content

Commit 75de0f0

Browse files
committed
Ensure that the async producer works on Windows. Fixes dpkp#46
As per the multiprocessing module's documentation, objects passed to the Process() class must be picklable on Windows, so the async producer did not work there. To fix this, code that uses multiprocessing has to follow certain rules:
* The target= function must not be a member function (bound methods are not picklable).
* Objects such as socket() cannot be passed to the child process.
This commit fixes these issues. For the KafkaClient and KafkaConnection objects, we make copies of the object and reinit() them inside the child process.
1 parent cfd9f86 commit 75de0f0

File tree

3 files changed

+87
-51
lines changed

3 files changed

+87
-51
lines changed

kafka/client.py

+11
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import copy
12
from collections import defaultdict
23
from functools import partial
34
from itertools import count
@@ -181,6 +182,16 @@ def close(self):
181182
for conn in self.conns.values():
182183
conn.close()
183184

185+
def copy(self):
186+
"""
187+
Create an inactive copy of the client object
188+
A reinit() has to be done on the copy before it can be used again
189+
"""
190+
c = copy.deepcopy(self)
191+
for k, v in c.conns.items():
192+
c.conns[k] = v.copy()
193+
return c
194+
184195
def reinit(self):
185196
for conn in self.conns.values():
186197
conn.reinit()

kafka/conn.py

+13-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import copy
12
import logging
23
import socket
34
import struct
@@ -96,17 +97,27 @@ def recv(self, request_id):
9697
self.data = self._consume_response()
9798
return self.data
9899

100+
def copy(self):
101+
"""
102+
Create an inactive copy of the connection object
103+
A reinit() has to be done on the copy before it can be used again
104+
"""
105+
c = copy.deepcopy(self)
106+
c._sock = None
107+
return c
108+
99109
def close(self):
100110
"""
101111
Close this connection
102112
"""
103-
self._sock.close()
113+
if self._sock:
114+
self._sock.close()
104115

105116
def reinit(self):
106117
"""
107118
Re-initialize the socket connection
108119
"""
109-
self._sock.close()
120+
self.close()
110121
self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
111122
self._sock.connect((self.host, self.port))
112123
self._sock.settimeout(10)

kafka/producer.py

+63-49
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,58 @@
1818
STOP_ASYNC_PRODUCER = -1
1919

2020

21+
def _send_upstream(topic, queue, client, batch_time, batch_size,
22+
req_acks, ack_timeout):
23+
"""
24+
Listen on the queue for a specified number of messages or till
25+
a specified timeout and send them upstream to the brokers in one
26+
request
27+
28+
NOTE: Ideally, this should have been a method inside the Producer
29+
class. However, multiprocessing module has issues in windows. The
30+
functionality breaks unless this function is kept outside of a class
31+
"""
32+
stop = False
33+
client.reinit()
34+
35+
while not stop:
36+
timeout = batch_time
37+
count = batch_size
38+
send_at = datetime.now() + timedelta(seconds=timeout)
39+
msgset = defaultdict(list)
40+
41+
# Keep fetching till we gather enough messages or a
42+
# timeout is reached
43+
while count > 0 and timeout >= 0:
44+
try:
45+
partition, msg = queue.get(timeout=timeout)
46+
except Empty:
47+
break
48+
49+
# Check if the controller has requested us to stop
50+
if partition == STOP_ASYNC_PRODUCER:
51+
stop = True
52+
break
53+
54+
# Adjust the timeout to match the remaining period
55+
count -= 1
56+
timeout = (send_at - datetime.now()).total_seconds()
57+
msgset[partition].append(msg)
58+
59+
# Send collected requests upstream
60+
reqs = []
61+
for partition, messages in msgset.items():
62+
req = ProduceRequest(topic, partition, messages)
63+
reqs.append(req)
64+
65+
try:
66+
client.send_produce_request(reqs,
67+
acks=req_acks,
68+
timeout=ack_timeout)
69+
except Exception as exp:
70+
log.error("Error sending message", exc_info=sys.exc_info())
71+
72+
2173
class Producer(object):
2274
"""
2375
Base class to be used by producers
@@ -61,60 +113,22 @@ def __init__(self, client, async=False,
61113
self.async = async
62114
self.req_acks = req_acks
63115
self.ack_timeout = ack_timeout
64-
self.batch_send = batch_send
65-
self.batch_size = batch_send_every_n
66-
self.batch_time = batch_send_every_t
67116

68117
if self.async:
69118
self.queue = Queue() # Messages are sent through this queue
70-
self.proc = Process(target=self._send_upstream, args=(self.queue,))
71-
self.proc.daemon = True # Process will die if main thread exits
119+
self.proc = Process(target=_send_upstream,
120+
args=(self.topic,
121+
self.queue,
122+
self.client.copy(),
123+
batch_send_every_t,
124+
batch_send_every_n,
125+
self.req_acks,
126+
self.ack_timeout))
127+
128+
# Process will die if main thread exits
129+
self.proc.daemon = True
72130
self.proc.start()
73131

74-
def _send_upstream(self, queue):
75-
"""
76-
Listen on the queue for a specified number of messages or till
77-
a specified timeout and send them upstream to the brokers in one
78-
request
79-
"""
80-
stop = False
81-
82-
while not stop:
83-
timeout = self.batch_time
84-
send_at = datetime.now() + timedelta(seconds=timeout)
85-
count = self.batch_size
86-
msgset = defaultdict(list)
87-
88-
# Keep fetching till we gather enough messages or a
89-
# timeout is reached
90-
while count > 0 and timeout >= 0:
91-
try:
92-
partition, msg = queue.get(timeout=timeout)
93-
except Empty:
94-
break
95-
96-
# Check if the controller has requested us to stop
97-
if partition == STOP_ASYNC_PRODUCER:
98-
stop = True
99-
break
100-
101-
# Adjust the timeout to match the remaining period
102-
count -= 1
103-
timeout = (send_at - datetime.now()).total_seconds()
104-
msgset[partition].append(msg)
105-
106-
# Send collected requests upstream
107-
reqs = []
108-
for partition, messages in msgset.items():
109-
req = ProduceRequest(self.topic, partition, messages)
110-
reqs.append(req)
111-
112-
try:
113-
self.client.send_produce_request(reqs, acks=self.req_acks,
114-
timeout=self.ack_timeout)
115-
except Exception:
116-
log.error("Error sending message", exc_info=sys.exc_info())
117-
118132
def send_messages(self, partition, *msg):
119133
"""
120134
Helper method to send produce requests

0 commit comments

Comments
 (0)