Skip to content

Commit 33cde52

Browse files
committed
Merge pull request dpkp#62 from mahendra/cons-windows
Ensure that multiprocess consumer works in windows
2 parents 92d70e7 + 6e3ee64 commit 33cde52

File tree

1 file changed

+63
-53
lines changed

1 file changed

+63
-53
lines changed

kafka/consumer.py

+63-53
Original file line numberDiff line numberDiff line change
@@ -404,6 +404,63 @@ def __iter_partition__(self, partition, offset):
404404
offset = next_offset + 1
405405

406406

407+
def _mp_consume(client, group, topic, chunk, queue, start, exit, pause, size):
408+
"""
409+
A child process worker which consumes messages based on the
410+
notifications given by the controller process
411+
412+
NOTE: Ideally, this should have been a method inside the Consumer
413+
class. However, multiprocessing module has issues in windows. The
414+
functionality breaks unless this function is kept outside of a class
415+
"""
416+
417+
# Make the child processes open separate socket connections
418+
client.reinit()
419+
420+
# We will start consumers without auto-commit. Auto-commit will be
421+
# done by the master controller process.
422+
consumer = SimpleConsumer(client, group, topic,
423+
partitions=chunk,
424+
auto_commit=False,
425+
auto_commit_every_n=None,
426+
auto_commit_every_t=None)
427+
428+
# Ensure that the consumer provides the partition information
429+
consumer.provide_partition_info()
430+
431+
while True:
432+
# Wait till the controller indicates us to start consumption
433+
start.wait()
434+
435+
# If we are asked to quit, do so
436+
if exit.is_set():
437+
break
438+
439+
# Consume messages and add them to the queue. If the controller
440+
# indicates a specific number of messages, follow that advice
441+
count = 0
442+
443+
for partition, message in consumer:
444+
queue.put((partition, message))
445+
count += 1
446+
447+
# We have reached the required size. The controller might have
448+
# more than what he needs. Wait for a while.
449+
# Without this logic, it is possible that we run into a big
450+
# loop consuming all available messages before the controller
451+
# can reset the 'start' event
452+
if count == size.value:
453+
pause.wait()
454+
break
455+
456+
# In case we did not receive any message, give up the CPU for
457+
# a while before we try again
458+
if count == 0:
459+
time.sleep(0.1)
460+
461+
consumer.stop()
462+
463+
407464
class MultiProcessConsumer(Consumer):
408465
"""
409466
A consumer implementation that consumes partitions for a topic in
@@ -468,63 +525,16 @@ def __init__(self, client, group, topic, auto_commit=True,
468525
self.procs = []
469526
for chunk in chunks:
470527
chunk = filter(lambda x: x is not None, chunk)
471-
proc = Process(target=self._consume, args=(chunk,))
528+
args = (client.copy(),
529+
group, topic, chunk,
530+
self.queue, self.start, self.exit,
531+
self.pause, self.size)
532+
533+
proc = Process(target=_mp_consume, args=args)
472534
proc.daemon = True
473535
proc.start()
474536
self.procs.append(proc)
475537

476-
def _consume(self, partitions):
477-
"""
478-
A child process worker which consumes messages based on the
479-
notifications given by the controller process
480-
"""
481-
482-
# Make the child processes open separate socket connections
483-
self.client.reinit()
484-
485-
# We will start consumers without auto-commit. Auto-commit will be
486-
# done by the master controller process.
487-
consumer = SimpleConsumer(self.client, self.group, self.topic,
488-
partitions=partitions,
489-
auto_commit=False,
490-
auto_commit_every_n=None,
491-
auto_commit_every_t=None)
492-
493-
# Ensure that the consumer provides the partition information
494-
consumer.provide_partition_info()
495-
496-
while True:
497-
# Wait till the controller indicates us to start consumption
498-
self.start.wait()
499-
500-
# If we are asked to quit, do so
501-
if self.exit.is_set():
502-
break
503-
504-
# Consume messages and add them to the queue. If the controller
505-
# indicates a specific number of messages, follow that advice
506-
count = 0
507-
508-
for partition, message in consumer:
509-
self.queue.put((partition, message))
510-
count += 1
511-
512-
# We have reached the required size. The controller might have
513-
# more than what he needs. Wait for a while.
514-
# Without this logic, it is possible that we run into a big
515-
# loop consuming all available messages before the controller
516-
# can reset the 'start' event
517-
if count == self.size.value:
518-
self.pause.wait()
519-
break
520-
521-
# In case we did not receive any message, give up the CPU for
522-
# a while before we try again
523-
if count == 0:
524-
time.sleep(0.1)
525-
526-
consumer.stop()
527-
528538
def stop(self):
529539
# Set exit and start off all waiting consumers
530540
self.exit.set()

0 commit comments

Comments
 (0)