-
Notifications
You must be signed in to change notification settings - Fork 161
/
Copy pathconcurrency_controls_test.rb
158 lines (125 loc) · 5.94 KB
/
concurrency_controls_test.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
# frozen_string_literal: true
require "test_helper"
class ConcurrencyControlsTest < ActiveSupport::TestCase
self.use_transactional_tests = false
setup do
SolidQueue::Job.delete_all
@result = JobResult.create!(queue_name: "default", status: "seq: ")
default_worker = { queues: "default", polling_interval: 1, processes: 3, threads: 2 }
scheduler = { polling_interval: 1, batch_size: 200, concurrency_maintenance_interval: 1 }
@pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], scheduler: scheduler })
wait_for_registered_processes(5, timeout: 0.5.second) # 3 workers working the default queue + scheduler + supervisor
end
teardown do
terminate_process(@pid) if process_exists?(@pid)
end
test "run several conflicting jobs over the same record sequentially" do
("A".."F").each do |name|
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds)
end
("G".."K").each do |name|
SequentialUpdateResultJob.perform_later(@result, name: name)
end
wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs
assert_stored_sequence @result, ("A".."K").to_a
end
test "run several jobs over the same record limiting concurrency" do
incr = 0
# C is the last one to update the record
# A: 0 to 0.5
# B: 0 to 1.0
# C: 0 to 1.5
assert_no_difference -> { SolidQueue::BlockedExecution.count } do
("A".."C").each do |name|
ThrottledUpdateResultJob.perform_later(@result, name: name, pause: (0.5 + incr).seconds)
incr += 0.5
end
end
sleep(0.01) # To ensure these aren't picked up before ABC
# D to H: 0.51 to 0.76 (starting after A finishes, and in order, 5 * 0.05 = 0.25)
# These would finish all before B and C
assert_difference -> { SolidQueue::BlockedExecution.count }, +5 do
("D".."H").each do |name|
ThrottledUpdateResultJob.perform_later(@result, name: name, pause: 0.05.seconds)
end
end
wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs
# C would have started in the beginning, seeing the status empty, and would finish after
# all other jobs, so it'll do the last update with only itself
assert_stored_sequence(@result, [ "C" ])
end
test "run several jobs over the same record sequentially, with some of them failing" do
("A".."F").each_with_index do |name, i|
# A, C, E will fail, for i= 0, 2, 4
SequentialUpdateResultJob.perform_later(@result, name: name, pause: 0.2.seconds, exception: (RuntimeError if i.even?))
end
("G".."K").each do |name|
SequentialUpdateResultJob.perform_later(@result, name: name)
end
wait_for_jobs_to_finish_for(3.seconds)
assert_equal 3, SolidQueue::FailedExecution.count
assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a
end
test "rely on scheduler to unblock blocked executions with an available semaphore" do
# Simulate a scenario where we got an available semaphore and some stuck jobs
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs
wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? }
# Lock the semaphore so we can enqueue jobs and leave them blocked
skip_active_record_query_cache do
assert SolidQueue::Semaphore.wait(job)
end
# Now enqueue more jobs under that same key. They'll be all locked
assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do
("B".."K").each do |name|
SequentialUpdateResultJob.perform_later(@result, name: name)
end
end
# Then unlock the semaphore and expire the jobs: this would be as if the first job had
# released the semaphore but hadn't unblocked any jobs
SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago)
assert SolidQueue::Semaphore.signal(job)
# And wait for the scheduler to release the jobs
wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs
# We can't ensure the order between B and C, because it depends on which worker wins when
# unblocking, as one will try to unblock B and another C
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
end
test "rely on scheduler to unblock blocked executions with an expired semaphore" do
# Simulate a scenario where we got an available semaphore and some stuck jobs
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs
wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? }
# Lock the semaphore so we can enqueue jobs and leave them blocked
skip_active_record_query_cache do
assert SolidQueue::Semaphore.wait(job)
end
# Now enqueue more jobs under that same key. They'll be all locked
assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do
("B".."K").each do |name|
SequentialUpdateResultJob.perform_later(@result, name: name)
end
end
# Simulate expiration of semaphore and executions
SolidQueue::Semaphore.where(key: job.concurrency_key).update_all(expires_at: 15.minutes.ago)
SolidQueue::BlockedExecution.update_all(expires_at: 15.minutes.ago)
# And wait for scheduler to release the jobs
wait_for_jobs_to_finish_for(3.seconds)
assert_no_pending_jobs
# We can't ensure the order between B and C, because it depends on which worker wins when
# unblocking, as one will try to unblock B and another C
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
end
private
def assert_stored_sequence(result, *sequences)
expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join }
skip_active_record_query_cache do
assert_includes expected, result.reload.status
end
end
end