diff --git a/Makefile b/Makefile index 7b97d53..014e83a 100644 --- a/Makefile +++ b/Makefile @@ -37,7 +37,7 @@ infection-testing: make coverage cp -f build/logs/phpunit/junit.xml build/logs/phpunit/coverage/junit.xml sudo php-ext-disable pcov - ${INFECTION} --coverage=build/logs/phpunit/coverage --min-msi=91 --threads=`nproc` + ${INFECTION} --coverage=build/logs/phpunit/coverage --min-msi=93 --threads=`nproc` sudo php-ext-enable pcov pcov-enable: diff --git a/README.md b/README.md index 5feccb6..755eb49 100644 --- a/README.md +++ b/README.md @@ -204,46 +204,6 @@ while (true) { } ``` -#### Kafka Low Level - -```php -withAdditionalConfig( - [ - 'compression.codec' => 'lz4', - 'auto.commit.interval.ms' => 500 - ] - ) - ->withAdditionalBroker('kafka:9092') - ->withConsumerGroup('testGroup') - ->withAdditionalSubscription('test-topic') - ->withConsumerType(KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL) - ->build(); - -$consumer->subscribe(); - -while (true) { - try { - $message = $consumer->consume(); - // your business logic - $consumer->commit($message); - } catch (KafkaConsumerTimeoutException $e) { - //no messages were read in a given time - } catch (KafkaConsumerEndOfPartitionException $e) { - //only occurs if enable.partition.eof is true (default: false) - } catch (KafkaConsumerConsumeException $e) { - // Failed - } -} -``` - #### Avro Consumer To create an avro consumer add the avro decoder. diff --git a/src/Conf/KafkaConfiguration.php b/src/Conf/KafkaConfiguration.php index 5d74d92..ed5e1ab 100644 --- a/src/Conf/KafkaConfiguration.php +++ b/src/Conf/KafkaConfiguration.php @@ -4,10 +4,8 @@ namespace Jobcloud\Kafka\Conf; -use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder; use Jobcloud\Kafka\Consumer\TopicSubscription; use RdKafka\Conf as RdKafkaConf; -use RdKafka\TopicConf as RdKafkaTopicConf; class KafkaConfiguration extends RdKafkaConf { @@ -22,32 +20,17 @@ class KafkaConfiguration extends RdKafkaConf */ protected $topicSubscriptions; - /** - * @var string - */ - private $type; - - /** - * @var array - */ - private $lowLevelTopicSettings = [ - 'auto.commit.interval.ms' => 1, - 'auto.offset.reset' => 1, - ]; - /** * @param string[] $brokers * @param array|TopicSubscription[] $topicSubscriptions * @param mixed[] $config - * @param string $type */ - public function __construct(array $brokers, array $topicSubscriptions = [], array $config = [], string $type = '') + public function __construct(array $brokers, array $topicSubscriptions = [], array $config = []) { parent::__construct(); $this->brokers = $brokers; $this->topicSubscriptions = $topicSubscriptions; - $this->type = $type; $this->initializeConfig($config); } @@ -79,23 +62,13 @@ public function getConfiguration(): array * @param mixed[] $config * @return void */ - protected function initializeConfig(array $config = []): void + private function initializeConfig(array $config = []): void { - $topicConf = new RdKafkaTopicConf(); - foreach ($config as $name => $value) { if (false === is_scalar($value)) { continue; } - if ( - KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL === $this->type - && true === $this->isLowLevelTopicConfSetting($name) - ) { - $topicConf->set($name, (string) $value); - $this->setDefaultTopicConf($topicConf); - } - if (true === is_bool($value)) { $value = true === $value ? 'true' : 'false'; } @@ -105,13 +78,4 @@ protected function initializeConfig(array $config = []): void $this->set('metadata.broker.list', implode(',', $this->getBrokers())); } - - /** - * @param string $settingName - * @return bool - */ - private function isLowLevelTopicConfSetting(string $settingName): bool - { - return true === isset($this->lowLevelTopicSettings[$settingName]); - } } diff --git a/src/Consumer/AbstractKafkaConsumer.php b/src/Consumer/AbstractKafkaConsumer.php index af4ff01..1b45818 100644 --- a/src/Consumer/AbstractKafkaConsumer.php +++ b/src/Consumer/AbstractKafkaConsumer.php @@ -11,9 +11,9 @@ use Jobcloud\Kafka\Conf\KafkaConfiguration; use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException; use Jobcloud\Kafka\Message\KafkaConsumerMessage; -use RdKafka\Consumer as RdKafkaLowLevelConsumer; use RdKafka\Exception as RdKafkaException; use RdKafka\KafkaConsumer as RdKafkaHighLevelConsumer; +use RdKafka\KafkaConsumerTopic as RdKafkaConsumerTopic; use RdKafka\Metadata\Topic as RdKafkaMetadataTopic; use RdKafka\Message as RdKafkaMessage; use RdKafka\TopicPartition as RdKafkaTopicPartition; @@ -32,7 +32,7 @@ abstract class AbstractKafkaConsumer implements KafkaConsumerInterface protected $subscribed = false; /** - * @var RdKafkaLowLevelConsumer|RdKafkaHighLevelConsumer + * @var RdKafkaHighLevelConsumer */ protected $consumer; @@ -141,6 +141,7 @@ public function decodeMessage(KafkaConsumerMessageInterface $message): KafkaCons */ public function getMetadataForTopic(string $topicName, int $timeoutMs = 10000): RdKafkaMetadataTopic { + /** @var RdKafkaConsumerTopic $topic */ $topic = $this->consumer->newTopic($topicName); return $this->consumer ->getMetadata( @@ -222,7 +223,7 @@ protected function getAllTopicPartitions(string $topic): array * @param RdKafkaMessage $message * @return KafkaConsumerMessageInterface */ - protected function getConsumerMessage(RdKafkaMessage $message): KafkaConsumerMessageInterface + private function getConsumerMessage(RdKafkaMessage $message): KafkaConsumerMessageInterface { return new KafkaConsumerMessage( (string) $message->topic_name, diff --git a/src/Consumer/KafkaConsumerBuilder.php b/src/Consumer/KafkaConsumerBuilder.php index 4363205..2f48e50 100644 --- a/src/Consumer/KafkaConsumerBuilder.php +++ b/src/Consumer/KafkaConsumerBuilder.php @@ -9,15 +9,10 @@ use Jobcloud\Kafka\Exception\KafkaConsumerBuilderException; use Jobcloud\Kafka\Message\Decoder\DecoderInterface; use Jobcloud\Kafka\Message\Decoder\NullDecoder; -use RdKafka\Consumer as RdKafkaLowLevelConsumer; use RdKafka\KafkaConsumer as RdKafkaHighLevelConsumer; final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface { - - public const CONSUMER_TYPE_LOW_LEVEL = 'low'; - public const CONSUMER_TYPE_HIGH_LEVEL = 'high'; - /** * @var string[] */ @@ -42,11 +37,6 @@ final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface */ private $consumerGroup = 'default'; - /** - * @var string - */ - private $consumerType = self::CONSUMER_TYPE_HIGH_LEVEL; - /** * @var callable */ @@ -180,20 +170,6 @@ public function withConsumerGroup(string $consumerGroup): KafkaConsumerBuilderIn return $that; } - /** - * Set the consumer type, can be either CONSUMER_TYPE_LOW_LEVEL or CONSUMER_TYPE_HIGH_LEVEL - * - * @param string $consumerType - * @return KafkaConsumerBuilderInterface - */ - public function withConsumerType(string $consumerType): KafkaConsumerBuilderInterface - { - $that = clone $this; - $that->consumerType = $consumerType; - - return $that; - } - /** * Set a callback to be called on errors. * The default callback will throw an exception for every error @@ -299,34 +275,13 @@ public function build(): KafkaConsumerInterface $kafkaConfig = new KafkaConfiguration( $this->brokers, $this->topics, - $this->config, - $this->consumerType + $this->config ); //set consumer callbacks $this->registerCallbacks($kafkaConfig); //create RdConsumer - if (self::CONSUMER_TYPE_LOW_LEVEL === $this->consumerType) { - if (null !== $this->consumeCallback) { - throw new KafkaConsumerBuilderException( - sprintf( - KafkaConsumerBuilderException::UNSUPPORTED_CALLBACK_EXCEPTION_MESSAGE, - 'consumerCallback', - KafkaLowLevelConsumer::class - ) - ); - } - - $rdKafkaConsumer = new RdKafkaLowLevelConsumer($kafkaConfig); - - return new KafkaLowLevelConsumer( - $rdKafkaConsumer, - $kafkaConfig, - $this->decoder - ); - } - $rdKafkaConsumer = new RdKafkaHighLevelConsumer($kafkaConfig); return new KafkaHighLevelConsumer($rdKafkaConsumer, $kafkaConfig, $this->decoder); @@ -349,7 +304,7 @@ private function registerCallbacks(KafkaConfiguration $conf): void } if (null !== $this->logCallback) { - $conf->setLogCb($this->logCallback); + //$conf->setLogCb($this->logCallback); } if (null !== $this->offsetCommitCallback) { diff --git a/src/Consumer/KafkaConsumerBuilderInterface.php b/src/Consumer/KafkaConsumerBuilderInterface.php index af7d1c8..368cff6 100644 --- a/src/Consumer/KafkaConsumerBuilderInterface.php +++ b/src/Consumer/KafkaConsumerBuilderInterface.php @@ -66,14 +66,6 @@ public function withAdditionalConfig(array $config): self; */ public function withConsumerGroup(string $consumerGroup): self; - /** - * Set the consumer type, can be either CONSUMER_TYPE_LOW_LEVEL or CONSUMER_TYPE_HIGH_LEVEL - * - * @param string $consumerType - * @return KafkaConsumerBuilderInterface - */ - public function withConsumerType(string $consumerType): self; - /** * Set a callback to be called on errors. * The default callback will throw an exception for every error diff --git a/src/Consumer/KafkaLowLevelConsumer.php b/src/Consumer/KafkaLowLevelConsumer.php deleted file mode 100644 index a4b00a7..0000000 --- a/src/Consumer/KafkaLowLevelConsumer.php +++ /dev/null @@ -1,151 +0,0 @@ -queue = $consumer->newQueue(); - } - - /** - * Subcribes to all defined topics, if no partitions were set, subscribes to all partitions. - * If partition(s) (and optionally offset(s)) were set, subscribes accordingly - * - * @return void - * @throws KafkaConsumerSubscriptionException - */ - public function subscribe(): void - { - if (true === $this->isSubscribed()) { - return; - } - - try { - $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions(); - foreach ($topicSubscriptions as $topicSubscription) { - $topicName = $topicSubscription->getTopicName(); - $offset = $topicSubscription->getOffset(); - - if (false === isset($this->topics[$topicName])) { - $this->topics[$topicName] = $topic = $this->consumer->newTopic($topicName); - } else { - $topic = $this->topics[$topicName]; - } - - $partitions = $topicSubscription->getPartitions(); - - if ([] === $partitions) { - $topicSubscription->setPartitions($this->getAllTopicPartitions($topicName)); - $partitions = $topicSubscription->getPartitions(); - } - - foreach ($partitions as $partitionId) { - $topic->consumeQueueStart($partitionId, $offset, $this->queue); - } - } - - $this->subscribed = true; - } catch (RdKafkaException $e) { - throw new KafkaConsumerSubscriptionException($e->getMessage(), $e->getCode(), $e); - } - } - - /** - * Commits the offset to the broker for the given message(s). This is a blocking function - * - * @param mixed $messages - * @return void - * @throws KafkaConsumerCommitException - */ - public function commit($messages): void - { - $messages = is_array($messages) ? $messages : [$messages]; - - foreach ($messages as $i => $message) { - if (false === $message instanceof KafkaConsumerMessageInterface) { - throw new KafkaConsumerCommitException( - sprintf('Provided message (index: %d) is not an instance of "%s"', $i, KafkaConsumerMessage::class) - ); - } - - $this->topics[$message->getTopicName()]->offsetStore( - $message->getPartition(), - $message->getOffset() - ); - } - } - - /** - * Unsubscribes from the current subscription - * - * @return void - */ - public function unsubscribe(): void - { - if (false === $this->isSubscribed()) { - return; - } - - $topicSubscriptions = $this->kafkaConfiguration->getTopicSubscriptions(); - - /** @var TopicSubscription $topicSubscription */ - foreach ($topicSubscriptions as $topicSubscription) { - foreach ($topicSubscription->getPartitions() as $partitionId) { - $this->topics[$topicSubscription->getTopicName()]->consumeStop($partitionId); - } - } - - $this->subscribed = false; - } - - /** - * @param integer $timeoutMs - * @return null|RdKafkaMessage - */ - protected function kafkaConsume(int $timeoutMs): ?RdKafkaMessage - { - return $this->queue->consume($timeoutMs); - } -} diff --git a/src/Consumer/KafkaLowLevelConsumerInterface.php b/src/Consumer/KafkaLowLevelConsumerInterface.php deleted file mode 100644 index 46b2fb4..0000000 --- a/src/Consumer/KafkaLowLevelConsumerInterface.php +++ /dev/null @@ -1,10 +0,0 @@ - $inputValue, 'auto.commit.interval.ms' => 100 - ], - KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL + ] ); $config = $kafkaConfiguration->getConfiguration(); - if(null === $expectedValue) { + if (null === $expectedValue) { self::assertArrayNotHasKey('group.id', $config); return; } @@ -112,7 +111,5 @@ public function testConfigValues($inputValue, $expectedValue): void self::assertEquals($config['metadata.broker.list'], 'localhost'); self::assertEquals($expectedValue, $config['group.id']); self::assertEquals('100', $config['auto.commit.interval.ms']); - self::assertArrayHasKey('default_topic_conf', $config); - self::assertIsString($config['default_topic_conf']); } } diff --git a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php index 0207545..2b717fb 100644 --- a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php +++ b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php @@ -4,13 +4,11 @@ use Jobcloud\Kafka\Consumer\KafkaHighLevelConsumer; use Jobcloud\Kafka\Consumer\KafkaHighLevelConsumerInterface; -use Jobcloud\Kafka\Consumer\KafkaLowLevelConsumer; use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder; use Jobcloud\Kafka\Message\Decoder\DecoderInterface; use Jobcloud\Kafka\Consumer\TopicSubscription; use Jobcloud\Kafka\Exception\KafkaConsumerBuilderException; use Jobcloud\Kafka\Consumer\KafkaConsumerInterface; -use Jobcloud\Kafka\Consumer\KafkaLowLevelConsumerInterface; use PHPUnit\Framework\TestCase; /** @@ -151,36 +149,6 @@ public function testSetConsumerGroup(): void self::assertNotSame($clone, $this->kafkaConsumerBuilder); } - /** - * @return void - * @throws \ReflectionException - */ - public function testSetConsumerTypeLow(): void - { - $clone = $this->kafkaConsumerBuilder->withConsumerType(KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL); - - $actualConsumerType = new \ReflectionProperty($clone, 'consumerType'); - $actualConsumerType->setAccessible(true); - - self::assertSame(KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL, $actualConsumerType->getValue($clone)); - self::assertNotSame($clone, $this->kafkaConsumerBuilder); - } - - /** - * @return void - * @throws \ReflectionException - */ - public function testSetConsumerTypeHigh(): void - { - $clone = $this->kafkaConsumerBuilder->withConsumerType(KafkaConsumerBuilder::CONSUMER_TYPE_HIGH_LEVEL); - - $actualConsumerType = new \ReflectionProperty($clone, 'consumerType'); - $actualConsumerType->setAccessible(true); - - self::assertSame(KafkaConsumerBuilder::CONSUMER_TYPE_HIGH_LEVEL, $actualConsumerType->getValue($clone)); - self::assertNotSame($clone, $this->kafkaConsumerBuilder); - } - /** * @return void * @throws \ReflectionException @@ -198,6 +166,13 @@ public function testSetErrorCallback(): void self::assertSame($callback, $reflectionProperty->getValue($clone)); self::assertNotSame($clone, $this->kafkaConsumerBuilder); + + $consumer = $clone + ->withAdditionalBroker('localhost') + ->withSubscription('test') + ->build(); + $conf = $consumer->getConfiguration(); + self::assertArrayHasKey('error_cb', $conf); } /** @@ -217,6 +192,13 @@ public function testSetRebalanceCallback(): void self::assertSame($callback, $reflectionProperty->getValue($clone)); self::assertNotSame($clone, $this->kafkaConsumerBuilder); + + $consumer = $clone + ->withAdditionalBroker('localhost') + ->withSubscription('test') + ->build(); + $conf = $consumer->getConfiguration(); + self::assertArrayHasKey('rebalance_cb', $conf); } /** @@ -236,6 +218,13 @@ public function testSetConsumeCallback(): void self::assertSame($callback, $reflectionProperty->getValue($clone)); self::assertNotSame($clone, $this->kafkaConsumerBuilder); + + $consumer = $clone + ->withAdditionalBroker('localhost') + ->withSubscription('test') + ->build(); + $conf = $consumer->getConfiguration(); + self::assertArrayHasKey('consume_cb', $conf); } /** @@ -255,6 +244,13 @@ public function testSetOffsetCommitCallback(): void self::assertSame($callback, $reflectionProperty->getValue($clone)); self::assertNotSame($clone, $this->kafkaConsumerBuilder); + + $consumer = $clone + ->withAdditionalBroker('localhost') + ->withSubscription('test') + ->build(); + $conf = $consumer->getConfiguration(); + self::assertArrayHasKey('offset_commit_cb', $conf); } /** @@ -297,7 +293,7 @@ public function testBuildSuccess(): void // Anonymous test method, no logic required }; - /** @var $consumer KafkaLowLevelConsumer */ + /** @var $consumer KafkaHighLevelConsumer */ $consumer = $this->kafkaConsumerBuilder ->withAdditionalBroker('localhost') ->withAdditionalSubscription('test-topic') @@ -312,58 +308,6 @@ public function testBuildSuccess(): void self::assertInstanceOf(KafkaHighLevelConsumer::class, $consumer); } - /** - * @return void - */ - public function testBuildLowLevelSuccess(): void - { - $callback = function ($kafka, $errId, $msg) { - // Anonymous test method, no logic required - }; - /** @var $consumer KafkaLowLevelConsumer */ - $consumer = $this->kafkaConsumerBuilder - ->withAdditionalBroker('localhost') - ->withAdditionalSubscription('test-topic') - ->withRebalanceCallback($callback) - ->withErrorCallback($callback) - ->withLogCallback($callback) - ->withConsumerType(KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL) - ->build(); - - $conf = $consumer->getConfiguration(); - - self::assertInstanceOf(KafkaConsumerInterface::class, $consumer); - self::assertInstanceOf(KafkaLowLevelConsumerInterface::class, $consumer); - self::assertArrayHasKey('enable.auto.offset.store', $conf); - self::assertEquals($conf['enable.auto.offset.store'], 'false'); - } - - /** - * @return void - */ - public function testBuildLowLevelFailureOnUnsupportedCallback(): void - { - $callback = function ($kafka, $errId, $msg) { - // Anonymous test method, no logic required - }; - - self::expectException(KafkaConsumerBuilderException::class); - self::expectExceptionMessage( - sprintf( - KafkaConsumerBuilderException::UNSUPPORTED_CALLBACK_EXCEPTION_MESSAGE, - 'consumerCallback', - KafkaLowLevelConsumer::class - ) - ); - - $this->kafkaConsumerBuilder - ->withAdditionalBroker('localhost') - ->withAdditionalSubscription('test-topic') - ->withConsumeCallback($callback) - ->withConsumerType(KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL) - ->build(); - } - /** * @return void */ diff --git a/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php b/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php index 685a422..d5003ae 100644 --- a/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php +++ b/tests/Unit/Consumer/KafkaHighLevelConsumerTest.php @@ -4,6 +4,8 @@ use Jobcloud\Kafka\Consumer\KafkaHighLevelConsumer; use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException; +use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException; +use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException; use Jobcloud\Kafka\Message\Decoder\DecoderInterface; use Jobcloud\Kafka\Consumer\TopicSubscription; use Jobcloud\Kafka\Exception\KafkaConsumerAssignmentException; @@ -16,7 +18,7 @@ use RdKafka\KafkaConsumer as RdKafkaHighLevelConsumer; use RdKafka\ConsumerTopic as RdKafkaConsumerTopic; use RdKafka\Exception as RdKafkaException; -use RdKafka\Message; +use RdKafka\Message as RdKafkaMessage; use RdKafka\Metadata as RdKafkaMetadata; use RdKafka\Metadata\Collection as RdKafkaMetadataCollection; use RdKafka\Metadata\Partition as RdKafkaMetadataPartition; @@ -457,7 +459,7 @@ public function testGetAssignmentException(): void public function testKafkaConsumeWithDecode(): void { - $message = new Message(); + $message = new RdKafkaMessage(); $message->key = 'test'; $message->payload = null; $message->topic_name = 'test_topic'; @@ -504,7 +506,7 @@ function (KafkaConsumerMessageInterface $message) { public function testKafkaConsumeWithoutDecode(): void { - $message = new Message(); + $message = new RdKafkaMessage(); $message->key = 'test'; $message->payload = null; $message->topic_name = 'test_topic'; @@ -657,4 +659,247 @@ private function getMetadataPartitionMock(int $partitionId): RdKafkaMetadataPart return $partitionMock; } + + /** + * @return void + */ + public function testOffsetsForTimes(): void + { + $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + $rdKafkaConsumerMock + ->expects(self::once()) + ->method('offsetsForTimes') + ->with([], 1000) + ->willReturn([]); + + $kafkaConsumer->offsetsForTimes([], 1000); + } + + /** + * @return void + */ + public function testGetConfiguration(): void + { + $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $kafkaConfigurationMock->expects(self::any())->method('dump')->willReturn([]); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + self::assertIsArray($kafkaConsumer->getConfiguration()); + } + + /** + * @return void + */ + public function testGetFirstOffsetForTopicPartition(): void + { + $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + + $rdKafkaConsumerMock + ->expects(self::once()) + ->method('queryWatermarkOffsets') + ->with('test-topic', 1, 0, 0, 1000) + ->willReturnCallback( + function (string $topic, int $partition, int &$lowOffset, int &$highOffset, int $timeoutMs) { + $lowOffset++; + } + ); + + $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + $lowOffset = $kafkaConsumer->getFirstOffsetForTopicPartition('test-topic', 1, 1000); + + self::assertEquals(1, $lowOffset); + } + + /** + * @return void + */ + public function testGetLastOffsetForTopicPartition(): void + { + $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + + $rdKafkaConsumerMock + ->expects(self::once()) + ->method('queryWatermarkOffsets') + ->with('test-topic', 1, 0, 0, 1000) + ->willReturnCallback( + function (string $topic, int $partition, int &$lowOffset, int &$highOffset, int $timeoutMs) { + $highOffset += 5; + } + ); + + $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + $lowOffset = $kafkaConsumer->getLastOffsetForTopicPartition('test-topic', 1, 1000); + + $this->assertEquals(5, $lowOffset); + } + + /** + * @throws KafkaConsumerConsumeException + * @throws KafkaConsumerEndOfPartitionException + * @throws KafkaConsumerSubscriptionException + * @throws KafkaConsumerTimeoutException + * @return void + */ + public function testConsumeThrowsEofExceptionIfQueueConsumeReturnsNull(): void + { + self::expectException(KafkaConsumerEndOfPartitionException::class); + self::expectExceptionCode(RD_KAFKA_RESP_ERR__PARTITION_EOF); + self::expectExceptionMessage(rd_kafka_err2str(RD_KAFKA_RESP_ERR__PARTITION_EOF)); + + $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + + $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + $rdKafkaConsumerMock + ->expects(self::once()) + ->method('consume') + ->with(10000) + ->willReturn(null); + + $kafkaConsumer->subscribe(); + $kafkaConsumer->consume(); + } + + /** + * @throws KafkaConsumerConsumeException + * @throws KafkaConsumerEndOfPartitionException + * @throws KafkaConsumerSubscriptionException + * @throws KafkaConsumerTimeoutException + * @return void + */ + public function testConsumeDedicatedEofException(): void + { + self::expectException(KafkaConsumerEndOfPartitionException::class); + self::expectExceptionCode(RD_KAFKA_RESP_ERR__PARTITION_EOF); + self::expectExceptionMessage(rd_kafka_err2str(RD_KAFKA_RESP_ERR__PARTITION_EOF)); + + $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + + $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + $message = new RdKafkaMessage(); + $message->err = RD_KAFKA_RESP_ERR__PARTITION_EOF; + + $rdKafkaConsumerMock + ->expects(self::once()) + ->method('consume') + ->with(10000) + ->willReturn($message); + + $kafkaConsumer->subscribe(); + $kafkaConsumer->consume(); + } + + /** + * @throws KafkaConsumerConsumeException + * @throws KafkaConsumerEndOfPartitionException + * @throws KafkaConsumerSubscriptionException + * @throws KafkaConsumerTimeoutException + * @return void + */ + public function testConsumeDedicatedTimeoutException(): void + { + self::expectException(KafkaConsumerTimeoutException::class); + self::expectExceptionCode(RD_KAFKA_RESP_ERR__TIMED_OUT); + self::expectExceptionMessage(rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT)); + + $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + + $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + $message = new RdKafkaMessage(); + $message->err = RD_KAFKA_RESP_ERR__TIMED_OUT; + + $rdKafkaConsumerMock + ->expects(self::once()) + ->method('consume') + ->with(1000) + ->willReturn($message); + + $kafkaConsumer->subscribe(); + $kafkaConsumer->consume(1000); + } + + /** + * @throws KafkaConsumerConsumeException + * @throws KafkaConsumerEndOfPartitionException + * @throws KafkaConsumerSubscriptionException + * @throws KafkaConsumerTimeoutException + * @return void + */ + public function testConsumeThrowsExceptionIfConsumedMessageHasNoTopicAndErrorCodeIsNotOkay(): void + { + self::expectException(KafkaConsumerConsumeException::class); + self::expectExceptionMessage('Unknown error'); + + $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + + $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + /** @var RdKafkaMessage|MockObject $rdKafkaMessageMock */ + $rdKafkaMessageMock = $this->createMock(RdKafkaMessage::class); + $rdKafkaMessageMock->err = RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN; + $rdKafkaMessageMock->partition = 1; + $rdKafkaMessageMock->offset = 103; + $rdKafkaMessageMock->topic_name = null; + $rdKafkaMessageMock + ->expects(self::once()) + ->method('errstr') + ->willReturn('Unknown error'); + + $topicSubscription = new TopicSubscription('test-topic', [1], 103); + + $rdKafkaConsumerMock + ->expects(self::once()) + ->method('consume') + ->with(10000) + ->willReturn($rdKafkaMessageMock); + $kafkaConfigurationMock + ->expects(self::exactly(2)) + ->method('getTopicSubscriptions') + ->willReturn([$topicSubscription]); + + $kafkaConsumer->subscribe(); + $kafkaConsumer->consume(); + } + + /** + * @throws KafkaConsumerConsumeException + * @throws KafkaConsumerEndOfPartitionException + * @throws KafkaConsumerTimeoutException + * @return void + */ + public function testConsumeThrowsExceptionIfConsumerIsCurrentlyNotSubscribed(): void + { + self::expectException(KafkaConsumerConsumeException::class); + self::expectExceptionMessage('This consumer is currently not subscribed'); + + $rdKafkaConsumerMock = $this->createMock(RdKafkaHighLevelConsumer::class); + $kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); + $decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); + + $kafkaConsumer = new KafkaHighLevelConsumer($rdKafkaConsumerMock, $kafkaConfigurationMock, $decoderMock); + + $kafkaConsumer->consume(); + } } diff --git a/tests/Unit/Consumer/KafkaLowLevelConsumerTest.php b/tests/Unit/Consumer/KafkaLowLevelConsumerTest.php deleted file mode 100644 index 8e33855..0000000 --- a/tests/Unit/Consumer/KafkaLowLevelConsumerTest.php +++ /dev/null @@ -1,590 +0,0 @@ -rdKafkaQueueMock = $this->createMock(RdKafkaQueue::class); - $this->rdKafkaConsumerMock = $this->createMock(RdKafkaLowLevelConsumer::class); - $this->rdKafkaConsumerMock - ->expects(self::atLeastOnce()) - ->method('newQueue') - ->willReturn($this->rdKafkaQueueMock); - $this->kafkaConfigurationMock = $this->createMock(KafkaConfiguration::class); - $this->kafkaConfigurationMock->expects(self::any())->method('dump')->willReturn([]); - $this->decoderMock = $this->getMockForAbstractClass(DecoderInterface::class); - $this->kafkaConsumer = new KafkaLowLevelConsumer($this->rdKafkaConsumerMock, $this->kafkaConfigurationMock, $this->decoderMock); - } - - /** - * @throws KafkaConsumerConsumeException - * @throws KafkaConsumerEndOfPartitionException - * @throws KafkaConsumerSubscriptionException - * @throws KafkaConsumerTimeoutException - * @return void - */ - public function testConsumeWithTopicSubscriptionWithNoPartitionsIsSuccessful(): void - { - $partitions = [ - $this->getMetadataPartitionMock(1), - $this->getMetadataPartitionMock(2) - ]; - - /** @var RdKafkaMessage|MockObject $rdKafkaMessageMock */ - $rdKafkaMessageMock = $this->createMock(RdKafkaMessage::class); - $rdKafkaMessageMock->err = RD_KAFKA_RESP_ERR_NO_ERROR; - $rdKafkaMessageMock->topic_name = 'sample_topic'; - $rdKafkaMessageMock->partition = 0; - $rdKafkaMessageMock->offset = 1; - $rdKafkaMessageMock->timestamp = 1; - $rdKafkaMessageMock->headers = null; - $rdKafkaMessageMock - ->expects(self::never()) - ->method('errstr'); - - $kafkaMessageMock = $this->getMockForAbstractClass(KafkaConsumerMessageInterface::class); - $kafkaMessageMock->expects(self::once())->method('getBody')->willReturn($rdKafkaMessageMock->payload); - $kafkaMessageMock->expects(self::once())->method('getOffset')->willReturn($rdKafkaMessageMock->offset); - $kafkaMessageMock->expects(self::once())->method('getPartition')->willReturn($rdKafkaMessageMock->partition); - - $this->decoderMock->expects(self::once())->method('decode')->willReturn($kafkaMessageMock); - - /** @var RdKafkaConsumerTopic|MockObject $rdKafkaConsumerTopicMock */ - $rdKafkaConsumerTopicMock = $this->createMock(RdKafkaConsumerTopic::class); - - /** @var RdKafkaMetadataTopic|MockObject $rdKafkaMetadataTopicMock */ - $rdKafkaMetadataTopicMock = $this->createMock(RdKafkaMetadataTopic::class); - $rdKafkaMetadataTopicMock - ->expects(self::once()) - ->method('getPartitions') - ->willReturn($partitions); - - /** @var RdKafkaMetadata|MockObject $rdKafkaMetadataMock */ - $rdKafkaMetadataMock = $this->createMock(RdKafkaMetadata::class); - $rdKafkaMetadataMock - ->expects(self::once()) - ->method('getTopics') - ->willReturnCallback( - function () use ($rdKafkaMetadataTopicMock) { - /** @var RdKafkaMetadataCollection|MockObject $collection */ - $collection = $this->createMock(RdKafkaMetadataCollection::class); - $collection - ->expects(self::once()) - ->method('current') - ->willReturn($rdKafkaMetadataTopicMock); - - return $collection; - } - ); - - $this->rdKafkaQueueMock - ->expects(self::once()) - ->method('consume') - ->with(10000) - ->willReturn($rdKafkaMessageMock); - $this->kafkaConfigurationMock - ->expects(self::once()) - ->method('getTopicSubscriptions') - ->willReturn([new TopicSubscription('test-topic')]); - $this->rdKafkaConsumerMock - ->expects(self::once()) - ->method('getMetadata') - ->with(false, $rdKafkaConsumerTopicMock, 10000) - ->willReturn($rdKafkaMetadataMock); - $this->rdKafkaConsumerMock - ->expects(self::exactly(2)) - ->method('newTopic') - ->with('test-topic') - ->willReturn($rdKafkaConsumerTopicMock); - - $this->kafkaConsumer->subscribe(); - $message = $this->kafkaConsumer->consume(); - - self::assertInstanceOf(KafkaConsumerMessageInterface::class, $message); - self::assertInstanceOf(KafkaMessageInterface::class, $message); - - self::assertEquals($rdKafkaMessageMock->payload, $message->getBody()); - self::assertEquals($rdKafkaMessageMock->offset, $message->getOffset()); - self::assertEquals($rdKafkaMessageMock->partition, $message->getPartition()); - } - - /** - * @throws KafkaConsumerConsumeException - * @throws KafkaConsumerEndOfPartitionException - * @throws KafkaConsumerSubscriptionException - * @throws KafkaConsumerTimeoutException - * @return void - */ - public function testConsumeThrowsEofExceptionIfQueueConsumeReturnsNull(): void - { - self::expectException(KafkaConsumerEndOfPartitionException::class); - self::expectExceptionCode(RD_KAFKA_RESP_ERR__PARTITION_EOF); - self::expectExceptionMessage(rd_kafka_err2str(RD_KAFKA_RESP_ERR__PARTITION_EOF)); - - $this->rdKafkaQueueMock - ->expects(self::once()) - ->method('consume') - ->with(10000) - ->willReturn(null); - - $this->kafkaConsumer->subscribe(); - $this->kafkaConsumer->consume(); - } - - /** - * @throws KafkaConsumerConsumeException - * @throws KafkaConsumerEndOfPartitionException - * @throws KafkaConsumerSubscriptionException - * @throws KafkaConsumerTimeoutException - * @return void - */ - public function testConsumeDedicatedEofException(): void - { - self::expectException(KafkaConsumerEndOfPartitionException::class); - self::expectExceptionCode(RD_KAFKA_RESP_ERR__PARTITION_EOF); - self::expectExceptionMessage(rd_kafka_err2str(RD_KAFKA_RESP_ERR__PARTITION_EOF)); - - $message = new RdKafkaMessage(); - $message->err = RD_KAFKA_RESP_ERR__PARTITION_EOF; - - $this->rdKafkaQueueMock - ->expects(self::once()) - ->method('consume') - ->with(10000) - ->willReturn($message); - - $this->kafkaConsumer->subscribe(); - $this->kafkaConsumer->consume(); - } - - /** - * @throws KafkaConsumerConsumeException - * @throws KafkaConsumerEndOfPartitionException - * @throws KafkaConsumerSubscriptionException - * @throws KafkaConsumerTimeoutException - * @return void - */ - public function testConsumeDedicatedTimeoutException(): void - { - self::expectException(KafkaConsumerTimeoutException::class); - self::expectExceptionCode(RD_KAFKA_RESP_ERR__TIMED_OUT); - self::expectExceptionMessage(rd_kafka_err2str(RD_KAFKA_RESP_ERR__TIMED_OUT)); - - $message = new RdKafkaMessage(); - $message->err = RD_KAFKA_RESP_ERR__TIMED_OUT; - - $this->rdKafkaQueueMock - ->expects(self::once()) - ->method('consume') - ->with(1000) - ->willReturn($message); - - $this->kafkaConsumer->subscribe(); - $this->kafkaConsumer->consume(1000); - } - - /** - * @throws KafkaConsumerConsumeException - * @throws KafkaConsumerEndOfPartitionException - * @throws KafkaConsumerSubscriptionException - * @throws KafkaConsumerTimeoutException - * @return void - */ - public function testConsumeThrowsExceptionIfConsumedMessageHasNoTopicAndErrorCodeIsNotOkay(): void - { - self::expectException(KafkaConsumerConsumeException::class); - self::expectExceptionMessage('Unknown error'); - - /** @var RdKafkaMessage|MockObject $rdKafkaMessageMock */ - $rdKafkaMessageMock = $this->createMock(RdKafkaMessage::class); - $rdKafkaMessageMock->err = RD_KAFKA_RESP_ERR__ALL_BROKERS_DOWN; - $rdKafkaMessageMock->partition = 1; - $rdKafkaMessageMock->offset = 103; - $rdKafkaMessageMock->topic_name = null; - $rdKafkaMessageMock - ->expects(self::once()) - ->method('errstr') - ->willReturn('Unknown error'); - - /** @var RdKafkaConsumerTopic|MockObject $rdKafkaConsumerTopicMock */ - $rdKafkaConsumerTopicMock = $this->createMock(RdKafkaConsumerTopic::class); - $rdKafkaConsumerTopicMock - ->expects(self::once()) - ->method('consumeQueueStart') - ->with(1, 103, $this->rdKafkaQueueMock) - ->willReturn(null); - - $topicSubscription = new TopicSubscription('test-topic', [1], 103); - - $this->rdKafkaQueueMock - ->expects(self::once()) - ->method('consume') - ->with(10000) - ->willReturn($rdKafkaMessageMock); - $this->kafkaConfigurationMock - ->expects(self::once()) - ->method('getTopicSubscriptions') - ->willReturn([$topicSubscription]); - $this->rdKafkaConsumerMock - ->expects(self::once()) - ->method('newTopic') - ->with('test-topic') - ->willReturn($rdKafkaConsumerTopicMock); - - $this->kafkaConsumer->subscribe(); - $this->kafkaConsumer->consume(); - } - - /** - * @throws KafkaConsumerConsumeException - * @throws KafkaConsumerEndOfPartitionException - * @throws KafkaConsumerSubscriptionException - * @throws KafkaConsumerTimeoutException - * @return void - */ - public function testConsumeFailThrowsException(): void - { - self::expectException(KafkaConsumerConsumeException::class); - self::expectExceptionMessage('Unknown error'); - - /** @var RdKafkaMessage|MockObject $rdKafkaMessageMock */ - $rdKafkaMessageMock = $this->createMock(RdKafkaMessage::class); - $rdKafkaMessageMock->err = -1; - $rdKafkaMessageMock->partition = 1; - $rdKafkaMessageMock->offset = 103; - $rdKafkaMessageMock->topic_name = 'test-topic'; - $rdKafkaMessageMock->timestamp = 1; - $rdKafkaMessageMock->headers = ['key' => 'value']; - $rdKafkaMessageMock - ->expects(self::once()) - ->method('errstr') - ->willReturn('Unknown error'); - - /** @var RdKafkaConsumerTopic|MockObject $rdKafkaConsumerTopicMock */ - $rdKafkaConsumerTopicMock = $this->createMock(RdKafkaConsumerTopic::class); - $rdKafkaConsumerTopicMock - ->expects(self::once()) - ->method('consumeQueueStart') - ->with(1, 103, $this->rdKafkaQueueMock) - ->willReturn(null); - - $topicSubscription = new TopicSubscription('test-topic', [1], 103); - - $this->rdKafkaQueueMock - ->expects(self::once()) - ->method('consume') - ->with(10000) - ->willReturn($rdKafkaMessageMock); - $this->kafkaConfigurationMock - ->expects(self::once()) - ->method('getTopicSubscriptions') - ->willReturn([$topicSubscription]); - $this->rdKafkaConsumerMock - ->expects(self::once()) - ->method('newTopic') - ->with('test-topic') - ->willReturn($rdKafkaConsumerTopicMock); - - $this->kafkaConsumer->subscribe(); - $this->kafkaConsumer->consume(); - } - - /** - * @throws KafkaConsumerConsumeException - * @throws KafkaConsumerEndOfPartitionException - * @throws KafkaConsumerTimeoutException - * @return void - */ - public function testConsumeThrowsExceptionIfConsumerIsCurrentlyNotSubscribed(): void - { - self::expectException(KafkaConsumerConsumeException::class); - self::expectExceptionMessage('This consumer is currently not subscribed'); - - $this->kafkaConsumer->consume(); - } - - /** - * @throws KafkaConsumerSubscriptionException - * @throws \ReflectionException - * @return void - */ - public function testSubscribeEarlyReturnsIfAlreadySubscribed(): void - { - $subscribedProperty = new \ReflectionProperty(KafkaLowLevelConsumer::class, 'subscribed'); - $subscribedProperty->setAccessible(true); - $subscribedProperty->setValue($this->kafkaConsumer, true); - - $this->kafkaConsumer->subscribe(); - } - - /** - * @throws KafkaConsumerSubscriptionException - * @return void - */ - public function testSubscribeConvertsExtensionExceptionToLibraryException(): void - { - self::expectException(KafkaConsumerSubscriptionException::class); - self::expectExceptionMessage('TEST_EXCEPTION_MESSAGE'); - - $topicSubscription = new TopicSubscription('test-topic', [1], 103); - - $this->kafkaConfigurationMock - ->expects(self::once()) - ->method('getTopicSubscriptions') - ->willReturn([$topicSubscription]); - $this->rdKafkaConsumerMock - ->expects(self::once()) - ->method('newTopic') - ->with('test-topic') - ->willThrowException(new RdKafkaException('TEST_EXCEPTION_MESSAGE')); - - $this->kafkaConsumer->subscribe(); - } - - /** - * @throws KafkaConsumerSubscriptionException - * @return void - */ - public function testSubscribeUseExistingTopicsForResubscribe(): void - { - $topicSubscription = new TopicSubscription('test-topic', [1], 103); - - /** @var RdKafkaConsumerTopic|MockObject $rdKafkaConsumerTopicMock */ - $rdKafkaConsumerTopicMock = $this->createMock(RdKafkaConsumerTopic::class); - $rdKafkaConsumerTopicMock - ->expects(self::exactly(2)) - ->method('consumeQueueStart') - ->with(1, 103, $this->rdKafkaQueueMock) - ->willReturn(null); - $rdKafkaConsumerTopicMock - ->expects(self::once()) - ->method('consumeStop') - ->with(1) - ->willReturn(null); - - $this->kafkaConfigurationMock - ->expects(self::exactly(3)) - ->method('getTopicSubscriptions') - ->willReturn([$topicSubscription]); - $this->rdKafkaConsumerMock - ->expects(self::once()) - ->method('newTopic') - ->with('test-topic') - ->willReturn($rdKafkaConsumerTopicMock); - - $this->kafkaConsumer->subscribe(); - - self::assertTrue($this->kafkaConsumer->isSubscribed()); - - $this->kafkaConsumer->unsubscribe(); - - self::assertFalse($this->kafkaConsumer->isSubscribed()); - - $this->kafkaConsumer->subscribe(); - } - - /** - * @throws KafkaConsumerCommitException - * @throws \ReflectionException - * @return void - */ - public function testCommitWithMessageStoresOffsetOfIt(): void - { - $message = $this->getMockForAbstractClass(KafkaConsumerMessageInterface::class); - $message->expects(self::once())->method('getTopicName')->willReturn('test-topic'); - - /** @var RdKafkaConsumerTopic|MockObject $rdKafkaConsumerTopicMock */ - $rdKafkaConsumerTopicMock = $this->createMock(RdKafkaConsumerTopic::class); - $rdKafkaConsumerTopicMock - ->expects(self::once()) - ->method('offsetStore') - ->with($message->getPartition(), $message->getOffset()); - - $rdKafkaConsumerMockProperty = new \ReflectionProperty(KafkaLowLevelConsumer::class, 'topics'); - $rdKafkaConsumerMockProperty->setAccessible(true); - $rdKafkaConsumerMockProperty->setValue( - $this->kafkaConsumer, - ['test-topic' => $rdKafkaConsumerTopicMock] - ); - - $this->kafkaConsumer->commit($message); - } - - /** - * @throws KafkaConsumerCommitException - * @throws \ReflectionException - * @return void - */ - public function testCommitWithInvalidObjectThrowsExceptionAndDoesNotTriggerCommit(): void - { - self::expectException(KafkaConsumerCommitException::class); - self::expectExceptionMessage( - 'Provided message (index: 0) is not an instance of "Jobcloud\Kafka\Message\KafkaConsumerMessage"' - ); - - $message = new \stdClass(); - - /** @var RdKafkaConsumerTopic|MockObject $rdKafkaConsumerTopicMock */ - $rdKafkaConsumerTopicMock = $this->createMock(RdKafkaConsumerTopic::class); - $rdKafkaConsumerTopicMock - ->expects(self::never()) - ->method('offsetStore'); - - $rdKafkaConsumerMockProperty = new \ReflectionProperty(KafkaLowLevelConsumer::class, 'topics'); - $rdKafkaConsumerMockProperty->setAccessible(true); - $rdKafkaConsumerMockProperty->setValue($this->kafkaConsumer, ['test-topic' => $rdKafkaConsumerTopicMock]); - - $this->kafkaConsumer->commit($message); - } - - /** - * @return void - */ - public function testUnsubscribeEarlyReturnsIfAlreadyUnsubscribed(): void - { - self::assertFalse($this->kafkaConsumer->isSubscribed()); - - $this->kafkaConsumer->unsubscribe(); - } - - /** - * @return void - */ - public function testIsSubscribedReturnsDefaultSubscriptionState(): void - { - self::assertFalse($this->kafkaConsumer->isSubscribed()); - } - - /** - * @return void - */ - public function testGetConfiguration(): void - { - self::assertIsArray($this->kafkaConsumer->getConfiguration()); - } - - /** - * @return void - */ - public function testGetFirstOffsetForTopicPartition(): void - { - $this->rdKafkaConsumerMock - ->expects(self::once()) - ->method('queryWatermarkOffsets') - ->with('test-topic', 1, 0, 0, 1000) - ->willReturnCallback( - function (string $topic, int $partition, int &$lowOffset, int &$highOffset, int $timeoutMs) { - $lowOffset++; - } - ); - - $this->kafkaConsumer = new KafkaLowLevelConsumer($this->rdKafkaConsumerMock, $this->kafkaConfigurationMock, $this->decoderMock); - - $lowOffset = $this->kafkaConsumer->getFirstOffsetForTopicPartition('test-topic', 1, 1000); - - $this->assertEquals(1, $lowOffset); - } - - /** - * @return void - */ - public function testGetLastOffsetForTopicPartition(): void - { - $this->rdKafkaConsumerMock - ->expects(self::once()) - ->method('queryWatermarkOffsets') - ->with('test-topic', 1, 0, 0, 1000) - ->willReturnCallback( - function (string $topic, int $partition, int &$lowOffset, int &$highOffset, int $timeoutMs) { - $highOffset += 5; - } - ); - - $this->kafkaConsumer = new KafkaLowLevelConsumer($this->rdKafkaConsumerMock, $this->kafkaConfigurationMock, $this->decoderMock); - - $lowOffset = $this->kafkaConsumer->getLastOffsetForTopicPartition('test-topic', 1, 1000); - - $this->assertEquals(5, $lowOffset); - } - - /** - * @param int $partitionId - * @return RdKafkaMetadataPartition|MockObject - */ - private function getMetadataPartitionMock(int $partitionId): RdKafkaMetadataPartition - { - $partitionMock = $this->getMockBuilder(RdKafkaMetadataPartition::class) - ->disableOriginalConstructor() - ->onlyMethods(['getId']) - ->getMock(); - - $partitionMock - ->expects(self::once()) - ->method('getId') - ->willReturn($partitionId); - - return $partitionMock; - } - - /** - * @return void - */ - public function testOffsetsForTimes(): void - { - $this->rdKafkaConsumerMock - ->expects(self::once()) - ->method('offsetsForTimes') - ->with([], 1000) - ->willReturn([]); - - $this->kafkaConsumer->offsetsForTimes([], 1000); - } -}