Skip to content

Commit 8f3fd5b

Browse files
committed
Correct transactions to release blocked executions
When a job finishes or fails, we need to: - Increment the semaphore if any, in its own transaction. This needs to happen regardless of whether other blocked executions are unblocked by this semaphore change. - Try to unblock the next blocked execution. For this, we need to acquire the semaphore again, as it's possible another job part of the same concurrency group is just being enqueued at the same time. Then, we need to move the blocked execution to "ready", and then delete the blocked execution. This all needs to happen in the same transaction, without going throuhg the job. The previous implementation could very well update the existing blocked execution and then delete it, leaving the job in limbo.
1 parent daa0deb commit 8f3fd5b

File tree

4 files changed

+36
-18
lines changed

4 files changed

+36
-18
lines changed

app/models/solid_queue/blocked_execution.rb

+13-2
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,20 @@ def release(concurrency_key)
1515

1616
def release
1717
transaction do
18-
job.prepare_for_execution
19-
destroy!
18+
if acquire_concurrency_lock
19+
promote_to_ready
20+
destroy!
21+
end
2022
end
2123
end
24+
25+
private
26+
def acquire_concurrency_lock
27+
Semaphore.wait_for(concurrency_key, concurrency_limit)
28+
end
29+
30+
def promote_to_ready
31+
ReadyExecution.create_or_find_by!(job_id: job_id)
32+
end
2233
end
2334
end

app/models/solid_queue/claimed_execution.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def perform
3434
failed_with(result.error)
3535
end
3636
ensure
37-
job.dispatch_blocked_jobs
37+
job.unblock_blocked_jobs
3838
end
3939

4040
def release

app/models/solid_queue/job/concurrency_controls.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ module ConcurrencyControls
77
has_one :blocked_execution, dependent: :destroy
88
end
99

10-
def dispatch_blocked_jobs
10+
def unblock_blocked_jobs
1111
if release_concurrency_lock
1212
release_next_blocked_job
1313
end
@@ -23,7 +23,7 @@ def acquire_concurrency_lock
2323
def release_concurrency_lock
2424
return false unless concurrency_limited?
2525

26-
Semaphore.release(concurrency_key)
26+
Semaphore.release(concurrency_key, concurrency_limit)
2727
end
2828

2929
def block

app/models/solid_queue/semaphore.rb

+20-13
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,33 @@
11
class SolidQueue::Semaphore < SolidQueue::Record
2+
scope :available, -> { where("value > 0") }
3+
24
class << self
35
def wait_for(identifier, limit)
46
if semaphore = find_by(identifier: identifier)
5-
semaphore.value > 0 && attempt_to_update(identifier)
7+
semaphore.value > 0 && attempt_decrement(identifier)
68
else
7-
attempt_to_create(identifier, limit)
9+
attempt_creation(identifier, limit)
810
end
911
end
1012

11-
def attempt_to_create(identifier, limit)
12-
create!(identifier: identifier, value: limit - 1)
13-
true
14-
rescue ActiveRecord::RecordNotUnique
15-
attempt_to_update(identifier)
13+
def release(identifier, concurrency_limit)
14+
attempt_increment(identifier, concurrency_limit)
1615
end
1716

18-
def attempt_to_update(identifier)
19-
where(identifier: identifier).where("value > 0").update_all("value = COALESCE(value, 1) - 1") > 0
20-
end
17+
private
18+
def attempt_creation(identifier, limit)
19+
create!(identifier: identifier, value: limit - 1)
20+
true
21+
rescue ActiveRecord::RecordNotUnique
22+
attempt_decrement(identifier)
23+
end
2124

22-
def release(identifier)
23-
where(identifier: identifier).update_all("value = COALESCE(value, 0) + 1") > 0
24-
end
25+
def attempt_decrement(identifier)
26+
available.where(identifier: identifier).update_all("value = value - 1") > 0
27+
end
28+
29+
def attempt_increment(identifier, limit)
30+
where("value < ?", limit).where(identifier: identifier).update_all("value = value + 1") > 0
31+
end
2532
end
2633
end

0 commit comments

Comments
 (0)