
Commit 92d70e7

Merge pull request dpkp#61 from mahendra/prod-windows
Ensure that async producer works in windows. Fixes dpkp#46
2 parents eb2c173 + f9cf628

File tree (3 files changed: +87 −51 lines)

  kafka/client.py     +11
  kafka/conn.py       +13 −2
  kafka/producer.py   +63 −49

kafka/client.py (+11)

@@ -1,3 +1,4 @@
+import copy
 from collections import defaultdict
 from functools import partial
 from itertools import count
@@ -193,6 +194,16 @@ def close(self):
         for conn in self.conns.values():
             conn.close()
 
+    def copy(self):
+        """
+        Create an inactive copy of the client object
+        A reinit() has to be done on the copy before it can be used again
+        """
+        c = copy.deepcopy(self)
+        for k, v in c.conns.items():
+            c.conns[k] = v.copy()
+        return c
+
     def reinit(self):
         for conn in self.conns.values():
             conn.reinit()
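The new KafkaClient.copy() pairs with the existing reinit(): the copy carries no live sockets, so it can be handed to another process, which then re-opens its connections before use. A minimal usage sketch; the broker address and the worker function are hypothetical, not part of this change:

from multiprocessing import Process
from kafka.client import KafkaClient

def warmup_and_close(client):
    # The copy arrives inactive; reinit() re-opens every connection
    # inside the child process before the client is used.
    client.reinit()
    client.close()

if __name__ == '__main__':
    kafka = KafkaClient('localhost', 9092)   # assumed broker address
    inactive = kafka.copy()                  # deep copy, connections inert
    p = Process(target=warmup_and_close, args=(inactive,))
    p.start()
    p.join()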

kafka/conn.py (+13 −2)

@@ -1,3 +1,4 @@
+import copy
 import logging
 import socket
 import struct
@@ -103,17 +104,27 @@ def recv(self, request_id):
         self.data = self._consume_response()
         return self.data
 
+    def copy(self):
+        """
+        Create an inactive copy of the connection object
+        A reinit() has to be done on the copy before it can be used again
+        """
+        c = copy.deepcopy(self)
+        c._sock = None
+        return c
+
     def close(self):
         """
         Close this connection
         """
-        self._sock.close()
+        if self._sock:
+            self._sock.close()
 
     def reinit(self):
         """
         Re-initialize the socket connection
         """
-        self._sock.close()
+        self.close()
         self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         self._sock.connect((self.host, self.port))
         self._sock.settimeout(10)
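Two behavioural points in this file: close() is now a no-op when the socket is already gone (exactly the state a copy() is in), and reinit() goes through close() instead of touching _sock directly. A sketch of the resulting lifecycle, assuming a KafkaConnection to a broker at localhost:9092:

from kafka.conn import KafkaConnection

conn = KafkaConnection('localhost', 9092)  # assumed broker address

clone = conn.copy()   # deep copy with clone._sock set to None
clone.close()         # safe: the `if self._sock` guard makes this a no-op
clone.reinit()        # close(), then a fresh socket + connect(); clone is live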

kafka/producer.py (+63 −49)

@@ -19,6 +19,58 @@
 STOP_ASYNC_PRODUCER = -1
 
 
+def _send_upstream(topic, queue, client, batch_time, batch_size,
+                   req_acks, ack_timeout):
+    """
+    Listen on the queue for a specified number of messages or till
+    a specified timeout and send them upstream to the brokers in one
+    request
+
+    NOTE: Ideally, this should have been a method inside the Producer
+    class. However, multiprocessing module has issues in windows. The
+    functionality breaks unless this function is kept outside of a class
+    """
+    stop = False
+    client.reinit()
+
+    while not stop:
+        timeout = batch_time
+        count = batch_size
+        send_at = datetime.now() + timedelta(seconds=timeout)
+        msgset = defaultdict(list)
+
+        # Keep fetching till we gather enough messages or a
+        # timeout is reached
+        while count > 0 and timeout >= 0:
+            try:
+                partition, msg = queue.get(timeout=timeout)
+            except Empty:
+                break
+
+            # Check if the controller has requested us to stop
+            if partition == STOP_ASYNC_PRODUCER:
+                stop = True
+                break
+
+            # Adjust the timeout to match the remaining period
+            count -= 1
+            timeout = (send_at - datetime.now()).total_seconds()
+            msgset[partition].append(msg)
+
+        # Send collected requests upstream
+        reqs = []
+        for partition, messages in msgset.items():
+            req = ProduceRequest(topic, partition, messages)
+            reqs.append(req)
+
+        try:
+            client.send_produce_request(reqs,
+                                        acks=req_acks,
+                                        timeout=ack_timeout)
+        except Exception as exp:
+            log.exception("Unable to send message")
+
+
 class Producer(object):
     """
     Base class to be used by producers
@@ -62,60 +114,22 @@ def __init__(self, client, async=False,
         self.async = async
         self.req_acks = req_acks
         self.ack_timeout = ack_timeout
-        self.batch_send = batch_send
-        self.batch_size = batch_send_every_n
-        self.batch_time = batch_send_every_t
 
         if self.async:
             self.queue = Queue()  # Messages are sent through this queue
-            self.proc = Process(target=self._send_upstream, args=(self.queue,))
-            self.proc.daemon = True  # Process will die if main thread exits
+            self.proc = Process(target=_send_upstream,
+                                args=(self.topic,
+                                      self.queue,
+                                      self.client.copy(),
+                                      batch_send_every_t,
+                                      batch_send_every_n,
+                                      self.req_acks,
+                                      self.ack_timeout))
+
+            # Process will die if main thread exits
+            self.proc.daemon = True
             self.proc.start()
 
-    def _send_upstream(self, queue):
-        """
-        Listen on the queue for a specified number of messages or till
-        a specified timeout and send them upstream to the brokers in one
-        request
-        """
-        stop = False
-
-        while not stop:
-            timeout = self.batch_time
-            send_at = datetime.now() + timedelta(seconds=timeout)
-            count = self.batch_size
-            msgset = defaultdict(list)
-
-            # Keep fetching till we gather enough messages or a
-            # timeout is reached
-            while count > 0 and timeout >= 0:
-                try:
-                    partition, msg = queue.get(timeout=timeout)
-                except Empty:
-                    break
-
-                # Check if the controller has requested us to stop
-                if partition == STOP_ASYNC_PRODUCER:
-                    stop = True
-                    break
-
-                # Adjust the timeout to match the remaining period
-                count -= 1
-                timeout = (send_at - datetime.now()).total_seconds()
-                msgset[partition].append(msg)
-
-            # Send collected requests upstream
-            reqs = []
-            for partition, messages in msgset.items():
-                req = ProduceRequest(self.topic, partition, messages)
-                reqs.append(req)
-
-            try:
-                self.client.send_produce_request(reqs, acks=self.req_acks,
-                                                 timeout=self.ack_timeout)
-            except Exception:
-                log.exception("Unable to send message")
-
     def send_messages(self, partition, *msg):
         """
         Helper method to send produce requests
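The NOTE in the diff above is the heart of the Windows fix: multiprocessing on Windows has no fork(), so each Process pickles its target and args into a freshly spawned interpreter. A bound method like the old self._send_upstream drags the whole Producer (queue, live client sockets) through pickle and breaks; a module-level function taking explicit, picklable arguments does not. A standalone illustration of the constraint, not project code (set_start_method needs Python 3.4+ and reproduces Windows semantics on any OS):

import multiprocessing as mp

def worker(q):
    # Module-level function: picklable by reference, so it survives 'spawn'.
    q.put('sent from child')

if __name__ == '__main__':
    mp.set_start_method('spawn')  # the start method Windows always uses
    q = mp.Queue()
    p = mp.Process(target=worker, args=(q,))
    p.start()
    print(q.get())                # -> 'sent from child'
    p.join()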
