Skip to content

Commit c0c7b80

Browse files
authored
add oauth callback (#7)
* add oauth callback * fix typos * fix test * adjust interfaces
1 parent d2f1403 commit c0c7b80

7 files changed

+105
-3
lines changed

Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ test: pcov-disable
2222

2323
static-analysis: pcov-disable
2424
mkdir -p build/logs/phpstan
25-
${PHPSTAN} analyse --no-progress --memory-limit=64
25+
${PHPSTAN} analyse --no-progress --memory-limit=128M
2626

2727
update-dependencies:
2828
composer update

src/Consumer/KafkaConsumerBuilder.php

+19-1
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ final class KafkaConsumerBuilder implements KafkaConsumerBuilderInterface
5151
/**
5252
* @var callable
5353
*/
54-
private $consumeCallback;
54+
private $oauthBearerCallback;
5555

5656
/**
5757
* @var callable
@@ -228,6 +228,20 @@ public function withOffsetCommitCallback(callable $offsetCommitCallback): KafkaC
228228
return $that;
229229
}
230230

231+
/**
232+
* Set callback that is being called on offset commits
233+
*
234+
* @param callable $oauthBearerCallback
235+
* @return KafkaConsumerBuilderInterface
236+
*/
237+
public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): KafkaConsumerBuilderInterface
238+
{
239+
$that = clone $this;
240+
$that->oauthBearerCallback = $oauthBearerCallback;
241+
242+
return $that;
243+
}
244+
231245
/**
232246
* Lets you set a custom decoder for the consumed message
233247
*
@@ -292,5 +306,9 @@ private function registerCallbacks(KafkaConfiguration $conf): void
292306
if (null !== $this->offsetCommitCallback) {
293307
$conf->setOffsetCommitCb($this->offsetCommitCallback);
294308
}
309+
310+
if (null !== $this->oauthBearerCallback) {
311+
$conf->setOAuthBearerTokenRefreshCb($this->oauthBearerCallback);
312+
}
295313
}
296314
}

src/Consumer/KafkaConsumerBuilderInterface.php

+9-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,15 @@ public function withDecoder(DecoderInterface $decoder): self;
105105
* @param callable $logCallback
106106
* @return KafkaConsumerBuilderInterface
107107
*/
108-
public function withLogCallback(callable $logCallback): KafkaConsumerBuilderInterface;
108+
public function withLogCallback(callable $logCallback): self;
109+
110+
/**
111+
* Set callback that is being called on offset commits
112+
*
113+
* @param callable $oauthBearerCallback
114+
* @return KafkaConsumerBuilderInterface
115+
*/
116+
public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): self;
109117

110118
/**
111119
* Returns your consumer instance

src/Producer/KafkaProducerBuilder.php

+22
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,11 @@ final class KafkaProducerBuilder implements KafkaProducerBuilderInterface
3939
*/
4040
private $logCallback;
4141

42+
/**
43+
* @var callable
44+
*/
45+
private $oauthBearerCallback;
46+
4247
/**
4348
* @var EncoderInterface
4449
*/
@@ -131,6 +136,19 @@ public function withLogCallback(callable $logCallback): KafkaProducerBuilderInte
131136
return $this;
132137
}
133138

139+
/**
140+
* Callback for OAuth Bearer Token refresh
141+
*
142+
* @param callable $oauthBearerCallback
143+
* @return KafkaProducerBuilderInterface
144+
*/
145+
public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): KafkaProducerBuilderInterface
146+
{
147+
$this->oauthBearerCallback = $oauthBearerCallback;
148+
149+
return $this;
150+
}
151+
134152
/**
135153
* Lets you set a custom encoder for produce message
136154
*
@@ -188,5 +206,9 @@ private function registerCallbacks(KafkaConfiguration $conf): void
188206
if (null !== $this->logCallback) {
189207
$conf->setLogCb($this->logCallback);
190208
}
209+
210+
if (null !== $this->oauthBearerCallback) {
211+
$conf->setOAuthBearerTokenRefreshCb($this->oauthBearerCallback);
212+
}
191213
}
192214
}

src/Producer/KafkaProducerBuilderInterface.php

+8
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,14 @@ public function withErrorCallback(callable $errorCallback): self;
5050
*/
5151
public function withLogCallback(callable $logCallback): self;
5252

53+
/**
54+
* Callback for OAuth Bearer Token refresh
55+
*
56+
* @param callable $oauthBearerCallback
57+
* @return KafkaProducerBuilderInterface
58+
*/
59+
public function withOAuthBearerTokenRefreshCallback(callable $oauthBearerCallback): self;
60+
5361
/**
5462
* Lets you set a custom encoder for produce message
5563
*

tests/Unit/Consumer/KafkaConsumerBuilderTest.php

+27
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,33 @@ public function testSetRebalanceCallback(): void
203203
self::assertArrayHasKey('rebalance_cb', $conf);
204204
}
205205

206+
/**
207+
* @return void
208+
* @throws \ReflectionException
209+
*/
210+
public function testSetOAuthBearerTokenRefreshCallback(): void
211+
{
212+
$callback = function () {
213+
// Anonymous test method, no logic required
214+
};
215+
216+
$clone = $this->kafkaConsumerBuilder->withOAuthBearerTokenRefreshCallback($callback);
217+
218+
$reflectionProperty = new \ReflectionProperty($clone, 'oauthBearerCallback');
219+
$reflectionProperty->setAccessible(true);
220+
221+
self::assertSame($callback, $reflectionProperty->getValue($clone));
222+
self::assertNotSame($clone, $this->kafkaConsumerBuilder);
223+
224+
$consumer = $clone
225+
->withAdditionalBroker('localhost')
226+
->withSubscription('test')
227+
->withOAuthBearerTokenRefreshCallback($callback)
228+
->build();
229+
$conf = $consumer->getConfiguration();
230+
self::assertArrayHasKey('oauthbearer_token_refresh_cb', $conf);
231+
}
232+
206233
/**
207234
* @return void
208235
* @throws \ReflectionException

tests/Unit/Producer/KafkaProducerBuilderTest.php

+19
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,24 @@ public function testSetErrorCallback(): void
108108
self::assertSame($callback, $reflectionProperty->getValue($clone));
109109
}
110110

111+
/**
112+
* @return void
113+
* @throws \ReflectionException
114+
*/
115+
public function testSetOAuthBearerTokenRefreshCallback(): void
116+
{
117+
$callback = function () {
118+
// Anonymous test method, no logic required
119+
};
120+
121+
$clone = $this->kafkaProducerBuilder->withOAuthBearerTokenRefreshCallback($callback);
122+
123+
$reflectionProperty = new \ReflectionProperty($clone, 'oauthBearerCallback');
124+
$reflectionProperty->setAccessible(true);
125+
126+
self::assertSame($callback, $reflectionProperty->getValue($clone));
127+
}
128+
111129
/**
112130
* @throws KafkaProducerException
113131
*/
@@ -132,6 +150,7 @@ public function testBuild(): void
132150
->withDeliveryReportCallback($callback)
133151
->withErrorCallback($callback)
134152
->withLogCallback($callback)
153+
->withOAuthBearerTokenRefreshCallback($callback)
135154
->build();
136155

137156
self::assertInstanceOf(KafkaProducerInterface::class, $producer);

0 commit comments

Comments
 (0)