Skip to content

Commit 9f54091

Browse files
0x501DLeonidVas
authored and committed
core: add master-replica switching support
This patch adds the ability to use the queue in a master-replica scheme. The queue will monitor the operation mode of Tarantool and perform the necessary actions accordingly.

This patch adds five states for the queue: INIT, STARTUP, RUNNING, ENDING and WAITING. When Tarantool is launched for the first time, the state of the queue is always INIT until box.info.ro is false.

State switching scheme:

             +-----------+
             |  RUNNING  |
             +-----------+
                   ^    | (rw -> ro)
                   |    v
+------+       +---------+  +--------+
| INIT | --->  | STARTUP |  | ENDING |
+------+       +---------+  +--------+
                   ^    |
        (ro -> rw) |    v
             +-----------+
             |  WAITING  |
             +-----------+

In the STARTUP state, the queue waits for possible data synchronization with other cluster members for a period equal to the largest upstream lag multiplied by two. After that, all taken tasks are released, except for tasks whose session UUID matches the UUID of an inactive session. This makes it possible to take a task, switch roles on the cluster, and release the task within the timeout specified by the queue.cfg({ttr = N}) parameter. Note: all clients that take() and do not ack()/release() tasks must be disconnected before changing the role. The last step in the STARTUP state is starting the tube driver using a new method called start().

Each tube driver must implement the start() and stop() methods. The start() method should start the driver fibers, if any; in other words, it should initialize all asynchronous work with the tube's space. The stop() method should stop the driver fibers, if any.

In the RUNNING state, the queue works as usual. The ENDING state calls the stop() method. In the WAITING state, the queue listens for a change in the read_only flag.

All states except INIT are controlled by a new fiber called 'queue_state_fiber'.

A new release_all() method has also been added, which forcibly returns all taken tasks to the ready state. This method can be called per tube.

Closes #120
1 parent 9ff105b commit 9f54091

File tree

12 files changed

+384
-18
lines changed

12 files changed

+384
-18
lines changed

queue-scm-1.rockspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ build = {
1919
['queue.abstract'] = 'queue/abstract.lua',
2020
['queue.abstract.state'] = 'queue/abstract/state.lua',
2121
['queue.abstract.queue_session'] = 'queue/abstract/queue_session.lua',
22+
['queue.abstract.queue_state'] = 'queue/abstract/queue_state.lua',
2223
['queue.abstract.driver.fifottl'] = 'queue/abstract/driver/fifottl.lua',
2324
['queue.abstract.driver.utubettl'] = 'queue/abstract/driver/utubettl.lua',
2425
['queue.abstract.driver.fifo'] = 'queue/abstract/driver/fifo.lua',

queue/CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract/state.lua
1010
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/abstract)
1111
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract/queue_session.lua
1212
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/abstract)
13+
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract/queue_state.lua
14+
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/abstract)
1315
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract/driver/fifo.lua
1416
DESTINATION ${TARANTOOL_INSTALL_LUADIR}/${PROJECT_NAME}/abstract/driver/)
1517
install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/abstract/driver/utube.lua

queue/abstract.lua

Lines changed: 124 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ local uuid = require('uuid')
44

55
local session = require('queue.abstract.queue_session')
66
local state = require('queue.abstract.state')
7+
local queue_state = require('queue.abstract.queue_state')
78

89
local util = require('queue.util')
910
local qc = require('queue.compat')
@@ -36,7 +37,8 @@ local queue = {
3637
stat = {}
3738
}
3839

39-
local function tube_release_all_tasks(tube)
40+
-- Release tasks that don't have session_uuid stored in inactive sessions.
41+
local function tube_release_all_orphaned_tasks(tube)
4042
local prefix = ('queue: [tube "%s"] '):format(tube.name)
4143

4244
-- We lean on stable iterators in this function.
@@ -51,8 +53,17 @@ local function tube_release_all_tasks(tube)
5153
log.info(prefix .. 'releasing all taken task (may take a while)')
5254
local released = 0
5355
for _, task in tube.raw:tasks_by_state(state.TAKEN) do
54-
tube.raw:release(task[1], {})
55-
released = released + 1
56+
local taken = box.space._queue_taken_2.index.task:get{
57+
tube.tube_id, task[1]
58+
}
59+
if taken and session.exist_inactive(taken[4]) then
60+
log.info(prefix ..
61+
('skipping task: %d, tube_id: %d'):format(task[1],
62+
tube.tube_id))
63+
else
64+
tube.raw:release(task[1], {})
65+
released = released + 1
66+
end
5667
end
5768
log.info(prefix .. ('released %d tasks'):format(released))
5869
end
@@ -72,7 +83,20 @@ end
7283
-- tube methods
7384
local tube = {}
7485

86+
-- This check must be called from all public tube methods.
87+
local function check_state()
88+
if queue_state.get() ~= queue_state.states.RUNNING then
89+
log.error(('Queue is in %s state'):format(queue_state.show()))
90+
return false
91+
end
92+
93+
return true
94+
end
95+
7596
function tube.put(self, data, opts)
97+
if not check_state() then
98+
return nil
99+
end
76100
opts = opts or {}
77101
local task = self.raw:put(data, opts)
78102
return self.raw:normalize_task(task)
@@ -82,6 +106,9 @@ local conds = {}
82106
local releasing_connections = {}
83107

84108
function tube.take(self, timeout)
109+
if not check_state() then
110+
return nil
111+
end
85112
timeout = util.time(timeout or util.TIMEOUT_INFINITY)
86113
local task = self.raw:take()
87114
if task ~= nil then
@@ -120,6 +147,9 @@ function tube.take(self, timeout)
120147
end
121148

122149
function tube.touch(self, id, delta)
150+
if not check_state() then
151+
return
152+
end
123153
if delta == nil then
124154
return
125155
end
@@ -143,6 +173,9 @@ function tube.touch(self, id, delta)
143173
end
144174

145175
function tube.ack(self, id)
176+
if not check_state() then
177+
return nil
178+
end
146179
check_task_is_taken(self.tube_id, id)
147180
local tube = box.space._queue:get{self.name}
148181
local space_name = tube[3]
@@ -169,10 +202,32 @@ local function tube_release_internal(self, id, opts, session_uuid)
169202
end
170203

171204
function tube.release(self, id, opts)
205+
if not check_state() then
206+
return nil
207+
end
172208
return tube_release_internal(self, id, opts)
173209
end
174210

211+
-- Release all tasks.
212+
function tube.release_all(self)
213+
if not check_state() then
214+
return
215+
end
216+
local prefix = ('queue: [tube "%s"] '):format(self.name)
217+
218+
log.info(prefix .. 'releasing all taken task (may take a while)')
219+
local released = 0
220+
for _, task in self.raw:tasks_by_state(state.TAKEN) do
221+
self.raw:release(task[1], {})
222+
released = released + 1
223+
end
224+
log.info(('%s released %d tasks'):format(prefix, released))
225+
end
226+
175227
function tube.peek(self, id)
228+
if not check_state() then
229+
return nil
230+
end
176231
local task = self.raw:peek(id)
177232
if task == nil then
178233
error(("Task %s not found"):format(tostring(id)))
@@ -181,6 +236,9 @@ function tube.peek(self, id)
181236
end
182237

183238
function tube.bury(self, id)
239+
if not check_state() then
240+
return nil
241+
end
184242
local task = self:peek(id)
185243
local is_taken, _ = pcall(check_task_is_taken, self.tube_id, id)
186244
if is_taken then
@@ -193,17 +251,26 @@ function tube.bury(self, id)
193251
end
194252

195253
function tube.kick(self, count)
254+
if not check_state() then
255+
return nil
256+
end
196257
count = count or 1
197258
return self.raw:kick(count)
198259
end
199260

200261
function tube.delete(self, id)
262+
if not check_state() then
263+
return nil
264+
end
201265
self:peek(id)
202266
return self.raw:normalize_task(self.raw:delete(id))
203267
end
204268

205269
-- drop tube
206270
function tube.drop(self)
271+
if not check_state() then
272+
return nil
273+
end
207274
local tube_name = self.name
208275

209276
local tube = box.space._queue:get{tube_name}
@@ -238,6 +305,9 @@ end
238305
-- truncate tube
239306
-- (delete everything from tube)
240307
function tube.truncate(self)
308+
if not check_state() then
309+
return
310+
end
241311
self.raw:truncate()
242312
end
243313

@@ -248,6 +318,9 @@ function tube.on_task_change(self, cb)
248318
end
249319

250320
function tube.grant(self, user, args)
321+
if not check_state() then
322+
return
323+
end
251324
local function tube_grant_space(user, name, tp)
252325
box.schema.user.grant(user, tp or 'read,write', 'space', name, {
253326
if_not_exists = true,
@@ -414,6 +487,10 @@ local function release_session_tasks(session_uuid)
414487
end
415488
end
416489

490+
function method.state()
491+
return queue_state.show()
492+
end
493+
417494
function method._on_consumer_disconnect()
418495
local conn_id = connection.id()
419496

@@ -438,6 +515,37 @@ function method._on_consumer_disconnect()
438515
session.disconnect(conn_id)
439516
end
440517

518+
-- Function takes new queue state.
519+
-- The "RUNNING" and "WAITING" states do not require additional actions.
520+
local function on_state_change(state)
521+
if state == queue_state.states.STARTUP then
522+
for name, tube in pairs(queue.tube) do
523+
tube_release_all_orphaned_tasks(tube)
524+
log.info('queue: [tube "%s"] start driver', name)
525+
if not tube.raw.start then
526+
log.warn('queue: [tube "%s"] method start is not implemented',
527+
tube.name)
528+
else
529+
tube.raw:start()
530+
end
531+
end
532+
session.start()
533+
elseif state == queue_state.states.ENDING then
534+
for name, tube in pairs(queue.tube) do
535+
log.info('queue: [tube "%s"] stop driver', name)
536+
if not tube.raw.stop then
537+
log.warn('queue: [tube "%s"] method stop is not implemented',
538+
tube.name)
539+
else
540+
tube.raw:stop()
541+
end
542+
end
543+
session.stop()
544+
else
545+
error('on_state_change: unexpected queue state')
546+
end
547+
end
548+
441549
-- function takes tuples and recreates tube
442550
local function recreate_tube(tube_tuple)
443551
local name, id, space_name, tube_type, opts = tube_tuple:unpack()
@@ -457,6 +565,9 @@ end
457565
-------------------------------------------------------------------------------
458566
-- create tube
459567
function method.create_tube(tube_name, tube_type, opts)
568+
if not check_state() then
569+
return
570+
end
460571
opts = opts or {}
461572
if opts.if_not_exists == nil then
462573
opts.if_not_exists = false
@@ -559,7 +670,7 @@ function method.start()
559670
if _taken == nil then
560671
-- tube_id, task_id, connection_id, session_uuid, time
561672
_taken = box.schema.create_space('_queue_taken_2', {
562-
temporary = true,
673+
temporary = false,
563674
format = {
564675
{name = 'tube_id', type = num_type()},
565676
{name = 'task_id', type = num_type()},
@@ -578,6 +689,12 @@ function method.start()
578689
parts = {4, str_type()},
579690
unique = false
580691
})
692+
else
693+
-- Upgrade space, queue states require that this space
694+
-- was not temporary.
695+
if _taken.temporary then
696+
_taken:alter{temporary = false}
697+
end
581698
end
582699

583700
for _, tube_tuple in _queue:pairs() do
@@ -586,14 +703,15 @@ function method.start()
586703
if queue.driver[tube_tuple[4]] ~= nil then
587704
local tube = recreate_tube(tube_tuple)
588705
-- gh-66: release all taken tasks on start
589-
tube_release_all_tasks(tube)
706+
tube_release_all_orphaned_tasks(tube)
590707
end
591708
end
592709

593710
session.on_session_remove(release_session_tasks)
594711
session.start()
595712

596713
connection.on_disconnect(queue._on_consumer_disconnect)
714+
queue_state.init(on_state_change)
597715
return queue
598716
end
599717

@@ -617,7 +735,7 @@ function method.register_driver(driver_name, tube_ctr)
617735
if tube_tuple[4] == driver_name then
618736
local tube = recreate_tube(tube_tuple)
619737
-- Release all task for tube on start.
620-
tube_release_all_tasks(tube)
738+
tube_release_all_orphaned_tasks(tube)
621739
end
622740
end
623741
end

queue/abstract/driver/fifo.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,14 @@ function method.truncate(self)
148148
self.space:truncate()
149149
end
150150

151+
-- This driver has no background activity.
152+
-- Implement dummy methods for the API requirement.
153+
function method.start()
154+
return
155+
end
156+
157+
function method.stop()
158+
return
159+
end
160+
151161
return tube

queue/abstract/driver/fifottl.lua

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ local function fifottl_fiber(self)
159159
while true do
160160
if box.info.ro == false then
161161
local stat, err = pcall(fifottl_fiber_iteration, self, processed)
162+
162163
if not stat and not (err.code == box.error.READONLY) then
163164
log.error("error catched: %s", tostring(err))
164165
log.error("exiting fiber '%s'", fiber.name())
@@ -167,7 +168,11 @@ local function fifottl_fiber(self)
167168
processed = err
168169
end
169170
else
170-
fiber.sleep(0.1)
171+
-- When switching the master to the replica, the fiber will be stopped.
172+
if self.sync_chan:get(0.1) ~= nil then
173+
print("Queue fifottl was stopped")
174+
break
175+
end
171176
end
172177
end
173178
end
@@ -191,6 +196,7 @@ function tube.new(space, on_task_change, opts)
191196

192197
self.cond = qc.waiter()
193198
self.fiber = fiber.create(fifottl_fiber, self)
199+
self.sync_chan = fiber.channel()
194200

195201
return self
196202
end
@@ -365,4 +371,20 @@ function method.truncate(self)
365371
self.space:truncate()
366372
end
367373

374+
function method.start(self)
375+
if self.fiber then
376+
return
377+
end
378+
self.fiber = fiber.create(fifottl_fiber, self)
379+
end
380+
381+
function method.stop(self)
382+
if not self.fiber then
383+
return
384+
end
385+
self.cond:signal(self.fiber:id())
386+
self.sync_chan:put(true)
387+
self.fiber = nil
388+
end
389+
368390
return tube

queue/abstract/driver/utube.lua

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,4 +175,14 @@ function method.truncate(self)
175175
self.space:truncate()
176176
end
177177

178+
-- This driver has no background activity.
179+
-- Implement dummy methods for the API requirement.
180+
function method.start()
181+
return
182+
end
183+
184+
function method.stop()
185+
return
186+
end
187+
178188
return tube

0 commit comments

Comments
 (0)