Skip to content

Commit 96ae18c

Browse files
committed
Make sure jobs claimed are those claimed for the given process ID
And not another process claiming the same jobs, since the INSERT query in the claimed_executions table would ignore any duplicates on job_id.
1 parent 1579e75 commit 96ae18c

File tree

3 files changed

+10
-3
lines changed

3 files changed

+10
-3
lines changed

Diff for: app/models/solid_queue/claimed_execution.rb

+1-1
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ def claiming(job_ids, process_id, &block)
1212
job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } }
1313

1414
insert_all(job_data)
15-
where(job_id: job_ids).load.tap do |claimed|
15+
where(job_id: job_ids, process_id: process_id).load.tap do |claimed|
1616
block.call(claimed)
1717
SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs")
1818
end
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
class ExtendClaimedExecutionsIndexOnProcessId < ActiveRecord::Migration[7.1]
2+
def change
3+
add_index :solid_queue_claimed_executions, [ :process_id, :job_id ]
4+
remove_index :solid_queue_claimed_executions, :process_id
5+
end
6+
end

Diff for: test/dummy/db/schema.rb

+3-2
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
#
1111
# It's strongly recommended that you check this file into your version control system.
1212

13-
ActiveRecord::Schema[7.1].define(version: 2023_11_15_211044) do
13+
ActiveRecord::Schema[7.1].define(version: 2023_11_29_215414) do
1414
create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|
1515
t.string "queue_name"
1616
t.string "status"
@@ -26,6 +26,7 @@
2626
t.string "concurrency_key", null: false
2727
t.datetime "created_at", null: false
2828
t.datetime "expires_at", null: false
29+
t.index ["concurrency_key", "expires_at"], name: "index_solid_queue_blocked_executions_for_maintenance_2"
2930
t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release"
3031
t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance"
3132
t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true
@@ -36,7 +37,7 @@
3637
t.bigint "process_id"
3738
t.datetime "created_at", null: false
3839
t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true
39-
t.index ["process_id"], name: "index_solid_queue_claimed_executions_on_process_id"
40+
t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id"
4041
end
4142

4243
create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t|

0 commit comments

Comments
 (0)