Skip to content

Commit b2a6503

Browse files
committed
Implement support for Keyed producer
Provides support for two partitioners * Round robin * Hashed (default as per kafka clients)
1 parent 77b8301 commit b2a6503

File tree

3 files changed

+96
-0
lines changed

3 files changed

+96
-0
lines changed

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,18 @@ for message in consumer:
4141
kafka.close()
4242
```
4343

44+
## Keyed messages
45+
from kafka.client import KafkaClient
46+
from kafka.producer import KeyedProducer
47+
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
48+
49+
kafka = KafkaClient("localhost", 9092)
50+
51+
producer = KeyedProducer(kafka, "my-topic", partitioner=HashedPartitioner)
52+
producer.send("key1", "some message")
53+
producer.send("key2", "this methode")
54+
55+
4456
## Low level
4557

4658
```python

kafka/partitioner.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
from itertools import cycle
2+
3+
4+
class Partitioner(object):
5+
"""
6+
Base class for a partitioner
7+
"""
8+
def __init__(self, partitions):
9+
"""
10+
Initialize the partitioner
11+
12+
partitions - A list of available partitions (during startup)
13+
"""
14+
self.partitions = partitions
15+
16+
def partition(self, key, partitions):
17+
"""
18+
Takes a string key and num_partitions as argument and returns
19+
a partition to be used for the message
20+
21+
partitions - The list of partitions is passed in every call. This
22+
may look like an overhead, but it will be useful
23+
(in future) when we handle cases like rebalancing
24+
"""
25+
raise NotImplemented('partition function has to be implemented')
26+
27+
28+
class RoundRobinPartitioner(Partitioner):
29+
"""
30+
Implements a round robin partitioner which sends data to partitions
31+
in a round robin fashion
32+
"""
33+
def __init__(self, partitions):
34+
self.partitions = cycle(partitions)
35+
36+
def partition(self, key, partitions):
37+
# Refresh the partition list if necessary
38+
if self.partitions != partitions:
39+
self.partitions = cycle(partitions)
40+
41+
return self.partitions.next()
42+
43+
44+
class HashedPartitioner(Partitioner):
45+
"""
46+
Implements a partitioner which selects the target partition based on
47+
the hash of the key
48+
"""
49+
def partition(self, key, partitions):
50+
size = len(partitions)
51+
idx = hash(key) % size
52+
return partitions[idx]

kafka/producer.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from kafka.common import ProduceRequest
55
from kafka.protocol import create_message
6+
from kafka.partitioner import HashedPartitioner
67

78
log = logging.getLogger("kafka")
89

@@ -23,3 +24,34 @@ def send_messages(self, *msg):
2324

2425
resp = self.client.send_produce_request([req])[0]
2526
assert resp.error == 0
27+
28+
29+
class KeyedProducer(object):
30+
"""
31+
A producer which distributes messages to partitions based on the key
32+
33+
Args:
34+
client - The kafka client instance
35+
topic - The kafka topic to send messages to
36+
partitioner - A partitioner class that will be used to get the partition
37+
to send the message to. Must be derived from Partitioner
38+
"""
39+
def __init__(self, client, topic, partitioner=None):
40+
self.client = client
41+
self.topic = topic
42+
self.client._load_metadata_for_topics(topic)
43+
44+
if not partitioner:
45+
partitioner = HashedPartitioner
46+
47+
self.partitioner = partitioner(self.client.topic_partitions[topic])
48+
49+
def send(self, client, key, msg):
50+
partitions = self.client.topic_partitions[topic]
51+
partition = self.partitioner.partition(key, partitions)
52+
53+
req = ProduceRequest(self.topic, partition,
54+
messages=[create_message(msg)])
55+
56+
resp = self.client.send_produce_request([req])[0]
57+
assert resp.error == 0

0 commit comments

Comments
 (0)