Skip to content

Commit 608b459

Browse files
committed
add method to check rebalance protocol
1 parent 97bd101 commit 608b459

9 files changed

+47
-3
lines changed

kafka/consumer.c

+14
Original file line numberDiff line numberDiff line change
@@ -821,3 +821,17 @@ int
821821
lua_consumer_resume(struct lua_State *L) {
822822
return lua_consumer_call_pause_resume(L, kafka_resume);
823823
}
824+
825+
int
826+
lua_consumer_rebalance_protocol(struct lua_State *L) {
827+
consumer_t **consumer_p = luaL_checkudata(L, 1, consumer_label);
828+
if (consumer_p == NULL || *consumer_p == NULL)
829+
return 0;
830+
831+
if ((*consumer_p)->rd_consumer != NULL) {
832+
const char *proto = rd_kafka_rebalance_protocol((*consumer_p)->rd_consumer);
833+
lua_pushstring(L, proto);
834+
return 1;
835+
}
836+
return 0;
837+
}

kafka/consumer.h

+3
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,7 @@ lua_consumer_pause(struct lua_State *L);
5959
int
6060
lua_consumer_resume(struct lua_State *L);
6161

62+
int
63+
lua_consumer_rebalance_protocol(struct lua_State *L);
64+
6265
#endif //TNT_KAFKA_CONSUMER_H

kafka/init.lua

+4
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,10 @@ function Consumer:resume()
219219
return self._consumer:resume()
220220
end
221221

222+
function Consumer:rebalance_protocol()
223+
return self._consumer:rebalance_protocol()
224+
end
225+
222226
function Consumer:seek_partitions(topic_partitions_list, options)
223227
local timeout_ms = get_timeout_from_options(options)
224228
return self._consumer:seek_partitions(topic_partitions_list, timeout_ms)

kafka/producer.c

+1-1
Original file line numberDiff line numberDiff line change
@@ -631,7 +631,7 @@ lua_producer_metadata(struct lua_State *L) {
631631

632632
int
633633
lua_producer_list_groups(struct lua_State *L) {
634-
producer_t **producer_p = (producer_t **)luaL_checkudata(L, 1, producer_label);
634+
producer_t **producer_p = luaL_checkudata(L, 1, producer_label);
635635
if (producer_p == NULL || *producer_p == NULL)
636636
return 0;
637637

kafka/tnt_kafka.c

+1
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ luaopen_kafka_tntkafka(lua_State *L) {
3333
{"resume", lua_consumer_resume},
3434
{"close", lua_consumer_close},
3535
{"destroy", lua_consumer_destroy},
36+
{"rebalance_protocol", lua_consumer_rebalance_protocol},
3637
{"__tostring", lua_consumer_tostring},
3738
{NULL, NULL}
3839
};

tests/consumer.lua

+9-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ local function list_groups(timeout_ms)
172172
log.info("Groups: %s", json.encode(res))
173173
-- Some fields can have binary data that won't
174174
-- be correctly processed by connector.
175-
for _, group in ipairs(res) do
175+
for _, group in ipairs(res) do
176176
group['members'] = nil
177177
end
178178
return res
@@ -204,6 +204,9 @@ local function test_seek_partitions()
204204

205205
for _ = 1, 5 do
206206
local msg = out:get(3)
207+
if msg == nil then
208+
error('Message is not delivered')
209+
end
207210
log.info('Get message: %s', json.encode(msg_totable(msg)))
208211
append_message(messages, msg)
209212
consumer:seek_partitions({
@@ -214,6 +217,10 @@ local function test_seek_partitions()
214217
return messages
215218
end
216219

220+
local function rebalance_protocol()
221+
return consumer:rebalance_protocol()
222+
end
223+
217224
local function test_create_errors()
218225
log.info('Create without config')
219226
local _, err = tnt_kafka.Consumer.create()
@@ -263,6 +270,7 @@ return {
263270
list_groups = list_groups,
264271
pause = pause,
265272
resume = resume,
273+
rebalance_protocol = rebalance_protocol,
266274

267275
test_seek_partitions = test_seek_partitions,
268276
test_create_errors = test_create_errors,

tests/producer.lua

+1-1
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ local function list_groups(timeout_ms)
9797
log.info("Groups: %s", json.encode(res))
9898
-- Some fields can have binary data that won't
9999
-- be correctly processed by connector.
100-
for _, group in ipairs(res) do
100+
for _, group in ipairs(res) do
101101
group['members'] = nil
102102
end
103103
return res

tests/test_consumer.py

+13
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,19 @@ def test_consumer_should_log_rebalances():
379379
assert len(response.data[0]) > 0
380380

381381

382+
def test_consumer_rebalance_protocol():
383+
server = get_server()
384+
385+
with create_consumer(server, KAFKA_HOST, {"bootstrap.servers": KAFKA_HOST}):
386+
time.sleep(5)
387+
response = server.call("consumer.rebalance_protocol", [])
388+
assert response[0] == 'NONE'
389+
390+
server.call("consumer.subscribe", [["test_unsub_partially_1"]])
391+
response = server.call("consumer.rebalance_protocol", [])
392+
assert response[0] == 'NONE'
393+
394+
382395
def test_consumer_should_continue_consuming_from_last_committed_offset():
383396
message1 = {
384397
"key": "test1",

tests/test_producer.py

+1
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ def test_producer_should_log_debug():
177177

178178
server.call("producer.close", [])
179179

180+
180181
def test_producer_create_errors():
181182
server = get_server()
182183
server.call("producer.test_create_errors")

0 commit comments

Comments
 (0)