From 4fa5063c6cad7b0846342c6b44c7f20a90a19f4c Mon Sep 17 00:00:00 2001 From: Arnaud Le Blanc Date: Tue, 18 Jan 2022 17:05:22 +0100 Subject: [PATCH] Always initialize Message::$headers (#509) Fixes #508 --- message.c | 8 ++-- package.xml | 1 + tests/bug508.phpt | 118 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 123 insertions(+), 4 deletions(-) create mode 100644 tests/bug508.phpt diff --git a/message.c b/message.c index d42fedff..ba794382 100644 --- a/message.c +++ b/message.c @@ -46,13 +46,13 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, ze timestamp = rd_kafka_message_timestamp(message, &tstype); + zval headers_array; #ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS rd_kafka_headers_t *message_headers = NULL; rd_kafka_resp_err_t header_response; const char *header_name = NULL; const void *header_value = NULL; size_t header_size = 0; - zval headers_array; size_t i; #endif /* HAVE_RD_KAFKA_MESSAGE_HEADERS */ @@ -72,11 +72,11 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, ze } zend_update_property_long(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("offset"), message->offset); + array_init(&headers_array); #ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS if (message->err == RD_KAFKA_RESP_ERR_NO_ERROR) { rd_kafka_message_headers(message, &message_headers); if (message_headers != NULL) { - array_init(&headers_array); for (i = 0; i < rd_kafka_header_cnt(message_headers); i++) { header_response = rd_kafka_header_get_all(message_headers, i, &header_name, &header_value, &header_size); if (header_response != RD_KAFKA_RESP_ERR_NO_ERROR) { @@ -84,11 +84,11 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, ze } add_assoc_stringl(&headers_array, header_name, (const char*)header_value, header_size); } - zend_update_property(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("headers"), &headers_array); - zval_ptr_dtor(&headers_array); } } #endif + zend_update_property(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("headers"), &headers_array); + zval_ptr_dtor(&headers_array); if (msg_opaque != NULL) { zend_update_property_str(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("opaque"), msg_opaque); diff --git a/package.xml b/package.xml index a1a30b49..a3666176 100644 --- a/package.xml +++ b/package.xml @@ -122,6 +122,7 @@ + diff --git a/tests/bug508.phpt b/tests/bug508.phpt new file mode 100644 index 00000000..c36d9943 --- /dev/null +++ b/tests/bug508.phpt @@ -0,0 +1,118 @@ +--TEST-- +Bug 508 +--SKIPIF-- +set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); +$conf->setDrMsgCb(function ($producer, $msg) use (&$delivered) { + if ($msg->err) { + throw new Exception("Message delivery failed: " . $msg->errstr()); + } + $delivered++; +}); + +$producer = new RdKafka\Producer($conf); +$topic = $producer->newTopic($topicName); + +if (!$producer->getMetadata(false, $topic, 10*1000)) { + echo "Failed to get metadata, is broker down?\n"; +} + +$topic->produce(0, 0, "message"); + +while ($producer->getOutQLen()) { + $producer->poll(50); +} + +printf("%d messages delivered\n", $delivered); + +$conf = new RdKafka\Conf(); +$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); +$conf->set('enable.partition.eof', 'true'); + +$consumer = new RdKafka\Consumer($conf); +$topic = $consumer->newTopic($topicName); +$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING); + +while (true) { + $msg = $topic->consume(0, 1000); + if (!$msg) { + continue; + } + // All props are initialized and readable in all cases + var_dump([ + 'err' => $msg->err, + 'topic_name' => $msg->topic_name, + 'timestamp' => $msg->timestamp, + 'partition' => $msg->partition, + 'payload' => $msg->payload, + 'len' => $msg->len, + 'key' => $msg->key, + 'offset' => $msg->offset, + 'headers' => $msg->headers, + 'opaque' => $msg->opaque, + ]); + echo "--------------\n"; + if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) { + echo "EOF\n"; + break; + } +} +--EXPECTF-- +1 messages delivered +array(10) { + ["err"]=> + int(0) + ["topic_name"]=> + string(%d) "test_rdkafka_%s" + ["timestamp"]=> + int(%d) + ["partition"]=> + int(0) + ["payload"]=> + string(7) "message" + ["len"]=> + int(7) + ["key"]=> + NULL + ["offset"]=> + int(0) + ["headers"]=> + array(0) { + } + ["opaque"]=> + NULL +} +-------------- +array(10) { + ["err"]=> + int(-%d) + ["topic_name"]=> + string(%d) "test_rdkafka_%s" + ["timestamp"]=> + int(-1) + ["partition"]=> + int(0) + ["payload"]=> + string(%d) "%s" + ["len"]=> + int(%d) + ["key"]=> + NULL + ["offset"]=> + int(1) + ["headers"]=> + array(0) { + } + ["opaque"]=> + NULL +} +-------------- +EOF