Skip to content

Commit b78f86e

Browse files
authored
interpret [] as all partitions for asign as well (#29)
* throw exception for offset without partition * fix stan & cs * fix assign * remove obsolete message * add partition fetch * remove at * add infection (#30)
1 parent 7e8dc21 commit b78f86e

12 files changed

+215
-80
lines changed

.circleci/config.yml

+5
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ workflows:
2727
dependencyCheckSumFile: "./composer.json"
2828
requires:
2929
- ci-php/install-dependencies
30+
- ci-php/infection-testing:
31+
dockerComposeFile: "./docker/docker-compose.yml"
32+
dependencyCheckSumFile: "./composer.json"
33+
requires:
34+
- ci-php/install-dependencies
3035

3136
jobs:
3237
coverage:

Makefile

+8-1
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
1-
.PHONY: clean code-style coverage help test static-analysis update-dependencies xdebug-enable xdebug-disable
1+
.PHONY: clean code-style coverage help test static-analysis update-dependencies xdebug-enable xdebug-disable infection-testing
22
.DEFAULT_GOAL := test
33

44
PHPUNIT = ./vendor/bin/phpunit -c ./phpunit.xml
55
PHPDBG = phpdbg -qrr ./vendor/bin/phpunit -c ./phpunit.xml
66
PHPSTAN = ./vendor/bin/phpstan
77
PHPCS = ./vendor/bin/phpcs --extensions=php
88
CONSOLE = ./bin/console
9+
INFECTION = ./vendor/bin/infection
910

1011
clean:
1112
rm -rf ./build ./vendor
@@ -33,6 +34,11 @@ install-dependencies:
3334
install-dependencies-lowest:
3435
composer install --prefer-lowest
3536

37+
infection-testing:
38+
make coverage
39+
cp -f build/logs/phpunit/junit.xml build/logs/phpunit/coverage/junit.xml
40+
${INFECTION} --coverage=build/logs/phpunit/coverage --min-msi=76 --threads=`nproc`
41+
3642
xdebug-enable:
3743
sudo php-ext-enable xdebug
3844

@@ -50,6 +56,7 @@ help:
5056
# help You're looking at it!
5157
# test (default) Run all the tests with phpunit
5258
# static-analysis Run static analysis using phpstan
59+
# infection-testing Run infection/mutation testing
5360
# install-dependencies Run composer install
5461
# update-dependencies Run composer update
5562
# xdebug-enable Enable xdebug

composer.json

+3-2
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,15 @@
2222
"ext-json": "*"
2323
},
2424
"require-dev": {
25-
"phpunit/phpunit": "^9.1",
25+
"phpunit/phpunit": "^9.3",
2626
"squizlabs/php_codesniffer": "^3.5.4",
2727
"phpstan/phpstan": "0.12.32",
2828
"php-mock/php-mock-phpunit": "^2.6",
2929
"kwn/php-rdkafka-stubs": "^2.0.0",
3030
"rregeer/phpunit-coverage-check": "^0.3.1",
3131
"johnkary/phpunit-speedtrap": "^3.1",
32-
"flix-tech/avro-serde-php": "^1.3"
32+
"flix-tech/avro-serde-php": "^1.3",
33+
"infection/infection": "^0.16"
3334
},
3435
"autoload": {
3536
"psr-4": {

infection.json

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"timeout": 10,
3+
"source": {
4+
"directories": [
5+
"src"
6+
]
7+
},
8+
"logs": {
9+
"text": "build\/logs\/infection\/infection.log",
10+
"summary": "build\/logs\/infection\/infection-summary.log"
11+
},
12+
"mutators": {
13+
"@default": true
14+
},
15+
"phpUnit": {
16+
"customPath": "vendor/bin/phpunit"
17+
}
18+
}

phpunit.xml

+44-43
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,46 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<phpunit
3-
backupGlobals = "false"
4-
backupStaticAttributes = "false"
5-
colors = "true"
6-
convertErrorsToExceptions = "true"
7-
convertNoticesToExceptions = "true"
8-
convertWarningsToExceptions = "true"
9-
processIsolation = "false"
10-
stopOnFailure = "false"
11-
bootstrap = "tests/bootstrap.php" >
12-
<php>
13-
<ini name="max_execution_time" value="-1"/>
14-
<ini name="html_errors" value="false"/>
15-
<ini name="memory_limit" value="2G"/>
16-
17-
<ini name="xdebug.default_enable" value="1" />
18-
<ini name="xdebug.enable_coverage" value="1" />
19-
<ini name="xdebug.remote_autostart" value="0" />
20-
<ini name="xdebug.remote_enable" value="0" />
21-
<ini name="xdebug.overload_var_dump" value="0" />
22-
<ini name="xdebug.show_mem_delta" value="0" />
23-
</php>
24-
25-
<testsuites>
26-
<testsuite name="Unit">
27-
<directory>./tests/Unit</directory>
28-
</testsuite>
29-
</testsuites>
30-
<logging>
31-
<log type="coverage-text" target="php://stdout" showOnlySummary="true"/>
32-
<log type="coverage-html" target="build/logs/phpunit/coverage"/>
33-
<log type="coverage-xml" target="build/logs/phpunit/coverage/coverage-xml"/>
34-
<log type="coverage-clover" target="clover.xml"/>
35-
<log type="junit" target="build/logs/phpunit/junit.xml"/>
36-
</logging>
37-
<filter>
38-
<whitelist>
39-
<directory>src</directory>
40-
</whitelist>
41-
</filter>
42-
<listeners>
43-
<listener class="JohnKary\PHPUnit\Listener\SpeedTrapListener" />
44-
</listeners>
2+
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3+
backupGlobals="false"
4+
backupStaticAttributes="false"
5+
colors="true"
6+
convertErrorsToExceptions="true"
7+
convertNoticesToExceptions="true"
8+
convertWarningsToExceptions="true"
9+
processIsolation="false"
10+
stopOnFailure="false"
11+
bootstrap="tests/bootstrap.php"
12+
xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.3/phpunit.xsd">
13+
<coverage>
14+
<include>
15+
<directory>src</directory>
16+
</include>
17+
<report>
18+
<clover outputFile="clover.xml"/>
19+
<html outputDirectory="build/logs/phpunit/coverage"/>
20+
<text outputFile="php://stdout" showOnlySummary="true"/>
21+
<xml outputDirectory="build/logs/phpunit/coverage/coverage-xml"/>
22+
</report>
23+
</coverage>
24+
<php>
25+
<ini name="max_execution_time" value="-1"/>
26+
<ini name="html_errors" value="false"/>
27+
<ini name="memory_limit" value="2G"/>
28+
<ini name="xdebug.default_enable" value="1"/>
29+
<ini name="xdebug.enable_coverage" value="1"/>
30+
<ini name="xdebug.remote_autostart" value="0"/>
31+
<ini name="xdebug.remote_enable" value="0"/>
32+
<ini name="xdebug.overload_var_dump" value="0"/>
33+
<ini name="xdebug.show_mem_delta" value="0"/>
34+
</php>
35+
<testsuites>
36+
<testsuite name="Unit">
37+
<directory>./tests/Unit</directory>
38+
</testsuite>
39+
</testsuites>
40+
<logging>
41+
<junit outputFile="build/logs/phpunit/junit.xml"/>
42+
</logging>
43+
<listeners>
44+
<listener class="JohnKary\PHPUnit\Listener\SpeedTrapListener"/>
45+
</listeners>
4546
</phpunit>

src/Consumer/AbstractKafkaConsumer.php

+18
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,24 @@ public function getLastOffsetForTopicPartition(string $topic, int $partition, in
200200
return $highOffset;
201201
}
202202

203+
/**
204+
* @param string $topic
205+
* @return int[]
206+
* @throws RdKafkaException
207+
*/
208+
protected function getAllTopicPartitions(string $topic): array
209+
{
210+
211+
$partitions = [];
212+
$topicMetadata = $this->getMetadataForTopic($topic);
213+
214+
foreach ($topicMetadata->getPartitions() as $partition) {
215+
$partitions[] = $partition->getId();
216+
}
217+
218+
return $partitions;
219+
}
220+
203221
/**
204222
* @param RdKafkaMessage $message
205223
* @return KafkaConsumerMessageInterface

src/Consumer/KafkaConsumerBuilder.php

+1
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ public function withSubscription(
146146
int $offset = self::OFFSET_STORED
147147
): KafkaConsumerBuilderInterface {
148148
$that = clone $this;
149+
149150
$that->topics = [new TopicSubscription($topicName, $partitions, $offset)];
150151

151152
return $that;

src/Consumer/KafkaHighLevelConsumer.php

+14-3
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,10 @@ private function getTopicSubscriptions(): array
246246
$subscriptions = [];
247247

248248
foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) {
249-
if ([] !== $topicSubscription->getPartitions()) {
249+
if (
250+
[] !== $topicSubscription->getPartitions()
251+
|| KafkaConsumerBuilderInterface::OFFSET_STORED !== $topicSubscription->getOffset()
252+
) {
250253
continue;
251254
}
252255
$subscriptions[] = $topicSubscription->getTopicName();
@@ -263,13 +266,21 @@ private function getTopicAssignments(): array
263266
$assignments = [];
264267

265268
foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) {
266-
if ([] === $topicSubscription->getPartitions()) {
269+
if (
270+
[] === $topicSubscription->getPartitions()
271+
&& KafkaConsumerBuilderInterface::OFFSET_STORED === $topicSubscription->getOffset()
272+
) {
267273
continue;
268274
}
269275

270276
$offset = $topicSubscription->getOffset();
277+
$partitions = $topicSubscription->getPartitions();
271278

272-
foreach ($topicSubscription->getPartitions() as $partitionId) {
279+
if ([] === $partitions) {
280+
$partitions = $this->getAllTopicPartitions($topicSubscription->getTopicName());
281+
}
282+
283+
foreach ($partitions as $partitionId) {
273284
$assignments[] = new RdKafkaTopicPartition(
274285
$topicSubscription->getTopicName(),
275286
$partitionId,

src/Consumer/KafkaLowLevelConsumer.php

-18
Original file line numberDiff line numberDiff line change
@@ -148,22 +148,4 @@ protected function kafkaConsume(int $timeoutMs): ?RdKafkaMessage
148148
{
149149
return $this->queue->consume($timeoutMs);
150150
}
151-
152-
/**
153-
* @param string $topic
154-
* @return int[]
155-
* @throws RdKafkaException
156-
*/
157-
private function getAllTopicPartitions(string $topic): array
158-
{
159-
160-
$partitions = [];
161-
$topicMetadata = $this->getMetadataForTopic($topic);
162-
163-
foreach ($topicMetadata->getPartitions() as $partition) {
164-
$partitions[] = $partition->getId();
165-
}
166-
167-
return $partitions;
168-
}
169151
}

0 commit comments

Comments
 (0)