From 4bfe8c5994fee26c3f938b5b69032923aed3cce8 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 29 Nov 2023 16:28:22 +0100 Subject: [PATCH 1/7] Prevent concurrent schedulers from locking each other and ensure all jobs are scheduled This change helps multiple schedulers running at the same time avoid waiting on each other's locks when deleting scheduled executions. Besides, it improves the selection of jobs to schedule and their deletion, to make sure only executions that have in fact been scheduled are deleted. The previous implementation assumed all selected jobs would be correctly promoted into ready executions, but if this was not the case, they'd simply be deleted and lost. Now we only delete the ones that have actually been promoted, within the same transaction. --- app/models/solid_queue/scheduled_execution.rb | 29 ++++++++++++------- lib/solid_queue/scheduler.rb | 14 ++++----- test/unit/scheduler_test.rb | 18 ++++++++++++ 3 files changed, 43 insertions(+), 18 deletions(-) diff --git a/app/models/solid_queue/scheduled_execution.rb b/app/models/solid_queue/scheduled_execution.rb index acd5966d..29571864 100644 --- a/app/models/solid_queue/scheduled_execution.rb +++ b/app/models/solid_queue/scheduled_execution.rb @@ -6,21 +6,28 @@ class SolidQueue::ScheduledExecution < SolidQueue::Execution assume_attributes_from_job :scheduled_at class << self - def prepare_batch(batch) - prepared_at = Time.current - - rows = batch.map do |scheduled_execution| - scheduled_execution.ready_attributes.merge(created_at: prepared_at) + def prepare_next_batch(batch_size) + transaction do + prepared_job_ids = prepare_batch next_batch(batch_size).lock("FOR UPDATE SKIP LOCKED").tap(&:load) + prepared_job_ids.present? end + end - if rows.any? - transaction do + private + def prepare_batch(batch) + prepared_at = Time.current + + rows = batch.map do |scheduled_execution| + scheduled_execution.ready_attributes.merge(created_at: prepared_at) + end + + if rows.empty? 
then [] + else SolidQueue::ReadyExecution.insert_all(rows) - where(id: batch.map(&:id)).delete_all + SolidQueue::ReadyExecution.where(job_id: batch.map(&:job_id)).pluck(:job_id).tap do |enqueued_job_ids| + where(job_id: enqueued_job_ids).delete_all + end end end - - SolidQueue.logger.info("[SolidQueue] Prepared scheduled batch with #{rows.size} jobs at #{prepared_at}") - end end end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index 22ac9f2f..05242b0b 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -20,19 +20,19 @@ def initialize(**options) private def run with_polling_volume do - batch = SolidQueue::ScheduledExecution.next_batch(batch_size).tap(&:load) - - if batch.size > 0 - procline "preparing #{batch.size} jobs for execution" - - SolidQueue::ScheduledExecution.prepare_batch(batch) - else + unless select_and_prepare_next_batch procline "waiting" interruptible_sleep(polling_interval) end end end + def select_and_prepare_next_batch + with_polling_volume do + SolidQueue::ScheduledExecution.prepare_next_batch(batch_size) + end + end + def launch_concurrency_maintenance @concurrency_maintenance_task = Concurrent::TimerTask.new(run_now: true, execution_interval: concurrency_maintenance_interval) do expire_semaphores diff --git a/test/unit/scheduler_test.rb b/test/unit/scheduler_test.rb index e7036404..9e44ce47 100644 --- a/test/unit/scheduler_test.rb +++ b/test/unit/scheduler_test.rb @@ -39,4 +39,22 @@ class SchedulerTest < ActiveSupport::TestCase ActiveRecord::Base.logger = old_logger SolidQueue.silence_polling = old_silence_polling end + + test "run more than one instance of the scheduler" do + 15.times do + AddToBufferJob.set(wait: 0.2).perform_later("I'm scheduled") + end + assert_equal 15, SolidQueue::ScheduledExecution.count + + another_scheduler = SolidQueue::Scheduler.new(polling_interval: 0.1, batch_size: 10) + @scheduler.start(mode: :async) + another_scheduler.start(mode: :async) + + sleep 0.5 + + 
assert_equal 0, SolidQueue::ScheduledExecution.count + assert_equal 15, SolidQueue::ReadyExecution.count + + another_scheduler.stop + end end From 5ae195e27e7934ad00a8e689a0d01a8349661d51 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 29 Nov 2023 16:54:10 +0100 Subject: [PATCH 2/7] Wait for a random interval before starting the scheduler's tasks This comes from an idea from @djmb, so that, when we have multiple schedulers, they don't start all at the same time after a deploy, enqueuing All The Things and causing a thundering herd. --- lib/solid_queue/interruptible.rb | 2 +- lib/solid_queue/runner.rb | 9 +++++++++ lib/solid_queue/scheduler.rb | 4 ++++ 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/lib/solid_queue/interruptible.rb b/lib/solid_queue/interruptible.rb index c8f9f552..5664ab24 100644 --- a/lib/solid_queue/interruptible.rb +++ b/lib/solid_queue/interruptible.rb @@ -17,7 +17,7 @@ def interrupt end def interruptible_sleep(time) - if self_pipe[:reader].wait_readable(time) + if time > 0 && self_pipe[:reader].wait_readable(time) loop { self_pipe[:reader].read_nonblock(SELF_PIPE_BLOCK_SIZE) } end rescue Errno::EAGAIN, Errno::EINTR diff --git a/lib/solid_queue/runner.rb b/lib/solid_queue/runner.rb index 3cfcde8b..5a317a5f 100644 --- a/lib/solid_queue/runner.rb +++ b/lib/solid_queue/runner.rb @@ -11,6 +11,7 @@ module Runner def start(mode: :supervised) boot_in mode + observe_starting_delay run_callbacks(:start) do if mode == :async @@ -44,6 +45,10 @@ def boot_in(mode) SolidQueue.logger.info("[SolidQueue] Starting #{self}") end + def observe_starting_delay + interruptible_sleep(initial_jitter) + end + def register_signal_handlers %w[ INT TERM ].each do |signal| trap(signal) do @@ -81,6 +86,10 @@ def shutdown procline "shutting down" end + def initial_jitter + 0 + end + def stopping? 
@stopping end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index 05242b0b..53ec2201 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -58,6 +58,10 @@ def unblock_blocked_executions BlockedExecution.unblock(batch_size) end + def initial_jitter + Kernel.rand(0...polling_interval) + end + def metadata super.merge(batch_size: batch_size, polling_interval: polling_interval) end From a7f126ff3e3f5bfb4e84ca2824fc54be5f903916 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 29 Nov 2023 18:04:17 +0100 Subject: [PATCH 3/7] Don't start concurrency tests until all processes are running I had missed that we were running 5 and not 4 there, oops. --- test/integration/concurrency_controls_test.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index 6cf9d6d9..e341fa93 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -14,7 +14,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase @pid = run_supervisor_as_fork(mode: :all, load_configuration_from: { workers: [ default_worker ], scheduler: scheduler }) - wait_for_registered_processes(4, timeout: 0.2.second) # 3 workers working the default queue + supervisor + wait_for_registered_processes(5, timeout: 0.5.second) # 3 workers working the default queue + scheduler + supervisor end teardown do From 808d9fb3d808b15add3dde0b6b67a7b34f2120af Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Wed, 29 Nov 2023 22:34:44 +0100 Subject: [PATCH 4/7] Reorder scheduled execution code a bit --- app/models/solid_queue/scheduled_execution.rb | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/app/models/solid_queue/scheduled_execution.rb b/app/models/solid_queue/scheduled_execution.rb index 29571864..e9ea41c9 100644 --- a/app/models/solid_queue/scheduled_execution.rb +++ 
b/app/models/solid_queue/scheduled_execution.rb @@ -15,18 +15,26 @@ def prepare_next_batch(batch_size) private def prepare_batch(batch) - prepared_at = Time.current + if batch.empty? then [] + else + promote_batch_to_ready(batch) + end + end - rows = batch.map do |scheduled_execution| - scheduled_execution.ready_attributes.merge(created_at: prepared_at) + def promote_batch_to_ready(batch) + rows = ready_rows_from_batch(batch) + + SolidQueue::ReadyExecution.insert_all(rows) + SolidQueue::ReadyExecution.where(job_id: batch.map(&:job_id)).pluck(:job_id).tap do |enqueued_job_ids| + where(job_id: enqueued_job_ids).delete_all end + end - if rows.empty? then [] - else - SolidQueue::ReadyExecution.insert_all(rows) - SolidQueue::ReadyExecution.where(job_id: batch.map(&:job_id)).pluck(:job_id).tap do |enqueued_job_ids| - where(job_id: enqueued_job_ids).delete_all - end + def ready_rows_from_batch(batch) + prepared_at = Time.current + + batch.map do |scheduled_execution| + scheduled_execution.ready_attributes.merge(created_at: prepared_at) end end end From 42e33d48319a25583cabeb68a68ac5f630ddd446 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 30 Nov 2023 10:23:11 +0100 Subject: [PATCH 5/7] Refactor slightly how to wait for things during tests Also wait until all semaphores have been signaled, as it might happen that jobs finish and, before they've signaled the semaphore, we try to wait on it, making the test fail if the timing aligns. 
--- test/integration/concurrency_controls_test.rb | 2 ++ test/integration/processes_lifecycle_test.rb | 2 +- test/test_helper.rb | 33 ++++++++++--------- 3 files changed, 20 insertions(+), 17 deletions(-) diff --git a/test/integration/concurrency_controls_test.rb b/test/integration/concurrency_controls_test.rb index e341fa93..85c5128c 100644 --- a/test/integration/concurrency_controls_test.rb +++ b/test/integration/concurrency_controls_test.rb @@ -89,6 +89,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase wait_for_jobs_to_finish_for(3.seconds) assert_no_pending_jobs + wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? } # Lock the semaphore so we can enqueue jobs and leave them blocked skip_active_record_query_cache do assert SolidQueue::Semaphore.wait(job) @@ -121,6 +122,7 @@ class ConcurrencyControlsTest < ActiveSupport::TestCase wait_for_jobs_to_finish_for(3.seconds) assert_no_pending_jobs + wait_while_with_timeout(1.second) { SolidQueue::Semaphore.where(value: 0).any? 
} # Lock the semaphore so we can enqueue jobs and leave them blocked skip_active_record_query_cache do assert SolidQueue::Semaphore.wait(job) diff --git a/test/integration/processes_lifecycle_test.rb b/test/integration/processes_lifecycle_test.rb index 65a2fddb..5d3e584f 100644 --- a/test/integration/processes_lifecycle_test.rb +++ b/test/integration/processes_lifecycle_test.rb @@ -8,7 +8,7 @@ class ProcessLifecycleTest < ActiveSupport::TestCase setup do @pid = run_supervisor_as_fork - wait_for_registered_processes(3, timeout: 0.2.second) + wait_for_registered_processes(3, timeout: 1.second) assert_registered_workers_for(:background, :default) end diff --git a/test/test_helper.rb b/test/test_helper.rb index 61482adf..565362aa 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -36,14 +36,7 @@ class ActiveSupport::TestCase private def wait_for_jobs_to_finish_for(timeout = 1.second) - skip_active_record_query_cache do - Timeout.timeout(timeout) do - while SolidQueue::Job.where(finished_at: nil).any? do - sleep 0.05 - end - end - end - rescue Timeout::Error + wait_while_with_timeout(timeout) { SolidQueue::Job.where(finished_at: nil).any? 
} end def assert_no_pending_jobs @@ -59,12 +52,7 @@ def run_supervisor_as_fork(**options) end def wait_for_registered_processes(count, timeout: 1.second) - Timeout.timeout(timeout) do - while SolidQueue::Process.count != count do - sleep 0.05 - end - end - rescue Timeout::Error + wait_while_with_timeout(timeout) { SolidQueue::Process.count != count } end def assert_no_registered_processes @@ -86,14 +74,27 @@ def terminate_process(pid, timeout: 10, signal: :TERM) def wait_for_process_termination_with_timeout(pid, timeout: 10, exitstatus: 0) Timeout.timeout(timeout) do - Process.waitpid(pid) - assert exitstatus, $?.exitstatus + if process_exists?(pid) + Process.waitpid(pid) + assert exitstatus, $?.exitstatus + end end rescue Timeout::Error signal_process(pid, :KILL) raise end + def wait_while_with_timeout(timeout, &block) + Timeout.timeout(timeout) do + skip_active_record_query_cache do + while block.call + sleep 0.05 + end + end + end + rescue Timeout::Error + end + def signal_process(pid, signal, wait: nil) Thread.new do sleep(wait) if wait From 893b23f02763af6cfdfffa328ddb14c3a7cb6f07 Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 30 Nov 2023 21:06:30 +0100 Subject: [PATCH 6/7] Refactor a bit how next batch of scheduled executions is selected For clarity. --- app/models/solid_queue/scheduled_execution.rb | 4 ++-- lib/solid_queue/scheduler.rb | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/app/models/solid_queue/scheduled_execution.rb b/app/models/solid_queue/scheduled_execution.rb index e9ea41c9..95ab4e58 100644 --- a/app/models/solid_queue/scheduled_execution.rb +++ b/app/models/solid_queue/scheduled_execution.rb @@ -8,8 +8,8 @@ class SolidQueue::ScheduledExecution < SolidQueue::Execution class << self def prepare_next_batch(batch_size) transaction do - prepared_job_ids = prepare_batch next_batch(batch_size).lock("FOR UPDATE SKIP LOCKED").tap(&:load) - prepared_job_ids.present? 
+ batch = next_batch(batch_size).lock("FOR UPDATE SKIP LOCKED").tap(&:load) + prepare_batch batch end end diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index 53ec2201..9a100c2f 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -19,15 +19,15 @@ def initialize(**options) private def run - with_polling_volume do - unless select_and_prepare_next_batch - procline "waiting" - interruptible_sleep(polling_interval) - end + batch = prepare_next_batch + + unless batch.size > 0 + procline "waiting" + interruptible_sleep(polling_interval) end end - def select_and_prepare_next_batch + def prepare_next_batch with_polling_volume do SolidQueue::ScheduledExecution.prepare_next_batch(batch_size) end From 442ee97c0c49e89e3eb813a11fd40a54ec09aa2a Mon Sep 17 00:00:00 2001 From: Rosa Gutierrez Date: Thu, 30 Nov 2023 21:06:53 +0100 Subject: [PATCH 7/7] Wrap concurrency maintenance tasks that run in timer task in app executor --- lib/solid_queue/scheduler.rb | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/lib/solid_queue/scheduler.rb b/lib/solid_queue/scheduler.rb index 9a100c2f..7526e7b8 100644 --- a/lib/solid_queue/scheduler.rb +++ b/lib/solid_queue/scheduler.rb @@ -51,11 +51,15 @@ def stop_concurrency_maintenance end def expire_semaphores - Semaphore.expired.in_batches(of: batch_size, &:delete_all) + wrap_in_app_executor do + Semaphore.expired.in_batches(of: batch_size, &:delete_all) + end end def unblock_blocked_executions - BlockedExecution.unblock(batch_size) + wrap_in_app_executor do + BlockedExecution.unblock(batch_size) + end end def initial_jitter