Skip to content

Commit 0f4d809

Browse files
committed
Consider blocked executions without a semaphore as releasable
We'll have to clear semaphores eventually because they might get stuck, and the simplest way is to delete them. Then, we might end up with blocked executions without semaphore, so we also need to unblock these.
1 parent 33b0f62 commit 0f4d809

File tree

3 files changed

+42
-8
lines changed

3 files changed

+42
-8
lines changed

app/models/solid_queue/blocked_execution.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ class BlockedExecution < SolidQueue::Execution
44

55
has_one :semaphore, foreign_key: :identifier, primary_key: :concurrency_key
66

7-
scope :releasable, -> { joins(:semaphore).merge(Semaphore.available) }
7+
scope :releasable, -> { left_outer_joins(:semaphore).merge(Semaphore.available.or(Semaphore.where(id: nil))) }
88
scope :ordered, -> { order(priority: :asc) }
99

1010
class << self

app/models/solid_queue/semaphore.rb

+1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
class SolidQueue::Semaphore < SolidQueue::Record
22
scope :available, -> { where("value > 0") }
3+
scope :locked, -> { where(value: 0) }
34

45
class << self
56
def wait_for(identifier, limit)

test/integration/concurrency_controls_test.rb

+40-7
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
8080
assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a
8181
end
8282

83-
test "rely on worker to unblock blocked executions" do
83+
test "rely on worker to unblock blocked executions with an available semaphore" do
8484
# Simulate a scenario where we got an available semaphore and some stuck jobs
8585
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
8686
wait_for_jobs_to_finish_for(2.seconds)
@@ -94,26 +94,59 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
9494
# Now enqueue more jobs under that same key. They'll be all locked. Use priorities
9595
# to ensure order.
9696
assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do
97-
("B".."K").each_with_index do |name, i|
98-
SequentialUpdateResultJob.set(priority: i).perform_later(@result, name: name)
97+
("B".."K").each do |name|
98+
SequentialUpdateResultJob.perform_later(@result, name: name)
9999
end
100100
end
101101

102102
# Then unlock the semaphore: this would be as if the first job had released
103103
# the semaphore but hadn't unblocked any jobs
104104
assert SolidQueue::Semaphore.release(job.concurrency_key, job.concurrency_limit)
105105

106+
# And wait for workers to release the jobs
106107
wait_for_jobs_to_finish_for(2.seconds)
107108
assert_no_pending_jobs
108109

109-
assert_stored_sequence @result, ("A".."K").to_a
110+
# We can't ensure the order between B and C, because it depends on which worker wins when
111+
# unblocking, as one will try to unblock B and another C
112+
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
113+
end
114+
115+
test "rely on worker to unblock blocked executions with a missing semaphore" do
116+
# Simulate a scenario where we got an available semaphore and some stuck jobs
117+
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
118+
wait_for_jobs_to_finish_for(2.seconds)
119+
assert_no_pending_jobs
120+
121+
# Lock the semaphore so we can enqueue jobs and leave them blocked
122+
skip_active_record_query_cache do
123+
assert SolidQueue::Semaphore.wait_for(job.concurrency_key, job.concurrency_limit)
124+
end
125+
126+
# Now enqueue more jobs under that same key. They'll be all locked
127+
assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do
128+
("B".."K").each do |name|
129+
SequentialUpdateResultJob.perform_later(@result, name: name)
130+
end
131+
end
132+
133+
# Then delete the semaphore, as if we had cleared it
134+
SolidQueue::Semaphore.find_by(identifier: job.concurrency_key).destroy!
135+
136+
# And wait for workers to release the jobs
137+
wait_for_jobs_to_finish_for(2.seconds)
138+
assert_no_pending_jobs
139+
140+
# We can't ensure the order between B and C, because it depends on which worker wins when
141+
# unblocking, as one will try to unblock B and another C
142+
assert_stored_sequence @result, ("A".."K").to_a, [ "A", "C", "B" ] + ("D".."K").to_a
110143
end
111144

112145
private
113-
def assert_stored_sequence(result, sequence)
114-
expected = "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join
146+
def assert_stored_sequence(result, *sequences)
147+
expected = sequences.map { |sequence| "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join }
115148
skip_active_record_query_cache do
116-
assert_equal expected, result.reload.status
149+
assert_includes expected, result.reload.status
117150
end
118151
end
119152
end

0 commit comments

Comments
 (0)