Skip to content

Commit e96a76b

Browse files
authoredApr 24, 2021
unify free and new for producer and consumer (#45)
* unify free and new for producer and consumer * fix syntax * save stubs * fix param parsing * fix method * switch to count
1 parent 5145cbc commit e96a76b

11 files changed

+45
-91
lines changed
 

Diff for: ‎consumer.c

+2-55
Original file line numberDiff line numberDiff line change
@@ -40,47 +40,7 @@
4040
#include "Zend/zend_exceptions.h"
4141
#include "consumer_arginfo.h"
4242

43-
static zend_class_entry * ce;
44-
static zend_object_handlers handlers;
45-
46-
static void kafka_consumer_free(zend_object *object) /* {{{ */
47-
{
48-
kafka_object *intern = php_kafka_from_obj(kafka_object, object);
49-
rd_kafka_resp_err_t err;
50-
kafka_conf_callbacks_dtor(&intern->cbs);
51-
52-
if (intern->rk) {
53-
err = rd_kafka_consumer_close(intern->rk);
54-
55-
if (err) {
56-
php_error(E_WARNING, "rd_kafka_consumer_close failed: %s", rd_kafka_err2str(err));
57-
}
58-
59-
rd_kafka_destroy(intern->rk);
60-
intern->rk = NULL;
61-
}
62-
63-
kafka_conf_callbacks_dtor(&intern->cbs);
64-
65-
zend_object_std_dtor(&intern->std);
66-
}
67-
/* }}} */
68-
69-
static zend_object *kafka_consumer_new(zend_class_entry *class_type) /* {{{ */
70-
{
71-
zend_object* retval;
72-
kafka_object *intern;
73-
74-
intern = ecalloc(1, sizeof(kafka_object)+ zend_object_properties_size(class_type));
75-
zend_object_std_init(&intern->std, class_type);
76-
object_properties_init(&intern->std, class_type);
77-
78-
retval = &intern->std;
79-
retval->handlers = &handlers;
80-
81-
return retval;
82-
}
83-
/* }}} */
43+
zend_class_entry * ce_kafka_consumer;
8444

8545
static int has_group_id(rd_kafka_conf_t *conf) { /* {{{ */
8646

@@ -165,7 +125,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign)
165125

166126
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 1)
167127
Z_PARAM_OPTIONAL
168-
Z_PARAM_ARRAY_HT(htopars)
128+
Z_PARAM_ARRAY_HT_OR_NULL(htopars)
169129
ZEND_PARSE_PARAMETERS_END();
170130

171131
intern = get_kafka_object(getThis());
@@ -703,16 +663,3 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets)
703663
ZVAL_LONG(highResult, high);
704664
}
705665
/* }}} */
706-
707-
void kafka_consumer_init(INIT_FUNC_ARGS) /* {{{ */
708-
{
709-
zend_class_entry tmpce;
710-
711-
INIT_NS_CLASS_ENTRY(tmpce, "SimpleKafkaClient", "Consumer", class_SimpleKafkaClient_Consumer_methods);
712-
ce = zend_register_internal_class(&tmpce);
713-
ce->create_object = kafka_consumer_new;
714-
715-
handlers = kafka_default_object_handlers;
716-
handlers.free_obj = kafka_consumer_free;
717-
handlers.offset = XtOffsetOf(kafka_object, std);
718-
}

Diff for: ‎consumer.stub.php

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ class Consumer
88
{
99
public function __construct(Configuration $configuration) {}
1010

11-
public function assign(?array $topics): void {}
11+
public function assign(?array $topics = null): void {}
1212

1313
public function getAssignment(): array {}
1414

Diff for: ‎consumer_arginfo.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: ba3bc0a741bc6eab7a23a15ca6d83c24e99b23de */
2+
* Stub hash: 091c6b60081bb08ec174ef87b9cc6d2b3fbba461 */
33

44
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer___construct, 0, 0, 1)
55
ZEND_ARG_OBJ_INFO(0, configuration, SimpleKafkaClient\\Configuration, 0)
66
ZEND_END_ARG_INFO()
77

8-
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_assign, 0, 1, IS_VOID, 0)
9-
ZEND_ARG_TYPE_INFO(0, topics, IS_ARRAY, 1)
8+
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_assign, 0, 0, IS_VOID, 0)
9+
ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, topics, IS_ARRAY, 1, "null")
1010
ZEND_END_ARG_INFO()
1111

1212
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Consumer_getAssignment, 0, 0, IS_ARRAY, 0)

Diff for: ‎kafka_arginfo.h

+12-12
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,28 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: aac20095e4ad448dfdc0f3a25d87cbb17f9f1581 */
2+
* Stub hash: 5620609ea29ca05a20736ac8412bee6e4cc39615 */
33

4-
ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0)
4+
ZEND_BEGIN_ARG_WITH_RETURN_OBJ_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getMetadata, 0, 3, SimpleKafkaClient\\Metadata, 0)
55
ZEND_ARG_TYPE_INFO(0, allTopics, _IS_BOOL, 0)
66
ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
77
ZEND_ARG_OBJ_INFO(0, topic, SimpleKafkaClient\\Topic, 0)
88
ZEND_END_ARG_INFO()
99

10-
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getOutQLen, 0, 0, IS_LONG, 0)
10+
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, 0, 0, IS_LONG, 0)
1111
ZEND_END_ARG_INFO()
1212

13-
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_poll, 0, 1, IS_LONG, 0)
13+
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_poll, 0, 1, IS_LONG, 0)
1414
ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
1515
ZEND_END_ARG_INFO()
1616

17-
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_queryWatermarkOffsets, 0, 5, IS_VOID, 0)
17+
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, 0, 5, IS_VOID, 0)
1818
ZEND_ARG_TYPE_INFO(0, topic, IS_STRING, 0)
1919
ZEND_ARG_TYPE_INFO(0, partition, IS_LONG, 0)
2020
ZEND_ARG_TYPE_INFO(1, low, IS_LONG, 0)
2121
ZEND_ARG_TYPE_INFO(1, high, IS_LONG, 0)
2222
ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
2323
ZEND_END_ARG_INFO()
2424

25-
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_SimpleKafkaClient_offsetsForTimes, 0, 2, IS_ARRAY, 0)
25+
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, 0, 2, IS_ARRAY, 0)
2626
ZEND_ARG_TYPE_INFO(0, topicPartitions, IS_ARRAY, 0)
2727
ZEND_ARG_TYPE_INFO(0, timeoutMs, IS_LONG, 0)
2828
ZEND_END_ARG_INFO()
@@ -35,11 +35,11 @@ ZEND_METHOD(SimpleKafkaClient_Kafka, queryWatermarkOffsets);
3535
ZEND_METHOD(SimpleKafkaClient_Kafka, offsetsForTimes);
3636

3737

38-
static const zend_function_entry class_SimpleKafkaClient_SimpleKafkaClient_methods[] = {
39-
ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getMetadata, ZEND_ACC_PUBLIC)
40-
ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_getOutQLen, ZEND_ACC_PUBLIC)
41-
ZEND_ME(SimpleKafkaClient_Kafka, poll, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_poll, ZEND_ACC_PUBLIC)
42-
ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_queryWatermarkOffsets, ZEND_ACC_PUBLIC)
43-
ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_SimpleKafkaClient_offsetsForTimes, ZEND_ACC_PUBLIC)
38+
static const zend_function_entry class_SimpleKafkaClient_Kafka_methods[] = {
39+
ZEND_ME(SimpleKafkaClient_Kafka, getMetadata, arginfo_class_SimpleKafkaClient_Kafka_getMetadata, ZEND_ACC_PUBLIC)
40+
ZEND_ME(SimpleKafkaClient_Kafka, getOutQLen, arginfo_class_SimpleKafkaClient_Kafka_getOutQLen, ZEND_ACC_PUBLIC)
41+
ZEND_ME(SimpleKafkaClient_Kafka, poll, arginfo_class_SimpleKafkaClient_Kafka_poll, ZEND_ACC_PUBLIC)
42+
ZEND_ME(SimpleKafkaClient_Kafka, queryWatermarkOffsets, arginfo_class_SimpleKafkaClient_Kafka_queryWatermarkOffsets, ZEND_ACC_PUBLIC)
43+
ZEND_ME(SimpleKafkaClient_Kafka, offsetsForTimes, arginfo_class_SimpleKafkaClient_Kafka_offsetsForTimes, ZEND_ACC_PUBLIC)
4444
ZEND_FE_END
4545
};

Diff for: ‎metadata_arginfo.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: da83d0319c899361606dfa0ccf0fd439aeeabfbb */
2+
* Stub hash: cbb5ab5aee4d07e0673bef67dcc2d045303ebfbd */
33

44
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_getOrigBrokerId, 0, 0, IS_LONG, 0)
55
ZEND_END_ARG_INFO()

Diff for: ‎metadata_partition_arginfo.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 934cef11a377e54b4d5f8cea75e6d590ec071d50 */
2+
* Stub hash: 207c49cb01d8b564c1419d2c24d332cc321420f5 */
33

44
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Partition_getId, 0, 0, IS_LONG, 0)
55
ZEND_END_ARG_INFO()

Diff for: ‎metadata_topic_arginfo.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 9d73f729b3dca2b6ac7fd5fdc39ba23d768ca792 */
2+
* Stub hash: db8552307bc3c0d4d6035ff10c00b7e2a39a152a */
33

44
ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Metadata_Topic_getName, 0, 0, IS_STRING, 0)
55
ZEND_END_ARG_INFO()

Diff for: ‎php_simple_kafka_client_int.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,7 @@ typedef void (*kafka_metadata_collection_ctor_t)(zval *renurn_value, zval *zmeta
123123
#endif
124124

125125
extern zend_class_entry * ce_kafka_conf;
126+
extern zend_class_entry * ce_kafka_consumer;
126127
extern zend_class_entry * ce_kafka_error_exception;
127128
extern zend_class_entry * ce_kafka_exception;
128129
extern zend_class_entry * ce_kafka_producer;
@@ -189,7 +190,6 @@ static inline char *kafka_hash_get_current_key_ex(HashTable *ht, HashPosition *p
189190
void kafka_error_init();
190191
void create_kafka_error(zval *return_value, const rd_kafka_error_t *error);
191192
void kafka_conf_init(INIT_FUNC_ARGS);
192-
void kafka_consumer_init(INIT_FUNC_ARGS);
193193
void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs);
194194
void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from);
195195
void kafka_message_init(INIT_FUNC_ARGS);

Diff for: ‎simple_kafka_client.c

+17-3
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#include "ext/standard/info.h"
4343
#include "php_simple_kafka_client_int.h"
4444
#include "Zend/zend_exceptions.h"
45+
#include "consumer_arginfo.h"
4546
#include "functions_arginfo.h"
4647
#include "producer_arginfo.h"
4748
#include "kafka_arginfo.h"
@@ -66,7 +67,17 @@ static void kafka_free(zend_object *object) /* {{{ */
6667
kafka_object *intern = php_kafka_from_obj(kafka_object, object);
6768

6869
if (intern->rk) {
69-
zend_hash_destroy(&intern->topics);
70+
if (RD_KAFKA_CONSUMER == intern->type) {
71+
rd_kafka_resp_err_t err;
72+
73+
err = rd_kafka_consumer_close(intern->rk);
74+
75+
if (err) {
76+
php_error(E_WARNING, "rd_kafka_consumer_close failed: %s", rd_kafka_err2str(err));
77+
}
78+
} else if (RD_KAFKA_PRODUCER == intern->type) {
79+
zend_hash_destroy(&intern->topics);
80+
}
7081

7182
rd_kafka_destroy(intern->rk);
7283
intern->rk = NULL;
@@ -332,17 +343,20 @@ PHP_MINIT_FUNCTION(simple_kafka_client)
332343
kafka_object_handlers.free_obj = kafka_free;
333344
kafka_object_handlers.offset = XtOffsetOf(kafka_object, std);
334345

335-
INIT_CLASS_ENTRY(ce, "SimpleKafkaClient", class_SimpleKafkaClient_SimpleKafkaClient_methods);
346+
INIT_CLASS_ENTRY(ce, "SimpleKafkaClient", class_SimpleKafkaClient_Kafka_methods);
336347
ce_kafka = zend_register_internal_class(&ce);
337348
ce_kafka->ce_flags |= ZEND_ACC_EXPLICIT_ABSTRACT_CLASS;
338349
ce_kafka->create_object = kafka_new;
339350

340351
INIT_NS_CLASS_ENTRY(ce, "SimpleKafkaClient", "Producer", class_SimpleKafkaClient_Producer_methods);
341352
ce_kafka_producer = zend_register_internal_class_ex(&ce, ce_kafka);
342353

354+
INIT_NS_CLASS_ENTRY(ce, "SimpleKafkaClient", "Consumer", class_SimpleKafkaClient_Consumer_methods);
355+
ce_kafka_consumer = zend_register_internal_class(&ce);
356+
ce_kafka_consumer->create_object = kafka_new;
357+
343358
kafka_conf_init(INIT_FUNC_ARGS_PASSTHRU);
344359
kafka_error_init();
345-
kafka_consumer_init(INIT_FUNC_ARGS_PASSTHRU);
346360
kafka_message_init(INIT_FUNC_ARGS_PASSTHRU);
347361
kafka_metadata_init(INIT_FUNC_ARGS_PASSTHRU);
348362
kafka_metadata_topic_partition_init(INIT_FUNC_ARGS_PASSTHRU);

Diff for: ‎tests/conf_callbacks_integration.phpt

+5-12
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,9 @@ $conf->set('statistics.interval.ms', 10);
4242
$conf->set('log_level', (string) LOG_DEBUG);
4343
$conf->set('debug', 'all');
4444

45-
$conf->setOffsetCommitCb(function ($consumer, $error, $topicPartitions) {
46-
echo "Offset " . $topicPartitions[0]->getOffset() . " committed.\n";
45+
$offsetCommitCount = 0;
46+
$conf->setOffsetCommitCb(function ($consumer, $error, $topicPartitions) use (&$offsetCommitCount) {
47+
++$offsetCommitCount;
4748
});
4849

4950
$statsCbCalled = false;
@@ -102,22 +103,14 @@ while (true) {
102103
$consumer->commit($msg);
103104
}
104105

106+
var_dump($offsetCommitCount);
105107
var_dump($statsCbCalled);
106108
var_dump($logCbCalled);
107109
var_dump($topicsAssigned);
108110
var_dump($delivered);
109111

110112
--EXPECT--
111-
Offset 1 committed.
112-
Offset 2 committed.
113-
Offset 3 committed.
114-
Offset 4 committed.
115-
Offset 5 committed.
116-
Offset 6 committed.
117-
Offset 7 committed.
118-
Offset 8 committed.
119-
Offset 9 committed.
120-
Offset 10 committed.
113+
int(10)
121114
bool(true)
122115
bool(true)
123116
bool(true)

Diff for: ‎topic_partition_arginfo.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/* This is a generated file, edit the .stub.php file instead.
2-
* Stub hash: 72b2c9a25e8751ae022cc233f4b7a0e382be72f8 */
2+
* Stub hash: 95f09c698079d00927dd2d02910325d6aff76157 */
33

44
ZEND_BEGIN_ARG_INFO_EX(arginfo_class_SimpleKafkaClient_TopicPartition___construct, 0, 0, 2)
55
ZEND_ARG_TYPE_INFO(0, topicName, IS_STRING, 0)

0 commit comments

Comments
 (0)
Please sign in to comment.