Skip to content

Commit 33b0f62

Browse files
committed
Unblock releasable blocked executions before polling
That's it, blocked executions with an available semaphore. Try to unblock at most one per concurrency key, to unblock the whole group.
1 parent 8f3fd5b commit 33b0f62

File tree

8 files changed

+65
-11
lines changed

8 files changed

+65
-11
lines changed

app/models/solid_queue/blocked_execution.rb

+13-1
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,17 @@ class BlockedExecution < SolidQueue::Execution
88
scope :ordered, -> { order(priority: :asc) }
99

1010
class << self
11-
def release(concurrency_key)
11+
def unblock(count)
12+
release_many releasable.select(:concurrency_key).distinct.limit(count).pluck(:concurrency_key)
13+
end
14+
15+
def release_many(concurrency_keys)
16+
# We want to release exactly one blocked execution for each concurrency key, and we need to do it
17+
# one by one, locking each record and acquiring the semaphore individually for each of them:
18+
Array(concurrency_keys).each { |concurrency_key| release_one(concurrency_key) }
19+
end
20+
21+
def release_one(concurrency_key)
1222
ordered.where(concurrency_key: concurrency_key).limit(1).lock("FOR UPDATE SKIP LOCKED").each(&:release)
1323
end
1424
end
@@ -18,6 +28,8 @@ def release
1828
if acquire_concurrency_lock
1929
promote_to_ready
2030
destroy!
31+
32+
SolidQueue.logger.info("[SolidQueue] Unblocked job #{job.id} under #{concurrency_key}")
2133
end
2234
end
2335
end

app/models/solid_queue/execution.rb

+13-5
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
1-
class SolidQueue::Execution < SolidQueue::Record
2-
include JobAttributes
1+
module SolidQueue
2+
class Execution < SolidQueue::Record
3+
include JobAttributes
34

4-
self.abstract_class = true
5+
self.abstract_class = true
56

6-
belongs_to :job
7+
belongs_to :job
78

8-
alias_method :discard, :destroy
9+
alias_method :discard, :destroy
10+
11+
class << self
12+
def queued_as(queues)
13+
QueueParser.new(queues, self).scoped_relation
14+
end
15+
end
16+
end
917
end

app/models/solid_queue/job/concurrency_controls.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ def block
3131
end
3232

3333
def release_next_blocked_job
34-
BlockedExecution.release(concurrency_key)
34+
BlockedExecution.release_one(concurrency_key)
3535
end
3636

3737
def concurrency_limited?

app/models/solid_queue/ready_execution.rb

-4
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,6 @@ def claim(queues, limit, process_id)
1313
end
1414
end
1515

16-
def queued_as(queues)
17-
QueueParser.new(queues, self).scoped_relation
18-
end
19-
2016
private
2117
def select_candidates(queues, limit)
2218
queued_as(queues).not_paused.ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED")

lib/solid_queue/worker.rb

+9
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,15 @@ def initialize(**options)
1616

1717
private
1818
def run
19+
unblock_executions
20+
poll_and_dispatch_executions
21+
end
22+
23+
def unblock_executions
24+
SolidQueue::BlockedExecution.queued_as(queues).unblock(pool.size)
25+
end
26+
27+
def poll_and_dispatch_executions
1928
claimed_executions = SolidQueue::ReadyExecution.claim(queues, pool.idle_threads, process.id)
2029

2130
if claimed_executions.size > 0

test/fixtures/solid_queue/blocked_executions.yml

Whitespace-only changes.

test/fixtures/solid_queue/semaphores.yml

Whitespace-only changes.

test/integration/concurrency_controls_test.rb

+29
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,35 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase
8080
assert_stored_sequence @result, [ "B", "D", "F" ] + ("G".."K").to_a
8181
end
8282

83+
test "rely on worker to unblock blocked executions" do
84+
# Simulate a scenario where we got an available semaphore and some stuck jobs
85+
job = SequentialUpdateResultJob.perform_later(@result, name: "A")
86+
wait_for_jobs_to_finish_for(2.seconds)
87+
assert_no_pending_jobs
88+
89+
# Lock the semaphore so we can enqueue jobs and leave them blocked
90+
skip_active_record_query_cache do
91+
assert SolidQueue::Semaphore.wait_for(job.concurrency_key, job.concurrency_limit)
92+
end
93+
94+
# Now enqueue more jobs under that same key. They'll be all locked. Use priorities
95+
# to ensure order.
96+
assert_difference -> { SolidQueue::BlockedExecution.count }, +10 do
97+
("B".."K").each_with_index do |name, i|
98+
SequentialUpdateResultJob.set(priority: i).perform_later(@result, name: name)
99+
end
100+
end
101+
102+
# Then unlock the semaphore: this would be as if the first job had released
103+
# the semaphore but hadn't unblocked any jobs
104+
assert SolidQueue::Semaphore.release(job.concurrency_key, job.concurrency_limit)
105+
106+
wait_for_jobs_to_finish_for(2.seconds)
107+
assert_no_pending_jobs
108+
109+
assert_stored_sequence @result, ("A".."K").to_a
110+
end
111+
83112
private
84113
def assert_stored_sequence(result, sequence)
85114
expected = "seq: " + sequence.map { |name| "s#{name}c#{name}"}.join

0 commit comments

Comments
 (0)