Skip to content

Commit

Permalink
feat(MPM-329): add subscription param (#59)
Browse files Browse the repository at this point in the history
* feat(MPM-329): add subscription param

* add test, remove from low level (legacy)

* fix cs
  • Loading branch information
nick-zh authored Apr 6, 2021
1 parent c93d278 commit 890efc8
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/Conf/KafkaConfiguration.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class KafkaConfiguration extends RdKafkaConf
* @param mixed[] $config
* @param string $type
*/
public function __construct(array $brokers, array $topicSubscriptions, array $config = [], string $type = '')
public function __construct(array $brokers, array $topicSubscriptions = [], array $config = [], string $type = '')
{
parent::__construct();

Expand Down
4 changes: 0 additions & 4 deletions src/Consumer/KafkaConsumerBuilder.php
Original file line number Diff line number Diff line change
Expand Up @@ -292,10 +292,6 @@ public function build(): KafkaConsumerInterface
throw new KafkaConsumerBuilderException(KafkaConsumerBuilderException::NO_BROKER_EXCEPTION_MESSAGE);
}

if ([] === $this->topics) {
throw new KafkaConsumerBuilderException(KafkaConsumerBuilderException::NO_TOPICS_EXCEPTION_MESSAGE);
}

//set additional config
$this->config['group.id'] = $this->consumerGroup;

Expand Down
26 changes: 19 additions & 7 deletions src/Consumer/KafkaHighLevelConsumer.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface;
use RdKafka\Exception as RdKafkaException;
use RdKafka\Message as RdKafkaMessage;
use RdKafka\TopicPartition;
use RdKafka\TopicPartition as RdKafkaTopicPartition;
use RdKafka\KafkaConsumer as RdKafkaHighLevelConsumer;

Expand Down Expand Up @@ -41,13 +42,14 @@ public function __construct(
* Subscribes to all defined topics, if no partitions were set, subscribes to all partitions.
* If partition(s) (and optionally offset(s)) were set, subscribes accordingly
*
* @param array<TopicSubscription> $topicSubscriptions
* @throws KafkaConsumerSubscriptionException
* @return void
*/
public function subscribe(): void
public function subscribe(array $topicSubscriptions = []): void
{
$subscriptions = $this->getTopicSubscriptions();
$assignments = $this->getTopicAssignments();
$subscriptions = $this->getTopicSubscriptions($topicSubscriptions);
$assignments = $this->getTopicAssignments($topicSubscriptions);

if ([] !== $subscriptions && [] !== $assignments) {
throw new KafkaConsumerSubscriptionException(
Expand Down Expand Up @@ -239,13 +241,18 @@ private function getOffsetsToCommitForMessages(array $messages): array
}

/**
* @param array<TopicSubscription> $topicSubscriptions
* @return array|string[]
*/
private function getTopicSubscriptions(): array
private function getTopicSubscriptions(array $topicSubscriptions = []): array
{
$subscriptions = [];

foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) {
if ([] === $topicSubscriptions) {
$topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions();
}

foreach ($topicSubscriptions as $topicSubscription) {
if (
[] !== $topicSubscription->getPartitions()
|| KafkaConsumerBuilderInterface::OFFSET_STORED !== $topicSubscription->getOffset()
Expand All @@ -259,13 +266,18 @@ private function getTopicSubscriptions(): array
}

/**
* @param array<TopicSubscription> $topicSubscriptions
* @return array|RdKafkaTopicPartition[]
*/
private function getTopicAssignments(): array
private function getTopicAssignments(array $topicSubscriptions = []): array
{
$assignments = [];

foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) {
if ([] === $topicSubscriptions) {
$topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions();
}

foreach ($topicSubscriptions as $topicSubscription) {
if (
[] === $topicSubscription->getPartitions()
&& KafkaConsumerBuilderInterface::OFFSET_STORED === $topicSubscription->getOffset()
Expand Down
12 changes: 0 additions & 12 deletions tests/Unit/Consumer/KafkaConsumerBuilderTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -288,18 +288,6 @@ public function testBuildFailMissingBrokers(): void
$this->kafkaConsumerBuilder->build();
}

/**
* @return void
* @throws KafkaConsumerBuilderException
*/
public function testBuildFailMissingTopics(): void
{
self::expectException(KafkaConsumerBuilderException::class);
self::expectExceptionMessage(KafkaConsumerBuilderException::NO_TOPICS_EXCEPTION_MESSAGE);

$this->kafkaConsumerBuilder->withAdditionalBroker('localhost')->build();
}

/**
* @return void
*/
Expand Down
16 changes: 16 additions & 0 deletions tests/Unit/Consumer/KafkaHighLevelConsumerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ public function testSubscribeSuccess(): void
$kafkaConsumer->subscribe();
}

/**
* @throws KafkaConsumerSubscriptionException
*/
public function testSubscribeSuccessWithParam(): void
{
$rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class);
$kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class);
$kafkaConfigurationMock->expects(self::never())->method('getTopicSubscriptions');
$decoderMock = $this->getMockForAbstractClass(DecoderInterface::class);
$kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock);

$rdKafkaConsumerMock->expects(self::once())->method('subscribe')->with(['testTopic3']);

$kafkaConsumer->subscribe([new TopicSubscription('testTopic3')]);
}

/**
* @throws KafkaConsumerSubscriptionException
*/
Expand Down

0 comments on commit 890efc8

Please sign in to comment.