Skip to content

Commit 18c5c2b

Browse files
authored
Merge pull request #17 from ensi-platform/devops-2512
DEVOPS-2512 feature: listen multiple topics
2 parents edf627c + 3ab97c1 commit 18c5c2b

File tree

13 files changed

+148
-99
lines changed

13 files changed

+148
-99
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaConsumeCommand;
166166
use Ensi\LaravelPhpRdKafkaConsumer\Tests\ConsumerFaker;
167167
use RdKafka\Message;
168168

169-
ConsumerFaker::new('test-model')
169+
ConsumerFaker::new(['test-model'])
170170
->addMessage(new Message())
171171
->addMessage(new Message())
172172
->consume();

phpunit.xml

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,16 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3-
xsi:noNamespaceSchemaLocation="./vendor/phpunit/phpunit/phpunit.xsd"
4-
bootstrap="vendor/autoload.php"
5-
colors="true"
6-
>
7-
<testsuites>
8-
<testsuite name="Tests">
9-
<directory>tests</directory>
10-
</testsuite>
11-
</testsuites>
12-
<coverage>
13-
<include>
14-
<directory suffix="Test.php">./src</directory>
15-
</include>
16-
</coverage>
17-
<logging>
18-
<junit outputFile="build/report.junit.xml"/>
19-
</logging>
2+
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/10.5/phpunit.xsd" bootstrap="vendor/autoload.php" colors="true">
3+
<testsuites>
4+
<testsuite name="Tests">
5+
<directory>tests</directory>
6+
</testsuite>
7+
</testsuites>
8+
<logging>
9+
<junit outputFile="build/report.junit.xml"/>
10+
</logging>
11+
<source>
12+
<include>
13+
<directory suffix="Test.php">./src</directory>
14+
</include>
15+
</source>
2016
</phpunit>

src/Commands/KafkaConsumeCommand.php

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class KafkaConsumeCommand extends Command implements SignalableCommandInterface
1818
* The name and signature of the console command.
1919
*/
2020
protected $signature = 'kafka:consume
21-
{topic-key : The key of a topic in the kafka.topics list}
21+
{topic-key : The key of a topic in the kafka.topics list, can accept multiple keys delimited by comma}
2222
{consumer=default : The name of the consumer}
2323
{--max-events=0 : The number of events to consume before stopping}
2424
{--max-time=0 : The maximum number of seconds the worker should run}
@@ -82,14 +82,18 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
8282
*/
8383
public function handle(ConsumerFactory $consumerFactory): int
8484
{
85+
$topicKeys = explode(',', $this->getTopicKey());
86+
8587
try {
8688
$this->consumer = $consumerFactory
87-
->build($this->getTopicKey(), $this->getConsumerName())
89+
->build($topicKeys, $this->getConsumerName())
8890
->setMaxEvents($this->getMaxEvents())
8991
->setMaxTime($this->getMaxTime());
9092

91-
$this->info("Start listening to topic: \"{$this->getTopicKey()}\"" .
92-
" ({$this->consumer->getTopicName()}), consumer \"{$this->getConsumerName()}\"");
93+
foreach ($this->consumer->getTopicNames() as $topicName) {
94+
$this->info("Start listening to topic:" .
95+
" {$topicName}, consumer \"{$this->getConsumerName()}\"");
96+
}
9397

9498
$this->consumer->listen();
9599
} catch (Throwable $exception) {

src/Consumers/Consumer.php

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,22 +5,28 @@
55
use Ensi\LaravelPhpRdKafkaConsumer\ConsumerOptions;
66
use Ensi\LaravelPhpRdKafkaConsumer\HighLevelConsumer;
77
use Ensi\LaravelPhpRdKafkaConsumer\ProcessorData;
8-
use RdKafka\Exception;
98
use Throwable;
109

1110
class Consumer
1211
{
12+
/**
13+
* @param HighLevelConsumer $highLevelConsumer
14+
* @param ProcessorData[] $processorData
15+
* @param ConsumerOptions $consumerOptions
16+
* @param array $topicNames
17+
*/
1318
public function __construct(
1419
protected HighLevelConsumer $highLevelConsumer,
15-
protected ProcessorData $processorData,
20+
protected array $processorData,
1621
protected ConsumerOptions $consumerOptions,
17-
protected string $topicName
22+
protected array $topicNames,
23+
protected string $consumerName,
1824
) {
1925
}
2026

21-
public function getTopicName(): string
27+
public function getTopicNames(): array
2228
{
23-
return $this->topicName;
29+
return $this->topicNames;
2430
}
2531

2632
public function setMaxTime(int $maxTime = 0): self
@@ -42,24 +48,18 @@ public function forceStop(): void
4248
$this->highLevelConsumer->forceStop();
4349
}
4450

45-
public function getProcessorData(): ProcessorData
46-
{
47-
return $this->processorData;
48-
}
49-
5051
public function getConsumerOptions(): ConsumerOptions
5152
{
5253
return $this->consumerOptions;
5354
}
5455

5556
/**
56-
* @throws Exception
5757
* @throws Throwable
5858
*/
5959
public function listen(): void
6060
{
6161
$this->highLevelConsumer
62-
->for($this->processorData->consumer)
63-
->listen($this->topicName, $this->processorData, $this->consumerOptions);
62+
->for($this->consumerName)
63+
->listen($this->topicNames, $this->processorData, $this->consumerOptions);
6464
}
6565
}

src/Consumers/Factories/ConsumerFactory.php

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,24 @@ public function __construct(
1919
/**
2020
* @throws KafkaConsumerProcessorException
2121
*/
22-
public function build(string $topicKey, string $consumerName = 'default'): Consumer
22+
public function build(array $topicKeys, string $consumerName = 'default'): Consumer
2323
{
24-
$processorData = $this->makeProcessorData($topicKey, $consumerName);
25-
$consumerOptions = $this->makeConsumerOptions($consumerName, $processorData);
24+
$topicNames = [];
25+
$processors = [];
26+
foreach ($topicKeys as $topicKey) {
27+
$topicName = KafkaFacade::topicNameByClient('consumer', $consumerName, $topicKey);
28+
$topicNames[] = $topicName;
29+
$processors[$topicName] = $this->makeProcessorData($topicKey, $consumerName);
30+
}
31+
32+
$consumerOptions = $this->makeConsumerOptions($consumerName);
2633

2734
return new Consumer(
2835
highLevelConsumer: $this->highLevelConsumer,
29-
processorData: $processorData,
36+
processorData: $processors,
3037
consumerOptions: $consumerOptions,
31-
topicName: KafkaFacade::topicNameByClient('consumer', $consumerName, $topicKey)
38+
topicNames: $topicNames,
39+
consumerName: $consumerName,
3240
);
3341
}
3442

@@ -39,10 +47,6 @@ protected function makeProcessorData(string $topicKey, string $consumerName): Pr
3947
{
4048
$processorData = $this->findMatchedProcessor($topicKey, $consumerName);
4149

42-
if (!class_exists($processorData->class)) {
43-
throw new KafkaConsumerProcessorException("Processor class \"$processorData->class\" is not found");
44-
}
45-
4650
if (!$processorData->hasValidType()) {
4751
throw new KafkaConsumerProcessorException("Invalid processor type \"$processorData->type\"," .
4852
" supported types are: " . implode(',', $processorData->getSupportedTypes()));
@@ -75,12 +79,12 @@ class: $processor['class'],
7579
throw new KafkaConsumerProcessorException("Processor for topic-key \"$topicKey\" and consumer \"$consumerName\" is not found");
7680
}
7781

78-
protected function makeConsumerOptions(string $consumerName, ProcessorData $processorData): ConsumerOptions
82+
protected function makeConsumerOptions(string $consumerName): ConsumerOptions
7983
{
8084
$consumerPackageOptions = config('kafka-consumer.consumer_options.' . $consumerName, []);
8185

8286
return new ConsumerOptions(
83-
consumeTimeout: $consumerPackageOptions['consume_timeout'] ?? $processorData->consumeTimeout,
87+
consumeTimeout: $consumerPackageOptions['consume_timeout'] ?? 20000,
8488
middleware: $this->collectMiddleware($consumerPackageOptions['middleware'] ?? []),
8589
);
8690
}

src/HighLevelConsumer.php

Lines changed: 24 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111
use RdKafka\KafkaConsumer;
1212
use RdKafka\Message;
1313
use RdKafka\TopicPartition;
14-
use Throwable;
1514

1615
class HighLevelConsumer
1716
{
@@ -42,14 +41,14 @@ public function forceStop(): static
4241
}
4342

4443
/**
45-
* @throws RdKafkaException
46-
* @throws Throwable
47-
*
48-
* @noinspection PhpRedundantCatchClauseInspection
44+
* @param array $topicNames
45+
* @param ProcessorData[] $processors
46+
* @param ConsumerOptions $options
47+
* @throws KafkaConsumerException
4948
*/
50-
public function listen(string $topicName, ProcessorData $processorData, ConsumerOptions $options): void
49+
public function listen(array $topicNames, array $processors, ConsumerOptions $options): void
5150
{
52-
$this->subscribe($topicName);
51+
$this->subscribe($topicNames);
5352

5453
[$startTime, $eventsProcessed] = [hrtime(true) / 1e9, 0];
5554

@@ -60,7 +59,11 @@ public function listen(string $topicName, ProcessorData $processorData, Consumer
6059
switch ($message->err) {
6160

6261
case RD_KAFKA_RESP_ERR_NO_ERROR:
63-
$this->processThroughMiddleware($processorData, $message, $options);
62+
if (!isset($processors[$message->topic_name])) {
63+
throw new KafkaConsumerException("Kafka error: no processor defined for topic '{$message->topic_name}'");
64+
}
65+
66+
$this->processThroughMiddleware($processors[$message->topic_name], $message, $options);
6467
$this->consumer->commitAsync($message);
6568
$eventsProcessed++;
6669

@@ -137,19 +140,21 @@ protected function shouldBeStopped(int|float $startTime, int $eventsProcessed, C
137140
return $this->forceStop;
138141
}
139142

140-
protected function subscribe(string $topicName): void
143+
protected function subscribe(array $topicNames): void
141144
{
142-
$attempts = 0;
143-
do {
144-
$topicExists = (bool)$this->getPartitions($topicName);
145-
if (!$topicExists) {
146-
$this->consumer->newTopic($topicName);
147-
sleep(1);
148-
}
149-
$attempts++;
150-
} while (!$topicExists && $attempts < 10);
145+
foreach ($topicNames as $topicName) {
146+
$attempts = 0;
147+
do {
148+
$topicExists = (bool)$this->getPartitions($topicName);
149+
if (!$topicExists) {
150+
$this->consumer->newTopic($topicName);
151+
sleep(1);
152+
}
153+
$attempts++;
154+
} while (!$topicExists && $attempts < 10);
155+
}
151156

152-
$this->consumer->subscribe([$topicName]);
157+
$this->consumer->subscribe($topicNames);
153158
}
154159

155160
/**

src/Tests/Consumer/KafkaConsumer.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,11 @@ class KafkaConsumer extends BaseKafkaConsumer
1212
{
1313
protected Metadata $metadata;
1414

15-
public function __construct(string $topicName, protected array $messages = [])
15+
public function __construct(array $topicNames, protected array $messages = [])
1616
{
1717
parent::__construct($this->makeConf());
1818

19-
$this->metadata = new Metadata($topicName);
19+
$this->metadata = new Metadata($topicNames);
2020
}
2121

2222
private function makeConf(): Conf
@@ -42,7 +42,7 @@ public function consume($timeout_ms)
4242
/**
4343
* @phpstan-ignore-next-line
4444
*/
45-
public function getMetadata($all_topics, $only_topic = null, $timeout_ms): Metadata
45+
public function getMetadata($all_topics, $only_topic = null, $timeout_ms = 0): Metadata
4646
{
4747
return $this->metadata;
4848
}

src/Tests/Consumer/Topics/Metadata.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,15 @@
22

33
namespace Ensi\LaravelPhpRdKafkaConsumer\Tests\Consumer\Topics;
44

5+
use Illuminate\Support\Arr;
6+
57
class Metadata
68
{
79
protected array $topics = [];
810

9-
public function __construct(string $topicName)
11+
public function __construct(array $topicNames)
1012
{
11-
$this->topics[] = new Topic($topicName);
13+
$this->topics = Arr::map($topicNames, fn ($topicName) => new Topic($topicName));
1214
}
1315

1416
public function getTopics(): array

src/Tests/ConsumerFaker.php

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,21 @@
99
use Ensi\LaravelPhpRdKafkaConsumer\HighLevelConsumer;
1010
use Ensi\LaravelPhpRdKafkaConsumer\Tests\Consumer\KafkaConsumer;
1111
use Ensi\LaravelPhpRdKafkaConsumer\Tests\Exceptions\OnlyTestingEnvironmentException;
12+
use Illuminate\Support\Arr;
1213
use RdKafka\Message;
1314
use Throwable;
1415

1516
class ConsumerFaker
1617
{
1718
protected array $messages = [];
1819

19-
protected string $topicName;
20+
protected array $topicNames;
2021

2122
public function __construct(
22-
protected string $topicKey,
23+
protected array $topicKeys,
2324
protected string $consumerName = 'default'
2425
) {
25-
$this->topicName = KafkaFacade::topicNameByClient('consumer', $consumerName, $topicKey);
26+
$this->topicNames = Arr::map($topicKeys, fn ($topicKey) => KafkaFacade::topicNameByClient('consumer', $consumerName, $topicKey));
2627
}
2728

2829
public function addMessage(Message $message): self
@@ -48,7 +49,7 @@ public function consume(): void
4849
$this->bind();
4950

5051
(new ConsumerFactory(resolve(HighLevelConsumer::class)))
51-
->build($this->topicKey, $this->consumerName)
52+
->build($this->topicKeys, $this->consumerName)
5253
->listen();
5354
}
5455

@@ -66,11 +67,11 @@ public function bind(): void
6667

6768
private function makeKafkaManager(): KafkaManager
6869
{
69-
return new KafkaManager(new KafkaConsumer($this->topicName, $this->messages));
70+
return new KafkaManager(new KafkaConsumer($this->topicNames, $this->messages));
7071
}
7172

72-
public static function new(string $topicKey, string $consumerName = 'default'): self
73+
public static function new(array $topicKeys, string $consumerName = 'default'): self
7374
{
74-
return new self($topicKey, $consumerName);
75+
return new self($topicKeys, $consumerName);
7576
}
7677
}

0 commit comments

Comments
 (0)