Skip to content

Commit

Permalink
Always initialize Message::$headers (#509)
Browse files Browse the repository at this point in the history
Fixes #508
  • Loading branch information
arnaud-lb authored Jan 18, 2022
1 parent acec7ea commit 4fa5063
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 4 deletions.
8 changes: 4 additions & 4 deletions message.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, ze

timestamp = rd_kafka_message_timestamp(message, &tstype);

zval headers_array;
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
rd_kafka_headers_t *message_headers = NULL;
rd_kafka_resp_err_t header_response;
const char *header_name = NULL;
const void *header_value = NULL;
size_t header_size = 0;
zval headers_array;
size_t i;
#endif /* HAVE_RD_KAFKA_MESSAGE_HEADERS */

Expand All @@ -72,23 +72,23 @@ void kafka_message_new(zval *return_value, const rd_kafka_message_t *message, ze
}
zend_update_property_long(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("offset"), message->offset);

array_init(&headers_array);
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
if (message->err == RD_KAFKA_RESP_ERR_NO_ERROR) {
rd_kafka_message_headers(message, &message_headers);
if (message_headers != NULL) {
array_init(&headers_array);
for (i = 0; i < rd_kafka_header_cnt(message_headers); i++) {
header_response = rd_kafka_header_get_all(message_headers, i, &header_name, &header_value, &header_size);
if (header_response != RD_KAFKA_RESP_ERR_NO_ERROR) {
break;
}
add_assoc_stringl(&headers_array, header_name, (const char*)header_value, header_size);
}
zend_update_property(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("headers"), &headers_array);
zval_ptr_dtor(&headers_array);
}
}
#endif
zend_update_property(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("headers"), &headers_array);
zval_ptr_dtor(&headers_array);

if (msg_opaque != NULL) {
zend_update_property_str(NULL, Z_RDKAFKA_PROP_OBJ(return_value), ZEND_STRL("opaque"), msg_opaque);
Expand Down
1 change: 1 addition & 0 deletions package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
<file role="test" name="bug115.phpt"/>
<file role="test" name="bug330.phpt"/>
<file role="test" name="bug465.phpt"/>
<file role="test" name="bug508.phpt"/>
<file role="test" name="bug74.phpt"/>
<file role="test" name="bug88.phpt"/>
<file role="test" name="bugConfSetArgument.phpt"/>
Expand Down
118 changes: 118 additions & 0 deletions tests/bug508.phpt
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
--TEST--
Bug 508
--SKIPIF--
<?php
require __DIR__ . '/integration-tests-check.php';
--FILE--
<?php
require __DIR__ . '/integration-tests-check.php';

$topicName = sprintf("test_rdkafka_%s", uniqid());

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
$conf->setDrMsgCb(function ($producer, $msg) use (&$delivered) {
if ($msg->err) {
throw new Exception("Message delivery failed: " . $msg->errstr());
}
$delivered++;
});

$producer = new RdKafka\Producer($conf);
$topic = $producer->newTopic($topicName);

if (!$producer->getMetadata(false, $topic, 10*1000)) {
echo "Failed to get metadata, is broker down?\n";
}

$topic->produce(0, 0, "message");

while ($producer->getOutQLen()) {
$producer->poll(50);
}

printf("%d messages delivered\n", $delivered);

$conf = new RdKafka\Conf();
$conf->set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS'));
$conf->set('enable.partition.eof', 'true');

$consumer = new RdKafka\Consumer($conf);
$topic = $consumer->newTopic($topicName);
$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

while (true) {
$msg = $topic->consume(0, 1000);
if (!$msg) {
continue;
}
// All props are initialized and readable in all cases
var_dump([
'err' => $msg->err,
'topic_name' => $msg->topic_name,
'timestamp' => $msg->timestamp,
'partition' => $msg->partition,
'payload' => $msg->payload,
'len' => $msg->len,
'key' => $msg->key,
'offset' => $msg->offset,
'headers' => $msg->headers,
'opaque' => $msg->opaque,
]);
echo "--------------\n";
if ($msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
echo "EOF\n";
break;
}
}
--EXPECTF--
1 messages delivered
array(10) {
["err"]=>
int(0)
["topic_name"]=>
string(%d) "test_rdkafka_%s"
["timestamp"]=>
int(%d)
["partition"]=>
int(0)
["payload"]=>
string(7) "message"
["len"]=>
int(7)
["key"]=>
NULL
["offset"]=>
int(0)
["headers"]=>
array(0) {
}
["opaque"]=>
NULL
}
--------------
array(10) {
["err"]=>
int(-%d)
["topic_name"]=>
string(%d) "test_rdkafka_%s"
["timestamp"]=>
int(-1)
["partition"]=>
int(0)
["payload"]=>
string(%d) "%s"
["len"]=>
int(%d)
["key"]=>
NULL
["offset"]=>
int(1)
["headers"]=>
array(0) {
}
["opaque"]=>
NULL
}
--------------
EOF

0 comments on commit 4fa5063

Please sign in to comment.