@@ -17,13 +17,31 @@ local ttl = 0.1
17
17
18
18
test :ok (queue , ' queue is loaded' )
19
19
20
+ local function test_take_after_ttl (test , tube , ttl )
21
+ local attempts , max_attempts = 0 , 2
22
+ local before = fiber .time64 ()
23
+ local after = before
24
+
25
+ while after - before < ttl * 1000000 and attempts < max_attempts do
26
+ attempts = attempts + 1
27
+ fiber .sleep (ttl )
28
+ after = fiber .time64 ()
29
+ end
30
+
31
+ if after - before < ttl * 1000000 then
32
+ test :fail (' failed to sleep ttl, is system clock changed?' )
33
+ else
34
+ test :isnil (tube :take (.1 ), ' no task is taken' )
35
+ end
36
+ end
37
+
20
38
test :test (' one message per queue ffttl' , function (test )
21
39
test :plan (20 )
22
40
local tube = queue .create_tube (' ompq_ffttl' , ' fifottl' , { engine = engine })
23
41
for i = 1 , 20 do
24
42
tube :put (' ompq_' .. i , {ttl = ttl })
25
- fiber . sleep ( ttl )
26
- test : is ( # { tube : take ( .1 )}, 0 , ' no task is taken ' )
43
+
44
+ test_take_after_ttl ( test , tube , ttl )
27
45
end
28
46
end )
29
47
@@ -32,8 +50,8 @@ test:test('one message per queue utttl', function (test)
32
50
local tube = queue .create_tube (' ompq_utttl' , ' utubettl' , { engine = engine })
33
51
for i = 1 , 20 do
34
52
tube :put (' ompq_' .. i , {ttl = ttl })
35
- fiber . sleep ( ttl )
36
- test : is ( # { tube : take ( .1 )}, 0 , ' no task is taken ' )
53
+
54
+ test_take_after_ttl ( test , tube , ttl )
37
55
end
38
56
end )
39
57
@@ -42,8 +60,8 @@ test:test('many messages, one queue ffttl', function (test)
42
60
for i = 1 , 20 do
43
61
local tube = queue .create_tube (' mmpq_ffttl_' .. i , ' fifottl' , { engine = engine })
44
62
tube :put (' mmpq_' .. i , {ttl = ttl })
45
- fiber . sleep ( ttl )
46
- test : is ( # { tube : take ( .1 )}, 0 , ' no task is taken ' )
63
+
64
+ test_take_after_ttl ( test , tube , ttl )
47
65
end
48
66
end )
49
67
@@ -52,8 +70,8 @@ test:test('many messages, one queue utttl', function (test)
52
70
for i = 1 , 20 do
53
71
local tube = queue .create_tube (' mmpq_utttl_' .. i , ' utubettl' , { engine = engine })
54
72
tube :put (' mmpq_' .. i , {ttl = ttl })
55
- fiber . sleep ( ttl )
56
- test : is ( # { tube : take ( .1 )}, 0 , ' no task is taken ' )
73
+
74
+ test_take_after_ttl ( test , tube , ttl )
57
75
end
58
76
end )
59
77
0 commit comments