Skip to content

Commit 188c908

Browse files
authored
Merge pull request #6 from ensi-platform/task-93909
#93909 add check-offsets and set-offset artisan commands
2 parents 05d7e3a + 4a52dc1 commit 188c908

File tree

5 files changed

+162
-1
lines changed

5 files changed

+162
-1
lines changed

composer.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,13 @@
2626
"require-dev": {
2727
"brianium/paratest": "^6.2",
2828
"friendsofphp/php-cs-fixer": "^3.2",
29+
"kwn/php-rdkafka-stubs": "^2.2",
2930
"nunomaduro/collision": "^5.3",
3031
"orchestra/testbench": "^6.15",
3132
"pestphp/pest": "^1.18",
3233
"pestphp/pest-plugin-laravel": "^1.1",
33-
"phpunit/phpunit": "^9.3",
3434
"php-parallel-lint/php-var-dump-check": "^0.5.0",
35+
"phpunit/phpunit": "^9.3",
3536
"spatie/laravel-ray": "^1.9"
3637
},
3738
"autoload": {
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<?php
2+
3+
namespace Ensi\LaravelPhpRdKafkaConsumer\Commands;
4+
5+
use Ensi\LaravelPhpRdKafkaConsumer\HighLevelConsumer;
6+
use Illuminate\Console\Command;
7+
8+
class KafkaCheckOffsetsCommand extends Command
9+
{
10+
protected $signature = 'kafka:check-offsets';
11+
protected $description = 'Проверить что для всех используемых топиков задан offset';
12+
13+
public function handle(): int
14+
{
15+
$configuredTopics = config('kafka-consumer.processors', []);
16+
$success = true;
17+
foreach ($configuredTopics as $configuredTopic) {
18+
$success &= $this->checkTopic($configuredTopic['consumer'], $configuredTopic['topic']);
19+
sleep(1);
20+
}
21+
22+
23+
return $success ? self::SUCCESS : self::FAILURE;
24+
}
25+
26+
protected function checkTopic(string $consumerName, string $topicName): bool
27+
{
28+
$this->getOutput()->writeln("<bg=cyan>Check topic:</> {$topicName}");
29+
/** @var HighLevelConsumer $consumer */
30+
$consumer = resolve(HighLevelConsumer::class);
31+
$consumer->for($consumerName);
32+
$partitions = $consumer->getPartitions($topicName);
33+
if (!$partitions) {
34+
$this->getOutput()->writeln(" <fg=red>Topic {$topicName} doesn't exists!</>");
35+
return false;
36+
}
37+
$success = true;
38+
foreach ($partitions as $partition) {
39+
if ($partition->getOffset() < 0) {
40+
$success = false;
41+
$this->getOutput()->writeln(" <fg=red>No offset for partition: {$partition->getPartition()}</>");
42+
}
43+
}
44+
45+
if ($success) {
46+
$this->getOutput()->writeln(" <fg=green>No errors</>");
47+
}
48+
49+
return $success;
50+
}
51+
}

src/Commands/KafkaSetOffset.php

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
namespace Ensi\LaravelPhpRdKafkaConsumer\Commands;
4+
5+
use Ensi\LaravelPhpRdKafkaConsumer\HighLevelConsumer;
6+
use Illuminate\Console\Command;
7+
use RdKafka\Exception as RdKafkaException;
8+
9+
class KafkaSetOffset extends Command
10+
{
11+
public const EARLIEST = 'earliest';
12+
public const LATEST = 'latest';
13+
14+
protected $signature = 'kafka:set-offset
15+
{--consumer= : name of consumer}
16+
{--topic= : name of topic}
17+
{--partition= : id of partition}
18+
{--offset= : desired offset, "earliest", "latest" or positive integer}';
19+
protected $description = "Задать смещение для консюмера в топике";
20+
21+
public function handle()
22+
{
23+
$consumerName = $this->option('consumer');
24+
$topicName = $this->option('topic');
25+
$partitionId = $this->option('partition');
26+
$offset = $this->option('offset');
27+
28+
if (!in_array($offset, [self::EARLIEST, self::LATEST]) && !is_numeric($offset)) {
29+
$this->getOutput()->writeln("<fg=red>Error: Invalid offset</>");
30+
return self::INVALID;
31+
}
32+
33+
/** @var HighLevelConsumer $consumer */
34+
$consumer = resolve(HighLevelConsumer::class);
35+
$consumer->for($consumerName);
36+
37+
try {
38+
$bounds = $consumer->getPartitionBounds($topicName, $partitionId);
39+
$realOffset = match($offset) {
40+
'earliest' => $bounds[0] ?? 0,
41+
'latest' => $bounds[1] ?? 0,
42+
default => $offset,
43+
};
44+
45+
if ($offset != $realOffset) {
46+
$this->getOutput()->writeln("<fg=yellow>Use {$offset} offset:</> {$realOffset}");
47+
}
48+
49+
$consumer->setOffset($topicName, $partitionId, $realOffset);
50+
} catch (RdKafkaException $e) {
51+
$this->getOutput()->writeln("<fg=red>Error: {$e->getMessage()}</>");
52+
53+
return self::INVALID;
54+
}
55+
$this->getOutput()->writeln("<fg=green>Success</>");
56+
57+
return self::SUCCESS;
58+
}
59+
}

src/HighLevelConsumer.php

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
use RdKafka\Exception as RdKafkaException;
99
use RdKafka\KafkaConsumer;
1010
use RdKafka\Message;
11+
use RdKafka\TopicPartition;
1112
use Throwable;
1213

1314
class HighLevelConsumer
@@ -126,4 +127,49 @@ protected function shouldBeStopped(int|float $startTime, int $eventsProcessed, C
126127

127128
return $this->forceStop;
128129
}
130+
131+
/**
132+
* @param string $topicName
133+
* @return TopicPartition[]
134+
* @throws RdKafkaException
135+
*/
136+
public function getPartitions(string $topicName): array
137+
{
138+
$metadata = $this->consumer->getMetadata(true, null, 1000);
139+
foreach ($metadata->getTopics() as $topicMeta) {
140+
if ($topicMeta->getTopic() != $topicName) {
141+
continue;
142+
}
143+
$requestPartitions = [];
144+
foreach ($topicMeta->getPartitions() as $partitionMeta) {
145+
$requestPartitions[] = new TopicPartition($topicName, $partitionMeta->getId());
146+
$partitionMeta->getId();
147+
}
148+
149+
return $this->consumer->getCommittedOffsets($requestPartitions, 1000);
150+
}
151+
152+
return [];
153+
}
154+
155+
public function getPartitionBounds(string $topicName, int $partitionId): array
156+
{
157+
$this->consumer->queryWatermarkOffsets($topicName, $partitionId, $low, $high, 1000);
158+
159+
return [$low, $high];
160+
}
161+
162+
/**
163+
* @throws RdKafkaException
164+
*/
165+
public function setOffset(string $topicName, int $partitionId, int $offset): void
166+
{
167+
$this->consumer->commit([
168+
new TopicPartition(
169+
$topicName,
170+
$partitionId,
171+
$offset
172+
)
173+
]);
174+
}
129175
}

src/LaravelPhpRdKafkaConsumerServiceProvider.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
namespace Ensi\LaravelPhpRdKafkaConsumer;
44

5+
use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaCheckOffsetsCommand;
56
use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaConsumeCommand;
7+
use Ensi\LaravelPhpRdKafkaConsumer\Commands\KafkaSetOffset;
68
use Illuminate\Support\ServiceProvider;
79

810
class LaravelPhpRdKafkaConsumerServiceProvider extends ServiceProvider
@@ -26,6 +28,8 @@ public function boot()
2628

2729
$this->commands([
2830
KafkaConsumeCommand::class,
31+
KafkaCheckOffsetsCommand::class,
32+
KafkaSetOffset::class,
2933
]);
3034
}
3135
}

0 commit comments

Comments
 (0)