From 96ae18c3e8a69edcfb6af8a09a7e4dbffcddf889 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 29 Nov 2023 23:03:10 +0100 Subject: [PATCH 1/3] Make sure jobs claimed are those claimed for the given process ID And not another process claiming the same jobs, since the INSERT query in the claimed_executions table would ignore any duplicates on job_id. --- app/models/solid_queue/claimed_execution.rb | 2 +- ...9215414_extend_claimed_executions_index_on_process_id.rb | 6 ++++++ test/dummy/db/schema.rb | 5 +++-- 3 files changed, 10 insertions(+), 3 deletions(-) create mode 100644 db/migrate/20231129215414_extend_claimed_executions_index_on_process_id.rb diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index 369ed0ee..cfdf0362 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -12,7 +12,7 @@ def claiming(job_ids, process_id, &block) job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } } insert_all(job_data) - where(job_id: job_ids).load.tap do |claimed| + where(job_id: job_ids, process_id: process_id).load.tap do |claimed| block.call(claimed) SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs") end diff --git a/db/migrate/20231129215414_extend_claimed_executions_index_on_process_id.rb b/db/migrate/20231129215414_extend_claimed_executions_index_on_process_id.rb new file mode 100644 index 00000000..94815a48 --- /dev/null +++ b/db/migrate/20231129215414_extend_claimed_executions_index_on_process_id.rb @@ -0,0 +1,6 @@ +class ExtendClaimedExecutionsIndexOnProcessId < ActiveRecord::Migration[7.1] + def change + add_index :solid_queue_claimed_executions, [ :process_id, :job_id ] + remove_index :solid_queue_claimed_executions, :process_id + end +end diff --git a/test/dummy/db/schema.rb b/test/dummy/db/schema.rb index 1ef94a31..84fc0451 100644 --- a/test/dummy/db/schema.rb +++ b/test/dummy/db/schema.rb @@ -10,7 +10,7 @@ # # It's strongly recommended that you check this file into your version control system. -ActiveRecord::Schema[7.1].define(version: 2023_11_15_211044) do +ActiveRecord::Schema[7.1].define(version: 2023_11_29_215414) do create_table "job_results", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| t.string "queue_name" t.string "status" @@ -26,6 +26,7 @@ t.string "concurrency_key", null: false t.datetime "created_at", null: false t.datetime "expires_at", null: false + t.index ["concurrency_key", "expires_at"], name: "index_solid_queue_blocked_executions_for_maintenance_2" t.index ["concurrency_key", "priority", "job_id"], name: "index_solid_queue_blocked_executions_for_release" t.index ["expires_at", "concurrency_key"], name: "index_solid_queue_blocked_executions_for_maintenance" t.index ["job_id"], name: "index_solid_queue_blocked_executions_on_job_id", unique: true @@ -36,7 +37,7 @@ t.bigint "process_id" t.datetime "created_at", null: false t.index ["job_id"], name: "index_solid_queue_claimed_executions_on_job_id", unique: true - t.index ["process_id"], name: "index_solid_queue_claimed_executions_on_process_id" + t.index ["process_id", "job_id"], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id" end create_table "solid_queue_failed_executions", charset: "utf8mb4", collation: "utf8mb4_0900_ai_ci", force: :cascade do |t| From 7ffbb4009a8e853a5f899067e9e6d6aeb5c7eba3 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 29 Nov 2023 23:27:16 +0100 Subject: [PATCH 2/3] Use clearer names for variable and methods when locking polled candidates The name `lock` alone was confusing because it's the same as ActiveRecord's lock method, and `candidates` was a bit ambiguous, easy to say explicitly that the candidates are job IDs. --- app/models/solid_queue/ready_execution.rb | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/app/models/solid_queue/ready_execution.rb b/app/models/solid_queue/ready_execution.rb index 6928781b..20fa3e04 100644 --- a/app/models/solid_queue/ready_execution.rb +++ b/app/models/solid_queue/ready_execution.rb @@ -18,8 +18,8 @@ def select_and_lock(queue_relation, process_id, limit) return [] if limit <= 0 transaction do - candidates = select_candidates(queue_relation, limit) - lock(candidates, process_id) + job_ids = select_candidates(queue_relation, limit) + lock_candidates(job_ids, process_id) end end @@ -27,9 +27,10 @@ def select_candidates(queue_relation, limit) queue_relation.ordered.limit(limit).lock("FOR UPDATE SKIP LOCKED").pluck(:job_id) end - def lock(candidates, process_id) - return [] if candidates.none? - SolidQueue::ClaimedExecution.claiming(candidates, process_id) do |claimed| + def lock_candidates(job_ids, process_id) + return [] if job_ids.none? + + SolidQueue::ClaimedExecution.claiming(job_ids, process_id) do |claimed| where(job_id: claimed.pluck(:job_id)).delete_all end end From ab960e71fc455b510e8757e789e151b68e772eb1 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 30 Nov 2023 15:47:00 +0100 Subject: [PATCH 3/3] Fail when trying to insert ready executions for the same job --- app/models/solid_queue/claimed_execution.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/models/solid_queue/claimed_execution.rb b/app/models/solid_queue/claimed_execution.rb index cfdf0362..9d4af709 100644 --- a/app/models/solid_queue/claimed_execution.rb +++ b/app/models/solid_queue/claimed_execution.rb @@ -11,7 +11,7 @@ class << self def claiming(job_ids, process_id, &block) job_data = Array(job_ids).collect { |job_id| { job_id: job_id, process_id: process_id } } - insert_all(job_data) + insert_all!(job_data) where(job_id: job_ids, process_id: process_id).load.tap do |claimed| block.call(claimed) SolidQueue.logger.info("[SolidQueue] Claimed #{claimed.size} jobs")