Skip to content

Commit 2417132

Browse files
committed
[upd] Переименован параметр $consumer в $consumerName, как это реализовано в пакете laravel-rdkafka-producer
1 parent f68bd16 commit 2417132

File tree

8 files changed

+34
-34
lines changed

8 files changed

+34
-34
lines changed

src/Commands/KafkaConsumeCommand.php

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public function getTopicKey(): string
5252
return $this->argument('topic-key');
5353
}
5454

55-
public function getConsumer(): string
55+
public function getConsumerName(): string
5656
{
5757
return $this->argument('consumer');
5858
}
@@ -84,12 +84,12 @@ public function handle(ConsumerFactory $consumerFactory): int
8484
{
8585
try {
8686
$this->consumer = $consumerFactory
87-
->build($this->getTopicKey(), $this->getConsumer())
87+
->build($this->getTopicKey(), $this->getConsumerName())
8888
->setMaxEvents($this->getMaxEvents())
8989
->setMaxTime($this->getMaxTime());
9090

9191
$this->info("Start listening to topic: \"{$this->getTopicKey()}\"" .
92-
" ({$this->consumer->getTopicName()}), consumer \"{$this->getConsumer()}\"");
92+
" ({$this->consumer->getTopicName()}), consumer \"{$this->getConsumerName()}\"");
9393

9494
$this->consumer->listen();
9595
} catch (Throwable $exception) {
@@ -123,6 +123,6 @@ private function errorThrowable(Throwable $exception): void
123123
private function makeLogger(): ConsumerLoggerInterface
124124
{
125125
return $this->loggerFactory
126-
->make($this->getTopicKey(), $this->getConsumer());
126+
->make($this->getTopicKey(), $this->getConsumerName());
127127
}
128128
}

src/Consumers/Factories/ConsumerFactory.php

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,34 @@ public function __construct(
1818

1919
/**
2020
* @param string $topicKey
21-
* @param string $consumer
21+
* @param string $consumerName
2222
* @return Consumer
2323
*
2424
* @throws KafkaConsumerProcessorException
2525
*/
26-
public function build(string $topicKey, string $consumer = 'default'): Consumer
26+
public function build(string $topicKey, string $consumerName = 'default'): Consumer
2727
{
28-
$processorData = $this->makeProcessorData($topicKey, $consumer);
29-
$consumerOptions = $this->makeConsumerOptions($consumer, $processorData);
28+
$processorData = $this->makeProcessorData($topicKey, $consumerName);
29+
$consumerOptions = $this->makeConsumerOptions($consumerName, $processorData);
3030

3131
return new Consumer(
3232
highLevelConsumer: $this->highLevelConsumer,
3333
processorData: $processorData,
3434
consumerOptions: $consumerOptions,
35-
topicName: KafkaFacade::topicNameByClient('consumer', $consumer, $topicKey)
35+
topicName: KafkaFacade::topicNameByClient('consumer', $consumerName, $topicKey)
3636
);
3737
}
3838

3939
/**
4040
* @param string $topicKey
41-
* @param string $consumer
41+
* @param string $consumerName
4242
* @return ProcessorData
4343
*
4444
* @throws KafkaConsumerProcessorException
4545
*/
46-
protected function makeProcessorData(string $topicKey, string $consumer): ProcessorData
46+
protected function makeProcessorData(string $topicKey, string $consumerName): ProcessorData
4747
{
48-
$processorData = $this->findMatchedProcessor($topicKey, $consumer);
48+
$processorData = $this->findMatchedProcessor($topicKey, $consumerName);
4949

5050
if (!class_exists($processorData->class)) {
5151
throw new KafkaConsumerProcessorException("Processor class \"$processorData->class\" is not found");
@@ -61,35 +61,35 @@ protected function makeProcessorData(string $topicKey, string $consumer): Proces
6161

6262
/**
6363
* @param string $topicKey
64-
* @param string $consumer
64+
* @param string $consumerName
6565
* @return ProcessorData
6666
*
6767
* @throws KafkaConsumerProcessorException
6868
*/
69-
protected function findMatchedProcessor(string $topicKey, string $consumer): ProcessorData
69+
protected function findMatchedProcessor(string $topicKey, string $consumerName): ProcessorData
7070
{
7171
foreach (config('kafka-consumer.processors', []) as $processor) {
7272
$topicMatched = empty($processor['topic']) || $processor['topic'] === $topicKey;
73-
$consumerMatched = empty($processor['consumer']) || $processor['consumer'] === $consumer;
73+
$consumerMatched = empty($processor['consumer']) || $processor['consumer'] === $consumerName;
7474

7575
if ($topicMatched && $consumerMatched) {
7676
return new ProcessorData(
7777
class: $processor['class'],
7878
topicKey: $processor['topic'] ?? $topicKey,
79-
consumer: $processor['consumer'] ?? $consumer,
79+
consumer: $processor['consumer'] ?? $consumerName,
8080
type: $processor['type'] ?? 'action',
8181
queue: $processor['queue'] ?? false,
8282
consumeTimeout: $processor['consume_timeout'] ?? 20000,
8383
);
8484
}
8585
}
8686

87-
throw new KafkaConsumerProcessorException("Processor for topic-key \"$topicKey\" and consumer \"$consumer\" is not found");
87+
throw new KafkaConsumerProcessorException("Processor for topic-key \"$topicKey\" and consumer \"$consumerName\" is not found");
8888
}
8989

90-
protected function makeConsumerOptions(string $consumer, ProcessorData $processorData): ConsumerOptions
90+
protected function makeConsumerOptions(string $consumerName, ProcessorData $processorData): ConsumerOptions
9191
{
92-
$consumerPackageOptions = config('kafka-consumer.consumer_options.' . $consumer, []);
92+
$consumerPackageOptions = config('kafka-consumer.consumer_options.' . $consumerName, []);
9393

9494
return new ConsumerOptions(
9595
consumeTimeout: $consumerPackageOptions['consume_timeout'] ?? $processorData->consumeTimeout,

src/Loggers/ConsumerLogger.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ final class ConsumerLogger extends AbstractLogger implements ConsumerLoggerInter
1010
{
1111
public function __construct(
1212
protected LoggerInterface $logger,
13-
protected string $topicKey,
14-
protected string $consumer,
13+
protected string $topicKey,
14+
protected string $consumerName,
1515
) {
1616
}
1717

@@ -20,9 +20,9 @@ public function getTopicKey(): string
2020
return $this->topicKey;
2121
}
2222

23-
public function getConsumer(): string
23+
public function getConsumerName(): string
2424
{
25-
return $this->consumer;
25+
return $this->consumerName;
2626
}
2727

2828
public function getLogger(): LoggerInterface
@@ -34,7 +34,7 @@ public function log($level, Stringable|string $message, array $context = []): vo
3434
{
3535
$context = array_merge($context, [
3636
'topic_key' => $this->topicKey,
37-
'consumer' => $this->consumer,
37+
'consumer_name' => $this->consumerName,
3838
]);
3939

4040
$this->logger->log($level, $message, $context);

src/Loggers/ConsumerLoggerFactory.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,12 @@
88

99
final class ConsumerLoggerFactory
1010
{
11-
public function make(string $topicKey, string $consumer): ConsumerLoggerInterface
11+
public function make(string $topicKey, string $consumerName = 'default'): ConsumerLoggerInterface
1212
{
1313
return new ConsumerLogger(
1414
$this->makeLogger(),
1515
$topicKey,
16-
$consumer
16+
$consumerName
1717
);
1818
}
1919

src/Loggers/ConsumerLoggerInterface.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,5 @@ public function getLogger(): LoggerInterface;
1010

1111
public function getTopicKey(): string;
1212

13-
public function getConsumer(): string;
13+
public function getConsumerName(): string;
1414
}

src/Tests/ConsumerFaker.php

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ class ConsumerFaker
2020

2121
public function __construct(
2222
protected string $topicKey,
23-
protected string $consumer = 'default'
23+
protected string $consumerName = 'default'
2424
) {
25-
$this->topicName = KafkaFacade::topicNameByClient('consumer', $consumer, $topicKey);
25+
$this->topicName = KafkaFacade::topicNameByClient('consumer', $consumerName, $topicKey);
2626
}
2727

2828
public function addMessage(Message $message): self
@@ -50,7 +50,7 @@ public function consume(): void
5050
$this->bind();
5151

5252
(new ConsumerFactory(resolve(HighLevelConsumer::class)))
53-
->build($this->topicKey, $this->consumer)
53+
->build($this->topicKey, $this->consumerName)
5454
->listen();
5555
}
5656

@@ -71,8 +71,8 @@ private function makeKafkaManager(): KafkaManager
7171
return new KafkaManager(new KafkaConsumer($this->topicName, $this->messages));
7272
}
7373

74-
public static function new(string $topicKey, string $consumer = 'default'): self
74+
public static function new(string $topicKey, string $consumerName = 'default'): self
7575
{
76-
return new self($topicKey, $consumer);
76+
return new self($topicKey, $consumerName);
7777
}
7878
}

tests/Loggers/ConsumerLoggerFactoryTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,6 @@
1313
->toBeInstanceOf(NullLogger::class)
1414
->and($consumerLogger->getTopicKey())
1515
->toBe('products')
16-
->and($consumerLogger->getConsumer())
16+
->and($consumerLogger->getConsumerName())
1717
->toBe('default');
1818
})->with([null, 'null']);

tests/Loggers/ConsumerLoggerTest.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
&& $message === 'Test message'
1717
&& count($context) == 3
1818
&& $context['topic_key'] === 'products'
19-
&& $context['consumer'] === 'default'
19+
&& $context['consumer_name'] === 'default'
2020
&& $context['foo'] == 'bar';
2121
});
2222
});

0 commit comments

Comments
 (0)