Skip to content

Commit ada97c6

Browse files
authored
add oauthbearer refresh cb (#49)
* add oauthbearer refresh cb * fix crash, add test
1 parent 8ea792c commit ada97c6

5 files changed

+96
-1
lines changed

Diff for: configuration.c

+62
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs) /* {{{ */
6868
cbs->offset_commit = NULL;
6969
kafka_conf_callback_dtor(cbs->log);
7070
cbs->log = NULL;
71+
kafka_conf_callback_dtor(cbs->oauthbearer_refresh);
72+
cbs->oauthbearer_refresh = NULL;
7173
} /* }}} */
7274

7375
static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from) /* {{{ */
@@ -87,6 +89,7 @@ void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *f
8789
kafka_conf_callback_copy(&to->stats, from->stats);
8890
kafka_conf_callback_copy(&to->offset_commit, from->offset_commit);
8991
kafka_conf_callback_copy(&to->log, from->log);
92+
kafka_conf_callback_copy(&to->oauthbearer_refresh, from->oauthbearer_refresh);
9093
} /* }}} */
9194

9295
static void kafka_conf_free(zend_object *object) /* {{{ */
@@ -304,6 +307,33 @@ static void kafka_conf_log_cb(const rd_kafka_t *rk, int level, const char *facil
304307
zval_ptr_dtor(&args[3]);
305308
}
306309

310+
static void kafka_conf_oauthbearer_token_refresh_cb(rd_kafka_t *rk, const char *oauthbearer_config, void *opaque)
311+
{
312+
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
313+
zval args[2];
314+
315+
if (!opaque) {
316+
return;
317+
}
318+
319+
if (!cbs->oauthbearer_refresh) {
320+
return;
321+
}
322+
323+
ZVAL_NULL(&args[0]);
324+
ZVAL_NULL(&args[1]);
325+
326+
ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0);
327+
if (oauthbearer_config) {
328+
ZVAL_STRING(&args[1], oauthbearer_config);
329+
}
330+
331+
kafka_call_function(&cbs->oauthbearer_refresh->fci, &cbs->oauthbearer_refresh->fcc, NULL, 2, args);
332+
333+
zval_ptr_dtor(&args[0]);
334+
zval_ptr_dtor(&args[1]);
335+
}
336+
307337
/* {{{ proto SimpleKafkaClient\Configuration::__construct() */
308338
ZEND_METHOD(SimpleKafkaClient_Configuration, __construct)
309339
{
@@ -579,6 +609,38 @@ ZEND_METHOD(SimpleKafkaClient_Configuration, setLogCb)
579609
}
580610
/* }}} */
581611

612+
/* {{{ proto void SimpleKafkaClient\Configuration::setOAuthBearerTokenRefreshCb(callable $callback)
613+
Sets the OAuthBearer token refresh callback */
614+
ZEND_METHOD(SimpleKafkaClient_Configuration, setOAuthBearerTokenRefreshCb)
615+
{
616+
zend_fcall_info fci;
617+
zend_fcall_info_cache fcc;
618+
kafka_conf_object *intern;
619+
620+
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
621+
Z_PARAM_FUNC(fci, fcc)
622+
ZEND_PARSE_PARAMETERS_END();
623+
624+
intern = get_kafka_conf_object(getThis());
625+
if (!intern) {
626+
return;
627+
}
628+
629+
Z_ADDREF_P(&fci.function_name);
630+
631+
if (intern->cbs.oauthbearer_refresh) {
632+
zval_ptr_dtor(&intern->cbs.oauthbearer_refresh->fci.function_name);
633+
} else {
634+
intern->cbs.oauthbearer_refresh = ecalloc(1, sizeof(*intern->cbs.oauthbearer_refresh));
635+
}
636+
637+
intern->cbs.oauthbearer_refresh->fci = fci;
638+
intern->cbs.oauthbearer_refresh->fcc = fcc;
639+
640+
rd_kafka_conf_set_oauthbearer_token_refresh_cb(intern->conf, kafka_conf_oauthbearer_token_refresh_cb);
641+
}
642+
/* }}} */
643+
582644
void kafka_conf_init(INIT_FUNC_ARGS)
583645
{
584646
zend_class_entry tmpce;

Diff for: configuration.stub.php

+2
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,6 @@ public function setRebalanceCb(callable $callback): void {}
2323
public function setOffsetCommitCb(callable $callback): void {}
2424

2525
public function setLogCb(callable $callback): void {}
26+
27+
public function setOAuthBearerTokenRefreshCb(callable $callback): void {}
2628
}

Diff for: configuration_arginfo.h

+5-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 1790726e9dd0d0664baa412bb345663c4dab71b5 */
2+
* Stub hash: b372876d55f3b02bd30dd4c09d20f305b070718c */
33

44
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Configuration___construct, 0, 0, 0)
55
ZEND_END_ARG_INFO()
@@ -26,6 +26,8 @@ ZEND_END_ARG_INFO()
2626

2727
#define arginfo_class_SimpleKafkaClient_Configuration_setLogCb arginfo_class_SimpleKafkaClient_Configuration_setErrorCb
2828

29+
#define arginfo_class_SimpleKafkaClient_Configuration_setOAuthBearerTokenRefreshCb arginfo_class_SimpleKafkaClient_Configuration_setErrorCb
30+
2931

3032
ZEND_METHOD(SimpleKafkaClient_Configuration, __construct);
3133
ZEND_METHOD(SimpleKafkaClient_Configuration, dump);
@@ -36,6 +38,7 @@ ZEND_METHOD(SimpleKafkaClient_Configuration, setStatsCb);
3638
ZEND_METHOD(SimpleKafkaClient_Configuration, setRebalanceCb);
3739
ZEND_METHOD(SimpleKafkaClient_Configuration, setOffsetCommitCb);
3840
ZEND_METHOD(SimpleKafkaClient_Configuration, setLogCb);
41+
ZEND_METHOD(SimpleKafkaClient_Configuration, setOAuthBearerTokenRefreshCb);
3942

4043

4144
static const zend_function_entry class_SimpleKafkaClient_Configuration_methods[] = {
@@ -48,5 +51,6 @@ static const zend_function_entry class_SimpleKafkaClient_Configuration_methods[]
4851
ZEND_ME(SimpleKafkaClient_Configuration, setRebalanceCb, arginfo_class_SimpleKafkaClient_Configuration_setRebalanceCb, ZEND_ACC_PUBLIC)
4952
ZEND_ME(SimpleKafkaClient_Configuration, setOffsetCommitCb, arginfo_class_SimpleKafkaClient_Configuration_setOffsetCommitCb, ZEND_ACC_PUBLIC)
5053
ZEND_ME(SimpleKafkaClient_Configuration, setLogCb, arginfo_class_SimpleKafkaClient_Configuration_setLogCb, ZEND_ACC_PUBLIC)
54+
ZEND_ME(SimpleKafkaClient_Configuration, setOAuthBearerTokenRefreshCb, arginfo_class_SimpleKafkaClient_Configuration_setOAuthBearerTokenRefreshCb, ZEND_ACC_PUBLIC)
5155
ZEND_FE_END
5256
};

Diff for: php_simple_kafka_client_int.h

+1
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ typedef struct _kafka_conf_callbacks {
5656
kafka_conf_callback *consume;
5757
kafka_conf_callback *offset_commit;
5858
kafka_conf_callback *log;
59+
kafka_conf_callback *oauthbearer_refresh;
5960
} kafka_conf_callbacks;
6061

6162
typedef struct _kafka_conf_object {

Diff for: tests/oauthbearer_cb.phpt

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
--TEST--
2+
Produce, consume
3+
--SKIPIF--
4+
<?php
5+
require __DIR__ . '/integration-tests-check.php';
6+
--FILE--
7+
<?php
8+
require __DIR__ . '/integration-tests-check.php';
9+
10+
$conf = new SimpleKafkaClient\Configuration();
11+
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
12+
$conf->set('security.protocol', 'SASL_PLAINTEXT');
13+
$conf->set('sasl.mechanisms', 'OAUTHBEARER');
14+
$conf->set('sasl.oauthbearer.config', 'principalClaimName=azp');
15+
$conf->setOAuthBearerTokenRefreshCb(function($kafka, $oAuthBearerConfig) {
16+
var_dump($oAuthBearerConfig);
17+
});
18+
19+
$conf->setErrorCb(function($kafka, $errorCode, $errorString) {
20+
var_dump($errorString);
21+
});
22+
23+
$producer = new SimpleKafkaClient\Producer($conf);
24+
$producer->poll(-1);
25+
--EXPECT--
26+
string(22) "principalClaimName=azp"

0 commit comments

Comments
 (0)