Skip to content

Commit 7963a06

Browse files
authored
only set topic conf when needed (#32)
1 parent f68dbc4 commit 7963a06

File tree

3 files changed

+18
-5
lines changed

3 files changed

+18
-5
lines changed

src/Conf/KafkaConfiguration.php

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
namespace Jobcloud\Kafka\Conf;
66

7+
use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
78
use Jobcloud\Kafka\Consumer\TopicSubscription;
89
use RdKafka\Conf as RdKafkaConf;
910
use RdKafka\TopicConf as RdKafkaTopicConf;
@@ -21,6 +22,11 @@ class KafkaConfiguration extends RdKafkaConf
2122
*/
2223
protected $topicSubscriptions;
2324

25+
/**
26+
* @var string
27+
*/
28+
private $type;
29+
2430
/**
2531
* @var array<string,int>
2632
*/
@@ -33,14 +39,15 @@ class KafkaConfiguration extends RdKafkaConf
3339
* @param string[] $brokers
3440
* @param array|TopicSubscription[] $topicSubscriptions
3541
* @param mixed[] $config
42+
* @param string $type
3643
*/
37-
public function __construct(array $brokers, array $topicSubscriptions, array $config = [])
44+
public function __construct(array $brokers, array $topicSubscriptions, array $config = [], string $type = '')
3845
{
3946
parent::__construct();
4047

4148
$this->brokers = $brokers;
4249
$this->topicSubscriptions = $topicSubscriptions;
43-
50+
$this->type = $type;
4451
$this->initializeConfig($config);
4552
}
4653

@@ -81,7 +88,10 @@ protected function initializeConfig(array $config = []): void
8188
continue;
8289
}
8390

84-
if (true === $this->isLowLevelTopicConfSetting($name)) {
91+
if (
92+
KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL === $this->type
93+
&& true === $this->isLowLevelTopicConfSetting($name)
94+
) {
8595
$topicConf->set($name, (string) $value);
8696
$this->setDefaultTopicConf($topicConf);
8797
}

src/Consumer/KafkaConsumerBuilder.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,8 @@ public function build(): KafkaConsumerInterface
303303
$kafkaConfig = new KafkaConfiguration(
304304
$this->brokers,
305305
$this->topics,
306-
$this->config
306+
$this->config,
307+
$this->consumerType
307308
);
308309

309310
//set consumer callbacks

tests/Unit/Conf/KafkaConfigurationTest.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Jobcloud\Kafka\Tests\Unit\Kafka\Conf;
44

5+
use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
56
use Jobcloud\Kafka\Consumer\TopicSubscription;
67
use Jobcloud\Kafka\Conf\KafkaConfiguration;
78
use PHPUnit\Framework\TestCase;
@@ -97,7 +98,8 @@ public function testConfigValues($inputValue, $expectedValue): void
9798
[
9899
'group.id' => $inputValue,
99100
'auto.commit.interval.ms' => 100
100-
]
101+
],
102+
KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL
101103
);
102104

103105
$config = $kafkaConfiguration->getConfiguration();

0 commit comments

Comments
 (0)