Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add metadata property to topic partition #50

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion php_simple_kafka_client_int.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ typedef struct _kafka_topic_partition_intern {
char *topic;
int32_t partition;
int64_t offset;
zval metadata;
zend_object std;
} kafka_topic_partition_intern;

Expand Down Expand Up @@ -113,6 +114,9 @@ typedef void (*kafka_metadata_collection_ctor_t)(zval *renurn_value, zval *zmeta
#define Z_PARAM_STRING_OR_NULL(dest, dest_len) \
Z_PARAM_STRING_EX(dest, dest_len, 1, 0)

#define Z_PARAM_ZVAL_OR_NULL(dest) \
Z_PARAM_ZVAL_EX(dest, 1, 0)

#endif

#ifdef PHP_WIN32
Expand Down Expand Up @@ -208,7 +212,7 @@ void kafka_metadata_topic_init(INIT_FUNC_ARGS);
void kafka_metadata_topic_ctor(zval *return_value, zval *zmetadata, const void *metadata_topic);
void kafka_topic_init(INIT_FUNC_ARGS);
void kafka_metadata_topic_partition_init(INIT_FUNC_ARGS);
void kafka_topic_partition_init(zval *z, char *topic, int32_t partition, int64_t offset);
void kafka_topic_partition_init(zval *z, char *topic, int32_t partition, int64_t offset, zval *metadata);
void kafka_topic_partition_list_to_array(zval *return_value, rd_kafka_topic_partition_list_t *list);

kafka_topic_partition_intern * get_topic_partition_object(zval *z);
Expand Down
40 changes: 35 additions & 5 deletions tests/topic_partition.phpt
Original file line number Diff line number Diff line change
Expand Up @@ -11,48 +11,78 @@ $topar = new SimpleKafkaClient\TopicPartition("test", RD_KAFKA_PARTITION_UA, 42)

var_dump($topar);

$topar = new SimpleKafkaClient\TopicPartition("test", RD_KAFKA_PARTITION_UA, 42, ['test']);

var_dump($topar);

var_dump(array(
"topic" => $topar->getTopicName(),
"partition" => $topar->getPartition(),
"offset" => $topar->getOffset(),
"metadata" => $topar->getMetadata(),
));

$topar
->setTopicName("foo")
->setPartition(123)
->setOffset(43);
->setOffset(43)
->setMetadata(2);

var_dump($topar);
--EXPECT--
object(SimpleKafkaClient\TopicPartition)#1 (3) {
object(SimpleKafkaClient\TopicPartition)#1 (4) {
["topic"]=>
string(4) "test"
["partition"]=>
int(-1)
["offset"]=>
int(0)
["metadata"]=>
NULL
}
object(SimpleKafkaClient\TopicPartition)#2 (4) {
["topic"]=>
string(4) "test"
["partition"]=>
int(-1)
["offset"]=>
int(42)
["metadata"]=>
NULL
}
object(SimpleKafkaClient\TopicPartition)#2 (3) {
object(SimpleKafkaClient\TopicPartition)#1 (4) {
["topic"]=>
string(4) "test"
["partition"]=>
int(-1)
["offset"]=>
int(42)
["metadata"]=>
array(1) {
[0]=>
string(4) "test"
}
}
array(3) {
array(4) {
["topic"]=>
string(4) "test"
["partition"]=>
int(-1)
["offset"]=>
int(42)
["metadata"]=>
array(1) {
[0]=>
string(4) "test"
}
}
object(SimpleKafkaClient\TopicPartition)#2 (3) {
object(SimpleKafkaClient\TopicPartition)#1 (4) {
["topic"]=>
string(3) "foo"
["partition"]=>
int(123)
["offset"]=>
int(43)
["metadata"]=>
int(2)
}
76 changes: 63 additions & 13 deletions topic_partition.c
Original file line number Diff line number Diff line change
Expand Up @@ -97,33 +97,35 @@ kafka_topic_partition_intern * get_topic_partition_object(zval *z) /* {{{ */

static HashTable *get_debug_info(Z_KAFKA_OBJ *object, int *is_temp) /* {{{ */
{
zval ary;
zval arr;
object_intern *intern;

*is_temp = 1;

array_init(&ary);
array_init(&arr);

intern = kafka_get_debug_object(object_intern, object);

if (!intern) {
return Z_ARRVAL(ary);
return Z_ARRVAL(arr);
}

if (intern->topic) {
add_assoc_string(&ary, "topic", intern->topic);
add_assoc_string(&arr, "topic", intern->topic);
} else {
add_assoc_null(&ary, "topic");
add_assoc_null(&arr, "topic");
}

add_assoc_long(&ary, "partition", intern->partition);
add_assoc_long(&ary, "offset", intern->offset);
add_assoc_long(&arr, "partition", intern->partition);
add_assoc_long(&arr, "offset", intern->offset);
Z_TRY_ADDREF_P(&intern->metadata);
add_assoc_zval(&arr, "metadata", &intern->metadata);

return Z_ARRVAL(ary);
return Z_ARRVAL(arr);
}
/* }}} */

void kafka_topic_partition_init(zval *zobj, char * topic, int32_t partition, int64_t offset) /* {{{ */
void kafka_topic_partition_init(zval *zobj, char * topic, int32_t partition, int64_t offset, zval *metadata) /* {{{ */
{
object_intern *intern;

Expand All @@ -139,6 +141,12 @@ void kafka_topic_partition_init(zval *zobj, char * topic, int32_t partition, int

intern->partition = partition;
intern->offset = offset;

if (!metadata) {
ZVAL_NULL(&intern->metadata);
} else {
intern->metadata = *metadata;
}
} /* }}} */

void kafka_topic_partition_list_to_array(zval *return_value, rd_kafka_topic_partition_list_t *list) /* {{{ */
Expand All @@ -153,7 +161,7 @@ void kafka_topic_partition_list_to_array(zval *return_value, rd_kafka_topic_part
topar = &list->elems[i];
ZVAL_NULL(&ztopar);
object_init_ex(&ztopar, ce_kafka_topic_partition);
kafka_topic_partition_init(&ztopar, topar->topic, topar->partition, topar->offset);
kafka_topic_partition_init(&ztopar, topar->topic, topar->partition, topar->offset, topar->metadata);
add_next_index_zval(return_value, &ztopar);
}
} /* }}} */
Expand Down Expand Up @@ -199,23 +207,25 @@ rd_kafka_topic_partition_list_t * array_arg_to_kafka_topic_partition_list(int ar
} /* }}} */


/* {{{ proto void SimpleKafkaClient\TopicPartition::__construct(string $topic, int $partition[, int $offset])
/* {{{ proto void SimpleKafkaClient\TopicPartition::__construct(string $topic, int $partition[, int $offset, mixed $metadata])
Constructor */
ZEND_METHOD(SimpleKafkaClient_TopicPartition, __construct)
{
char *topic;
size_t topic_len;
zend_long partition;
zend_long offset = 0;
zval *metadata = NULL;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 3)
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 4)
Z_PARAM_STRING(topic, topic_len)
Z_PARAM_LONG(partition)
Z_PARAM_OPTIONAL
Z_PARAM_LONG(offset)
Z_PARAM_ZVAL_OR_NULL(metadata)
ZEND_PARSE_PARAMETERS_END();

kafka_topic_partition_init(getThis(), topic, partition, offset);
kafka_topic_partition_init(getThis(), topic, partition, offset, metadata);
}
/* }}} */

Expand Down Expand Up @@ -348,6 +358,46 @@ ZEND_METHOD(SimpleKafkaClient_TopicPartition, setOffset)
}
/* }}} */

/* {{{ proto int SimpleKafkaClient\TopicPartition::getMetadata()
Returns offset */
ZEND_METHOD(SimpleKafkaClient_TopicPartition, getMetadata)
{
object_intern *intern;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
if (!intern) {
return;
}

RETURN_ZVAL(&intern->metadata, 1, 0);
}
/* }}} */

/* {{{ proto TopicPartition SimpleKafkaClient\TopicPartition::setMetadata($metadata)
Sets metadata */
ZEND_METHOD(SimpleKafkaClient_TopicPartition, setMetadata)
{
zval *metadata = NULL;
object_intern *intern;

ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
Z_PARAM_ZVAL(metadata)
ZEND_PARSE_PARAMETERS_END();

intern = get_object(getThis());
if (!intern) {
return;
}

intern->metadata = *metadata;

RETURN_ZVAL(getThis(), 1, 0);
}
/* }}} */

void kafka_metadata_topic_partition_init(INIT_FUNC_ARGS) /* {{{ */
{
zend_class_entry tmpce;
Expand Down
8 changes: 7 additions & 1 deletion topic_partition.stub.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

class TopicPartition
{
public function __construct(string $topicName, int $partition, int $offset = 0) {}
public function __construct(string $topicName, int $partition, int $offset = 0, mixed $metadata = null) {}

public function getTopicName(): ?string {}

Expand All @@ -19,4 +19,10 @@ public function setPartition(int $partition): TopicPartition {}
public function getOffset(): int {}

public function setOffset(int $offset): TopicPartition {}

/** @param mixed $metadata */
public function setMetadata($metadata): TopicPartition {}

/** @return mixed */
public function getMetadata() {}
}
14 changes: 13 additions & 1 deletion topic_partition_arginfo.h
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
/* This is a generated file, edit the .stub.php file instead.
* Stub hash: 95f09c698079d00927dd2d02910325d6aff76157 */
* Stub hash: 32f5fa0f34d7d0fc80d6432664900f81926e915c */

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_TopicPartition___construct, 0, 0, 2)
ZEND_ARG_TYPE_INFO(0, topicName, IS_STRING, 0)
ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0)
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, offset, IS_LONG, 0, "0")
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, metadata, IS_MIXED, 0, "null")
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_TopicPartition_getTopicName, 0, 0, IS_STRING, 1)
Expand All @@ -27,6 +28,13 @@ ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_TopicPart
ZEND_ARG_TYPE_INFO(0, offset, IS_LONG, 0)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_TopicPartition_setMetadata, 0, 1, SimpleKafkaClient\\TopicPartition, 0)
ZEND_ARG_INFO(0, metadata)
ZEND_END_ARG_INFO()

ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_TopicPartition_getMetadata, 0, 0, 0)
ZEND_END_ARG_INFO()


ZEND_METHOD(SimpleKafkaClient_TopicPartition, __construct);
ZEND_METHOD(SimpleKafkaClient_TopicPartition, getTopicName);
Expand All @@ -35,6 +43,8 @@ ZEND_METHOD(SimpleKafkaClient_TopicPartition, getPartition);
ZEND_METHOD(SimpleKafkaClient_TopicPartition, setPartition);
ZEND_METHOD(SimpleKafkaClient_TopicPartition, getOffset);
ZEND_METHOD(SimpleKafkaClient_TopicPartition, setOffset);
ZEND_METHOD(SimpleKafkaClient_TopicPartition, setMetadata);
ZEND_METHOD(SimpleKafkaClient_TopicPartition, getMetadata);


static const zend_function_entry class_SimpleKafkaClient_TopicPartition_methods[] = {
Expand All @@ -45,5 +55,7 @@ static const zend_function_entry class_SimpleKafkaClient_TopicPartition_methods[
ZEND_ME(SimpleKafkaClient_TopicPartition, setPartition, arginfo_class_SimpleKafkaClient_TopicPartition_setPartition, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_TopicPartition, getOffset, arginfo_class_SimpleKafkaClient_TopicPartition_getOffset, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_TopicPartition, setOffset, arginfo_class_SimpleKafkaClient_TopicPartition_setOffset, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_TopicPartition, setMetadata, arginfo_class_SimpleKafkaClient_TopicPartition_setMetadata, ZEND_ACC_PUBLIC)
ZEND_ME(SimpleKafkaClient_TopicPartition, getMetadata, arginfo_class_SimpleKafkaClient_TopicPartition_getMetadata, ZEND_ACC_PUBLIC)
ZEND_FE_END
};