diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 94ee3593..8840505b 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -39,7 +39,10 @@ def release_all def fail_all_with(error) SolidQueue.instrument(:fail_many_claimed) do |payload| includes(:job).tap do |executions| - executions.each { |execution| execution.failed_with(error) } + executions.each do |execution| + execution.failed_with(error) + execution.unblock_next_job + end payload[:process_ids] = executions.map(&:process_id).uniq payload[:job_ids] = executions.map(&:job_id).uniq @@ -67,7 +70,7 @@ def perform raise result.error end ensure - job.unblock_next_blocked_job + unblock_next_job end def release @@ -90,6 +93,10 @@ def failed_with(error) end end + def unblock_next_job + job.unblock_next_blocked_job + end + private def execute ActiveJob::Base.execute(job.arguments.merge("provider_job_id" => job.id)) diff --git a/test/unit/supervisor_test.rb b/test/unit/supervisor_test.rb index c430544a..108ebb6f 100644 --- a/test/unit/supervisor_test.rb +++ b/test/unit/supervisor_test.rb @@ -133,6 +133,36 @@ class SupervisorTest < ActiveSupport::TestCase end end + test "fail orphaned executions by releasing their concurrency locks" do + result = JobResult.create!(queue_name: "default", status: "seq: ") + 4.times { |i| ThrottledUpdateResultJob.set(queue: :new_queue).perform_later(result) } + process = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-123") + + SolidQueue::ReadyExecution.claim("*", 5, process.id) + + assert_equal 3, SolidQueue::ClaimedExecution.count + assert_equal 0, SolidQueue::ReadyExecution.count + assert_equal 1, SolidQueue::BlockedExecution.count + + assert_equal [ process.id ], SolidQueue::ClaimedExecution.last(3).pluck(:process_id).uniq + + # Simnulate orphaned executions by just wiping the claiming process + process.delete + + pid = run_supervisor_as_fork(workers: [ { queues: "background", polling_interval: 10, processes: 2 } ]) + wait_for_registered_processes(3) + assert_registered_supervisor(pid) + + terminate_process(pid) + + skip_active_record_query_cache do + assert_equal 0, SolidQueue::ClaimedExecution.count + assert_equal 3, SolidQueue::FailedExecution.count + assert_equal 0, SolidQueue::BlockedExecution.count + assert_equal 1, SolidQueue::ReadyExecution.count + end + end + test "prune processes with expired heartbeats" do pruned = SolidQueue::Process.register(kind: "Worker", pid: 42, name: "worker-42")