Skip to content

Commit 8ea792c

Browse files
authored
Add set oauth bearer func (#48)
* save work * add test * init vars * fix test * add free
1 parent b472b14 commit 8ea792c

4 files changed

+89
-1
lines changed

Diff for: simple_kafka_client.c

+53
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,59 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure)
288288
}
289289
/* }}} */
290290

291+
/* {{{ proto void SimpleKafkaClient\Kafka::setOAuthBearerToken(string $token, int $lifetimeMs, string $principalName, ?array $extensions = null)
292+
Set SASL/OAUTHBEARER token and metadata. */
293+
ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerToken)
294+
{
295+
zend_long lifetime_ms;
296+
const char **extensions = NULL;
297+
char *header_key, *header_value, *token, *principal_name, *errstr = NULL;
298+
size_t token_len, principal_name_len, errstr_size = 0, extension_size = 0;
299+
kafka_object *intern;
300+
rd_kafka_resp_err_t err;
301+
HashTable *ht_extensions = NULL;
302+
HashPosition extensionsPos;
303+
zval *z_header_value;
304+
305+
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 3, 4)
306+
Z_PARAM_STRING(token, token_len)
307+
Z_PARAM_LONG(lifetime_ms)
308+
Z_PARAM_STRING(principal_name, principal_name_len)
309+
Z_PARAM_OPTIONAL
310+
Z_PARAM_ARRAY_HT_OR_NULL(ht_extensions)
311+
ZEND_PARSE_PARAMETERS_END();
312+
313+
intern = get_kafka_object(getThis());
314+
if (!intern) {
315+
return;
316+
}
317+
318+
if (ht_extensions) {
319+
for (zend_hash_internal_pointer_reset_ex(ht_extensions, &extensionsPos);
320+
(z_header_value = zend_hash_get_current_data_ex(ht_extensions, &extensionsPos)) != NULL &&
321+
(header_key = kafka_hash_get_current_key_ex(ht_extensions, &extensionsPos)) != NULL;
322+
zend_hash_move_forward_ex(ht_extensions, &extensionsPos)) {
323+
convert_to_string_ex(z_header_value);
324+
extensions = realloc(extensions, (extension_size + 1) * sizeof (header_key));
325+
extensions[extension_size] = header_key;
326+
header_value = Z_STRVAL_P(z_header_value);
327+
extensions = realloc(extensions, (extension_size + 2) * sizeof (header_value));
328+
extensions[extension_size+1] = Z_STRVAL_P(z_header_value);
329+
extension_size+=2;
330+
}
331+
}
332+
333+
err = rd_kafka_oauthbearer_set_token(intern->rk, token, lifetime_ms, principal_name, extensions, extension_size, errstr, errstr_size);
334+
335+
if (err != RD_KAFKA_RESP_ERR_NO_ERROR) {
336+
zend_throw_exception(ce_kafka_exception, rd_kafka_err2str(err), err);
337+
return;
338+
}
339+
340+
free(extensions);
341+
}
342+
/* }}} */
343+
291344
#define COPY_CONSTANT(name) \
292345
REGISTER_LONG_CONSTANT(#name, name, CONST_CS | CONST_PERSISTENT)
293346

Diff for: simple_kafka_client.stub.php

+2
Original file line numberDiff line numberDiff line change
@@ -15,4 +15,6 @@ public function queryWatermarkOffsets(string $topic, int $partition, int &$low,
1515
public function offsetsForTimes(array $topicPartitions, int $timeoutMs): array {}
1616

1717
public function setOAuthBearerTokenFailure(string $errorString): void {}
18+
19+
public function setOAuthBearerToken(string $token, int $lifetimeMs, string $principalName, ?array $extensions = null): void {}
1820
}

Diff for: simple_kafka_client_arginfo.h

+10-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 54f0c76165212e21416f46325d0a52b0b7fce4a8 */
2+
* Stub hash: 54165f3ef5d3833ee646b825574c959323fd612b */
33

44
ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0)
55
ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0)
@@ -27,12 +27,20 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_se
2727
ZEND_ARG_TYPE_INFO(0, errorString, IS_STRING, 0)
2828
ZEND_END_ARG_INFO()
2929

30+
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerToken, 0, 3, IS_VOID, 0)
31+
ZEND_ARG_TYPE_INFO(0, token, IS_STRING, 0)
32+
ZEND_ARG_TYPE_INFO(0, lifetimeMs, IS_LONG, 0)
33+
ZEND_ARG_TYPE_INFO(0, principalName, IS_STRING, 0)
34+
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, extensions, IS_ARRAY, 1, "null")
35+
ZEND_END_ARG_INFO()
36+
3037

3138
ZEND_METHOD(SimpleKafkaClient_Kafka, getMetadata);
3239
ZEND_METHOD(SimpleKafkaClient_Kafka, getOutQLen);
3340
ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets);
3441
ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes);
3542
ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure);
43+
ZEND_METHOD(SimpleKafkaClient_Kafka, setOAuthBearerToken);
3644

3745

3846
static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = {
@@ -41,5 +49,6 @@ static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = {
4149
ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC)
4250
ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC)
4351
ZEND_ME(SimpleKafkaClient_Kafka, setOAuthBearerTokenFailure, arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerTokenFailure, ZEND_ACC_PUBLIC)
52+
ZEND_ME(SimpleKafkaClient_Kafka, setOAuthBearerToken, arginfo_class_SimpleKafkaClient_Kafka_setOAuthBearerToken, ZEND_ACC_PUBLIC)
4453
ZEND_FE_END
4554
};

Diff for: tests/set_oauthbearer_token.phpt

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
--TEST--
2+
Produce, consume
3+
--SKIPIF--
4+
<?php
5+
require __DIR__ . '/integration-tests-check.php';
6+
--FILE--
7+
<?php
8+
require __DIR__ . '/integration-tests-check.php';
9+
10+
$conf = new SimpleKafkaClient\Configuration();
11+
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
12+
$conf->set('security.protocol', 'SASL_PLAINTEXT');
13+
$conf->set('sasl.mechanisms', 'OAUTHBEARER');
14+
15+
$conf->setErrorCb(function($kafka, $errorCode, $errorString) {
16+
var_dump($errorString);
17+
});
18+
19+
$producer = new SimpleKafkaClient\Producer($conf);
20+
$producer->setOAuthBearerToken('token', 100000 + time() * 1000, 'principal', ['test'=>'key']);
21+
$producer->poll(-1);
22+
echo 'Done';
23+
--EXPECT--
24+
Done

0 commit comments

Comments
 (0)