Skip to content

Commit d3cf18a

Browse files
committed
Fix bugs and testing
* Ensure that round-robin partitioner works fine * _load_metadata_for_topics() would cause duplicate and stale entries in self.topic_partitions. Fix this
1 parent ac76520 commit d3cf18a

File tree

3 files changed

+14
-5
lines changed

3 files changed

+14
-5
lines changed

kafka/client.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,12 @@ def _load_metadata_for_topics(self, *topics):
7373

7474
self.brokers.update(brokers)
7575
self.topics_to_brokers = {}
76+
7677
for topic, partitions in topics.items():
78+
# Clear the list once before we add it. This removes stale entries
79+
# and avoids duplicates
80+
self.topic_partitions.pop(topic, None)
81+
7782
if not partitions:
7883
log.info("Partition is unassigned, delay for 1s and retry")
7984
time.sleep(1)

kafka/partitioner.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,14 +31,18 @@ class RoundRobinPartitioner(Partitioner):
3131
in a round robin fashion
3232
"""
3333
def __init__(self, partitions):
34-
self.partitions = cycle(partitions)
34+
self._set_partitions(partitions)
35+
36+
def _set_partitions(self, partitions):
37+
self.partitions = partitions
38+
self.iterpart = cycle(partitions)
3539

3640
def partition(self, key, partitions):
3741
# Refresh the partition list if necessary
3842
if self.partitions != partitions:
39-
self.partitions = cycle(partitions)
43+
self._set_partitions(partitions)
4044

41-
return self.partitions.next()
45+
return self.iterpart.next()
4246

4347

4448
class HashedPartitioner(Partitioner):

kafka/producer.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ def __init__(self, client, topic, partitioner=None):
4646

4747
self.partitioner = partitioner(self.client.topic_partitions[topic])
4848

49-
def send(self, client, key, msg):
50-
partitions = self.client.topic_partitions[topic]
49+
def send(self, key, msg):
50+
partitions = self.client.topic_partitions[self.topic]
5151
partition = self.partitioner.partition(key, partitions)
5252

5353
req = ProduceRequest(self.topic, partition,

0 commit comments

Comments
 (0)