diff --git a/src/Conf/KafkaConfiguration.php b/src/Conf/KafkaConfiguration.php index 6f817af..5d74d92 100644 --- a/src/Conf/KafkaConfiguration.php +++ b/src/Conf/KafkaConfiguration.php @@ -41,7 +41,7 @@ class KafkaConfiguration extends RdKafkaConf * @param mixed[] $config * @param string $type */ - public function __construct(array $brokers, array $topicSubscriptions, array $config = [], string $type = '') + public function __construct(array $brokers, array $topicSubscriptions = [], array $config = [], string $type = '') { parent::__construct(); diff --git a/src/Consumer/KafkaConsumerBuilder.php b/src/Consumer/KafkaConsumerBuilder.php index 96561ff..4363205 100644 --- a/src/Consumer/KafkaConsumerBuilder.php +++ b/src/Consumer/KafkaConsumerBuilder.php @@ -292,10 +292,6 @@ public function build(): KafkaConsumerInterface throw new KafkaConsumerBuilderException(KafkaConsumerBuilderException::NO_BROKER_EXCEPTION_MESSAGE); } - if ([] === $this->topics) { - throw new KafkaConsumerBuilderException(KafkaConsumerBuilderException::NO_TOPICS_EXCEPTION_MESSAGE); - } - //set additional config $this->config['group.id'] = $this->consumerGroup; diff --git a/src/Consumer/KafkaHighLevelConsumer.php b/src/Consumer/KafkaHighLevelConsumer.php index acb048c..2ec2de0 100644 --- a/src/Consumer/KafkaHighLevelConsumer.php +++ b/src/Consumer/KafkaHighLevelConsumer.php @@ -13,6 +13,7 @@ use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; use RdKafka\Exception as RdKafkaException; use RdKafka\Message as RdKafkaMessage; +use RdKafka\TopicPartition; use RdKafka\TopicPartition as RdKafkaTopicPartition; use RdKafka\KafkaConsumer as RdKafkaHighLevelConsumer; @@ -41,13 +42,14 @@ public function __construct( * Subscribes to all defined topics, if no partitions were set, subscribes to all partitions. * If partition(s) (and optionally offset(s)) were set, subscribes accordingly * + * @param array $topicSubscriptions * @throws KafkaConsumerSubscriptionException * @return void */ - public function subscribe(): void + public function subscribe(array $topicSubscriptions = []): void { - $subscriptions = $this->getTopicSubscriptions(); - $assignments = $this->getTopicAssignments(); + $subscriptions = $this->getTopicSubscriptions($topicSubscriptions); + $assignments = $this->getTopicAssignments($topicSubscriptions); if ([] !== $subscriptions && [] !== $assignments) { throw new KafkaConsumerSubscriptionException( @@ -239,13 +241,18 @@ private function getOffsetsToCommitForMessages(array $messages): array } /** + * @param array $topicSubscriptions * @return array|string[] */ - private function getTopicSubscriptions(): array + private function getTopicSubscriptions(array $topicSubscriptions = []): array { $subscriptions = []; - foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) { + if ([] === $topicSubscriptions) { + $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions(); + } + + foreach ($topicSubscriptions as $topicSubscription) { if ( [] !== $topicSubscription->getPartitions() || KafkaConsumerBuilderInterface::OFFSET_STORED !== $topicSubscription->getOffset() @@ -259,13 +266,18 @@ private function getTopicSubscriptions(): array } /** + * @param array $topicSubscriptions * @return array|RdKafkaTopicPartition[] */ - private function getTopicAssignments(): array + private function getTopicAssignments(array $topicSubscriptions = []): array { $assignments = []; - foreach ($this->kafkaConfiguration->getTopicSubscriptions() as $topicSubscription) { + if ([] === $topicSubscriptions) { + $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions(); + } + + foreach ($topicSubscriptions as $topicSubscription) { if ( [] === $topicSubscription->getPartitions() && KafkaConsumerBuilderInterface::OFFSET_STORED === $topicSubscription->getOffset() diff --git a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php index 927e260..0207545 100644 --- a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php +++ b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php @@ -288,18 +288,6 @@ public function testBuildFailMissingBrokers(): void $this->kafkaConsumerBuilder->build(); } - /** - * @return void - * @throws KafkaConsumerBuilderException - */ - public function testBuildFailMissingTopics(): void - { - self::expectException(KafkaConsumerBuilderException::class); - self::expectExceptionMessage(KafkaConsumerBuilderException::NO_TOPICS_EXCEPTION_MESSAGE); - - $this->kafkaConsumerBuilder->withAdditionalBroker('localhost')->build(); - } - /** * @return void */ diff --git a/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php b/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php index 8385439..685a422 100644 --- a/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php +++ b/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php @@ -47,6 +47,22 @@ public function testSubscribeSuccess(): void $kafkaConsumer->subscribe(); } + /** + * @throws KafkaConsumerSubscriptionException + */ + public function testSubscribeSuccessWithParam(): void + { + $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $kafkaConfigurationMock->expects(self::never())->method('getTopicSubscriptions'); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + $rdKafkaConsumerMock->expects(self::once())->method('subscribe')->with(['testTopic3']); + + $kafkaConsumer->subscribe([new TopicSubscription('testTopic3')]); + } + /** * @throws KafkaConsumerSubscriptionException */