Skip to content

Commit ccffa0e

Browse files
authored
feat: interact etcd via gRPC before Nginx starts (#8548)
1 parent 871d05e commit ccffa0e

File tree

9 files changed

+320
-134
lines changed

9 files changed

+320
-134
lines changed

apisix/cli/etcd.lua

+248-129
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,241 @@ local function request(url, yaml_conf)
146146
end
147147

148148

149+
local function prepare_dirs_via_http(yaml_conf, args, index, host, host_count)
150+
local is_success = true
151+
152+
local errmsg
153+
local auth_token
154+
local user = yaml_conf.etcd.user
155+
local password = yaml_conf.etcd.password
156+
if user and password then
157+
local auth_url = host .. "/v3/auth/authenticate"
158+
local json_auth = {
159+
name = user,
160+
password = password
161+
}
162+
163+
local post_json_auth = dkjson.encode(json_auth)
164+
local response_body = {}
165+
166+
local res, err
167+
local retry_time = 0
168+
while retry_time < 2 do
169+
res, err = request({
170+
url = auth_url,
171+
method = "POST",
172+
source = ltn12.source.string(post_json_auth),
173+
sink = ltn12.sink.table(response_body),
174+
headers = {
175+
["Content-Length"] = #post_json_auth
176+
}
177+
}, yaml_conf)
178+
-- In case of failure, request returns nil followed by an error message.
179+
-- Else the first return value is just the number 1
180+
-- and followed by the response status code.
181+
if res then
182+
break
183+
end
184+
retry_time = retry_time + 1
185+
print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s",
186+
auth_url, err, retry_time))
187+
end
188+
189+
if not res then
190+
errmsg = str_format("request etcd endpoint \"%s\" error, %s\n", auth_url, err)
191+
util.die(errmsg)
192+
end
193+
194+
local res_auth = table_concat(response_body)
195+
local body_auth, _, err_auth = dkjson.decode(res_auth)
196+
if err_auth or (body_auth and not body_auth["token"]) then
197+
errmsg = str_format("got malformed auth message: \"%s\" from etcd \"%s\"\n",
198+
res_auth, auth_url)
199+
util.die(errmsg)
200+
end
201+
202+
auth_token = body_auth.token
203+
end
204+
205+
206+
local dirs = {}
207+
for name in pairs(constants.HTTP_ETCD_DIRECTORY) do
208+
dirs[name] = true
209+
end
210+
for name in pairs(constants.STREAM_ETCD_DIRECTORY) do
211+
dirs[name] = true
212+
end
213+
214+
for dir_name in pairs(dirs) do
215+
local key = (yaml_conf.etcd.prefix or "") .. dir_name .. "/"
216+
217+
local put_url = host .. "/v3/kv/put"
218+
local post_json = '{"value":"' .. base64_encode("init_dir")
219+
.. '", "key":"' .. base64_encode(key) .. '"}'
220+
local response_body = {}
221+
local headers = {["Content-Length"] = #post_json}
222+
if auth_token then
223+
headers["Authorization"] = auth_token
224+
end
225+
226+
local res, err
227+
local retry_time = 0
228+
while retry_time < 2 do
229+
res, err = request({
230+
url = put_url,
231+
method = "POST",
232+
source = ltn12.source.string(post_json),
233+
sink = ltn12.sink.table(response_body),
234+
headers = headers
235+
}, yaml_conf)
236+
retry_time = retry_time + 1
237+
if res then
238+
break
239+
end
240+
print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s",
241+
put_url, err, retry_time))
242+
end
243+
244+
if not res then
245+
errmsg = str_format("request etcd endpoint \"%s\" error, %s\n", put_url, err)
246+
util.die(errmsg)
247+
end
248+
249+
local res_put = table_concat(response_body)
250+
if res_put:find("404 page not found", 1, true) then
251+
errmsg = str_format("gRPC gateway is not enabled in etcd cluster \"%s\",",
252+
"which is required by Apache APISIX\n")
253+
util.die(errmsg)
254+
end
255+
256+
if res_put:find("CommonName of client sending a request against gateway", 1, true) then
257+
errmsg = str_format("etcd \"client-cert-auth\" cannot be used with gRPC-gateway, "
258+
.. "please configure the etcd username and password "
259+
.. "in configuration file\n")
260+
util.die(errmsg)
261+
end
262+
263+
if res_put:find("error", 1, true) then
264+
is_success = false
265+
if (index == host_count) then
266+
errmsg = str_format("got malformed key-put message: \"%s\" from etcd \"%s\"\n",
267+
res_put, put_url)
268+
util.die(errmsg)
269+
end
270+
271+
break
272+
end
273+
274+
if args and args["verbose"] then
275+
print(res_put)
276+
end
277+
end
278+
279+
return is_success
280+
end
281+
282+
283+
local function grpc_request(url, yaml_conf, key)
284+
local cmd
285+
286+
local auth = ""
287+
if yaml_conf.etcd.user then
288+
local user = yaml_conf.etcd.user
289+
local password = yaml_conf.etcd.password
290+
auth = str_format("--user=%s:%s", user, password)
291+
end
292+
293+
if str_sub(url, 1, 8) == "https://" then
294+
local host = url:sub(9)
295+
296+
local verify = true
297+
local certificate, pkey, cafile
298+
if yaml_conf.etcd.tls then
299+
local cfg = yaml_conf.etcd.tls
300+
301+
if cfg.verify == false then
302+
verify = false
303+
end
304+
305+
certificate = cfg.cert
306+
pkey = cfg.key
307+
308+
local apisix_ssl = yaml_conf.apisix.ssl
309+
if apisix_ssl and apisix_ssl.ssl_trusted_certificate then
310+
cafile = apisix_ssl.ssl_trusted_certificate
311+
end
312+
end
313+
314+
cmd = str_format(
315+
"etcdctl --insecure-transport=false %s %s %s %s " ..
316+
"%s --endpoints=%s put %s init_dir",
317+
verify and "" or "--insecure-skip-tls-verify",
318+
certificate and "--cert " .. certificate or "",
319+
pkey and "--key " .. pkey or "",
320+
cafile and "--cacert " .. cafile or "",
321+
auth, host, key)
322+
else
323+
local host = url:sub(#("http://") + 1)
324+
325+
cmd = str_format(
326+
"etcdctl %s --endpoints=%s put %s init_dir",
327+
auth, host, key)
328+
end
329+
330+
local res, err = util.execute_cmd(cmd)
331+
return res, err
332+
end
333+
334+
335+
local function prepare_dirs_via_grpc(yaml_conf, args, index, host)
336+
local is_success = true
337+
338+
local errmsg
339+
local dirs = {}
340+
for name in pairs(constants.HTTP_ETCD_DIRECTORY) do
341+
dirs[name] = true
342+
end
343+
for name in pairs(constants.STREAM_ETCD_DIRECTORY) do
344+
dirs[name] = true
345+
end
346+
347+
for dir_name in pairs(dirs) do
348+
local key = (yaml_conf.etcd.prefix or "") .. dir_name .. "/"
349+
local res, err
350+
local retry_time = 0
351+
while retry_time < 2 do
352+
res, err = grpc_request(host, yaml_conf, key)
353+
retry_time = retry_time + 1
354+
if res then
355+
break
356+
end
357+
print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s",
358+
host, err, retry_time))
359+
end
360+
361+
if not res then
362+
errmsg = str_format("request etcd endpoint \"%s\" error, %s\n", host, err)
363+
util.die(errmsg)
364+
end
365+
366+
if args and args["verbose"] then
367+
print(res)
368+
end
369+
end
370+
371+
return is_success
372+
end
373+
374+
375+
local function prepare_dirs(use_grpc, yaml_conf, args, index, host, host_count)
376+
if use_grpc then
377+
return prepare_dirs_via_grpc(yaml_conf, args, index, host)
378+
end
379+
380+
return prepare_dirs_via_http(yaml_conf, args, index, host, host_count)
381+
end
382+
383+
149384
function _M.init(env, args)
150385
-- read_yaml_conf
151386
local yaml_conf, err = file.read_yaml_conf(env.apisix_home)
@@ -242,138 +477,22 @@ function _M.init(env, args)
242477
util.die("the etcd cluster needs at least 50% and above healthy nodes\n")
243478
end
244479

245-
local etcd_ok = false
246-
for index, host in ipairs(etcd_healthy_hosts) do
247-
local is_success = true
248-
249-
local errmsg
250-
local auth_token
251-
local user = yaml_conf.etcd.user
252-
local password = yaml_conf.etcd.password
253-
if user and password then
254-
local auth_url = host .. "/v3/auth/authenticate"
255-
local json_auth = {
256-
name = etcd_conf.user,
257-
password = etcd_conf.password
258-
}
259-
260-
local post_json_auth = dkjson.encode(json_auth)
261-
local response_body = {}
262-
263-
local res, err
264-
local retry_time = 0
265-
while retry_time < 2 do
266-
res, err = request({
267-
url = auth_url,
268-
method = "POST",
269-
source = ltn12.source.string(post_json_auth),
270-
sink = ltn12.sink.table(response_body),
271-
headers = {
272-
["Content-Length"] = #post_json_auth
273-
}
274-
}, yaml_conf)
275-
-- In case of failure, request returns nil followed by an error message.
276-
-- Else the first return value is just the number 1
277-
-- and followed by the response status code.
278-
if res then
279-
break
280-
end
281-
retry_time = retry_time + 1
282-
print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s",
283-
auth_url, err, retry_time))
284-
end
285-
286-
if not res then
287-
errmsg = str_format("request etcd endpoint \"%s\" error, %s\n", auth_url, err)
288-
util.die(errmsg)
289-
end
290-
291-
local res_auth = table_concat(response_body)
292-
local body_auth, _, err_auth = dkjson.decode(res_auth)
293-
if err_auth or (body_auth and not body_auth["token"]) then
294-
errmsg = str_format("got malformed auth message: \"%s\" from etcd \"%s\"\n",
295-
res_auth, auth_url)
296-
util.die(errmsg)
297-
end
298-
299-
auth_token = body_auth.token
300-
end
301-
302-
303-
local dirs = {}
304-
for name in pairs(constants.HTTP_ETCD_DIRECTORY) do
305-
dirs[name] = true
306-
end
307-
for name in pairs(constants.STREAM_ETCD_DIRECTORY) do
308-
dirs[name] = true
309-
end
310-
311-
for dir_name in pairs(dirs) do
312-
local key = (etcd_conf.prefix or "") .. dir_name .. "/"
313-
314-
local put_url = host .. "/v3/kv/put"
315-
local post_json = '{"value":"' .. base64_encode("init_dir")
316-
.. '", "key":"' .. base64_encode(key) .. '"}'
317-
local response_body = {}
318-
local headers = {["Content-Length"] = #post_json}
319-
if auth_token then
320-
headers["Authorization"] = auth_token
321-
end
322-
323-
local res, err
324-
local retry_time = 0
325-
while retry_time < 2 do
326-
res, err = request({
327-
url = put_url,
328-
method = "POST",
329-
source = ltn12.source.string(post_json),
330-
sink = ltn12.sink.table(response_body),
331-
headers = headers
332-
}, yaml_conf)
333-
retry_time = retry_time + 1
334-
if res then
335-
break
336-
end
337-
print(str_format("Warning! Request etcd endpoint \'%s\' error, %s, retry time=%s",
338-
put_url, err, retry_time))
339-
end
340-
341-
if not res then
342-
errmsg = str_format("request etcd endpoint \"%s\" error, %s\n", put_url, err)
343-
util.die(errmsg)
344-
end
345-
346-
local res_put = table_concat(response_body)
347-
if res_put:find("404 page not found", 1, true) then
348-
errmsg = str_format("gRPC gateway is not enabled in etcd cluster \"%s\",",
349-
"which is required by Apache APISIX\n")
350-
util.die(errmsg)
351-
end
352-
353-
if res_put:find("CommonName of client sending a request against gateway", 1, true) then
354-
errmsg = str_format("etcd \"client-cert-auth\" cannot be used with gRPC-gateway, "
355-
.. "please configure the etcd username and password "
356-
.. "in configuration file\n")
357-
util.die(errmsg)
358-
end
359-
360-
if res_put:find("error", 1, true) then
361-
is_success = false
362-
if (index == host_count) then
363-
errmsg = str_format("got malformed key-put message: \"%s\" from etcd \"%s\"\n",
364-
res_put, put_url)
365-
util.die(errmsg)
366-
end
367-
368-
break
369-
end
480+
if etcd_conf.use_grpc and not env.use_apisix_base then
481+
io_stderr:write("'use_grpc: true' in the etcd configuration " ..
482+
"is not supported by vanilla OpenResty\n")
483+
end
370484

371-
if args and args["verbose"] then
372-
print(res_put)
373-
end
485+
local use_grpc = etcd_conf.use_grpc and env.use_apisix_base
486+
if use_grpc then
487+
local ok, err = util.execute_cmd("command -v etcdctl")
488+
if not ok then
489+
util.die("can't find etcdctl: ", err, "\n")
374490
end
491+
end
375492

376-
if is_success then
493+
local etcd_ok = false
494+
for index, host in ipairs(etcd_healthy_hosts) do
495+
if prepare_dirs(use_grpc, yaml_conf, args, index, host, host_count) then
377496
etcd_ok = true
378497
break
379498
end

0 commit comments

Comments
 (0)