-- consumer.lua (forked from RepentantGopher/tnt-kafka)
local box = require("box")
local json = require("json")
local log = require("log")
local fiber = require('fiber')
local tnt_kafka = require('kafka')
-- Module state: a single shared consumer instance plus the data
-- accumulated by the callbacks registered in create(). Reset on
-- every create() call.
local consumer = nil
local errors = {}
local logs = {}
local stats = {}
local rebalances = {}
--- Create the module-wide consumer and reset all collected callback data.
-- @param brokers string of broker addresses passed to tnt_kafka
-- @param additional_opts optional table of rdkafka options that
--        override the defaults below
-- Raises a box error with code 500 when creation fails.
local function create(brokers, additional_opts)
    errors = {}
    logs = {}
    stats = {}
    rebalances = {}

    -- Base configuration; individual entries may be overridden by
    -- additional_opts below.
    local options = {
        ["enable.auto.offset.store"] = "false",
        ["group.id"] = "test_consumer",
        ["auto.offset.reset"] = "earliest",
        ["enable.partition.eof"] = "false",
        ["log_level"] = "7",
        ["statistics.interval.ms"] = "1000",
    }
    if additional_opts ~= nil then
        for opt_name, opt_value in pairs(additional_opts) do
            options[opt_name] = opt_value
        end
    end

    local err
    consumer, err = tnt_kafka.Consumer.create({
        brokers = brokers,
        options = options,
        -- Each callback both logs and records its payload into the
        -- module-level accumulators so tests can inspect them later.
        error_callback = function(cb_err)
            log.error("got error: %s", cb_err)
            table.insert(errors, cb_err)
        end,
        log_callback = function(fac, str, level)
            log.info("got log: %d - %s - %s", level, fac, str)
            table.insert(logs, string.format("got log: %d - %s - %s", level, fac, str))
        end,
        stats_callback = function(json_stats)
            log.info("got stats")
            table.insert(stats, json_stats)
        end,
        rebalance_callback = function(msg)
            log.info("got rebalance msg: %s", json.encode(msg))
            table.insert(rebalances, msg)
        end,
        default_topic_options = {
            ["auto.offset.reset"] = "earliest",
        },
    })
    if err ~= nil then
        log.error("got err %s", err)
        box.error{code = 500, reason = err}
    end
    log.info("consumer created")
end
--- Subscribe the shared consumer to the given topics.
-- Raises a box error with code 500 on failure.
local function subscribe(topics)
    log.info("consumer subscribing")
    log.info(topics)
    local err = consumer:subscribe(topics)
    if err == nil then
        log.info("consumer subscribed")
        return
    end
    log.error("got err %s", err)
    box.error{code = 500, reason = err}
end
--- Unsubscribe the shared consumer from the given topics.
-- Raises a box error with code 500 on failure.
local function unsubscribe(topics)
    log.info("consumer unsubscribing")
    log.info(topics)
    local err = consumer:unsubscribe(topics)
    if err == nil then
        log.info("consumer unsubscribed")
        return
    end
    log.error("got err %s", err)
    box.error{code = 500, reason = err}
end
--- Convert a kafka message object into a plain Lua table so it can
-- be stored and serialized by the caller.
local function msg_totable(msg)
    local plain = {}
    plain.value = msg:value()
    plain.key = msg:key()
    plain.topic = msg:topic()
    plain.partition = msg:partition()
    plain.offset = msg:offset()
    plain.headers = msg:headers()
    return plain
end
--- Serialize msg to a plain table and append it to the array t.
local function append_message(t, msg)
    t[#t + 1] = msg_totable(msg)
end
--- Drain messages from the consumer output channel for `timeout`
-- seconds, storing each message's offset as it is received.
-- A background fiber reads the channel; after the timeout it is
-- cancelled and the collected messages (as plain tables) returned.
local function consume(timeout)
    log.info("consume called")
    local consumed = {}
    local reader = fiber.create(function()
        local out = consumer:output()
        while not out:is_closed() do
            local msg = out:get()
            if msg == nil then
                fiber.sleep(0.2)
            else
                log.info("%s", msg)
                log.info("got msg with topic='%s' partition='%d' offset='%d' key='%s' value='%s'",
                    msg:topic(), msg:partition(), msg:offset(), msg:key(), msg:value())
                append_message(consumed, msg)
                local err = consumer:store_offset(msg)
                if err ~= nil then
                    log.error("got error '%s' while committing msg from topic '%s'", err, msg:topic())
                end
            end
        end
    end)
    log.info("consume wait")
    fiber.sleep(timeout)
    log.info("consume ends")
    reader:cancel()
    return consumed
end
--- Errors recorded by the error callback since the last create().
local function get_errors()
    return errors
end
--- Log lines recorded by the log callback since the last create().
local function get_logs()
    return logs
end
--- Statistics payloads recorded by the stats callback since the last create().
local function get_stats()
    return stats
end
--- Rebalance events recorded by the rebalance callback since the last create().
local function get_rebalances()
    return rebalances
end
--- Dump the consumer's effective configuration.
local function dump_conf()
    return consumer:dump_conf()
end
--- Fetch cluster metadata, waiting at most timeout_ms milliseconds.
local function metadata(timeout_ms)
    return consumer:metadata({timeout_ms = timeout_ms})
end
--- List consumer groups known to the broker.
-- Returns the groups array on success, or nil plus an error.
-- The 'members' field is stripped from each group: it can contain
-- binary data that won't be correctly processed by the connector.
local function list_groups(timeout_ms)
    local groups, err = consumer:list_groups({timeout_ms = timeout_ms})
    if err ~= nil then
        return nil, err
    end
    log.info("Groups: %s", json.encode(groups))
    for _, group in ipairs(groups) do
        group.members = nil
    end
    return groups
end
--- Pause consumption on the shared consumer.
local function pause()
    return consumer:pause()
end
--- Resume consumption on the shared consumer.
local function resume()
    return consumer:resume()
end
--- Close the shared consumer.
-- Raises a box error with code 500 on failure.
local function close()
    log.info("closing consumer")
    local _, err = consumer:close()
    if err == nil then
        log.info("consumer closed")
        return
    end
    log.error("got err %s", err)
    box.error{code = 500, reason = err}
end
--- Read five messages, seeking the partition back to each message's
-- own offset after reading it, so the same message should be
-- re-delivered on the next read. Errors if no message arrives
-- within 3 seconds. Returns the collected messages as plain tables.
local function test_seek_partitions()
    log.info('Test seek')
    local collected = {}
    local channel = consumer:output()
    for _ = 1, 5 do
        local msg = channel:get(3)
        if msg == nil then
            error('Message is not delivered')
        end
        log.info('Get message: %s', json.encode(msg_totable(msg)))
        append_message(collected, msg)
        consumer:seek_partitions({
            {msg:topic(), msg:partition(), msg:offset()},
        }, 1000)
    end
    return collected
end
--- Report the consumer's current rebalance protocol.
local function rebalance_protocol()
    return consumer:rebalance_protocol()
end
--- Exercise Consumer.create argument validation: each invalid config
-- must produce the exact expected error message. The asserted strings
-- are fixed by the connector / librdkafka and must match verbatim.
local function test_create_errors()
    -- Log a label, attempt creation with `config`, and assert the
    -- returned error equals `expected`.
    local function check(label, config, expected)
        log.info(label)
        local _, err = tnt_kafka.Consumer.create(config)
        assert(err == expected)
    end

    check('Create without config',
        nil,
        'config must not be nil')
    check('Create with empty config',
        {},
        "consumer config table must have non nil key 'brokers' which contains string")
    check('Create with empty brokers',
        {brokers = ''},
        'No valid brokers specified')
    check('Create with invalid default_topic_options keys',
        {brokers = '', default_topic_options = {[{}] = 2}},
        'consumer config default topic options must contains only string keys and string values')
    check('Create with invalid default_topic_options property',
        {brokers = '', default_topic_options = {[2] = 2}},
        'No such configuration property: "2"')
    check('Create with invalid options keys',
        {brokers = '', options = {[{}] = 2}},
        'consumer config options must contains only string keys and string values')
    check('Create with invalid options property',
        {brokers = '', options = {[2] = 2}},
        'No such configuration property: "2"')
    check('Create with incompatible properties',
        {brokers = '', options = {['reconnect.backoff.max.ms'] = '2', ['reconnect.backoff.ms'] = '1000'}},
        '`reconnect.backoff.max.ms` must be >= `reconnect.max.ms`')
end
-- Public API of this test-harness module.
return {
    -- lifecycle
    create = create,
    subscribe = subscribe,
    unsubscribe = unsubscribe,
    close = close,
    -- consumption
    consume = consume,
    pause = pause,
    resume = resume,
    -- introspection of data gathered by callbacks
    get_errors = get_errors,
    get_logs = get_logs,
    get_stats = get_stats,
    get_rebalances = get_rebalances,
    -- broker / config queries
    dump_conf = dump_conf,
    metadata = metadata,
    list_groups = list_groups,
    rebalance_protocol = rebalance_protocol,
    -- test scenarios
    test_seek_partitions = test_seek_partitions,
    test_create_errors = test_create_errors,
}