Skip to content

Commit 5145cbc

Browse files
authored
refactor object usage (#44)
* refactor object usage * increase timeout
1 parent b1fb81a commit 5145cbc

File tree

2 files changed

+35
-53
lines changed

2 files changed

+35
-53
lines changed

consumer.c

+34-52
Original file line numberDiff line numberDiff line change
@@ -40,18 +40,12 @@
4040
#include "Zend/zend_exceptions.h"
4141
#include "consumer_arginfo.h"
4242

43-
typedef struct _object_intern {
44-
rd_kafka_t *rk;
45-
kafka_conf_callbacks cbs;
46-
zend_object std;
47-
} object_intern;
48-
4943
static zend_class_entry * ce;
5044
static zend_object_handlers handlers;
5145

5246
static void kafka_consumer_free(zend_object *object) /* {{{ */
5347
{
54-
object_intern *intern = php_kafka_from_obj(object_intern, object);
48+
kafka_object *intern = php_kafka_from_obj(kafka_object, object);
5549
rd_kafka_resp_err_t err;
5650
kafka_conf_callbacks_dtor(&intern->cbs);
5751

@@ -75,9 +69,9 @@ static void kafka_consumer_free(zend_object *object) /* {{{ */
7569
static zend_object *kafka_consumer_new(zend_class_entry *class_type) /* {{{ */
7670
{
7771
zend_object* retval;
78-
object_intern *intern;
72+
kafka_object *intern;
7973

80-
intern = ecalloc(1, sizeof(object_intern)+ zend_object_properties_size(class_type));
74+
intern = ecalloc(1, sizeof(kafka_object)+ zend_object_properties_size(class_type));
8175
zend_object_std_init(&intern->std, class_type);
8276
object_properties_init(&intern->std, class_type);
8377

@@ -88,18 +82,6 @@ static zend_object *kafka_consumer_new(zend_class_entry *class_type) /* {{{ */
8882
}
8983
/* }}} */
9084

91-
static object_intern * get_object(zval *zconsumer) /* {{{ */
92-
{
93-
object_intern *oconsumer = Z_KAFKA_P(object_intern, zconsumer);
94-
95-
if (!oconsumer->rk) {
96-
zend_throw_exception_ex(NULL, 0, "SimpleKafkaClient\\Consumer::__construct() has not been called");
97-
return NULL;
98-
}
99-
100-
return oconsumer;
101-
} /* }}} */
102-
10385
static int has_group_id(rd_kafka_conf_t *conf) { /* {{{ */
10486

10587
size_t len;
@@ -125,15 +107,15 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, __construct)
125107
zval *zconf;
126108
char errstr[512];
127109
rd_kafka_t *rk;
128-
object_intern *intern;
110+
kafka_object *intern;
129111
kafka_conf_object *conf_intern;
130112
rd_kafka_conf_t *conf = NULL;
131113

132114
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
133115
Z_PARAM_OBJECT_OF_CLASS(zconf, ce_kafka_conf)
134116
ZEND_PARSE_PARAMETERS_END();
135117

136-
intern = Z_KAFKA_P(object_intern, getThis());
118+
intern = Z_KAFKA_P(kafka_object, getThis());
137119

138120
conf_intern = get_kafka_conf_object(zconf);
139121
if (conf_intern) {
@@ -175,7 +157,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign)
175157
HashTable *htopars = NULL;
176158
rd_kafka_topic_partition_list_t *topics;
177159
rd_kafka_resp_err_t err;
178-
object_intern *intern;
160+
kafka_object *intern;
179161

180162
if (zend_parse_parameters(ZEND_NUM_ARGS(), "|h!", &htopars) == FAILURE) {
181163
return;
@@ -186,7 +168,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, assign)
186168
Z_PARAM_ARRAY_HT(htopars)
187169
ZEND_PARSE_PARAMETERS_END();
188170

189-
intern = get_object(getThis());
171+
intern = get_kafka_object(getThis());
190172
if (!intern) {
191173
return;
192174
}
@@ -219,12 +201,12 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getAssignment)
219201
{
220202
rd_kafka_resp_err_t err;
221203
rd_kafka_topic_partition_list_t *topics;
222-
object_intern *intern;
204+
kafka_object *intern;
223205

224206
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0)
225207
ZEND_PARSE_PARAMETERS_END();
226208

227-
intern = get_object(getThis());
209+
intern = get_kafka_object(getThis());
228210
if (!intern) {
229211
return;
230212
}
@@ -247,7 +229,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, subscribe)
247229
{
248230
HashTable *htopics;
249231
HashPosition pos;
250-
object_intern *intern;
232+
kafka_object *intern;
251233
rd_kafka_topic_partition_list_t *topics;
252234
rd_kafka_resp_err_t err;
253235
zval *zv;
@@ -256,7 +238,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, subscribe)
256238
Z_PARAM_ARRAY_HT(htopics)
257239
ZEND_PARSE_PARAMETERS_END();
258240

259-
intern = get_object(getThis());
241+
intern = get_kafka_object(getThis());
260242
if (!intern) {
261243
return;
262244
}
@@ -287,13 +269,13 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getSubscription)
287269
{
288270
rd_kafka_resp_err_t err;
289271
rd_kafka_topic_partition_list_t *topics;
290-
object_intern *intern;
272+
kafka_object *intern;
291273
int i;
292274

293275
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0)
294276
ZEND_PARSE_PARAMETERS_END();
295277

296-
intern = get_object(getThis());
278+
intern = get_kafka_object(getThis());
297279
if (!intern) {
298280
return;
299281
}
@@ -319,13 +301,13 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getSubscription)
319301
Unsubscribe from the current subscription set */
320302
ZEND_METHOD(SimpleKafkaClient_Consumer, unsubscribe)
321303
{
322-
object_intern *intern;
304+
kafka_object *intern;
323305
rd_kafka_resp_err_t err;
324306

325307
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0)
326308
ZEND_PARSE_PARAMETERS_END();
327309

328-
intern = get_object(getThis());
310+
intern = get_kafka_object(getThis());
329311
if (!intern) {
330312
return;
331313
}
@@ -343,15 +325,15 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, unsubscribe)
343325
Consume message or get error event, triggers callbacks */
344326
ZEND_METHOD(SimpleKafkaClient_Consumer, consume)
345327
{
346-
object_intern *intern;
328+
kafka_object *intern;
347329
zend_long timeout_ms;
348330
rd_kafka_message_t *rkmessage, rkmessage_tmp = {0};
349331

350332
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
351333
Z_PARAM_LONG(timeout_ms)
352334
ZEND_PARSE_PARAMETERS_END();
353335

354-
intern = get_object(getThis());
336+
intern = get_kafka_object(getThis());
355337
if (!intern) {
356338
return;
357339
}
@@ -374,7 +356,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, consume)
374356
static void consumer_commit(int async, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */
375357
{
376358
zval *zarg = NULL;
377-
object_intern *intern;
359+
kafka_object *intern;
378360
rd_kafka_topic_partition_list_t *offsets = NULL;
379361
rd_kafka_resp_err_t err;
380362

@@ -383,7 +365,7 @@ static void consumer_commit(int async, INTERNAL_FUNCTION_PARAMETERS) /* {{{ */
383365
Z_PARAM_ZVAL(zarg)
384366
ZEND_PARSE_PARAMETERS_END();
385367

386-
intern = get_object(getThis());
368+
intern = get_kafka_object(getThis());
387369
if (!intern) {
388370
return;
389371
}
@@ -476,12 +458,12 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, commitAsync)
476458
Close connection */
477459
ZEND_METHOD(SimpleKafkaClient_Consumer, close)
478460
{
479-
object_intern *intern;
461+
kafka_object *intern;
480462

481463
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 0, 0)
482464
ZEND_PARSE_PARAMETERS_END();
483465

484-
intern = get_object(getThis());
466+
intern = get_kafka_object(getThis());
485467
if (!intern) {
486468
return;
487469
}
@@ -499,7 +481,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata)
499481
zval *only_zrkt = NULL;
500482
zend_long timeout_ms;
501483
rd_kafka_resp_err_t err;
502-
object_intern *intern;
484+
kafka_object *intern;
503485
const rd_kafka_metadata_t *metadata;
504486
kafka_topic_object *only_orkt = NULL;
505487

@@ -510,7 +492,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getMetadata)
510492
Z_PARAM_OBJECT_OF_CLASS(only_zrkt, ce_kafka_topic)
511493
ZEND_PARSE_PARAMETERS_END();
512494

513-
intern = get_object(getThis());
495+
intern = get_kafka_object(getThis());
514496
if (!intern) {
515497
return;
516498
}
@@ -540,14 +522,14 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getTopicHandle)
540522
char *topic;
541523
size_t topic_len;
542524
rd_kafka_topic_t *rkt;
543-
object_intern *intern;
525+
kafka_object *intern;
544526
kafka_topic_object *topic_intern;
545527

546528
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
547529
Z_PARAM_STRING(topic, topic_len)
548530
ZEND_PARSE_PARAMETERS_END();
549531

550-
intern = get_object(getThis());
532+
intern = get_kafka_object(getThis());
551533
if (!intern) {
552534
return;
553535
}
@@ -577,7 +559,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets)
577559
{
578560
HashTable *htopars = NULL;
579561
zend_long timeout_ms;
580-
object_intern *intern;
562+
kafka_object *intern;
581563
rd_kafka_resp_err_t err;
582564
rd_kafka_topic_partition_list_t *topics;
583565

@@ -586,7 +568,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets)
586568
Z_PARAM_LONG(timeout_ms)
587569
ZEND_PARSE_PARAMETERS_END();
588570

589-
intern = get_object(getThis());
571+
intern = get_kafka_object(getThis());
590572
if (!intern) {
591573
return;
592574
}
@@ -615,15 +597,15 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getCommittedOffsets)
615597
ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions)
616598
{
617599
HashTable *htopars = NULL;
618-
object_intern *intern;
600+
kafka_object *intern;
619601
rd_kafka_resp_err_t err;
620602
rd_kafka_topic_partition_list_t *topics;
621603

622604
ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 1, 1)
623605
Z_PARAM_ARRAY_HT(htopars)
624606
ZEND_PARSE_PARAMETERS_END();
625607

626-
intern = get_object(getThis());
608+
intern = get_kafka_object(getThis());
627609
if (!intern) {
628610
return;
629611
}
@@ -650,7 +632,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, getOffsetPositions)
650632
ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes)
651633
{
652634
HashTable *htopars = NULL;
653-
object_intern *intern;
635+
kafka_object *intern;
654636
rd_kafka_topic_partition_list_t *topicPartitions;
655637
zend_long timeout_ms;
656638
rd_kafka_resp_err_t err;
@@ -660,7 +642,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes)
660642
Z_PARAM_LONG(timeout_ms)
661643
ZEND_PARSE_PARAMETERS_END();
662644

663-
intern = get_object(getThis());
645+
intern = get_kafka_object(getThis());
664646
if (!intern) {
665647
return;
666648
}
@@ -686,7 +668,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, offsetsForTimes)
686668
Query broker for low (oldest/beginning) or high (newest/end) offsets for partition */
687669
ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets)
688670
{
689-
object_intern *intern;
671+
kafka_object *intern;
690672
char *topic;
691673
size_t topic_length;
692674
long low, high;
@@ -705,7 +687,7 @@ ZEND_METHOD(SimpleKafkaClient_Consumer, queryWatermarkOffsets)
705687
ZVAL_DEREF(lowResult);
706688
ZVAL_DEREF(highResult);
707689

708-
intern = get_object(getThis());
690+
intern = get_kafka_object(getThis());
709691
if (!intern) {
710692
return;
711693
}
@@ -732,5 +714,5 @@ void kafka_consumer_init(INIT_FUNC_ARGS) /* {{{ */
732714

733715
handlers = kafka_default_object_handlers;
734716
handlers.free_obj = kafka_consumer_free;
735-
handlers.offset = XtOffsetOf(object_intern, std);
717+
handlers.offset = XtOffsetOf(kafka_object, std);
736718
}

tests/produce_consume_transactional.phpt

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ $topicName = sprintf("test_kafka_%s", uniqid());
3737

3838
$topic = $producer->getTopicHandle($topicName);
3939

40-
if (!$producer->getMetadata(false, 2*1000, $topic)) {
40+
if (!$producer->getMetadata(false, 5*1000, $topic)) {
4141
echo "Failed to get metadata, is broker down?\n";
4242
}
4343

0 commit comments

Comments
 (0)