Skip to content

Commit 883eed1

Browse files
committed
Merge pull request dpkp#32 from mahendra/keyed
Implement support for keyed messages
2 parents 77b8301 + 0723cf8 commit 883eed1

File tree

4 files changed

+109
-0
lines changed

4 files changed

+109
-0
lines changed

README.md

+16
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,22 @@ for message in consumer:
4141
kafka.close()
4242
```
4343

44+
## Keyed messages
45+
```python
46+
from kafka.client import KafkaClient
47+
from kafka.producer import KeyedProducer
48+
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
49+
50+
kafka = KafkaClient("localhost", 9092)
51+
52+
# HashedPartitioner is default
53+
producer = KeyedProducer(kafka, "my-topic")
54+
producer.send("key1", "some message")
55+
producer.send("key2", "this methode")
56+
57+
producer = KeyedProducer(kafka, "my-topic", partitioner=RoundRobinPartitioner)
58+
```
59+
4460
## Low level
4561

4662
```python

kafka/client.py

+5
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,12 @@ def _load_metadata_for_topics(self, *topics):
7373

7474
self.brokers.update(brokers)
7575
self.topics_to_brokers = {}
76+
7677
for topic, partitions in topics.items():
78+
# Clear the list once before we add it. This removes stale entries
79+
# and avoids duplicates
80+
self.topic_partitions.pop(topic, None)
81+
7782
if not partitions:
7883
log.info("Partition is unassigned, delay for 1s and retry")
7984
time.sleep(1)

kafka/partitioner.py

+56
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
from itertools import cycle
2+
3+
4+
class Partitioner(object):
5+
"""
6+
Base class for a partitioner
7+
"""
8+
def __init__(self, partitions):
9+
"""
10+
Initialize the partitioner
11+
12+
partitions - A list of available partitions (during startup)
13+
"""
14+
self.partitions = partitions
15+
16+
def partition(self, key, partitions):
17+
"""
18+
Takes a string key and num_partitions as argument and returns
19+
a partition to be used for the message
20+
21+
partitions - The list of partitions is passed in every call. This
22+
may look like an overhead, but it will be useful
23+
(in future) when we handle cases like rebalancing
24+
"""
25+
raise NotImplemented('partition function has to be implemented')
26+
27+
28+
class RoundRobinPartitioner(Partitioner):
29+
"""
30+
Implements a round robin partitioner which sends data to partitions
31+
in a round robin fashion
32+
"""
33+
def __init__(self, partitions):
34+
self._set_partitions(partitions)
35+
36+
def _set_partitions(self, partitions):
37+
self.partitions = partitions
38+
self.iterpart = cycle(partitions)
39+
40+
def partition(self, key, partitions):
41+
# Refresh the partition list if necessary
42+
if self.partitions != partitions:
43+
self._set_partitions(partitions)
44+
45+
return self.iterpart.next()
46+
47+
48+
class HashedPartitioner(Partitioner):
49+
"""
50+
Implements a partitioner which selects the target partition based on
51+
the hash of the key
52+
"""
53+
def partition(self, key, partitions):
54+
size = len(partitions)
55+
idx = hash(key) % size
56+
return partitions[idx]

kafka/producer.py

+32
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from kafka.common import ProduceRequest
55
from kafka.protocol import create_message
6+
from kafka.partitioner import HashedPartitioner
67

78
log = logging.getLogger("kafka")
89

@@ -23,3 +24,34 @@ def send_messages(self, *msg):
2324

2425
resp = self.client.send_produce_request([req])[0]
2526
assert resp.error == 0
27+
28+
29+
class KeyedProducer(object):
30+
"""
31+
A producer which distributes messages to partitions based on the key
32+
33+
Args:
34+
client - The kafka client instance
35+
topic - The kafka topic to send messages to
36+
partitioner - A partitioner class that will be used to get the partition
37+
to send the message to. Must be derived from Partitioner
38+
"""
39+
def __init__(self, client, topic, partitioner=None):
40+
self.client = client
41+
self.topic = topic
42+
self.client._load_metadata_for_topics(topic)
43+
44+
if not partitioner:
45+
partitioner = HashedPartitioner
46+
47+
self.partitioner = partitioner(self.client.topic_partitions[topic])
48+
49+
def send(self, key, msg):
50+
partitions = self.client.topic_partitions[self.topic]
51+
partition = self.partitioner.partition(key, partitions)
52+
53+
req = ProduceRequest(self.topic, partition,
54+
messages=[create_message(msg)])
55+
56+
resp = self.client.send_produce_request([req])[0]
57+
assert resp.error == 0

0 commit comments

Comments
 (0)