Skip to content

Commit 2ed094a

Browse files
authored
Merge pull request #8 from ensi-platform/task-102492
#102492 работа с топиками из списка
2 parents 29de855 + ba85ed0 commit 2ed094a

File tree

5 files changed

+19
-23
lines changed

5 files changed

+19
-23
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,3 +10,4 @@ psalm.xml
1010
testbench.yaml
1111
vendor
1212
node_modules
13+
studio.json

composer.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
"require": {
1919
"php": "^8.0",
2020
"ext-rdkafka": "*",
21-
"ensi/laravel-phprdkafka": "^0.2",
21+
"ensi/laravel-phprdkafka": "^0.3",
2222
"illuminate/contracts": "^8.37 || ^9.0",
2323
"illuminate/pipeline": "^8.37 || ^9.0",
2424
"illuminate/support": "^8.37 || ^9.0"

src/Commands/KafkaConsumeCommand.php

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Ensi\LaravelPhpRdKafkaConsumer\Commands;
44

5+
use Ensi\LaravelPhpRdKafka\KafkaFacade;
56
use Ensi\LaravelPhpRdKafkaConsumer\ConsumerOptions;
67
use Ensi\LaravelPhpRdKafkaConsumer\HighLevelConsumer;
78
use Ensi\LaravelPhpRdKafkaConsumer\ProcessorData;
@@ -15,7 +16,7 @@ class KafkaConsumeCommand extends Command implements SignalableCommandInterface
1516
* The name and signature of the console command.
1617
*/
1718
protected $signature = 'kafka:consume
18-
{topic : The name of the topic}
19+
{topic-key : The key of a topic in the kafka.topics list}
1920
{consumer=default : The name of the consumer}
2021
{--max-events=0 : The number of events to consume before stopping}
2122
{--max-time=0 : The maximum number of seconds the worker should run}
@@ -53,20 +54,12 @@ public function handleSignal(int $signal): void
5354
public function handle(HighLevelConsumer $highLevelConsumer): int
5455
{
5556
$this->consumer = $highLevelConsumer;
56-
$topic = $this->argument('topic');
57+
$topicKey = $this->argument('topic-key');
5758
$consumer = $this->argument('consumer');
58-
$availableConsumers = array_keys(config('kafka.consumers', []));
5959

60-
if (!in_array($consumer, $availableConsumers)) {
61-
$this->error("Unknown consumer \"$consumer\"");
62-
$this->line('Available consumers are: "' . implode(', ', $availableConsumers) . '" and can be found in /config/kafka.php');
63-
64-
return 1;
65-
}
66-
67-
$processorData = $this->findMatchedProcessor($topic, $consumer);
60+
$processorData = $this->findMatchedProcessor($topicKey, $consumer);
6861
if (is_null($processorData)) {
69-
$this->error("Processor for topic \"$topic\" and consumer \"$consumer\" is not found");
62+
$this->error("Processor for topic-key \"$topicKey\" and consumer \"$consumer\" is not found");
7063
$this->line('Processors are set in /config/kafka-consumers.php');
7164

7265
return 1;
@@ -93,12 +86,13 @@ public function handle(HighLevelConsumer $highLevelConsumer): int
9386
middleware: $this->collectMiddleware($consumerPackageOptions['middleware'] ?? []),
9487
);
9588

96-
$this->info("Start listenning to topic: \"$topic\", consumer \"$consumer\"");
89+
$topicName = KafkaFacade::topicNameByClient('consumer', $consumer, $topicKey);
90+
$this->info("Start listening to topic: \"{$topicKey}\" ({$topicName}), consumer \"{$consumer}\"");
9791

9892
try {
9993
$highLevelConsumer
10094
->for($consumer)
101-
->listen($topic, $processorData, $consumerOptions);
95+
->listen($topicName, $processorData, $consumerOptions);
10296
} catch (Throwable $e) {
10397
$this->error('An error occurred while listening to the topic: '. $e->getMessage(). ' '. $e->getFile() . '::' . $e->getLine());
10498

@@ -111,13 +105,12 @@ public function handle(HighLevelConsumer $highLevelConsumer): int
111105
protected function findMatchedProcessor(string $topic, string $consumer): ?ProcessorData
112106
{
113107
foreach (config('kafka-consumer.processors', []) as $processor) {
114-
if (
115-
(empty($processor['topic']) || $processor['topic'] === $topic)
116-
&& (empty($processor['consumer']) || $processor['consumer'] === $consumer)
117-
) {
108+
$topicMatched = empty($processor['topic']) || $processor['topic'] === $topic;
109+
$consumerMatched = empty($processor['consumer']) || $processor['consumer'] === $consumer;
110+
if ($topicMatched && $consumerMatched) {
118111
return new ProcessorData(
119112
class: $processor['class'],
120-
topic: $processor['topic'] ?? null,
113+
topicKey: $processor['topic'] ?? null,
121114
consumer: $processor['consumer'] ?? null,
122115
type: $processor['type'] ?? 'action',
123116
queue: $processor['queue'] ?? false,

src/HighLevelConsumer.php

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Ensi\LaravelPhpRdKafka\KafkaManager;
66
use Ensi\LaravelPhpRdKafkaConsumer\Exceptions\KafkaConsumerException;
7+
use Illuminate\Foundation\Bus\Dispatchable;
78
use Illuminate\Pipeline\Pipeline;
89
use RdKafka\Exception as RdKafkaException;
910
use RdKafka\KafkaConsumer;
@@ -15,7 +16,7 @@ class HighLevelConsumer
1516
{
1617
protected ?KafkaConsumer $consumer;
1718

18-
protected $forceStop = false;
19+
protected bool $forceStop = false;
1920

2021
public function __construct(
2122
protected KafkaManager $kafkaManager,
@@ -40,7 +41,6 @@ public function forceStop(): static
4041
}
4142

4243
/**
43-
* @throws KafkaException
4444
* @throws RdKafkaException
4545
* @throws Throwable
4646
*/
@@ -95,6 +95,7 @@ protected function executeProcessor(ProcessorData $processorData, Message $messa
9595

9696
protected function executeSyncProcessor(ProcessorData $processorData, Message $message): void
9797
{
98+
/** @var class-string|Dispatchable $className */
9899
$className = $processorData->class;
99100
if ($processorData->type === 'job') {
100101
$className::dispatchSync($message);
@@ -105,6 +106,7 @@ protected function executeSyncProcessor(ProcessorData $processorData, Message $m
105106

106107
protected function executeQueueableProcessor(ProcessorData $processorData, Message $message): void
107108
{
109+
/** @var class-string|Dispatchable $className */
108110
$className = $processorData->class;
109111
$queue = $processorData->queue;
110112
if ($processorData->type === 'job') {

src/ProcessorData.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class ProcessorData
88

99
public function __construct(
1010
public string $class,
11-
public ?string $topic = null,
11+
public ?string $topicKey = null,
1212
public ?string $consumer = null,
1313
public string $type = 'action',
1414
public string|bool $queue = false,

0 commit comments

Comments
 (0)