Skip to content

Commit aa7c092

Browse files
DerekBumoleg-jukovec
authored andcommitted
api: fix INIT state stuck
Sometimes, instance could enter the queue initialization while still not running (for example, left in the orphan mode). This resulted in "lazy start". But Tarantool does not call `box.cfg {}` after leaving orphan mode, so queue could stuck in the `INIT` state. Now if the instance is read-only, separate fiber is watching for updates of its mode. Note that this fix works only for Tarantool versions >= 2.10.0. This is because of used watchers. Closes #226
1 parent 5f2b145 commit aa7c092

File tree

3 files changed

+148
-14
lines changed

3 files changed

+148
-14
lines changed

Diff for: CHANGELOG.md

+7
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,13 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog](http://keepachangelog.com/en/1.0.0/)
66
and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.html).
77

8+
## [Unreleased]
9+
10+
### Fixed
11+
12+
- Stuck in `INIT` state if an instance failed to enter the `running` mode
13+
in time (#226). This fix works only for Tarantool versions >= 2.10.0.
14+
815
## [1.3.3] - 2023-09-13
916

1017
### Fixed

Diff for: queue/init.lua

+44-14
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
1+
local fiber = require('fiber')
2+
13
local abstract = require('queue.abstract')
24
local queue_state = require('queue.abstract.queue_state')
5+
local qc = require('queue.compat')
36
local queue = nil
47

58
-- load all core drivers
@@ -11,6 +14,10 @@ local core_drivers = {
1114
limfifottl = require('queue.abstract.driver.limfifottl')
1215
}
1316

17+
-- since:
18+
-- https://github.com/locker/tarantool/commit/8cf5151cb4f05cee3fd0ea831add2b3187a01fe4
19+
local watchers_supported = qc.check_version({2, 10, 0})
20+
1421
local function register_driver(driver_name, tube_ctr)
1522
if type(tube_ctr.create_space) ~= 'function' or
1623
type(tube_ctr.new) ~= 'function' then
@@ -60,7 +67,20 @@ queue = setmetatable({
6067
local orig_cfg = nil
6168
local orig_call = nil
6269

63-
local wrapper_impl
70+
local wrapper_impl, handle_instance_mode
71+
72+
local function rw_waiter()
73+
fiber.name('queue instance rw waiter')
74+
local wait_cond = fiber.cond()
75+
local w = box.watch('box.status', function(_, new_status)
76+
if new_status.is_ro == false then
77+
wait_cond:signal()
78+
end
79+
end)
80+
wait_cond:wait()
81+
w:unregister()
82+
handle_instance_mode()
83+
end
6484

6585
local function cfg_wrapper(...)
6686
box.cfg = orig_cfg
@@ -79,24 +99,22 @@ local function wrap_box_cfg()
7999
orig_cfg = box.cfg
80100
box.cfg = cfg_wrapper
81101
elseif type(box.cfg) == 'table' then
82-
-- box.cfg after the first box.cfg call
83-
local cfg_mt = getmetatable(box.cfg)
84-
orig_call = cfg_mt.__call
85-
cfg_mt.__call = cfg_call_wrapper
102+
if box.info.ro_reason == 'config' or not watchers_supported then
103+
-- box.cfg after the first box.cfg call.
104+
-- The another call could switch the mode.
105+
local cfg_mt = getmetatable(box.cfg)
106+
orig_call = cfg_mt.__call
107+
cfg_mt.__call = cfg_call_wrapper
108+
else
109+
-- Wait for the rw state.
110+
fiber.new(rw_waiter)
111+
end
86112
else
87113
error('The box.cfg type is unexpected: ' .. type(box.cfg))
88114
end
89115
end
90116

91-
function wrapper_impl(...)
92-
local result = { pcall(box.cfg,...) }
93-
if result[1] then
94-
table.remove(result, 1)
95-
else
96-
wrap_box_cfg()
97-
error(result[2])
98-
end
99-
117+
function handle_instance_mode()
100118
if box.info.ro == false then
101119
local abstract = require 'queue.abstract'
102120
for name, val in pairs(abstract) do
@@ -113,6 +131,18 @@ function wrapper_impl(...)
113131
-- with read_only = false
114132
wrap_box_cfg()
115133
end
134+
end
135+
136+
function wrapper_impl(...)
137+
local result = { pcall(box.cfg,...) }
138+
if result[1] then
139+
table.remove(result, 1)
140+
else
141+
wrap_box_cfg()
142+
error(result[2])
143+
end
144+
145+
handle_instance_mode()
116146
return unpack(result)
117147
end
118148

Diff for: t/230-orphan-not-stalling-init.t

+97
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
#!/usr/bin/env tarantool
2+
3+
local test = require('tap').test('')
4+
local queue = require('queue')
5+
local tnt = require('t.tnt')
6+
local fio = require('fio')
7+
local fiber = require('fiber')
8+
9+
rawset(_G, 'queue', require('queue'))
10+
11+
local qc = require('queue.compat')
12+
if not qc.check_version({2, 10, 0}) then
13+
require('log').info('Tests skipped, tarantool version < 2.10.0 ' ..
14+
'does not support the lazy init')
15+
return
16+
end
17+
18+
local snapdir_optname = qc.snapdir_optname
19+
local logger_optname = qc.logger_optname
20+
21+
test:plan(1)
22+
23+
test:test('Check orphan mode not stalling queue', function(test)
24+
test:plan(4)
25+
local engine = os.getenv('ENGINE') or 'memtx'
26+
tnt.cluster.cfg{}
27+
28+
local dir_replica = fio.tempdir()
29+
local cmd_replica = {
30+
arg[-1],
31+
'-e',
32+
[[
33+
box.cfg {
34+
replication = {
35+
'replicator:[email protected]:3399',
36+
'replicator:[email protected]:3398',
37+
},
38+
listen = '127.0.0.1:3396',
39+
wal_dir = ']] .. dir_replica .. '\'' ..
40+
',' .. snapdir_optname() .. ' = \'' .. dir_replica .. '\'' ..
41+
',' .. logger_optname() .. ' = \'' ..
42+
fio.pathjoin(dir_replica, 'tarantool.log') .. '\'' ..
43+
'}'
44+
}
45+
46+
replica = require('popen').new(cmd_replica, {
47+
stdin = 'devnull',
48+
stdout = 'devnull',
49+
stderr = 'devnull',
50+
})
51+
52+
local attempts = 0
53+
-- Wait for replica to connect.
54+
while box.info.replication[3] == nil or
55+
box.info.replication[3].downstream.status ~= 'follow' do
56+
57+
attempts = attempts + 1
58+
if attempts == 30 then
59+
error('wait for replica connection')
60+
end
61+
fiber.sleep(0.1)
62+
end
63+
64+
local conn = require('net.box').connect('127.0.0.1:3396')
65+
66+
conn:eval([[
67+
box.cfg{
68+
replication = {
69+
'replicator:[email protected]:3399',
70+
'replicator:[email protected]:3398',
71+
'replicator:[email protected]:3396',
72+
},
73+
listen = '127.0.0.1:3397',
74+
replication_connect_quorum = 4,
75+
}
76+
]])
77+
78+
conn:eval('rawset(_G, "queue", require("queue"))')
79+
80+
test:is(conn:call('queue.state'), 'INIT', 'check queue state')
81+
test:is(conn:call('box.info').ro, true, 'check read only')
82+
test:is(conn:call('box.info').ro_reason, 'orphan', 'check ro reason')
83+
84+
conn:eval('box.cfg{replication_connect_quorum = 2}')
85+
86+
local attempts = 0
87+
while conn:call('queue.state') ~= 'RUNNING' and attempts < 50 do
88+
fiber.sleep(0.1)
89+
attempts = attempts + 1
90+
end
91+
test:is(conn:call('queue.state'), 'RUNNING', 'check queue state after orphan')
92+
end)
93+
94+
rawset(_G, 'queue', nil)
95+
tnt.finish()
96+
os.exit(test:check() and 0 or 1)
97+
-- vim: set ft=lua :

0 commit comments

Comments
 (0)