Skip to content

Commit 7d7f6ad

Browse files
authored
Merge pull request #228 from lkm/timestamp
Support sending timestamp (epoch ms) to producev
2 parents 39e9ae2 + cf37d76 commit 7d7f6ad

File tree

1 file changed

+11
-3
lines changed

1 file changed

+11
-3
lines changed

topic.c

+11-3
Original file line numberDiff line numberDiff line change
@@ -402,15 +402,16 @@ PHP_METHOD(RdKafka__ProducerTopic, produce)
402402
/* }}} */
403403

404404
#ifdef HAVE_RD_KAFKA_MESSAGE_HEADERS
405-
/* {{{ proto void RdKafka\ProducerTopic::producev(int $partition, int $msgflags[, string $payload, string $key, array $headers])
406-
Produce and send a single message to broker (with headers possibility). */
405+
/* {{{ proto void RdKafka\ProducerTopic::producev(int $partition, int $msgflags[, string $payload, string $key, array $headers, int $timestamp_ms])
406+
Produce and send a single message to broker (with headers possibility and timestamp). */
407407

408408
ZEND_BEGIN_ARG_INFO_EX(arginfo_kafka_producev, 0, 0, 2)
409409
ZEND_ARG_INFO(0, partition)
410410
ZEND_ARG_INFO(0, msgflags)
411411
ZEND_ARG_INFO(0, payload)
412412
ZEND_ARG_INFO(0, key)
413413
ZEND_ARG_INFO(0, headers)
414+
ZEND_ARG_INFO(0, timestamp_ms)
414415
ZEND_END_ARG_INFO()
415416

416417
PHP_METHOD(RdKafka__ProducerTopic, producev)
@@ -429,8 +430,10 @@ PHP_METHOD(RdKafka__ProducerTopic, producev)
429430
char *header_key;
430431
zeval *header_value;
431432
rd_kafka_headers_t *headers;
433+
long timestamp_ms = 0;
434+
zend_bool timestamp_ms_is_null = 0;
432435

433-
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll|s!s!h!", &partition, &msgflags, &payload, &payload_len, &key, &key_len, &headersParam) == FAILURE) {
436+
if (zend_parse_parameters(ZEND_NUM_ARGS() TSRMLS_CC, "ll|s!s!h!l!", &partition, &msgflags, &payload, &payload_len, &key, &key_len, &headersParam, &timestamp_ms, &timestamp_ms_is_null) == FAILURE) {
434437
return;
435438
}
436439

@@ -444,6 +447,10 @@ PHP_METHOD(RdKafka__ProducerTopic, producev)
444447
return;
445448
}
446449

450+
if (timestamp_ms_is_null == 1) {
451+
timestamp_ms = 0;
452+
}
453+
447454
intern = get_kafka_topic_object(getThis() TSRMLS_CC);
448455

449456
if (headersParam != NULL && zend_hash_num_elements(headersParam) > 0) {
@@ -477,6 +484,7 @@ PHP_METHOD(RdKafka__ProducerTopic, producev)
477484
RD_KAFKA_V_MSGFLAGS(msgflags | RD_KAFKA_MSG_F_COPY),
478485
RD_KAFKA_V_VALUE(payload, payload_len),
479486
RD_KAFKA_V_KEY(key, key_len),
487+
RD_KAFKA_V_TIMESTAMP(timestamp_ms),
480488
RD_KAFKA_V_HEADERS(headers),
481489
RD_KAFKA_V_END
482490
);

0 commit comments

Comments
 (0)