Skip to content

Commit b7d65d8

Browse files
committed
Prevent concurrent schedulers to lock each other and ensure all jobs are scheduled
This change helps multiple schedulers running at the same time not waiting on each others' locks when deleting scheduled executions. Besides, it improves the selection of jobs to schedule and their deletion, to make sure only executions that have been in fact scheduled are deleted. The previous implementation assumed all selected jobs would be correctly promoted into ready executions, but if this was not the case, they'd simply be deleted and lost. Now we only delete the ones that have been, within the same transaction.
1 parent 1579e75 commit b7d65d8

File tree

3 files changed

+41
-18
lines changed

3 files changed

+41
-18
lines changed

Diff for: app/models/solid_queue/scheduled_execution.rb

+18-11
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,28 @@ class SolidQueue::ScheduledExecution < SolidQueue::Execution
66
assume_attributes_from_job :scheduled_at
77

88
class << self
9-
def prepare_batch(batch)
10-
prepared_at = Time.current
11-
12-
rows = batch.map do |scheduled_execution|
13-
scheduled_execution.ready_attributes.merge(created_at: prepared_at)
9+
def prepare_next_batch(batch_size)
10+
transaction do
11+
prepared_job_ids = prepare_batch next_batch(batch_size).lock("FOR UPDATE SKIP LOCKED").tap(&:load)
12+
prepared_job_ids.present?
1413
end
14+
end
1515

16-
if rows.any?
17-
transaction do
16+
private
17+
def prepare_batch(batch)
18+
prepared_at = Time.current
19+
20+
rows = batch.map do |scheduled_execution|
21+
scheduled_execution.ready_attributes.merge(created_at: prepared_at)
22+
end
23+
24+
if rows.empty? then []
25+
else
1826
SolidQueue::ReadyExecution.insert_all(rows)
19-
where(id: batch.map(&:id)).delete_all
27+
SolidQueue::ReadyExecution.where(job_id: batch.map(&:job_id)).pluck(:job_id).tap do |enqueued_job_ids|
28+
where(job_id: enqueued_job_ids).delete_all
29+
end
2030
end
2131
end
22-
23-
SolidQueue.logger.info("[SolidQueue] Prepared scheduled batch with #{rows.size} jobs at #{prepared_at}")
24-
end
2532
end
2633
end

Diff for: lib/solid_queue/scheduler.rb

+7-7
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,19 @@ def initialize(**options)
2020
private
2121
def run
2222
with_polling_volume do
23-
batch = SolidQueue::ScheduledExecution.next_batch(batch_size).tap(&:load)
24-
25-
if batch.size > 0
26-
procline "preparing #{batch.size} jobs for execution"
27-
28-
SolidQueue::ScheduledExecution.prepare_batch(batch)
29-
else
23+
unless select_and_prepare_next_batch
3024
procline "waiting"
3125
interruptible_sleep(polling_interval)
3226
end
3327
end
3428
end
3529

30+
def select_and_prepare_next_batch
31+
with_polling_volume do
32+
SolidQueue::ScheduledExecution.prepare_next_batch(batch_size)
33+
end
34+
end
35+
3636
def launch_concurrency_maintenance
3737
@concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: concurrency_maintenance_interval) do
3838
expire_semaphores

Diff for: test/unit/scheduler_test.rb

+16
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,20 @@ class SchedulerTest < ActiveSupport::TestCase
3939
ActiveRecord::Base.logger = old_logger
4040
SolidQueue.silence_polling = old_silence_polling
4141
end
42+
43+
test "run more than one instance of the scheduler" do
44+
15.times do
45+
AddToBufferJob.set(wait: 0.2).perform_later("I'm scheduled")
46+
end
47+
assert_equal 15, SolidQueue::ScheduledExecution.count
48+
49+
another_scheduler = SolidQueue::Scheduler.new(polling_interval: 0.1, batch_size: 10)
50+
@scheduler.start(mode: :async)
51+
another_scheduler.start(mode: :async)
52+
53+
sleep 0.5
54+
55+
assert_equal 0, SolidQueue::ScheduledExecution.count
56+
assert_equal 15, SolidQueue::ReadyExecution.count
57+
end
4258
end

0 commit comments

Comments
 (0)