Skip to content

Commit

Permalink
Merge pull request #19 from OtezVikentiy/add_new_consumer_config_max_…
Browse files Browse the repository at this point in the history
…poll_interval_ms

added configuration for consumers max.poll.interval.ms
  • Loading branch information
wowo authored Mar 4, 2024
2 parents eff2c7f + c80af1a commit b853f31
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 0 deletions.
65 changes: 65 additions & 0 deletions src/Configuration/Type/MaxPollIntervalMs.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
<?php

declare(strict_types=1);

namespace StsGamingGroup\KafkaBundle\Configuration\Type;

use StsGamingGroup\KafkaBundle\Configuration\Contract\CastValueInterface;
use StsGamingGroup\KafkaBundle\Configuration\Contract\ConsumerConfigurationInterface;
use StsGamingGroup\KafkaBundle\Configuration\Contract\KafkaConfigurationInterface;
use StsGamingGroup\KafkaBundle\Configuration\Traits\SupportsConsumerTrait;
use Symfony\Component\Console\Input\InputOption;

class MaxPollIntervalMs implements ConsumerConfigurationInterface, KafkaConfigurationInterface, CastValueInterface
{
use SupportsConsumerTrait;

public const NAME = 'max_poll_interval_ms';

public function getName(): string
{
return self::NAME;
}

public function getKafkaProperty(): string
{
return 'max.poll.interval.ms';
}

public function getMode(): int
{
return InputOption::VALUE_REQUIRED;
}

public function getDescription(): string
{
return sprintf(
<<<EOT
The maximum delay between invocations of poll() when using consumer group management. This places an upper
bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not
called before expiration of this timeout, then the consumer is considered failed and the group will
rebalance in order to reassign the partitions to another member. For consumers using a non-null
group.instance.id which reach this timeout, partitions will not be immediately reassigned. Instead, the
consumer will stop sending heartbeats and partitions will be reassigned after expiration of
session.timeout.ms. This mirrors the behavior of a static consumer which has shutdown.
Defaults to %s ms. Set 0 to disable.
EOT,
$this->getDefaultValue()
);
}

public function isValueValid($value): bool
{
return is_numeric($value) && !str_contains((string)$value, '.') && $value >= 0;
}

public function getDefaultValue(): int
{
return 300000;
}

public function cast($validatedValue): int
{
return (int) $validatedValue;
}
}
4 changes: 4 additions & 0 deletions src/DependencyInjection/Configuration.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
use StsGamingGroup\KafkaBundle\Configuration\Type\EnableAutoOffsetStore;
use StsGamingGroup\KafkaBundle\Configuration\Type\GroupId;
use StsGamingGroup\KafkaBundle\Configuration\Type\LogLevel;
use StsGamingGroup\KafkaBundle\Configuration\Type\MaxPollIntervalMs;
use StsGamingGroup\KafkaBundle\Configuration\Type\MaxRetries;
use StsGamingGroup\KafkaBundle\Configuration\Type\MaxRetryDelay;
use StsGamingGroup\KafkaBundle\Configuration\Type\ProducerPartition;
Expand Down Expand Up @@ -143,6 +144,9 @@ private function addConsumerConfigurations(NodeBuilder $builder): void
->integerNode(StatisticsIntervalMs::NAME)
->defaultValue((new StatisticsIntervalMs)->getDefaultValue())
->end()
->integerNode(MaxPollIntervalMs::NAME)
->defaultValue((new MaxPollIntervalMs)->getDefaultValue())
->end()
->scalarNode(AutoOffsetReset::NAME)
->defaultValue((new AutoOffsetReset)->getDefaultValue())
->cannotBeEmpty()
Expand Down
4 changes: 4 additions & 0 deletions src/Resources/config/configuration_types.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@
id="sts_gaming_group_kafka.configuration.type.statistics_interval_ms">
<tag name="sts_gaming_group_kafka.configuration.type"/>
</service>
<service class="StsGamingGroup\KafkaBundle\Configuration\Type\MaxPollIntervalMs"
id="sts_gaming_group_kafka.configuration.type.max_poll_interval_ms">
<tag name="sts_gaming_group_kafka.configuration.type"/>
</service>
<service class="StsGamingGroup\KafkaBundle\Configuration\Type\Decoder" id="sts_gaming_group_kafka.configuration.type.decoder">
<tag name="sts_gaming_group_kafka.configuration.type"/>
</service>
Expand Down
26 changes: 26 additions & 0 deletions tests/Unit/Configuration/Type/MaxPollIntervalMsTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
<?php

declare(strict_types=1);

namespace StsGamingGroup\KafkaBundle\Tests\Unit\Configuration\Type;

use StsGamingGroup\KafkaBundle\Configuration\Contract\ConfigurationInterface;
use StsGamingGroup\KafkaBundle\Configuration\Type\MaxPollIntervalMs;

class MaxPollIntervalMsTest extends AbstractConfigurationTest
{
protected function getConfiguration(): ConfigurationInterface
{
return new MaxPollIntervalMs();
}

protected function getValidValues(): array
{
return ['1', 2, 1000, '1000', 5000, 30000, 100000, 2_000_000];
}

protected function getInvalidValues(): array
{
return [-1, '-1', 1.51, '2.55', '', [], null, new \stdClass(), false, true];
}
}

0 comments on commit b853f31

Please sign in to comment.