Skip to content

Commit 95dc9b4

Browse files
authored
Release concurrency locks when supervisor fails the jobs handled by workers (#547)
* Add test * Unblock blocked job after failing claimed execution only in method called by supervisor
1 parent 9161da0 commit 95dc9b4

File tree

2 files changed

+39
-2
lines changed

2 files changed

+39
-2
lines changed

app/models/solid_queue/claimed_execution.rb

+9-2
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,10 @@ def release_all
3939
def fail_all_with(error)
4040
SolidQueue.instrument(:fail_many_claimed) do |payload|
4141
includes(:job).tap do |executions|
42-
executions.each { |execution| execution.failed_with(error) }
42+
executions.each do |execution|
43+
execution.failed_with(error)
44+
execution.unblock_next_job
45+
end
4346

4447
payload[:process_ids] = executions.map(&:process_id).uniq
4548
payload[:job_ids] = executions.map(&:job_id).uniq
@@ -67,7 +70,7 @@ def perform
6770
raise result.error
6871
end
6972
ensure
70-
job.unblock_next_blocked_job
73+
unblock_next_job
7174
end
7275

7376
def release
@@ -90,6 +93,10 @@ def failed_with(error)
9093
end
9194
end
9295

96+
def unblock_next_job
97+
job.unblock_next_blocked_job
98+
end
99+
93100
private
94101
def execute
95102
ActiveJob::Base.execute(job.arguments.merge("provider_job_id" => job.id))

test/unit/supervisor_test.rb

+30
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,36 @@ class SupervisorTest < ActiveSupport::TestCase
133133
end
134134
end
135135

136+
test "fail orphaned executions by releasing their concurrency locks" do
137+
result = JobResult.create!(queue_name: "default", status: "seq: ")
138+
4.times { |i| ThrottledUpdateResultJob.set(queue: :new_queue).perform_later(result) }
139+
process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123")
140+
141+
SolidQueue::ReadyExecution.claim("*", 5, process.id)
142+
143+
assert_equal 3, SolidQueue::ClaimedExecution.count
144+
assert_equal 0, SolidQueue::ReadyExecution.count
145+
assert_equal 1, SolidQueue::BlockedExecution.count
146+
147+
assert_equal [ process.id ], SolidQueue::ClaimedExecution.last(3).pluck(:process_id).uniq
148+
149+
# Simnulate orphaned executions by just wiping the claiming process
150+
process.delete
151+
152+
pid = run_supervisor_as_fork(workers: [ { queues: "background", polling_interval: 10, processes: 2 } ])
153+
wait_for_registered_processes(3)
154+
assert_registered_supervisor(pid)
155+
156+
terminate_process(pid)
157+
158+
skip_active_record_query_cache do
159+
assert_equal 0, SolidQueue::ClaimedExecution.count
160+
assert_equal 3, SolidQueue::FailedExecution.count
161+
assert_equal 0, SolidQueue::BlockedExecution.count
162+
assert_equal 1, SolidQueue::ReadyExecution.count
163+
end
164+
end
165+
136166
test "prune processes with expired heartbeats" do
137167
pruned = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-42")
138168

0 commit comments

Comments
 (0)