Skip to content

Commit edf627c

Browse files
authored
Merge pull request #16 from kEERill/consumer-exception-logging
Add Consumer logging
2 parents 36b83e9 + 19ea834 commit edf627c

29 files changed

+820
-96
lines changed

README.md

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,24 @@ You can use any of the constants defined by the pcntl extension https://www.php.
156156

157157
Please see [CONTRIBUTING](.github/CONTRIBUTING.md) for details.
158158

159+
## Consumer faking
160+
161+
Testing tools have been added to test the developed handlers. You can create a fake
162+
Consumer and call the topic listening command:
163+
164+
```php
165+
use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaConsumeCommand;
166+
use Ensi\LaravelPhpRdKafkaConsumer\Tests\ConsumerFaker;
167+
use RdKafka\Message;
168+
169+
ConsumerFaker::new('test-model')
170+
->addMessage(new Message())
171+
->addMessage(new Message())
172+
->consume();
173+
```
174+
175+
## Testing
176+
159177
### Testing
160178

161179
1. composer install

config/kafka-consumer.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@
44
'global_middleware' => [],
55

66
'processors' => [],
7-
7+
8+
'log_channel' => env('KAFKA_CONSUMER_LOG_CHANNEL', 'null'),
9+
810
'consumer_options' => [
911
/** options for consumer with name `default` */
1012
'default' => [
@@ -19,6 +21,6 @@
1921
| Array of middleware.
2022
*/
2123
'middleware' => [],
22-
]
23-
]
24+
],
25+
],
2426
];

src/Commands/KafkaConsumeCommand.php

Lines changed: 61 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,12 @@
22

33
namespace Ensi\LaravelPhpRdKafkaConsumer\Commands;
44

5-
use Ensi\LaravelPhpRdKafka\KafkaFacade;
6-
use Ensi\LaravelPhpRdKafkaConsumer\ConsumerOptions;
7-
use Ensi\LaravelPhpRdKafkaConsumer\HighLevelConsumer;
8-
use Ensi\LaravelPhpRdKafkaConsumer\ProcessorData;
5+
use Ensi\LaravelPhpRdKafkaConsumer\Consumers\Consumer;
6+
use Ensi\LaravelPhpRdKafkaConsumer\Consumers\Factories\ConsumerFactory;
7+
use Ensi\LaravelPhpRdKafkaConsumer\Exceptions\KafkaConsumerException;
8+
use Ensi\LaravelPhpRdKafkaConsumer\Exceptions\KafkaConsumerProcessorException;
9+
use Ensi\LaravelPhpRdKafkaConsumer\Loggers\ConsumerLoggerFactory;
10+
use Ensi\LaravelPhpRdKafkaConsumer\Loggers\ConsumerLoggerInterface;
911
use Illuminate\Console\Command;
1012
use Symfony\Component\Console\Command\SignalableCommandInterface;
1113
use Throwable;
@@ -28,7 +30,12 @@ class KafkaConsumeCommand extends Command implements SignalableCommandInterface
2830
*/
2931
protected $description = 'Consume concrete topic';
3032

31-
protected ?HighLevelConsumer $consumer = null;
33+
protected ?Consumer $consumer = null;
34+
35+
public function __construct(protected ConsumerLoggerFactory $loggerFactory)
36+
{
37+
parent::__construct();
38+
}
3239

3340
public function getStopSignalsFromConfig(): array
3441
{
@@ -40,6 +47,26 @@ public function getSubscribedSignals(): array
4047
return $this->getStopSignalsFromConfig();
4148
}
4249

50+
public function getTopicKey(): string
51+
{
52+
return $this->argument('topic-key');
53+
}
54+
55+
public function getConsumerName(): string
56+
{
57+
return $this->argument('consumer');
58+
}
59+
60+
public function getMaxEvents(): int
61+
{
62+
return $this->option('once') ? 1 : (int) $this->option('max-events');
63+
}
64+
65+
public function getMaxTime(): int
66+
{
67+
return (int) $this->option('max-time');
68+
}
69+
4370
public function handleSignal(int $signal, int|false $previousExitCode = 0): int|false
4471
{
4572
if ($this->consumer) {
@@ -53,84 +80,49 @@ public function handleSignal(int $signal, int|false $previousExitCode = 0): int|
5380
/**
5481
* Execute the console command.
5582
*/
56-
public function handle(HighLevelConsumer $highLevelConsumer): int
83+
public function handle(ConsumerFactory $consumerFactory): int
5784
{
58-
$this->consumer = $highLevelConsumer;
59-
$topicKey = $this->argument('topic-key');
60-
$consumer = $this->argument('consumer');
61-
62-
$processorData = $this->findMatchedProcessor($topicKey, $consumer);
63-
if (is_null($processorData)) {
64-
$this->error("Processor for topic-key \"$topicKey\" and consumer \"$consumer\" is not found");
65-
$this->line('Processors are set in /config/kafka-consumer.php');
66-
67-
return 1;
68-
}
69-
70-
if (!class_exists($processorData->class)) {
71-
$this->error("Processor class \"$processorData->class\" is not found");
72-
$this->line('Processors are set in /config/kafka-consumer.php');
73-
74-
return 1;
75-
}
76-
77-
if (!$processorData->hasValidType()) {
78-
$this->error("Invalid processor type \"$processorData->type\", supported types are: " . implode(',', $processorData->getSupportedTypes()));
79-
80-
return 1;
81-
}
82-
83-
$consumerPackageOptions = config('kafka-consumer.consumer_options.' . $consumer, []);
84-
$consumerOptions = new ConsumerOptions(
85-
consumeTimeout: $consumerPackageOptions['consume_timeout'] ?? $processorData->consumeTimeout,
86-
maxEvents: $this->option('once') ? 1 : (int) $this->option('max-events'),
87-
maxTime: (int) $this->option('max-time'),
88-
middleware: $this->collectMiddleware($consumerPackageOptions['middleware'] ?? []),
89-
);
85+
try {
86+
$this->consumer = $consumerFactory
87+
->build($this->getTopicKey(), $this->getConsumerName())
88+
->setMaxEvents($this->getMaxEvents())
89+
->setMaxTime($this->getMaxTime());
9090

91-
$topicName = KafkaFacade::topicNameByClient('consumer', $consumer, $topicKey);
92-
$this->info("Start listening to topic: \"{$topicKey}\" ({$topicName}), consumer \"{$consumer}\"");
91+
$this->info("Start listening to topic: \"{$this->getTopicKey()}\"" .
92+
" ({$this->consumer->getTopicName()}), consumer \"{$this->getConsumerName()}\"");
9393

94-
try {
95-
$highLevelConsumer
96-
->for($consumer)
97-
->listen($topicName, $processorData, $consumerOptions);
98-
} catch (Throwable $e) {
99-
$this->error('An error occurred while listening to the topic: ' . $e->getMessage() . ' ' . $e->getFile() . '::' . $e->getLine());
94+
$this->consumer->listen();
95+
} catch (Throwable $exception) {
96+
$this->errorThrowable($exception);
10097

101-
return 1;
98+
return self::FAILURE;
10299
}
103100

104-
return 0;
101+
return self::SUCCESS;
105102
}
106103

107-
protected function findMatchedProcessor(string $topic, string $consumer): ?ProcessorData
104+
private function errorThrowable(Throwable $exception): void
108105
{
109-
foreach (config('kafka-consumer.processors', []) as $processor) {
110-
$topicMatched = empty($processor['topic']) || $processor['topic'] === $topic;
111-
$consumerMatched = empty($processor['consumer']) || $processor['consumer'] === $consumer;
112-
if ($topicMatched && $consumerMatched) {
113-
return new ProcessorData(
114-
class: $processor['class'],
115-
topicKey: $processor['topic'] ?? null,
116-
consumer: $processor['consumer'] ?? null,
117-
type: $processor['type'] ?? 'action',
118-
queue: $processor['queue'] ?? false,
119-
consumeTimeout: $processor['consume_timeout'] ?? 20000,
120-
);
106+
$this->makeLogger()
107+
->error($exception->getMessage(), ['exception' => $exception]);
108+
109+
if ($exception instanceof KafkaConsumerException) {
110+
$this->error($exception->getMessage());
111+
112+
if ($exception instanceof KafkaConsumerProcessorException) {
113+
$this->line('Processors are set in /config/kafka-consumer.php');
121114
}
115+
116+
return;
122117
}
123118

124-
return null;
119+
$this->error('An error occurred while listening to the topic: ' .
120+
$exception->getMessage() . ' ' . $exception->getFile() . '::' . $exception->getLine());
125121
}
126122

127-
protected function collectMiddleware(array $processorMiddleware): array
123+
private function makeLogger(): ConsumerLoggerInterface
128124
{
129-
return array_unique(
130-
array_merge(
131-
config('kafka-consumer.global_middleware', []),
132-
$processorMiddleware
133-
)
134-
);
125+
return $this->loggerFactory
126+
->make($this->getTopicKey(), $this->getConsumerName());
135127
}
136128
}

src/Consumers/Consumer.php

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
<?php
2+
3+
namespace Ensi\LaravelPhpRdKafkaConsumer\Consumers;
4+
5+
use Ensi\LaravelPhpRdKafkaConsumer\ConsumerOptions;
6+
use Ensi\LaravelPhpRdKafkaConsumer\HighLevelConsumer;
7+
use Ensi\LaravelPhpRdKafkaConsumer\ProcessorData;
8+
use RdKafka\Exception;
9+
use Throwable;
10+
11+
class Consumer
12+
{
13+
public function __construct(
14+
protected HighLevelConsumer $highLevelConsumer,
15+
protected ProcessorData $processorData,
16+
protected ConsumerOptions $consumerOptions,
17+
protected string $topicName
18+
) {
19+
}
20+
21+
public function getTopicName(): string
22+
{
23+
return $this->topicName;
24+
}
25+
26+
public function setMaxTime(int $maxTime = 0): self
27+
{
28+
$this->consumerOptions->maxTime = $maxTime;
29+
30+
return $this;
31+
}
32+
33+
public function setMaxEvents(int $maxEvents = 0): self
34+
{
35+
$this->consumerOptions->maxEvents = $maxEvents;
36+
37+
return $this;
38+
}
39+
40+
public function forceStop(): void
41+
{
42+
$this->highLevelConsumer->forceStop();
43+
}
44+
45+
public function getProcessorData(): ProcessorData
46+
{
47+
return $this->processorData;
48+
}
49+
50+
public function getConsumerOptions(): ConsumerOptions
51+
{
52+
return $this->consumerOptions;
53+
}
54+
55+
/**
56+
* @throws Exception
57+
* @throws Throwable
58+
*/
59+
public function listen(): void
60+
{
61+
$this->highLevelConsumer
62+
->for($this->processorData->consumer)
63+
->listen($this->topicName, $this->processorData, $this->consumerOptions);
64+
}
65+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
<?php
2+
3+
namespace Ensi\LaravelPhpRdKafkaConsumer\Consumers\Factories;
4+
5+
use Ensi\LaravelPhpRdKafka\KafkaFacade;
6+
use Ensi\LaravelPhpRdKafkaConsumer\ConsumerOptions;
7+
use Ensi\LaravelPhpRdKafkaConsumer\Consumers\Consumer;
8+
use Ensi\LaravelPhpRdKafkaConsumer\Exceptions\KafkaConsumerProcessorException;
9+
use Ensi\LaravelPhpRdKafkaConsumer\HighLevelConsumer;
10+
use Ensi\LaravelPhpRdKafkaConsumer\ProcessorData;
11+
12+
class ConsumerFactory
13+
{
14+
public function __construct(
15+
protected HighLevelConsumer $highLevelConsumer
16+
) {
17+
}
18+
19+
/**
20+
* @throws KafkaConsumerProcessorException
21+
*/
22+
public function build(string $topicKey, string $consumerName = 'default'): Consumer
23+
{
24+
$processorData = $this->makeProcessorData($topicKey, $consumerName);
25+
$consumerOptions = $this->makeConsumerOptions($consumerName, $processorData);
26+
27+
return new Consumer(
28+
highLevelConsumer: $this->highLevelConsumer,
29+
processorData: $processorData,
30+
consumerOptions: $consumerOptions,
31+
topicName: KafkaFacade::topicNameByClient('consumer', $consumerName, $topicKey)
32+
);
33+
}
34+
35+
/**
36+
* @throws KafkaConsumerProcessorException
37+
*/
38+
protected function makeProcessorData(string $topicKey, string $consumerName): ProcessorData
39+
{
40+
$processorData = $this->findMatchedProcessor($topicKey, $consumerName);
41+
42+
if (!class_exists($processorData->class)) {
43+
throw new KafkaConsumerProcessorException("Processor class \"$processorData->class\" is not found");
44+
}
45+
46+
if (!$processorData->hasValidType()) {
47+
throw new KafkaConsumerProcessorException("Invalid processor type \"$processorData->type\"," .
48+
" supported types are: " . implode(',', $processorData->getSupportedTypes()));
49+
}
50+
51+
return $processorData;
52+
}
53+
54+
/**
55+
* @throws KafkaConsumerProcessorException
56+
*/
57+
protected function findMatchedProcessor(string $topicKey, string $consumerName): ProcessorData
58+
{
59+
foreach (config('kafka-consumer.processors', []) as $processor) {
60+
$topicMatched = empty($processor['topic']) || $processor['topic'] === $topicKey;
61+
$consumerMatched = empty($processor['consumer']) || $processor['consumer'] === $consumerName;
62+
63+
if ($topicMatched && $consumerMatched) {
64+
return new ProcessorData(
65+
class: $processor['class'],
66+
topicKey: $processor['topic'] ?? $topicKey,
67+
consumer: $processor['consumer'] ?? $consumerName,
68+
type: $processor['type'] ?? 'action',
69+
queue: $processor['queue'] ?? false,
70+
consumeTimeout: $processor['consume_timeout'] ?? 20000,
71+
);
72+
}
73+
}
74+
75+
throw new KafkaConsumerProcessorException("Processor for topic-key \"$topicKey\" and consumer \"$consumerName\" is not found");
76+
}
77+
78+
protected function makeConsumerOptions(string $consumerName, ProcessorData $processorData): ConsumerOptions
79+
{
80+
$consumerPackageOptions = config('kafka-consumer.consumer_options.' . $consumerName, []);
81+
82+
return new ConsumerOptions(
83+
consumeTimeout: $consumerPackageOptions['consume_timeout'] ?? $processorData->consumeTimeout,
84+
middleware: $this->collectMiddleware($consumerPackageOptions['middleware'] ?? []),
85+
);
86+
}
87+
88+
protected function collectMiddleware(array $processorMiddleware): array
89+
{
90+
return collect(config('kafka-consumer.global_middleware', []))
91+
->merge($processorMiddleware)
92+
->unique()
93+
->values()
94+
->toArray();
95+
}
96+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?php
2+
3+
namespace Ensi\LaravelPhpRdKafkaConsumer\Exceptions;
4+
5+
use Exception;
6+
7+
final class KafkaConsumerMessagedEndedException extends Exception
8+
{
9+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
<?php
2+
3+
namespace Ensi\LaravelPhpRdKafkaConsumer\Exceptions;
4+
5+
final class KafkaConsumerProcessorException extends KafkaConsumerException
6+
{
7+
}

0 commit comments

Comments
 (0)