Skip to content

Commit 319967c

Browse files
committed
Common code for php 5 and 7
1 parent c33871b commit 319967c

25 files changed

+615
-280
lines changed

.travis.yml

+2
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ php:
55
- 5.4
66
- 5.5
77
- 5.6
8+
- 7.0
9+
- 7.1
810
env:
911
- LIBRDKAFKA_VERSION=0.8.6
1012
- LIBRDKAFKA_VERSION=0.8

CREDITS

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
rdkafka
2-
Arnaud Le Blanc [lbarnaud] <[email protected]> (lead)
2+
Arnaud Le Blanc

conf.c

+86-112
Original file line numberDiff line numberDiff line change
@@ -31,42 +31,51 @@
3131
#include "conf.h"
3232
#include "topic_partition.h"
3333
#include "message.h"
34+
#include "zeval.h"
3435

3536
zend_class_entry * ce_kafka_conf;
3637
zend_class_entry * ce_kafka_topic_conf;
3738

39+
static zend_object_handlers handlers;
40+
3841
static void kafka_conf_callback_dtor(kafka_conf_callback *cb TSRMLS_DC) /* {{{ */
3942
{
40-
if (cb->fci.function_name) {
43+
if (cb) {
4144
zval_ptr_dtor(&cb->fci.function_name);
45+
efree(cb);
4246
}
4347
} /* }}} */
4448

4549
void kafka_conf_callbacks_dtor(kafka_conf_callbacks *cbs TSRMLS_DC) /* {{{ */
4650
{
47-
kafka_conf_callback_dtor(&cbs->error TSRMLS_CC);
48-
kafka_conf_callback_dtor(&cbs->rebalance TSRMLS_CC);
49-
kafka_conf_callback_dtor(&cbs->dr_msg TSRMLS_CC);
51+
kafka_conf_callback_dtor(cbs->error TSRMLS_CC);
52+
kafka_conf_callback_dtor(cbs->rebalance TSRMLS_CC);
53+
kafka_conf_callback_dtor(cbs->dr_msg TSRMLS_CC);
5054
} /* }}} */
5155

52-
static void kafka_conf_callback_copy(kafka_conf_callback *to, kafka_conf_callback *from TSRMLS_DC) /* {{{ */
56+
static void kafka_conf_callback_copy(kafka_conf_callback **to, kafka_conf_callback *from TSRMLS_DC) /* {{{ */
5357
{
54-
*to = *from;
55-
if (to->fci.function_name) {
56-
Z_ADDREF_P(to->fci.function_name);
58+
if (from) {
59+
*to = emalloc(sizeof(**to));
60+
**to = *from;
61+
#if PHP_MAJOR_VERSION >= 7
62+
zval_copy_ctor(&(*to)->fci.function_name);
63+
#else
64+
Z_ADDREF_P((*to)->fci.function_name);
65+
#endif
5766
}
5867
} /* }}} */
5968

6069
void kafka_conf_callbacks_copy(kafka_conf_callbacks *to, kafka_conf_callbacks *from TSRMLS_DC) /* {{{ */
6170
{
62-
kafka_conf_callback_copy(&to->error, &from->error TSRMLS_CC);
63-
kafka_conf_callback_copy(&to->rebalance, &from->rebalance TSRMLS_CC);
64-
kafka_conf_callback_copy(&to->dr_msg, &from->dr_msg TSRMLS_CC);
71+
kafka_conf_callback_copy(&to->error, from->error TSRMLS_CC);
72+
kafka_conf_callback_copy(&to->rebalance, from->rebalance TSRMLS_CC);
73+
kafka_conf_callback_copy(&to->dr_msg, from->dr_msg TSRMLS_CC);
6574
} /* }}} */
6675

67-
static void kafka_conf_free(void *object TSRMLS_DC) /* {{{ */
76+
static void kafka_conf_free(zend_object *object TSRMLS_DC) /* {{{ */
6877
{
69-
kafka_conf_object *intern = (kafka_conf_object*)object;
78+
kafka_conf_object *intern = get_custom_object(kafka_conf_object, object);
7079

7180
switch (intern->type) {
7281
case KAFKA_CONF:
@@ -84,7 +93,7 @@ static void kafka_conf_free(void *object TSRMLS_DC) /* {{{ */
8493

8594
zend_object_std_dtor(&intern->std TSRMLS_CC);
8695

87-
efree(intern);
96+
free_custom_object(intern);
8897
}
8998
/* }}} */
9099

@@ -97,8 +106,8 @@ static zend_object_value kafka_conf_new(zend_class_entry *class_type TSRMLS_DC)
97106
zend_object_std_init(&intern->std, class_type TSRMLS_CC);
98107
object_properties_init(&intern->std, class_type);
99108

100-
retval.handle = zend_objects_store_put(&intern->std, (zend_objects_store_dtor_t) zend_objects_destroy_object, kafka_conf_free, NULL TSRMLS_CC);
101-
retval.handlers = &kafka_object_handlers;
109+
STORE_OBJECT(retval, intern, (zend_objects_store_dtor_t) zend_objects_destroy_object, kafka_conf_free, NULL);
110+
SET_OBJECT_HANDLERS(retval, &handlers);
102111

103112
return retval;
104113
}
@@ -119,131 +128,86 @@ kafka_conf_object * get_kafka_conf_object(zval *zconf TSRMLS_DC)
119128
static void kafka_conf_error_cb(rd_kafka_t *rk, int err, const char *reason, void *opaque)
120129
{
121130
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
122-
zval *retval;
123-
zval **args[3];
124-
zval *zrk;
125-
zval *zerr;
126-
zval *zreason;
131+
zeval args[3];
127132
TSRMLS_FETCH();
128133

129134
if (!opaque) {
130135
return;
131136
}
132137

133-
if (!cbs->error.fci.function_name) {
138+
if (!cbs->error) {
134139
return;
135140
}
136141

137-
ALLOC_INIT_ZVAL(zrk);
138-
KAFKA_ZVAL_ZVAL(zrk, &cbs->rk, 1, 0);
139-
140-
ALLOC_INIT_ZVAL(zerr);
141-
ZVAL_LONG(zerr, err);
142-
143-
ALLOC_INIT_ZVAL(zreason);
144-
ZVAL_STRING(zreason, reason, 1);
142+
MAKE_STD_ZEVAL(args[0]);
143+
MAKE_STD_ZEVAL(args[1]);
144+
MAKE_STD_ZEVAL(args[2]);
145145

146-
args[0] = &zrk;
147-
args[1] = &zerr;
148-
args[2] = &zreason;
146+
KAFKA_ZVAL_ZVAL(P_ZEVAL(args[0]), &cbs->rk, 1, 0);
147+
ZVAL_LONG(P_ZEVAL(args[1]), err);
148+
RDKAFKA_ZVAL_STRING(P_ZEVAL(args[2]), reason);
149149

150-
cbs->error.fci.retval_ptr_ptr = &retval;
151-
cbs->error.fci.params = args;
152-
cbs->error.fci.param_count = 3;
150+
rdkafka_call_function(&cbs->error->fci, &cbs->error->fcc, NULL, 3, args TSRMLS_CC);
153151

154-
zend_call_function(&cbs->error.fci, &cbs->error.fcc TSRMLS_CC);
155-
156-
if (retval) {
157-
zval_ptr_dtor(&retval);
158-
}
159-
zval_ptr_dtor(&zrk);
160-
zval_ptr_dtor(&zerr);
161-
zval_ptr_dtor(&zreason);
152+
zval_ptr_dtor(&args[0]);
153+
zval_ptr_dtor(&args[1]);
154+
zval_ptr_dtor(&args[2]);
162155
}
163156

164157
static void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque)
165158
{
166159
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
167-
zval *retval;
168-
zval **args[2];
169-
zval *zrk;
170-
zval *zrkmsg;
160+
zeval args[2];
171161
TSRMLS_FETCH();
172162

173163
if (!opaque) {
174164
return;
175165
}
176166

177-
if (!cbs->dr_msg.fci.function_name) {
167+
if (!cbs->dr_msg) {
178168
return;
179169
}
180170

181-
ALLOC_INIT_ZVAL(zrk);
182-
KAFKA_ZVAL_ZVAL(zrk, &cbs->rk, 1, 0);
183-
184-
ALLOC_INIT_ZVAL(zrkmsg);
185-
kafka_message_new(zrkmsg, msg TSRMLS_CC);
171+
MAKE_STD_ZEVAL(args[0]);
172+
MAKE_STD_ZEVAL(args[1]);
186173

187-
args[0] = &zrk;
188-
args[1] = &zrkmsg;
174+
KAFKA_ZVAL_ZVAL(P_ZEVAL(args[0]), &cbs->rk, 1, 0);
175+
kafka_message_new(P_ZEVAL(args[1]), msg TSRMLS_CC);
189176

190-
cbs->dr_msg.fci.retval_ptr_ptr = &retval;
191-
cbs->dr_msg.fci.params = args;
192-
cbs->dr_msg.fci.param_count = 2;
177+
rdkafka_call_function(&cbs->dr_msg->fci, &cbs->dr_msg->fcc, NULL, 2, args TSRMLS_CC);
193178

194-
zend_call_function(&cbs->dr_msg.fci, &cbs->dr_msg.fcc TSRMLS_CC);
195-
196-
if (retval) {
197-
zval_ptr_dtor(&retval);
198-
}
199-
zval_ptr_dtor(&zrk);
200-
zval_ptr_dtor(&zrkmsg);
179+
zval_ptr_dtor(&args[0]);
180+
zval_ptr_dtor(&args[1]);
201181
}
202182

203183
#ifdef HAVE_NEW_KAFKA_CONSUMER
204184
static void kafka_conf_rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err, rd_kafka_topic_partition_list_t *partitions, void *opaque)
205185
{
206186
kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque;
207-
zval *retval;
208-
zval **args[3];
209-
zval *zrk;
210-
zval *zerr;
211-
zval *zpartitions;
187+
zeval args[3];
212188
TSRMLS_FETCH();
213189

214190
if (!opaque) {
215191
return;
216192
}
217193

218-
if (!cbs->rebalance.fci.function_name) {
194+
if (!cbs->rebalance) {
219195
return;
220196
}
221197

222-
ALLOC_INIT_ZVAL(zrk);
223-
KAFKA_ZVAL_ZVAL(zrk, &cbs->rk, 1, 0);
224-
225-
ALLOC_INIT_ZVAL(zerr);
226-
ZVAL_LONG(zerr, err);
227-
228-
ALLOC_INIT_ZVAL(zpartitions);
229-
kafka_topic_partition_list_to_array(zpartitions, partitions TSRMLS_CC);
198+
MAKE_STD_ZEVAL(args[0]);
199+
MAKE_STD_ZEVAL(args[1]);
200+
MAKE_STD_ZEVAL(args[2]);
230201

231-
args[0] = &zrk;
232-
args[1] = &zerr;
233-
args[2] = &zpartitions;
202+
KAFKA_ZVAL_ZVAL(P_ZEVAL(args[0]), &cbs->rk, 1, 0);
203+
ZVAL_LONG(P_ZEVAL(args[1]), err);
204+
kafka_topic_partition_list_to_array(P_ZEVAL(args[2]), partitions TSRMLS_CC);
234205

235-
cbs->rebalance.fci.retval_ptr_ptr = &retval;
236-
cbs->rebalance.fci.params = args;
237-
cbs->rebalance.fci.param_count = 3;
206+
rdkafka_call_function(&cbs->rebalance->fci, &cbs->rebalance->fcc, NULL, 3, args TSRMLS_CC);
238207

239-
zend_call_function(&cbs->rebalance.fci, &cbs->rebalance.fcc TSRMLS_CC);
240-
241-
if (retval) {
242-
zval_ptr_dtor(&retval);
243-
}
244-
zval_ptr_dtor(&zrk);
245-
zval_ptr_dtor(&zerr);
246-
zval_ptr_dtor(&zpartitions);
208+
zval_ptr_dtor(&args[0]);
209+
zval_ptr_dtor(&args[1]);
210+
zval_ptr_dtor(&args[2]);
247211
}
248212
#endif /* HAVE_NEW_KAFKA_CONSUMER */
249213

@@ -310,7 +274,7 @@ PHP_METHOD(RdKafka__Conf, dump)
310274
for (i = 0; i < cntp; i+=2) {
311275
const char *key = dump[i];
312276
const char *value = dump[i+1];
313-
add_assoc_string(return_value, (char*)key, (char*)value, 1);
277+
rdkafka_add_assoc_string(return_value, (char*)key, (char*)value);
314278
}
315279

316280
rd_kafka_conf_dump_free(dump, cntp);
@@ -328,9 +292,9 @@ ZEND_END_ARG_INFO()
328292
PHP_METHOD(RdKafka__Conf, set)
329293
{
330294
char *name;
331-
int name_len;
295+
arglen_t name_len;
332296
char *value;
333-
int value_len;
297+
arglen_t value_len;
334298
kafka_conf_object *intern;
335299
rd_kafka_conf_res_t ret = 0;
336300
char errstr[512];
@@ -425,14 +389,16 @@ PHP_METHOD(RdKafka__Conf, setErrorCb)
425389
return;
426390
}
427391

428-
Z_ADDREF_P(fci.function_name);
392+
Z_ADDREF_P(P_ZEVAL(fci.function_name));
429393

430-
if (intern->cbs.error.fci.function_name) {
431-
zval_ptr_dtor(&intern->cbs.error.fci.function_name);
394+
if (intern->cbs.error) {
395+
zval_ptr_dtor(&intern->cbs.error->fci.function_name);
396+
} else {
397+
intern->cbs.error = ecalloc(1, sizeof(*intern->cbs.error));
432398
}
433399

434-
intern->cbs.error.fci = fci;
435-
intern->cbs.error.fcc = fcc;
400+
intern->cbs.error->fci = fci;
401+
intern->cbs.error->fcc = fcc;
436402

437403
rd_kafka_conf_set_error_cb(intern->u.conf, kafka_conf_error_cb);
438404
}
@@ -460,14 +426,16 @@ PHP_METHOD(RdKafka__Conf, setDrMsgCb)
460426
return;
461427
}
462428

463-
Z_ADDREF_P(fci.function_name);
429+
Z_ADDREF_P(P_ZEVAL(fci.function_name));
464430

465-
if (intern->cbs.dr_msg.fci.function_name) {
466-
zval_ptr_dtor(&intern->cbs.dr_msg.fci.function_name);
431+
if (intern->cbs.dr_msg) {
432+
zval_ptr_dtor(&intern->cbs.dr_msg->fci.function_name);
433+
} else {
434+
intern->cbs.dr_msg = ecalloc(1, sizeof(*intern->cbs.dr_msg));
467435
}
468436

469-
intern->cbs.dr_msg.fci = fci;
470-
intern->cbs.dr_msg.fcc = fcc;
437+
intern->cbs.dr_msg->fci = fci;
438+
intern->cbs.dr_msg->fcc = fcc;
471439

472440
rd_kafka_conf_set_dr_msg_cb(intern->u.conf, kafka_conf_dr_msg_cb);
473441
}
@@ -496,14 +464,16 @@ PHP_METHOD(RdKafka__Conf, setRebalanceCb)
496464
return;
497465
}
498466

499-
Z_ADDREF_P(fci.function_name);
467+
Z_ADDREF_P(P_ZEVAL(fci.function_name));
500468

501-
if (intern->cbs.rebalance.fci.function_name) {
502-
zval_ptr_dtor(&intern->cbs.rebalance.fci.function_name);
469+
if (intern->cbs.rebalance) {
470+
zval_ptr_dtor(&intern->cbs.rebalance->fci.function_name);
471+
} else {
472+
intern->cbs.rebalance = ecalloc(1, sizeof(*intern->cbs.rebalance));
503473
}
504474

505-
intern->cbs.rebalance.fci = fci;
506-
intern->cbs.rebalance.fcc = fcc;
475+
intern->cbs.rebalance->fci = fci;
476+
intern->cbs.rebalance->fcc = fcc;
507477

508478
rd_kafka_conf_set_rebalance_cb(intern->u.conf, kafka_conf_rebalance_cb);
509479
}
@@ -597,6 +567,10 @@ void kafka_conf_minit(TSRMLS_D)
597567
{
598568
zend_class_entry tmpce;
599569

570+
handlers = kafka_default_object_handlers;
571+
set_object_handler_free_obj(&handlers, kafka_conf_free);
572+
set_object_handler_offset(&handlers, XtOffsetOf(kafka_conf_object, std));
573+
600574
INIT_NS_CLASS_ENTRY(tmpce, "RdKafka", "Conf", kafka_conf_fe);
601575
ce_kafka_conf = zend_register_internal_class(&tmpce TSRMLS_CC);
602576
ce_kafka_conf->create_object = kafka_conf_new;

conf.h

+9-4
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,24 @@ typedef struct _kafka_conf_callback {
3838

3939
typedef struct _kafka_conf_callbacks {
4040
zval rk;
41-
kafka_conf_callback error;
42-
kafka_conf_callback rebalance;
43-
kafka_conf_callback dr_msg;
41+
kafka_conf_callback *error;
42+
kafka_conf_callback *rebalance;
43+
kafka_conf_callback *dr_msg;
4444
} kafka_conf_callbacks;
4545

4646
typedef struct _kafka_conf_object {
47-
zend_object std;
47+
#if PHP_MAJOR_VERSION < 7
48+
zend_object std;
49+
#endif
4850
kafka_conf_type type;
4951
union {
5052
rd_kafka_conf_t *conf;
5153
rd_kafka_topic_conf_t *topic_conf;
5254
} u;
5355
kafka_conf_callbacks cbs;
56+
#if PHP_MAJOR_VERSION >= 7
57+
zend_object std;
58+
#endif
5459
} kafka_conf_object;
5560

5661
kafka_conf_object * get_kafka_conf_object(zval *zconf TSRMLS_DC);

0 commit comments

Comments
 (0)