Skip to content

Commit 599ed87

Browse files
authored
Merge pull request #947 from dirk39/kafka_singleton_consumer
Kafka singleton consumer
2 parents acb978a + c4ba709 commit 599ed87

File tree

2 files changed

+50
-10
lines changed

2 files changed

+50
-10
lines changed

pkg/rdkafka/RdKafkaContext.php

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,19 @@ class RdKafkaContext implements Context
4545
*/
4646
private $kafkaConsumers;
4747

48+
/**
49+
* @var RdKafkaConsumer[]
50+
*/
51+
private $rdKafkaConsumers;
52+
4853
/**
4954
* @param array $config
5055
*/
5156
public function __construct(array $config)
5257
{
5358
$this->config = $config;
5459
$this->kafkaConsumers = [];
60+
$this->rdKafkaConsumers = [];
5561

5662
$this->setSerializer(new JsonSerializer());
5763
}
@@ -102,26 +108,33 @@ public function createConsumer(Destination $destination): Consumer
102108
{
103109
InvalidDestinationException::assertDestinationInstanceOf($destination, RdKafkaTopic::class);
104110

105-
$this->kafkaConsumers[] = $kafkaConsumer = new KafkaConsumer($this->getConf());
111+
$queueName = $destination->getQueueName();
106112

107-
$consumer = new RdKafkaConsumer(
108-
$kafkaConsumer,
109-
$this,
110-
$destination,
111-
$this->getSerializer()
112-
);
113+
if (!isset($this->rdKafkaConsumers[$queueName])) {
114+
$this->kafkaConsumers[] = $kafkaConsumer = new KafkaConsumer($this->getConf());
115+
116+
$consumer = new RdKafkaConsumer(
117+
$kafkaConsumer,
118+
$this,
119+
$destination,
120+
$this->getSerializer()
121+
);
122+
123+
if (isset($this->config['commit_async'])) {
124+
$consumer->setCommitAsync($this->config['commit_async']);
125+
}
113126

114-
if (isset($this->config['commit_async'])) {
115-
$consumer->setCommitAsync($this->config['commit_async']);
127+
$this->rdKafkaConsumers[$queueName] = $consumer;
116128
}
117129

118-
return $consumer;
130+
return $this->rdKafkaConsumers[$queueName];
119131
}
120132

121133
public function close(): void
122134
{
123135
$kafkaConsumers = $this->kafkaConsumers;
124136
$this->kafkaConsumers = [];
137+
$this->rdKafkaConsumers = [];
125138

126139
foreach ($kafkaConsumers as $kafkaConsumer) {
127140
$kafkaConsumer->unsubscribe();

pkg/rdkafka/Tests/RdKafkaContextTest.php

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,31 @@ public function testShouldInjectItsSerializerToConsumer()
6969

7070
$this->assertSame($context->getSerializer(), $producer->getSerializer());
7171
}
72+
73+
public function testShouldNotCreateConsumerTwice()
74+
{
75+
$context = new RdKafkaContext(['global' => [
76+
'group.id' => uniqid('', true),
77+
]]);
78+
$queue = $context->createQueue('aQueue');
79+
80+
$consumer = $context->createConsumer($queue);
81+
$consumer2 = $context->createConsumer($queue);
82+
83+
$this->assertSame($consumer, $consumer2);
84+
}
85+
86+
public function testShouldCreateTwoConsumers()
87+
{
88+
$context = new RdKafkaContext(['global' => [
89+
'group.id' => uniqid('', true),
90+
]]);
91+
$queueA = $context->createQueue('aQueue');
92+
$queueB = $context->createQueue('bQueue');
93+
94+
$consumer = $context->createConsumer($queueA);
95+
$consumer2 = $context->createConsumer($queueB);
96+
97+
$this->assertNotSame($consumer, $consumer2);
98+
}
7299
}

0 commit comments

Comments
 (0)