diff --git a/configuration.c b/configuration.c index ca442cb..530ac55 100644 --- a/configuration.c +++ b/configuration.c @@ -158,29 +158,30 @@ static void kafka_conf_error_cb(rd_kafka_t *rk, int err, const char *reason, voi zval_ptr_dtor(&args[2]); } -static void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque) +void kafka_conf_dr_msg_cb(rd_kafka_t *rk, const rd_kafka_message_t *msg, void *opaque) { kafka_conf_callbacks *cbs = (kafka_conf_callbacks*) opaque; - zval args[2]; - - if (!opaque) { - return; - } + zval args[3]; - if (!cbs->dr_msg) { + if (!opaque || !cbs->dr_msg) { return; } ZVAL_NULL(&args[0]); ZVAL_NULL(&args[1]); + ZVAL_NULL(&args[2]); ZVAL_ZVAL(&args[0], &cbs->zrk, 1, 0); kafka_message_new(&args[1], msg); + if (NULL != msg->_private) { + ZVAL_ZVAL(&args[2], msg->_private, 1, 0); + } - kafka_call_function(&cbs->dr_msg->fci, &cbs->dr_msg->fcc, NULL, 2, args); + kafka_call_function(&cbs->dr_msg->fci, &cbs->dr_msg->fcc, NULL, 3, args); zval_ptr_dtor(&args[0]); zval_ptr_dtor(&args[1]); + zval_ptr_dtor(&args[2]); } static int kafka_conf_stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) diff --git a/php_simple_kafka_client_int.h b/php_simple_kafka_client_int.h index c68e314..4ce24de 100644 --- a/php_simple_kafka_client_int.h +++ b/php_simple_kafka_client_int.h @@ -92,13 +92,16 @@ typedef void (*kafka_metadata_collection_ctor_t)(zval *renurn_value, zval *zmeta #else // PHP 7 +#define IS_MIXED 16 + #define Z_KAFKA_OBJ zval #define Z_KAFKA_PROP_OBJ(object) object #define kafka_get_debug_object(type, object) get_object(object) -#define ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(pass_by_ref, name, type_hint, allow_null, default_value) ZEND_ARG_INFO(pass_by_ref, name) +#define ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(pass_by_ref, name, type_hint, allow_null, default_value) \ + ZEND_ARG_INFO(pass_by_ref, name) #define Z_PARAM_ARRAY_HT_OR_NULL(dest) \ Z_PARAM_ARRAY_HT_EX(dest, 1, 0) @@ -112,6 +115,9 @@ typedef void (*kafka_metadata_collection_ctor_t)(zval *renurn_value, zval *zmeta #define Z_PARAM_STRING_OR_NULL(dest, dest_len) \ Z_PARAM_STRING_EX(dest, dest_len, 1, 0) +#define Z_PARAM_ZVAL_OR_NULL(dest) \ + Z_PARAM_ZVAL_EX(dest, 1, 0) + #endif #ifdef PHP_WIN32 diff --git a/tests/produce_with_opaque.phpt b/tests/produce_with_opaque.phpt new file mode 100644 index 0000000..fffa753 --- /dev/null +++ b/tests/produce_with_opaque.phpt @@ -0,0 +1,61 @@ +--TEST-- +Produce, consume +--SKIPIF-- +set('metadata.broker.list', getenv('TEST_KAFKA_BROKERS')); + +$conf->setDrMsgCb(function (SimpleKafkaClient\Producer $kafka, SimpleKafkaClient\Message $message, $opaque) { + if (RD_KAFKA_RESP_ERR_NO_ERROR !== $message->err) { + $errorStr = rd_kafka_err2str($message->err); + + echo sprintf('Message FAILED (%s, %s) to send with payload => %s', $message->err, $errorStr, $message->payload) . PHP_EOL; + } else { + if (false === is_string($opaque)) { + $opaque = 'opaque was already freed'; + } + + echo sprintf('Message opaque: %s', $opaque) . PHP_EOL; + } +}); + +$producer = new SimpleKafkaClient\Producer($conf); +$topic = $producer->getTopicHandle('pure-php-test-topic'); +$amountTestMessages = 10; + +for ($i = 0; $i < $amountTestMessages; ++$i) { + $topic->producev( + RD_KAFKA_PARTITION_UA, + RD_KAFKA_MSG_F_BLOCK, // will block produce if queue is full + sprintf('test message-%d',$i), + sprintf('test-key-%d', $i), + [ + 'some' => sprintf('header value %d', $i) + ], + null, + "opaque $i" + ); + + $producer->poll(0); +} + +$result = $producer->flush(20000); +if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) { + echo 'Was not able to shutdown within 20s. Messages might be lost!' . PHP_EOL; +} +--EXPECT-- +Message key test-key-0 and opaque: opaque 0 +Message key test-key-1 and opaque: opaque 1 +Message key test-key-2 and opaque: opaque 2 +Message key test-key-3 and opaque: opaque 3 +Message key test-key-4 and opaque: opaque 4 +Message key test-key-5 and opaque: opaque 5 +Message key test-key-6 and opaque: opaque 6 +Message key test-key-7 and opaque: opaque 7 +Message key test-key-8 and opaque: opaque 8 +Message key test-key-9 and opaque: opaque 9 diff --git a/topic.c b/topic.c index 3f213af..99042e6 100644 --- a/topic.c +++ b/topic.c @@ -113,13 +113,15 @@ ZEND_METHOD(SimpleKafkaClient_ProducerTopic, produce) int ret; rd_kafka_resp_err_t err; kafka_topic_object *intern; + zval *opaque = NULL; - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 4) + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 5) Z_PARAM_LONG(partition) Z_PARAM_LONG(msgflags) Z_PARAM_OPTIONAL Z_PARAM_STRING_OR_NULL(payload, payload_len) Z_PARAM_STRING_OR_NULL(key, key_len) + Z_PARAM_ZVAL_OR_NULL(opaque) ZEND_PARSE_PARAMETERS_END(); if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) { @@ -132,9 +134,13 @@ ZEND_METHOD(SimpleKafkaClient_ProducerTopic, produce) return; } + if (NULL != opaque) { + Z_ADDREF_P(opaque); + } + intern = get_kafka_topic_object(getThis()); - ret = rd_kafka_produce(intern->rkt, partition, msgflags | RD_KAFKA_MSG_F_COPY, payload, payload_len, key, key_len, NULL); + ret = rd_kafka_produce(intern->rkt, partition, msgflags | RD_KAFKA_MSG_F_COPY, payload, payload_len, key, key_len, opaque); if (ret == -1) { err = rd_kafka_last_error(); @@ -160,12 +166,12 @@ ZEND_METHOD(SimpleKafkaClient_ProducerTopic, producev) HashTable *headersParam = NULL; HashPosition headersParamPos; char *header_key; - zval *header_value; + zval *header_value, *opaque = NULL; rd_kafka_headers_t *headers; zend_long timestamp_ms = 0; zend_bool timestamp_ms_is_null = 0; - ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 6) + ZEND_PARSE_PARAMETERS_START_EX(ZEND_PARSE_PARAMS_THROW, 2, 7) Z_PARAM_LONG(partition) Z_PARAM_LONG(msgflags) Z_PARAM_OPTIONAL @@ -173,6 +179,7 @@ ZEND_METHOD(SimpleKafkaClient_ProducerTopic, producev) Z_PARAM_STRING_OR_NULL(key, key_len) Z_PARAM_ARRAY_HT_OR_NULL(headersParam) Z_PARAM_LONG_OR_NULL(timestamp_ms, timestamp_ms_is_null) + Z_PARAM_ZVAL_OR_NULL(opaque) ZEND_PARSE_PARAMETERS_END(); if (partition != RD_KAFKA_PARTITION_UA && (partition < 0 || partition > 0x7FFFFFFF)) { @@ -185,6 +192,10 @@ ZEND_METHOD(SimpleKafkaClient_ProducerTopic, producev) return; } + if (NULL != opaque) { + Z_ADDREF_P(opaque); + } + if (timestamp_ms_is_null == 1) { timestamp_ms = 0; } @@ -224,6 +235,7 @@ ZEND_METHOD(SimpleKafkaClient_ProducerTopic, producev) RD_KAFKA_V_KEY(key, key_len), RD_KAFKA_V_TIMESTAMP(timestamp_ms), RD_KAFKA_V_HEADERS(headers), + RD_KAFKA_V_OPAQUE(opaque), RD_KAFKA_V_END ); diff --git a/topic.stub.php b/topic.stub.php index 789d63f..038fa1e 100644 --- a/topic.stub.php +++ b/topic.stub.php @@ -18,7 +18,7 @@ class ProducerTopic extends Topic { private function __construct() {} - public function produce(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null): void {} + public function produce(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null, mixed $opaque = null): void {} - public function producev(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null, ?array $headers = null, ?int $timestampMs = null): void {} + public function producev(int $partition, int $msgFlags, ?string $payload = null, ?string $key = null, ?array $headers = null, ?int $timestampMs = null, mixed $opaque = null): void {} } diff --git a/topic_arginfo.h b/topic_arginfo.h index 24690f2..25a2bf9 100644 --- a/topic_arginfo.h +++ b/topic_arginfo.h @@ -1,5 +1,5 @@ /* This is a generated file, edit the .stub.php file instead. - * Stub hash: 679e8a50ee764eb12f6a43ecaa2bf396c74235dc */ + * Stub hash: 88d2ac53ad8266413f1ab448883a9b2b439120ef */ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Topic_getName, 0, 0, IS_STRING, 0) ZEND_END_ARG_INFO() @@ -14,6 +14,7 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Producer ZEND_ARG_TYPE_INFO(0, msgFlags, IS_LONG, 0) ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, payload, IS_STRING, 1, "null") ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, key, IS_STRING, 1, "null") + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, opaque, IS_MIXED, 0, "null") ZEND_END_ARG_INFO() ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_ProducerTopic_producev, 0, 2, IS_VOID, 0) @@ -23,6 +24,7 @@ ZEND_BEGIN_ARG_WITH_RETURN_TYPE_INFO_EX(arginfo_class_SimpleKafkaClient_Producer ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, key, IS_STRING, 1, "null") ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, headers, IS_ARRAY, 1, "null") ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, timestampMs, IS_LONG, 1, "null") + ZEND_ARG_TYPE_INFO_WITH_DEFAULT_VALUE(0, opaque, IS_MIXED, 0, "null") ZEND_END_ARG_INFO()