Skip to content

Commit 0164ed4

Browse files
better0fdeadLeonidVas
authored andcommitted
driver: fix duplicate id error with mvvc on put and take
Taking the maximum or minimum of the index is an implicit transactions, so it is always done with 'read-confirmed' mvcc isolation level. It can lead to errors when trying to make parallel 'put' or 'take' calls with mvcc enabled. It is hapenning because 'max' or 'min' for several puts in parallel will be the same since read confirmed isolation level makes visible all transactions that finished the commit. To fix it we wrap it with box.begin/commit and set right isolation level. Current fix does not resolve that bug in situations when we already are in transaction since it will open nested transactions. Part of #207
1 parent 8231c25 commit 0164ed4

File tree

5 files changed

+224
-11
lines changed

5 files changed

+224
-11
lines changed

queue/abstract/driver/fifo.lua

+40-2
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,27 @@ end
6868

6969
-- put task in space
7070
function method.put(self, data, opts)
71-
local max = self.space.index.task_id:max()
71+
local max
72+
73+
-- Taking the maximum of the index is an implicit transactions, so it is
74+
-- always done with 'read-confirmed' mvcc isolation level.
75+
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
76+
-- It is hapenning because 'max' for several puts in parallel will be the same since
77+
-- read confirmed isolation level makes visible all transactions that finished the commit.
78+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
79+
-- Current fix does not resolve that bug in situations when we already are in transaction
80+
-- since it will open nested transactions.
81+
-- See https://github.com/tarantool/queue/issues/207
82+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
83+
84+
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
85+
box.begin({txn_isolation = 'read-committed'})
86+
max = self.space.index.task_id:max()
87+
box.commit()
88+
else
89+
max = self.space.index.task_id:max()
90+
end
91+
7292
local id = max and max[1] + 1 or 0
7393
local task = self.space:insert{id, state.READY, data}
7494
self.on_task_change(task, 'put')
@@ -77,7 +97,25 @@ end
7797

7898
-- take task
7999
function method.take(self)
80-
local task = self.space.index.status:min{state.READY}
100+
local task
101+
-- Taking the minimum is an implicit transactions, so it is
102+
-- always done with 'read-confirmed' mvcc isolation level.
103+
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
104+
-- It is hapenning because 'min' for several takes in parallel will be the same since
105+
-- read confirmed isolation level makes visible all transactions that finished the commit.
106+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
107+
-- Current fix does not resolve that bug in situations when we already are in transaction
108+
-- since it will open nested transactions.
109+
-- See https://github.com/tarantool/queue/issues/207
110+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
111+
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
112+
box.begin({txn_isolation = 'read-committed'})
113+
task = self.space.index.status:min{state.READY}
114+
box.commit()
115+
else
116+
task = self.space.index.status:min{state.READY}
117+
end
118+
81119
if task ~= nil and task[2] == state.READY then
82120
task = self.space:update(task[1], { { '=', 2, state.TAKEN } })
83121
self.on_task_change(task, 'take')

queue/abstract/driver/fifottl.lua

+36-5
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,27 @@ end
209209

210210
-- put task in space
211211
function method.put(self, data, opts)
212-
local max = self.space.index.task_id:max()
212+
local max
213+
214+
-- Taking the maximum of the index is an implicit transactions, so it is
215+
-- always done with 'read-confirmed' mvcc isolation level.
216+
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
217+
-- It is hapenning because 'max' for several puts in parallel will be the same since
218+
-- read confirmed isolation level makes visible all transactions that finished the commit.
219+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
220+
-- Current fix does not resolve that bug in situations when we already are in transaction
221+
-- since it will open nested transactions.
222+
-- See https://github.com/tarantool/queue/issues/207
223+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
224+
225+
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
226+
box.begin({txn_isolation = 'read-committed'})
227+
max = self.space.index.task_id:max()
228+
box.commit()
229+
else
230+
max = self.space.index.task_id:max()
231+
end
232+
213233
local id = max and max[i_id] + 1 or 0
214234

215235
local status
@@ -265,10 +285,21 @@ end
265285
-- take task
266286
function method.take(self)
267287
local task = nil
268-
for _, t in self.space.index.status:pairs({state.READY}) do
269-
if not is_expired(t) then
270-
task = t
271-
break
288+
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
289+
box.begin({txn_isolation = 'read-committed'})
290+
for _, t in self.space.index.status:pairs({state.READY}) do
291+
if not is_expired(t) then
292+
task = t
293+
break
294+
end
295+
end
296+
box.commit()
297+
else
298+
for _, t in self.space.index.status:pairs({state.READY}) do
299+
if not is_expired(t) then
300+
task = t
301+
break
302+
end
272303
end
273304
end
274305

queue/abstract/driver/utube.lua

+39-2
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,27 @@ end
7575

7676
-- put task in space
7777
function method.put(self, data, opts)
78-
local max = self.space.index.task_id:max()
78+
local max
79+
80+
-- Taking the maximum of the index is an implicit transactions, so it is
81+
-- always done with 'read-confirmed' mvcc isolation level.
82+
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
83+
-- It is hapenning because 'max' for several puts in parallel will be the same since
84+
-- read confirmed isolation level makes visible all transactions that finished the commit.
85+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
86+
-- Current fix does not resolve that bug in situations when we already are in transaction
87+
-- since it will open nested transactions.
88+
-- See https://github.com/tarantool/queue/issues/207
89+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
90+
91+
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
92+
box.begin({txn_isolation = 'read-committed'})
93+
max = self.space.index.task_id:max()
94+
box.commit()
95+
else
96+
max = self.space.index.task_id:max()
97+
end
98+
7999
local id = max and max[1] + 1 or 0
80100
local task = self.space:insert{id, state.READY, tostring(opts.utube), data}
81101
self.on_task_change(task, 'put')
@@ -89,8 +109,25 @@ function method.take(self)
89109
if task[2] ~= state.READY then
90110
break
91111
end
112+
local taken
113+
-- Taking the minimum is an implicit transactions, so it is
114+
-- always done with 'read-confirmed' mvcc isolation level.
115+
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
116+
-- It is hapenning because 'min' for several takes in parallel will be the same since
117+
-- read confirmed isolation level makes visible all transactions that finished the commit.
118+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
119+
-- Current fix does not resolve that bug in situations when we already are in transaction
120+
-- since it will open nested transactions.
121+
-- See https://github.com/tarantool/queue/issues/207
122+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
123+
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
124+
box.begin({txn_isolation = 'read-committed'})
125+
taken = self.space.index.utube:min{state.TAKEN, task[3]}
126+
box.commit()
127+
else
128+
taken = self.space.index.utube:min{state.TAKEN, task[3]}
129+
end
92130

93-
local taken = self.space.index.utube:min{state.TAKEN, task[3]}
94131
if taken == nil or taken[2] ~= state.TAKEN then
95132
task = self.space:update(task[1], { { '=', 2, state.TAKEN } })
96133
self.on_task_change(task, 'take')

queue/abstract/driver/utubettl.lua

+40-2
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,27 @@ end
217217

218218
-- put task in space
219219
function method.put(self, data, opts)
220-
local max = self.space.index.task_id:max()
220+
local max
221+
222+
-- Taking the maximum of the index is an implicit transactions, so it is
223+
-- always done with 'read-confirmed' mvcc isolation level.
224+
-- It can lead to errors when trying to make parallel 'put' calls with mvcc enabled.
225+
-- It is hapenning because 'max' for several puts in parallel will be the same since
226+
-- read confirmed isolation level makes visible all transactions that finished the commit.
227+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
228+
-- Current fix does not resolve that bug in situations when we already are in transaction
229+
-- since it will open nested transactions.
230+
-- See https://github.com/tarantool/queue/issues/207
231+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
232+
233+
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
234+
box.begin({txn_isolation = 'read-committed'})
235+
max = self.space.index.task_id:max()
236+
box.commit()
237+
else
238+
max = self.space.index.task_id:max()
239+
end
240+
221241
local id = max and max[i_id] + 1 or 0
222242

223243
local status
@@ -280,7 +300,25 @@ function method.take(self)
280300
break
281301
elseif not is_expired(t) then
282302
local next_event = util.time() + t[i_ttr]
283-
local taken = self.space.index.utube:min{state.TAKEN, t[i_utube]}
303+
local taken
304+
-- Taking the minimum is an implicit transactions, so it is
305+
-- always done with 'read-confirmed' mvcc isolation level.
306+
-- It can lead to errors when trying to make parallel 'take' calls with mvcc enabled.
307+
-- It is hapenning because 'min' for several takes in parallel will be the same since
308+
-- read confirmed isolation level makes visible all transactions that finished the commit.
309+
-- To fix it we wrap it with box.begin/commit and set right isolation level.
310+
-- Current fix does not resolve that bug in situations when we already are in transaction
311+
-- since it will open nested transactions.
312+
-- See https://github.com/tarantool/queue/issues/207
313+
-- See https://www.tarantool.io/ru/doc/latest/concepts/atomic/txn_mode_mvcc/
314+
if box.cfg.memtx_use_mvcc_engine and (not box.is_in_txn()) then
315+
box.begin({txn_isolation = 'read-committed'})
316+
taken = self.space.index.utube:min{state.TAKEN, t[i_utube]}
317+
box.commit()
318+
else
319+
taken = self.space.index.utube:min{state.TAKEN, t[i_utube]}
320+
end
321+
284322
if taken == nil or taken[i_status] ~= state.TAKEN then
285323
t = self.space:update(t[1], {
286324
{ '=', i_status, state.TAKEN },

t/220-mvcc.t

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
#!/usr/bin/env tarantool
2+
3+
local qc = require('queue.compat')
4+
local log = require('log')
5+
if not qc.check_version({2, 6, 1}) then
6+
log.info('Tests skipped, tarantool version < 2.6.1')
7+
return
8+
end
9+
10+
local fiber = require('fiber')
11+
12+
local test = require('tap').test()
13+
test:plan(6)
14+
15+
local queue = require('queue')
16+
17+
local tnt = require('t.tnt')
18+
tnt.cfg{memtx_use_mvcc_engine = true}
19+
20+
local engine = 'memtx'
21+
22+
test:ok(rawget(box, 'space'), 'box started')
23+
test:ok(queue, 'queue is loaded')
24+
25+
local tube = queue.create_tube('test', 'fifo', { engine = engine, temporary = false})
26+
test:ok(tube, 'test tube created')
27+
test:is(tube.name, 'test', 'tube.name')
28+
test:is(tube.type, 'fifo', 'tube.type')
29+
30+
--- That test checks that https://github.com/tarantool/queue/pull/211 is fixed.
31+
-- Previously trying to make parallel 'put' or 'take' calls failed with mvcc enabled.
32+
test:test('concurent put and take with mvcc', function(test)
33+
test:plan(6)
34+
-- channels are used to wait for fibers to finish
35+
-- and check results of the 'take'/'put'.
36+
local channel_put = fiber.channel(2)
37+
test:ok(channel_put, 'channel created')
38+
local channel_take = fiber.channel(2)
39+
test:ok(channel_take, 'channel created')
40+
41+
for i = 1, 2 do
42+
fiber.create(function(i)
43+
local err = pcall(tube.put, tube, i)
44+
channel_put:put(err)
45+
end, i)
46+
end
47+
48+
for i = 1, 2 do
49+
local res = channel_put:get(1)
50+
test:ok(res, 'task ' .. i .. ' was put')
51+
end
52+
53+
for i = 1, 2 do
54+
fiber.create(function(i)
55+
local err = pcall(tube.take, tube)
56+
channel_take:put(err)
57+
end, i)
58+
end
59+
60+
for i = 1, 2 do
61+
local res = channel_take:get()
62+
test:ok(res, 'task ' .. i .. ' was taken')
63+
end
64+
end)
65+
66+
tnt.finish()
67+
os.exit(test:check() and 0 or 1)
68+
69+
-- vim: set ft=lua:

0 commit comments

Comments
 (0)