Skip to content

Commit 2f157c9

Browse files
0x501DLeonidVas
authored andcommitted
replicaset_mode: fix queue.tube indexes
In replicaset mode, tubes can be created and deleted on different nodes. Accordingly, it is necessary to rebuild the queue.tube index. Closes #202
1 parent 18c99f5 commit 2f157c9

File tree

3 files changed

+82
-18
lines changed

3 files changed

+82
-18
lines changed

queue/abstract.lua

+29-16
Original file line numberDiff line numberDiff line change
@@ -520,10 +520,39 @@ function method._on_consumer_disconnect()
520520
session.disconnect(conn_id)
521521
end
522522

523+
-- function takes tuples and recreates tube
524+
local function recreate_tube(tube_tuple)
525+
local name, id, space_name, tube_type, opts = tube_tuple:unpack()
526+
527+
local driver = queue.driver[tube_type]
528+
if driver == nil then
529+
error("Unknown tube type " .. tostring(tube_type))
530+
end
531+
532+
local space = box.space[space_name]
533+
if space == nil then
534+
error(("Space '%s' doesn't exists"):format(space_name))
535+
end
536+
return make_self(driver, space, name, tube_type, id, opts)
537+
end
538+
523539
-- Function takes new queue state.
524540
-- The "RUNNING" and "WAITING" states do not require additional actions.
525541
local function on_state_change(state)
526542
if state == queue_state.states.STARTUP then
543+
local replicaset_mode = queue.cfg['in_replicaset'] or false
544+
-- gh-202: In replicaset mode, tubes can be created and deleted on different nodes.
545+
-- Accordingly, it is necessary to rebuild the queue.tube index.
546+
if replicaset_mode then
547+
for _, tube_name in pairs(queue.tube()) do
548+
queue.tube[tube_name] = nil
549+
end
550+
for _, tube_tuple in box.space._queue:pairs() do
551+
if queue.driver[tube_tuple[4]] ~= nil then
552+
recreate_tube(tube_tuple)
553+
end
554+
end
555+
end
527556
for name, tube in pairs(queue.tube) do
528557
tube_release_all_orphaned_tasks(tube)
529558
log.info('queue: [tube "%s"] start driver', name)
@@ -551,22 +580,6 @@ local function on_state_change(state)
551580
end
552581
end
553582

554-
-- function takes tuples and recreates tube
555-
local function recreate_tube(tube_tuple)
556-
local name, id, space_name, tube_type, opts = tube_tuple:unpack()
557-
558-
local driver = queue.driver[tube_type]
559-
if driver == nil then
560-
error("Unknown tube type " .. tostring(tube_type))
561-
end
562-
563-
local space = box.space[space_name]
564-
if space == nil then
565-
error(("Space '%s' doesn't exists"):format(space_name))
566-
end
567-
return make_self(driver, space, name, tube_type, id, opts)
568-
end
569-
570583
-------------------------------------------------------------------------------
571584
-- create tube
572585
function method.create_tube(tube_name, tube_type, opts)

t/200-master-replica.t

+52-1
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ end
2121
-- Replica connection handler.
2222
local conn = {}
2323

24-
test:plan(7)
24+
test:plan(8)
2525

2626
test:test('Check master-replica setup', function(test)
2727
test:plan(8)
@@ -337,6 +337,57 @@ test:test('Check in_replicaset switching', function(test)
337337
'taken tasks count after release_all')
338338
end)
339339

340+
-- gh-202
341+
test:test('Check that tubes indexes is actual after role change', function(test)
342+
local engine = os.getenv('ENGINE') or 'memtx'
343+
test:plan(10)
344+
box.cfg{read_only = true}
345+
queue_state.poll(queue_state.states.WAITING, 10)
346+
test:is(queue.state(), 'WAITING', 'master state is waiting')
347+
conn:eval('box.cfg{read_only=false}')
348+
conn:eval([[
349+
queue_state = require('queue.abstract.queue_state')
350+
queue_state.poll(queue_state.states.RUNNING, 10)
351+
]])
352+
test:is(conn:call('queue.state'), 'RUNNING', 'replica state is running')
353+
conn:eval([[queue.create_tube('repl_tube', 'fifo', {engine =]] .. engine .. [[})]])
354+
355+
-- Switch roles back.
356+
conn:eval('box.cfg{read_only=true}')
357+
conn:eval([[
358+
queue_state = require('queue.abstract.queue_state')
359+
queue_state.poll(queue_state.states.WAITING, 10)
360+
]])
361+
box.cfg{read_only = false}
362+
queue_state.poll(queue_state.states.RUNNING, 10)
363+
test:is(queue.state(), 'RUNNING', 'master state is running')
364+
test:is(conn:call('queue.state'), 'WAITING', 'replica state is waiting')
365+
test:ok(queue.tube.repl_tube, 'repl_tube is accessible')
366+
367+
box.cfg{read_only = true}
368+
queue_state.poll(queue_state.states.WAITING, 10)
369+
test:is(queue.state(), 'WAITING', 'master state is waiting')
370+
conn:eval('box.cfg{read_only=false}')
371+
conn:eval([[
372+
queue_state = require('queue.abstract.queue_state')
373+
queue_state.poll(queue_state.states.RUNNING, 10)
374+
]])
375+
test:is(conn:call('queue.state'), 'RUNNING', 'replica state is running')
376+
conn:eval('queue.tube.repl_tube:drop()')
377+
378+
-- Switch roles back.
379+
conn:eval('box.cfg{read_only=true}')
380+
conn:eval([[
381+
queue_state = require('queue.abstract.queue_state')
382+
queue_state.poll(queue_state.states.WAITING, 10)
383+
]])
384+
box.cfg{read_only = false}
385+
queue_state.poll(queue_state.states.RUNNING, 10)
386+
test:is(queue.state(), 'RUNNING', 'master state is running')
387+
test:is(conn:call('queue.state'), 'WAITING', 'replica state is waiting')
388+
test:isnil(queue.tube.repl_tube, "repl_tube is not indexed")
389+
end)
390+
340391
rawset(_G, 'queue', nil)
341392
conn:eval('rawset(_G, "queue", nil)')
342393
conn:close()

t/tnt/init.lua

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ local function tnt_cluster_prepare(cfg_args)
8787

8888
box.cfg(cfg_args)
8989
-- Allow guest all operations.
90-
box.schema.user.grant('guest', 'read, write, execute', 'universe')
90+
box.schema.user.grant('guest', 'read, write, execute, create, drop', 'universe')
9191
box.schema.user.create('replicator', {password = 'password'})
9292
box.schema.user.grant('replicator', 'replication')
9393

0 commit comments

Comments
 (0)