Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chore/remove low level consumer #40

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ infection-testing:
make coverage
cp -f build/logs/phpunit/junit.xml build/logs/phpunit/coverage/junit.xml
sudo php-ext-disable pcov
${INFECTION} --coverage=build/logs/phpunit/coverage --min-msi=91 --threads=`nproc`
${INFECTION} --coverage=build/logs/phpunit/coverage --min-msi=93 --threads=`nproc`
sudo php-ext-enable pcov

pcov-enable:
40 changes: 0 additions & 40 deletions README.md
Original file line number Diff line number Diff line change
@@ -204,46 +204,6 @@ while (true) {
}
```

#### Kafka Low Level

```php
<?php

use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
use Jobcloud\Kafka\Exception\KafkaConsumerEndOfPartitionException;
use Jobcloud\Kafka\Exception\KafkaConsumerTimeoutException;

$consumer = KafkaConsumerBuilder::create()
->withAdditionalConfig(
[
'compression.codec' => 'lz4',
'auto.commit.interval.ms' => 500
]
)
->withAdditionalBroker('kafka:9092')
->withConsumerGroup('testGroup')
->withAdditionalSubscription('test-topic')
->withConsumerType(KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL)
->build();

$consumer->subscribe();

while (true) {
try {
$message = $consumer->consume();
// your business logic
$consumer->commit($message);
} catch (KafkaConsumerTimeoutException $e) {
//no messages were read in a given time
} catch (KafkaConsumerEndOfPartitionException $e) {
//only occurs if enable.partition.eof is true (default: false)
} catch (KafkaConsumerConsumeException $e) {
// Failed
}
}
```

#### Avro Consumer
To create an avro consumer add the avro decoder.

40 changes: 2 additions & 38 deletions src/Conf/KafkaConfiguration.php
Original file line number Diff line number Diff line change
@@ -4,10 +4,8 @@

namespace Jobcloud\Kafka\Conf;

use Jobcloud\Kafka\Consumer\KafkaConsumerBuilder;
use Jobcloud\Kafka\Consumer\TopicSubscription;
use RdKafka\Conf as RdKafkaConf;
use RdKafka\TopicConf as RdKafkaTopicConf;

class KafkaConfiguration extends RdKafkaConf
{
@@ -22,32 +20,17 @@ class KafkaConfiguration extends RdKafkaConf
*/
protected $topicSubscriptions;

/**
* @var string
*/
private $type;

/**
* @var array<string,int>
*/
private $lowLevelTopicSettings = [
'auto.commit.interval.ms' => 1,
'auto.offset.reset' => 1,
];

/**
* @param string[] $brokers
* @param array|TopicSubscription[] $topicSubscriptions
* @param mixed[] $config
* @param string $type
*/
public function __construct(array $brokers, array $topicSubscriptions = [], array $config = [], string $type = '')
public function __construct(array $brokers, array $topicSubscriptions = [], array $config = [])
{
parent::__construct();

$this->brokers = $brokers;
$this->topicSubscriptions = $topicSubscriptions;
$this->type = $type;
$this->initializeConfig($config);
}

@@ -79,23 +62,13 @@ public function getConfiguration(): array
* @param mixed[] $config
* @return void
*/
protected function initializeConfig(array $config = []): void
private function initializeConfig(array $config = []): void
{
$topicConf = new RdKafkaTopicConf();

foreach ($config as $name => $value) {
if (false === is_scalar($value)) {
continue;
}

if (
KafkaConsumerBuilder::CONSUMER_TYPE_LOW_LEVEL === $this->type
&& true === $this->isLowLevelTopicConfSetting($name)
) {
$topicConf->set($name, (string) $value);
$this->setDefaultTopicConf($topicConf);
}

if (true === is_bool($value)) {
$value = true === $value ? 'true' : 'false';
}
@@ -105,13 +78,4 @@ protected function initializeConfig(array $config = []): void

$this->set('metadata.broker.list', implode(',', $this->getBrokers()));
}

/**
* @param string $settingName
* @return bool
*/
private function isLowLevelTopicConfSetting(string $settingName): bool
{
return true === isset($this->lowLevelTopicSettings[$settingName]);
}
}
7 changes: 4 additions & 3 deletions src/Consumer/AbstractKafkaConsumer.php
Original file line number Diff line number Diff line change
@@ -11,9 +11,9 @@
use Jobcloud\Kafka\Conf\KafkaConfiguration;
use Jobcloud\Kafka\Exception\KafkaConsumerConsumeException;
use Jobcloud\Kafka\Message\KafkaConsumerMessage;
use RdKafka\Consumer as RdKafkaLowLevelConsumer;
use RdKafka\Exception as RdKafkaException;
use RdKafka\KafkaConsumer as RdKafkaHighLevelConsumer;
use RdKafka\KafkaConsumerTopic as RdKafkaConsumerTopic;
use RdKafka\Metadata\Topic as RdKafkaMetadataTopic;
use RdKafka\Message as RdKafkaMessage;
use RdKafka\TopicPartition as RdKafkaTopicPartition;
@@ -32,7 +32,7 @@ abstract class AbstractKafkaConsumer implements KafkaConsumerInterface
protected $subscribed = false;

/**
* @var RdKafkaLowLevelConsumer|RdKafkaHighLevelConsumer
* @var RdKafkaHighLevelConsumer
*/
protected $consumer;

@@ -141,6 +141,7 @@ public function decodeMessage(KafkaConsumerMessageInterface $message): KafkaCons
*/
public function getMetadataForTopic(string $topicName, int $timeoutMs = 10000): RdKafkaMetadataTopic
{
/** @var RdKafkaConsumerTopic $topic */
$topic = $this->consumer->newTopic($topicName);
return $this->consumer
->getMetadata(
@@ -222,7 +223,7 @@ protected function getAllTopicPartitions(string $topic): array
* @param RdKafkaMessage $message
* @return KafkaConsumerMessageInterface
*/
protected function getConsumerMessage(RdKafkaMessage $message): KafkaConsumerMessageInterface
private function getConsumerMessage(RdKafkaMessage $message): KafkaConsumerMessageInterface
{
return new KafkaConsumerMessage(
(string) $message->topic_name,
49 changes: 2 additions & 47 deletions src/Consumer/KafkaConsumerBuilder.php
Original file line number Diff line number Diff line change
@@ -9,15 +9,10 @@
use Jobcloud\Kafka\Exception\KafkaConsumerBuilderException;
use Jobcloud\Kafka\Message\Decoder\DecoderInterface;
use Jobcloud\Kafka\Message\Decoder\NullDecoder;
use RdKafka\Consumer as RdKafkaLowLevelConsumer;
use RdKafka\KafkaConsumer as RdKafkaHighLevelConsumer;

final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface
{

public const CONSUMER_TYPE_LOW_LEVEL = 'low';
public const CONSUMER_TYPE_HIGH_LEVEL = 'high';

/**
* @var string[]
*/
@@ -42,11 +37,6 @@ final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface
*/
private $consumerGroup = 'default';

/**
* @var string
*/
private $consumerType = self::CONSUMER_TYPE_HIGH_LEVEL;

/**
* @var callable
*/
@@ -180,20 +170,6 @@ public function withConsumerGroup(string $consumerGroup): KafkaConsumerBuilderIn
return $that;
}

/**
* Set the consumer type, can be either CONSUMER_TYPE_LOW_LEVEL or CONSUMER_TYPE_HIGH_LEVEL
*
* @param string $consumerType
* @return KafkaConsumerBuilderInterface
*/
public function withConsumerType(string $consumerType): KafkaConsumerBuilderInterface
{
$that = clone $this;
$that->consumerType = $consumerType;

return $that;
}

/**
* Set a callback to be called on errors.
* The default callback will throw an exception for every error
@@ -299,34 +275,13 @@ public function build(): KafkaConsumerInterface
$kafkaConfig = new KafkaConfiguration(
$this->brokers,
$this->topics,
$this->config,
$this->consumerType
$this->config
);

//set consumer callbacks
$this->registerCallbacks($kafkaConfig);

//create RdConsumer
if (self::CONSUMER_TYPE_LOW_LEVEL === $this->consumerType) {
if (null !== $this->consumeCallback) {
throw new KafkaConsumerBuilderException(
sprintf(
KafkaConsumerBuilderException::UNSUPPORTED_CALLBACK_EXCEPTION_MESSAGE,
'consumerCallback',
KafkaLowLevelConsumer::class
)
);
}

$rdKafkaConsumer = new RdKafkaLowLevelConsumer($kafkaConfig);

return new KafkaLowLevelConsumer(
$rdKafkaConsumer,
$kafkaConfig,
$this->decoder
);
}

$rdKafkaConsumer = new RdKafkaHighLevelConsumer($kafkaConfig);

return new KafkaHighLevelConsumer($rdKafkaConsumer, $kafkaConfig, $this->decoder);
@@ -349,7 +304,7 @@ private function registerCallbacks(KafkaConfiguration $conf): void
}

if (null !== $this->logCallback) {
$conf->setLogCb($this->logCallback);
//$conf->setLogCb($this->logCallback);
}

if (null !== $this->offsetCommitCallback) {
8 changes: 0 additions & 8 deletions src/Consumer/KafkaConsumerBuilderInterface.php
Original file line number Diff line number Diff line change
@@ -66,14 +66,6 @@ public function withAdditionalConfig(array $config): self;
*/
public function withConsumerGroup(string $consumerGroup): self;

/**
* Set the consumer type, can be either CONSUMER_TYPE_LOW_LEVEL or CONSUMER_TYPE_HIGH_LEVEL
*
* @param string $consumerType
* @return KafkaConsumerBuilderInterface
*/
public function withConsumerType(string $consumerType): self;

/**
* Set a callback to be called on errors.
* The default callback will throw an exception for every error
Loading