Skip to content

Release concurrency locks when supervisor fails the jobs handled by workers #547

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Apr 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions app/models/solid_queue/claimed_execution.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ def release_all
def fail_all_with(error)
SolidQueue.instrument(:fail_many_claimed) do |payload|
includes(:job).tap do |executions|
executions.each { |execution| execution.failed_with(error) }
executions.each do |execution|
execution.failed_with(error)
execution.unblock_next_job
end

payload[:process_ids] = executions.map(&:process_id).uniq
payload[:job_ids] = executions.map(&:job_id).uniq
Expand Down Expand Up @@ -67,7 +70,7 @@ def perform
raise result.error
end
ensure
job.unblock_next_blocked_job
unblock_next_job
end

def release
Expand All @@ -90,6 +93,10 @@ def failed_with(error)
end
end

# Asks the job behind this claimed execution to unblock its next blocked job.
#
# Thin delegation to +job.unblock_next_blocked_job+; the return value is
# whatever that call returns.
# NOTE(review): presumably this is what hands the concurrency lock over to the
# next job waiting on it — confirm against the job's unblock_next_blocked_job.
def unblock_next_job
  claimed_job = job
  claimed_job.unblock_next_blocked_job
end

private
def execute
ActiveJob::Base.execute(job.arguments.merge("provider_job_id" => job.id))
Expand Down
30 changes: 30 additions & 0 deletions test/unit/supervisor_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,36 @@ class SupervisorTest < ActiveSupport::TestCase
end
end

# Checks that when a supervisor fails executions claimed by a process that no
# longer exists, the job that was blocked behind them becomes ready to run.
test "fail orphaned executions by releasing their concurrency locks" do
result = JobResult.create!(queue_name: "default", status: "seq: ")
# Enqueue four jobs on :new_queue. The counts asserted below show that three
# of them get claimed and one ends up blocked.
# NOTE(review): presumably ThrottledUpdateResultJob has a concurrency limit of 3 — confirm against the job definition.
4.times { |i| ThrottledUpdateResultJob.set(queue: :new_queue).perform_later(result) }
process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123")

# Claim up to 5 ready executions from any queue on behalf of the fake worker process.
SolidQueue::ReadyExecution.claim("*", 5, process.id)

assert_equal 3, SolidQueue::ClaimedExecution.count
assert_equal 0, SolidQueue::ReadyExecution.count
assert_equal 1, SolidQueue::BlockedExecution.count

# All three claimed executions belong to the process registered above.
assert_equal [ process.id ], SolidQueue::ClaimedExecution.last(3).pluck(:process_id).uniq

# Simulate orphaned executions by just wiping the claiming process
process.delete

# The supervisor's workers are configured for the "background" queue, while the
# jobs above were enqueued on :new_queue.
# NOTE(review): presumably this keeps the new workers from picking up the unblocked job, so it stays visible as a ReadyExecution below — confirm.
pid = run_supervisor_as_fork(workers: [ { queues: "background", polling_interval: 10, processes: 2 } ])
# NOTE(review): 3 is presumably the supervisor plus its 2 worker processes — confirm against the helper.
wait_for_registered_processes(3)
assert_registered_supervisor(pid)

terminate_process(pid)

# Expected end state: no claimed executions remain, three failed executions
# exist, and the previously blocked execution is now ready.
skip_active_record_query_cache do
assert_equal 0, SolidQueue::ClaimedExecution.count
assert_equal 3, SolidQueue::FailedExecution.count
assert_equal 0, SolidQueue::BlockedExecution.count
assert_equal 1, SolidQueue::ReadyExecution.count
end
end

test "prune processes with expired heartbeats" do
pruned = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-42")

Expand Down
Loading