From b20d530cbec5f765e2f8cf0eebb9165830ed2624 Mon Sep 17 00:00:00 2001 From: Nick Chiu Date: Tue, 7 Sep 2021 14:46:04 +0200 Subject: [PATCH 1/4] add oauth callback --- src/Consumer/KafkaConsumerBuilder.php | 20 +++++++++++++- src/Producer/KafkaProducerBuilder.php | 22 +++++++++++++++ .../Consumer/KafkaConsumerBuilderTest.php | 27 +++++++++++++++++++ .../Producer/KafkaProducerBuilderTest.php | 18 +++++++++++++ 4 files changed, 86 insertions(+), 1 deletion(-) diff --git a/src/Consumer/KafkaConsumerBuilder.php b/src/Consumer/KafkaConsumerBuilder.php index 2985f39..18f850d 100644 --- a/src/Consumer/KafkaConsumerBuilder.php +++ b/src/Consumer/KafkaConsumerBuilder.php @@ -51,7 +51,7 @@ final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface /** * @var callable */ - private $consumeCallback; + private $oauthBearerCallback; /** * @var callable @@ -228,6 +228,20 @@ public function withOffsetCommitCallback(callable $offsetCommitCallback): KafkaC return $that; } + /** + * Set callback that is being called on offset commits + * + * @param callable $offsetCommitCallback + * @return KafkaConsumerBuilderInterface + */ + public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): KafkaConsumerBuilderInterface + { + $that = clone $this; + $that->oauthBearerCallback = $oauthBearerCallback; + + return $that; + } + /** * Lets you set a custom decoder for the consumed message * @@ -292,5 +306,9 @@ private function registerCallbacks(KafkaConfiguration $conf): void if (null !== $this->offsetCommitCallback) { $conf->setOffsetCommitCb($this->offsetCommitCallback); } + + if (null !== $this->oauthBearerCallback) { + $conf->setOAuthBearerTokenRefreshCbLogCb($this->oauthBearerCallback); + } } } diff --git a/src/Producer/KafkaProducerBuilder.php b/src/Producer/KafkaProducerBuilder.php index bf564f3..a74c58b 100644 --- a/src/Producer/KafkaProducerBuilder.php +++ b/src/Producer/KafkaProducerBuilder.php @@ -39,6 +39,11 @@ final class KafkaProducerBuilder implements KafkaProducerBuilderInterface */ private $logCallback; + /** + * @var callable + */ + private $oauthBearerCallback; + /** * @var EncoderInterface */ @@ -131,6 +136,19 @@ public function withLogCallback(callable $logCallback): KafkaProducerBuilderInte return $this; } + /** + * Callback for OAuth Bearer Token refresh + * + * @param callable $oauthBearerCallback + * @return KafkaProducerBuilderInterface + */ + public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): KafkaProducerBuilderInterface + { + $this->oauthBearerCallback = $oauthBearerCallback; + + return $this; + } + /** * Lets you set a custom encoder for produce message * @@ -188,5 +206,9 @@ private function registerCallbacks(KafkaConfiguration $conf): void if (null !== $this->logCallback) { $conf->setLogCb($this->logCallback); } + + if (null !== $this->oauthBearerCallback) { + $conf->setOAuthBearerTokenRefreshCbLogCb($this->oauthBearerCallback); + } } } diff --git a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php index 0519da4..4db7f59 100644 --- a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php +++ b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php @@ -203,6 +203,33 @@ public function testSetRebalanceCallback(): void self::assertArrayHasKey('rebalance_cb', $conf); } + /** + * @return void + * @throws \ReflectionException + */ + public function testSetOAuthBearerTokenRefreshCallback(): void + { + $callback = function () { + // Anonymous test method, no logic required + }; + + $clone = $this->kafkaConsumerBuilder->withOAuthBearerTokenRefreshCallback($callback); + + $reflectionProperty = new \ReflectionProperty($clone, 'oauthBearerCallback'); + $reflectionProperty->setAccessible(true); + + self::assertSame($callback, $reflectionProperty->getValue($clone)); + self::assertNotSame($clone, $this->kafkaConsumerBuilder); + + $consumer = $clone + ->withAdditionalBroker('localhost') + ->withSubscription('test') + ->withOAuthBearerTokenRefreshCallback($callback) + ->build(); + $conf = $consumer->getConfiguration(); + self::assertArrayHasKey('oauthbearer_refresh', $conf); + } + /** * @return void * @throws \ReflectionException diff --git a/tests/Unit/Producer/KafkaProducerBuilderTest.php b/tests/Unit/Producer/KafkaProducerBuilderTest.php index 9df09db..60e3356 100644 --- a/tests/Unit/Producer/KafkaProducerBuilderTest.php +++ b/tests/Unit/Producer/KafkaProducerBuilderTest.php @@ -108,6 +108,24 @@ public function testSetErrorCallback(): void self::assertSame($callback, $reflectionProperty->getValue($clone)); } + /** + * @return void + * @throws \ReflectionException + */ + public function testSetOAuthBearerTokenRefreshCallback(): void + { + $callback = function () { + // Anonymous test method, no logic required + }; + + $clone = $this->kafkaProducerBuilder->withOAuthBearerTokenRefreshCallback($callback); + + $reflectionProperty = new \ReflectionProperty($clone, 'oauthBearerCallback'); + $reflectionProperty->setAccessible(true); + + self::assertSame($callback, $reflectionProperty->getValue($clone)); + } + /** * @throws KafkaProducerException */ From 1b05f303b4fe233eb6e70084884a3a68b1444141 Mon Sep 17 00:00:00 2001 From: Nick Chiu Date: Tue, 7 Sep 2021 14:52:42 +0200 Subject: [PATCH 2/4] fix typos --- Makefile | 2 +- src/Consumer/KafkaConsumerBuilder.php | 2 +- src/Producer/KafkaProducerBuilder.php | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index 014e83a..6b12518 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ test: pcov-disable static-analysis: pcov-disable mkdir -p build/logs/phpstan - ${PHPSTAN} analyse --no-progress --memory-limit=64 + ${PHPSTAN} analyse --no-progress --memory-limit=64M update-dependencies: composer update diff --git a/src/Consumer/KafkaConsumerBuilder.php b/src/Consumer/KafkaConsumerBuilder.php index 18f850d..8bcb8b4 100644 --- a/src/Consumer/KafkaConsumerBuilder.php +++ b/src/Consumer/KafkaConsumerBuilder.php @@ -308,7 +308,7 @@ private function registerCallbacks(KafkaConfiguration $conf): void } if (null !== $this->oauthBearerCallback) { - $conf->setOAuthBearerTokenRefreshCbLogCb($this->oauthBearerCallback); + $conf->setOAuthBearerTokenRefreshCb($this->oauthBearerCallback); } } } diff --git a/src/Producer/KafkaProducerBuilder.php b/src/Producer/KafkaProducerBuilder.php index a74c58b..d158a8d 100644 --- a/src/Producer/KafkaProducerBuilder.php +++ b/src/Producer/KafkaProducerBuilder.php @@ -208,7 +208,7 @@ private function registerCallbacks(KafkaConfiguration $conf): void } if (null !== $this->oauthBearerCallback) { - $conf->setOAuthBearerTokenRefreshCbLogCb($this->oauthBearerCallback); + $conf->setOAuthBearerTokenRefreshCb($this->oauthBearerCallback); } } } From 902bb3fcac70955c72eb3788c8270eb0daac4f53 Mon Sep 17 00:00:00 2001 From: Nick Chiu Date: Tue, 7 Sep 2021 15:00:39 +0200 Subject: [PATCH 3/4] fix test --- Makefile | 2 +- tests/Unit/Consumer/KafkaConsumerBuilderTest.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 6b12518..87c6438 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ test: pcov-disable static-analysis: pcov-disable mkdir -p build/logs/phpstan - ${PHPSTAN} analyse --no-progress --memory-limit=64M + ${PHPSTAN} analyse --no-progress --memory-limit=128M update-dependencies: composer update diff --git a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php index 4db7f59..936c574 100644 --- a/tests/Unit/Consumer/KafkaConsumerBuilderTest.php +++ b/tests/Unit/Consumer/KafkaConsumerBuilderTest.php @@ -227,7 +227,7 @@ public function testSetOAuthBearerTokenRefreshCallback(): void ->withOAuthBearerTokenRefreshCallback($callback) ->build(); $conf = $consumer->getConfiguration(); - self::assertArrayHasKey('oauthbearer_refresh', $conf); + self::assertArrayHasKey('oauthbearer_token_refresh_cb', $conf); } /** From d3c116abe90d2703fcec67a756df8f1bebf10799 Mon Sep 17 00:00:00 2001 From: Nick Chiu Date: Tue, 7 Sep 2021 15:08:27 +0200 Subject: [PATCH 4/4] adjust interfaces --- src/Consumer/KafkaConsumerBuilder.php | 2 +- src/Consumer/KafkaConsumerBuilderInterface.php | 10 +++++++++- src/Producer/KafkaProducerBuilderInterface.php | 8 ++++++++ tests/Unit/Producer/KafkaProducerBuilderTest.php | 1 + 4 files changed, 19 insertions(+), 2 deletions(-) diff --git a/src/Consumer/KafkaConsumerBuilder.php b/src/Consumer/KafkaConsumerBuilder.php index 8bcb8b4..f813037 100644 --- a/src/Consumer/KafkaConsumerBuilder.php +++ b/src/Consumer/KafkaConsumerBuilder.php @@ -231,7 +231,7 @@ public function withOffsetCommitCallback(callable $offsetCommitCallback): KafkaC /** * Set callback that is being called on offset commits * - * @param callable $offsetCommitCallback + * @param callable $oauthBearerCallback * @return KafkaConsumerBuilderInterface */ public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): KafkaConsumerBuilderInterface diff --git a/src/Consumer/KafkaConsumerBuilderInterface.php b/src/Consumer/KafkaConsumerBuilderInterface.php index e828696..8f60205 100644 --- a/src/Consumer/KafkaConsumerBuilderInterface.php +++ b/src/Consumer/KafkaConsumerBuilderInterface.php @@ -105,7 +105,15 @@ public function withDecoder(DecoderInterface $decoder): self; * @param callable $logCallback * @return KafkaConsumerBuilderInterface */ - public function withLogCallback(callable $logCallback): KafkaConsumerBuilderInterface; + public function withLogCallback(callable $logCallback): self; + + /** + * Set callback that is being called on offset commits + * + * @param callable $oauthBearerCallback + * @return KafkaConsumerBuilderInterface + */ + public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): self; /** * Returns your consumer instance diff --git a/src/Producer/KafkaProducerBuilderInterface.php b/src/Producer/KafkaProducerBuilderInterface.php index c14a0bb..c8e6b05 100644 --- a/src/Producer/KafkaProducerBuilderInterface.php +++ b/src/Producer/KafkaProducerBuilderInterface.php @@ -50,6 +50,14 @@ public function withErrorCallback(callable $errorCallback): self; */ public function withLogCallback(callable $logCallback): self; + /** + * Callback for OAuth Bearer Token refresh + * + * @param callable $oauthBearerCallback + * @return KafkaProducerBuilderInterface + */ + public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): self; + /** * Lets you set a custom encoder for produce message * diff --git a/tests/Unit/Producer/KafkaProducerBuilderTest.php b/tests/Unit/Producer/KafkaProducerBuilderTest.php index 60e3356..13f6d15 100644 --- a/tests/Unit/Producer/KafkaProducerBuilderTest.php +++ b/tests/Unit/Producer/KafkaProducerBuilderTest.php @@ -150,6 +150,7 @@ public function testBuild(): void ->withDeliveryReportCallback($callback) ->withErrorCallback($callback) ->withLogCallback($callback) + ->withOAuthBearerTokenRefreshCallback($callback) ->build(); self::assertInstanceOf(KafkaProducerInterface::class, $producer);