Skip to content

Commit daa0deb

Browse files
committed
Avoid race-conditions when releasing blocked executions
If two processes try to release blocked executions for the same key, for example, if the concurrency limit is 2, and 2 jobs finish at the same time, both would try to release the first one, and one of them would fail to do that. To avoid that, select the first one but lock the record, and have the SELECT ... FOR UPDATE to use SKIP LOCKED so we don't have to wait on the lock in other processes.
1 parent 9d57925 commit daa0deb

File tree

3 files changed

+20
-12
lines changed

3 files changed

+20
-12
lines changed
+18-9
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,23 @@
1-
class SolidQueue::BlockedExecution < SolidQueue::Execution
2-
assume_attributes_from_job :concurrency_limit, :concurrency_key
1+
module SolidQueue
2+
class BlockedExecution < SolidQueue::Execution
3+
assume_attributes_from_job :concurrency_limit, :concurrency_key
34

4-
def self.release(concurrency_key)
5-
where(concurrency_key: concurrency_key).order(:priority).first&.release
6-
end
5+
has_one :semaphore, foreign_key: :identifier, primary_key: :concurrency_key
6+
7+
scope :releasable, -> { joins(:semaphore).merge(Semaphore.available) }
8+
scope :ordered, -> { order(priority: :asc) }
9+
10+
class << self
11+
def release(concurrency_key)
12+
ordered.where(concurrency_key: concurrency_key).limit(1).lock("FOR UPDATE SKIP LOCKED").each(&:release)
13+
end
14+
end
715

8-
def release
9-
transaction do
10-
job.prepare_for_execution
11-
destroy!
16+
def release
17+
transaction do
18+
job.prepare_for_execution
19+
destroy!
20+
end
1221
end
1322
end
1423
end

app/models/solid_queue/job/concurrency_controls.rb

-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ def release_next_blocked_job
3737
def concurrency_limited?
3838
concurrency_limit.to_i > 0 && concurrency_key.present?
3939
end
40-
4140
end
4241
end
4342
end

test/integration/concurrency_controls_test.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
2828
SequentialUpdateResultJob.perform_later(@result, name: name)
2929
end
3030

31-
wait_for_jobs_to_finish_for(4.seconds)
31+
wait_for_jobs_to_finish_for(3.seconds)
3232
assert_no_pending_jobs
3333

3434
assert_stored_sequence @result, ("A".."K").to_a
@@ -74,7 +74,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
7474
SequentialUpdateResultJob.perform_later(@result, name: name)
7575
end
7676

77-
wait_for_jobs_to_finish_for(4.seconds)
77+
wait_for_jobs_to_finish_for(3.seconds)
7878
assert_equal 3, SolidQueue::FailedExecution.count
7979

8080
assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a

0 commit comments

Comments
 (0)