diff --git a/app/models/solid_queue/scheduled_execution.rb b/app/models/solid_queue/scheduled_execution.rb index acd5966d..29571864 100644 --- a/app/models/solid_queue/scheduled_execution.rb +++ b/app/models/solid_queue/scheduled_execution.rb @@ -6,21 +6,28 @@ class SolidQueue::ScheduledExecution < SolidQueue::Execution assume_attributes_from_job :scheduled_at class << self - def prepare_batch(batch) - prepared_at = Time.current - - rows = batch.map do |scheduled_execution| - scheduled_execution.ready_attributes.merge(created_at: prepared_at) + def prepare_next_batch(batch_size) + transaction do + prepared_job_ids = prepare_batch next_batch(batch_size).lock("FOR UPDATE SKIP LOCKED").tap(&:load) + prepared_job_ids.present? end + end - if rows.any? - transaction do + private + def prepare_batch(batch) + prepared_at = Time.current + + rows = batch.map do |scheduled_execution| + scheduled_execution.ready_attributes.merge(created_at: prepared_at) + end + + if rows.empty? then [] + else SolidQueue::ReadyExecution.insert_all(rows) - where(id: batch.map(&:id)).delete_all + SolidQueue::ReadyExecution.where(job_id: batch.map(&:job_id)).pluck(:job_id).tap do |enqueued_job_ids| + where(job_id: enqueued_job_ids).delete_all + end end end - - SolidQueue.logger.info("[SolidQueue] Prepared scheduled batch with #{rows.size} jobs at #{prepared_at}") - end end end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index 22ac9f2f..05242b0b 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -20,19 +20,19 @@ def initialize(**options) private def run with_polling_volume do - batch = SolidQueue::ScheduledExecution.next_batch(batch_size).tap(&:load) - - if batch.size > 0 - procline "preparing #{batch.size} jobs for execution" - - SolidQueue::ScheduledExecution.prepare_batch(batch) - else + unless select_and_prepare_next_batch procline "waiting" interruptible_sleep(polling_interval) end end end + def select_and_prepare_next_batch + with_polling_volume do + SolidQueue::ScheduledExecution.prepare_next_batch(batch_size) + end + end + def launch_concurrency_maintenance @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: concurrency_maintenance_interval) do expire_semaphores diff --git a/test/unit/scheduler_test.rb b/test/unit/scheduler_test.rb index e7036404..c3aca623 100644 --- a/test/unit/scheduler_test.rb +++ b/test/unit/scheduler_test.rb @@ -39,4 +39,20 @@ class SchedulerTest < ActiveSupport::TestCase ActiveRecord::Base.logger = old_logger SolidQueue.silence_polling = old_silence_polling end + + test "run more than one instance of the scheduler" do + 15.times do + AddToBufferJob.set(wait: 0.2).perform_later("I'm scheduled") + end + assert_equal 15, SolidQueue::ScheduledExecution.count + + another_scheduler = SolidQueue::Scheduler.new(polling_interval: 0.1, batch_size: 10) + @scheduler.start(mode: :async) + another_scheduler.start(mode: :async) + + sleep 0.5 + + assert_equal 0, SolidQueue::ScheduledExecution.count + assert_equal 15, SolidQueue::ReadyExecution.count + end end