From 15b92ecfeaf90b139ed07519fbf382b65d7cb8b1 Mon Sep 17 00:00:00 2001 From: Michel Haller Date: Thu, 3 Jun 2021 18:14:29 +0200 Subject: [PATCH] feat(MPC-616): add getTopicSubscriptions method to consumers (#66) * feat(MPC-616): add getTopicSubscriptions method to consumers * feat(MPC-616): adjust 'for next major release' comment * feat(MPC-616): fix method annotation --- src/Consumer/AbstractKafkaConsumer.php | 8 +++++++ src/Consumer/KafkaConsumerInterface.php | 14 +++++++++-- src/Consumer/KafkaHighLevelConsumer.php | 4 ++-- .../Consumer/KafkaHighLevelConsumerTest.php | 24 +++++++++++++++++++ .../Consumer/KafkaLowLevelConsumerTest.php | 24 +++++++++++++++++++ 5 files changed, 70 insertions(+), 4 deletions(-) diff --git a/src/Consumer/AbstractKafkaConsumer.php b/src/Consumer/AbstractKafkaConsumer.php index af4ff01..66ee5c7 100644 --- a/src/Consumer/AbstractKafkaConsumer.php +++ b/src/Consumer/AbstractKafkaConsumer.php @@ -235,6 +235,14 @@ protected function getConsumerMessage(RdKafkaMessage $message): KafkaConsumerMes ); } + /** + * @return array + */ + public function getTopicSubscriptions(): array + { + return $this->kafkaConfiguration->getTopicSubscriptions(); + } + /** * @param integer $timeoutMs * @return null|RdKafkaMessage diff --git a/src/Consumer/KafkaConsumerInterface.php b/src/Consumer/KafkaConsumerInterface.php index 89a8251..aeca20a 100644 --- a/src/Consumer/KafkaConsumerInterface.php +++ b/src/Consumer/KafkaConsumerInterface.php @@ -4,12 +4,15 @@ namespace Jobcloud\Kafka\Consumer; -use Jobcloud\Kafka\Consumer\ConsumerInterface; use Jobcloud\Kafka\Message\KafkaConsumerMessageInterface; use RdKafka\Metadata\Topic as RdKafkaMetadataTopic; -use RdKafka\ConsumerTopic as RdKafkaConsumerTopic; use RdKafka\TopicPartition as RdKafkaTopicPartition; +/** + * @todo v2: subscribe(array $topicSubscriptions = []) + * @method array getTopicSubscriptions() + */ + interface KafkaConsumerInterface { /** @@ -105,4 +108,11 @@ public function getFirstOffsetForTopicPartition(string $topic, int $partition, i * @return integer */ public function getLastOffsetForTopicPartition(string $topic, int $partition, int $timeoutMs): int; + + /** + * @todo v2 + * + * @return array + */ + //public function getTopicSubscriptions(): array; } diff --git a/src/Consumer/KafkaHighLevelConsumer.php b/src/Consumer/KafkaHighLevelConsumer.php index 2ec2de0..fe7980a 100644 --- a/src/Consumer/KafkaHighLevelConsumer.php +++ b/src/Consumer/KafkaHighLevelConsumer.php @@ -48,7 +48,7 @@ public function __construct( */ public function subscribe(array $topicSubscriptions = []): void { - $subscriptions = $this->getTopicSubscriptions($topicSubscriptions); + $subscriptions = $this->getTopicSubscriptionNames($topicSubscriptions); $assignments = $this->getTopicAssignments($topicSubscriptions); if ([] !== $subscriptions && [] !== $assignments) { @@ -244,7 +244,7 @@ private function getOffsetsToCommitForMessages(array $messages): array * @param array $topicSubscriptions * @return array|string[] */ - private function getTopicSubscriptions(array $topicSubscriptions = []): array + private function getTopicSubscriptionNames(array $topicSubscriptions = []): array { $subscriptions = []; diff --git a/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php b/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php index 685a422..aca482a 100644 --- a/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php +++ b/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php @@ -3,6 +3,7 @@ namespace Jobcloud\Kafka\Tests\Unit\Kafka\Consumer; use Jobcloud\Kafka\Consumer\KafkaHighLevelConsumer; +use Jobcloud\Kafka\Consumer\TopicSubscriptionInterface; use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException; use Jobcloud\Kafka\Message\Decoder\DecoderInterface; use Jobcloud\Kafka\Consumer\TopicSubscription; @@ -639,6 +640,29 @@ public function testClose(): void $kafkaConsumer->close(); } + /** + * @return void + */ + public function testGetTopicSubscriptionsReturnsTopicSubscriptions(): void + { + $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + + $topicSubscriptionsMock = [ + $this->createMock(TopicSubscriptionInterface::class), + $this->createMock(TopicSubscriptionInterface::class) + ]; + + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $kafkaConfigurationMock->expects(self::once()) + ->method('getTopicSubscriptions') + ->willReturn($topicSubscriptionsMock); + + $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + self::assertSame($topicSubscriptionsMock, $kafkaConsumer->getTopicSubscriptions()); + } + /** * @param int $partitionId * @return RdKafkaMetadataPartition|MockObject diff --git a/tests/Unit/Consumer/KafkaLowLevelConsumerTest.php b/tests/Unit/Consumer/KafkaLowLevelConsumerTest.php index 8e33855..64520b0 100644 --- a/tests/Unit/Consumer/KafkaLowLevelConsumerTest.php +++ b/tests/Unit/Consumer/KafkaLowLevelConsumerTest.php @@ -3,6 +3,7 @@ namespace Jobcloud\Kafka\Tests\Unit\Kafka\Consumer; use Jobcloud\Kafka\Consumer\KafkaLowLevelConsumer; +use Jobcloud\Kafka\Consumer\TopicSubscriptionInterface; use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException; use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException; use Jobcloud\Kafka\Message\Decoder\DecoderInterface; @@ -555,6 +556,29 @@ function (string $topic, int $partition, int &$lowOffset, int &$highOffset, int $this->assertEquals(5, $lowOffset); } + /** + * @return void + */ + public function testGetTopicSubscriptionsReturnsTopicSubscriptions(): void + { + $rdKafkaConsumerMock = $this->createMock(RdKafkaLowLevelConsumer::class); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + + $topicSubscriptionsMock = [ + $this->createMock(TopicSubscriptionInterface::class), + $this->createMock(TopicSubscriptionInterface::class) + ]; + + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $kafkaConfigurationMock->expects(self::once()) + ->method('getTopicSubscriptions') + ->willReturn($topicSubscriptionsMock); + + $kafkaConsumer = new KafkaLowLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + self::assertSame($topicSubscriptionsMock, $kafkaConsumer->getTopicSubscriptions()); + } + /** * @param int $partitionId * @return RdKafkaMetadataPartition|MockObject